[关闭]
@yexiaoqi 2022-05-31T01:08:23.000000Z 字数 9070 阅读 660

RocketMQ的使用

技术 RocketMQ 转载



前言

RocketMQ 是阿里开源的一个消息中间件,在 Springboot 中使用主要有两种方式,第一种是基于 RocketMQ 原生的 API,第二种是采用 Springboot 对 RocketMQ 封装后的写法,下面分别来介绍这两种基本方法。

原生API的方式

生产者的使用

pom依赖:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.8.0</version>
  5. </dependency>

配置文件:

  1. # NameServer地址
  2. apache.rocketmq.namesrvAddr=192.168.56.129:9876
  3. # 生产者的组名
  4. apache.rocketmq.producer.producerGroup=test_Producer

普通消息

生产者初始化:

  1. import org.apache.rocketmq.client.exception.MQClientException;
  2. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.PropertySource;
  5. import org.springframework.stereotype.Component;
  6. import javax.annotation.PostConstruct;
  7. import javax.annotation.PreDestroy;
  8. @Component
  9. public class MsgProducer {
  10. @Value("${apache.rocketmq.producer.producerGroup}")
  11. private String producerGroup;
  12. @Value("${apache.rocketmq.namesrvAddr}")
  13. private String namesrvAddr;
  14. private DefaultMQProducer mqProducer;
  15. //调用方注入MsgProducer后,通过这个方法获取配置好的DefaultMQProducer
  16. public DefaultMQProducer getMqProducer(){
  17. return mqProducer;
  18. }
  19. @PostConstruct
  20. public void initMQ(){
  21. mqProducer=new DefaultMQProducer(producerGroup);
  22. mqProducer.setNamesrvAddr(namesrvAddr);
  23. mqProducer.setVipChannelEnabled(false);
  24. try {
  25. mqProducer.start();
  26. } catch (MQClientException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. @PreDestroy
  31. public void destory(){
  32. mqProducer.shutdown();
  33. }
  34. }

生产者发送消息【同步、非顺序】

  1. @Autowired
  2. private MsgProducer msgProducer;
  3. public void sendMsg() throws Exception {
  4. //消息体
  5. String msg = "hello, RocketMQ";
  6. Message message = new Message("test_topic_2", "test", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
  7. //调用配置好的DefaultMQProducer发送消息,拿到返回结果
  8. SendResult result = msgProducer.getMqProducer().send(message);
  9. }

生产者发送【同步、顺序消息】

  1. @Autowired
  2. private MsgProducer msgProducer;
  3. public void sendMsg() throws Exception {
  4. //发100条消息测试
  5. for(int i = 1; i <= 100; i++) {
  6. //默认自动创建的topic有四个队列,如果按照队列读取,那么同一个队列id下的value一定按照接受的顺序从小到大
  7. String msg = "id:" + i%4 + " value:" + i;
  8. Message message = new Message("test_topic_2", "test", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
  9. SendResult result = msgProducer.getMqProducer().send(message, new MessageQueueSelector(){
  10. @Override
  11. //arg一般是唯一id,这里是i
  12. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  13. int queueNum = Integer.valueOf(String.valueOf(arg)) % 4;
  14. System.out.println("队列id:" + queueNum + " 消息:" + new String(msg.getBody()));
  15. return mqs.get(queueNum);
  16. }
  17. }, i);
  18. }
  19. }

生产者发送【异步、非顺序消息】

  1. public void asyncProducer() throws Exception {
  2. // Instantiate with a producer group name.
  3. DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
  4. // Launch the instance.
  5. producer.start();
  6. producer.setRetryTimesWhenSendAsyncFailed(0);
  7. for (int i = 0; i < 100; i++) {
  8. final int index = i;
  9. //创建一个消息示例, 指定topic, tag 和 message体.
  10. Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
  11. //发送消息并设置回调函数
  12. producer.send(msg, new SendCallback() {
  13. @Override
  14. public void onSuccess(SendResult sendResult) {
  15. System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
  16. }
  17. @Override
  18. public void onException(Throwable e) {
  19. System.out.printf("%-10d Exception %s %n", index, e);
  20. e.printStackTrace();
  21. }
  22. });
  23. }
  24. }

事务消息

消息先不发到 broker 的目的队列,而是包装一层放到中间队列,待提交之后再放到目的队列。配置类如下:

  1. @Component
  2. public class TxMsgProducer {
  3. @Value("${apache.rocketmq.producer.producerGroup}")
  4. private String producerGroup;
  5. @Value("${apache.rocketmq.namesrvAddr}")
  6. private String namesrvAddr;
  7. private TransactionMQProducer mqProducer;
  8. public DefaultMQProducer getMqProducer(){
  9. return mqProducer;
  10. }
  11. @PostConstruct
  12. public void initMQ(){
  13. mqProducer=new TransactionMQProducer(producerGroup);
  14. mqProducer.setNamesrvAddr(namesrvAddr);
  15. mqProducer.setVipChannelEnabled(false);
  16. mqProducer.setNamesrvAddr(namesrvAddr);
  17. //下面两个是新增的
  18. mqProducer.setExecutorService(getExecutorService());
  19. mqProducer.setTransactionListener(new TransactionListenerImpl());
  20. try {
  21. mqProducer.start();
  22. } catch (MQClientException e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. @PreDestroy//在程序运行结束时执行
  27. public void destory(){
  28. mqProducer.shutdown();
  29. }
  30. /**
  31. * 事务监听
  32. */
  33. class TransactionListenerImpl implements TransactionListener {
  34. //第一次判断是否提交或回滚
  35. @Override
  36. public LocalTransactionState executeLocalTransaction(Message message, Object arg){
  37. //message就是那个半发送的消息 arg是在transcationProducter.send(Message,Object)时的另一个携带参数)
  38. //执行本地事务或调用其他为服务
  39. if(true) return LocalTransactionState.COMMIT_MESSAGE;
  40. if(true) return LocalTransactionState.ROLLBACK_MESSAGE;
  41. //如果在检查事务时数据库出现宕机可以让broker过一段时间回查 和return null 效果相同
  42. return LocalTransactionState.UNKNOW;
  43. }
  44. //返回UNKOWN时回查!
  45. @Override
  46. public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
  47. //只去返回commit或者rollback
  48. return LocalTransactionState.COMMIT_MESSAGE;
  49. }
  50. }
  51. //定义一个线程池 让broker用来执行回调和回查
  52. public ExecutorService getExecutorService(){
  53. return new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000));
  54. }
  55. }

第二步,发送消息

  1. @Autowired
  2. private TxMsgProducer msgProducer;
  3. public void sendMsg() throws Exception {
  4. String msg="hello";
  5. Message message=new Message("test_topic_2","test",msg.getBytes());
  6. //调用配置好的TxMsgProducer 发送消息
  7. SendResult result=msgProducer.getMqProducer().send(message);
  8. }

消费者的使用

pom依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.8.0</version>
  5. </dependency>

配置文件:

  1. # NameServer地址
  2. apache.rocketmq.namesrvAddr=192.168.56.129:9876
  3. # 消费者的组名
  4. apache.rocketmq.consumer.PushConsumer=test_Consumer

消费消息:

  1. @Component
  2. public class MsgConsumer {
  3. @Value("${apache.rocketmq.consumer.PushConsumer}")
  4. private String consumerGroup;
  5. @Value("${apache.rocketmq.namesrvAddr}")
  6. private String namesrvAddr;
  7. private DefaultMQPushConsumer consumer;
  8. @PostConstruct
  9. public void init() throws MQClientException {
  10. consumer=new DefaultMQPushConsumer(consumerGroup);
  11. consumer.setNamesrvAddr(namesrvAddr);
  12. //设置consumer所订阅的Topic和Tag,*代表全部的Tag
  13. consumer.subscribe("test_topic_2", "*");
  14. /**
  15. * CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,跳过历史消息
  16. * CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
  17. */
  18. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  19. consumer.registerMessageListener(new MessageListenerConcurrently() {
  20. @Override
  21. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
  22. try{
  23. System.out.println("接受:"+new String(list.get(0).getBody()));
  24. }catch (Exception e){
  25. //ACK机制,消费失败,触发RocketMQ 重发消息
  26. return ConsumeConcurrentlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  27. }
  28. //ACK机制,消费成功
  29. return ConsumeConcurrentlyStatus.SUCCESS;
  30. }
  31. });
  32. consumer.start();
  33. }
  34. @PreDestroy
  35. public void destory(){
  36. consumer.shutdown();
  37. }
  38. }

其他消费模式

顺序消费

  1. consumer.registerMessageListener(new MessageListenerConcurrently(){});

改为

  1. consumer.registerMessageListener(new MessageListenerOrderly(){});

事务消费

不用改变消费者,如果事务的监听 rollback 了,消费者的消费结果会自动回滚

广播消费

  1. consumer.setMessageMode(MessageMode.BROADCASTING);
  2. consumer.setOffsetStore(OffsetStore.LocalFileOffsetStore);

SpringBoot整合RocketMQ的方式

生产者的使用

pom依赖:

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.0</version>
  5. </dependency>

配置文件设置:

  1. rocketmq:
  2. name-server: localhost:9876
  3. producer:
  4. group: my-group
  5. server:
  6. port: 8081

发送消息:

直接实现 CommandLineRunner 这个接口,复写 run 方法即可,然后注册 RocketMQTemplate,就可以生产消息了

  1. @SpringBootApplication
  2. public class SpringBootRocketmqProducerApplication implements CommandLineRunner {
  3. //引入依赖模板
  4. @Resource
  5. private RocketMQTemplate rocketMQTemplate;
  6. @Override
  7. public void run(String... args) throws Exception {
  8. //发送消息
  9. rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
  10. rocketMQTemplate.convertAndSend("test-topic-2",
  11. new OrderPaidEvent("orderId-0001", 88));
  12. }
  13. }
  14. @Data
  15. @AllArgsConstructor
  16. class OrderPaidEvent implements Serializable {
  17. private String orderId;
  18. private Integer paidMoney;
  19. }

消费者的使用

pom依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-spring-boot-starter</artifactId>
  4. <version>2.2.0</version>
  5. </dependency>

配置文件设置:

  1. rocketmq:
  2. name-server: localhost:9876
  3. server:
  4. port: 8082

消费消息:onMessage() 封装了ACK 机制,消费者往外抛异常时,RocketMQ 认为消费失败,重新发送该条消息,否则默认消费成功。

  1. @Slf4j
  2. @Service
  3. @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
  4. class MyConsumer1 implements RocketMQListener<String> {
  5. /**
  6. *需要注意的是,onMessage()封装了ACK机制,消费者往外抛异常时,RocketMQ认为消费失败,重新发送该条消息,否则默认消费成功
  7. */
  8. @SneakyThrows
  9. @Override
  10. public void onMessage(Message message) {
  11. log.info("receivie message:topic={},body={}", message.getTopic(), new String(message.getBody()));
  12. if(消费成功){
  13. //TODO
  14. }else if(消费失败){
  15. throw new Exception;
  16. }
  17. }
  18. }
  19. @Data
  20. @AllArgsConstructor
  21. class OrderPaidEvent implements Serializable {
  22. private String orderId;
  23. private Integer paidMoney;
  24. }

转载自 RocketMQ在SpringBoot中的使用


常见问题

消息消费有延迟,但并未使用延迟消息?

现象:消息投递到消费者监听到,有 2~10 秒的延迟,但并未使用延迟消息
一般来说消费延迟可能有以下几个原因

  1. 生产者生产速度过快、消费者来不及消费,消息积压造成消费延迟。一般增加消费者可以解决
  2. 部分队列分不到消费者
  3. 消费者线程阻塞

经过判断1、3均排除,最有可能的是消费者的 consumer group 乱用,搜索代码中的所有消费者,修改消费组后,消费正常。

生产者报连接超时,但网络正常?

是否是多网卡环境,查看 broker 的配置,brokerIP1=xxx.xxx.xxx.xxx 需配置。

广播模式,消费不到?

广播模式会把 offset 保存到本地,默认是在用户根目录/.rocketmq_offsets/xxx.xxx.xxx.xxx@DEFAULT下,当同一个机子上启动有多个相同的消费者时,会对这个本地的偏移量造成覆盖。
集群模式下的消费者的 instanceName 会用 PID,没有 instanceName 影响不大,广播模式的 instanceName 是DEFAULT,不同实例之间会重复。

producer发送消息时抛出异常: No route info of this topic?

  1. broker 上不存在该 topic,需要手动创建该 topic 或者修改 broker 配置为自动创建 topic;
  2. broker 没有正确连接到 name server 上;
  3. producer 没有正确连接到 name server 上;
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注