一、Flink里面你需要知道的AggregateFunction增量聚合函数知识
窗口函数(window function):对窗口内的数据做什么?
- 定义了要对窗口中收集的数据做的计算操作
- 增量聚合函数
aggregate(agg函数,WindowFunction(){ })
- 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
- 常见的增量聚合函数有 ReduceFunction、AggregateFunction
- min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT> IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
代码实战
Flink17AggWindowApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class Flink17AggWindowApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator<VideoOrder> aggregate = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .aggregate(new AggregateFunction<VideoOrder, VideoOrder, VideoOrder>() { //初始化累加器 @Override public VideoOrder createAccumulator() { VideoOrder videoOrder = new VideoOrder(); return videoOrder; } //聚合操作 @Override public VideoOrder add(VideoOrder value, VideoOrder accumulator) { accumulator.setMoney(value.getMoney() + accumulator.getMoney()); if (accumulator.getTitle() == null) { accumulator.setTitle(value.getTitle()); } if (accumulator.getCreateTime() == null) { accumulator.setCreateTime(value.getCreateTime()); } return accumulator; } //获取结果 @Override public VideoOrder 
getResult(VideoOrder accumulator) { return accumulator; } @Override public VideoOrder merge(VideoOrder a, VideoOrder b) { VideoOrder videoOrder = new VideoOrder(); videoOrder.setMoney(a.getMoney() + b.getMoney()); videoOrder.setTitle(a.getTitle()); return videoOrder; } }); aggregate.print(); //DataStream需要调用execute,可以取个名称 env.execute("sliding window job"); } }
控制台输出
产生:springc loud,价格:20, 时间:2021-11-18 09:43:37
产生:项目大课,价格:1, 时间:2021-11-18 09:43:38
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:39
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:43:35}
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:43:39}
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 09:43:37}
VideoOrder{tradeNo='null', title='项目大课', money=2, userId=0, createTime=2021-11-18 09:43:36}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:40
产生:kafka,价格:300, 时间:2021-11-18 09:43:41
产生:flink,价格:45, 时间:2021-11-18 09:43:42
产生:springc loud,价格:20, 时间:2021-11-18 09:43:43
产生:springc loud,价格:20, 时间:2021-11-18 09:43:44
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:43:40}
VideoOrder{tradeNo='null', title='springc loud', money=40, userId=0, createTime=2021-11-18 09:43:43}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:43:42}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:43:41}
产生:kafka,价格:300, 时间:2021-11-18 09:43:45
产生:kafka,价格:300, 时间:2021-11-18 09:43:46
产生:spring boot,价格:15, 时间:2021-11-18 09:43:47
产生:java,价格:10, 时间:2021-11-18 09:43:48
产生:spring boot,价格:15, 时间:2021-11-18 09:43:49
VideoOrder{tradeNo='null', title='kafka', money=600, userId=0, createTime=2021-11-18 09:43:45}
VideoOrder{tradeNo='null', title='java', money=10, userId=0, createTime=2021-11-18 09:43:48}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:43:47}
产生:spring boot,价格:15, 时间:2021-11-18 09:43:50
产生:springc loud,价格:20, 时间:2021-11-18 09:43:51
产生:springc loud,价格:20, 时间:2021-11-18 09:43:52
产生:springc loud,价格:20, 时间:2021-11-18 09:43:53
产生:spring boot,价格:15, 时间:2021-11-18 09:43:54
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:43:50}
VideoOrder{tradeNo='null', title='springc loud', money=60, userId=0, createTime=2021-11-18 09:43:51}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:55
产生:kafka,价格:300, 时间:2021-11-18 09:43:56
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:57
产生:spring boot,价格:15, 时间:2021-11-18 09:43:58
产生:spring boot,价格:15, 时间:2021-11-18 09:43:59
VideoOrder{tradeNo='null', title='面试专题第一季', money=100, userId=0, createTime=2021-11-18 09:43:55}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:43:58}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:43:56}
产生:java,价格:10, 时间:2021-11-18 09:44:00
产生:java,价格:10, 时间:2021-11-18 09:44:01
产生:flink,价格:45, 时间:2021-11-18 09:44:02
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:03
产生:spring boot,价格:15, 时间:2021-11-18 09:44:04
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:44:00}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:04}
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:44:03}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:44:02}
产生:springc loud,价格:20, 时间:2021-11-18 09:44:05
产生:spring boot,价格:15, 时间:2021-11-18 09:44:06
产生:flink,价格:45, 时间:2021-11-18 09:44:07
产生:项目大课,价格:1, 时间:2021-11-18 09:44:08
产生:kafka,价格:300, 时间:2021-11-18 09:44:09
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 09:44:05}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:44:09}
VideoOrder{tradeNo='null', title='项目大课', money=1, userId=0, createTime=2021-11-18 09:44:08}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:44:07}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:06}
产生:kafka,价格:300, 时间:2021-11-18 09:44:10
产生:java,价格:10, 时间:2021-11-18 09:44:11
产生:java,价格:10, 时间:2021-11-18 09:44:12
产生:spring boot,价格:15, 时间:2021-11-18 09:44:13
产生:spring boot,价格:15, 时间:2021-11-18 09:44:14
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:44:10}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:44:13}
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:44:11}
产生:java,价格:10, 时间:2021-11-18 09:44:15
产生:java,价格:10, 时间:2021-11-18 09:44:16
产生:spring boot,价格:15, 时间:2021-11-18 09:44:17
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:18
产生:项目大课,价格:1, 时间:2021-11-18 09:44:19
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:44:15}
VideoOrder{tradeNo='null', title='项目大课', money=1, userId=0, createTime=2021-11-18 09:44:19}
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:44:18}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:17}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:20
产生:spring boot,价格:15, 时间:2021-11-18 09:44:21
产生:flink,价格:45, 时间:2021-11-18 09:44:22
产生:springc loud,价格:20, 时间:2021-11-18 09:44:23
产生:项目大课,价格:1, 时间:2021-11-18 09:44:24
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:44:20}
VideoOrder{tradeNo='null', title='项目大课', money=1, userId=0, createTime=2021-11-18 09:44:24}
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 09:44:23}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:44:22}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:21}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:25
Process finished with exit code -1
二、Flink里面你需要知道的WindowFunction全窗口函数知识
- 全窗口函数
apply(new WindowFunction(){ })
- 窗口先缓存该窗口内的所有元素,等窗口触发时再对收集到的全部数据统一计算
- 常见的全窗口函数有 WindowFunction(未来可能被弃用)和 ProcessWindowFunction(可以通过上下文 Context 获取更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 WindowFunction<IN, OUT, KEY, W extends Window>
代码实战
Flink17ApplyWindowApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.commons.collections.IteratorUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.List; import java.util.stream.Collectors; public class Flink17ApplyWindowApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator<VideoOrder> apply = keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .apply(new WindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() { @Override public void apply(String key, TimeWindow window, Iterable<VideoOrder> input, Collector<VideoOrder> out) throws Exception { List<VideoOrder> list = IteratorUtils.toList(input.iterator()); int total = 
list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue(); VideoOrder videoOrder = new VideoOrder(); videoOrder.setMoney(total); videoOrder.setTitle(list.get(0).getTitle()); videoOrder.setCreateTime(list.get(0).getCreateTime()); out.collect(videoOrder); } }); apply.print(); //DataStream需要调用execute,可以取个名称 env.execute("sliding window job"); } }
控制台输出
-----open-----
产生:spring boot,价格:15, 时间:2021-11-18 09:55:28
产生:kafka,价格:300, 时间:2021-11-18 09:55:29
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:55:28}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:55:29}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:55:30
产生:spring boot,价格:15, 时间:2021-11-18 09:55:31
产生:java,价格:10, 时间:2021-11-18 09:55:32
产生:java,价格:10, 时间:2021-11-18 09:55:33
产生:spring boot,价格:15, 时间:2021-11-18 09:55:34
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:55:30}
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:55:32}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:55:31}
产生:kafka,价格:300, 时间:2021-11-18 09:55:35
产生:springc loud,价格:20, 时间:2021-11-18 09:55:36
产生:kafka,价格:300, 时间:2021-11-18 09:55:37
Process finished with exit code -1
三、Flink里面你需要知道的processWindowFunction全窗口函数知识
- 全窗口函数
process(new ProcessWindowFunction(){})
- 窗口先缓存该窗口内的所有元素,等窗口触发时再对收集到的全部数据统一计算
- 常见的全窗口函数有 WindowFunction(未来可能被弃用)和 ProcessWindowFunction(可以通过上下文 Context 获取更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 ProcessWindowFunction<IN, OUT, KEY, W extends Window>
代码实战
Flink17ProcessWindowApp
<pre class="prettyprint lang-java linenums:1"> import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.commons.collections.IteratorUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.List; import java.util.stream.Collectors; public class Flink17ProcessWindowApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator<VideoOrder> process = keyByDS .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<VideoOrder> elements, Collector<VideoOrder> out) throws Exception { 
List<VideoOrder> list = IteratorUtils.toList(elements.iterator()); int total = list.stream().collect(Collectors.summingInt(VideoOrder::getMoney)).intValue(); VideoOrder videoOrder = new VideoOrder(); videoOrder.setMoney(total); videoOrder.setCreateTime(list.get(0).getCreateTime()); videoOrder.setTitle(list.get(0).getTitle()); out.collect(videoOrder); } }); process.print(); //DataStream需要调用execute,可以取个名称 env.execute("sliding window job"); } } </pre>
控制台输出
-----open-----
产生:面试专题第一季,价格:50, 时间:2021-11-18 10:03:04
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 10:03:04}
产生:kafka,价格:300, 时间:2021-11-18 10:03:05
产生:kafka,价格:300, 时间:2021-11-18 10:03:06
产生:springc loud,价格:20, 时间:2021-11-18 10:03:07
产生:spring boot,价格:15, 时间:2021-11-18 10:03:08
产生:面试专题第一季,价格:50, 时间:2021-11-18 10:03:09
VideoOrder{tradeNo='null', title='kafka', money=600, userId=0, createTime=2021-11-18 10:03:05}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 10:03:08}
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 10:03:07}
产生:java,价格:10, 时间:2021-11-18 10:03:10
产生:面试专题第一季,价格:50, 时间:2021-11-18 10:03:11
产生:springc loud,价格:20, 时间:2021-11-18 10:03:12
Process finished with exit code -1
窗口函数对比
- 增量聚合函数
aggregate(new AggregateFunction(){});
- 全窗口函数
apply(new WindowFunction(){})
process(new ProcessWindowFunction(){}) //比 WindowFunction 更强大,可通过 Context 获取窗口等上下文信息
本文作者为DBC,转载请注明。