Kafka核心API消费者模块实战

DBC 1.6K 0

1.【面试】Consumer消费者机制和分区策略讲解

Kafka的Consumer消费者机制和分区策略讲解

  • 思路:从面试题角度去讲解原理流程
  • 消费者根据什么模式从broker获取数据的?
    • 为什么是pull模式,而不是broker主动push?
      • 消费者采用 pull 拉取方式,从broker的partition获取数据
      • pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样
        • 如果broker没有数据,consumer可以配置 timeout 时间,阻塞等待一段时间之后再返回
      • 如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。
  • 消费者从哪个分区进行消费?
    • 一个 topic 有多个 partition,一个消费者组里面有多个消费者,那是怎么分配过
      • 一个主题topic可以有多个消费者,因为里面有多个partition分区 ( leader分区)
      • 一个partition leader可以由一个消费者组中的一个消费者进行消费
      • 一个 topic 有多个 partition,所以有多个partition leader,给多个消费者消费,那分配策略如何?

消费者从哪个分区进行消费?两个策略

  • 顶层接口
  • round-robin (RoundRobinAssignor非默认策略)轮训
    • 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
    • c-1: topic-p0/topic-p2/topic-p4/topic-p6
    • c-2:topic-p1/topic-p3/topic-p5
    • 弊端
      • 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
      • 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
      • t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
      • 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2

Kafka核心API消费者模块实战插图

 

 

  • range (RangeAssignor默认策略)范围
    • 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
    • c-1: topic-p0/topic-p1/topic-p2/topic-p3
    • c-2:topic-p4/topic-p5/topic-p6
    • 弊端
      • 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
      • 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降

2.【面试】Consumer重新分配策略和offset维护机制

简介: Consumer消费者重新分配策略和offset维护机制

  • 什么是Rebalance操作
    • kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),前面讲了 Range 范围分区 和 RoundRobin 轮询分区,也支持自定义分区策略。
    • 而 rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态

     

  • 面试
    • 例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?
      • Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生rebalance操作
        • 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
        • 分区数量发生变化时(即 topic 的分区数量发生变化时)

 

  • 面试:当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?
    • 消费者会记录offset,故障恢复后从这里继续消费,这个offset记录在哪里?
    • 记录在zk里面和本地,新版默认将offset保证在kafka的内置topic中,名称是 __consumer_offsets
      • 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置
      • 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer_offsets主题的哪个分区中
      • 由 消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值
      • 三元组:group.id+topic+分区号,而 value 就是 offset 的值

3.Consumer配置讲解和Kafka调试日志配置

springboot关闭kafka调试日志

Kafka核心API消费者模块实战插图2

server:
  port: 8080

logging:
  config: classpath:logback.xml
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>

</configuration>

消费者配置

温馨提示

#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id

#为true则自动提交偏移量
enable.auto.commit

#自动提交offset周期
auto.commit.interval.ms

#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset

#序列化器
key.deserializer

4.Kafka消费者Consumer消费消息配置实战

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaConsumerTest {


    public static Properties getProperties() {

        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "81.71.147.62:9092");

        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "xdclass-g1");

        //开启自动提交offset
        props.put("enable.auto.commit", "true");

        //自动提交offset延迟时间
        props.put("auto.commit.interval.ms", "1000");

        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }



    @Test
    public void simpleConsumerTest(){

        Properties properties = getProperties();

        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);

        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));


        while (true){
            //领取时间,阻塞超时时间(这里的意思是,如果没有消息,然后就会阻塞100毫秒)
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

            for(ConsumerRecord record : records){
                System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n",record.topic(),record.offset(),record.key(),record.value());
            }
        }

    }


}
控制台输出

5.Consumer从头消费配置和手工提交offset配置

如果需要从头消费partition消息,怎操作?

  • auto.offset.reset 配置策略即可
  • 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费
//默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest");

Kafka核心API消费者模块实战插图4

控制台输出

  • 自动提交offset问题
    • 没法控制消息是否正常被消费
    • 适合非严谨的场景,比如日志收集发送

 

  • 手工提交offset配置和测试
    • 初次启动消费者会请求broker获取当前消费的offset值

       

  • 手工提交offset
    • 同步 commitSync 阻塞当前线程 (自动失败重试)
    • 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)

手动提交示例完整代码

package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaConsumerTest {


    public static Properties getProperties() {

        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "81.71.147.62:9092");

        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "xdclass-g1");

        //开启自动提交offset
        props.put("enable.auto.commit", "false");

        //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效
        props.put("auto.offset.reset", "earliest");

        //自动提交offset延迟时间
        props.put("auto.commit.interval.ms", "1000");

        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }


    @Test
    public void simpleConsumerTest() {

        Properties properties = getProperties();

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //订阅主题
        kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));


        while (true) {
            //领取时间,阻塞超时时间(这里的意思是,如果没有消息,然后就会阻塞100毫秒)
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord record : records) {
                System.err.printf("topic=%s, offset=%d,key=%s,value=%s %n", record.topic(), record.offset(), record.key(), record.value());
            }
            //同步阻塞提交offset
            //kafkaConsumer.commitSync();

            if (!records.isEmpty()) {
                //异步提交offset
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

                        if (exception == null) {
                            System.err.println("手工提交offset成功:" + offsets.toString());
                        } else {
                            System.err.println("手工提交offset失败:" + offsets.toString());
                        }
                    }
                });
            }
        }
    }

}

关键代码

Kafka核心API消费者模块实战插图6
Kafka核心API消费者模块实战插图8

发表评论 取消回复
表情 图片 链接 代码

分享