3.1.1 协议版本的异步 MQTT 客户端。
项目描述
关于
3.1.1 协议版本的异步 MQTT 客户端。
安装
推荐方式(通过 pip):
$ pip install aio-mqtt
例子
简单的回显服务器:
import asyncio as aio
import logging
import typing as ty
import aio_mqtt
logger = logging.getLogger(__name__)
class EchoServer:
def __init__(
self,
reconnection_interval: int = 10,
loop: ty.Optional[aio.AbstractEventLoop] = None
) -> None:
self._reconnection_interval = reconnection_interval
self._loop = loop or aio.get_event_loop()
self._client = aio_mqtt.Client(loop=self._loop)
self._tasks = [
self._loop.create_task(self._connect_forever()),
self._loop.create_task(self._handle_messages())
]
async def close(self) -> None:
for task in self._tasks:
if task.done():
continue
task.cancel()
try:
await task
except aio.CancelledError:
pass
if self._client.is_connected():
await self._client.disconnect()
async def _handle_messages(self) -> None:
async for message in self._client.delivered_messages('in'):
while True:
try:
await self._client.publish(
aio_mqtt.PublishableMessage(
topic_name='out',
payload=message.payload,
qos=aio_mqtt.QOSLevel.QOS_1
)
)
except aio_mqtt.ConnectionClosedError as e:
logger.error("Connection closed", exc_info=e)
await self._client.wait_for_connect()
continue
except Exception as e:
logger.error("Unhandled exception during echo message publishing", exc_info=e)
break
async def _connect_forever(self) -> None:
while True:
try:
connect_result = await self._client.connect('localhost')
logger.info("Connected")
await self._client.subscribe(('in', aio_mqtt.QOSLevel.QOS_1))
logger.info("Wait for network interruptions...")
await connect_result.disconnect_reason
except aio.CancelledError:
raise
except aio_mqtt.AccessRefusedError as e:
logger.error("Access refused", exc_info=e)
except aio_mqtt.ConnectionLostError as e:
logger.error("Connection lost. Will retry in %d seconds", self._reconnection_interval, exc_info=e)
await aio.sleep(self._reconnection_interval, loop=self._loop)
except aio_mqtt.ConnectionCloseForcedError as e:
logger.error("Connection close forced", exc_info=e)
return
except Exception as e:
logger.error("Unhandled exception during connecting", exc_info=e)
return
else:
logger.info("Disconnected")
return
if __name__ == '__main__':
logging.basicConfig(
level='DEBUG'
)
loop = aio.new_event_loop()
server = EchoServer(reconnection_interval=10, loop=loop)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(server.close())
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
执照
版权所有 2019-2020 不仅仅是玩具公司。
根据 Apache 许可证 2.0 版(“许可证”)获得许可;除非遵守许可,否则您不得使用此文件。您可以在以下网址获取许可证的副本
除非适用法律要求或书面同意,否则根据许可分发的软件将按“原样”分发,没有任何明示或暗示的保证或条件。有关许可下的特定语言管理权限和限制,请参阅许可。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分布
aio-mqtt-0.2.0.tar.gz
(11.8 kB
查看哈希)
内置分布
aio_mqtt-0.2.0-py3-none-any.whl
(18.3 kB
查看哈希)