SpringBoot项目整合Spring-kafka和事务消息实战

DBC 1.7K 0

1.Springboot项目整合spring-kafka依赖包配置

添加pom文件

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件修改增加生产者信息

spring:
  kafka:
    bootstrap-servers: 81.71.147.62:9092,81.71.147.62:9093,81.71.147.62:9094

    producer:
      # 消息重发的次数。
      retries: 0
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all

可看如下图

SpringBoot项目整合Spring-kafka和事务消息实战插图

直接看完整代码

package xdclasskafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class UserController  {


    private static final String TOPIC_NAME = "user.register.topic";

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;


    @GetMapping("/api/v1/{num}")
    public void sendMessage(@PathVariable("num") String num){

        kafkaTemplate.send(TOPIC_NAME,"这是一个消息,num="+num).addCallback(success->{
            String topic = success.getRecordMetadata().topic();

            int partition = success.getRecordMetadata().partition();

            long offset = success.getRecordMetadata().offset();

            System.out.println("发送成功:topic="+topic+", partition="+partition+",offset ="+offset);

        },failure->{
            System.out.println("发送消息失败:"+failure.getMessage());
        });


    }

}
控制台输出

2.Springboot项目整合spring-kafka监听消费消息

配置文件修改增加消费者信息

    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest

      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false

      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      #手工ack,调用ack后立刻提交offset
      ack-mode: manual_immediate
      #容器运行的线程数
      concurrency: 4

完整代码

package xdclasskafka.mq;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MQListener {


    @KafkaListener(topics = {"user.register.topic"},groupId = "xdclass-gp2")
    public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack,
                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){

        System.out.println("消费消息:"+record.topic()+"----"+record.partition()+"----"+record.value());

        ack.acknowledge();

    }

}
控制台输出

3.Kafka事务消息-整合SpringBoot实战

Kafka 从 0.11 版本开始引入了事务支持

  • 事务可以保证对多个分区写入操作的原子性
  • 操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能

配置(依然直接给全,具体修改部分看图)

server:
  port: 8080

logging:
  config: classpath:logback.xml

spring:
  kafka:
    bootstrap-servers: 81.71.147.62:9092,81.71.147.62:9093,81.71.147.62:9094

    producer:
      # # 消息重发的次数。 配置事务的话:如果用户显式地指定了 retries 参数,那么这个参数的值必须大于0
      retries: 1
      #一个批次可以使用的内存大小
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #配置事务的话:如果用户显式地指定了 acks 参数,那么这个参数的值必须-1 all
      acks: all
      #事务id
      transaction-id-prefix: xdclass-tran-

    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      auto-offset-reset: earliest

      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false

      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    listener:
      #手工ack,调用ack后立刻提交offset
      ack-mode: manual_immediate
      #容器运行的线程数
      concurrency: 4
温馨提示

注意这两个位置的修改

SpringBoot项目整合Spring-kafka和事务消息实战插图2

SpringBoot代码编写

    /**
     * 注解方式的事务
     * @param num
     */
    @GetMapping("/api/v1/tran1")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage2(int num){

        kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 1 i="+num);

        if(num == 0){
            throw new RuntimeException();
        }
        kafkaTemplate.send(TOPIC_NAME,"这个是事务消息 2 i="+num);

    }


    /**
     * 声明式事务
     * @param num
     */
    @GetMapping("/api/v1/tran2")
    public void sendMessage3( int num){

        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
            @Override
            public Object doInOperations(KafkaOperations<String, Object> kafkaOperations) {

                kafkaOperations.send(TOPIC_NAME,"这个是事务消息 1 i="+num);

                if(num == 0){
                    throw new RuntimeException();
                }
                kafkaOperations.send(TOPIC_NAME,"这个是事务消息 2 i="+num);
                return true;
            }
        });
    }
温馨提示

两种方式都可以,具体区别可以网上自己搜索一下!

发表评论 取消回复
表情 图片 链接 代码

分享