一、Flink里面你需要知道的AggregateFunction增量聚合函数知识
窗口函数(window function):对窗口内收集到的数据做什么计算?
- 定义了要对窗口中收集的数据做的计算操作
- 增量聚合函数
aggregate(agg函数,WindowFunction(){ }) - 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
- 常见的增量聚合函数有 ReduceFunction、AggregateFunction
- min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT> IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
代码实战
Flink17AggWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Demo job for incremental window aggregation.
 *
 * <p>Orders are keyed by title, grouped into 5-second tumbling processing-time
 * windows, and folded one element at a time by an {@link AggregateFunction}
 * that sums the {@code money} field. One aggregated {@code VideoOrder} per
 * key and window is printed.
 */
public class Flink17AggWindowApp {

    /**
     * Builds and runs the pipeline: source, keyBy(title), tumbling window,
     * aggregate, print.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from {@code env.execute}
     */
    public static void main(String[] args) throws Exception {
        // Execution environment: entry point of the job, holds the global settings.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Source
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

        // Partition the stream by order title.
        KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder value) throws Exception {
                return value.getTitle();
            }
        });

        SingleOutputStreamOperator<VideoOrder> aggregate = keyByDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new AggregateFunction<VideoOrder, VideoOrder, VideoOrder>() {

                    /** Creates the empty accumulator for a new key/window. */
                    @Override
                    public VideoOrder createAccumulator() {
                        return new VideoOrder();
                    }

                    /**
                     * Folds one order into the accumulator: money is summed, while
                     * title and createTime are taken from the first order seen.
                     */
                    @Override
                    public VideoOrder add(VideoOrder value, VideoOrder accumulator) {
                        accumulator.setMoney(value.getMoney() + accumulator.getMoney());
                        if (accumulator.getTitle() == null) {
                            accumulator.setTitle(value.getTitle());
                        }
                        if (accumulator.getCreateTime() == null) {
                            accumulator.setCreateTime(value.getCreateTime());
                        }
                        return accumulator;
                    }

                    /** The accumulator itself is the window result. */
                    @Override
                    public VideoOrder getResult(VideoOrder accumulator) {
                        return accumulator;
                    }

                    /**
                     * Combines two partial accumulators. Money is summed; title and
                     * createTime follow the same "first non-null wins" rule as
                     * {@link #add}, so neither field is lost when one side is still
                     * empty or when accumulators are merged.
                     */
                    @Override
                    public VideoOrder merge(VideoOrder a, VideoOrder b) {
                        VideoOrder merged = new VideoOrder();
                        merged.setMoney(a.getMoney() + b.getMoney());
                        merged.setTitle(a.getTitle() != null ? a.getTitle() : b.getTitle());
                        merged.setCreateTime(
                                a.getCreateTime() != null ? a.getCreateTime() : b.getCreateTime());
                        return merged;
                    }
                });

        aggregate.print();

        // A DataStream job only runs once execute() is called; the argument is the job name.
        env.execute("tumbling window job");
    }
}
控制台输出
产生:springc loud,价格:20, 时间:2021-11-18 09:43:37
产生:项目大课,价格:1, 时间:2021-11-18 09:43:38
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:39
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:43:35}
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:43:39}
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 09:43:37}
VideoOrder{tradeNo='null', title='项目大课', money=2, userId=0, createTime=2021-11-18 09:43:36}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:40
产生:kafka,价格:300, 时间:2021-11-18 09:43:41
产生:flink,价格:45, 时间:2021-11-18 09:43:42
产生:springc loud,价格:20, 时间:2021-11-18 09:43:43
产生:springc loud,价格:20, 时间:2021-11-18 09:43:44
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:43:40}
VideoOrder{tradeNo='null', title='springc loud', money=40, userId=0, createTime=2021-11-18 09:43:43}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:43:42}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:43:41}
产生:kafka,价格:300, 时间:2021-11-18 09:43:45
产生:kafka,价格:300, 时间:2021-11-18 09:43:46
产生:spring boot,价格:15, 时间:2021-11-18 09:43:47
产生:java,价格:10, 时间:2021-11-18 09:43:48
产生:spring boot,价格:15, 时间:2021-11-18 09:43:49
VideoOrder{tradeNo='null', title='kafka', money=600, userId=0, createTime=2021-11-18 09:43:45}
VideoOrder{tradeNo='null', title='java', money=10, userId=0, createTime=2021-11-18 09:43:48}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:43:47}
产生:spring boot,价格:15, 时间:2021-11-18 09:43:50
产生:springc loud,价格:20, 时间:2021-11-18 09:43:51
产生:springc loud,价格:20, 时间:2021-11-18 09:43:52
产生:springc loud,价格:20, 时间:2021-11-18 09:43:53
产生:spring boot,价格:15, 时间:2021-11-18 09:43:54
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:43:50}
VideoOrder{tradeNo='null', title='springc loud', money=60, userId=0, createTime=2021-11-18 09:43:51}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:55
产生:kafka,价格:300, 时间:2021-11-18 09:43:56
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:43:57
产生:spring boot,价格:15, 时间:2021-11-18 09:43:58
产生:spring boot,价格:15, 时间:2021-11-18 09:43:59
VideoOrder{tradeNo='null', title='面试专题第一季', money=100, userId=0, createTime=2021-11-18 09:43:55}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:43:58}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:43:56}
产生:java,价格:10, 时间:2021-11-18 09:44:00
产生:java,价格:10, 时间:2021-11-18 09:44:01
产生:flink,价格:45, 时间:2021-11-18 09:44:02
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:03
产生:spring boot,价格:15, 时间:2021-11-18 09:44:04
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:44:00}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:04}
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:44:03}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:44:02}
产生:springc loud,价格:20, 时间:2021-11-18 09:44:05
产生:spring boot,价格:15, 时间:2021-11-18 09:44:06
产生:flink,价格:45, 时间:2021-11-18 09:44:07
产生:项目大课,价格:1, 时间:2021-11-18 09:44:08
产生:kafka,价格:300, 时间:2021-11-18 09:44:09
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 09:44:05}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:44:09}
VideoOrder{tradeNo='null', title='项目大课', money=1, userId=0, createTime=2021-11-18 09:44:08}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:44:07}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:06}
产生:kafka,价格:300, 时间:2021-11-18 09:44:10
产生:java,价格:10, 时间:2021-11-18 09:44:11
产生:java,价格:10, 时间:2021-11-18 09:44:12
产生:spring boot,价格:15, 时间:2021-11-18 09:44:13
产生:spring boot,价格:15, 时间:2021-11-18 09:44:14
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:44:10}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:44:13}
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:44:11}
产生:java,价格:10, 时间:2021-11-18 09:44:15
产生:java,价格:10, 时间:2021-11-18 09:44:16
产生:spring boot,价格:15, 时间:2021-11-18 09:44:17
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:18
产生:项目大课,价格:1, 时间:2021-11-18 09:44:19
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:44:15}
VideoOrder{tradeNo='null', title='项目大课', money=1, userId=0, createTime=2021-11-18 09:44:19}
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:44:18}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:17}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:20
产生:spring boot,价格:15, 时间:2021-11-18 09:44:21
产生:flink,价格:45, 时间:2021-11-18 09:44:22
产生:springc loud,价格:20, 时间:2021-11-18 09:44:23
产生:项目大课,价格:1, 时间:2021-11-18 09:44:24
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:44:20}
VideoOrder{tradeNo='null', title='项目大课', money=1, userId=0, createTime=2021-11-18 09:44:24}
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 09:44:23}
VideoOrder{tradeNo='null', title='flink', money=45, userId=0, createTime=2021-11-18 09:44:22}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:44:21}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:44:25
Process finished with exit code -1
二、Flink里面你需要知道的WindowFunction全窗口函数知识
- 全窗口函数
apply(new WindowFunction(){ }) - 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
- 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 WindowFunction<IN, OUT, KEY, W extends Window>
代码实战
Flink17ApplyWindowApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.stream.Collectors;
public class Flink17ApplyWindowApp {
/**
 * Builds and runs the pipeline: source, keyBy(title), 5-second tumbling
 * processing-time window, full-window {@link WindowFunction}, print.
 *
 * @param args command-line arguments (not used)
 * @throws Exception propagated from {@code env.execute}
 */
public static void main(String[] args) throws Exception {
    // Execution environment: entry point of the job, holds the global settings.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    env.setParallelism(1);

    // Source
    DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

    // Partition the stream by order title.
    KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
        @Override
        public String getKey(VideoOrder value) throws Exception {
            return value.getTitle();
        }
    });

    SingleOutputStreamOperator<VideoOrder> apply = keyByDS
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .apply(new WindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {

                /**
                 * Emits one order per key and window: money is the sum over all
                 * elements, title and createTime come from the first element.
                 */
                @Override
                public void apply(String key, TimeWindow window, Iterable<VideoOrder> input, Collector<VideoOrder> out) throws Exception {
                    // Single typed pass; avoids copying the window into a raw List.
                    VideoOrder first = null;
                    int total = 0;
                    for (VideoOrder order : input) {
                        if (first == null) {
                            first = order;
                        }
                        total += order.getMoney();
                    }
                    if (first == null) {
                        // Nothing to report for an empty iterable.
                        return;
                    }

                    VideoOrder result = new VideoOrder();
                    result.setMoney(total);
                    result.setTitle(first.getTitle());
                    result.setCreateTime(first.getCreateTime());
                    out.collect(result);
                }
            });

    apply.print();

    // A DataStream job only runs once execute() is called; the argument is the job name.
    env.execute("tumbling window job");
}
}
控制台输出
-----open-----
产生:spring boot,价格:15, 时间:2021-11-18 09:55:28
产生:kafka,价格:300, 时间:2021-11-18 09:55:29
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 09:55:28}
VideoOrder{tradeNo='null', title='kafka', money=300, userId=0, createTime=2021-11-18 09:55:29}
产生:面试专题第一季,价格:50, 时间:2021-11-18 09:55:30
产生:spring boot,价格:15, 时间:2021-11-18 09:55:31
产生:java,价格:10, 时间:2021-11-18 09:55:32
产生:java,价格:10, 时间:2021-11-18 09:55:33
产生:spring boot,价格:15, 时间:2021-11-18 09:55:34
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 09:55:30}
VideoOrder{tradeNo='null', title='java', money=20, userId=0, createTime=2021-11-18 09:55:32}
VideoOrder{tradeNo='null', title='spring boot', money=30, userId=0, createTime=2021-11-18 09:55:31}
产生:kafka,价格:300, 时间:2021-11-18 09:55:35
产生:springc loud,价格:20, 时间:2021-11-18 09:55:36
产生:kafka,价格:300, 时间:2021-11-18 09:55:37
Process finished with exit code -1
三、Flink里面你需要知道的processWindowFunction全窗口函数知识
- 全窗口函数
process(new ProcessWindowFunction(){}) - 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
- 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 ProcessWindowFunction<IN, OUT, KEY, W extends Window>
代码实战
Flink17ProcessWindowApp
<pre class="prettyprint lang-java linenums:1">
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Demo job for full-window processing with {@link ProcessWindowFunction}.
 *
 * <p>Orders are keyed by title and grouped into 5-second tumbling
 * processing-time windows. For each key and window the function receives all
 * elements together and emits one {@code VideoOrder} carrying the summed
 * {@code money}.
 */
public class Flink17ProcessWindowApp {

    /**
     * Builds and runs the pipeline: source, keyBy(title), tumbling window,
     * process, print.
     *
     * @param args command-line arguments (not used)
     * @throws Exception propagated from {@code env.execute}
     */
    public static void main(String[] args) throws Exception {
        // Execution environment: entry point of the job, holds the global settings.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Source
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

        // Partition the stream by order title.
        KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder value) throws Exception {
                return value.getTitle();
            }
        });

        SingleOutputStreamOperator<VideoOrder> process = keyByDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<VideoOrder, VideoOrder, String, TimeWindow>() {

                    /**
                     * Emits one order per key and window: money is the sum over all
                     * elements, title and createTime come from the first element.
                     */
                    @Override
                    public void process(String key, Context context, Iterable<VideoOrder> elements, Collector<VideoOrder> out) throws Exception {
                        // Single typed pass; avoids copying the window into a raw List.
                        VideoOrder first = null;
                        int total = 0;
                        for (VideoOrder order : elements) {
                            if (first == null) {
                                first = order;
                            }
                            total += order.getMoney();
                        }
                        if (first == null) {
                            // Nothing to report for an empty iterable.
                            return;
                        }

                        VideoOrder result = new VideoOrder();
                        result.setMoney(total);
                        result.setCreateTime(first.getCreateTime());
                        result.setTitle(first.getTitle());
                        out.collect(result);
                    }
                });

        process.print();

        // A DataStream job only runs once execute() is called; the argument is the job name.
        env.execute("tumbling window job");
    }
}
</pre>
控制台输出
-----open-----
产生:面试专题第一季,价格:50, 时间:2021-11-18 10:03:04
VideoOrder{tradeNo='null', title='面试专题第一季', money=50, userId=0, createTime=2021-11-18 10:03:04}
产生:kafka,价格:300, 时间:2021-11-18 10:03:05
产生:kafka,价格:300, 时间:2021-11-18 10:03:06
产生:springc loud,价格:20, 时间:2021-11-18 10:03:07
产生:spring boot,价格:15, 时间:2021-11-18 10:03:08
产生:面试专题第一季,价格:50, 时间:2021-11-18 10:03:09
VideoOrder{tradeNo='null', title='kafka', money=600, userId=0, createTime=2021-11-18 10:03:05}
VideoOrder{tradeNo='null', title='spring boot', money=15, userId=0, createTime=2021-11-18 10:03:08}
VideoOrder{tradeNo='null', title='springc loud', money=20, userId=0, createTime=2021-11-18 10:03:07}
产生:java,价格:10, 时间:2021-11-18 10:03:10
产生:面试专题第一季,价格:50, 时间:2021-11-18 10:03:11
产生:springc loud,价格:20, 时间:2021-11-18 10:03:12
Process finished with exit code -1
窗口函数对比
- 增量聚合
aggregate(new AggregateFunction(){});
- 全窗口函数
apply(new WindowFunction(){})
process(new ProcessWindowFunction(){}) //比上面这个强
本文作者为DBC,转载请注明。