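1. Flink core concepts: the Sink operator
- Flink programming model
- Sink (output)
  - Predefined
    - writeAsText (deprecated)
  - Custom
    - SinkFunction
    - RichSinkFunction
      - The Rich variants expose a richer API, adding open() and close() methods for tasks such as initializing connections
  - Bundled connectors officially provided by Flink
    - Kafka, Elasticsearch, etc.
  - Apache Bahir
    - Redis, etc.
- Predefined
  - Hands-on: output with a predefined sink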
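2. Hands-on: storing product orders in MySQL with a custom Flink sink (Part 1)
- Custom
  - SinkFunction
  - RichSinkFunction
    - The Rich variants expose a richer API, adding open() and close() methods for tasks such as initializing connections
- Ways to connect Flink to MySQL (all of them require the JDBC driver)
  - Approach 1: the flink-connector-jdbc connector, which requires this dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
  - Approach 2: a custom sink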
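Approach 1 is not shown in code in this article, so here is a minimal sketch of what it might look like with `JdbcSink.sink(...)` from flink-connector-jdbc. Everything specific in it is an assumption for illustration: the `Order` class is a hypothetical stand-in for the article's `VideoOrder` model, the URL points at a local MySQL, the password is a placeholder, and the batch settings are arbitrary. It assumes the `video_order` table from this section already exists.

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.Date;

public class JdbcSinkSketch {

    /** Hypothetical stand-in for the article's VideoOrder model. */
    public static class Order {
        public int userId;
        public int money;
        public String title;
        public String tradeNo;
        public long createTimeMillis;

        public Order() {
        }

        public Order(int userId, int money, String title, String tradeNo, long createTimeMillis) {
            this.userId = userId;
            this.money = money;
            this.title = title;
            this.tradeNo = tradeNo;
            this.createTimeMillis = createTimeMillis;
        }
    }

    static final String INSERT_SQL =
            "INSERT INTO video_order (user_id, money, title, trade_no, create_time) VALUES (?,?,?,?,?)";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Two sample orders; a real job would read from an actual source.
        DataStream<Order> orders = env.fromElements(
                new Order(5, 32, "java", "21312", System.currentTimeMillis()),
                new Order(5, 88, "flink", "554232", System.currentTimeMillis()));

        orders.addSink(JdbcSink.<Order>sink(
                INSERT_SQL,
                // Fills the placeholders of INSERT_SQL for each record.
                (ps, order) -> {
                    ps.setInt(1, order.userId);
                    ps.setInt(2, order.money);
                    ps.setString(3, order.title);
                    ps.setString(4, order.tradeNo);
                    ps.setDate(5, new Date(order.createTimeMillis));
                },
                // Assumed batch settings; tune for your workload.
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(3)
                        .build(),
                // Assumed connection settings; replace with your own.
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/xd_order")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("<password>")
                        .build()));

        env.execute("jdbc sink sketch");
    }
}
```

Compared with a hand-written sink, the connector manages the connection and statement itself and writes in batches, so the only custom code is the lambda that binds fields to the SQL placeholders.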
Create the table
CREATE TABLE `video_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `money` int(11) DEFAULT NULL,
  `title` varchar(32) DEFAULT NULL,
  `trade_no` varchar(64) DEFAULT NULL,
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Add the JDBC dependency
<!-- MySQL driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>
Code: MysqlSink
import net.xdclass.model.VideoOrder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlSink extends RichSinkFunction<VideoOrder> {

    private Connection conn;
    private PreparedStatement ps;

    /**
     * Initializes the connection and prepares the INSERT statement.
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open=======");
        conn = DriverManager.getConnection("jdbc:mysql://8.142.19.202:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "mima");
        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
        ps = conn.prepareStatement(sql);
    }

    /**
     * Closes the statement first, then the connection it belongs to.
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("close=======");
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }

    /**
     * Executes the INSERT for one incoming record.
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(VideoOrder value, Context context) throws Exception {
        ps.setInt(1, value.getUserId());
        ps.setInt(2, value.getMoney());
        ps.setString(3, value.getTitle());
        ps.setString(4, value.getTradeNo());
        ps.setDate(5, new Date(value.getCreateTime().getTime()));
        ps.executeUpdate();
    }
}
3. Hands-on: storing product orders in MySQL with a custom Flink sink (Part 2)
Integrating the custom sink into a Flink job
Flink06CustomMysqSinkApp
import net.xdclass.model.VideoOrder;
import net.xdclass.sink.MysqlSink;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flink06CustomMysqSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // Build the execution environment: the entry point that starts the job
        // and holds its global settings.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Source
        DataStream<VideoOrder> videoOrderDS = env.addSource(new VideoOrderSource());

        // Transformation: keep only orders whose amount is greater than 50.
        DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction<VideoOrder>() {
            @Override
            public boolean filter(VideoOrder videoOrder) throws Exception {
                return videoOrder.getMoney() > 50;
            }
        });

        // Sinks: print to the console and write to MySQL.
        filterDS.print();
        filterDS.addSink(new MysqlSink());

        // A DataStream job only runs once execute() is called; the job can be given a name.
        env.execute("custom mysql sink job");
    }
}
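- The custom MysqlSink
  - Extending RichSinkFunction is recommended
    - open() initializes the JDBC connection
    - invoke() uses the prepared statement and the rest of that runtime state to execute the SQL for each record
    - close() does the cleanup
  - A plain SinkFunction has no open() hook, so the JDBC connection would typically be created inside invoke(), that is, once for every record written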
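To make the difference concrete, here is a small plain-Java sketch that needs neither Flink nor a database. All names in it (`FakeConnection`, `PerRecordSink`, `LifecycleSink`) are hypothetical stand-ins that only imitate the two styles and count how many connections each one opens for the same records.

```java
import java.util.Arrays;
import java.util.List;

public class SinkLifecycleSketch {

    /** Hypothetical stand-in for a JDBC connection; it only counts how often one is opened. */
    static class FakeConnection {
        static int opened = 0;

        FakeConnection() {
            opened++;
        }

        void write(String record) {
            // Pretend to run an INSERT here.
        }

        void close() {
        }
    }

    /** SinkFunction style: with no open() hook, the connection is created inside invoke(). */
    static class PerRecordSink {
        void invoke(String record) {
            FakeConnection conn = new FakeConnection();
            conn.write(record);
            conn.close();
        }
    }

    /** RichSinkFunction style: connect once in open(), reuse in invoke(), clean up in close(). */
    static class LifecycleSink {
        private FakeConnection conn;

        void open() {
            conn = new FakeConnection();
        }

        void invoke(String record) {
            conn.write(record);
        }

        void close() {
            conn.close();
        }
    }

    static int connectionsWithPerRecordSink(List<String> records) {
        FakeConnection.opened = 0;
        PerRecordSink sink = new PerRecordSink();
        for (String record : records) {
            sink.invoke(record);
        }
        return FakeConnection.opened;
    }

    static int connectionsWithLifecycleSink(List<String> records) {
        FakeConnection.opened = 0;
        LifecycleSink sink = new LifecycleSink();
        sink.open();
        for (String record : records) {
            sink.invoke(record);
        }
        sink.close();
        return FakeConnection.opened;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("order-1", "order-2", "order-3");
        // prints "per-record sink: 3 connection(s)"
        System.out.println("per-record sink: " + connectionsWithPerRecordSink(records) + " connection(s)");
        // prints "lifecycle sink: 1 connection(s)"
        System.out.println("lifecycle sink: " + connectionsWithLifecycleSink(records) + " connection(s)");
    }
}
```

In a real Flink job, open() runs once per parallel instance of the sink, so the number of connections follows the sink's parallelism rather than the number of records.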
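4. Flink product sales statistics: storing data in Redis 6.x with the Bahir connector
- How does Flink work with Redis?
  - Approach 1: a custom sink
  - Approach 2: a connector
- The core of the Redis sink is RedisMapper, an interface; to use it, write your own Redis mapper class that implements its three methods
  - getCommandDescription: chooses the Redis command (and with it the data structure) and configures the key name
  - getKeyFromData: returns the key
  - getValueFromData: returns the value
- Usage
  - Add the dependency
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
Code
VideoOrderCounterSink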
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {

    /**
     * Chooses the Redis command to use and the key name.
     * With HSET, the second argument is the name of the hash.
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * Returns the key, or for HSET the field inside the hash.
     * @param data
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * Returns the value to store.
     * @param data
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}
5. Flink product sales statistics: transform, key by, aggregate, and store with the custom Redis sink
Code
Flink07RedisSinkApp
import net.xdclass.model.VideoOrder;
import net.xdclass.sink.VideoOrderCounterSink;
import net.xdclass.source.VideoOrderSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;

import java.util.Date;

public class Flink07RedisSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        // Build the execution environment: the entry point that starts the job
        // and holds its global settings.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        // Source
        DataStream<VideoOrder> ds = env.fromElements(
                new VideoOrder("21312", "java", 32, 5, new Date()),
                new VideoOrder("314", "java", 32, 5, new Date()),
                new VideoOrder("542", "springboot", 32, 5, new Date()),
                new VideoOrder("42", "redis", 32, 5, new Date()),
                new VideoOrder("4252", "java", 32, 5, new Date()),
                new VideoOrder("42", "springboot", 32, 5, new Date()),
                new VideoOrder("554232", "flink", 32, 5, new Date()),
                new VideoOrder("23323", "java", 32, 5, new Date())
        );
        //DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());

        // Transformation: map each order to (title, 1).
        DataStream<Tuple2<String, Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                return new Tuple2<>(value.getTitle(), 1);
            }
        });

        // This is a one-to-one mapping, so flatMap is not needed.
        // DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
        //     @Override
        //     public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
        //         out.collect(new Tuple2<>(value.getTitle(),1));
        //     }
        // });

        // Grouping: in KeyedStream<T, K>, T is the element type and K is the key type.
        // The DataSet API groups with groupBy; stream processing groups with keyBy.
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        // Count the records in each group: sum(1) accumulates the tuple field
        // at index 1, which is the count.
        DataStream<Tuple2<String, Integer>> sumDS = keyByDS.sum(1);

        // Print to the console.
        sumDS.print();

        // Standalone Redis; <password> and <port> are placeholders for your own values.
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("8.142.19.202")
                .setPassword("<password>")
                .setPort(<port>)
                .build();

        // Write the counts to Redis; without this line nothing reaches Redis.
        sumDS.addSink(new RedisSink<>(conf, new VideoOrderCounterSink()));

        // A DataStream job only runs once execute() is called; the job can be given a name.
        env.execute("custom redis sink job");
    }
}
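What the job should leave in Redis can be traced by hand. The following plain-Java sketch (no Flink or Redis needed) replays the eight sample titles through a per-title running sum and applies each update the way HSET would, overwriting the field with the latest count. `RedisCounterSketch` and `replay` are hypothetical names, and a `LinkedHashMap` stands in for the `VIDEO_ORDER_COUNTER` hash; treat it as an illustration of the logic, not of the connector's internals.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RedisCounterSketch {

    /** Replays keyBy(title).sum(1), then an HSET of (title, count) for every update. */
    static Map<String, String> replay(List<String> titles) {
        Map<String, Integer> runningSum = new LinkedHashMap<>(); // per-key state of sum(1)
        Map<String, String> redisHash = new LinkedHashMap<>();   // stand-in for VIDEO_ORDER_COUNTER
        for (String title : titles) {
            int count = runningSum.merge(title, 1, Integer::sum);
            // HSET overwrites the field, so only the latest count per title survives.
            redisHash.put(title, String.valueOf(count));
        }
        return redisHash;
    }

    public static void main(String[] args) {
        // Titles of the eight sample orders, in source order.
        List<String> titles = Arrays.asList(
                "java", "java", "springboot", "redis", "java", "springboot", "flink", "java");
        // prints {java=4, springboot=2, redis=1, flink=1}
        System.out.println(replay(titles));
    }
}
```

Because each write overwrites the same hash field, the hash ends up with the final count per title regardless of how many intermediate totals were sent to the sink along the way.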
This article was written by DBC; please credit the source when reposting.