Skip to main content

基于 pika 库的 RabbitMQ 客户端。

项目描述

基于 pika 的 RabbitMQ 客户端助手

PyPI 版本 建造 编解码器

这个项目提供了在 Python 中使用 RabbitMQ 的辅助类。它基于pika,这是一个很棒的 RabbitMQ 无依赖客户端库。同样,该项目力求零依赖(开发依赖除外)。

通过使用这个项目,用户应该能够立即开始使用 Python 中的 RabbitMQ,只需实例化和启动一个RMQConsumerRMQProducer类。

消费者

RMQConsumer只用一个额外的方法扩展RMQConnection基类:consume. consume 可以传入参数,用于声明队列和交换器,也可以将它们绑定在一起,consumer 参数,在 pika 库中都有对应的 kwargs。这个想法不是重新发明轮子,而是简单地声明一个队列 -> 声明一个交换器 -> 将交换器和队列绑定在一起 -> 从队列中消费的过程。

这是一个例子:

from rabbitmq_client import RMQConsumer, ConsumeParams, QueueParams


def on_message(msg, ack=None):
    ...

consumer = RMQConsumer()
consumer.start()
consumer.consume(ConsumeParams(on_message),
                 queue_params=QueueParams("queue_name"))

声明、绑定和消费的流程非常简单。上面的示例将声明一个名为“queue_name”的队列并从中消费。

笔记!尽管上面的内容可能看起来是同步的,但事实并非如此。启动是异步的,并且在消费者未完全启动时启动的任何消费都将简单地延迟直到完全启动。当消费成功启动后,绑定的回调将接收一个ConsumeOK包含结果消费者标签的对象。

确认收到的消息

默认情况下,收到的消息需要在收到时进行确认。通过调用ackkwarg 函数,消息被确认并且不会再次发送。如果使用此函数未确认消息,则 RabbitMQ 将在再次从队列中消费时重新发送。

from rabbitmq_client import RMQConsumer, ConsumeParams, QueueParams

from some_other_module import handle_msg


def on_message(msg, ack=None):
    error = handle_msg(msg)

    if not error:
        ack()

consumer = RMQConsumer()
consumer.start()
consumer.consume(ConsumeParams(on_message),
                 queue_params=QueueParams("queue_name"))

要启用消息的自动确认,请将auto_ack参数传递给ConsumeParams,设置为True。如果auto_ackTrue,则 ack未设置 kwarg。

制片人

RMQProducer使用两个附加方法扩展RMQConnection基类:publishactivate_confirm_mode. 听起来,发布用于向队列和/或交换发布消息。确认模式激活方法启用确认模式,以便用户可以验证消息是否已成功传递。

from rabbitmq_client import RMQProducer, ExchangeParams


def on_confirm(confirmation):
    ...

producer = RMQProducer()
producer.start()
producer.activate_confirm_mode(on_confirm)  # Or don't, depends on your needs

producer.publish(b"body", 
                 exchange_params=ExchangeParams("exchange_name"),
                 routing_key="some.routing.key")

activate_confirm_mode也不是同步的,但你不必担心。在确认模式成功激活之前调用publishafter将导致发布不会发生。activate_confirm_mode传递给的回调activate_confirm_mode也将收到ConfirmModeOK一次确认模式开启。调用activate_confirm_mode和从 RabbitMQ 接收到 confirm_select_ok 的生产者之间的任何发布都将被缓冲,并且在确认模式开启之前不会发布。当确认模式打开时,publish还返回一个密钥,客户端可以使用该密钥将成功交付与发布调用相关联。一旦publish确认使用键 X 的调用,传递给的回调 activate_confirm_mode将使用 X 调用。

抽象连接助手

抽象RMQConnection类可以被子类化,以便在使用pika SelectConnectionandChannel对象时抢占先机,因为它包装它们并提供易于使用的接口以及重要事件的事件挂钩。

RMQConnection 生命周期钩子

子类RMQConnection化要求实现者重写三个方法: on_readyon_closeon_error

on_readyRMQConnection在建立连接并打开通道时调用。

on_close当连接或通道因任何原因关闭时调用。这意味着实施者可能会收到两个针对一个失败连接的调用,一个针对通道,一个针对连接本身。这使得on_close幂等变得很重要。

on_error当最近的操作失败时调用,例如交换声明失败。这些钩子旨在使实施者能够对连接状态做出反应并恢复操作状态。项目使用RMQConnection 抽象基类rabbitmq_client来实现其 RMQConsumerRMQProducer类。

RMQConnection 接口方法

除了需要通过实现类来实现的钩子外, RMQConnection还提供了三个可用于与连接对象交互的公共方法:startrestartstop

start启动连接,建立一个连接,pika.SelectConnection如果成功,pika.Channel为打开的连接打开一个连接。一旦通道打开,RMQConnection将调用on_ready. start如果连接已经启动,则后续调用无效。

restart关闭打开的连接并确保它在完全关闭后再次启动。restart仅用于成功建立的连接,它对关闭的连接没有影响。 restart旨在用作pika.ConnectionParameters 即时更改的手段。

stop永久关闭打开的连接,对关闭的连接没有影响。已调用的连接stop不能重新使用。on_close连接完全停止后调用。

除了与连接相关的方法之外,RMQConnection还公开了与 的交互pika.Channel,名称类似。请参阅此处了解公开的内容:Pika docs

自动重新连接

RMQConnection将重新建立丢失的连接,但不会重新建立丢失的通道。但是,不会出于任何原因进行重新连接,重新连接的原因包括:

  • pika.exceptions.ConnectionClosedByBroker
  • pika.exceptions.StreamLostError

这两个例外涵盖了代理已关闭的情况,无论是预期的还是意外的,或者连接由于其他原因而丢失的情况。

同样,如果通道丢失,但连接保持完好, RMQConnection将无法恢复通道。

重新连接尝试将随着尝试之间的延迟增加而进行。第一次尝试是瞬时的,第二次延迟 1 秒,第三次延迟 2 秒,依此类推。第 9 次尝试后,将以 30 秒的间隔进行以下重新连接。

日志记录

rabbitmq_client遵循 Python 日志记录标准,默认情况下禁用。要启用日志记录,请将处理程序附加到rabbitmq_client

import logging

logging.getLogger("rabbitmq_client").addHandler(logging.StreamHandler())

默认情况下, alogging.NullHandler()附加到此记录器。

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

rabbitmq-client-2.4.0.tar.gz (19.6 kB 查看哈希)

已上传 source

内置分布

rabbitmq_client-2.4.0-py2.py3-none-any.whl (18.0 kB 查看哈希

已上传 py2 py3