1. Flink Core Concepts: The Sink Operator
- Flink programming model

- Sink output targets
  - Predefined sinks
    - writeAsText (deprecated)
  - Custom sinks
    - SinkFunction
    - RichSinkFunction
      - The Rich variants expose a richer API, adding open() and close() methods for work such as initializing connections
  - Bundled connectors shipped by the Flink project
    - Kafka, Elasticsearch, etc.
  - Apache Bahir connectors
    - Kafka, Elasticsearch, Redis, etc.
- Hands-on with the predefined sinks
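The open/invoke/close lifecycle that makes the Rich variants useful can be illustrated with a small stdlib-only sketch. Note this is a hypothetical mock of how a runtime drives a rich sink, not Flink API: setup runs once before any data, one call happens per record, and teardown runs once at the end.

```java
import java.util.ArrayList;
import java.util.List;

public class RichLifecycleSketch {

    // Hypothetical stand-in for a RichSinkFunction's lifecycle:
    // open() and close() bracket the per-record invoke() calls.
    static List<String> runMockSink(List<String> records) {
        List<String> calls = new ArrayList<>();
        calls.add("open");              // initialize connections here, exactly once
        for (String r : records) {
            calls.add("invoke:" + r);   // one call per incoming record
        }
        calls.add("close");             // release resources here, exactly once
        return calls;
    }

    public static void main(String[] args) {
        // prints [open, invoke:a, invoke:b, close]
        System.out.println(runMockSink(List.of("a", "b")));
    }
}
```

This is why expensive resources (database connections, clients) belong in open() rather than in the per-record call.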
2. Flink Custom Sink: Storing Product Orders in MySQL, Part 1
- Custom sinks
  - SinkFunction
  - RichSinkFunction
    - The Rich variants expose a richer API, adding open() and close() methods for work such as initializing connections
- Ways to connect Flink to MySQL (all of them require the JDBC driver)
  - Option 1: the bundled flink-connector-jdbc, added as a dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

  - Option 2: a custom sink
Create the table:

CREATE TABLE `video_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `money` int(11) DEFAULT NULL,
  `title` varchar(32) DEFAULT NULL,
  `trade_no` varchar(64) DEFAULT NULL,
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Add the MySQL JDBC driver dependency:

<!-- MySQL driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

Coding: MysqlSink
import net.xdclass.model.VideoOrder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class MysqlSink extends RichSinkFunction<VideoOrder> {

    private Connection conn;
    private PreparedStatement ps;

    /**
     * Initialize the connection and the prepared statement
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open=======");
        conn = DriverManager.getConnection("jdbc:mysql://8.142.19.202:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "mima");
        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
        ps = conn.prepareStatement(sql);
    }

    /**
     * Close the statement and the connection
     */
    @Override
    public void close() throws Exception {
        System.out.println("close=======");
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

    /**
     * Execute the insert for each incoming record
     */
    @Override
    public void invoke(VideoOrder value, Context context) throws Exception {
        ps.setInt(1, value.getUserId());
        ps.setInt(2, value.getMoney());
        ps.setString(3, value.getTitle());
        ps.setString(4, value.getTradeNo());
        ps.setDate(5, new Date(value.getCreateTime().getTime()));
        ps.executeUpdate();
    }
}
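One detail in invoke() is worth spelling out: the order model exposes create_time as a java.util.Date, while PreparedStatement.setDate expects a java.sql.Date, so the sink converts through the epoch-millis constructor. A minimal stdlib sketch of that conversion (no database required; the method name is mine, not from the source):

```java
public class DateConversionSketch {

    // Convert a java.util.Date (as used by the order model) into the
    // java.sql.Date that PreparedStatement.setDate(...) requires.
    static java.sql.Date toSqlDate(java.util.Date created) {
        return new java.sql.Date(created.getTime()); // same instant, SQL DATE type
    }

    public static void main(String[] args) {
        java.util.Date now = new java.util.Date();
        java.sql.Date sql = toSqlDate(now);
        System.out.println(sql.getTime() == now.getTime()); // prints true
    }
}
```

The instant is preserved exactly; only the type changes, and the JDBC driver then truncates it to a DATE column value.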
3. Flink Custom Sink: Storing Product Orders in MySQL, Part 2
Integrating the custom sink into a Flink job

Flink06CustomMysqSinkApp
import net.xdclass.model.VideoOrder;
import net.xdclass.sink.MysqlSink;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Flink06CustomMysqSinkApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//Build the execution environment: the entry point for launching the job, holding global parameters
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());
DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
@Override
public boolean filter(VideoOrder videoOrder) throws Exception {
return videoOrder.getMoney()>50;
}
});
filterDS.print();
filterDS.addSink(new MysqlSink());
//A DataStream job must call execute(); an optional job name can be passed
env.execute("custom mysql sink job");
}
}
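The FilterFunction above keeps only orders with money greater than 50. As a sanity check, here is the same predicate in plain Java (MiniOrder is a hypothetical stand-in for VideoOrder with just the field the filter inspects):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FilterSketch {

    // Minimal stand-in for VideoOrder: only the money field matters here.
    record MiniOrder(String title, int money) {}

    // Same predicate as the FilterFunction: keep records with money > 50.
    static List<MiniOrder> keepExpensive(List<MiniOrder> orders) {
        return orders.stream()
                .filter(o -> o.money() > 50)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<MiniOrder> out = keepExpensive(List.of(
                new MiniOrder("java", 30), new MiniOrder("flink", 80)));
        System.out.println(out.size()); // prints 1
    }
}
```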
- Custom MysqlSink
  - Prefer extending RichSinkFunction
    - Initialize the runtime resources, such as the JDBC connection and the SQL PreparedStatement, in open()
    - Execute the prepared SQL in invoke() for each incoming record
    - Do cleanup work in close()
  - If you extend the plain SinkFunction instead, a new JDBC connection is created every time a record is written

The resulting output (screenshot omitted).
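Why the RichSinkFunction recommendation matters can be shown with a counting sketch. This is a hypothetical mock, not Flink API: the per-record-connect style pays one connection per element, while the rich pattern pays exactly one.

```java
public class ConnectionCostSketch {

    // Simulates a plain SinkFunction that connects inside every per-record call.
    static int connectionsPerRecordStyle(int records) {
        int opened = 0;
        for (int i = 0; i < records; i++) {
            opened++; // "connect" once per record
        }
        return opened;
    }

    // Simulates the RichSinkFunction pattern: connect once in open(), reuse in invoke().
    static int connectionsRichStyle(int records) {
        int opened = 1; // single connection created in open()
        // invoke() reuses that connection for all records
        return opened;
    }

    public static void main(String[] args) {
        System.out.println(connectionsPerRecordStyle(1000)); // prints 1000
        System.out.println(connectionsRichStyle(1000));      // prints 1
    }
}
```

For a database sink the difference is three orders of magnitude in connection setup work for a thousand records.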
4. Flink Product Sales Counting: Storing Data in Redis 6.x with the Bahir Connector
- How does Flink write to Redis?
  - Option 1: a custom sink
  - Option 2: use a connector
- The core of the Redis sink is the RedisMapper interface; to use it, write your own Redis mapping class implementing its three methods
  - getCommandDescription: choose the Redis data structure (command) and the key name
  - getKeyFromData: extract the key from a record
  - getValueFromData: extract the value from a record
- Usage
  - Add the dependency:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

Coding

VideoOrderCounterSink
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {

    /**
     * Choose the Redis command to use and the key name
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * Get the key (here: the hash field)
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * Get the value
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}

5. Flink Product Sales Counting: Transform, Group, Aggregate, and Store with the Custom Redis Sink
Coding

Flink07RedisSinkApp
import net.xdclass.model.VideoOrder;
import net.xdclass.sink.VideoOrderCounterSink;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;
import java.util.Date;
public class Flink07RedisSinkApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//Build the execution environment: the entry point for launching the job, holding global parameters
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//data source
DataStream<VideoOrder> ds = env.fromElements(
new VideoOrder("21312","java",32,5,new Date()),
new VideoOrder("314","java",32,5,new Date()),
new VideoOrder("542","springboot",32,5,new Date()),
new VideoOrder("42","redis",32,5,new Date()),
new VideoOrder("4252","java",32,5,new Date()),
new VideoOrder("42","springboot",32,5,new Date()),
new VideoOrder("554232","flink",32,5,new Date()),
new VideoOrder("23323","java",32,5,new Date())
);
//DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//transformation
DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(),1);
}
});
//These are one-to-one records, so flatMap is unnecessary here
// DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
// @Override
// public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
// out.collect(new Tuple2<>(value.getTitle(),1));
// }
// });
//Group by key
//In KeyedStream<T, K>, T is the element type and K is the key type; DataSet grouping uses groupBy, stream processing uses keyBy
KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//Count the elements in each group
//Summing within each group aggregates the counts; the 1 means the field at tuple index 1 (the count) is accumulated
DataStream<Tuple2<String,Integer>> sumDS = keyByDS.sum(1);
//Print to the console
sumDS.print();
//Standalone Redis; fill in your own password and port (6379 is the Redis default)
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("8.142.19.202").setPassword("password").setPort(6379).build();
sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));
//A DataStream job must call execute(); an optional job name can be passed
env.execute("custom redis sink job");
}
}
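The map → keyBy → sum pipeline above is ordinary word-count-style aggregation. A stdlib sketch of what ends up in the VIDEO_ORDER_COUNTER hash (the titles come from the fromElements sample data; the merge logic is a plain-Java equivalent I wrote for illustration, not Flink API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CounterSketch {

    // Equivalent of map(order -> (title, 1)) followed by keyBy(f0).sum(1):
    // accumulate a running count per title.
    static Map<String, Integer> countByTitle(List<String> titles) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String t : titles) {
            counts.merge(t, 1, Integer::sum); // sum(1) accumulates the count field
        }
        return counts;
    }

    public static void main(String[] args) {
        // Titles taken from the fromElements sample data above.
        List<String> titles = List.of("java", "java", "springboot", "redis",
                "java", "springboot", "flink", "java");
        System.out.println(countByTitle(titles)); // {java=4, springboot=2, redis=1, flink=1}
    }
}
```

Each map entry corresponds to one HSET field written by VideoOrderCounterSink: the title via getKeyFromData, the count via getValueFromData.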
Run output (screenshot omitted).
This article was written by DBC; please credit the author when reprinting.



