Flink处理函数实战之一:深入了解ProcessFunction的状态(Flink-1.10)

Flink处理函数实战之一:深入了解ProcessFunction的状态(Flink-1.10)

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

Flink处理函数实战系列链接

  1. 深入了解ProcessFunction的状态操作(Flink-1.10)
  2. ProcessFunction
  3. KeyedProcessFunction类
  4. ProcessAllWindowFunction(窗口处理)
  5. CoProcessFunction(双流处理)

关于ProcessFunction状态的疑惑

学习Flink的ProcessFunction过程中,官方文档中涉及状态处理的时候,不止一次提到只适用于keyed stream的元素,如下图红框所示:
在这里插入图片描述

之前写过一些flink应用,keyed stream常用但不是必须用的,所以产生了疑问:

  1. 为何只有keyed stream的元素能读写状态?
  2. 每个key对应的状态是如何操作的?

Flink的”状态”

先去回顾Flink”状态”的知识点:

  1. 官方文档说就两种状态:keyed state和operator state:
    在这里插入图片描述
  2. 如上图,keyed stream的元素是具有key的特征,与ProcessFunction的操作状态时要求匹配,其他steam的元素由于没有key的特征,所以也就没有状态一说了;
  3. 另一种状态是Operator State,如下图,这是和多并行度计算时的算子实例绑定的,例如当前算子消费kafka的某个分区的最新offset,而ProcessFunction是用来处理stream元素的,不会涉及到Operator State:
    在这里插入图片描述

官方demo

为了学习ProcessFunction就去看官方demo,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/process_function.html ,简单说说这个demo的功能:

  1. 数据源在不间断的产生单词,每个单词对应一个Tuple2<String,String>的实例;
  2. 数据源被keyBy方法转成KeyedStream,key是Tuple2实例的f0字段;
  3. 一个KeyedProcessFunction的子类CountWithTimeoutFunction,被用来处理KeyedStream的每个元素,处理的逻辑:为每个key维护一个状态,状态的内容是这个key的出现次数和最后一次出现时间;
  4. 如果那个key连续一分钟没有出现,KeyedProcessFunction就向下游发送这个元素;

以上就是官方demo的功能,本来是想通过demo来加深认识,结果看完不但没有明白,反而更晕了,下图是我对demo代码的疑惑:

在这里插入图片描述

从上图可见我的疑惑,这里再复述一下:
5. 入参value是Tuple2类型,假设其f0字段等于aaa,那么processElement方法的作用,就是取出aaa的状态,更新后保存;
6. 从代码上看,state.value()返回了aaa的状态,这个value方法并没有将aaa作为入参,那怎么做到返回aaa的状态呢?如果下一个入参value的f0字段等于bbb了,这个state.value()能返回bbb的状态吗?
7. 对更新状态的代码state.update(current)也是同样的疑惑;
8. 然后又产生了新的疑惑:成员变量state难道是一直在变?每执行一次processElement,都会变成该key对应的state实例?

先反思为何会有上述疑惑

  1. 上述疑惑产生的原因,应该是受到平时使用HashMap的影响,HashMap获取值就是在调用get方法时指定key,设置值也是在put时指定key,所以看到state.value()方法没有用key做入参就不习惯了
  2. 要消除这种不适应,要做的第一件事就是提醒自己:processElement是在框架内运行的,很多数据在之前已经由框架准备好了;
  3. 接下来要做的,就是把框架准备数据的逻辑看一遍,除了弄明白自己的问题,由于ProcessFunction属于最低阶抽象(如下图的最下方位置),看懂了这些,其实也是在了解DataStream/DataSet API的设计思路:
    在这里插入图片描述

跟踪源码

  1. 如下图,让我们从一个断点的堆栈开始吧,这是在执行上面demo中的processElement方法之前的一个断点,可见根源是个线程的run方法,也就是KeyedProcessFunction对应的算子执行任务的线程:
    在这里插入图片描述
  2. 上面的堆栈不必每一层都细看,只关注重要的部分,下图这段很重要:StreamTask.run方法中,有个无限循环(猜测是每次执行processInput方法都处理KeyedStream的一个元素):
    在这里插入图片描述
  3. 如下图,StreamOneInputProcessor.processInput方法取出KeyedStream的一个元素,调用processElement方法,并将此元素作为入参,再结合上一幅图可以看出:在编写KeyedProcessFunction子类的时候,KeyedStream的每个元素都会作为入参,在调用你重写的processElement方法时传进去;这一点,在做ProcessFunction和KeyedProcessFunction开发时都是要格外注意的:
    在这里插入图片描述
  4. 接下来到了最关键的地方了,下图红框中的streamOperator.setKeyContextElement1(record)会解答我前面的疑惑,一定要进去看个清楚,(后面的黄线上的代码,您应该猜到了,里面其实就是调用demo中的processElement方法)
    在这里插入图片描述
  5. 下图中,AbstractStreamOperator.setKeyContextElement给出了答案:对于KeyedStream的每个元素,都会在这里算出key,再调用setCurrentKey保存这个key
    在这里插入图片描述
  6. 展开setCurrentKey,如下图,发现key的保存和当前状态的存储策略(StateBackend)有关,我这里是默认策略HeapKeyedStateBackend
    在这里插入图片描述
  7. 最终,根据当前元素得到的key会在StateBackend的keyContext对象中找地方保存,StateBackend的具体实现和Flink设置有关,我这里是保存到了InternalKeyContextImpl实例的currentKey变量中:
    在这里插入图片描述
  8. 代码读到这里,对我前面的疑惑,您应该能推测出答案了:state.value()里面会通过StateBackend的keyContext取出刚才保存的key,接下来就能像HashMap那样根据key查出该key的状态了,接下来是愉快的印证我们推测的过程;
  9. state.value()代码位置打断点一次看个明白,如下图,果然,state里面有StateBackend的keyContext对象的引用,访问刚才保存的key就不成问题了:
    在这里插入图片描述
  10. 展开state.value()方法如下,简单明了,直接拿keyContext保存的key作为入参去取对应的状态:
    在这里插入图片描述
  11. 再展开上面的get方法,可见最终是从stateMap中取得的,而这个stateMap的具体实现是CopyOnWriteStateMap类型的实例:
    在这里插入图片描述
  12. 代码读到这里,只剩最后一处需要印证了:更新状态的state.update(current)方法,应该也是以StateBackend的keyContext中的key作为自己的key,再将入参的current作为value,更新到stateMap中,来吧,一起印证这个推测;
  13. 展开方法,看到的是stateTable.put方法(前面刚看过stateTable的get方法,稳了):
    在这里插入图片描述
  14. stateTable.put方法里面和前面的get方法一样,直接拿keyContext保存的key作为自己的key:
    在这里插入图片描述
  15. 最终是调用了stateMap.put方法,将数据保存在CopyOnWriteStateMap实例中:
    在这里插入图片描述
  16. 得益于Flink代码自身规范、清晰的设计和实现,再加上IDEA强大的debug功能,整个阅读和分析过程十分顺利,这其中的收获会逐渐在今后深入学习DataStreamAPI的过程中见效;

最后,根据上面的分析过程绘制了一幅简陋的流程图,希望能帮助您加快理解:
在这里插入图片描述

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…
https://github.com/zq2599/blog_demos

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/2623.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • sesvc.exe_alg是什么进程

    sesvc.exe_alg是什么进程今天公司的一台电脑一点右键就没有反应,通过任务管理发现每次启动都会有一个“Excel”进程,第一感觉就是中毒了,在网上找到了无暇解决方案杀毒方法:(切记:在操作过程中使用“右键->打开”,不可双击。)1.结束注册表中的fun.xls.exe的进程(建议选中fun.xls.exe->右键->转到进程,查找到“algsrvs.exe”单击它,再选择“…

  • 基于OpenCv的人脸识别(Python完整代码)

    基于OpenCv的人脸识别(Python完整代码)目前人脸识别有很多较为成熟的方法,这里调用OpenCv库,而OpenCV又提供了三种人脸识别方法,分别是LBPH方法、EigenFishfaces方法、Fisherfaces方法。本文采用的是LBPH(LocalBinaryPatternsHistogram,局部二值模式直方图)方法。opencv是一个开源的的跨平台计算机视觉库,内部实现了图像处理和计算机视觉方面的很多通用算法,对于python而言,在引用opencv库的时候需要写为importcv2。其中,cv2是opencv的C++命名空间名称

  • 权威外汇交易平台_国内外汇交易平台排行

    权威外汇交易平台_国内外汇交易平台排行Guardian是世界领先的在线金融商品交易平台,在华经营已超过10年,是专业外汇指数交易商。Guardian以优质的客户服务,技术支持以及同业最出色的交易系统使其成为客户理想的选择对象。Guardian服务对象包括金融机构客户,对冲基金,经理人账户和个人客户,多年来以卓越的信誉赢得了客户的满意。Guardian集团是专注于为全球客户提供包括外汇、贵金属、期权、指数、数字货币等交易产品,以及…

  • U盘 未知USB设备 设定地址失败 由于该设备有问题Windows 已将其停止(代码 43) 终极解决方案(做过系统装机盘而无法解决的必看)

    U盘 未知USB设备 设定地址失败 由于该设备有问题Windows 已将其停止(代码 43) 终极解决方案(做过系统装机盘而无法解决的必看)U盘由于该设备有问题Windows已将其停止(代码43)终极解决方案我们在使用U盘的时候偶尔会碰到下列情况一般是因为传输数据的过程中,死机或未响应直接断点或拔掉设备导致的,U盘再次插上之后出现设定地址失败。无法再次读取设备的数据。解决方案:首先请确认出现该情况不是因为你摔了U盘或接口处产生断裂这种物理损伤导致的!!!首先请确认出现该情况不是因为你摔了U盘或接口处产生断裂这种物理损…

  • mysql清空表数据_mysql数据库之如何清空表中数据「建议收藏」

    mysql清空表数据_mysql数据库之如何清空表中数据「建议收藏」本篇文章主要讲述的是在数据库中使用清空命令,具有一定学习价值,有需要的朋友可以了解一下,希望能够对你有所帮助。在做数据迁移,数据清洗或者写web项目时要将数据替换更新,那么有时要将表做清空处理常用的清空数据表的SQL语句有如下两种deletefrom表名;truncatetable表名;运行测试我使用的是MySql待测试的表有20000条记录,将其多拷两份以备测试分别运行两个清空表的SQL…

  • 振荡周期、时钟周期、机器周期、指令周期的区别与联系[通俗易懂]

    振荡周期、时钟周期、机器周期、指令周期的区别与联系[通俗易懂]以下内容均来自网上查找,并根据个人理解进行整理,刚开始学习单片机,如有不对的地方敬请指正。先给出结论:一个振荡周期=一个时钟周期;一个时钟周期=一个机器周期;一个机器周期=六个状态周期;一个状态周期=两个节拍;一个节拍=一个时钟周期;一个指令周期=N个机器周期;综上:1个指令周期=N个机器周期=6N个状态周期=12N个节拍=12N个时钟周期=12N个振荡周期时钟周期:一个脉冲所需…

    2022年10月13日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号