Skip to main content

轻量级线程和异步任务库

项目描述

新任务者

通过 asyncio 进行现代线程/多处理池和任务处理的轻量级 Python 库。

Neotasker 是 atasker库的轻量级变体:任务没有优先级,直接进入 ThreadPoolExecutor 并且是标准的 Python 未来对象。该库对于具有轻量级任务的高负载项目很有用,因为大多数任务直接代理到池中。

Neotasker 工作在ThreadPoolExecutorasyncio 之上,并提供了额外的功能:

  • 简单的线程池和异步循环初始化
  • 间隔、队列和基于事件的工作人员
  • 与aiosched的内置集成

安装

pip3 install neotasker

资料来源:https ://github.com/alttch/neotasker

文档:https ://neotasker.readthedocs.io/

代码示例

启动/停止

from neotasker import task_supervisor

# set pool size
# min_size='max' means pre-spawn all pool threads
task_supervisor.set_thread_pool(min_size='max', max_size=20)
task_supervisor.start()
# ...
# start workers, other threads etc.
# ...
# optionally block current thread
task_supervisor.block()

# stop from any thread
task_supervisor.stop()

执行未来

您可以直接使用neotasker.thread_pool对象或使用 task_supervisor.spawn函数,该函数直接映射到 thread_pool.submit

from neotasker import thread_pool

thread_pool.start()

def mytask(a, b, c):
    print(f'I am working in the background! {a} {b} {c}')
    return 777

task = task_supervisor.spawn(mytask, 1, 2, c=3)

# get future result
result = task.result()

创建异步 io 循环

from neotasker import thread_pool

thread_pool.start()
task_supervisor.create_aloop('default', default=True)

# The loop will until supervisor is stopped
# Spawn coroutine from another thread:

task_supervisor.get_aloop().spawn_coroutine_threadsafe(coro)

工人示例

from neotasker import background_worker, task_supervisor

task_supervisor.start()
# we need to create at least one aloop to start workers
task_supervisor.create_aloop('default', default=True)
# create one more async loop
task_supervisor.create_aloop('loop2')

@background_worker
def worker1(**kwargs):
    print('I am a simple background worker')

@background_worker
async def worker_async(**kwargs):
    print('I am async background worker')

@background_worker(interval=1, loop='loop2')
def worker2(**kwargs):
    print('I run every second!')

@background_worker(queue=True)
def worker3(task, **kwargs):
    print('I run when there is a task in my queue')

@background_worker(event=True)
def worker4(**kwargs):
    print('I run when triggered')

worker1.start()
worker_async.start()
worker2.start()
worker3.start()
worker4.start()

worker3.put_threadsafe('todo1')
worker4.trigger_threadsafe()

from neotasker import BackgroundIntervalWorker

class MyWorker(BackgroundIntervalWorker):

    def run(self, **kwargs):
        print('I am custom worker class')

worker5 = MyWorker(interval=0.1, name='worker5')
worker5.start()

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

neotasker-0.0.41.tar.gz (11.9 kB 查看哈希

已上传 source