
A simple package that enhances Python's concurrent.futures for memory efficiency


future-map

future-map is a Python library for use together with the official concurrent.futures module.

Why future-map?

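It is difficult to process infinite or very large inputs with concurrent.futures.ThreadPoolExecutor and concurrent.futures.ProcessPoolExecutor, because Executor.map submits a task for every item of the input iterable before it yields any result. See the following example.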

from concurrent.futures import ThreadPoolExecutor

def make_input(length):
    return range(length)

def make_infinite_input():
    count = 0
    while True:
        yield count
        count += 1

def process(value):
    return value * 2

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Works well
        for value in executor.map(process, make_input(10)):
            print('Doubled value:', value)

        # This freezes the process and memory usage keeps growing
        for value in executor.map(process, make_infinite_input()):
            print('Doubled value:', value)
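To see why the second loop never prints anything, the sketch below counts how many items Executor.map pulls from a finite generator before the caller asks for the first result. The helper `consumed_before_first_result` is hypothetical and not part of future-map. In CPython's implementation, `map()` submits a task for every input item up front, so the count should equal the input length. With an infinite generator, that up-front submission never finishes.

```python
from concurrent.futures import ThreadPoolExecutor


def process(value):
    return value * 2


def consumed_before_first_result(length, max_workers=3):
    """Hypothetical helper: count input items Executor.map reads before any result is requested."""
    pulled = 0

    def counting_input():
        nonlocal pulled
        for value in range(length):
            pulled += 1  # record every item the executor pulls from the input
            yield value

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(process, counting_input())
        consumed = pulled  # snapshot taken before asking for any result
        first = next(results)
    return consumed, first


if __name__ == '__main__':
    consumed, first = consumed_before_first_result(1000)
    print('Items consumed before the first result:', consumed)
    print('First result:', first)
```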

Installation

Use the package manager pip to install future-map.

$ pip install future-map

Usage

This library provides FutureMap. See the following example.

from future_map import FutureMap
from concurrent.futures import ThreadPoolExecutor

def make_infinite_input():
    count = 0
    while True:
        yield count
        count += 1

def process(value):
    return value * 2

if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        fm = FutureMap(
            lambda value: executor.submit(process, value),
            make_infinite_input(), buffersize=5
        )
        for value in fm:
            print('Doubled value:', value)

For a more complex use case:

import time
from concurrent.futures import ThreadPoolExecutor

from future_map import FutureMap

class APIClient:
    def __init__(self, max_connections):
        self.__max_connections = max_connections
        self.__executor = None

    def __enter__(self):
        self.__executor = ThreadPoolExecutor(max_workers=self.__max_connections)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.__executor.shutdown()
        self.__executor = None

    def call(self, url):
        time.sleep(1)
        return "Response from {}".format(url)

    def call_async(self, url):
        if self.__executor is None:
            raise Exception("call_async needs to be called in the runtime context with this APIClient")
        return self.__executor.submit(self.call, url)


def make_urls():
    for i in range(100):
        yield "https://example.com/api/resources/{}".format(i)

if __name__ == '__main__':
    with APIClient(max_connections=3) as api_client:
        for response in FutureMap(api_client.call_async, make_urls(), buffersize=5):
            print(response)

API

FutureMap(fn, iterable, buffersize)

Constructor of FutureMap.

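FutureMap is an iterable that applies the function (the fn argument) to each element of the iterable (the iterable argument), waits for each resulting future to complete, and yields each result.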

Note that this object yields results out of order: they are not guaranteed to follow the order of iterable.

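  • Arguments
    • fn: a callable that takes an element of iterable and returns a concurrent.futures.Future.
    • iterable: an iterable object.
    • buffersize: the maximum size of the internal buffer. Each concurrent.futures.Future object is kept in the buffer until it completes. When the buffer is full, FutureMap stops reading values from iterable.
  • Returns
    • A FutureMap instance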
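The contract described above (at most buffersize pending futures, results yielded as they complete) can be approximated with the standard library alone. The sketch below is a hypothetical `bounded_future_map` built on `concurrent.futures.wait` with `FIRST_COMPLETED`. It is not future-map's actual implementation, only an illustration of the documented behaviour under that assumption.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from itertools import islice
from typing import Callable, Iterable, Iterator, Set


def bounded_future_map(
    fn: Callable[[object], Future],
    iterable: Iterable,
    buffersize: int,
) -> Iterator:
    """Hypothetical stand-in for FutureMap; not the library's own code.

    Keeps at most `buffersize` unfinished futures and yields results
    in completion order, which may differ from the order of `iterable`.
    """
    if buffersize < 1:
        raise ValueError('buffersize must be at least 1')
    items = iter(iterable)
    # Fill the buffer with the first `buffersize` futures.
    pending: Set[Future] = {fn(item) for item in islice(items, buffersize)}
    while pending:
        # Block until at least one buffered future has finished.
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            # Read one new item per finished future, keeping the buffer bounded.
            for item in islice(items, 1):
                pending.add(fn(item))
            yield future.result()


def naturals():
    """An infinite input: 0, 1, 2, ..."""
    n = 0
    while True:
        yield n
        n += 1


if __name__ == '__main__':
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = bounded_future_map(lambda v: executor.submit(pow, v, 2), naturals(), buffersize=5)
        # islice stops after 10 results; the infinite input is never exhausted.
        for value in islice(results, 10):
            print('Squared value:', value)
```

Refilling a slot before yielding each result keeps the workers busy while the caller handles that result. Memory stays proportional to buffersize rather than to the length of the input.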

future_map(fn, iterable, buffersize)

An alias of FutureMap. You can use this function if you prefer a syntax similar to the built-in map function.

For more details, see FutureMap(fn, iterable, buffersize).
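Because FutureMap yields results out of order, a caller who needs input order has to carry each item's position through fn. The following is a minimal sketch that assumes you pass `enumerate(iterable)` as the iterable and wrap your function so that each future's result is an `(index, value)` pair. The helpers `tagged_double` and `in_input_order` are hypothetical and are not part of future-map.

```python
from typing import Dict, Iterable, Iterator, Tuple, TypeVar

T = TypeVar('T')


def tagged_double(pair: Tuple[int, int]) -> Tuple[int, int]:
    """Hypothetical task body: keep the input's index next to its result."""
    index, value = pair
    return index, value * 2


def in_input_order(pairs: Iterable[Tuple[int, T]]) -> Iterator[T]:
    """Re-sequence (index, result) pairs that arrive out of order.

    Indices must be 0, 1, 2, ... with no gaps. A result is yielded as soon
    as every earlier index has arrived; later ones wait in `pending`.
    """
    pending: Dict[int, T] = {}
    next_index = 0
    for index, result in pairs:
        pending[index] = result
        # Release the longest run of consecutive results now available.
        while next_index in pending:
            yield pending.pop(next_index)
            next_index += 1


if __name__ == '__main__':
    # Simulated completion order, standing in for what an unordered map might yield.
    arrived = [tagged_double(p) for p in [(2, 20), (0, 0), (1, 10), (3, 30)]]
    print(list(in_input_order(arrived)))  # prints [0, 20, 40, 60]
```

With a ThreadPoolExecutor named executor, this would look like `in_input_order(FutureMap(lambda pair: executor.submit(tagged_double, pair), enumerate(make_infinite_input()), buffersize=5))`. Note that `pending` can grow without bound if one early task is much slower than the tasks after it, because FutureMap keeps reading new input as later futures complete.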

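Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.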

License

MIT



Download files

Source distribution

future-map-0.1.2.tar.gz (4.3 kB)

Built distribution

future_map-0.1.2-py3-none-any.whl (4.0 kB, py3)