Skip to main content

Python 本地优先的无膨胀任务编排框架

项目描述

woflo

CI MyPy 皮棉 可维护性评级

概述

woflo是一个 Python 本地优先的无膨胀可扩展任务编排框架

主要目标是抽象出许多与任务编排和执行相关的功能。

目前这包括:

  • 重试
  • 重试超时
  • 并行性
  • 日志记录

例子

预期用途是使用装饰器@task,考虑一个非常简单的示例,它将并行运行 10 个昏昏欲睡的工作人员而不会阻塞主线程:

import time
from woflo import task


@task
def sleepy_worker():
    time.sleep(5)
    print('I am done')


for _ in range(10):
    sleepy_task_run = sleepy_worker()

您还可以对有时可能会失败的任务进行重试。以下将尝试运行修饰函数总共 3 次,尝试之间有 5 秒的延迟。

from woflo import task


@task(retries=2, retry_sleep_time=5)
def fetch_data_from_unstable_api():
    ...

此外,您还可以在@task装饰器中提供运行器。例如,SequentialTaskRun如果喜欢您的任务按顺序运行并且喜欢在计算机周围等待很多。例如:

from woflo import task
from woflo.runners import SequentialTaskRun


@task(runner=SequentialTaskRun)
def sequential_sleepy_worker():
    time.sleep(5)
    print('I am done')


for _ in range(10):
    sleepy_task_run = sequential_sleepy_worker()

每个TaskRun还应该公开一些使您能够处理它的方法:

  • .get_result()获取已完成任务的返回值
  • .wait()阻塞主线程直到任务完成(无关的SequentialTaskRun将阻塞直到它完成)
  • .stop()在任务运行时停止任务(与SequentialTaskRun哪个任务无关,直到它完成为止)
  • .is_running()检查任务是否仍在运行(与SequentialTaskRun哪个任务无关,直到它完成为止)

让我们定义一个示例任务:

import time
from woflo import task

@task
def quick_nap(duration):
    time.sleep(duration)
    if duration < 10
        raise Exception("Ouch oof")
    else:
        return 'Well rested'

运行后,

napping = quick_nap(10)

您可以检查它以监控它的状态并接收结果,

assert napping.is_running()

napping.wait()
assert napping.get_result() == "Well rested"

可扩展性

它旨在通过开发自定义运行器轻松扩展Task。库本身目前公开了两个这样的运行器,MultiprocessTaskRun并且SequentialTaskRun.

此外,还woflo提供了一个 BaseTaskRun, 一个可以开发自定义运行器的接口。

默认任务运行器是MultiprocessTaskRun,它可以在单独的 Python 进程中并行运行多个任务,甚至是同一任务的多个实例。

路线图

  • 设置 GitHub Actions、SonarCloud 监控和 Codecov
  • 制作一个 PyPI 包
  • 决定最终 API 并创建 1.xx 版本
  • 实现一个 Dask 运行器

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

woflo-0.8.0.tar.gz (8.0 kB 查看哈希)

已上传 source

内置分布

woflo-0.8.0-py3-none-any.whl (7.3 kB 查看哈希

已上传 py3