生产消息使用MessageQueueSelector投递到Topic下指定的queue
- 应用场景:顺序消息,分摊负载
- 默认topic下的queue数量是4,可以配置
//可以使用Jdk8的lambda表达式,只有一个方法需要被实现
producer.send(message, new MessageQueueSelector(){
select(List<MessageQueue> mqs, Message msg, Object arg){
Integer queueNum = (Integer)arg;
return mqs.get(queueNum);
}
},0)
- 支持同步,异步发送指定的MessageQueue
- 选择的queue数量必须小于配置的,否则会出错
小例子
package net.xdclass.xdclassmq.controller;
import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.List;
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() );
//同步发送
// SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() {
// @Override
// public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// int queueNum = Integer.parseInt(arg.toString());
// return mqs.get(queueNum);
// }
//
// }, 3);
// System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
//异步发送到指定queue, SendCallback不能用lambda表达式,有两个函数需要被实现
payProducer.getProducer().send(message, (mqs, msg, arg) -> {
int queueNum = Integer.parseInt(arg.toString());
return mqs.get(queueNum);
}, 0, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
return new HashMap<>();
}
} 本文作者为DBC,转载请注明。