Skip to main content

流处理的流水线框架

项目描述

https://badge.fury.io/py/tanbih-pipeline.svg 文件状态 可维护性评分

Pipeline 提供了一个统一的接口来设置与 Kafka、Pulsar、RabbitMQ、Redis 等的数据流处理系统。这个想法是让开发者从部署中技术的动态变化中解放出来,通过环境变量的变化,为某个任务发布的 docker 镜像可以与 Kafka 或 Redis 一起使用。

特征

  • 从 Kakfa 到 Pulsar,从 Redis 到 MongoDB 的统一接口

  • 通过命令行或环境变量控制的组件连接

  • 支持文件和内存用于测试

要求

  • 蟒蛇 3.7、3.8

安装

$ pip install tanbih-pipeline

您可以使用以下命令安装所需的后端依赖项:

$ pip install tanbih-pipeline[redis]
$ pip install tanbih-pipeline[kafka]
$ pip install tanbih-pipeline[pulsar]
$ pip install tanbih-pipeline[rabbitmq]
$ pip install tanbih-pipeline[elastic]
$ pip install tanbih-pipeline[mongodb]

如果你想支持所有后端,你可以:

$ pip install tanbih-pipeline[full]

制片人

在我们的管道中开发数据源时将使用生产者。源将在没有输入的情况下产生输出。爬虫可以被视为生产者。

>>> from typing import Generator
>>> from pydantic import BaseModel
>>> from pipeline import Producer as Worker, ProducerSettings as Settings
>>>
>>> class Output(BaseModel):
...     key: int
>>>
>>> class MyProducer(Worker):
...     def generate(self) -> Generator[Output, None, None]:
...         for i in range(10):
...             yield Output(key=i)
>>>
>>> settings = Settings(name='producer', version='0.0.0', description='')
>>> producer = MyProducer(settings, output_class=Output)
>>> producer.parse_args("--out-kind MEM --out-topic test".split())
>>> producer.start()
>>> [r.get('key') for r in producer.destination.results]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

处理器

处理器将用于处理输入。修改将就地进行。处理器可以为每个输入产生一个输出,或者不产生输出。

>>> from pydantic import BaseModel
>>> from pipeline import Processor as Worker, ProcessorSettings as Settings
>>>
>>> class Input(BaseModel):
...     temperature: float
>>>
>>> class Output(BaseModel):
...     is_hot: bool
>>>
>>> class MyProcessor(Worker):
...     def process(self, content, key):
...         is_hot = (content.temperature > 25)
...         return Output(is_hot=is_hot)
>>>
>>> settings = Settings(name='processor', version='0.1.0', description='')
>>> processor = MyProcessor(settings, input_class=Input, output_class=Output)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> processor.parse_args(args)
>>> processor.start()

分路器

写入多个输出时将使用拆分器。它将带一个函数根据处理消息生成输出主题,并在编写输出时使用它。

>>> from pipeline import Splitter as Worker, SplitterSettings as Settings
>>>
>>> class MySplitter(Worker):
...     def get_topic(self, msg):
...         return '{}-{}'.format(self.destination.topic, msg.get('id'))
>>>
>>> settings = Settings(name='splitter', version='0.1.0', description='')
>>> splitter = MySplitter(settings)
>>> args = "--in-kind MEM --in-topic test --out-kind MEM --out-topic test".split()
>>> splitter.parse_args(args)
>>> splitter.start()

用法

选择后端技术:

种类

描述

多阅读器

共享阅读器

数据过期

雷迪斯

Redis 列表

X

X

XREDIS

Redis 流

X

X

限制

卡夫卡

卡夫卡

X

X

脉冲星

脉冲星

X

X

ttl

兔子MQ

兔MQ

X

松紧带

弹性搜索

蒙古数据库

MongoDB

文件*

json,csv

记忆*

记忆

  • FILE 接受标准输入上的 jsonl 输入和文件名,它也接受 csv 文件。两种格式都可以压缩。

  • MEM 读写内存,专为单元测试而设计。

# check command line arguments for certain input and output
worker.py --in-kind FILE --help
# or
IN_KIND=FILE worker.py
# or
export IN_KIND=FILE
worker.py --help

# process input from file and output to stdout (--in-content-only is
# needed for this version)
worker.py --in-kind FILE --in-filename data.jsonl --in-content-only \
          --out-kind FILE --out-filename -


# read from file and write to KAFKA
worker.py --in-kind FILE --in-filename data.jsonl --in-content-only \
          --out-kind KAFKA --out-namespace test --out-topic articles \
          --out-kafka kafka_url --out-config kafka_config_json

论据

常见的

调试监控

种类命名空间主题

输入:

文件

脚本

pipeline-copy是一个脚本,用于将数据从源复制到目标。它可用于将数据从文件注入数据库,或从数据库注入另一个数据库。它被实现为管道工作者。

由于 JSON 格式不支持日期时间,为了使pipeline-copy将日期时间字段视为日期时间而不是字符串,您可以通过参数–model-definition提供模型定义。此类模型定义的示例如下(类名需要为Model):

from datetime import datetime
from typing import Optional

from pydantic import BaseModel

class Model(BaseModel):
    hashtag: str
    username: str
    text: str
    tweet_id: str
    location: Optional[str]
    created_at: datetime
    retweet_count: int

环境变量

应用程序接受以下环境变量(请注意,您需要在这些变量中添加前缀IN_–in-OUT_–out-以指示输入和输出选项)。请参阅后端文档以获取可用的参数/环境变量。

自定义设置

class CustomSettings(Settings):
    new_argument: str = Field("", title="a new argument for custom settings")

class CustomProcessor(Processor):
    def __init__(self):
        settings = CustomSettings("worker", "v0.1.0", "custom processor")
        super().__init__(settings, input_class=BaseModel, output_class=BaseModel)

错误

发生错误时将引发 PipelineError

贡献

使用pre-commit运行black and flake8

学分

张一凡 (yzhang at hbku.edu.qa)

项目详情


发布历史 发布通知| RSS订阅