一、Flink 的API层级介绍Source Operator速览
- Flink的API层级 为流式/批式处理应用程序的开发提供了不同级别的抽象
- 第一层是最底层的抽象为有状态实时流处理,抽象实现是 Process Function,用于底层处理
- 第二层抽象是 Core APIs,许多应用程序不需要使用到上述最底层抽象的 API,而是使用 Core APIs 进行开发
- 例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等,此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
- 第三层抽象是 Table API。 是以表Table为中心的声明式编程API,Table API 使用起来很简洁但是表达能力差
- 类似数据库中关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等
- 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用
- 第四层最顶层抽象是 SQL,这层程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式
- SQL 抽象与 Table API 抽象之间的关联是非常紧密的
- 注意:Table和SQL层变动多,还在持续发展中,大致知道即可,核心是第一和第二层

- Flink编程模型
- Source来源
- 元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
- 文件/文件系统
- env.readTextFile(本地文件);
- env.readTextFile(HDFS文件);
- 基于Socket
- env.socketTextStream("ip", 8888)
- 自定义Source,实现接口自定义数据源,rich相关的api更丰富
- 并行度为1
- SourceFunction
- RichSourceFunction
- 并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
- 并行度为1
- 元素集合
- Connectors与第三方系统进行对接(用于source或者sink都可以)
- Flink本身提供Connector例如kafka、RabbitMQ、ES等
- 注意:Flink程序打包一定要将相应的connetor相关类打包进去,不然就会失败
- Apache Bahir连接器
- 里面也有kafka、RabbitMQ、ES的连接器更多
- 总结 和外部系统进行读取写入的
- 第一种 Flink 里面预定义的 source 和 sink。
- 第二种 Flink 内部也提供部分 Boundled connectors。
- 第三种是第三方 Apache Bahir 项目中的连接器。
- 第四种是通过异步 IO 方式
- 异步I/O是Flink提供的非常底层的与外部系统交互
二、Flink 预定义的Source 数据源 案例实战
- Source来源
- 元素集合
- env.fromElements
- env.fromColletion
- env.fromSequence(start,end);
- 元素集合
代码实战
点击查看完整内容
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class Flink03Source1App {
/**
* source
* transformation
* sink
* @param args
*/
public static void main(String [] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//相同类型元素的数据流 source
DataStream<String> ds1 = env.fromElements("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂");
ds1.print("ds1:");
//相同类型元素的数据流 source
DataStream<String> ds2 = env.fromCollection(Arrays.asList("java,SpringBoot", "spring cloud,redis", "kafka,小滴课堂"));
ds2.print("ds2:");
DataStream<Long> ds3 = env.fromSequence(1,10);
ds3.print("ds3:");
//DataStream需要调用execute,可以取个名称
env.execute("source job");
}
} 控制台输出
点击查看完整内容
ds3::4> 4
ds3::6> 9
ds3::11> 5
ds3::10> 8
ds3::5> 2
ds3::2> 7
ds3::9> 10
ds3::7> 3
ds3::8> 6
ds3::1> 1
ds1::12> kafka,小滴课堂
ds2::11> spring cloud,redis
ds1::10> java,SpringBoot
ds1::11> spring cloud,redis
ds2::12> kafka,小滴课堂
ds2::10> java,SpringBoot
Process finished with exit code 0
三、Flink自定义的Source 数据源案例-订单来源实战
- 自定义Source,实现接口自定义数据源
- 并行度为1
- SourceFunction
- RichSourceFunction
- 并行度大于1
- ParallelSourceFunction
- RichParallelSourceFunction
- Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
- 并行度为1
VideoOrderSource
点击查看完整内容
import net.xdclass.model.VideoOrder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.*;
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
static {
list.add("spring boot2.x课程");
list.add("微服务SpringCloud课程");
list.add("RabbitMQ消息队列");
list.add("Kafka课程");
list.add("小滴课堂面试专题第一季");
list.add("Flink流式技术课程");
list.add("工业级微服务项目大课训练营");
list.add("Linux课程");
}
/**
* run 方法调用前 用于初始化连接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
/**
* 产生数据的逻辑
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());
ctx.collect(videoOrder);
}
}
/**
* 控制任务取消
*/
@Override
public void cancel() {
flag = false;
}
}
VideoOrder
点击查看完整内容
@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
private String tradeNo;
private String title;
private int money;
private int userId;
private Date createTime;
} Flink04CustomSourceApp
点击查看完整内容
import net.xdclass.model.VideoOrder;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink04CustomSourceApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//env.setParallelism(1);
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
videoOrderDS.print();
//DataStream需要调用execute,可以取个名称
env.execute("source job");
}
} 我们也可以过滤一下数据,比如过滤金额小于5的
代码如下
DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney() < 5;
}
}).setParallelism(3);
filterDS.print().setParallelism(4); 控制台输出
本文作者为DBC,转载请注明。
