Python 本地优先的无膨胀任务编排框架
项目描述
woflo
概述
woflo是一个 Python 本地优先的无膨胀可扩展任务编排框架
主要目标是抽象出许多与任务编排和执行相关的功能。
目前这包括:
- 重试
- 重试超时
- 并行性
- 日志记录
例子
预期用途是使用装饰器@task,考虑一个非常简单的示例,它将并行运行 10 个昏昏欲睡的工作人员而不会阻塞主线程:
import time
from woflo import task
@task
def sleepy_worker():
time.sleep(5)
print('I am done')
for _ in range(10):
sleepy_task_run = sleepy_worker()
您还可以对有时可能会失败的任务进行重试。以下将尝试运行修饰函数总共 3 次,尝试之间有 5 秒的延迟。
from woflo import task
@task(retries=2, retry_sleep_time=5)
def fetch_data_from_unstable_api():
...
此外,您还可以在@task装饰器中提供运行器。例如,SequentialTaskRun如果喜欢您的任务按顺序运行并且喜欢在计算机周围等待很多。例如:
from woflo import task
from woflo.runners import SequentialTaskRun
@task(runner=SequentialTaskRun)
def sequential_sleepy_worker():
time.sleep(5)
print('I am done')
for _ in range(10):
sleepy_task_run = sequential_sleepy_worker()
每个TaskRun还应该公开一些使您能够处理它的方法:
.get_result()获取已完成任务的返回值.wait()阻塞主线程直到任务完成(无关的SequentialTaskRun将阻塞直到它完成).stop()在任务运行时停止任务(与SequentialTaskRun哪个任务无关,直到它完成为止).is_running()检查任务是否仍在运行(与SequentialTaskRun哪个任务无关,直到它完成为止)
让我们定义一个示例任务:
import time
from woflo import task
@task
def quick_nap(duration):
time.sleep(duration)
if duration < 10
raise Exception("Ouch oof")
else:
return 'Well rested'
运行后,
napping = quick_nap(10)
您可以检查它以监控它的状态并接收结果,
assert napping.is_running()
napping.wait()
assert napping.get_result() == "Well rested"
可扩展性
它旨在通过开发自定义运行器轻松扩展Task。库本身目前公开了两个这样的运行器,MultiprocessTaskRun并且SequentialTaskRun.
此外,还woflo提供了一个 BaseTaskRun, 一个可以开发自定义运行器的接口。
默认任务运行器是MultiprocessTaskRun,它可以在单独的 Python 进程中并行运行多个任务,甚至是同一任务的多个实例。
路线图
-
设置 GitHub Actions、SonarCloud 监控和 Codecov -
制作一个 PyPI 包 - 决定最终 API 并创建 1.xx 版本
- 实现一个 Dask 运行器
项目详情
下载文件
下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分布
woflo-0.8.0.tar.gz
(8.0 kB
查看哈希)
内置分布
woflo-0.8.0-py3-none-any.whl
(7.3 kB
查看哈希)