一、Flink 里面的Map和FlatMap 算子实战订单转换
- 需求:多数算子,我们会用订单 转换-过滤-分组-统计 来实现
- 这样大家更加明白应用场景,比如应用到多个方面等
- 结果类型可以借助 IDEA 自动生成:
- 算子后 .var 回车 java类型
- 算子后 .val 回车 scala类型
- 什么是java里面的Map操作
- 一对一 转换对象
代码实现
import net.xdclass.model.VideoOrder;
import net.xdclass.sink.VideoOrderCounterSink;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import java.util.Date;
public class Flink09MapApp {

    /**
     * Entry point: builds a bounded stream of sample video orders, maps every order to a
     * (title, 1) pair (one input record -> exactly one output record), prints the pairs and
     * runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment: entry point of the job and holder of its global settings.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);

        // Source: a fixed list of sample orders.
        DataStream<VideoOrder> orders = environment.fromElements(
                new VideoOrder("21312", "java", 32, 5, new Date()),
                new VideoOrder("314", "java", 32, 5, new Date()),
                new VideoOrder("542", "springboot", 32, 5, new Date()),
                new VideoOrder("42", "redis", 32, 5, new Date()),
                new VideoOrder("4252", "java", 32, 5, new Date()),
                new VideoOrder("42", "springboot", 32, 5, new Date()),
                new VideoOrder("554232", "flink", 32, 5, new Date()),
                new VideoOrder("23323", "java", 32, 5, new Date())
        );

        // Transformation: one-to-one conversion of each order.
        DataStream<Tuple2<String, Integer>> titleOnes = orders.map(new TitleToOne());

        // Sink: print to stdout.
        titleOnes.print();

        // Nothing runs until execute() is called; the argument is the job name.
        environment.execute("map job");
    }

    /** Converts an order into a (title, 1) pair. */
    private static final class TitleToOne implements MapFunction<VideoOrder, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(VideoOrder order) throws Exception {
            return Tuple2.of(order.getTitle(), 1);
        }
    }
}
控制台输出
(java,1)
(java,1)
(springboot,1)
(redis,1)
(java,1)
(springboot,1)
(flink,1)
(java,1)
- 什么是java里面的FlatMap操作
- 一对多转换对象
代码实现
import net.xdclass.model.VideoOrder;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Date;
public class Flink09FlatMapApp {
/**
 * Entry point: splits every comma-separated input line into its individual words
 * (one input record -> many output records), prints them and runs the job.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the Flink job fails
 */
public static void main(String[] args) throws Exception {
    // Execution environment: entry point of the job and holder of its global settings.
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    environment.setParallelism(1);

    DataStreamSource<String> lines = environment.fromElements("spring,java", "springcloud,flink", "java,kafka");

    // Emits one record per comma-separated token of the incoming line.
    FlatMapFunction<String, String> splitOnComma = new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            for (String word : line.split(",")) {
                out.collect(word);
            }
        }
    };

    SingleOutputStreamOperator<String> words = lines.flatMap(splitOnComma);
    words.print();

    // Nothing runs until execute() is called; the argument is the job name.
    environment.execute("flat map job");
}
} 控制台输出
spring
java
springcloud
flink
java
kafka
二、Flink 里面的RichMap和RichFlatMap 算子实战
- Rich 系列函数的 API 更丰富,多了 open、close 等生命周期方法,可用于初始化和释放连接等资源
- RichXXX 的 open、close、setRuntimeContext 等方法按并行度执行:每个并行 subtask 各自调用一次
- 比如并行度是4,那就有4次触发对应的open/close方法等,是4个不同subtask
- 比如 RichMapFunction、RichFlatMapFunction、RichSourceFunction等
Flink09FlatMapApp
import net.xdclass.model.VideoOrder;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Date;
public class Flink09FlatMapApp {

    /**
     * Entry point: splits comma-separated lines into words with a rich flat-map function
     * whose lifecycle callbacks print a marker, then prints the words and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment: entry point of the job and holder of its global settings.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Two parallel subtasks: open()/close() of the rich function run once in each.
        environment.setParallelism(2);

        DataStreamSource<String> lines = environment.fromElements("spring,java", "springcloud,flink", "java,kafka");
        SingleOutputStreamOperator<String> words = lines.flatMap(new CommaSplitter());
        words.print();

        // Nothing runs until execute() is called; the argument is the job name.
        environment.execute("flat map job");
    }

    /** Rich flat-map that splits a line on commas and prints a marker in open()/close(). */
    private static final class CommaSplitter extends RichFlatMapFunction<String, String> {

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("========open");
        }

        @Override
        public void close() throws Exception {
            System.out.println("========close");
        }

        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            for (String word : line.split(",")) {
                out.collect(word);
            }
        }
    }
}
控制台输出
========open
========open
2> springcloud
1> spring
2> flink
1> java
1> java
1> kafka
========close
========close
Flink09MapApp
import net.xdclass.model.VideoOrder;
import net.xdclass.sink.VideoOrderCounterSink;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import java.util.Date;
public class Flink09MapApp {

    /**
     * Entry point: maps sample video orders to (title, 1) pairs with a rich map function
     * whose lifecycle callbacks print a marker, then prints the pairs and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment: entry point of the job and holder of its global settings.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);

        // Source: a fixed list of sample orders.
        DataStream<VideoOrder> orders = environment.fromElements(
                new VideoOrder("21312", "java", 32, 5, new Date()),
                new VideoOrder("314", "java", 32, 5, new Date()),
                new VideoOrder("542", "springboot", 32, 5, new Date()),
                new VideoOrder("42", "redis", 32, 5, new Date()),
                new VideoOrder("4252", "java", 32, 5, new Date()),
                new VideoOrder("42", "springboot", 32, 5, new Date()),
                new VideoOrder("554232", "flink", 32, 5, new Date()),
                new VideoOrder("23323", "java", 32, 5, new Date())
        );

        // Transformation: one-to-one conversion of each order.
        DataStream<Tuple2<String, Integer>> titleOnes = orders.map(new RichTitleToOne());
        titleOnes.print();

        // Nothing runs until execute() is called; the argument is the job name.
        environment.execute("map job");
    }

    /** Rich map that converts an order into a (title, 1) pair and prints a marker in open()/close(). */
    private static final class RichTitleToOne extends RichMapFunction<VideoOrder, Tuple2<String, Integer>> {

        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("========open");
        }

        @Override
        public void close() throws Exception {
            System.out.println("========close");
        }

        @Override
        public Tuple2<String, Integer> map(VideoOrder order) throws Exception {
            return Tuple2.of(order.getTitle(), 1);
        }
    }
}
控制台输出
========open
(java,1)
(java,1)
(springboot,1)
(redis,1)
(java,1)
(springboot,1)
(flink,1)
(java,1)
========close
三、Flink 里面的KeyBy分组概念讲解+订单统计实战
KeyBy分组概念介绍
- keyBy是把数据流按照某个字段分区
- keyBy 后,key 相同的数据会被分到同一个组里,再进行组内统计
代码实战
Flink10KeyByApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Date;
public class Flink10KeyByApp {
/**
 * Entry point: reads an unbounded stream of random video orders, groups them by course
 * title with keyBy and prints a running per-title sum of the {@code money} field.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the Flink job fails
 */
public static void main(String[] args) throws Exception {
    // Execution environment: entry point of the job and holder of its global settings.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    env.setParallelism(1);

    // Source: endless generator of random orders. For a bounded test run, a stream built with
    // env.fromElements(new VideoOrder(...), ...) can be substituted here.
    DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

    // Group by course title: orders with the same title land in the same group.
    KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
        @Override
        public String getKey(VideoOrder value) throws Exception {
            return value.getTitle();
        }
    });

    // Rolling sum of "money" inside each title group; one updated record is emitted per input.
    SingleOutputStreamOperator<VideoOrder> money = videoOrderStringKeyedStream.sum("money");
    money.print();

    // Nothing runs until execute() is called; name the job after what it actually does.
    env.execute("keyBy job");
}
} VideoOrderSourceV2
import net.xdclass.model.VideoOrder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.*;
/**
 * Parallel source that emits one randomly chosen video order per second until cancelled.
 *
 * <p>Every emitted order is a NEW object built from one of the static course templates,
 * with a random trade number (UUID), a random user id in [0, 10) and the current time.
 */
public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {

    /** Cleared by {@link #cancel()} to stop the emit loop in {@link #run}. */
    private volatile boolean running = true;

    private final Random random = new Random();

    /**
     * Course templates (only title and money are used). This list is static and therefore
     * shared by every parallel instance in the same JVM, so its elements must never be
     * mutated or emitted directly.
     * NOTE(review): "springc loud" looks like a typo of "spring cloud"; kept as-is so the
     * recorded console output still matches.
     */
    private static final List<VideoOrder> TEMPLATES = Collections.unmodifiableList(Arrays.asList(
            new VideoOrder("", "java", 10, 0, null),
            new VideoOrder("", "spring boot", 15, 0, null),
            new VideoOrder("", "springc loud", 20, 0, null),
            new VideoOrder("", "flink", 45, 0, null),
            new VideoOrder("", "面试专题第一季", 50, 0, null),
            new VideoOrder("", "项目大课", 1, 0, null),
            new VideoOrder("", "kafka", 300, 0, null)
    ));

    /**
     * Called before {@link #run}; the place to initialise connections.
     *
     * @param parameters configuration passed in by Flink
     * @throws Exception if initialisation fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * Called on shutdown; the place to release resources acquired in {@link #open}.
     *
     * @throws Exception if cleanup fails
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }

    /**
     * Emits one order per second until {@link #cancel()} is called.
     *
     * @param ctx context used to emit records
     * @throws Exception if the sleep is interrupted or emitting fails
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {
        while (running) {
            Thread.sleep(1000);
            String tradeNo = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            VideoOrder template = TEMPLATES.get(random.nextInt(TEMPLATES.size()));
            // Build a fresh record instead of mutating the shared template: several parallel
            // subtasks would otherwise race on the same object and emit each other's fields.
            VideoOrder videoOrder = new VideoOrder(
                    tradeNo, template.getTitle(), template.getMoney(), userId, new Date());
            ctx.collect(videoOrder);
        }
    }

    /** Stops the emit loop in {@link #run}; invoked when the job is cancelled. */
    @Override
    public void cancel() {
        running = false;
    }
}
控制台输出
-----open-----
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=10, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=15, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=300, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=30, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=600, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=45, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=45, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=20, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=90, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=60, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=20, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=40, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=50, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=100, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=75, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=60, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=900, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=90, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=105, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=80, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=1200, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=1, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=135, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=120, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=180, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=100, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=2, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=1500, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=120, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=3, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=30, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=135, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=150, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=1800, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=40, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=200, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=140, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=150, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=160, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=4, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=165, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=180, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=180, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=250, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=2100, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=225, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=200, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=270, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=315, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=5, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=220, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=195, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=240, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=300, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=2400, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=260, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=360, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=210, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=280, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=6, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=2700, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3000, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=50, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=350, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=225, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=7, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=240, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=300, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=320, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=60, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=400, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3300, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=255, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=340, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3600, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=270, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=285, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=405, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=450, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=3900, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=495, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=540, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=585, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=300, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=630, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=315, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=4200, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=70, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=675, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=4500, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=80, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=8, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=330, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=9, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=450, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=500, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=720, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=765, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=4800, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=345, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=5100, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=550, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=90, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=600, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=fc5ec4ff-12a3-4600-bc45-8fd511b3ee36, title=java, money=100, userId=0, createTime=Mon Nov 15 11:29:45 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=5400, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=650, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=360, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=10, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=375, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=11, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=390, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=5700, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6000, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6300, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6600, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=405, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=12, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=6900, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=700, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=750, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=7200, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=360, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=380, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=13, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=810, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=400, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=855, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=c8ba8a20-1e35-4f0f-a14d-0943b1d85718, title=flink, money=900, userId=8, createTime=Mon Nov 15 11:29:51 CST 2021)
VideoOrder(tradeNo=62df1007-4b38-4ba3-a0a5-38d443ea6f8c, title=spring boot, money=420, userId=4, createTime=Mon Nov 15 11:29:46 CST 2021)
VideoOrder(tradeNo=87c389d0-4c0c-4f61-95c7-b68fa415e0ab, title=kafka, money=7500, userId=6, createTime=Mon Nov 15 11:29:47 CST 2021)
VideoOrder(tradeNo=4a56570f-8e93-4691-8700-b7338df7a064, title=springc loud, money=420, userId=1, createTime=Mon Nov 15 11:29:52 CST 2021)
VideoOrder(tradeNo=d8b562ec-67a1-45ca-93e4-9909bf958bfa, title=项目大课, money=14, userId=9, createTime=Mon Nov 15 11:30:06 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=800, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
VideoOrder(tradeNo=5388766e-69a4-4768-acbe-9061888fbf89, title=面试专题第一季, money=850, userId=8, createTime=Mon Nov 15 11:29:57 CST 2021)
- 常规的数据流转
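- DataStream -> keyBy 操作 -> KeyedStream -> window 操作 -> WindowedStream -> 聚合操作 -> DataStream
- 注意: KeyBy 后的聚合函数只处理当前分组内的数据,不同组的数据互不影响
- 注意: sum 只累加指定字段(这里是 money),其余字段(tradeNo、userId、createTime 等)保留该分组收到的第一条数据的值,所以上面输出中同一标题的 tradeNo 始终不变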
四、Flink 里面的filter和sum 算子实战电商订单成交价统计
filter 过滤 + sum 统计:只有 money 大于 20 的订单才能通过过滤,再按标题分组累加 money
代码实战
Flink11FilterApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink11FilterApp {
/**
 * Entry point: reads random video orders, keeps only those with money &gt; 20, groups the
 * remaining orders by title and prints a running per-title sum of money.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the Flink job fails
 */
public static void main(String[] args) throws Exception {
    // Execution environment: entry point of the job and holder of its global settings.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    env.setParallelism(1);

    DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

    // Keep only orders strictly above 20; an order priced exactly 20 is dropped.
    SingleOutputStreamOperator<VideoOrder> filterDS = ds.filter(new FilterFunction<VideoOrder>() {
        @Override
        public boolean filter(VideoOrder value) throws Exception {
            return value.getMoney() > 20;
        }
    });

    // Group the surviving orders by course title.
    KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = filterDS.keyBy(new KeySelector<VideoOrder, String>() {
        @Override
        public String getKey(VideoOrder value) throws Exception {
            return value.getTitle();
        }
    });

    // Rolling sum of "money" inside each title group.
    SingleOutputStreamOperator<VideoOrder> sumDS = videoOrderStringKeyedStream.sum("money");
    sumDS.print();

    // Nothing runs until execute() is called; name the job after what it actually does.
    env.execute("filter job");
}
} 控制台输出
-----open-----
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=50, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
VideoOrder(tradeNo=3b2c60a1-90e9-4923-b0c4-a8c3110cdc59, title=flink, money=45, userId=1, createTime=Mon Nov 15 11:40:51 CST 2021)
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=100, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
VideoOrder(tradeNo=47f26b1b-c4b2-45b6-b547-8a550f8d5eac, title=kafka, money=300, userId=1, createTime=Mon Nov 15 11:40:55 CST 2021)
VideoOrder(tradeNo=47f26b1b-c4b2-45b6-b547-8a550f8d5eac, title=kafka, money=600, userId=1, createTime=Mon Nov 15 11:40:55 CST 2021)
VideoOrder(tradeNo=47f26b1b-c4b2-45b6-b547-8a550f8d5eac, title=kafka, money=900, userId=1, createTime=Mon Nov 15 11:40:55 CST 2021)
VideoOrder(tradeNo=3b2c60a1-90e9-4923-b0c4-a8c3110cdc59, title=flink, money=90, userId=1, createTime=Mon Nov 15 11:40:51 CST 2021)
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=150, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
VideoOrder(tradeNo=3b2c60a1-90e9-4923-b0c4-a8c3110cdc59, title=flink, money=135, userId=1, createTime=Mon Nov 15 11:40:51 CST 2021)
VideoOrder(tradeNo=520a3662-2102-48ac-a994-bd888175ca83, title=面试专题第一季, money=200, userId=6, createTime=Mon Nov 15 11:40:50 CST 2021)
这里再给个小例子,链式调用
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink11FilterChainApp {
/**
 * Chained-call version of the filter-and-sum demo: orders with money greater
 * than 20 are grouped by course title and the rolling sum of money per title
 * is printed, all expressed as one fluent operator chain.
 *
 * @param args not used
 * @throws Exception if building or executing the Flink job fails
 */
public static void main(String[] args) throws Exception {
    // Orders must have strictly more money than this to pass the filter.
    final int minMoney = 20;

    // Execution environment: the job's entry point, holding job-wide settings.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    env.setParallelism(1);

    DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

    // filter -> keyBy(title) -> sum("money"), written as a single chain.
    DataStream<VideoOrder> sumDS = ds
            .filter(new FilterFunction<VideoOrder>() {
                @Override
                public boolean filter(VideoOrder value) throws Exception {
                    return value.getMoney() > minMoney;
                }
            })
            .keyBy(new KeySelector<VideoOrder, String>() {
                @Override
                public String getKey(VideoOrder value) throws Exception {
                    return value.getTitle();
                }
            })
            .sum("money");
    sumDS.print();

    // Nothing runs until execute() is called; the name identifies this job.
    env.execute("filter sum chain job");
}
} 五、Flink 核心API Reduce聚合讲解和sum区别应用场景
reduce函数
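- keyBy 分组后,用 sum 和 reduce 都能实现按组累加金额的效果;sum 只能对指定字段做内置求和,reduce 可以自定义两条数据如何合并(下面例子的 reduce 只设置了 title 和 money,所以输出中其他字段为 null 或 0)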
- 例子
Flink12KeyByReduceApp
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSourceV2;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Date;
public class Flink12KeyByReduceApp {

    /**
     * keyBy + reduce demo: groups a fixed set of orders by course title and adds
     * up their money with a custom {@link ReduceFunction}.
     *
     * <p>The reduce builds a new VideoOrder that only carries title and summed
     * money, so tradeNo, userId and createTime print as their defaults
     * (null / 0 / null). {@code sum("money")} instead keeps those fields from the
     * first order of each group.
     *
     * <p>The source is bounded ({@code fromElements}) and the runtime mode is
     * AUTOMATIC, so Flink executes in batch mode and only the final reduced value
     * per title is printed.
     *
     * @param args not used
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment: the job's entry point, holding job-wide settings.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Source: a fixed, bounded set of orders.
        DataStream<VideoOrder> ds = env.fromElements(
                new VideoOrder("21312", "java", 32, 5, new Date()),
                new VideoOrder("314", "java", 32, 5, new Date()),
                new VideoOrder("542", "springboot", 45, 5, new Date()),
                new VideoOrder("42", "redis", 12, 5, new Date()),
                new VideoOrder("4252", "java", 32, 5, new Date()),
                new VideoOrder("42", "springboot", 45, 5, new Date()),
                new VideoOrder("554232", "flink", 30, 5, new Date()),
                new VideoOrder("23323", "java", 32, 5, new Date())
        );
        // Alternative unbounded source:
        //DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

        // Group by course title so the reduce below runs independently per title.
        KeyedStream<VideoOrder, String> titleKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder value) throws Exception {
                return value.getTitle();
            }
        });

        // Merge two orders of the same title into one holding the combined money.
        SingleOutputStreamOperator<VideoOrder> moneyPerTitle = titleKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
            @Override
            public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
                VideoOrder merged = new VideoOrder();
                merged.setTitle(value1.getTitle());
                merged.setMoney(value1.getMoney() + value2.getMoney());
                return merged;
            }
        });
        moneyPerTitle.print();

        // Built-in alternative producing the same per-title money totals:
        //SingleOutputStreamOperator<VideoOrder> money = titleKeyedStream.sum("money");
        //money.print();

        // Nothing runs until execute() is called; the name identifies this job.
        env.execute("keyBy reduce job");
    }
}
控制台输出
VideoOrder(tradeNo=null, title=java, money=128, userId=0, createTime=null)
VideoOrder(tradeNo=554232, title=flink, money=30, userId=5, createTime=Mon Nov 15 11:50:40 CST 2021)
VideoOrder(tradeNo=42, title=redis, money=12, userId=5, createTime=Mon Nov 15 11:50:40 CST 2021)
VideoOrder(tradeNo=null, title=springboot, money=90, userId=0, createTime=null)
六、Flink 核心API maxBy-max-minBy-min 区别和应用
- keyBy 分组后,max/min 和 maxBy/minBy 都只在当前分组内求最值
- 区别:max/min 只更新被聚合的那个字段,其他字段仍是该分组第一条数据的值,得到的对象和真正取得最值的那条数据对不上
- maxBy/minBy 返回的是最值所在的整条数据,对象的所有属性都来自同一条记录
- 所以需要拿到完整、准确的对象时,应使用 maxBy/minBy
- 例子
Flink13KeyByMaxByApp
import net.xdclass.model.VideoOrder;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Date;
public class Flink13KeyByMaxByApp {

    /**
     * keyBy + maxBy demo: groups a fixed set of orders by course title and
     * prints, for each title, the whole order with the largest money.
     *
     * <p>maxBy returns the complete record holding the maximum. max("money") would
     * only update the money field and keep the other fields of the first order in
     * the group.
     *
     * @param args not used
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment: the job's entry point, holding job-wide settings.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Left disabled: with the default parallelism, print() prefixes each line
        // with its subtask index ("1>", "2>", ...).
        //env.setParallelism(1);

        // Source: a fixed, bounded set of orders.
        DataStream<VideoOrder> ds = env.fromElements(
                new VideoOrder("1", "java", 31, 15, new Date()),
                new VideoOrder("2", "java", 32, 45, new Date()),
                new VideoOrder("3", "java", 33, 52, new Date()),
                new VideoOrder("4", "springboot", 21, 5, new Date()),
                new VideoOrder("5", "redis", 41, 52, new Date()),
                new VideoOrder("6", "redis", 40, 15, new Date()),
                new VideoOrder("7", "kafka", 1, 55, new Date())
        );

        // Group by course title so the maximum is computed independently per title.
        KeyedStream<VideoOrder, String> keyByDS = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder value) throws Exception {
                return value.getTitle();
            }
        });

        // Field-only alternative (other fields would not match the max record):
        //SingleOutputStreamOperator<VideoOrder> maxDS = keyByDS.max("money");
        SingleOutputStreamOperator<VideoOrder> maxByDS = keyByDS.maxBy("money");
        maxByDS.print();

        // Nothing runs until execute() is called; the name identifies this job.
        env.execute("keyBy maxBy job");
    }
}
控制台输出
1> VideoOrder(tradeNo=7, title=kafka, money=1, userId=55, createTime=Mon Nov 15 12:00:51 CST 2021)
2> VideoOrder(tradeNo=3, title=java, money=33, userId=52, createTime=Mon Nov 15 12:00:51 CST 2021)
12> VideoOrder(tradeNo=4, title=springboot, money=21, userId=5, createTime=Mon Nov 15 12:00:51 CST 2021)
1> VideoOrder(tradeNo=5, title=redis, money=41, userId=52, createTime=Mon Nov 15 12:00:51 CST 2021)
- keyBy 后还可以使用更底层的 KeyedProcessFunction API,它提供 processElement 和 onTimer 两个方法
本文作者为DBC,转载请注明。
