一、Flink 核心-Window窗口介绍和应用场景
- 文档地址
- https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/windows
- Windows are at the heart of processing infinite streams(Window是处理无限数据量的核心)
- 背景
- 数据流是一直源源不断产生,业务需要聚合统计使用,比如每10秒统计过去5分钟的点击量、成交额等
- Windows 就可以将无限的数据流拆分为有限大小的“桶 buckets”,然后程序可以对其窗口内的数据进行计算
- 窗口认为是Bucket桶,一个窗口段就是一个桶,比如8到9点是一个桶,9到10点是一个桶
- 分类
- time Window 时间窗口,即按照一定的时间规则作为窗口统计
- time-tumbling-window 时间滚动窗口 (用的多)
- time-sliding-window 时间滑动窗口 (用的多)
- session WIndow 会话窗口,即一个会话内的数据进行统计,相对少用
- count Window 数量窗口,即按照一定的数据量作为窗口统计,相对少用
- time Window 时间窗口,即按照一定的时间规则作为窗口统计
- 窗口属性
- 滑动窗口 Sliding Windows
- 窗口具有固定大小
- 窗口数据有重叠
- 例子:每10s统计一次最近1min内的订单数量
- 滚动窗口 Tumbling Windows
- 窗口具有固定大小
- 窗口数据不重叠
- 例子:每10s统计一次最近10s内的订单数量
- 窗口大小size 和 滑动间隔 slide
- tumbling-window:滚动窗口: size=slide,如:每隔10s统计最近10s的数据
- sliding-window:滑动窗口: size>slide,如:每隔5s统计最近10s的数据
- size<slide的时候,如每隔15s统计最近10s的数据,那么中间5s的数据会丢失,所以开发中不用
- 滑动窗口 Sliding Windows
二、Flink 核心-Window 窗口API和使用流程介绍
- 什么情况下才可以使用WindowAPI
- 有keyBy 用 window() api
- 没keyBy 用 windowAll() api ,并行度低
- 方括号 ([…]) 中的命令是可选的,允许用多种不同的方式自定义窗口逻辑
- 注意
- 一个窗口内 的是左闭右开
- countWindow没过期,但timeWindow在1.12过期,统一使用window;
- 窗口分配器 Window Assigners
- 定义了如何将元素分配给窗口,负责将每条数据分发到正确的 window窗口上
- window() 的参数是一个 WindowAssigner,flink本身提供了Tumbling、Sliding 等Assigner
- 窗口触发器 trigger
- 用来控制一个窗口是否需要被触发
- 每个 窗口分配器WindowAssigner 都有一个默认触发器,也支持自定义触发器
- 窗口 window function ,对窗口内的数据做啥?
- 定义了要对窗口中收集的数据做的计算操作
- 增量聚合函数
aggregate(agg函数,WindowFunction(){ })
- 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
- 常见的增量聚合函数有 reduceFunction、aggregateFunction
- min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT> IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
- 全窗口函数
apply(new processWindowFunction(){ })
- 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
- 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 WindowFunction<IN, OUT, KEY, W extends Window>
- 如果想处理每个元素更底层的API的时候用
//对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter process(new KeyedProcessFunction(){processElement、onTimer})
三、Tumbling-Window滚动时间窗介绍和案例实战
- 滚动窗口 Tumbling Windows
- 窗口具有固定大小
- 窗口数据不重叠
- 比如指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口
- 案例实战(可以单一个数据测试,后续再讲时间语义)
代码实战
Flink14TumblingWindowApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.Date; public class Flink14TumblingWindowApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); DataStream<VideoOrder> sumDS = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money"); sumDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("tumbling window job"); } }
控制台输出
-----open-----
产生:面试专题第一季,价格:50, 时间:2021-11-16 11:55:29
VideoOrder{tradeNo='87cc1df4-f8a4-405c-a531-8c6c5c3a0760', title='面试专题第一季', money=50, userId=1, createTime=2021-11-16 11:55:29}
产生:项目大课,价格:1, 时间:2021-11-16 11:55:30
产生:项目大课,价格:1, 时间:2021-11-16 11:55:31
产生:kafka,价格:300, 时间:2021-11-16 11:55:32
产生:flink,价格:45, 时间:2021-11-16 11:55:33
产生:spring boot,价格:15, 时间:2021-11-16 11:55:34
VideoOrder{tradeNo='21d648d7-d5db-4690-9019-930b83a50345', title='项目大课', money=2, userId=1, createTime=2021-11-16 11:55:30}
VideoOrder{tradeNo='3fc94b2d-b19c-45d7-9ac8-c702f495593f', title='spring boot', money=15, userId=0, createTime=2021-11-16 11:55:34}
VideoOrder{tradeNo='731fb1ca-0b0b-49e9-978b-4628d27262ec', title='flink', money=45, userId=1, createTime=2021-11-16 11:55:33}
VideoOrder{tradeNo='01856978-bdd2-4557-90a9-c65f3fadbb4a', title='kafka', money=300, userId=7, createTime=2021-11-16 11:55:32}
产生:springc loud,价格:20, 时间:2021-11-16 11:55:35
产生:java,价格:10, 时间:2021-11-16 11:55:36
产生:springc loud,价格:20, 时间:2021-11-16 11:55:37
产生:flink,价格:45, 时间:2021-11-16 11:55:38
产生:java,价格:10, 时间:2021-11-16 11:55:39
VideoOrder{tradeNo='a2b23200-2c91-4fe2-be1e-84d646d61dcd', title='springc loud', money=40, userId=9, createTime=2021-11-16 11:55:35}
VideoOrder{tradeNo='25290b84-8369-453f-849c-f6e30560afc8', title='flink', money=45, userId=0, createTime=2021-11-16 11:55:38}
VideoOrder{tradeNo='e6d5e382-f091-4638-9630-0f0c0f78a616', title='java', money=20, userId=0, createTime=2021-11-16 11:55:36}
产生:项目大课,价格:1, 时间:2021-11-16 11:55:40
Process finished with exit code -1
四、Sliding -Window滑动时间窗介绍和案例实战
- 滑动窗口 Sliding Windows
- 窗口具有固定大小
- 窗口数据有重叠
- 例子:每10s统计一次最近1min内的订单数量
- 案例实战
- 每5秒统计过去20秒的不同视频的订单总价
关键代码
DataStream<VideoOrder> sumDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money");
完整代码
Flink15SlidingWindowApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class Flink15SlidingWindowApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); DataStream<VideoOrder> sumDS = keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money"); sumDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("sliding window job"); } }
控制台输出
-----open-----
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:20:45
产生:springc loud,价格:20, 时间:2021-11-16 20:20:46
产生:spring boot,价格:15, 时间:2021-11-16 20:20:47
产生:kafka,价格:300, 时间:2021-11-16 20:20:48
产生:kafka,价格:300, 时间:2021-11-16 20:20:49
VideoOrder{tradeNo='40266fb3-2c4d-47da-98cf-f3709a049998', title='面试专题第一季', money=50, userId=0, createTime=2021-11-16 20:20:45}
VideoOrder{tradeNo='e4ad0815-1b39-40e4-9790-27ce4a369e38', title='springc loud', money=20, userId=7, createTime=2021-11-16 20:20:46}
VideoOrder{tradeNo='82201d5e-ab17-44a6-94af-649605f9a456', title='kafka', money=600, userId=1, createTime=2021-11-16 20:20:48}
VideoOrder{tradeNo='c6f1c559-d3bd-48e8-b31d-5ce52cb61df9', title='spring boot', money=15, userId=4, createTime=2021-11-16 20:20:47}
产生:springc loud,价格:20, 时间:2021-11-16 20:20:50
产生:flink,价格:45, 时间:2021-11-16 20:20:51
产生:java,价格:10, 时间:2021-11-16 20:20:52
产生:flink,价格:45, 时间:2021-11-16 20:20:53
产生:springc loud,价格:20, 时间:2021-11-16 20:20:54
VideoOrder{tradeNo='40266fb3-2c4d-47da-98cf-f3709a049998', title='面试专题第一季', money=50, userId=0, createTime=2021-11-16 20:20:45}
VideoOrder{tradeNo='c6f1c559-d3bd-48e8-b31d-5ce52cb61df9', title='spring boot', money=15, userId=4, createTime=2021-11-16 20:20:47}
VideoOrder{tradeNo='1e7c6967-a653-4787-81dd-b04ef916069f', title='flink', money=90, userId=5, createTime=2021-11-16 20:20:51}
VideoOrder{tradeNo='1e5f7f09-d758-4695-8f5d-8e994920f472', title='java', money=10, userId=2, createTime=2021-11-16 20:20:52}
VideoOrder{tradeNo='e4ad0815-1b39-40e4-9790-27ce4a369e38', title='springc loud', money=60, userId=7, createTime=2021-11-16 20:20:46}
VideoOrder{tradeNo='82201d5e-ab17-44a6-94af-649605f9a456', title='kafka', money=600, userId=1, createTime=2021-11-16 20:20:48}
产生:springc loud,价格:20, 时间:2021-11-16 20:20:55
产生:java,价格:10, 时间:2021-11-16 20:20:56
产生:spring boot,价格:15, 时间:2021-11-16 20:20:57
产生:springc loud,价格:20, 时间:2021-11-16 20:20:58
产生:kafka,价格:300, 时间:2021-11-16 20:20:59
VideoOrder{tradeNo='1e5f7f09-d758-4695-8f5d-8e994920f472', title='java', money=20, userId=2, createTime=2021-11-16 20:20:52}
VideoOrder{tradeNo='40266fb3-2c4d-47da-98cf-f3709a049998', title='面试专题第一季', money=50, userId=0, createTime=2021-11-16 20:20:45}
VideoOrder{tradeNo='1e7c6967-a653-4787-81dd-b04ef916069f', title='flink', money=90, userId=5, createTime=2021-11-16 20:20:51}
VideoOrder{tradeNo='c6f1c559-d3bd-48e8-b31d-5ce52cb61df9', title='spring boot', money=30, userId=4, createTime=2021-11-16 20:20:47}
VideoOrder{tradeNo='82201d5e-ab17-44a6-94af-649605f9a456', title='kafka', money=900, userId=1, createTime=2021-11-16 20:20:48}
VideoOrder{tradeNo='e4ad0815-1b39-40e4-9790-27ce4a369e38', title='springc loud', money=100, userId=7, createTime=2021-11-16 20:20:46}
产生:项目大课,价格:1, 时间:2021-11-16 20:21:00
产生:java,价格:10, 时间:2021-11-16 20:21:01
产生:kafka,价格:300, 时间:2021-11-16 20:21:02
产生:flink,价格:45, 时间:2021-11-16 20:21:03
产生:kafka,价格:300, 时间:2021-11-16 20:21:04
VideoOrder{tradeNo='40266fb3-2c4d-47da-98cf-f3709a049998', title='面试专题第一季', money=50, userId=0, createTime=2021-11-16 20:20:45}
VideoOrder{tradeNo='c6f1c559-d3bd-48e8-b31d-5ce52cb61df9', title='spring boot', money=30, userId=4, createTime=2021-11-16 20:20:47}
VideoOrder{tradeNo='e9ec7731-f1a1-4f57-a3d8-989587493efa', title='项目大课', money=1, userId=8, createTime=2021-11-16 20:21:00}
VideoOrder{tradeNo='e4ad0815-1b39-40e4-9790-27ce4a369e38', title='springc loud', money=100, userId=7, createTime=2021-11-16 20:20:46}
VideoOrder{tradeNo='82201d5e-ab17-44a6-94af-649605f9a456', title='kafka', money=1500, userId=1, createTime=2021-11-16 20:20:48}
VideoOrder{tradeNo='1e5f7f09-d758-4695-8f5d-8e994920f472', title='java', money=30, userId=2, createTime=2021-11-16 20:20:52}
VideoOrder{tradeNo='1e7c6967-a653-4787-81dd-b04ef916069f', title='flink', money=135, userId=5, createTime=2021-11-16 20:20:51}
产生:spring boot,价格:15, 时间:2021-11-16 20:21:05
产生:kafka,价格:300, 时间:2021-11-16 20:21:06
产生:springc loud,价格:20, 时间:2021-11-16 20:21:07
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:21:08
产生:spring boot,价格:15, 时间:2021-11-16 20:21:09
VideoOrder{tradeNo='bd153991-15d2-499c-a21c-763fed4c6e76', title='kafka', money=1200, userId=3, createTime=2021-11-16 20:20:59}
VideoOrder{tradeNo='e9ec7731-f1a1-4f57-a3d8-989587493efa', title='项目大课', money=1, userId=8, createTime=2021-11-16 20:21:00}
VideoOrder{tradeNo='5c57985e-8803-46a7-a007-2fd4ba711288', title='spring boot', money=45, userId=7, createTime=2021-11-16 20:20:57}
VideoOrder{tradeNo='0753f5ea-5455-4f37-8aa5-df57d23ae39f', title='springc loud', money=100, userId=2, createTime=2021-11-16 20:20:50}
VideoOrder{tradeNo='9b250cf5-b784-48c1-a095-fc783c353b8c', title='面试专题第一季', money=50, userId=1, createTime=2021-11-16 20:21:08}
VideoOrder{tradeNo='1e7c6967-a653-4787-81dd-b04ef916069f', title='flink', money=135, userId=5, createTime=2021-11-16 20:20:51}
VideoOrder{tradeNo='1e5f7f09-d758-4695-8f5d-8e994920f472', title='java', money=30, userId=2, createTime=2021-11-16 20:20:52}
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:21:10
产生:项目大课,价格:1, 时间:2021-11-16 20:21:11
产生:flink,价格:45, 时间:2021-11-16 20:21:12
产生:项目大课,价格:1, 时间:2021-11-16 20:21:13
产生:springc loud,价格:20, 时间:2021-11-16 20:21:14
VideoOrder{tradeNo='9b250cf5-b784-48c1-a095-fc783c353b8c', title='面试专题第一季', money=100, userId=1, createTime=2021-11-16 20:21:08}
VideoOrder{tradeNo='5c57985e-8803-46a7-a007-2fd4ba711288', title='spring boot', money=45, userId=7, createTime=2021-11-16 20:20:57}
VideoOrder{tradeNo='afee7be1-cae5-497e-a3cf-8487463c66cc', title='springc loud', money=80, userId=0, createTime=2021-11-16 20:20:55}
VideoOrder{tradeNo='e9ec7731-f1a1-4f57-a3d8-989587493efa', title='项目大课', money=3, userId=8, createTime=2021-11-16 20:21:00}
VideoOrder{tradeNo='bd153991-15d2-499c-a21c-763fed4c6e76', title='kafka', money=1200, userId=3, createTime=2021-11-16 20:20:59}
VideoOrder{tradeNo='6c8967aa-a0bf-4bda-9d7e-159703c705c2', title='flink', money=90, userId=9, createTime=2021-11-16 20:21:03}
VideoOrder{tradeNo='907ff38b-f4f7-456a-ab00-069b7e32af49', title='java', money=20, userId=7, createTime=2021-11-16 20:20:56}
产生:项目大课,价格:1, 时间:2021-11-16 20:21:15
Process finished with exit code -1
五、基于数量的Count Window窗口介绍和案例实战
- 基于数量的滚动窗口, 滑动计数窗口
- 案例:
- 统计分组后同个key内的数据超过5次则进行统计 countWindow(5)
- 只要有2个数据到达后就可以往后统计5个数据的值, countWindow(5, 2)
关键代码
//数据源 source DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); //分组后的组内数据超过5个则触发 //DataStream<VideoOrder> sumDS = keyByDS.countWindow(5).sum("money"); //分组后的组内数据超过3个则触发统计过去的5个数据 DataStream<VideoOrder> sumDS = keyByDS.countWindow(5,3).sum("money"); sumDS.print();
Flink16CountWindowApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class Flink16CountWindowApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); //分组后的组内数据超过5个则触发 //DataStream<VideoOrder> sumDS = keyByDS.countWindow(5).sum("money"); //分组后的组内数据超过3个则触发统计过去的5个数据 DataStream<VideoOrder> sumDS = keyByDS.countWindow(5,3).sum("money"); sumDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("sliding window job"); } }
控制台输出
-----open-----
产生:项目大课,价格:1, 时间:2021-11-16 20:59:12
产生:spring boot,价格:15, 时间:2021-11-16 20:59:13
产生:kafka,价格:300, 时间:2021-11-16 20:59:14
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:59:15
产生:springc loud,价格:20, 时间:2021-11-16 20:59:16
产生:项目大课,价格:1, 时间:2021-11-16 20:59:17
产生:flink,价格:45, 时间:2021-11-16 20:59:18
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:59:19
产生:java,价格:10, 时间:2021-11-16 20:59:20
产生:springc loud,价格:20, 时间:2021-11-16 20:59:21
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:59:22
VideoOrder{tradeNo='359df0be-6923-47a1-9cba-077e044ad057', title='面试专题第一季', money=150, userId=1, createTime=2021-11-16 20:59:15}
产生:kafka,价格:300, 时间:2021-11-16 20:59:23
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:59:24
产生:springc loud,价格:20, 时间:2021-11-16 20:59:25
VideoOrder{tradeNo='980af9a9-ec6e-47a8-9d50-10e9845beed8', title='springc loud', money=60, userId=2, createTime=2021-11-16 20:59:16}
产生:java,价格:10, 时间:2021-11-16 20:59:26
产生:面试专题第一季,价格:50, 时间:2021-11-16 20:59:27
产生:项目大课,价格:1, 时间:2021-11-16 20:59:28
VideoOrder{tradeNo='53ad98be-0251-4629-95b3-f16141866af3', title='项目大课', money=3, userId=4, createTime=2021-11-16 20:59:12}
产生:spring boot,价格:15, 时间:2021-11-16 20:59:29
产生:kafka,价格:300, 时间:2021-11-16 20:59:30
VideoOrder{tradeNo='a37fd03d-351d-4618-a494-0b5d7c67ed8b', title='kafka', money=900, userId=5, createTime=2021-11-16 20:59:14}
产生:kafka,价格:300, 时间:2021-11-16 20:59:31
产生:flink,价格:45, 时间:2021-11-16 20:59:32
产生:flink,价格:45, 时间:2021-11-16 20:59:33
VideoOrder{tradeNo='82fb8a6d-f1d0-4381-ba4e-167b6d954e8d', title='flink', money=135, userId=1, createTime=2021-11-16 20:59:18}
产生:spring boot,价格:15, 时间:2021-11-16 20:59:34
VideoOrder{tradeNo='5f5245d2-c908-4932-a710-aa8fecfab048', title='spring boot', money=45, userId=7, createTime=2021-11-16 20:59:13}
产生:springc loud,价格:20, 时间:2021-11-16 20:59:35
本文作者为DBC,转载请注明。