Skip to main content

Pika Python AMQP 客户端库

项目描述

Pika 是一个用于 Python 的 RabbitMQ (AMQP 0-9-1) 客户端库。

版本 Python 版本 操作状态 覆盖范围 执照 文件状态

介绍

Pika 是 AMQP 0-9-1 协议的纯 Python 实现,包括 RabbitMQ 的扩展。

  • 支持 Python 3.4+(1.1.0是支持 2.7 的最后一个版本)

  • 由于线程并不适用于所有情况,因此它不需要线程。Pika core 也注意不要禁止它们。greenlets、回调、延续和生成器也是如此。然而,Pika 内置连接适配器的一个实例不是线程安全的。

  • 人们可能正在使用直接套接字、普通的旧select()或任何从 Python 应用程序获取网络事件的多种方式。Pika 试图与所有这些保持兼容,并尽可能简单地使其适应新环境。

文档

Pika 的文档可以在https://pika.readthedocs.io找到。

例子

这是最简单的使用示例,使用 pika.BlockingConnection适配器发送消息:

import pika

connection = pika.BlockingConnection()
channel = connection.channel()
channel.basic_publish(exchange='test', routing_key='test',
                      body=b'Test message.')
connection.close()

还有一个编写阻塞消费者的例子:

import pika

connection = pika.BlockingConnection()
channel = connection.channel()

for method_frame, properties, body in channel.consume('test'):
    # Display the message parts and acknowledge the message
    print(method_frame, properties, body)
    channel.basic_ack(method_frame.delivery_tag)

    # Escape out of the loop after 10 messages
    if method_frame.delivery_tag == 10:
        break

# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
connection.close()

Pika 提供以下适配器

  • pika.adapters.asyncio_connection.AsyncioConnection - Python 3 AsyncIO的 I/O 循环的异步适配器。

  • pika.BlockingConnection - 库顶部的同步适配器,便于使用。

  • pika.SelectConnection - 没有第三方依赖的异步适配器。

  • pika.adapters.gevent_connection.GeventConnection - 用于Gevent的 I/O 循环的异步适配器。

  • pika.adapters.tornado_connection.TornadoConnection - 用于Tornado的 I/O 循环的异步适配器。

  • pika.adapters.twisted_connection.TwistedProtocolConnection - 用于Twisted的 I/O 循环的异步适配器。

多个连接参数

您还可以传递多个pika.ConnectionParameters实例以实现容错,如下面的代码片段所示(当然,主机名只是示例)。要启用重试,请在序列的最后一个pika.ConnectionParameters元素中根据需要设置connection_attemptsretry_delay 。在使用所有给定连接参数的连接尝试失败后会发生重试。

import pika

parameters = (
    pika.ConnectionParameters(host='rabbitmq.zone1.yourdomain.com'),
    pika.ConnectionParameters(host='rabbitmq.zone2.yourdomain.com',
                              connection_attempts=5, retry_delay=1))
connection = pika.BlockingConnection(parameters)

使用非阻塞适配器,例如pika.SelectConnectionpika.adapters.asyncio_connection.AsyncioConnection ,您可以通过连接适配器的create_connection()类方法使用多个连接参数实例请求连接。

从另一个线程请求消息确认

单个 Pika 连接适配器实例的单线程使用约束可能会导致 AMQP/流连接由于消费者的 AMQP 心跳超时而丢失,这些消费者需要很长时间来处理传入的消息。一个常见的解决方案是将传入消息的处理委托给另一个线程,而连接适配器的线程继续为其 I/O 循环的消息泵提供服务,从而允许及时处理 AMQP 心跳和其他 I/O。

在另一个线程中处理的消息可能不会直接从该线程确认,因为对连接适配器实例的所有访问都必须来自单个线程,该线程是运行适配器 I/O 循环的线程。这是通过请求在适配器的 I/O 循环线程中执行的回调来实现的。例如,回调函数的实现可能如下所示:

def ack_message(channel, delivery_tag):
    """Note that `channel` must be the same Pika channel instance via which
    the message being acknowledged was retrieved (AMQP protocol constraint).
    """
    if channel.is_open:
        channel.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't acknowledge this message;
        # log and/or do something that makes sense for your app in this case.
        pass

在另一个线程中运行的代码可能会使用适配器特定的机制请求在连接适配器的 I/O 循环线程中执行ack_message()函数:

  • pika.BlockingConnection从应用程序中抽象出它的 I/O 循环,从而暴露pika.BlockingConnection.add_callback_threadsafe()。有关更多信息,请参阅此方法的文档字符串。例如:

    connection.add_callback_threadsafe(functools.partial(ack_message, channel, delivery_tag))
  • 当使用非阻塞连接适配器时,例如 pika.adapters.asyncio_connection.AsyncioConnectionpika.SelectConnection,您使用底层异步框架的本机 API 从另一个线程请求 I/O 循环绑定回调。例如pika.SelectConnection的 I/O 循环提供 add_callback_threadsafe()pika.adapters.tornado_connection.TornadoConnection的 I/O 循环有 add_callback(),而 pika.adapters.asyncio_connection.AsyncioConnection的 I/O 循环公开 call_soon_threadsafe()

这种线程安全的回调请求机制也可用于将消息等的发布从后台线程委托给连接适配器的线程。

连接恢复

一些 RabbitMQ 客户端(Bunny、Java、.NET、Objective-C、Swift)提供了一种在网络故障后自动恢复连接、其通道和拓扑(例如队列、绑定和消费者)的方法。其他人则要求应用程序代码执行连接恢复,并努力使其成为一个简单的过程。鼠兔属于第二类。

Pika 支持多个连接适配器。他们采用不同的方法来恢复连接。

对于pika.BlockingConnection适配器异常处理可用于检查连接错误。这是一个非常基本的例子:

import pika

while True:
    try:
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)
        channel.start_consuming()
    # Don't recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Don't recover on channel errors
    except pika.exceptions.AMQPChannelError:
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        continue

这个例子可以在examples/consume_recover.py中找到。

可以使用重试等通用操作重试库 。装饰器可以配置一些额外的恢复行为,例如重试之间的延迟和限制重试次数:

from retry import retry


@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
def consume():
    connection = pika.BlockingConnection()
    channel = connection.channel()
    channel.basic_consume('test', on_message_callback)

    try:
        channel.start_consuming()
    # Don't recover connections closed by server
    except pika.exceptions.ConnectionClosedByBroker:
        pass


consume()

这个例子可以在examples/consume_recover_retry.py中找到。

对于异步适配器,使用on_close_callback对连接失败事件做出反应。此回调可用于清理和恢复连接。

可以在 examples/asynchronous_consumer_example.py中找到使用on_close_callback进行恢复的示例。

贡献

要为 Pika 做出贡献,请确保任何新功能或对现有功能的更改都包括测试覆盖率

在没有足够测试覆盖的情况下添加或更改代码的拉取请求将被拒绝。

此外, 在发出拉取请求之前,请使用带有google风格的Yapf格式化您的代码。注意:仅格式化您在拉取请求中更改的那些行。如果您格式化整个文件并在 PR 范围之外更改代码,则很可能会被拒绝。

扩展以支持其他 I/O 框架

新的非阻塞适配器可以通过以下任一方式实现:

  • 通过子类化 pika.BaseConnection,实现其抽象方法并向其构造函数传递 pika.adapters.utils.nbio_interface.AbstractIOServices的实现。 pika.BaseConnection实现pika.connection.Connection的抽象方法,包括内部启动的连接逻辑。例如,请参考 pika.adapters.asyncio_connection.AsyncioConnectionpika.adapters.gevent_connection.GeventConnectionpika.adapters.tornado_connection.TornadoConnection的实现。

  • 通过继承 pika.connection.Connection并实现其抽象方法。这种方法有助于实现自定义连接建立和传输机制。例如,参考 pika.adapters.twisted_connection.TwistedProtocolConnection的实现。