玩转Flink里面核心的Sink Operator实战——第五课

DBC 715 0

一、Flink 的核心知识 Sink Operator

  • Flink编程模型

玩转Flink里面核心的Sink Operator实战——第五课插图

 

  • Sink 输出源
    • 预定义
      • print
      • writeAsText (过期)
    • 自定义
      • SinkFunction
      • RichSinkFunction
        • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
    • flink官方提供 Bundle Connector
      • kafka、ES 等
    • Apache Bahir
      • kafka、ES、Redis等

 

  • 预定义Sink输出实战

二、Flink 自定义的Sink 连接Mysql存储商品订单案例实战《上》

  • 自定义
    • SinkFunction
    • RichSinkFunction
      • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
  • Flink连接mysql的几种方式(都需要加jdbc驱动)
    • 方式一:自带flink-connector-jdbc 需要加依赖包
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
  • 方式二:自定义sink

建表

CREATE TABLE `video_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `money` int(11) DEFAULT NULL,
  `title` varchar(32) DEFAULT NULL,
  `trade_no` varchar(64) DEFAULT NULL,
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

添加jdbc依赖

        <!--mysql驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>

编码——MysqlSink

点击查看完整内容

三、Flink 自定义的Sink 连接Mysql存储商品订单案例实战《下》

Flink整合自定义Sink实战

Flink06CustomMysqSinkApp

点击查看完整内容
  • 自定义MysqlSink
    • 建议继承RichSinkFunction函数
      • 用open()函数初始化JDBC连接
      • invoke SQL预编译器等运行时环境
      • close()函数做清理工作
    • 如果选择继承SinkFunction,会在每次写入一条数据时都会创建一个JDBC连接

效果如下

玩转Flink里面核心的Sink Operator实战——第五课插图2

玩转Flink里面核心的Sink Operator实战——第五课插图4

四、Flink 商品销量统计-实战Bahir Connetor实战存储 数据到Redis6.X

  • Flink怎么操作redis?
    • 方式一:自定义sink
    • 方式二:使用connector
  • Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法
    • getCommandDescription 选择对应的数据结构和key名称配置
    • getKeyFromData 获取key
    • getValueFromData 获取value
  • 使用
    • 添加依赖
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

编码

VideoOrderCounterSink

点击查看完整内容

五、Flink 商品销量统计-转换-分组-聚合-存储自定义的Redis Sink实战

温馨提示

这里一定要有redis运行菜可以喔!

编码

Flink07RedisSinkApp

点击查看完整内容

运行结果

玩转Flink里面核心的Sink Operator实战——第五课插图6

玩转Flink里面核心的Sink Operator实战——第五课插图8

发表评论 取消回复
表情 图片 链接 代码

分享