一、Flink 里面的Map和FlatMap 算子实战订单转换
- 需求:多数算子,我们会用订单 转换-过滤-分组-统计 来实现
- 这样大家更加明白应用场景,比如应用到多个方面等
- 结果类型 idea自动提示
- 算子后 .var 回车 java类型
- 算子后 .val 回车 scala类型
- 什么是java里面的Map操作
- 一对一 转换对象
代码实现
import net.xdclass.model.VideoOrder; import net.xdclass.sink.VideoOrderCounterSink; import net.xdclass.source.VideoOrderSource; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import java.util.Date; public class Flink09MapApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.fromElements( new VideoOrder("21312","java",32,5,new Date()), new VideoOrder("314","java",32,5,new Date()), new VideoOrder("542","springboot",32,5,new Date()), new VideoOrder("42","redis",32,5,new Date()), new VideoOrder("4252","java",32,5,new Date()), new VideoOrder("42","springboot",32,5,new Date()), new VideoOrder("554232","flink",32,5,new Date()), new VideoOrder("23323","java",32,5,new Date()) ); //transformation DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() { @Override public Tuple2<String, Integer> map(VideoOrder value) throws Exception { return new Tuple2<>(value.getTitle(),1); } }); mapDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("map job"); } }
控制台输出
(java,1)
(java,1)
(springboot,1)
(redis,1)
(java,1)
(springboot,1)
(flink,1)
(java,1)
- 什么是java里面的FlatMap操作
- 一对多转换对象
代码实现
import net.xdclass.model.VideoOrder; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Date; public class Flink09FlatMapApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); DataStreamSource<String> ds = env.fromElements("spring,java", "springcloud,flink", "java,kafka"); SingleOutputStreamOperator<String> flatMapDS = ds.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { String [] arr = value.split(","); for(String str:arr){ out.collect(str); } } }); flatMapDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("flat map job"); } }
控制台输出
spring
java
springcloud
flink
java
kafka
二、Flink 里面的RichMap和RichFlatMap 算子实战
- Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
- RichXXX 的 open、close、setRuntimeContext 等方法是按并行度执行的:每个并行 subtask 都会各自调用一次
- 比如并行度是4,那就有4次触发对应的open/close方法等,是4个不同subtask
- 比如 RichMapFunction、RichFlatMapFunction、RichSourceFunction等
Flink09FlatMapApp
import net.xdclass.model.VideoOrder; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.Date; public class Flink09FlatMapApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(2); DataStreamSource<String> ds = env.fromElements("spring,java", "springcloud,flink", "java,kafka"); SingleOutputStreamOperator<String> flatMapDS = ds.flatMap(new RichFlatMapFunction<String, String>() { @Override public void open(Configuration parameters) throws Exception { System.out.println("========open"); } @Override public void close() throws Exception { System.out.println("========close"); } @Override public void flatMap(String value, Collector<String> out) throws Exception { String [] arr = value.split(","); for(String str:arr){ out.collect(str); } } }); flatMapDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("flat map job"); } }
控制台输出
========open
========open
2> springcloud
1> spring
2> flink
1> java
1> java
1> kafka
========close
========close
Flink09MapApp
import net.xdclass.model.VideoOrder; import net.xdclass.sink.VideoOrderCounterSink; import net.xdclass.source.VideoOrderSource; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import java.util.Date; public class Flink09MapApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.fromElements( new VideoOrder("21312","java",32,5,new Date()), new VideoOrder("314","java",32,5,new Date()), new VideoOrder("542","springboot",32,5,new Date()), new VideoOrder("42","redis",32,5,new Date()), new VideoOrder("4252","java",32,5,new Date()), new VideoOrder("42","springboot",32,5,new Date()), new VideoOrder("554232","flink",32,5,new Date()), new VideoOrder("23323","java",32,5,new Date()) ); //transformation DataStream<Tuple2<String,Integer>> mapDS = ds.map(new RichMapFunction<VideoOrder, Tuple2<String,Integer>>() { @Override public void open(Configuration parameters) throws Exception { System.out.println("========open"); } @Override public void close() throws Exception { System.out.println("========close"); } @Override public Tuple2<String, Integer> 
map(VideoOrder value) throws Exception { return new Tuple2<>(value.getTitle(),1); } }); mapDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("map job"); } }
控制台输出
========open
(java,1)
(java,1)
(springboot,1)
(redis,1)
(java,1)
(springboot,1)
(flink,1)
(java,1)
========close
三、Flink 里面的KeyBy分组概念讲解+订单统计实战
KeyBy分组概念介绍
- keyBy是把数据流按照某个字段分区
- keyBy后是相同的数据放到同个组里面,再进行组内统计
代码实战
Flink10KeyByApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSource; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Date; public class Flink10KeyByApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source // DataStream<VideoOrder> ds = env.fromElements( // new VideoOrder("21312","java",32,5,new Date()), // new VideoOrder("314","java",32,5,new Date()), // new VideoOrder("542","springboot",45,5,new Date()), // new VideoOrder("42","redis",12,5,new Date()), // new VideoOrder("4252","java",32,5,new Date()), // new VideoOrder("42","springboot",45,5,new Date()), // new VideoOrder("554232","flink",30,5,new Date()), // new VideoOrder("23323","java",32,5,new Date()) // ); DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator<VideoOrder> money = videoOrderStringKeyedStream.sum("money"); money.print(); 
//DataStream需要调用execute,可以取个名称 env.execute("map job"); } }
VideoOrderSourceV2
import net.xdclass.model.VideoOrder; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.util.*; public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> { private volatile Boolean flag = true; private Random random = new Random(); private static List<VideoOrder> list = new ArrayList<>(); static { list.add(new VideoOrder("","java",10,0,null)); list.add(new VideoOrder("","spring boot",15,0,null)); list.add(new VideoOrder("","springc loud",20,0,null)); list.add(new VideoOrder("","flink",45,0,null)); list.add(new VideoOrder("","面试专题第一季",50,0,null)); list.add(new VideoOrder("","项目大课",1,0,null)); list.add(new VideoOrder("","kafka",300,0,null)); } /** * run 方法调用前 用于初始化连接 * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { System.out.println("-----open-----"); } /** * 用于清理之前 * @throws Exception */ @Override public void close() throws Exception { System.out.println("-----close-----"); } /** * 产生数据的逻辑 * @param ctx * @throws Exception */ @Override public void run(SourceContext<VideoOrder> ctx) throws Exception { while (flag){ Thread.sleep(1000); String id = UUID.randomUUID().toString(); int userId = random.nextInt(10); int videoNum = random.nextInt(list.size()); VideoOrder videoOrder = list.get(videoNum); videoOrder.setUserId(userId); videoOrder.setCreateTime(new Date()); videoOrder.setTradeNo(id); ctx.collect(videoOrder); } } /** * 控制任务取消 */ @Override public void cancel() { flag = false; } }
控制台输出
-----open-----
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=10, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=15, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=300, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=30, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=600, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=45, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=45, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=20, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=90, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=60, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=20, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=40, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=50, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=100, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=75, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=60, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=900, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=90, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=105, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=80, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=1200, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=1, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=135, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=120, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=180, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=100, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=2, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=1500, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=120, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=3, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=30, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=135, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=150, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=1800, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=40, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=200, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=140, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=150, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=160, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=4, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=165, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=180, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=180, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=250, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=2100, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=225, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=200, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=270, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=315, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=5, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=220, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=195, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=240, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=300, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=2400, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=260, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=360, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=210, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=280, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=6, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=2700, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3000, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=50, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=350, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=225, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=7, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=240, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=300, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=320, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=60, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=400, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3300, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=255, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=340, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3600, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=270, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=285, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=405, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=450, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3900, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=495, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=540, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=585, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=300, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=630, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=315, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=4200, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=70, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=675, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=4500, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=80, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=8, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=330, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=9, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=450, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=500, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=720, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=765, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=4800, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=345, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=5100, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=550, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=90, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=600, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=100, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=5400, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=650, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=360, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=10, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=375, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=11, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=390, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=5700, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6000, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6300, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6600, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=405, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=12, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6900, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=700, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=750, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=7200, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=360, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=380, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=13, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=810, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=400, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=855, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=900, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=420, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=7500, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=420, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=14, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=800, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=850, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
- 常规的数据流转
- DataStream->keyBy操作->KeyedStream->window操作->WindowedStream->聚合操作->DataStream
- 注意: KeyBy后的聚合函数,只处理当前分组后组内的数据,不同组内数据互不影响;sum 只累加指定字段(这里是 money),其余字段(tradeNo、userId、createTime)保持该分组第一条数据的值,所以上面输出里同一个 title 的这些字段始终不变
四、Flink 里面的filter和sum 算子实战电商订单成交价统计
filter过滤 + sum 算子 —— 只有 money 大于20 的订单才能通过 filter,再按 title 分组对 money 求和
代码实战
Flink11FilterApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Flink11FilterApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); SingleOutputStreamOperator<VideoOrder> filterDS = ds.filter(new FilterFunction<VideoOrder>() { @Override public boolean filter(VideoOrder value) throws Exception { return value.getMoney()>20; } }); KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = filterDS.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator<VideoOrder> sumDS = videoOrderStringKeyedStream.sum("money"); sumDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("map job"); } }
控制台输出
-----open-----
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=50, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
VideoOrder(tradeNo=3b2c60a1-90e9-4923-b0c4-a8c3110cdc59, title=flink, money=45, userId=1, createTime=Mon Nov 15 11:40:51 CST 2021)
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=100, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
VideoOrder(tradeNo=47f26b1b-c4b2-45b6-b547-8a550f8d5eac, title=kafka, money=300, userId=1, createTime=Mon Nov 15 11:40:55 CST 2021)
VideoOrder(tradeNo=47f26b1b-c4b2-45b6-b547-8a550f8d5eac, title=kafka, money=600, userId=1, createTime=Mon Nov 15 11:40:55 CST 2021)
VideoOrder(tradeNo=47f26b1b-c4b2-45b6-b547-8a550f8d5eac, title=kafka, money=900, userId=1, createTime=Mon Nov 15 11:40:55 CST 2021)
VideoOrder(tradeNo=3b2c60a1-90e9-4923-b0c4-a8c3110cdc59, title=flink, money=90, userId=1, createTime=Mon Nov 15 11:40:51 CST 2021)
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=150, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
VideoOrder(tradeNo=3b2c60a1-90e9-4923-b0c4-a8c3110cdc59, title=flink, money=135, userId=1, createTime=Mon Nov 15 11:40:51 CST 2021)
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=200, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
这里再给个小例子,链式调用
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Flink11FilterChainApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); DataStream<VideoOrder> sumDS = ds.filter(new FilterFunction<VideoOrder>() { @Override public boolean filter(VideoOrder value) throws Exception { return value.getMoney() > 20; } }).keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }).sum("money"); sumDS.print(); //DataStream需要调用execute,可以取个名称 env.execute("map job"); } }
五、Flink 核心API Reduce聚合讲解和sum区别应用场景
reduce函数
- keyBy 分组后做聚合统计时,sum 和 reduce 可以实现一样的效果(reduce 的聚合逻辑由自己编写,更灵活)
- 例子
Flink12KeyByReduceApp
import net.xdclass.model.VideoOrder; import net.xdclass.source.VideoOrderSourceV2; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Date; public class Flink12KeyByReduceApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.fromElements( new VideoOrder("21312","java",32,5,new Date()), new VideoOrder("314","java",32,5,new Date()), new VideoOrder("542","springboot",45,5,new Date()), new VideoOrder("42","redis",12,5,new Date()), new VideoOrder("4252","java",32,5,new Date()), new VideoOrder("42","springboot",45,5,new Date()), new VideoOrder("554232","flink",30,5,new Date()), new VideoOrder("23323","java",32,5,new Date()) ); //DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2()); KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() { @Override public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception { VideoOrder videoOrder = new VideoOrder(); videoOrder.setTitle(value1.getTitle()); 
videoOrder.setMoney(value1.getMoney() + value2.getMoney()); return videoOrder; } }); reduce.print(); //SingleOutputStreamOperator<VideoOrder> money = videoOrderStringKeyedStream.sum("money"); //money.print(); //DataStream需要调用execute,可以取个名称 env.execute("map job"); } }
控制台输出
VideoOrder(tradeNo=null, title=java, money=128, userId=0, createTime=null)
VideoOrder(tradeNo=554232, title=flink, money=30, userId=5, createTime=Mon Nov 15 11:50:40 CST 2021)
VideoOrder(tradeNo=42, title=redis, money=12, userId=5, createTime=Mon Nov 15 11:50:40 CST 2021)
VideoOrder(tradeNo=null, title=springboot, money=90, userId=0, createTime=null)
六、Flink 核心API maxBy-max-minBy-min 区别和应用
- 如果用了 keyBy,后续算子要用 maxBy、minBy 这一类,才可以在分组里面找到对应的数据
- 如果是用max、min等,就不确定是哪个key中选了
- 如果是keyBy的是对象的某个属性,则分组用max/min聚合统计,只有聚合的字段会更新,其他字段还是旧的,导致对象不准确
- 需要用 maxBy/minBy,才能让整个对象的属性都是最新的
- 例子
Flink13KeyByMaxByApp
import net.xdclass.model.VideoOrder; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.Date; public class Flink13KeyByMaxByApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //env.setParallelism(1); //数据源 source DataStream<VideoOrder> ds = env.fromElements( new VideoOrder("1","java",31,15,new Date()), new VideoOrder("2","java",32,45,new Date()), new VideoOrder("3","java",33,52,new Date()), new VideoOrder("4","springboot",21,5,new Date()), new VideoOrder("5","redis",41,52,new Date()), new VideoOrder("6","redis",40,15,new Date()), new VideoOrder("7","kafka",1,55,new Date()) ); KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() { @Override public String getKey(VideoOrder value) throws Exception { return value.getTitle(); } }); //SingleOutputStreamOperator<VideoOrder> money = keyByDS.max("money"); SingleOutputStreamOperator<VideoOrder> money = keyByDS.maxBy("money"); money.print(); //DataStream需要调用execute,可以取个名称 env.execute("map job"); } }
控制台输出
1> VideoOrder(tradeNo=7, title=kafka, money=1, userId=55, createTime=Mon Nov 15 12:00:51 CST 2021)
2> VideoOrder(tradeNo=3, title=java, money=33, userId=52, createTime=Mon Nov 15 12:00:51 CST 2021)
12> VideoOrder(tradeNo=4, title=springboot, money=21, userId=5, createTime=Mon Nov 15 12:00:51 CST 2021)
1> VideoOrder(tradeNo=5, title=redis, money=41, userId=52, createTime=Mon Nov 15 12:00:51 CST 2021)
- KeyBy后可以用KeyedProcessFunction,更底层的API,有processElement和onTimer函数
本文作者为DBC,转载请注明。