1.PyFlink的发展史
1.1、v1.8.x
Flink在1.8版本的时候就已经提供PythonAPI,只在Datase/Stream上提供支持。
存在一些问题,比如:
TableAPI不支持Python。
两套各自独立实现的一个PythonAPI。
底层实现是JPython,JPython无法支持Python3.x。
1.2、v1.9.x
年8月发布。
支持PythonTableAPI。
1.3、v1.10.x
年2月发布。
提供了PythonUDF的支持。
提供UDF的依赖管理。
1.4、未来发展
提供PandasUDF的支持。
提供用户自定义的一些UDFMetrics。
MLAPI。
在易用性方面,提供SQLDDL支持PythonUDF。
在后面的一些版本中,我们也希望越来越多的人能够参与到PyFlink的贡献和开发中去。
2.PyFlink核心功能及原理介绍
PyFlink核心功能将主要从每个版本的划分来跟大家进行介绍,第1个PyFlink1.9版本里面提供PythonTableAPI的支持,然后是PyFlink1.10里面提供了PythonUDF还有相关依赖管理,最后1.11版本里面提供了PandasUDF和用户自定义的Metrics。
2.1、PythonTableAPI(PyFlink1.9)
■1.PythonTableAPI
什么是PythonTableAPI呢?我们可以从编程的角度来介绍一下。PythonTableAPI大概提供了一些Python的API,比如这里主要可以看一下Table的接口,Table接口上有很多Table相关的算子,这些算子可以分为两类:
1.跟sql相关的算子。比如select、filter、join、window等;
2.在sql的基础上扩展的一些算子。比如drop_columns(..),可以用来提升sql的便利性,比如:
当有一个很大的表并且想删除某一列的时候,可以用drop_columns来删除某一列。
对于我们来说,可以随意组合Table上的方法,然后编写不同的业务逻辑。我们接下来看一下,如何用TableAPI来写一个WordCount的例子,可以让大家有一个比较完整的认识。
■2.WordCount
如下图所示,是一个完整的PythonTableAPI的WordCount的例子。主要可以包含4个部分。
首先,我们需要去初始化环境,比如第6行,我们先拿到了一个ExecutionEnvironment,然后第7行,去创建一个TableEnvironment。
创建TableEnvironment之后,需要去定义source跟sink,这里source跟sink都是指定了输入和输出的文件路径,还指定了文件中Table对应的一些字段,以及字段对应的数据类型。而且可以定义输出分隔符。
定义好source跟sink之后,再来看一下如何编写计算逻辑。可以用from_path算子来读取source表,读取完之后,就可以进行groupby的一些聚合,做groupby跟wordcount。
做完之后,可以把结果表用insert_into进行输出。最后调用Environment的execute来提交作业。
经过上面4步,我们就完整的写出了一个PythonTableAPI的WordCount。那么对于WordCount例子,它的底层实现逻辑是怎么样的呢?接下来看一下,PythonTableAPI的一个架构。
■3.TableAPI架构
image
通过这个架构图,可以看到,PythonTableAPI是建立在JavaTableAPI的基础上的,我们并没有单独的从上到下实现一套PythonTableAPI。
PythonTableAPI是一个特别的存在,它是在JavaTableAPI的基础上加了一层薄薄的API,这两层API是可以相互调用的。
在client端的时候,会起一个PythonVM然后也会起一个JavaVM,两个VM进行通信。通信的细节可以看下面这张图。
我们可以看到Python跟JavaVM里面都会用Py4J各自起一个Gateway。然后Gateway会维护一些对象。
比如我们在Python这边创建一个table对象的时候,它也会在相应的Java这边创建一个相同table对象。如果创建一个TableEnvironment对象,在Java部分也会创建一个TableEnvironment对象。
如果你调用table对象上的方法,那么也会映射到Java这边,所以是一个一一映射的关系。
基于这一套架构,我们可以得出一个结论:如果你用PythonTableAPI写出了一个作业,这个作业没有PythonUDF的时候,那么这个作业的性能跟你用Java写出来的作业性能是一样的。因为底层的架构都是同一套Java的架构。
刚刚我们介绍了PyFlink1.9版本里面的PythonTableAPI,也提到了table的接口上面提供了很多不同的算子,而且可以用这些算子去组合,实现不同的业务逻辑。但是对于这些算子来说,它的功能还无法满足一些特定的情况,比如某些业务需要编写一些自定义的逻辑,此时就需要强依赖PythonUDF,所以在PyFlink1.10版本里面,提供了PythonUDF并且提供了相应的依赖管理。
2.2、PythonUDF依赖管理(PyFlink1.10)
■1.PythonUDF架构
如果你的作业是包含一个PythonUDF的作业,那么从提交的时候,就是左边的架构图,然后deploy到Remote端的时候,可以看到Remote端的架构图分为两个部分。左边部分是Java的Operator,右边部分是Python的Operator。
大体的流程我们可以大概看一下:
在open方法里进行JavaOperator和PythonOperator环境的初始化。
环境初始化好之后,会进行数据处理。当JavaOperator收到数据之后,先把数据放到一个inputbuffer缓冲区中,达到一定的阈值后,才会flash到Python这边。Python处理完之后,也会先将数据放到一个结果的缓冲区中,当达到一定阈值,比如达到一定的记录的行数,或者是达到一定的时间,才会把结果flush到这边。
state访问的链路。
logging访问的链路。
metrics汇报的链路。
■2.PythonUDF的使用
PyFlink-1.9版本中,PythonAPI中支持注册使用JavaUDF,使用方法如下:可以调TableEnvironment上的register_java_function这个方法,有两个参数,一个参数是给UDF的命名,第2个是Java类的路径。
table_env.register_java_function("func1","java.user.defined.function.class.name")
下面是一个例子:
PythonUDF的使用:
可以调TableEnvironment上的register_function这个方法,有两个参数,一个参数是给UDF起的名字,第2个是python_udf的一个对象。
table_env.register_function("func1",python_udf)
下面是一个例子:
■3.PythonUDF的定义方式
PyFlink里面也支持一些其他的方式去定义UDF,我们可以看一下,总共有4种方式:
可以继承ScalaFunction基类,并重写eval方法。
直接定义一个NamedFunction,然后再用UDF的签名去声明UDF的输入类型和输出类型。
也可以用刚刚例子里面的LambdaFunction的这种方式,来定义PythonUDF。
最后一种是CallableFunction的方式。也是声明其输入和输出的类型。
■4.依赖管理
写完UDF的时候,经常遇到一个问题,UDF里面可能会有一些依赖,如何去解决这些依赖问题呢?PyFlink提供了4种依赖的API,如下所示。
依赖文件
如果UDF里面依赖一个文件的话,可以用add_python_file加载依赖的文件的路径,指定完之后,作业提交的时候,就会把这个文件分发到集群,那么在远程执行的时候,你的UDF就可以去访问这个文件。
table_env.add_python_file(file_path)
依赖存档(打包)文件
可能会去依赖一个存档的文件,这个时候你可以用add_python_archive方法,传入两个参数。第2个参数是一个可选的参数。第1个参数表示对你存档文件的重命名。如果调用了API,那么在UDF里面就可以去访问存档文件里面的所有文件。
table_env.add_python_archive("py_env.zip","myenv")#thefilescontainedinthearchivefilecanbeaccessedinUDFdefmy_udf():withopen("myenv/py_env/data/data.txt")asf:
依赖第三方项目
可以用set_python_requirements方法去指定你的第三方依赖。也是有两个参数,第1个参数是传一个文件,文件中写了所依赖的第三方项目,以及它对应的版本。第2个参数是一个可选的参数,如果集群是一个有网络的环境,那么第2个参数可以不填,当第2个参数不填的时候,作业提交开始初始化的时候,Python就会去根据你的requirements文件里面配置的依赖,自动的去网络下载你的依赖,然后安装。如果集群是没有网络的,可以预先把这些依赖下载好,下载到cached的目录里面去。然后把目录也一起提交到集群,集群拿到这个目录会去安装这些依赖。
#