RokcetMQ分布式事务消息的总体架构
- RocketMQ事务消息:
- RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
- 半消息Half Message:
- 暂不能投递的消息(暂不能消费),Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息
- 消息回查:
- 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
- 整体交互流程
- Producer向broker端发送消息。
- 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
- 发送方开始执行本地事务逻辑。
- 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
- 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
- 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
- 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作
- RocketMQ事务消息的状态
- COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
- ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
- UNKNOW:Broker需要回查确认消息的状态
- 关于事务消息的消费
- 事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)。
RocketMQ分布式事务消息实战
我们先看发送方,TransactionProducer、PayController
package net.xdclass.xdclassmq.controller;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import net.xdclass.xdclassmq.jms.TransactionProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
@RestController
public class PayController {
@Autowired
private TransactionProducer transactionMQProducer;
@RequestMapping("/api/v1/pay_cb")
public Object callback( String tag, String otherParam ) throws Exception {
Message message = new Message(JmsConfig.TOPIC, tag, tag+"_key",tag.getBytes());
SendResult sendResult = transactionMQProducer.getProducer().
sendMessageInTransaction(message, otherParam);
System.out.printf("发送结果=%s, sendResult=%s \n", sendResult.getSendStatus(), sendResult.toString());
return new HashMap<>();
}
}
package net.xdclass.xdclassmq.jms;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Component
public class TransactionProducer {
private String producerGroup = "trac_producer_group";
//事务监听器
private TransactionListener transactionListener = new TransactionListenerImpl();
private TransactionMQProducer producer = null;
//一般自定义线程池的时候,需要给线程加个名称
private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
public TransactionProducer(){
producer = new TransactionMQProducer(producerGroup);
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
producer.setTransactionListener(transactionListener);
producer.setExecutorService(executorService);
//指定NameServer地址,多个地址以 ; 隔开
start();
}
public TransactionMQProducer getProducer(){
return this.producer;
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown(){
this.producer.shutdown();
}
}
class TransactionListenerImpl implements TransactionListener{
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return null;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return null;
}
}
本文作者为DBC,转载请注明。
