Skip to main content

3.1.1 协议版本的异步 MQTT 客户端。

项目描述

关于

3.1.1 协议版本的异步 MQTT 客户端。

安装

推荐方式(通过 pip):

$ pip install aio-mqtt

例子

简单的回显服务器:

import asyncio as aio
import logging
import typing as ty

import aio_mqtt

logger = logging.getLogger(__name__)


class EchoServer:

    def __init__(
            self,
            reconnection_interval: int = 10,
            loop: ty.Optional[aio.AbstractEventLoop] = None
    ) -> None:
        self._reconnection_interval = reconnection_interval
        self._loop = loop or aio.get_event_loop()
        self._client = aio_mqtt.Client(loop=self._loop)
        self._tasks = [
            self._loop.create_task(self._connect_forever()),
            self._loop.create_task(self._handle_messages())
        ]

    async def close(self) -> None:
        for task in self._tasks:
            if task.done():
                continue
            task.cancel()
            try:
                await task
            except aio.CancelledError:
                pass
        if self._client.is_connected():
            await self._client.disconnect()

    async def _handle_messages(self) -> None:
        async for message in self._client.delivered_messages('in'):
            while True:
                try:
                    await self._client.publish(
                        aio_mqtt.PublishableMessage(
                            topic_name='out',
                            payload=message.payload,
                            qos=aio_mqtt.QOSLevel.QOS_1
                        )
                    )
                except aio_mqtt.ConnectionClosedError as e:
                    logger.error("Connection closed", exc_info=e)
                    await self._client.wait_for_connect()
                    continue

                except Exception as e:
                    logger.error("Unhandled exception during echo message publishing", exc_info=e)

                break

    async def _connect_forever(self) -> None:
        while True:
            try:
                connect_result = await self._client.connect('localhost')
                logger.info("Connected")

                await self._client.subscribe(('in', aio_mqtt.QOSLevel.QOS_1))

                logger.info("Wait for network interruptions...")
                await connect_result.disconnect_reason
            except aio.CancelledError:
                raise

            except aio_mqtt.AccessRefusedError as e:
                logger.error("Access refused", exc_info=e)

            except aio_mqtt.ConnectionLostError as e:
                logger.error("Connection lost. Will retry in %d seconds", self._reconnection_interval, exc_info=e)
                await aio.sleep(self._reconnection_interval, loop=self._loop)

            except aio_mqtt.ConnectionCloseForcedError as e:
                logger.error("Connection close forced", exc_info=e)
                return

            except Exception as e:
                logger.error("Unhandled exception during connecting", exc_info=e)
                return

            else:
                logger.info("Disconnected")
                return


if __name__ == '__main__':
    logging.basicConfig(
        level='DEBUG'
    )
    loop = aio.new_event_loop()
    server = EchoServer(reconnection_interval=10, loop=loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    finally:
        loop.run_until_complete(server.close())
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

执照

版权所有 2019-2020 不仅仅是玩具公司。

根据 Apache 许可证 2.0 版(“许可证”)获得许可;除非遵守许可,否则您不得使用此文件。您可以在以下网址获取许可证的副本

http://www.apache.org/licenses/LICENSE-2.0

除非适用法律要求或书面同意,否则根据许可分发的软件将按“原样”分发,没有任何明示或暗示的保证或条件。有关许可下的特定语言管理权限和限制,请参阅许可。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

aio-mqtt-0.2.0.tar.gz (11.8 kB 查看哈希

已上传 source

内置分布

aio_mqtt-0.2.0-py3-none-any.whl (18.3 kB 查看哈希

已上传 py3