Kafka命令行生产者发送消息和消费者消费消息实战

DBC 2.1K 0

1.Kafka命令行生产者发送消息和消费者消费消息实战

创建topic

./kafka-topics.sh --create --zookeeper 81.71.147.62:2181 --replication-factor 1 --partitions 2 --topic xdclass-topic

查看topic

./kafka-topics.sh --list --zookeeper 81.71.147.62:2181

生产者发送消息

./kafka-console-producer.sh --broker-list 81.71.147.62:9092  --topic t1

消费者消费消息 ( --from-beginning:会把主题中以往所有的数据都读取出来, 重启后会有这个重复消费)

./kafka-console-consumer.sh --bootstrap-server 81.71.147.62:9092 --from-beginning --topic t1

删除topic

./kafka-topics.sh --zookeeper 81.71.147.62:2181 --delete --topic t1

查看broker节点topic状态信息

./kafka-topics.sh --describe --zookeeper 81.71.147.62:2181  --topic xdclass-topic

2.Kafka点对点模型和发布订阅模型讲解

JMS规范目前支持两种消息模型

  • 点对点(point to point)
    • 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息
    • 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
  • 发布/订阅(publish/subscribe)
    • 消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。
    • 和点对点方式不同,发布到topic的消息会被所有订阅者消费。

Kafka命令行生产者发送消息和消费者消费消息实战插图

3.Kafka消费者组配置实现点对点消费模型

编辑消费者配置(确保同个名称group.id一样)

  • 编辑 config/consumer.properties

创建topic, 1个分区

./kafka-topics.sh --create --zookeeper 81.71.147.62:2181 --replication-factor 1 --partitions 2 --topic t1

指定配置文件启动 两个消费者

./kafka-console-consumer.sh --bootstrap-server 81.71.147.62:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties
温馨提示

现象

  • 只有一个消费者可以消费到数据,一个分区只能被同个消费者组下的某个消费者进行消费
  • 看下面的图,分别是生产者和两个消费者
  • 生产者Kafka命令行生产者发送消息和消费者消费消息实战插图2
  • 两个消费者Kafka命令行生产者发送消息和消费者消费消息实战插图4 Kafka命令行生产者发送消息和消费者消费消息实战插图6
总结:可以看到,生产者生产的消息,消费者只有一个进行了消费

4.Kafka消费者组配置实现发布订阅消费模型

编辑消费者配置(确保group.id 不一样)

  • 编辑 config/consumer-1.properties
  • 编辑 config/consumer-2.properties

如图,修改对应的组名就好了

Kafka命令行生产者发送消息和消费者消费消息实战插图8

创建topic, 2个分区

./kafka-topics.sh --create --zookeeper 81.71.147.62:2181 --replication-factor 1 --partitions 2 --topic t2

指定配置文件启动 两个消费者

./kafka-console-consumer.sh --bootstrap-server 81.71.147.62:9092 --from-beginning --topic t2 --consumer.config ../config/consumer-1.properties
​
./kafka-console-consumer.sh --bootstrap-server 81.71.147.62:9092 --from-beginning --topic t2 --consumer.config ../config/consumer-2.properties
温馨提示

现象

  • 两个不同消费者组的节点,都可以消费到消息,实现发布订阅模型
  • 和上面一样的示例,一个生产者,和两个消费者
  • 生产者
  • Kafka命令行生产者发送消息和消费者消费消息实战插图10
  • 消费者
  • Kafka命令行生产者发送消息和消费者消费消息实战插图12 Kafka命令行生产者发送消息和消费者消费消息实战插图14

5.Kafka数据存储流程和原理概述和LEO+HW讲解

  • Partition
    • topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
    • 是以文件夹的形式存储在具体Broker本机上

      Kafka命令行生产者发送消息和消费者消费消息实战插图16

  • LEO(LogEndOffset)
    • 表示每个partition的log最后一条Message的位置。

       

  • HW(HighWatermark)
    • 表示partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit的位置
    • HW之前的数据才是Commit后的,对消费者才可见
    • ISR集合里面最小leo

Kafka命令行生产者发送消息和消费者消费消息实战插图18

  • offset
    • 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中
    • partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息
    • 可以认为offset是partition中Message的id

 

  • Segment:每个partition又由多个segment file组成;
    • segment file 由2部分组成,分别为index file和data file(log file),
    • 两个文件是一一对应的,后缀”.index”和”.log”分别表示索引文件和数据文件
    • 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1

 

  • Kafka高效文件存储设计特点:
    • Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
    • 通过索引信息可以快速定位message
    • producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明。同样的磁盘,顺序写能到600M/S,而随机写只有100K/S

发表评论 取消回复
表情 图片 链接 代码

分享