Python 的 Kafka 作业队列
项目描述
KQ:用于 Python 的 Kafka 作业队列
KQ (Kafka Queue)是一个轻量级的 Python 库,可让您使用Apache Kafka异步排队和执行作业。它 在引擎盖下使用kafka-python 。
公告
- 将从 KQ 版本 3.0.0 中删除对 Python 3.5 的支持。
- 有关最新更新,请参阅版本。
要求
- 阿帕奇卡夫卡0.9+
- Python 3.6+
安装
使用pip安装:
pip install kq
入门
启动您的 Kafka 实例。使用Docker的示例:
docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev
定义您的 KQworker.py模块:
import logging
from kafka import KafkaConsumer
from kq import Worker
# Set up logging.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger("kq.worker")
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)
# Set up a Kafka consumer.
consumer = KafkaConsumer(
bootstrap_servers="127.0.0.1:9092",
group_id="group",
auto_offset_reset="latest"
)
# Set up a worker.
worker = Worker(topic="topic", consumer=consumer)
worker.start()
启动你的工人:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092 topic=topic, group=group) ...
将函数调用加入队列:
import requests
from kafka import KafkaProducer
from kq import Queue
# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")
# Set up a queue.
queue = Queue(topic="topic", producer=producer)
# Enqueue a function call.
job = queue.enqueue(requests.get, "https://google.com")
# You can also specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b"foo", partition=0).enqueue(requests.get, "https://google.com")
工作人员在后台执行作业:
python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get("https://www.google.com")
[INFO] Job c7bf2359 returned: <Response [200]>
有关更多信息,请参阅文档。
项目详情
下载文件
下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分布
kq-2.2.1.tar.gz
(18.2 kB
查看哈希)
内置分布
kq-2.2.1-py3-none-any.whl
(11.6 kB
查看哈希)