生产消息使用MessageQueueSelector投递到Topic下指定的queue
- 应用场景:顺序消息,分摊负载
- 默认topic下的queue数量是4,可以配置
//可以使用Jdk8的lambda表达式,只有一个方法需要被实现 producer.send(message, new MessageQueueSelector(){ select(List<MessageQueue> mqs, Message msg, Object arg){ Integer queueNum = (Integer)arg; return mqs.get(queueNum); } },0)
- 支持以同步、异步两种方式将消息发送到指定的MessageQueue
- 选择的queue下标(从0开始)必须小于Topic配置的queue数量,否则会出错
小例子
package net.xdclass.xdclassmq.controller; import net.xdclass.xdclassmq.jms.JmsConfig; import net.xdclass.xdclassmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.List; @RestController public class PayController { @Autowired private PayProducer payProducer; @RequestMapping("/api/v1/pay_cb") public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() ); //同步发送 // SendResult sendResult = payProducer.getProducer().send(message, new MessageQueueSelector() { // @Override // public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // int queueNum = Integer.parseInt(arg.toString()); // return mqs.get(queueNum); // } // // }, 3); // System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString()); //异步发送到指定queue, SendCallback不能用lambda表达式,有两个函数需要被实现 payProducer.getProducer().send(message, (mqs, msg, arg) -> { int queueNum = Integer.parseInt(arg.toString()); return mqs.get(queueNum); }, 0, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString()); } @Override public void 
onException(Throwable e) { e.printStackTrace(); } }); return new HashMap<>(); } }
本文作者为DBC,转载请注明。