Liftbridge 的 Python 客户端。
项目描述
蟒蛇升降桥
该项目正在开发中。
Liftbridge的Python 客户端,这是一个为NATS提供轻量级、容错消息流的系统。
Liftbridge 提供以下高级功能:
- 用于 NATS 的基于日志的 API
- 复制容错
- 水平可扩展
- 通配符订阅支持
- 至少一次交付支持和消息重播
- 消息键值支持
- 按键压缩日志
安装
$ pip install python-liftbridge
基本用法
from liftclient import Lift, Message, Stream, ErrStreamExists
# Create a Liftbridge client.
client = Lift(ip_address='localhost:9292', timeout=5)
# Create a stream attached to the NATS subject "foo".
try:
client.create_stream(Stream(subject='foo', name='foo-stream'))
except ErrStreamExists:
print('This stream already exists!')
# Publish a message to "foo".
client.publish(Message(value='hello', subject='foo'))
# Subscribe to the stream starting from the beginning.
for message in client.subscribe(
Stream(
subject='foo',
name='foo-stream',
).start_at_earliest_received(),
):
print("Received: '{}'".format(message.value))
创建流
流是附加到 NATS 主题的持久消息日志。他们记录发布给主题的消息以供消费。
流有几个关键属性:一个主题,它是相应的 NATS 主题,一个名称,它是流的人类可读标识符,以及一个复制因子,它是流应该复制到的节点数以实现冗余. 可选地,有一个组是要加入的流的负载平衡组的名称。当同一组中有多个流时,消息将在它们之间进行平衡。
"""
Create a stream attached to the NATS subject "foo.*" that is replicated to
all the brokers in the cluster. ErrStreamExists is returned if a stream with
the given name already exists for the subject.
"""
client.create_stream(Stream(subject='foo.*', name='my-stream', max_replication=True))
订阅开始/重播选项
订阅是使用 Liftbridge 流的方式。客户端可以选择从哪里开始使用流中的消息。这是使用传递给订阅的选项来控制的。
# Subscribe starting with new messages only.
client.subscribe(
Stream(subject='foo', name='foo-stream')
)
# Subscribe starting with the most recently published value.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_earliest_received()
)
# Subscribe starting with the oldest published value.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_latest_received()
)
# Subscribe starting at a specific offset.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_offset(4)
)
# Subscribe starting at a specific time.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_time(datetime.now())
)
# Subscribe starting at a specific amount of time in the past.
client.subscribe(
Stream(subject='foo', name='foo-stream').start_at_time_delta(timedelta(days=1))
)
出版
提供了一个发布 API 以方便将消息写入流。这包括许多使用元数据(如消息键)装饰消息的选项。
Liftbridge 的日志压缩使用密钥。启用后,Liftbridge 流将仅保留给定键的最后一条消息。
# Publish a message with a key
client.publish(Message(subject='foo', value='Hello', key='key'))
使用 NATS 直接发布
由于 Liftbridge 是NATS的扩展,因此也可以使用NATS 客户端发布消息。这意味着现有的 NATS 发布者不需要对在 Liftbridge 中使用的消息进行任何更改。
如何贡献
- 检查未解决的问题或打开新问题以围绕功能想法或错误开始讨论。
- Fork GitHub 上的存储库以开始对主分支(或从它的分支)进行更改。
- 编写一个测试,表明该错误已修复或该功能按预期工作。
- 发送一个拉取请求并给我发错误,直到它被合并和发布。
积压的一些事情:
- 添加文档 (Sphynx)
- 添加 CI(CircleCI 或 TravisCI)
- 添加测试
- 添加代码覆盖率
- 为 gRPC 添加 TLS 支持
- 添加消息头支持
- 添加消息 ACK 支持(脚手架已经完成)
- 添加关闭连接的方法
- 添加异步客户端
- 添加 gRPC 连接池
- 添加日志记录(并删除所有随机打印)
- 添加适当的文档字符串
- 添加版本文件
- 添加 Contributing.md 和工作流程说明(pyenv、tox、make、pre-commit...)
- 改进获取元数据
- 改进错误处理
- 使用 Docker容器添加到 makefile run-liftbridge
- 更好的仪器/可观察性(OpenCensus 支持?)
项目详情
下载文件
下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分布
liftclient-0.0.6.tar.gz
(17.5 kB
查看哈希)
内置分布
liftclient-0.0.6-py3-none-any.whl
(22.9 kB
查看哈希)
关
Liftclient -0.0.6.tar.gz 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | f0eddc939499605d4221d049618eb23c07eee4e364ef92d56ad97eff60541c12 |
|
MD5 | 7755cbff4ca09fe7c16f73baf84435ab |
|
布莱克2-256 | 36e432f75eb0fd872c316a936b0bb8fd256c503acf8e63da552c44bcc253f51d |
关
Liftclient -0.0.6-py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ac33eeda377c7c9423f5e5b945f894834d3375380786a82cabb5c7e4ae90af50 |
|
MD5 | 64e433347f05cacf213a0b558b1ba832 |
|
布莱克2-256 | ef799f2dbcee16862e8a3c767896e814f0fd0cc8761f8edd0f44317ce6fa9809 |