Skip to main content

Python 的 Kafka 作业队列

项目描述

KQ:用于 Python 的 Kafka 作业队列

建造 代码QL 编解码器 PyPI 版本 GitHub 许可证 蟒蛇版本

KQ (Kafka Queue)是一个轻量级的 Python 库,可让您使用Apache Kafka异步排队和执行作业。它 在引擎盖下使用kafka-python 。

公告

  • 将从 KQ 版本 3.0.0 中删除对 Python 3.5 的支持。
  • 有关最新更新,请参阅版本

要求

安装

使用pip安装:

pip install kq

入门

启动您的 Kafka 实例。使用Docker的示例:

docker run -p 9092:9092 -e ADV_HOST=127.0.0.1 lensesio/fast-data-dev

定义您的 KQworker.py模块:

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter("[%(levelname)s] %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger("kq.worker")
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers="127.0.0.1:9092",
    group_id="group",
    auto_offset_reset="latest"
)

# Set up a worker.
worker = Worker(topic="topic", consumer=consumer)
worker.start()

启动你的工人:

python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092 topic=topic, group=group) ...

将函数调用加入队列:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers="127.0.0.1:9092")

# Set up a queue.
queue = Queue(topic="topic", producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, "https://google.com")

# You can also specify the job timeout, Kafka message key and partition.
job = queue.using(timeout=5, key=b"foo", partition=0).enqueue(requests.get, "https://google.com")

工作人员在后台执行作业:

python my_worker.py
[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get("https://www.google.com")
[INFO] Job c7bf2359 returned: <Response [200]>

有关更多信息,请参阅文档

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

kq-2.2.1.tar.gz (18.2 kB 查看哈希

已上传 source

内置分布

kq-2.2.1-py3-none-any.whl (11.6 kB 查看哈希

已上传 py3