功能齐全的纯 Python Kafka 客户端
项目描述
派卡夫卡
PyKafka 是一个程序员友好的 Python Kafka 客户端。它包括 Kafka 生产者和消费者的 Python 实现,它们可选地由构建在librdkafka上的 C 扩展支持。它在 Python 2.7+、Python 3.4+ 和 PyPy 下运行,并支持 Kafka 0.8.2 及更高版本。
PyKafka 的主要目标是使用 Python 程序员熟悉的惯用语为JVM Kafka 客户端提供类似级别的抽象, 并尽可能公开最 Pythonic 的 API。
您可以使用 PyPI 从 PyPI 安装 PyKafka
$ pip install pykafka
或来自 conda-forge
$ conda install -c conda-forge pykafka
PyKafka 的完整文档和使用示例可以在readthedocs上找到。
您可以通过克隆此存储库并运行来安装 PyKafka 以进行本地开发和测试
$ python setup.py develop
入门
假设您在 localhost 上运行了至少一个 Kafka 实例,您可以使用 PyKafka 连接到它。
>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")
或者,对于 TLS 连接,您可以编写(有关更多详细信息,另请参阅SslConfig文档):
>>> from pykafka import KafkaClient, SslConfig
>>> config = SslConfig(cafile='/your/ca.cert',
... certfile='/your/client.cert', # optional
... keyfile='/your/client.key', # optional
... password='unlock my client key please') # optional
>>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
... ssl_config=config)
如果您连接的集群上定义了任何主题,您可以列出它们:
>>> client.topics
>>> topic = client.topics['my.test']
一旦你有了一个Topic,你就可以为它创建一个Producer并开始生产消息。
>>> with topic.get_sync_producer() as producer:
... for i in range(4):
... producer.produce('test message ' + str(i ** 2))
上面的示例将同步生成到 kafka - 只有在我们确认消息到达集群后调用才会返回。
为了获得更高的吞吐量,我们建议在异步模式下使用Producer,以便producer ()调用将立即返回,并且生产者可以选择以更大的批量发送消息。Producer在发送每个批次之前将生成的消息收集到linger_ms的内部队列中。可以使用linger_ms、 min_queued_messages和其他关键字参数(请参阅readthedocs )以牺牲效率为代价来删除或更改此延迟。您仍然可以通过队列接口获取消息的传递确认,该接口可以通过设置delivery_reports=True来启用。这是一个粗略的用法示例:
>>> with topic.get_producer(delivery_reports=True) as producer:
... count = 0
... while True:
... count += 1
... producer.produce('test msg', partition_key='{}'.format(count))
... if count % 10 ** 5 == 0: # adjust this or bring lots of RAM ;)
... while True:
... try:
... msg, exc = producer.get_delivery_report(block=False)
... if exc is not None:
... print 'Failed to deliver msg {}: {}'.format(
... msg.partition_key, repr(exc))
... else:
... print 'Successfully delivered msg {}'.format(
... msg.partition_key)
... except Queue.Empty:
... break
请注意,传递报告队列是线程本地的:它只会为从当前线程生成的消息提供报告。此外,如果您使用 delivery_reports=True,未能使用交付报告队列将导致 PyKafka 的内存使用量无限增长。
您还可以使用Consumer实例使用来自该主题的消息。
>>> consumer = topic.get_simple_consumer()
>>> for message in consumer:
... if message is not None:
... print message.offset, message.value
0 test message 0
1 test message 1
2 test message 4
3 test message 9
此SimpleConsumer无法扩展 - 如果您有两个SimpleConsumer 使用相同的主题,他们将收到重复的消息。为了解决这个问题,您可以使用BalancedConsumer。
>>> balanced_consumer = topic.get_balanced_consumer(
... consumer_group='testgroup',
... auto_commit_enable=True,
... zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
... )
您可以拥有与该主题具有分区一样多的BalancedConsumer实例使用该主题。如果它们都连接到同一个 zookeeper 实例,它们将与其通信以自动平衡它们之间的分区。BalancedConsumer使用的分区分配策略默认为“范围”策略。该策略可以通过 members_protocol 关键字参数进行切换, 并且可以是 pykafka.membershipprotocol 公开的对象或pykafka.membershipprotocol.GroupMembershipProtocol的自定义实例。
您还可以将 Kafka 0.9 Group Membership API 与get_balanced_consumer上的managed 关键字参数一起使用。
使用 librdkafka 扩展
PyKafka 包含一个 C 扩展,它利用 librdkafka 来加速生产者和消费者的操作。要使用 librdkafka 扩展,您需要确保头文件和共享库位于 python 可以找到它们的位置,无论是在构建扩展时(由setup.py develop负责)还是在运行时。通常,这意味着您需要将 librdkafka 安装在系统的常规位置,或者在 shell 环境中声明C_INCLUDE_PATH、LIBRARY_PATH和LD_LIBRARY_PATH以指向 librdkafka 共享对象的安装位置。您可以使用locate librdkafka.so找到此位置。
之后,您只需将额外参数 use_rdkafka=True传递给topic.get_producer()、 topic.get_simple_consumer()或topic.get_balanced_consumer()。请注意,某些配置选项可能具有不同的最佳值;这可能值得参考 librdkafka 的配置说明。
操作工具
PyKafka 包含一小部分CLI 工具,可以帮助完成与 Kafka 集群管理相关的常见任务,包括偏移和滞后监控以及主题检查。这些工具的完整的、最新的界面可以通过运行来破坏
$ python cli/kafka_tools.py --help
或通过 setuptools 或 pip 安装 PyKafka 之后:
$ kafka-tools --help
PyKafka 还是 kafka-python?
这是两个不同的项目。有关两个项目之间的比较,请参见此处的讨论。
贡献
支持
如果您在使用 PyKafka 时需要帮助,这里有大量可用资源。有关使用问题或常见食谱,请查看StackOverflow 标签。Google 组对于您想直接发送给 PyKafka 维护人员的更深入的问题或查询很有用。如果你认为你在 PyKafka 中发现了一个错误,请在阅读 贡献指南后打开一个github 问题。