RocketMQ生产者之MessageQueueSelector实战

DBC 1.7K 0

生产消息使用MessageQueueSelector投递到Topic下指定的queue

  • 应用场景:顺序消息,分摊负载
  • 默认topic下的queue数量是4,可以配置
//可以使用Jdk8的lambda表达式,只有一个方法需要被实现
producer.send(message, new MessageQueueSelector(){
			select(List<MessageQueue> mqs, Message msg, Object arg){
				Integer queueNum = (Integer)arg;
				return mqs.get(queueNum);
			}
	},0)
  • 支持同步,异步发送指定的MessageQueue
  • 选择的queue数量必须小于配置的,否则会出错
小例子
package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.jms.JmsConfig;
import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.List;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;


    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        Message message = new Message(JmsConfig.TOPIC,"taga" ,"66881" , ("hello xdclass rocketmq = "+text).getBytes() );

        //同步发送
//       SendResult sendResult =  payProducer.getProducer().send(message, new MessageQueueSelector() {
//           @Override
//            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//               int queueNum = Integer.parseInt(arg.toString());
//               return mqs.get(queueNum);
//            }
//
//        }, 3);
//        System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());


        //异步发送到指定queue, SendCallback不能用lambda表达式,有两个函数需要被实现
        payProducer.getProducer().send(message, (mqs, msg, arg) -> {

            int queueNum = Integer.parseInt(arg.toString());
            return mqs.get(queueNum);

        }, 0, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });





        return new HashMap<>();
    }

}
温馨提示

为了避免消息突然来得很多,直接冲垮这个topic,所以我们规定它只走这个topic的一个队列,这样保证其他的还能正常的工作,这是一种思想,方便后面的学习

发表评论 取消回复
表情 图片 链接 代码

分享