Skip to main content

迭代器实用程序类和函数

项目描述

提供包装类 TimeoutIterator 以将超时功能添加到普通迭代器

安装:

pip intall iterators

有关所有功能,请参阅 TimeoutIterator 的帮助。检查测试以获取有关如何使用 TimeoutIterator 的示例。有关基本用法,请参见下面的示例测试

例子:

  1. TimeoutIterator 像普通迭代器一样工作:

    from iterators import TimeoutIterator
    
    def iter_simple():
        yield 1
        yield 2
    
    def test_normal_iteration(self):
        i = iter_simple()
        it = TimeoutIterator(i)
    
        self.assertEqual(next(it), 1)
        self.assertEqual(next(it), 2)
    
        self.assertRaises(StopIteration, next, it)
        self.assertRaises(StopIteration, next, it)
    
  2. 当需要超时时,像这样使用

    def iter_with_sleep():
        yield 1
        time.sleep(0.6)
        yield 2
        time.sleep(0.4)
        yield 3
    
    def test_fixed_timeout(self):
        i = iter_with_sleep()
        it = TimeoutIterator(i, timeout=0.5)
        self.assertEqual(next(it), 1)
        self.assertEqual(next(it), it.get_sentinel())
    
        self.assertEqual(next(it), 2)
        self.assertEqual(next(it), 3)
        self.assertRaises(StopIteration, next, it)
    
  3. 动态超时调整

    def iter_with_sleep():
        yield 1
        time.sleep(0.6)
        yield 2
        time.sleep(0.4)
        yield 3
    
    def test_timeout_update(self):
        i = iter_with_sleep()
        it = TimeoutIterator(i, timeout=0.5)
        self.assertEqual(next(it), 1)
        self.assertEqual(next(it), it.get_sentinel())
    
        it.set_timeout(0.3)
        self.assertEqual(next(it), 2)
        self.assertEqual(next(it), it.get_sentinel())
    
        self.assertEqual(next(it), 3)
        self.assertRaises(StopIteration, next, it)
    

在本地运行单元测试:

python -m unittest discover tests

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

源分布

iterators-0.0.2.tar.gz (3.0 kB 查看哈希)

已上传 source

内置分布

iterators-0.0.2-py3-none-any.whl (3.9 kB 查看哈希)

已上传 py3