Skip to main content

Liftbridge 的 Python 客户端。

项目描述

蟒蛇升降桥

派皮 GitHub

该项目正在开发中。

Liftbridge的Python 客户端,这是一个为NATS提供轻量级、容错消息流的系统。

Liftbridge 提供以下高级功能:

  • 用于 NATS 的基于日志的 API
  • 复制容错
  • 水平可扩展
  • 通配符订阅支持
  • 至少一次交付支持和消息重播
  • 消息键值支持
  • 按键压缩日志

安装

$ pip install python-liftbridge

基本用法

from liftclient import Lift, Message, Stream, ErrStreamExists

# Create a Liftbridge client.
client = Lift(ip_address='localhost:9292', timeout=5)

# Create a stream attached to the NATS subject "foo".
try:
    client.create_stream(Stream(subject='foo', name='foo-stream'))
except ErrStreamExists:
    print('This stream already exists!')

# Publish a message to "foo".
client.publish(Message(value='hello', subject='foo'))

# Subscribe to the stream starting from the beginning.
for message in client.subscribe(
    Stream(
        subject='foo',
        name='foo-stream',
    ).start_at_earliest_received(),
):
    print("Received: '{}'".format(message.value))

创建流

是附加到 NATS 主题的持久消息日志。他们记录发布给主题的消息以供消费。

流有几个关键属性:一个主题,它是相应的 NATS 主题,一个名称,它是流的人类可读标识符,以及一个复制因子,它是流应该复制到的节点数以实现冗余. 可选地,有一个组是要加入的流的负载平衡组的名称。当同一组中有多个流时,消息将在它们之间进行平衡。

"""
    Create a stream attached to the NATS subject "foo.*" that is replicated to
    all the brokers in the cluster. ErrStreamExists is returned if a stream with
    the given name already exists for the subject.
"""
client.create_stream(Stream(subject='foo.*', name='my-stream', max_replication=True))

订阅开始/重播选项

订阅是使用 Liftbridge 流的方式。客户端可以选择从哪里开始使用流中的消息。这是使用传递给订阅的选项来控制的。

# Subscribe starting with new messages only.
client.subscribe(
    Stream(subject='foo', name='foo-stream')
)
# Subscribe starting with the most recently published value.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_earliest_received()
)
# Subscribe starting with the oldest published value.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_latest_received()
)
# Subscribe starting at a specific offset.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_offset(4)
)
# Subscribe starting at a specific time.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_time(datetime.now())
)
# Subscribe starting at a specific amount of time in the past.
client.subscribe(
    Stream(subject='foo', name='foo-stream').start_at_time_delta(timedelta(days=1))
)

出版

提供了一个发布 API 以方便将消息写入流。这包括许多使用元数据(如消息键)装饰消息的选项。

Liftbridge 的日志压缩使用密钥。启用后,Liftbridge 流将仅保留给定键的最后一条消息。

# Publish a message with a key
client.publish(Message(subject='foo', value='Hello', key='key'))

使用 NATS 直接发布

由于 Liftbridge 是NATS的扩展,因此也可以使用NATS 客户端发布消息。这意味着现有的 NATS 发布者不需要对在 Liftbridge 中使用的消息进行任何更改。

如何贡献

  1. 检查未解决的问题或打开新问题以围绕功能想法或错误开始讨论。
  2. Fork GitHub 上的存储库以开始对主分支(或从它的分支)进行更改。
  3. 编写一个测试,表明该错误已修复或该功能按预期工作。
  4. 发送一个拉取请求并给我发错误,直到它被合并和发布。

积压的一些事情:

  • 添加文档 (Sphynx)
  • 添加 CI(CircleCI 或 TravisCI)
  • 添加测试
  • 添加代码覆盖率
  • 为 gRPC 添加 TLS 支持
  • 添加消息头支持
  • 添加消息 ACK 支持(脚手架已经完成)
  • 添加关闭连接的方法
  • 添加异步客户端
  • 添加 gRPC 连接池
  • 添加日志记录(并删除所有随机打印)
  • 添加适当的文档字符串
  • 添加版本文件
  • 添加 Contributing.md 和工作流程说明(pyenv、tox、make、pre-commit...)
  • 改进获取元数据
  • 改进错误处理
  • 使用 Docker容器添加到 makefile run-liftbridge
  • 更好的仪器/可观察性(OpenCensus 支持?)

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

liftclient-0.0.6.tar.gz (17.5 kB 查看哈希

已上传 source

内置分布

liftclient-0.0.6-py3-none-any.whl (22.9 kB 查看哈希

已上传 py3