Skip to main content

一个易于多处理的 Python 包,但比多处理更快

项目描述

构建状态 文档状态 皮皮状态 Python 版本

MPIRE是 MultiProcessing IsReally Easy 的缩写,是一个用于多处理的 Python 包,但比默认的多处理包更快、更友好。它结合了multiprocessing.Pool的方便的 map 功能和 使用multiprocessing.Process的写时复制共享对象的好处,以及易于使用的工作状态、工作洞察力和进度条功能。

完整的文档可在https://slimmer-ai.github.io/mpire/获得。

特征

  • 比其他多处理库更快的执行。请参阅基准

  • 直观的 Pythonic 语法

  • 使用map / map_unordered / imap / imap_unordered函数进行多处理

  • 使用工作者池轻松使用写时复制共享对象(写时复制仅适用于 start 方法 fork

  • 每个工作人员可以有自己的状态,并且通过方便的工作人员初始化和退出功能,可以轻松地操作此状态(例如,为每个工作人员仅加载一次内存密集型模型,而无需通过队列发送它)

  • 使用tqdm支持进度条

  • 进度仪表板支持

  • 工作人员洞察力可让您深入了解您的多处理效率

  • 优雅和用户友好的异常处理

  • 超时,包括 worker 初始化和退出函数

  • 所有可用映射函数的自动任务分块,以加快小型任务队列(包括 numpy 数组)的处理

  • 可调整的最大活动任务数以避免内存问题

  • 在指定数量的任务后自动重新启动工作程序以减少内存占用

  • 设置守护程序选项时允许嵌套工作人员池

  • 子进程可以固定到特定或一系列 CPU

  • 可选择通过multiprocess使用dill作为序列化后端,从而在 iPython 和 Jupyter 笔记本中并行化更多奇特的对象、lambda 和函数。

MPIRE 已经在 Linux 和 Windows 上进行了测试。Windows 用户有一些已知的小警告,可在此处找到。

安装

通过点子(PyPi):

pip install mpire

MPIRE 也可通过 conda-forge 获得:

conda install -c conda-forge mpire

入门

假设您有一个耗时的函数,它接收一些输入并返回其结果。像这样的简单函数被称为令人尴尬的并行问题,这些函数几乎不需要努力就可以变成并行任务。并行化一个简单的函数就像导入multiprocessing和使用 multiprocessing.Pool类一样简单:

import time
from multiprocessing import Pool

def time_consuming_function(x):
    time.sleep(1)  # Simulate that this function takes long to complete
    return ...

with Pool(processes=5) as pool:
    results = pool.map(time_consuming_function, range(10))

MPIRE 几乎可以用作multiprocessing的替代品。我们使用mpire.WorkerPool类并调用可用的地图函数之一:

from mpire import WorkerPool

with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(10))

代码差异很小:如果您习惯于 vanilla multiprocessing,则无需学习全新的多处理语法。然而,额外的可用功能使 MPIRE 与众不同。

进度条

假设我们想知道当前任务的状态:完成了多少任务,还有多久工作准备好?就像将progress_bar参数设置为True一样简单:

with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(10), progress_bar=True)

它会输出一个格式很好的tqdm进度条。如果您在笔记本中运行代码,它将自动切换到小部件。

MPIRE 还提供了一个仪表板,您需要为其安装额外的依赖项。有关更多信息,请参阅仪表板

共享对象

注意:写时复制共享对象仅适用于 start 方法fork。对于线程,对象按原样共享。对于其他启动方法,共享对象为每个工作人员复制一次。

如果您想要在所有工作人员之间共享一个或多个对象,您可以使用 MPIRE 的写时复制 shared_objects选项。MPIRE 将只为每个工作人员传递这些对象一次,而无需复制/序列化。仅当您更改工作者函数中的对象时,它才会开始为该工作者复制它。

def time_consuming_function(some_object, x):
    time.sleep(1)  # Simulate that this function takes long to complete
    return ...

def main():
    some_object = ...
    with WorkerPool(n_jobs=5, shared_objects=some_object) as pool:
        results = pool.map(time_consuming_function, range(10), progress_bar=True)

有关详细信息,请参阅shared_objects

工作器初始化

可以使用worker_init功能初始化工作程序。与worker_state一起,您可以加载模型,或建立数据库连接等:

def init(worker_state):
    # Load a big dataset or model and store it in a worker specific worker_state
    worker_state['dataset'] = ...
    worker_state['model'] = ...

def task(worker_state, idx):
    # Let the model predict a specific instance of the dataset
    return worker_state['model'].predict(worker_state['dataset'][idx])

with WorkerPool(n_jobs=5, use_worker_state=True) as pool:
    results = pool.map(task, range(10), worker_init=init)

同样,您可以使用worker_exit功能让 MPIRE 在 worker 终止时调用函数。你甚至可以让这个退出函数返回结果,这些结果可以在以后获得。有关更多信息,请参阅worker_init 和 worker_exit 部分。

员工洞察

当您的多处理设置没有按照您的意愿执行并且您不知道是什么原因导致它时,可以使用工作人员洞察功能。这将使您深入了解您的设置,但它不会分析您正在运行的功能(还有其他库)。相反,它会描述工作人员的启动时间、等待时间和工作时间。当提供worker init 和exit 函数时,它也会为这些函数计时。

也许您正在通过任务队列发送大量数据,这会增加等待时间。无论如何,您可以分别使用enable_insights标志和mpire.WorkerPool.get_insights函数启用和获取洞察:

with WorkerPool(n_jobs=5) as pool:
    results = pool.map(time_consuming_function, range(10), enable_insights=True)
    insights = pool.get_insights()

有关更详细的示例和预期输出,请参阅工作人员见解。

超时

可以为 target、 worker_initworker_exit函数单独设置超时。当设置并达到超时时,它将抛出TimeoutError

# Will raise TimeoutError, provided that the target function takes longer
# than half a second to complete
with WorkerPool(n_jobs=5) as pool:
    pool.map(time_consuming_function, range(10), task_timeout=0.5)

# Will raise TimeoutError, provided that the worker_init function takes longer
# than 3 seconds to complete or the worker_exit function takes longer than
# 150.5 seconds to complete
with WorkerPool(n_jobs=5) as pool:
    pool.map(time_consuming_function, range(10), worker_init=init, worker_exit=exit_,
             worker_init_timeout=3.0, worker_exit_timeout=150.5)

当使用线程作为启动方法时,MPIRE 将无法中断某些功能,例如time.sleep

有关更多详细信息,请参阅超时

基准

MPIRE 已经在三个不同的基准上进行了基准测试:数值计算、有状态计算和昂贵的初始化。有关这些基准的更多详细信息,请参阅此博客文章。这些基准测试的所有代码都可以在这个项目中找到。

简而言之,MPIRE 速度更快的主要原因是:

  • fork可用时,我们可以使用写时复制共享对象,这减少了复制需要在子进程中共享的对象的需要

  • 工人可以保持多个任务的状态。因此,您可以选择加载一个大文件或每个工作人员仅发送一次资源

  • 自动任务分块

下图显示了所有三个基准的平均归一化结果。可以在博客文章中找到各个基准测试的结果。基准测试在具有 20 个内核、禁用超线程和 200GB 内存的 Linux 机器上运行。对于每项任务,使用不同数量的进程/工作人员运行实验,结果平均运行 5 次。

平均归一化基准结果

文档

有关MPIRE 的所有其他功能的信息,请参阅https://slimmer-ai.github.io/mpire/上的完整文档。

如果您想自己构建文档,请执行以下命令安装文档依赖项:

pip install mpire[docs]

或者

pip install .[docs]

然后可以使用 Python <= 3.9 构建文档并执行:

python setup.py build_docs

文档也可以直接从docs文件夹构建。在这种情况下,应该在您当前的工作环境中安装并使用MPIRE 。然后执行:

make html

docs文件夹中。

下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

mpire-2.6.0.tar.gz (283.5 kB 查看哈希

已上传 source

内置分布

mpire-2.6.0-py3-none-any.whl (291.4 kB 查看哈希

已上传 py3