数据结构论坛

首页 » 分类 » 定义 » FlinkState有可能代替数据库吗
TUhjnbcbe - 2023/10/6 18:15:00

有状态的计算作为容错以及数据一致性的保证,是当今实时计算必不可少的特性之一,流行的实时计算引擎包括GoogleDataflow、Flink、Spark(Structure)Streaming、KafkaStreams都分别提供对内置State的支持。State的引入使得实时应用可以不依赖外部数据库来存储元数据及中间数据,部分情况下甚至可以直接用State存储结果数据,这让业界不禁思考:State和Database是何种关系?有没有可能用State来代替数据库呢?

在这个课题上,Flink社区是比较早就开始探索的。总体来说,Flink社区的努力可以分为两条线:一是在作业运行时通过作业查询接口访问State的能力,即QueryableState;二是通过State的离线dump文件(Savepoint)来离线查询和修改State的能力,即即将引入的SavepointProcessorAPI。

QueryableState

在年发布的Flink1.2版本,Flink引入了QueryableState的特性以允许用户通过特定的client查询作业State的内容[1],这意味着Flink应用可以在完全不依赖State存储介质以外的外部存储的情况下提供实时访问计算结果的能力。

只通过QueryableState提供实时数据访问

然而,QueryableState虽然设想上比较理想化,但由于依赖底层架构的改动较多且功能也比较受限,它一直处于Beta版本并不能用于生产环境。针对这个问题,在前段时间腾讯的工程师杨华提出QueryableState的改进计划[2]。在邮件列表中,社区就QueryableState是否可以用于代替数据库作了讨论并出现了不同的观点。笔者结合个人见解将StateasDatabase的主要优缺点整理如下。

优点:

更低的数据延迟。一般情况下Flink应用的计算结果需要同步到外部的数据库,比如定时触发输出窗口计算结果,而这种同步通常是定时的会带来一定的延迟,导致计算是实时的而查询却不是实时的尴尬局面,而直接State则可以避免这个问题。更强的数据一致性保证。根据外部存储的特性不同,FlinkConnector或者自定义的SinkFunction提供的一致性保障也有所差别。比如对于不支持多行事务的HBase,Flink只能通过业务逻辑的幂等性来保障Exactly-Once投递。相比之下State则有妥妥的Exactly-Once投递保证。节省资源。因为减少了同步数据到外部存储的需要,我们可以节省序列化和网络传输的成本,另外当然还可以节省数据库成本。缺点:

SLA保障不足。数据库技术已经非常成熟,在可用性、容错性和运维上都很多的积累,在这点上State还相当于是处于原始人时期。另外从定位上来看,Flink作业有版本迭代维护或者遇到错误自动重启带来的downtime,并不能达到数据库在数据访问上的高可用性。可能导致作业的不稳定。未经过考虑的Ad-hocQuery可能会要求扫描并返回夸张量级的数据,这会系统带来很大的负荷,很可能影响作业的正常执行。即使是合理的Query,在并发数较多的情况下也可能影响作业的执行效率。存储数据量不能太大。State运行时主要存储在TaskManager本地内存和磁盘,State过大会造成TaskManagerOOM或者磁盘空间不足。另外State大意味着checkpoint大,导致checkpoint可能会超时并显著延长作业恢复时长。只支持最基础的查询。State只能进行最简单的数据结构查询,不能像关系型数据库一样提供函数等计算能力,也不支持谓词下推等优化技术。只可以读取,不能修改。State在运行时只可以被作业本身修改,如果实在要修改State只能通过下文的SavepointProcessorAPI来实现。总体来说,目前State代替数据库的缺点还是远多于其优点,不过对于某些对数据可用性要求不高的作业来说,使用State作为数据库还是完全合理的。由于定位上的不同,FlinkState在短时间内很难看到可以完全替代数据库的可能性,但在数据访问特性上State往数据库方向发展是无需质疑的。

SavepointProcessorAPI

SavepointProcessorAPI是社区最近提出的一个新特性(见FLIP-42[3]),用于离线对State的dump文件Savepoint进行分析、修改或者直接根据数据构建出一个初始的Savepoint。SavepointProcessorAPI属于FlinkStateEvolution的StateManagement。如果说QueryableState是DSL的话,FlinkStateEvolution就是DML,而SavepointProcessorAPI就是DML中最为重要的部分。

SavepointProcessorAPI的前身是第三方的Bravo项目[4],主要思路提供Savepoint和DataSet相互转换的能力,典型应用是Savepoint读取成DataSet,在DataSet上进行修改,然后再写为一个新的Savepoint。这适合用于以下的场景:

分析作业State以研究其模式和规律排查问题或者审计为新的应用构建的初始State修改Savepoint,比如:改变作业最大并行度进行巨大的Schema改动修正有问题的StateSavepoint作为State的dump文件,通过SavepointProcessorAPI可以暴露数据查询和修改功能,类似于一个离线的数据库,但State的概念和典型关系型数据的概念还是有很多不同,FLIP-43也对这些差异进行了类比和总结。

首先Savepoint是多个operator的state的物理存储集合,不同operator的state是独立的,这类似于数据库下不同namespace之间的table。我们可以得到Savepoint对应数据库,单个operator对应Namespace。

但就table而言,其在Savepoint里对应的概念根据State类型的不同而有所差别。State有OperatorState、KeyedState和BroadcastState三种,其中OperatorState和BroadcastState属于non-partitionedstate,即没有按key分区的state,而相反地KeyedState则属于partitionedstate。

对于non-partitionedstate来说,state是一个table,state的每个元素即是table里的一行;而对于partitionedstate来说,同一个operator下的所有state对应一个table。这个table像是HBase一样有个rowkey,然后每个具体的state对应table里的一个column。

举个例子,假设有一个游戏玩家得分和在线时长的数据流,我们需要用KeyedState来记录玩家所在组的分数和游戏时长,用OperatorState记录玩家的总得分和总时长。

在一段时间内数据流的输入如下:

用KeyedState,我们分别注册group_score和group_time两个MapState表示组总得分和组总时长,并根据user_groupkeyby数据流之后将两个指标的累积值更新到State里,得到的表如下:

相对地,假如用OperatorState来记录总得分和总时长(并行度设为1),我们注册total_score和total_time两个State,得到的表有两个:

total_score

-------

14,

至此Savepoint和Database的对应关系应该是比较清晰明了的。而对于Savepoint来说还有不同的StateBackend来决定State具体如何持续化,这显然对应的是数据库的存储引擎。在MySQL中,我们可以通过简单的一行命令ALTERTABLExxxENGINE=InnoDB;来改变存储引擎,在背后MySQL会自动完成繁琐的格式转换工作。而对于Savepoint来说,由于StateBackend各自的存储格式不兼容,目前尚不能方便地切换StateBackend。为此,社区在不久前创建FLIP-41[5]来进一步完善Savepoint的可操作性。

总结

StateasDatabase是实时计算发展的大趋势,它并不是要代替数据库的使用,而是借鉴数据库领域的经验拓展State接口使其操作方式更接近我们熟悉的数据库。对于Flink而言,State的外部使用可以分为在线的实时访问和离线的访问和修改,分别将由QueryableState和SavepointProcessorAPI两个特性支持。

参考文献

QueryableStateinApacheFlink1.2.0:AnOverviewDemoImproveQueryableStateandIntroduceaQueryServerProxyComponentFLIP-43:SavepointProcessorAPIBravo:UtilitiesforprocessingFlinkcheckpoints/savepointsFLIP-41:UnifyKeyedStateSnapshotBinaryFormatforSavepoints作者介绍:林小铂,网易游戏高级开发工程师,负责游戏数据中心实时平台的开发及运维工作,目前专注于ApacheFlink的开发及应用。探究问题本来就是一种乐趣。

1
查看完整版本: FlinkState有可能代替数据库吗