轻量级线程和异步任务库
项目描述
新任务者
通过 asyncio 进行现代线程/多处理池和任务处理的轻量级 Python 库。
Neotasker 是 atasker库的轻量级变体:任务没有优先级,直接进入 ThreadPoolExecutor 并且是标准的 Python 未来对象。该库对于具有轻量级任务的高负载项目很有用,因为大多数任务直接代理到池中。
Neotasker 工作在ThreadPoolExecutor和asyncio 之上,并提供了额外的功能:
- 简单的线程池和异步循环初始化
- 间隔、队列和基于事件的工作人员
- 与aiosched的内置集成
安装
pip3 install neotasker
资料来源:https ://github.com/alttch/neotasker
文档:https ://neotasker.readthedocs.io/
代码示例
启动/停止
from neotasker import task_supervisor
# set pool size
# min_size='max' means pre-spawn all pool threads
task_supervisor.set_thread_pool(min_size='max', max_size=20)
task_supervisor.start()
# ...
# start workers, other threads etc.
# ...
# optionally block current thread
task_supervisor.block()
# stop from any thread
task_supervisor.stop()
执行未来
您可以直接使用neotasker.thread_pool对象或使用 task_supervisor.spawn函数,该函数直接映射到 thread_pool.submit)
from neotasker import thread_pool
thread_pool.start()
def mytask(a, b, c):
print(f'I am working in the background! {a} {b} {c}')
return 777
task = task_supervisor.spawn(mytask, 1, 2, c=3)
# get future result
result = task.result()
创建异步 io 循环
from neotasker import thread_pool
thread_pool.start()
task_supervisor.create_aloop('default', default=True)
# The loop will until supervisor is stopped
# Spawn coroutine from another thread:
task_supervisor.get_aloop().spawn_coroutine_threadsafe(coro)
工人示例
from neotasker import background_worker, task_supervisor
task_supervisor.start()
# we need to create at least one aloop to start workers
task_supervisor.create_aloop('default', default=True)
# create one more async loop
task_supervisor.create_aloop('loop2')
@background_worker
def worker1(**kwargs):
print('I am a simple background worker')
@background_worker
async def worker_async(**kwargs):
print('I am async background worker')
@background_worker(interval=1, loop='loop2')
def worker2(**kwargs):
print('I run every second!')
@background_worker(queue=True)
def worker3(task, **kwargs):
print('I run when there is a task in my queue')
@background_worker(event=True)
def worker4(**kwargs):
print('I run when triggered')
worker1.start()
worker_async.start()
worker2.start()
worker3.start()
worker4.start()
worker3.put_threadsafe('todo1')
worker4.trigger_threadsafe()
from neotasker import BackgroundIntervalWorker
class MyWorker(BackgroundIntervalWorker):
def run(self, **kwargs):
print('I am custom worker class')
worker5 = MyWorker(interval=0.1, name='worker5')
worker5.start()