玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课

DBC 1.6K 0

一、Kafka相关运行环境说明和必备基础知识点

Docker快速部署kafka

zk

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

kafka

docker run -d --name xdclass_kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=8.142.19.202:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://8.142.19.202:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
温馨提示

这里如果不是特别清楚zk和kafka之间的内网通信,博主推荐全部使用外网,避免后面报错!

进入容器内部,创建topic

docker exec -it 40162dc4c773 /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --create --zookeeper 172.23.46.157:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic

创建生产者发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic xdclass-topic

运行一个消费者(新建一个窗口,然后进入容器)

docker exec -it 40162dc4c773 /bin/bash
cd /opt/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xdclass-topic --from-beginning

运行结果如下

玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课插图

玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课插图2

二、Flink Source 读取消息队列Kafka连接器整合实战

  • 之前自定义SourceFunction,Flink官方也有提供对接外部系统的,比如读取Kafka
  • flink官方提供的连接器

添加依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
  • 编写代码
    • FlinkKafkaConsumer-》FlinkKafkaConsumerBase-》RichParallelSourceFunction(富函数-并行读取kafka多分区)

Flink08KafkaSourceApp

点击查看完整内容

效果图

玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课插图4

三、Flink Sink读取消息队列Kafka连接器整合实战

Sink输出配置

点击查看完整内容

控制台输出(结果图)

玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课插图6

玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课插图8

温馨提示

这里唯一要注意的点就是topic一定要写对,要不然会看不到数据,消费不了。这两个必须对应上!

玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课插图10

玩转Flink里面核心Source Sink对接 Kafka Connetor实战——第六课插图12

发表评论 取消回复
表情 图片 链接 代码

分享