pyflow
函数功能
支持将函数作为pipeline任务在本地或者远端运行。为此,需使用@pyflow装饰函数,以表达需要使用pipeline方式运行此函数。当用户的类或者函数被@pyflow装饰后,会自动添加fnode的构图方法,使用方式参考调用示例。
函数原型
1 | 装饰器@pyflow |
参数说明
参数名称 |
数据类型 |
取值说明 |
||
---|---|---|---|---|
num_returns |
int |
装饰器装饰函数时,用于表示函数的输出个数,不设置该参数时默认函数返回一个返回值。该参数与使用type annotations方式标识函数返回个数与类型的方式选择其一即可。 |
||
resources |
dict |
用于标识当前func需要的资源信息, 支持memory和num_cpus,memory单位为M。例如:{"memory": 100, "num_cpus": 1} |
||
stream_input |
str |
用于表示当前func的输入为流式输入(即函数入参为队列),当前只支持"Queue"类型,用户可自行从输入队列中取数据。 |
||
choice_output |
function |
表示当前func为可选输出,只有满足条件的输出才会返回(条件为用户自定义的function)。例如:
该例子表示只有非None的输出才会返回。 |
返回值
装饰后的类或者函数。
异常情况下会抛出DfException异常。可以通过捕捉异常获取DfException中的error_code与message查看具体的错误码及错误信息。详细信息请参考DataFlow错误码。
调用示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | # current is udf.py import dataflow as df @df.pyflow(num_returns=2, resources={"memory": 100, "num_cpus": 1}) def func1(a, b): return a + b,a - b @df.pyflow def func2(a, b): return a + b @df.pyflow(stream_input='Queue') def func3(a, b): data1 = a.get() data2 = a.get() data3 = b.get() return data1 + data2 + data3 @df.pyflow(choice_output=lambda e: e is not None) def func4(self, a) -> Tuple[int, int]: return None, a # 根据lambda函数将非空值传给相应输出 # current is graph.py from udf import func2 import dataflow as df # 构图 # 定义输入 data0 = df.FlowData() data1 = df.FlowData() # 使用func2自动生成的fnode方法构图 func_out = func2.fnode()(data0, data1) # 构建FlowGraph dag = df.FlowGraph([func_out]) |
约束说明
环境需安装对应python版本的cloudpickle包。
流式输入场景下DataFlow框架不支持数据对齐和异常事务处理。
父主题: DataFlow构图接口