一、Kafka相关运行环境说明和必备基础知识点
Docker快速部署kafka
zk
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
kafka
docker run -d --name xdclass_kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=8.142.19.202:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://8.142.19.202:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
进入容器内部,创建topic
docker exec -it 40162dc4c773 /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --create --zookeeper 172.23.46.157:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic
创建生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic xdclass-topic
运行一个消费者(新建一个窗口,然后进入容器)
docker exec -it 40162dc4c773 /bin/bash
cd /opt/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic xdclass-topic --from-beginning
运行结果如下
二、Flink Source 读取消息队列Kafka连接器整合实战
- 之前我们自定义了SourceFunction;Flink官方也提供了对接外部系统的连接器,比如读取Kafka
- flink官方提供的连接器
添加依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.version}</artifactId> <version>${flink.version}</version> </dependency>
- 编写代码
- FlinkKafkaConsumer -> FlinkKafkaConsumerBase -> RichParallelSourceFunction(富函数-并行读取kafka多分区)
Flink08KafkaSourceApp
点击查看完整内容
import net.xdclass.model.VideoOrder; import net.xdclass.sink.VideoOrderCounterSink; import net.xdclass.source.VideoOrderSource; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import java.util.Properties; public class Flink08KafkaSourceApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); Properties props = new Properties(); //kafka地址 props.setProperty("bootstrap.servers", "8.142.19.202:9092"); //组名 props.setProperty("group.id", "video-order-group"); //字符串序列化和反序列化规则 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //offset重置规则 props.setProperty("auto.offset.reset", "latest"); //自动提交 props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "2000"); //有后台线程每隔10s检测一下Kafka的分区变化情况 props.setProperty("flink.partition-discovery.interval-millis","10000"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>("xdclass-topic", new SimpleStringSchema(),props); //设置从记录的消费者组内的offset开始消费 
consumer.setStartFromGroupOffsets(); DataStream<String> kafkaDS = env.addSource(consumer); kafkaDS.print("kafka:"); //DataStream需要调用execute,可以取个名称 env.execute("kafka source job"); } }
效果图
三、Flink Sink写入消息队列Kafka连接器整合实战
Sink输出配置
点击查看完整内容
import net.xdclass.model.VideoOrder; import net.xdclass.sink.VideoOrderCounterSink; import net.xdclass.source.VideoOrderSource; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.redis.RedisSink; import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig; import java.util.Properties; public class Flink08KafkaSourceApp { /** * source * transformation * sink * * @param args */ public static void main(String[] args) throws Exception { //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); Properties props = new Properties(); //kafka地址 props.setProperty("bootstrap.servers", "8.142.19.202:9092"); //组名 props.setProperty("group.id", "video-order-group"); //字符串序列化和反序列化规则 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //offset重置规则 props.setProperty("auto.offset.reset", "latest"); //自动提交 props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "2000"); //有后台线程每隔10s检测一下Kafka的分区变化情况 props.setProperty("flink.partition-discovery.interval-millis","10000"); FlinkKafkaConsumer<String> consumer = new 
FlinkKafkaConsumer<String>("xdclass-topic", new SimpleStringSchema(),props); //设置从记录的消费者组内的offset开始消费 consumer.setStartFromGroupOffsets(); DataStream<String> kafkaDS = env.addSource(consumer); kafkaDS.print("kafka:"); DataStream<String> mapDS = kafkaDS.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return "大白菜内容:"+value; } }); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("dabaicai-order",new SimpleStringSchema(),props); mapDS.addSink(producer); //DataStream需要调用execute,可以取个名称 env.execute("kafka source job"); } }
控制台输出(结果图)
本文作者为DBC,转载请注明。