Skip to main content

当值可用时从多个迭代器中产生

项目描述

项目状态:活跃——项目已达到稳定、可用状态,正在积极开发中。 CI 状态 https://codecov.io/gh/jwodder/interleave/branch/master/graph/badge.svg https://img.shields.io/pypi/pyversions/interleave.svg 康达版 麻省理工学院许可证

GitHub | 派皮 | 问题 | 变更日志

interleave包提供了一个同名的函数,它接受多个迭代器,在不同的线程中运行它们,并在每个迭代器可用时立即生成生成的值。

安装

interleave需要 Python 3.7 或更高版本。只需使用Python 3 的pip(你有 pip,对吗?)来安装 interleave及其依赖项:

python3 -m pip install interleave

例子

>>> from time import sleep, strftime
>>> from interleave import interleave
>>>
>>> def sleeper(idno, delays):
...     for i, d in enumerate(delays):
...         sleep(d)
...         yield (idno, i)
...
>>> with interleave(
...     [
...         sleeper(0, [0, 1, 2]),
...         sleeper(1, [2, 2, 2]),
...         sleeper(2, [5, 2, 1]),
...     ]
... ) as it:
...     for x in it:
...         print(strftime("%H:%M:%S"), x)
...
22:08:39 (0, 0)
22:08:40 (0, 1)
22:08:41 (1, 0)
22:08:42 (0, 2)
22:08:43 (1, 1)
22:08:44 (2, 0)
22:08:45 (1, 2)
22:08:46 (2, 1)
22:08:47 (2, 2)

API

interleave.interleave(
    iterators: Iterable[Iterator[T]],
    *,
    max_workers: Optional[int] = None,
    thread_name_prefix: str = "",
    queue_size: Optional[int] = None,
    onerror: interleave.OnError = interleave.STOP,
) -> interleave.Interleaver[T]

interleave()在单独的线程中运行给定的迭代器,并返回一个迭代器,该迭代器在它们可用时产生它们产生的值。(有关Interleaver类的详细信息,请参见下文。)

max_workers和thread_name_prefix参数被传递给底层的concurrent.futures.ThreadPoolExecutor ( qv)。 max_workers确定一次运行的最大迭代器数。

queue_size参数设置内部使用的队列的最大大小,以管道由迭代器产生的值;当队列已满时,任何具有要 yield 的值的迭代器都将阻塞等待通过调用交织器的__next__将下一个值出列。当queue_sizeNone(默认值)时,interleave()使用queue.SimpleQueue,它没有最大大小。当queue_size为非None(包括零,表示没有最大大小)时,interleave()使用queue.Queue,其get()方法是不可中断的(包括通过键盘中断)在 Windows 上。

onerror参数是一个枚举,确定 如果其中一个迭代器引发异常,interleave()应该如何表现。可能的值是:

停止

(默认)停止迭代所有迭代器,取消任何尚未启动的未完成迭代器,等待所有正在运行的线程完成,然后重新引发异常。请注意,由于无法在产生之间停止迭代器,“等待”步骤涉及等待每个当前运行的迭代器在停止之前产生其下一个值。如果队列在中间填满,这可能会死锁。

流走

STOP类似,但迭代器在完成之前产生的任何剩余值都由交织器在引发异常之前产生

完成_全部

继续正常运行并在所有迭代器完成后重新引发异常

FINISH_CURRENT

FINISH_ALL一样,除了只有当前运行的迭代器会运行到完成;任何在引发异常时线程尚未启动的迭代器都将取消其工作

无论onerror的值如何,迭代器在初始异常之后引发的任何后续异常都将被丢弃。

class Interleaver(Generic[T]):
    def __init__(
        self,
        max_workers: Optional[int] = None,
        thread_name_prefix: str = "",
        queue_size: Optional[int] = None,
        onerror: OnError = STOP,
    )

迭代器和上下文管理器。作为一个迭代器,它产生由传递给相应interleave()调用的迭代器生成的值,因为它们变得可用。作为上下文管理器,它在进入时返回自身,并在退出时通过调用 shutdown(wait=True)方法清除所有未完成的线程(见下文)。

可以通过调用interleave()或直接调用构造函数来实例化Interleaver 。构造函数采用与interleave()相同的参数 ,减去迭代器,并生成一个尚未运行任何迭代器的新Interleaver 。迭代器通过submit()方法提交给新的 Interleaver;一旦提交了所有需要的迭代器,就必须调用finalize()方法,以便 Interleaver可以判断一切何时完成。

Interleaver将关闭其ThreadPoolExecutor并在产生其最终值后等待线程完成(特别是,当调用__next__ / get()时会导致StopIteration或另一个异常被引发)。如果在迭代完成之前放弃了Interleaver,则可能无法正确清理相关资源,并且线程可能会无限期地继续运行。出于这个原因,强烈建议您将任何迭代包装在上下文管理器中的 Interleaver上,以处理迭代的过早结束(包括来自KeyboardInterrupt)。

除了迭代器和上下文管理器 API,Interleaver还具有以下公共方法:

Interleaver.submit(it: Iterator[T]) -> None

0.2.0 版中的新功能

向Interleaver添加一个迭代器。

如果Interleaver是从interleave()返回的,或者已经调用了 finalize(),则调用submit()将导致 ValueError

Interleave.finalize() -> None

0.2.0 版中的新功能

通知Interleaver所有迭代器都已注册。必须调用此方法才能让Interleaver检测到迭代的结束;如果尚未调用此方法并且所有提交的迭代器都已完成并检索了它们的值,那么对next(it)的后续调用 将无限期地挂起。

Interleaver.get(block: bool = True, timeout: Optional[float] = None) -> T

0.2.0 版中的新功能

获取迭代器生成的下一个值。如果所有迭代器都已完成并且所有值都已检索到,则引发 interleaver.EndOfInputError。如果blockFalse并且没有立即可用的值,则引发queue.Empty。如果blockTrue,则等待timeout秒(或无限期,如果timeoutNone)等待下一个值变为可用或所有迭代器结束;如果在超时到期之前没有任何反应,则引发queue.Empty

it.get(block=True, timeout=None)等价于next(it),除了后者将EndOfInputError转换为StopIteration

注意:onerror=STOP并设置了超时时,如果迭代器引发异常,则可能会超过超时,因为Interleaver会等待所有剩余线程关闭。

Interleaver.shutdown(wait: bool = True) -> None

如果尚未调用finalize() ,则调用它,告诉所有正在运行的迭代器停止迭代,取消任何尚未启动的未完成迭代器,并关闭ThreadPoolExecutor等待参数被传递给对ThreadPoolExecutor.shutdown()的调用。

Interleaver可以在调用shutdown()后继续迭代 ,并将产生迭代器在完全停止之前产生的任何剩余值。

下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

interleave-0.2.1.tar.gz (16.1 kB 查看哈希)

已上传 source

内置分布

interleave-0.2.1-py3-none-any.whl (11.3 kB 查看哈希

已上传 py3