一、java版本环境搭建
可能需要的包
点击查看完整内容
<properties> <encoding>UTF-8</encoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> <scala.version>2.12</scala.version> <flink.version>1.13.1</flink.version> </properties> <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.16</version> </dependency> <!--flink客户端--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!--scala版本--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!--java版本--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <!--streaming的scala版本--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!--streaming的java版本--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.version}</artifactId> <version>${flink.version}</version> </dependency> <!--日志输出--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> <scope>runtime</scope> </dependency> <!--json依赖包--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.44</version> </dependency> </dependencies>
log4j.properties
点击查看完整内容
### 配置appender名称 log4j.rootLogger = debugFile, errorFile ### debug级别以上的日志到:src/logs/flink.log log4j.appender.debugFile = org.apache.log4j.DailyRollingFileAppender log4j.appender.debugFile.File = src/logs/flink.log log4j.appender.debugFile.Append = true #Threshold属性指定输出等级 log4j.appender.debugFile.Threshold = info log4j.appender.debugFile.layout = org.apache.log4j.PatternLayout log4j.appender.debugFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n ### error级别以上的日志 src/logs/error.log log4j.appender.errorFile = org.apache.log4j.DailyRollingFileAppender log4j.appender.errorFile.File = src/logs/error.log log4j.appender.errorFile.Append = true log4j.appender.errorFile.Threshold = error log4j.appender.errorFile.layout = org.apache.log4j.PatternLayout log4j.appender.errorFile.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %n%m%n
二、Tuple数据类型+Map+FlatMap操作介绍
- 什么是Tuple类型
- 元组类型, 多个语言都有的特性, flink的java版 tuple最多支持25个
- 用途
- 函数返回(return)多个值,多个不同类型的对象
- List集合不是也可以吗,集合里面是单个类型
- 列表只能存储相同的数据类型,而元组Tuple可以存储不同的数据类型
小例子测试一下
private static void tupleTest(){ Tuple3<Integer,String,Long> tuple3 = Tuple3.of(1,"xdclass.net",120L); System.out.println(tuple3.f0); System.out.println(tuple3.f1); System.out.println(tuple3.f2); }
控制台输出
点击查看完整内容
1
xdclass.net
120
什么是java里面的Map操作
小例子测试一下
private static void mapTest() { List<String> list1 = new ArrayList<>(); list1.add("springboot,springcloud"); list1.add("redis6,docker"); list1.add("kafka,rabbitmq"); List<String> result = list1.stream().map(obj -> { obj = "小滴课堂" + obj; return obj; }).collect(Collectors.toList()); System.out.println(result); }
控制台输出
点击查看完整内容
[小滴课堂springboot,springcloud, 小滴课堂redis6,docker, 小滴课堂kafka,rabbitmq]
什么是java里面的FlatMap操作
- 一对多转换对象
小例子测试一下
private static void flatMapTest() { List<String> list1 = new ArrayList<>(); list1.add("springboot,springcloud"); list1.add("redis6,docker"); list1.add("kafka,rabbitmq"); List<String> result = list1.stream().flatMap(obj -> { Stream<String> stream = Arrays.stream(obj.split(",")); return stream; }).collect(Collectors.toList()); System.out.println(result); }
控制台输出
点击查看完整内容
[springboot, springcloud, redis6, docker, kafka, rabbitmq]
三、流处理案例-代码编写实战
点击查看完整内容
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class Flink01App { /** * source * transformation * sink * @param args */ public static void main(String [] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(1); //相同类型元素的数据流 source DataStream<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂"); stringDS.print("处理前"); // FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型 DataStream<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> collector) throws Exception { String [] arr = value.split(","); for(String str : arr){ //收集返回来的结果,其实就是加入的意思 collector.collect(str); } } }); //输出 sink flatMapDS.print("处理后"); //DataStream需要调用execute,可以取个名称 env.execute("flat map job"); } }
控制台输出
处理前> java,SpringBoot
处理后> java
处理后> SpringBoot
处理前> spring cloud,redis
处理后> spring cloud
处理后> redis
处理前> kafka,小滴课堂
处理后> kafka
处理后> 小滴课堂
四、批处理案例-代码编写实战
控制台输出
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class Flink02DataSetApp { /** * source * transformation * sink * @param args */ public static void main(String [] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(1); //相同类型元素的数据集 source DataSet<String> stringDS = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂"); stringDS.print("处理前"); // FlatMapFunction<String, String>, key是输入类型,value是Collector响应的收集的类型,看源码注释,也是 DataStream<String>里面泛型类型 DataSet<String> flatMapDS = stringDS.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> collector) throws Exception { String [] arr = value.split(","); for(String str : arr){ collector.collect(str); } } }); //输出 sink flatMapDS.print("处理后"); //DataStream需要调用execute,可以取个名称 env.execute("flat map job"); } }
控制台输出
处理前> java,SpringBoot
处理前> spring cloud,redis
处理前> kafka,小滴课堂
处理后> java
处理后> SpringBoot
处理后> spring cloud
处理后> redis
处理后> kafka
处理后> 小滴课堂
- 注意
- Flink1.12时支持流批一体,DataSetAPI已经不推荐使用了,案例都会优先使用DataStream流式API
五、Blink介绍和IDEA里面运行Flink运行流程解析
- Flink和Blink关系
- 2019年Flink的母公司被阿里全资收购
- 阿里进行高度定制并取名为Blink (加了很多特性 )
- 阿里巴巴官方说明:Blink不会单独作为一个开源项目运作,而是Flink的一部分
- 都在不断演进中,对比其他流式计算框架(老到新)
- Storm 只支持流处理
- Spark Streaming (流式处理,其实是micro-batch微批处理,本质还是批处理)
- Flink 支持流批一体
- 算子Operator
- 将一个或多个DataStream转换为新的DataStream,可以将多个转换组合成复杂的数据流拓扑
- Source 和 Sink 是数据输入和数据输出的特殊算子,重点是transformation类的算子
- 代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 那我们在IDEA里面运行这样就行?实际项目也是这样用???
- flink可以本地idea执行模拟多线程执行,但不能读取配置文件,适合本地调试
- 可以提交到远程搭建的flink集群
- getExecutionEnvironment() 是flink封装好的方式可以自动判断运行模式,更方便开发,
- 如果程序是独立调用的,此方法返回本地执行环境;
- 如果从命令行客户端调用程序以提交到集群,则返回此集群的执行环境,是最常用的一种创建执行环境的方式
- 最终线上部署会把main函数打成jar包,提交到Flink集群进行运行, 会有UI可视化界面
- 服务端部署例子(后续会讲)
- Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同
- Local 本地部署,直接启动进程,适合调试使用
- Standalone Cluster集群部署,flink自带集群模式
- On Yarn 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率,生产环境
- 服务端部署例子(后续会讲)
六、Flink可视化控制台依赖配置和界面介绍
WebUI可视化界面
- 访问:ip:8081
- 方式一:服务端部署Flink集群(生产环境)
- 方式二:本地依赖添加(测试开发)
pom文件
<!--Flink web ui--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_${scala.version}</artifactId> <version>${flink.version}</version> </dependency>
代码开发
点击查看完整内容
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WebUIApp { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); //env.setParallelism(1); DataStream<String> stringDataStream = env.socketTextStream("127.0.0.1",8888); DataStream<String> flatMapDataStream = stringDataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { String[] arr = value.split(","); for (String word : arr) { out.collect(word); } } }); flatMapDataStream.print("结果"); //DataStream需要调用execute,可以取个名称 env.execute("data stream job"); } }
- nc命令介绍
- Linux nc命令用于设置网络路由的
- nc -lk 8888
- 开启 监听模式,用于指定nc将处于监听模式, 等待客户端来链接指定的端口
- win | linux 需要安装
- win 百度搜索博文参考不同系统安装
- linux 安装
- yum install -y netcat
- yum install -y nc
windows下是没有-k命令的,那么执行nc -l 9999 报错local listen fuxored: INVAL
在windows下监听端口正确命令是
nc -l -p 9999
这里博主也不知道win能不能成功,博主是Linux系统直接实现的
本文作者为DBC,转载请注明。