Skip to main content

MQTT 版本 5.0/3.1.1 客户端类

项目描述

本文档描述了Eclipse Paho MQTT Python 客户端库的源代码,它实现了 MQTT 协议的 5.0、3.1.1 和 3.1 版本。

此代码提供了一个客户端类,使应用程序能够连接到MQTT代理以发布消息、订阅主题和接收发布的消息。它还提供了一些帮助函数,可以非常简单地将一次性消息发布到 MQTT 服务器。

它支持 Python 2.7.9+ 或 3.6+。

MQTT 协议是一种机器对机器 (M2M)/“物联网”连接协议。设计为极其轻量级的发布/订阅消息传输,它对于与需要小代码足迹和/或网络带宽非常宝贵的远程位置的连接非常有用。

Paho 是一个Eclipse 基金会项目。

内容

安装

Python 包索引 (PyPi) 中提供了最新的稳定版本,可以使用

pip install paho-mqtt

或使用virtualenv

virtualenv paho-mqtt
源 paho-mqtt/bin/激活
pip install paho-mqtt

要获取完整的代码,包括示例和测试,您可以克隆 git 存储库:

git clone https://github.com/eclipse/paho.mqtt.python

获得代码后,也可以从您的存储库中安装它:

cd paho.mqtt.python
python setup.py install

要执行所有测试(包括 MQTT v5 测试),您还需要在 paho.mqtt.python 文件夹中克隆 paho.mqtt.testing:

git clone https://github.com/eclipse/paho.mqtt.testing.git

已知限制

以下是已知的未实现的 MQTT 功能。

当 clean_session 为 False 时,会话只存储在内存中而不是持久化。这意味着当客户端重新启动时(不仅仅是重新连接,通常因为程序重新启动而重新创建对象)会话丢失。这可能导致消息丢失。

客户端会话的以下部分丢失:

  • 已从服务器收到但尚未完全确认的 QoS 2 消息。

    由于客户端会盲目地确认任何 PUBCOMP(QoS 2 事务的最后一条消息),它不会挂起但会丢失此 QoS 2 消息。

  • 已发送到服务器但尚未完全确认的 QoS 1 和 QoS 2 消息。

    这意味着传递给 publish() 的消息可能会丢失。这可以通过注意传递给 publish() 的所有消息都有相应的 on_publish() 调用来缓解。

    这也意味着代理可能在会话中有 Qos2 消息。由于客户端从一个空会话开始,它不知道它并将重新使用中间。这还没有解决。

此外,当 clean_session 为 True 时,此库将通过网络重新连接重新发布 QoS > 0 消息。这意味着 QoS > 0 消息不会丢失。但是标准说如果我们应该丢弃任何发送发布数据包的消息。我们的选择意味着我们不符合标准,并且可能会收到两次 QoS 2。如果您只需要一次交付的 QoS 2 保证,您应该 clean_session = False。

用法和 API

详细的 API 文档可通过pydoc获得。示例目录中提供了示例

该包提供了两个模块,一个完整的客户端和一个用于简单发布的助手。

入门

这是一个非常简单的示例,它订阅了代理 $SYS 主题树并打印出结果消息:

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("$SYS/#")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("mqtt.eclipseprojects.io", 1883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

客户

您可以将客户端类用作实例、在类中或通过子类化。一般使用流程如下:

  • 创建客户端实例

  • 使用其中一个connect*()函数连接到代理

  • 调用其中一个loop*()函数来维护与代理的网络流量

  • 使用subscribe()订阅主题并接收消息

  • 使用publish()将消息发布到代理

  • 使用disconnect()断开与代理的连接

将调用回调以允许应用程序根据需要处理事件。这些回调如下所述。

构造函数/重新初始化

客户()
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311, transport="tcp")

Client()构造函数采用以下参数:

client_id

连接到代理时使用的唯一客户端 ID 字符串。如果 client_id的长度为零或None,则将随机生成一个。在这种情况下,clean_session参数必须为True

清洁会话

确定客户端类型的布尔值。如果为True,代理将在断开连接时删除有关此客户端的所有信息。如果为False,则客户端是持久客户端,并且在客户端断开连接时将保留订阅信息和排队消息。

请注意,客户端永远不会在断开连接时丢弃自己的传出消息。调用 connect() 或 reconnect() 将导致消息被重新发送。使用 reinitialise() 将客户端重置为其原始状态。

用户数据

作为用户数据参数传递给回调任何类型的用户定义数据。稍后可能会使用 user_data_set()函数对其进行更新。

协议

用于此客户端的 MQTT 协议的版本。可以是 MQTTv31MQTTv311MQTTv5

运输

设置为“websockets”以通过 WebSockets 发送 MQTT。保留默认的“tcp”以使用原始 TCP。

构造函数示例
import paho.mqtt.client as mqtt

mqttc = mqtt.Client()
重新初始化()
reinitialise(client_id="", clean_session=True, userdata=None)

reinitialise()函数将客户端重置为其起始状态,就好像它刚刚被创建一样。它采用与Client()构造函数相同的参数。

重新初始化示例
mqttc.reinitialise()

选项功能

这些函数表示可以在客户端上设置以修改其行为的选项。在大多数情况下,这必须在连接到代理之前完成。

max_inflight_messages_set()
max_inflight_messages_set(self, inflight)

设置 QoS>0 的最大消息数,这些消息可以同时通过其网络流。

默认为 20。增加此值将消耗更多内存,但可以增加吞吐量。

max_queued_messages_set()
max_queued_messages_set(self, queue_size)

设置可以在传出消息队列中挂起的 QoS>0 的传出消息的最大数量。

默认为 0。0 表示无限制,但由于目前的实施限制为 65555(队列中的 65535 条消息 + 20 条正在运行的消息)。当队列已满时,将丢弃任何进一步的传出消息。

message_retry_set()
message_retry_set(retry)

如果代理没有响应,则在重试 QoS>0 的消息之前设置时间(以秒为单位)。

默认情况下设置为 5 秒,通常不需要更改。

ws_set_options()
ws_set_options(self, path="/mqtt", headers=None)

设置 websocket 连接选项。只有在将transport="websockets"传递给Client()构造函数时才会使用这些选项。

小路

在代理上使用的 mqtt 路径。

标题

指定应附加到标准 websocket 标头的额外标头列表的字典,或者采用普通 websocket 标头并返回带有一组标头的新字典的可调用项以连接到代理。

必须在connect*()之前调用。示例文件夹中提供了如何将其与 AWS IoT 平台一起使用的示例。

tls_set()
tls_set(ca_certs=None, certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED,
    tls_version=ssl.PROTOCOL_TLS, ciphers=None)

配置网络加密和身份验证选项。启用 SSL/TLS 支持。

ca_certs

证书颁发机构证书文件的字符串路径,该文件被此客户端视为信任。如果这是给定的唯一选项,那么客户端将以与 Web 浏览器类似的方式运行。也就是说,它将要求代理拥有由ca_certs中的证书颁发机构签署的证书,并将使用 TLS v1.2 进行通信,但不会尝试任何形式的身份验证。这提供了基本的网络加密,但可能还不够,具体取决于代理的配置方式。默认情况下,在 Python 2.7.9+ 或 3.4+ 上,使用系统默认的证书颁发机构。在较旧的 Python 版本上,此参数是必需的。

证书文件,密钥文件

分别指向 PEM 编码的客户端证书和私钥的字符串。如果这些参数不是None那么它们将用作基于 TLS 的身份验证的客户端信息。对此功能的支持取决于代理。请注意,如果这些文件中的任何一个处于加密状态并且需要密码来解密,Python 将在命令行中询问密码。目前无法定义回调来提供密码。

cert_reqs

定义客户端对代理的证书要求。默认情况下这是ssl.CERT_REQUIRED,这意味着代理必须提供证书。有关此参数的更多信息,请参阅 ssl pydoc。

tls_version

指定要使用的 SSL/TLS 协议的版本。默认情况下(如果 python 版本支持它)检测到最高 TLS 版本。如果不可用,则使用 TLS v1.2。以前的版本(所有以 SSL 开头的版本)都是可能的,但由于可能存在安全问题,不推荐使用。

密码

一个字符串,指定此连接允许使用哪些加密密码,或者None使用默认值。有关更多信息,请参阅 ssl pydoc。

必须在connect*()之前调用。

tls_set_context()
tls_set_context(context=None)

配置网络加密和认证上下文。启用 SSL/TLS 支持。

语境

一个 ssl.SSLContext 对象。默认情况下,这由ssl.create_default_context()给出,如果可用(在 Python 3.4 中添加)。

如果您不确定是否使用此方法,请使用默认上下文或使用tls_set方法。有关更多信息,请参阅有关安全注意事项的 ssl 模块文档部分。

必须在connect*()之前调用。

tls_insecure_set()
tls_insecure_set(value)

在服务器证书中配置服务器主机名的验证。

如果value设置为True,则无法保证您连接的主机没有模拟您的服务器。这在初始服务器测试中很有用,但例如,恶意第三方可以通过 DNS 欺骗来冒充您的服务器。

不要在实际系统中使用此功能。将值设置为 True 意味着使用加密毫无意义。

必须在connect*()之前和tls_set()tls_set_context()之后调用。

enable_logger()
enable_logger(logger=None)

使用标准 python 日志包启用日志记录(参见 PEP 282)。这可以与on_log回调方法同时使用。

如果指定了logger,则将使用该logging.Logger对象,否则将自动创建一个。

根据以下映射将 Paho 日志记录级别转换为标准级别:

帕霍

日志记录

MQTT_LOG_ERR

logging.ERROR

MQTT_LOG_WARNING

记录。警告

MQTT_LOG_NOTICE

logging.INFO (没有直接等价物)

MQTT_LOG_INFO

日志记录.INFO

MQTT_LOG_DEBUG

日志记录.DEBUG

disable_logger()
disable_logger()

使用标准 python 日志包禁用日志记录。这对on_log回调没有影响。

username_pw_set()
username_pw_set(username, password=None)

为代理身份验证设置用户名和密码(可选)。必须在connect*()之前调用。

用户数据集()
user_data_set(userdata)

设置生成事件时将传递给回调的私有用户数据。将其用于您自己的目的以支持您的应用程序。

will_set()
will_set(topic, payload=None, qos=0, retain=False)

设置要发送给经纪人的遗嘱。如果客户端在没有调用 disconnect()的情况下断开连接,代理将代表它发布消息。

话题

遗嘱消息应该发布的主题。

有效载荷

作为遗嘱发送的信息。如果未给出,或设置为None ,则将使用零长度消息。传递 int 或 float 将导致有效负载转换为表示该数字的字符串。如果您希望发送真正的 int/float,请使用struct.pack()创建您需要的有效负载。

服务质量

用于遗嘱的服务质量水平。

保持

如果设置为True,遗嘱消息将被设置为主题的“最后一次已知良好”/保留的消息。

如果qos不是 0、1 或 2,或者主题None或字符串长度为零,则引发ValueError 。

reconnect_delay_set
reconnect_delay_set(min_delay=1, max_delay=120)

客户端会自动重试连接。在每次尝试之间,它将在min_delaymax_delay之间等待几秒钟。

当连接丢失时,最初重新连接尝试会延迟 min_delay秒。在随后的尝试到max_delay之间它加倍。

当连接完成时延迟重置为min_delay(例如接收到 CONNACK,而不仅仅是建立 TCP 连接)。

连接/重新连接/断开

连接()
connect(host, port=1883, keepalive=60, bind_address="")

connect()函数将客户端连接到代理。这是一个阻塞函数。它采用以下参数:

主持人

远程代理的主机名或 IP 地址

港口

要连接的服务器主机的网络端口。默认为 1883。请注意,基于 SSL/TLS 的 MQTT 的默认端口是 8883,因此如果您使用 tls_set()tls_set_context(),端口可能需要手动提供

活着

与代理通信之间允许的最大时间间隔(以秒为单位)。如果没有其他消息正在交换,这将控制客户端向代理发送 ping 消息的速率

绑定地址

将此客户端绑定到的本地网络接口的 IP 地址,假设存在多个接口

打回来

当客户端收到来自代理的 CONNACK 消息以响应连接时,它会生成一个on_connect()回调。

连接示例
mqttc.connect("mqtt.eclipseprojects.io")
连接异步()
connect_async(host, port=1883, keepalive=60, bind_address="")

loop_start()结合使用以非阻塞方式连接。在调用loop_start()之前,连接不会完成。

回调(连接)

当客户端收到来自代理的 CONNACK 消息以响应连接时,它会生成一个on_connect()回调。

连接服务器()
connect_srv(domain, keepalive=60, bind_address="")

使用 SRV DNS 查找连接到代理以获取代理地址。接受以下参数:

领域

用于搜索 SRV 记录的 DNS 域。如果None,尝试确定本地域名。

有关keepalivebind_address 参数的描述,请参见connect() 。

回调 (connect_srv)

当客户端收到来自代理的 CONNACK 消息以响应连接时,它会生成一个on_connect()回调。

SRV 连接示例
mqttc.connect_srv("eclipse.org")
重新连接()
reconnect()

使用之前提供的详细信息重新连接到代理。您必须在调用此函数之前调用connect*() 。

回调(重新连接)

当客户端收到来自代理的 CONNACK 消息以响应连接时,它会生成一个on_connect()回调。

断开()
disconnect()

彻底断开与代理的连接。使用disconnect()不会导致代理发送遗嘱消息。

断开连接不会等待发送所有排队的消息,为确保所有消息都已传递,应使用MQTTMessageInfo中的wait_for_publish() 。有关详细信息,请参阅发布()

回调(断开)

当客户端发送断开连接消息时,它会生成一个 on_disconnect()回调。

网络环路

这些功能是客户端背后的驱动力。如果不调用它们,则不会处理传入的网络数据,并且可能无法及时发送传出的网络数据。管理网络环路有四个选项。这里描述了三个,第四个在下面的“外部事件循环支持”中。不要混合不同的循环功能。

环形()
loop(timeout=1.0, max_packets=1)

定期调用以处理网络事件。此调用在select()中等待,直到网络套接字可用于读取或写入(如果合适),然后处理传入/传出数据。此函数最多阻塞timeout 秒。timeout不得超过客户端的keepalive值,否则您的客户端将定期被代理断开连接。

max_packets参数已过时,应保持未设置

循环示例
run = True
while run:
    mqttc.loop()
loop_start() / loop_stop()
loop_start()
loop_stop(force=False)

这些函数实现了到网络循环的线程接口。在connect*()之前或之后调用 loop_start()一次,会在后台运行一个线程以自动调用loop()。这为可能阻塞的其他工作释放了主线程。此调用还处理重新连接到代理。调用loop_stop()来停止后台线程。force 参数当前被忽略。

循环启动/停止示例
mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_start()

while True:
    temperature = sensor.blocking_read()
    mqttc.publish("paho/temperature", temperature)
循环永远()
loop_forever(timeout=1.0, max_packets=1, retry_first_connection=False)

这是网络循环的阻塞形式,在客户端调用disconnect()之前不会返回。它会自动处理重新连接。

除了使用 connect_async 时的第一次连接尝试外,使用 retry_first_connection=True使其重试第一次连接。警告:这可能会导致客户端继续连接到不存在的主机而不会失败的情况。

timeout和max_packets参数已过时,应保持未设置

出版

从客户端向代理发送消息。

发布()
publish(topic, payload=None, qos=0, retain=False)

这会导致将消息发送到代理,然后从代理发送到订阅匹配主题的任何客户端。它采用以下参数:

话题

应该发布消息的主题

有效载荷

要发送的实际消息。如果未给出,或设置为None ,将使用零长度消息。传递 int 或 float 将导致有效负载转换为表示该数字的字符串。如果您希望发送真正的 int/float,请使用struct.pack()创建您需要的有效负载

服务质量

使用的服务质量等级

保持

如果设置为True,则该消息将被设置为该主题的“最后一次正确”/保留的消息。

返回一个 MQTTMessageInfo,它公开以下属性和方法:

  • rc,发布的结果。它可以是MQTT_ERR_SUCCESS表示成功,MQTT_ERR_NO_CONN如果客户端当前没有连接,或者MQTT_ERR_QUEUE_SIZEmax_queued_messages_set用于表示消息既没有排队也没有发送。

  • mid是发布请求的消息 ID。mid 值可用于通过检查 on_publish()回调中的 mid 参数(如果已定义)来跟踪发布请求。根据您的用例,wait_for_publish可能更容易。

  • wait_for_publish()将阻塞直到消息发布。如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE ),它将引发 ValueError ,如果在发布时出现错误,则会引发 RuntimeError,很可能是由于客户端未连接。

  • 如果消息已发布,is_published返回 True。如果消息未排队(rc == MQTT_ERR_QUEUE_SIZE ),它将引发 ValueError ,如果在发布时出现错误,则会引发 RuntimeError,很可能是由于客户端未连接。

如果主题为None、长度为零或无效(包含通配符),如果qos不是 0、1 或 2 之一,或者如果有效负载的长度大于 268435455 字节,则会引发ValueError 。

回调(发布)

当消息发送到代理时,将生成on_publish()回调。

订阅/取消订阅

订阅()
subscribe(topic, qos=0)

为客户订阅一个或多个主题。

可以通过三种不同的方式调用此函数:

简单字符串和整数

例如订阅(“我的/主题”, 2)

话题

一个字符串,指定要订阅的订阅主题。

服务质量

订阅所需的服务质量级别。默认为 0。

字符串和整数元组

例如订阅((“我的/主题”, 1))

话题

(topic, qos)的元组。topic 和 qos 都必须存在于元组中。

服务质量

不曾用过。

字符串和整数元组列表

例如订阅([(“我的/主题”, 0),(“另一个/主题”,2)])

这允许在单个 SUBSCRIPTION 命令中订阅多个主题,这比使用多次调用subscribe()更有效。

话题

格式(topic, qos)的元组列表。topic 和 qos 都必须存在于所有元组中。

服务质量

不曾用过。

该函数返回一个元组(result, mid),其中结果MQTT_ERR_SUCCESS表示成功,如果客户端当前未连接 ,则返回(MQTT_ERR_NO_CONN, None) 。mid是订阅请求的消息 ID。mid 值可用于通过检查on_subscribe()回调中的 mid 参数(如果已定义)来跟踪订阅请求。

如果qos不是 0、1 或 2,或者主题为None或字符串长度为零,或者主题不是字符串、元组或列表,则引发ValueError 。

回调(订阅)

当代理确认订阅后, 将生成一个on_subscribe()回调。

取消订阅()
unsubscribe(topic)

取消订阅客户的一个或多个主题。

话题

单个字符串,或作为要取消订阅的订阅主题的字符串列表。

返回一个元组(result, mid),其中resultMQTT_ERR_SUCCESS表示成功,或者(MQTT_ERR_NO_CONN, None)如果客户端当前未连接。mid是取消订阅请求的消息 ID。mid 值可用于通过检查on_unsubscribe()回调中的 mid 参数(如果已定义)来跟踪取消订阅请求。

如果主题None或字符串长度为零,或者不是字符串或列表,则引发ValueError 。

回调(取消订阅)

当代理确认取消订阅后, 将生成一个on_unsubscribe()回调。

回调

on_connect()
on_connect(client, userdata, flags, rc)

当代理响应我们的连接请求时调用。

客户

此回调的客户端实例

用户数据

Client()user_data_set()中设置的私有用户数据

旗帜

代理发送的响应标志

rc

连接结果

flags 是一个包含来自代理的响应标志的字典:
flags['session present'] - 这个标志对客户端很有用

仅使用设置为 0 的干净会话。如果一个具有干净会话 = 0 的客户端重新连接到它先前连接的代理,则此标志指示代理是否仍然具有客户端的会话信息。如果为 1,则会话仍然存在。

rc 的值表示成功与否:

0:连接成功 1:连接被拒绝 - 协议版本不正确 2:连接被拒绝 - 客户端标识符无效 3:连接被拒绝 - 服务器不可用 4:连接被拒绝 - 用户名或密码错误 5:连接被拒绝 - 未经授权 6-255:当前未使用。

连接示例
def on_connect(client, userdata, flags, rc):
    print("Connection returned result: "+connack_string(rc))

mqttc.on_connect = on_connect
...
on_disconnect()
on_disconnect(client, userdata, rc)

当客户端与代理断开连接时调用。

客户

此回调的客户端实例

用户数据

Client()user_data_set()中设置的私有用户数据

rc

断线结果

rc 参数表示断开状态。如果MQTT_ERR_SUCCESS (0),则调用回调以响应disconnect()调用。如果有任何其他值,则断开连接是意外的,例如可能是由网络错误引起的。

断开连接示例
def on_disconnect(client, userdata, rc):
    if rc != 0:
        print("Unexpected disconnection.")

mqttc.on_disconnect = on_disconnect
...
on_message()
on_message(client, userdata, message)

当收到关于客户端订阅的主题的消息并且该消息与现有的主题过滤器回调不匹配时调用。使用message_callback_add()定义将为特定主题过滤器调用的回调。当没有匹配时, on_message将作为后备。

客户

此回调的客户端实例

用户数据

Client()user_data_set()中设置的私有用户数据

信息

MQTTMessage 的一个实例。这是一个包含成员topicpayloadqosretain的类。

消息示例
def on_message(client, userdata, message):
    print("Received message '" + str(message.payload) + "' on topic '"
        + message.topic + "' with QoS " + str(message.qos))

mqttc.on_message = on_message
...
message_callback_add()

此函数允许您定义处理特定订阅过滤器的传入消息的回调,包括使用通配符。例如,这使您可以订阅sensors/#并有一个回调来处理 传感器/温度,而另一个回调来处理传感器/湿度

message_callback_add(sub, callback)

与此回调匹配的订阅过滤器。每个文字子字符串只能定义一个回调

打回来

要使用的回调。采用与on_message 回调相同的形式。

如果使用message_callback_add()on_message,只有与订阅特定过滤器不匹配的消息才会传递给on_message 回调。

如果多个子匹配一个主题,每个回调将被调用(例如子传感器/# 和子+/湿度都匹配一个主题传感器/湿度的消息,所以两个回调都会处理这个消息)。

message_callback_remove()

删除以前使用 message_callback_add()注册的主题/订阅特定回调。

message_callback_remove(sub)

要删除的订阅过滤器

on_publish()
on_publish(client, userdata, mid)

当使用publish()调用发送的消息完成传输到代理时调用。对于具有 QoS 级别 1 和 2 的消息,这意味着已完成适当的握手。对于 QoS 0,这仅意味着消息已离开客户端。mid变量匹配从相应的publish()调用返回的mid 变量,以允许跟踪传出消息。

这个回调很重要,因为即使 publish() 调用返回成功,也并不总是意味着消息已经发送。

on_subscribe()
on_subscribe(client, userdata, mid, granted_qos)

当代理响应订阅请求时调用。mid变量匹配从相应的subscribe()调用返回的mid 变量。granted_qos变量是一个整数列表,它给出了代理为每个不同的订阅请求授予的 QoS 级别。

on_unsubscribe()
on_unsubscribe(client, userdata, mid)

当代理响应取消订阅请求时调用。mid变量匹配从相应的unsubscribe() 调用返回的mid 变量。

on_log()
on_log(client, userdata, level, buf)

当客户端有日志信息时调用。定义为允许调试。level变量给出消息的 严重性,将是 MQTT_LOG_INFOMQTT_LOG_NOTICEMQTT_LOG_WARNINGMQTT_LOG_ERRMQTT_LOG_DEBUG 之一。消息本身在buf中。

这可以与标准 Python 日志记录同时使用,可以通过enable_logger方法启用。

on_socket_open()
on_socket_open(client, userdata, sock)

在打开套接字时调用。使用它来将套接字注册到外部事件循环以供读取。

on_socket_close()
on_socket_close(client, userdata, sock)

当套接字即将关闭时调用。使用它从外部事件循环中取消注册套接字以进行读取。

on_socket_register_write()
on_socket_register_write(client, userdata, sock)

当对套接字的写入操作失败时调用,因为它会被阻塞,例如输出缓冲区已满。使用它向外部事件循环注册套接字以进行写入。

on_socket_unregister_write()
on_socket_unregister_write(client, userdata, sock)

在先前失败后对套接字的写入操作成功时调用。使用它从外部事件循环中注销套接字以进行写入。

外部事件循环支持

循环读取()
loop_read(max_packets=1)

当套接字准备好读取时调用。max_packets已过时,应保持未设置。

循环写入()
loop_write(max_packets=1)

当套接字准备好写入时调用。max_packets已过时,应保持未设置。

循环杂项()
loop_misc()

每隔几秒调用一次以处理消息重试和 ping。

插座()
socket()

返回客户端正在使用的套接字对象,以允许与其他事件循环交互。此调用对于基于选择的循环特别有用。请参阅示例/loop_select.py

想要写()
want_write()

如果有数据等待写入,则返回 true,以允许客户端与其他事件循环交互。此调用对于基于选择的循环特别有用。请参阅示例/loop_select.py

状态回调
on_socket_open
on_socket_close
on_socket_register_write
on_socket_unregister_write

使用这些回调来获取有关套接字状态更改的通知。这对于注册或注销套接字以进行读+写的事件循环特别有用。有关示例,请参见示例/loop_asyncio.py

当套接字打开时,on_socket_open被调用。使用您的事件循环注册套接字以供读取。

当套接字即将关闭时,会调用on_socket_close。从事件循环中注销套接字以进行读取。

当对套接字的写入失败时,因为它会被阻塞,例如输出缓冲区已满, on_socket_register_write被调用。使用您的事件循环注册套接字以进行写入。

当下一次写入套接字成功时,会调用on_socket_unregister_write。从事件循环中注销套接字以进行写入。

回调总是按以下顺序调用:

  • on_socket_open

  • 零次或多次:

    • on_socket_register_write

    • on_socket_unregister_write

  • on_socket_close

全局辅助函数

客户端模块还提供了一些全局帮助函数。

topic_matches_sub(sub, topic)可用于检查主题是否与订阅 匹配。

例如:

主题foo/bar将匹配订阅foo/#+/bar

主题non/matching与订阅non/+/+不匹配

connack_string(connack_code)返回与 CONNACK 结果关联的错误字符串。

error_string(mqtt_errno)返回与 Paho MQTT 错误号关联的错误字符串。

发布

该模块提供了一些帮助函数,以允许以一次性方式直接发布消息。换句话说,它们对于您想要发布到代理的单条/多条消息然后断开连接而无需其他任何东西的情况很有用。

提供的两个函数是single()multiple()

这两个功能都包括对 MQTT v5.0 的支持,但目前不允许您在连接或发送消息时设置任何属性。