1.Springboot项目整合spring-kafka依赖包配置
添加pom文件
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置文件修改增加生产者信息
spring: kafka: bootstrap-servers: 81.71.147.62:9092,81.71.147.62:9093,81.71.147.62:9094 producer: # 消息重发的次数。 retries: 0 #一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all
可看如下图
直接看完整代码
package xdclasskafka.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; @RestController public class UserController { private static final String TOPIC_NAME = "user.register.topic"; @Autowired private KafkaTemplate<String,Object> kafkaTemplate; @GetMapping("/api/v1/{num}") public void sendMessage(@PathVariable("num") String num){ kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{ String topic = success.getRecordMetadata().topic(); int partition = success.getRecordMetadata().partition(); long offset = success.getRecordMetadata().offset(); System.out.println("发送成功:topic="+topic+", partition="+partition+",offset ="+offset); },failure->{ System.out.println("发送消息失败:"+failure.getMessage()); }); } }
控制台输出
发送成功:topic=user.register.topic, partition=0,offset =0
发送成功:topic=user.register.topic, partition=0,offset =1
发送成功:topic=user.register.topic, partition=0,offset =2
发送成功:topic=user.register.topic, partition=0,offset =3
2.Springboot项目整合spring-kafka监听消费消息
配置文件修改增加消费者信息
consumer: # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: #手工ack,调用ack后立刻提交offset ack-mode: manual_immediate #容器运行的线程数 concurrency: 4
完整代码
package xdclasskafka.mq; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @Component public class MQListener { @KafkaListener(topics = {"user.register.topic"},groupId = "xdclass-gp2") public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){ System.out.println("消费消息:"+record.topic()+"----"+record.partition()+"----"+record.value()); ack.acknowledge(); } }
控制台输出
消费消息:user.register.topic----0----这是一个消息,num=12
3.Kafka事务消息-整合SpringBoot实战
Kafka 从 0.11 版本开始引入了事务支持
- 事务可以保证对多个分区写入操作的原子性
- 操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能
配置(依然直接给全,具体修改部分看图)
server: port: 8080 logging: config: classpath:logback.xml spring: kafka: bootstrap-servers: 81.71.147.62:9092,81.71.147.62:9093,81.71.147.62:9094 producer: # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0 retries: 1 #一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all acks: all #事务id transaction-id-prefix: xdclass-tran- consumer: # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: #手工ack,调用ack后立刻提交offset ack-mode: manual_immediate #容器运行的线程数 concurrency: 4
SpringBoot代码编写
/** * 注解方式的事务 * @param num */ @GetMapping("/api/v1/tran1") @Transactional(rollbackFor = RuntimeException.class) public void sendMessage2(int num){ kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 1 i="+num); if(num == 0){ throw new RuntimeException(); } kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 2 i="+num); } /** * 声明式事务 * @param num */ @GetMapping("/api/v1/tran2") public void sendMessage3( int num){ kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() { @Override public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) { kafkaOperations.send(TOPIC_NAME,"这个是事务消息 1 i="+num); if(num == 0){ throw new RuntimeException(); } kafkaOperations.send(TOPIC_NAME,"这个是事务消息 2 i="+num); return true; } }); }
本文作者为DBC,转载请注明。