Skip to main content

功能齐全的纯 Python Kafka 客户端

项目描述

https://travis-ci.org/Parsely/pykafka.svg?branch=master https://codecov.io/github/Parsely/pykafka/coverage.svg?branch=master

派卡夫卡

http://i.imgur.com/ztYl4lG.jpg

PyKafka 是一个程序员友好的 Python Kafka 客户端。它包括 Kafka 生产者和消费者的 Python 实现,它们可选地由构建在librdkafka上的 C 扩展支持。它在 Python 2.7+、Python 3.4+ 和 PyPy 下运行,并支持 Kafka 0.8.2 及更高版本。

PyKafka 的主要目标是使用 Python 程序员熟悉的惯用语为JVM Kafka 客户端提供类似级别的抽象, 并尽可能公开最 Pythonic 的 API。

您可以使用 PyPI 从 PyPI 安装 PyKafka

$ pip install pykafka

或来自 conda-forge

$ conda install -c conda-forge pykafka

PyKafka 的完整文档和使用示例可以在readthedocs上找到。

您可以通过克隆此存储库并运行来安装 PyKafka 以进行本地开发和测试

$ python setup.py develop

入门

假设您在 localhost 上运行了至少一个 Kafka 实例,您可以使用 PyKafka 连接到它。

>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")

或者,对于 TLS 连接,您可以编写(有关更多详细信息,另请参阅SslConfig文档):

>>> from pykafka import KafkaClient, SslConfig
>>> config = SslConfig(cafile='/your/ca.cert',
...                    certfile='/your/client.cert',  # optional
...                    keyfile='/your/client.key',  # optional
...                    password='unlock my client key please')  # optional
>>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
...                      ssl_config=config)

如果您连接的集群上定义了任何主题,您可以列出它们:

>>> client.topics
>>> topic = client.topics['my.test']

一旦你有了一个Topic,你就可以为它创建一个Producer并开始生产消息。

>>> with topic.get_sync_producer() as producer:
...     for i in range(4):
...         producer.produce('test message ' + str(i ** 2))

上面的示例将同步生成到 kafka - 只有在我们确认消息到达集群后调用才会返回。

为了获得更高的吞吐量,我们建议在异步模式下使用Producer,以便producer ()调用将立即返回,并且生产者可以选择以更大的批量发送消息。Producer在发送每个批次之前将生成的消息收集到linger_ms的内部队列中。可以使用linger_msmin_queued_messages和其他关键字参数(请参阅readthedocs )以牺牲效率为代价来删除或更改此延迟。您仍然可以通过队列接口获取消息的传递确认,该接口可以通过设置delivery_reports=True来启用。这是一个粗略的用法示例:

>>> with topic.get_producer(delivery_reports=True) as producer:
...     count = 0
...     while True:
...         count += 1
...         producer.produce('test msg', partition_key='{}'.format(count))
...         if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
...             while True:
...                 try:
...                     msg, exc = producer.get_delivery_report(block=False)
...                     if exc is not None:
...                         print 'Failed to deliver msg {}: {}'.format(
...                             msg.partition_key, repr(exc))
...                     else:
...                         print 'Successfully delivered msg {}'.format(
...                         msg.partition_key)
...                 except Queue.Empty:
...                     break

请注意,传递报告队列是线程本地的:它只会为从当前线程生成的消息提供报告。此外,如果您使用 delivery_reports=True,未能使用交付报告队列将导致 PyKafka 的内存使用量无限增长。

您还可以使用Consumer实例使用来自该主题的消息。

>>> consumer = topic.get_simple_consumer()
>>> for message in consumer:
...     if message is not None:
...         print message.offset, message.value
0 test message 0
1 test message 1
2 test message 4
3 test message 9

SimpleConsumer无法扩展 - 如果您有两个SimpleConsumer 使用相同的主题,他们将收到重复的消息。为了解决这个问题,您可以使用BalancedConsumer

>>> balanced_consumer = topic.get_balanced_consumer(
...     consumer_group='testgroup',
...     auto_commit_enable=True,
...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
... )

您可以拥有与该主题具有分区一样多的BalancedConsumer实例使用该主题。如果它们都连接到同一个 zookeeper 实例,它们将与其通信以自动平衡它们之间的分区。BalancedConsumer使用的分区分配策略默认为“范围”策略。该策略可以通过 members_protocol 关键字参数进行切换 并且可以是 pykafka.membershipprotocol 公开的对象pykafka.membershipprotocol.GroupMembershipProtocol的自定义实例。

您还可以将 Kafka 0.9 Group Membership API 与get_balanced_consumer上的managed 关键字参数一起使用。

使用 librdkafka 扩展

PyKafka 包含一个 C 扩展,它利用 librdkafka 来加速生产者和消费者的操作。要使用 librdkafka 扩展,您需要确保头文件和共享库位于 python 可以找到它们的位置,无论是在构建扩展时(由setup.py develop负责)还是在运行时。通常,这意味着您需要将 librdkafka 安装在系统的常规位置,或者在 shell 环境中声明C_INCLUDE_PATHLIBRARY_PATHLD_LIBRARY_PATH以指向 librdkafka 共享对象的安装位置。您可以使用locate librdkafka.so找到此位置。

之后,您只需将额外参数 use_rdkafka=True传递给topic.get_producer()topic.get_simple_consumer()topic.get_balanced_consumer()。请注意,某些配置选项可能具有不同的最佳值;这可能值得参考 librdkafka 的配置说明

操作工具

PyKafka 包含一小部分CLI 工具,可以帮助完成与 Kafka 集群管理相关的常见任务,包括偏移和滞后监控以及主题检查。这些工具的完整的、最新的界面可以通过运行来破坏

$ python cli/kafka_tools.py --help

或通过 setuptools 或 pip 安装 PyKafka 之后:

$ kafka-tools --help

PyKafka 还是 kafka-python?

这是两个不同的项目。有关两个项目之间的比较,请参见此处的讨论。

贡献

如果您有兴趣为 PyKafka 贡献代码,一个好的起点是 “需要帮助”问题标签。我们还建议您查看贡献指南

支持

如果您在使用 PyKafka 时需要帮助,这里有大量可用资源。有关使用问题或常见食谱,请查看StackOverflow 标签Google 组对于您想直接发送给 PyKafka 维护人员的更深入的问题或查询很有用。如果你认为你在 PyKafka 中发现了一个错误,请在阅读 贡献指南后打开一个github 问题