1.【面试】Consumer消费者机制和分区策略讲解
Kafka的Consumer消费者机制和分区策略讲解
- 思路:从面试题角度去讲解原理流程
- 消费者根据什么模式从broker获取数据的?
- 为什么是pull模式,而不是broker主动push?
- 消费者采用 pull 拉取方式,从broker的partition获取数据
- pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样
- 如果broker没有数据,consumer可以配置 timeout 时间,阻塞等待一段时间之后再返回
- 如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。
- 为什么是pull模式,而不是broker主动push?
- 消费者从哪个分区进行消费?
- 一个 topic 有多个 partition,一个消费者组里面有多个消费者,那是怎么分配过
- 一个主题topic可以有多个消费者,因为里面有多个partition分区 ( leader分区)
- 一个partition leader可以由一个消费者组中的一个消费者进行消费
- 一个 topic 有多个 partition,所以有多个partition leader,给多个消费者消费,那分配策略如何?
- 一个 topic 有多个 partition,一个消费者组里面有多个消费者,那是怎么分配过
消费者从哪个分区进行消费?两个策略
- 顶层接口
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
- round-robin (RoundRobinAssignor非默认策略)轮训
- 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
- topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
- c-1: topic-p0/topic-p2/topic-p4/topic-p6
- c-2:topic-p1/topic-p3/topic-p5
- 弊端
- 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
- 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
- t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
- 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2
- range (RangeAssignor默认策略)范围
- 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
- topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
- c-1: topic-p0/topic-p1/topic-p2/topic-p3
- c-2:topic-p4/topic-p5/topic-p6
- 弊端
- 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
- 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降
2.【面试】Consumer重新分配策略和offset维护机制
简介: Consumer消费者重新分配策略和offset维护机制
- 什么是Rebalance操作
- kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),前面讲了 Range 范围分区 和 RoundRobin 轮询分区,也支持自定义分区策略。
- 而 rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态
- 面试
- 例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?
- Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生rebalance操作
- 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
- 分区数量发生变化时(即 topic 的分区数量发生变化时)
- Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生rebalance操作
- 例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?
- 面试:当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?
- 消费者会记录offset,故障恢复后从这里继续消费,这个offset记录在哪里?
- 记录在zk里面和本地,新版默认将offset保证在kafka的内置topic中,名称是 __consumer_offsets
- 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置
- 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer_offsets主题的哪个分区中
- 由 消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值
- 三元组:group.id+topic+分区号,而 value 就是 offset 的值
3.Consumer配置讲解和Kafka调试日志配置
springboot关闭kafka调试日志
server: port: 8080 logging: config: classpath:logback.xml
<configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 --> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> <root level="info"> <appender-ref ref="STDOUT" /> </root> </configuration>
消费者配置
4.Kafka消费者Consumer消费消息配置实战
package net.xdclass.xdclasskafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.*; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class KafkaConsumerTest { public static Properties getProperties() { Properties props = new Properties(); //broker地址 props.put("bootstrap.servers", "81.71.147.62:9092"); //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息 props.put("group.id", "xdclass-g1"); //开启自动提交offset props.put("enable.auto.commit", "true"); //自动提交offset延迟时间 props.put("auto.commit.interval.ms", "1000"); //反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest(){ Properties properties = getProperties(); KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties); //订阅主题 kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME)); while (true){ //领取时间,阻塞超时时间(这里的意思是,如果没有消息,然后就会阻塞100毫秒) ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); for(ConsumerRecord record : records){ System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n",record.topic(),record.offset(),record.key(),record.value()); } } } }
topic=xdclass-sp-topic-test, offset=9,key=xdclass-key0,value=xdclass-value0
topic=xdclass-sp-topic-test, offset=10,key=xdclass-key1,value=xdclass-value1
topic=xdclass-sp-topic-test, offset=11,key=xdclass-key2,value=xdclass-value2
topic=xdclass-sp-topic-test, offset=12,key=xdclass-key0,value=我是小笨蛋0
topic=xdclass-sp-topic-test, offset=13,key=xdclass-key1,value=我是小笨蛋1
topic=xdclass-sp-topic-test, offset=14,key=xdclass-key2,value=我是小笨蛋2
5.Consumer从头消费配置和手工提交offset配置
如果需要从头消费partition消息,怎操作?
- auto.offset.reset 配置策略即可
- 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费
//默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest");
topic=xdclass-sp-topic-test, offset=0,key=xdclass-key0,value=xdclass-value0
topic=xdclass-sp-topic-test, offset=1,key=xdclass-key1,value=xdclass-value1
topic=xdclass-sp-topic-test, offset=2,key=xdclass-key2,value=xdclass-value2
topic=xdclass-sp-topic-test, offset=3,key=xdclass-key0,value=xdclass-value0
topic=xdclass-sp-topic-test, offset=4,key=xdclass-key1,value=xdclass-value1
topic=xdclass-sp-topic-test, offset=5,key=xdclass-key2,value=xdclass-value2
topic=xdclass-sp-topic-test, offset=6,key=xdclass-key0,value=xdclass-value0
topic=xdclass-sp-topic-test, offset=7,key=xdclass-key1,value=xdclass-value1
topic=xdclass-sp-topic-test, offset=8,key=xdclass-key2,value=xdclass-value2
topic=xdclass-sp-topic-test, offset=9,key=xdclass-key0,value=xdclass-value0
topic=xdclass-sp-topic-test, offset=10,key=xdclass-key1,value=xdclass-value1
topic=xdclass-sp-topic-test, offset=11,key=xdclass-key2,value=xdclass-value2
topic=xdclass-sp-topic-test, offset=12,key=xdclass-key0,value=我是小笨蛋0
topic=xdclass-sp-topic-test, offset=13,key=xdclass-key1,value=我是小笨蛋1
topic=xdclass-sp-topic-test, offset=14,key=xdclass-key2,value=我是小笨蛋2
- 自动提交offset问题
- 没法控制消息是否正常被消费
- 适合非严谨的场景,比如日志收集发送
- 手工提交offset配置和测试
- 初次启动消费者会请求broker获取当前消费的offset值
- 初次启动消费者会请求broker获取当前消费的offset值
- 手工提交offset
- 同步 commitSync 阻塞当前线程 (自动失败重试)
- 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)
手动提交示例完整代码
package net.xdclass.xdclasskafka; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class KafkaConsumerTest { public static Properties getProperties() { Properties props = new Properties(); //broker地址 props.put("bootstrap.servers", "81.71.147.62:9092"); //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息 props.put("group.id", "xdclass-g1"); //开启自动提交offset props.put("enable.auto.commit", "false"); //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset", "earliest"); //自动提交offset延迟时间 props.put("auto.commit.interval.ms", "1000"); //反序列化 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } @Test public void simpleConsumerTest() { Properties properties = getProperties(); KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); //订阅主题 kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME)); while (true) { //领取时间,阻塞超时时间(这里的意思是,如果没有消息,然后就会阻塞100毫秒) ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n", record.topic(), record.offset(), record.key(), record.value()); } //同步阻塞提交offset //kafkaConsumer.commitSync(); if (!records.isEmpty()) { //异步提交offset kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception == null) { System.err.println("手工提交offset成功:" + offsets.toString()); } else { System.err.println("手工提交offset失败:" + offsets.toString()); } } }); } } } }
关键代码
本文作者为DBC,转载请注明。