迭代器实用程序类和函数
项目描述
提供包装类 TimeoutIterator 以将超时功能添加到普通迭代器
安装:
pip intall iterators
有关所有功能,请参阅 TimeoutIterator 的帮助。检查测试以获取有关如何使用 TimeoutIterator 的示例。有关基本用法,请参见下面的示例测试
例子:
-
TimeoutIterator 像普通迭代器一样工作:
from iterators import TimeoutIterator def iter_simple(): yield 1 yield 2 def test_normal_iteration(self): i = iter_simple() it = TimeoutIterator(i) self.assertEqual(next(it), 1) self.assertEqual(next(it), 2) self.assertRaises(StopIteration, next, it) self.assertRaises(StopIteration, next, it)
-
当需要超时时,像这样使用
def iter_with_sleep(): yield 1 time.sleep(0.6) yield 2 time.sleep(0.4) yield 3 def test_fixed_timeout(self): i = iter_with_sleep() it = TimeoutIterator(i, timeout=0.5) self.assertEqual(next(it), 1) self.assertEqual(next(it), it.get_sentinel()) self.assertEqual(next(it), 2) self.assertEqual(next(it), 3) self.assertRaises(StopIteration, next, it)
-
动态超时调整
def iter_with_sleep(): yield 1 time.sleep(0.6) yield 2 time.sleep(0.4) yield 3 def test_timeout_update(self): i = iter_with_sleep() it = TimeoutIterator(i, timeout=0.5) self.assertEqual(next(it), 1) self.assertEqual(next(it), it.get_sentinel()) it.set_timeout(0.3) self.assertEqual(next(it), 2) self.assertEqual(next(it), it.get_sentinel()) self.assertEqual(next(it), 3) self.assertRaises(StopIteration, next, it)
在本地运行单元测试:
python -m unittest discover tests
项目详情
下载文件
下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。
源分布
iterators-0.0.2.tar.gz
(3.0 kB
查看哈希)
内置分布
iterators-0.0.2-py3-none-any.whl
(3.9 kB
查看哈希)
关
iterators- 0.0.2 -py3-none-any.whl 的哈希值
算法 | 哈希摘要 | |
---|---|---|
SHA256 | ac2a9d8af1dd9eed051ccab4a1905a1343d66bbc9f451567d94f6e2744f30fce |
|
MD5 | a6a3a4ee7a7a66612d98881d18ef4609 |
|
布莱克2-256 | ac61582052d25f2bdfbc169655392363a451ca6cf3492d204bbf0079be91bf1c |