流处理的流水线框架
项目描述
Pipeline 提供了一个统一的接口来设置与 Kafka、Pulsar、RabbitMQ、Redis 等的数据流处理系统。这个想法是让开发者从部署中技术的动态变化中解放出来,通过环境变量的变化,为某个任务发布的 docker 镜像可以与 Kafka 或 Redis 一起使用。
特征
从 Kakfa 到 Pulsar,从 Redis 到 MongoDB 的统一接口
通过命令行或环境变量控制的组件连接
支持文件和内存用于测试
要求
蟒蛇 3.7、3.8
安装
$ pip install tanbih-pipeline
您可以使用以下命令安装所需的后端依赖项:
$ pip install tanbih-pipeline[redis]
$ pip install tanbih-pipeline[kafka]
$ pip install tanbih-pipeline[pulsar]
$ pip install tanbih-pipeline[rabbitmq]
$ pip install tanbih-pipeline[elastic]
$ pip install tanbih-pipeline[mongodb]
如果你想支持所有后端,你可以:
$ pip install tanbih-pipeline[full]
制片人
在我们的管道中开发数据源时将使用生产者。源将在没有输入的情况下产生输出。爬虫可以被视为生产者。
>>> from typing import Generator
>>> from pydantic import BaseModel
>>> from pipeline import Producer as Worker, ProducerSettings as Settings
>>>
>>> class Output(BaseModel):
... key: int
>>>
>>> class MyProducer(Worker):
... def generate(self) -> Generator[Output, None, None]:
... for i in range(10):
... yield Output(key=i)
>>>
>>> settings = Settings(name='producer', version='0.0.0', description='')
>>> producer = MyProducer(settings, output_class=Output)
>>> producer.parse_args("--out-kind MEM --out-topic test".split())
>>> producer.start()
>>> [r.get('key') for r in producer.destination.results]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
处理器
处理器将用于处理输入。修改将就地进行。处理器可以为每个输入产生一个输出,或者不产生输出。
>>> from pydantic import BaseModel
>>> from pipeline import Processor as Worker, ProcessorSettings as Settings
>>>
>>> class Input(BaseModel):
... temperature: float
>>>
>>> class Output(BaseModel):
... is_hot: bool
>>>
>>> class MyProcessor(Worker):
... def process(self, content, key):
... is_hot = (content.temperature > 25)
... return Output(is_hot=is_hot)
>>>
>>> settings = Settings(name='processor', version='0.1.0', description='')
>>> processor = MyProcessor(settings, input_class=Input, output_class=Output)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> processor.parse_args(args)
>>> processor.start()
分路器
写入多个输出时将使用拆分器。它将带一个函数根据处理消息生成输出主题,并在编写输出时使用它。
>>> from pipeline import Splitter as Worker, SplitterSettings as Settings
>>>
>>> class MySplitter(Worker):
... def get_topic(self, msg):
... return '{}-{}'.format(self.destination.topic, msg.get('id'))
>>>
>>> settings = Settings(name='splitter', version='0.1.0', description='')
>>> splitter = MySplitter(settings)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> splitter.parse_args(args)
>>> splitter.start()
用法
选择后端技术:
种类 |
描述 |
多阅读器 |
共享阅读器 |
数据过期 |
---|---|---|---|---|
雷迪斯 |
Redis 列表 |
X |
X |
读 |
XREDIS |
Redis 流 |
X |
X |
限制 |
卡夫卡 |
卡夫卡 |
X |
X |
读 |
脉冲星 |
脉冲星 |
X |
X |
ttl |
兔子MQ |
兔MQ |
X |
读 |
|
松紧带 |
弹性搜索 |
|||
蒙古数据库 |
MongoDB |
|||
文件* |
json,csv |
|||
记忆* |
记忆 |
FILE 接受标准输入上的 jsonl 输入和文件名,它也接受 csv 文件。两种格式都可以压缩。
MEM 读写内存,专为单元测试而设计。
# check command line arguments for certain input and output
worker.py --in-kind FILE --help
# or
IN_KIND=FILE worker.py
# or
export IN_KIND=FILE
worker.py --help
# process input from file and output to stdout (--in-content-only is
# needed for this version)
worker.py --in-kind FILE --in-filename data.jsonl --in-content-only \
--out-kind FILE --out-filename -
# read from file and write to KAFKA
worker.py --in-kind FILE --in-filename data.jsonl --in-content-only \
--out-kind KAFKA --out-namespace test --out-topic articles \
--out-kafka kafka_url --out-config kafka_config_json
论据
- 常见的
调试监控
种类命名空间主题
输入:
文件
脚本
pipeline-copy是一个脚本,用于将数据从源复制到目标。它可用于将数据从文件注入数据库,或从数据库注入另一个数据库。它被实现为管道工作者。
由于 JSON 格式不支持日期时间,为了使pipeline-copy将日期时间字段视为日期时间而不是字符串,您可以通过参数–model-definition提供模型定义。此类模型定义的示例如下(类名需要为Model):
from datetime import datetime
from typing import Optional
from pydantic import BaseModel
class Model(BaseModel):
hashtag: str
username: str
text: str
tweet_id: str
location: Optional[str]
created_at: datetime
retweet_count: int
环境变量
应用程序接受以下环境变量(请注意,您需要在这些变量中添加前缀IN_、–in-和 OUT_、–out-以指示输入和输出选项)。请参阅后端文档以获取可用的参数/环境变量。
自定义设置
class CustomSettings(Settings):
new_argument: str = Field("", title="a new argument for custom settings")
class CustomProcessor(Processor):
def __init__(self):
settings = CustomSettings("worker", "v0.1.0", "custom processor")
super().__init__(settings, input_class=BaseModel, output_class=BaseModel)
错误
发生错误时将引发 PipelineError
贡献
使用pre-commit运行black and flake8
学分
张一凡 (yzhang at hbku.edu.qa)