python中基于流的编程的轻量级框架。
项目描述
覆盖报告
文件 | 结构 | 错过 | 覆盖 |
---|---|---|---|
全部的 | 940 | 0 | 100% |
基于流的编程
python中基于流的编程的轻量级框架。
+-------------------+ +---------------------+
| Invite People | | Birthday Party |
|-------------------| |---------------------|
o amount<4> | +----->o attendees<> |
| people o---+ +--->o cake<> |
+-------------------+ | +---------------------+
|
+-------------------+ |
| Bake a cake | |
+-------------------+ |
o type<"Chocolate"> | |
| cake o-----+
+-------------------+
好处:
- 可视化代码
- 可重用性
- 简化的代码设计
- 内置并发
- 在代码中一对一地表示工作流
快速示例
考虑这个关于如何用 Flowpipe 表示房屋建造的简单示例:
from flowpipe import Graph, INode, Node, InputPlug, OutputPlug
class HireWorkers(INode):
"""A node can be derived from the INode interface.
The plugs are defined in the init method.
The compute method received the inputs from any connected upstream nodes.
"""
def __init__(self, amount=None, **kwargs):
super(HireWorkers, self).__init__(**kwargs)
InputPlug('amount', self, amount)
OutputPlug('workers', self)
def compute(self, amount):
workers = ['John', 'Jane', 'Mike', 'Michelle']
print('{0} workers are hired to build the house.'.format(amount))
return {'workers.{0}'.format(i): workers[i] for i in range(amount)}
@Node(outputs=['workers'])
def Build(workers, section):
"""A node can also be created by the Node decorator.outputs
The inputs to the function are turned into InputsPlugs, otuputs are defined
in the decorator itself. The wrapped function is used as the compute method.
"""
print('{0} are building the {1}'.format(', '.join(workers.values()), section))
return {'workers.{0}'.format(i): worker for i, worker in workers.items()}
@Node()
def Party(attendees):
print('{0} and {1} are having a great party!'.format(
', '.join(list(attendees.values())[:-1]), list(attendees.values())[-1]))
# Create a graph with the necessary nodes
graph = Graph(name='How to build a house')
workers = HireWorkers(graph=graph, amount=4)
build_walls = Build(graph=graph, name='Build Walls', section='walls')
build_roof = Build(graph=graph, name='Build Roof', section='roof')
party = Party(graph=graph, name='Housewarming Party')
# Wire up the connections between the nodes
workers.outputs['workers']['0'].connect(build_walls.inputs['workers']['0'])
workers.outputs['workers']['1'].connect(build_walls.inputs['workers']['1'])
workers.outputs['workers']['2'].connect(build_roof.inputs['workers']['0'])
workers.outputs['workers']['3'].connect(build_roof.inputs['workers']['1'])
build_walls.outputs['workers']['0'] >> party.inputs['attendees']['0']
build_walls.outputs['workers']['1'] >> party.inputs['attendees']['2']
build_roof.outputs['workers']['0'] >> party.inputs['attendees']['1']
build_roof.outputs['workers']['1'] >> party.inputs['attendees']['3']
party.inputs['attendees']['4'].value = 'Homeowner'
将代码可视化为图形或列表:
print(graph.name)
print(graph)
print(graph.list_repr())
输出:
How to build a house
+------------------------+ +------------------------+ +---------------------------+
| HireWorkers | | Build Roof | | Housewarming Party |
|------------------------| |------------------------| |---------------------------|
o amount<4> | o section<"roof"> | % attendees |
| workers % % workers | +--->o attendees.0<> |
| workers.0 o-----+--->o workers.0<> | |--->o attendees.1<> |
| workers.1 o-----|--->o workers.1<> | |--->o attendees.2<> |
| workers.2 o-----| | workers % |--->o attendees.3<> |
| workers.3 o-----| | workers.0 o-----| o attendees.4<"Homeowner> |
+------------------------+ | | workers.1 o-----| +---------------------------+
| +------------------------+ |
| +------------------------+ |
| | Build Walls | |
| |------------------------| |
| o section<"walls"> | |
| % workers | |
+--->o workers.0<> | |
+--->o workers.1<> | |
| workers % |
| workers.0 o-----+
| workers.1 o-----+
+------------------------+
Build a House
HireWorkers
[i] amount: 4
[o] workers
[o] workers.0 >> Build Walls.workers.0
[o] workers.1 >> Build Walls.workers.1
[o] workers.2 >> Build Roof.workers.0
[o] workers.3 >> Build Roof.workers.1
Build Roof
[i] section: "roof"
[i] workers
[i] workers.0 << HireWorkers.workers.2
[i] workers.1 << HireWorkers.workers.3
[o] workers
[o] workers.0 >> Housewarming Party.attendees.1
[o] workers.1 >> Housewarming Party.attendees.3
Build Walls
[i] section: "walls"
[i] workers
[i] workers.0 << HireWorkers.workers.0
[i] workers.1 << HireWorkers.workers.1
[o] workers
[o] workers.0 >> Housewarming Party.attendees.0
[o] workers.1 >> Housewarming Party.attendees.2
Housewarming Party
[i] attendees
[i] attendees.0 << Build Walls.workers.0
[i] attendees.1 << Build Roof.workers.0
[i] attendees.2 << Build Walls.workers.1
[i] attendees.3 << Build Roof.workers.1
[i] attendees.4: "Homeowner"
现在建造房子:
graph.evaluate(mode='threading') # Options are linear, threading and multiprocessing
输出:
4 workers are hired to build the house.
Michelle, Mike are building the roof
Jane, John are building the walls
Mike, John, Michelle, Jane and Homeowner are having a great party!
(注意:有关更详细的评估方案,请参阅评估者)
我们现在知道如何举办派对,所以让我们邀请一些人并在生日时重新使用这些技能:
graph = Graph(name='How to throw a birthday party')
@Node(outputs=['people'])
def InvitePeople(amount):
people = ['John', 'Jane', 'Mike', 'Michelle']
d = {'people.{0}'.format(i): people[i] for i in range(amount)}
d['people'] = {people[i]: people[i] for i in range(amount)}
return d
invite = InvitePeople(graph=graph, amount=4)
birthday_party = Party(graph=graph, name='Birthday Party')
invite.outputs['people'] >> birthday_party.inputs['attendees']
print(graph.name)
print(graph)
graph.evaluate()
输出:
How to throw a birthday party
+-------------------+ +---------------------+
| InvitePeople | | Birthday Party |
|-------------------| |---------------------|
o amount<4> | +--->o attendees<> |
| people o-----+ +---------------------+
+-------------------+
Jane, Michelle, Mike and John are having a great party!
更多示例
流管的常见用例还有更多示例:
这些示例的代码: house_and_birthday.py!
另一个简单的例子: world_clock.py!
如何使用嵌套子图: nested_graphs.py!
成功使用带有 flowpipe 的命令模式: workflow_design_pattern.py!
在远程机器集群上使用流管,在 VFX/动画行业通常称为“渲染农场”: vfx_render_farm_conversion.py!
一个示例图展示了 VFX/动画行业中遇到的常见工作流程: vfx_rendering.py!
视觉特效管道
如果您在 VFX/动画行业工作,请查看有关如何在vfx 管道中使用 flowpipe 的广泛指南!
评价者
如果您的节点只需要顺序、线程或多处理评估,那么该Graph.evaluate()
方法将为您提供很好的服务。如果您想更好地控制图表的评估Evaluators
方式,那么适合您。这也可以用于添加,例如记录或跟踪节点评估。
评估器允许您控制节点评估顺序或它们的调度。请参阅flowpipe/evaluator.py
查看该Graph.evaluate()
方法的评估方案。
使用自定义评估器,子类flowpipe.evaluator.Evaluator
,并至少提供一个_evaluate_nodes(self, nodes)
方法。此方法应获取节点列表并调用它们各自的node.evalaute()
方法(以及您想要为每个正在评估的节点执行的任何其他任务)。要使用自定义求值器,请创建它并Evalator.evaluate()
使用 Graph 调用其方法以求值作为参数:
from flowpipe.evaluators import LinearEvaluator
# assuming you created a graph to evaluate above, called `graph`
lin_eval = LinearEvaluator()
lin_eval.evaluate(graph)