@File
2019-10-24T07:06:53.000000Z
字数 3983
阅读 210
java web
1.1 下载地址:http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
1.2 把 /bin 目录,配置到环境变量
1.3 cmd 执行 mqbroker.cmd -n localhost:9876
1.4 cmd 执行 mqnamesrv.cmd
1.1 下载地址:https://github.com/apache/rocketmq-externals
1.2 进入 /rocketmq-console 目录
1.3 cmd 执行 mvn clean package -Dmaven.test.skip=true
1.4 cmd 在执行 java -jar target/rocketmq-console-ng-1.0.1.jar
1.5 修改配置文件 rocketmq-externals\rocketmq-console\src\main\resourcesapplication.properties 中的 rocketmq.config.namesrvAddr=localhost:9876
<!-- rocketmq --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version></dependency>
server:# 发送和接收的服务端口不要重复port: 8800# 可选,用于 三、创建配置映射类rocketmq:nameServerAddr: localhost:9876topicName: send-email-topictopicTag: tag1producerGroupName: sendMailProducerconsumerGroupName: sendMailConsumer
@Data@Configuration@ConfigurationProperties("rocketmq")public class RocketMqPojo implements Serializable {private String nameServerAddr;private String topicName;private String topicTag;private String producerGroupName;private String consumerGroupName;}
调用 rocketMqPojo 的配置可根据实际需求,可在业务过程中再配置(如:五、发送信息)
@Configuration// 引入配置映射类@Import(com.lidaye.config.RocketMqPojo.class)public class SendMailComponent {@Resourceprivate RocketMqPojo rocketMqPojo;@Beanpublic DefaultMQProducer defaultMQProducer() throws MQClientException {// 创建生产者,制定其生产者组DefaultMQProducer producer = new DefaultMQProducer();//设置服务器地址producer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());// 启动producer.start();return producer;}}
@Configuration// 引入配置映射类@Import(com.lidaye.config.RocketMqPojo.class)public class MessageConsumerCompoment {@Resourceprivate RocketMqPojo rocketMqPojo;@Beanpublic DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {// 创建消费者,制定其消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rocketMqPojo.getConsumerGroupName());// 配置服务器地址consumer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());// 设置标题consumer.subscribe(rocketMqPojo.getTopicName(), rocketMqPojo.getTopicTag());// 设置消费者的信息偏移量consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 收信息时的逻辑msgs.forEach(mt -> {System.out.println(new String(mt.getBody()));});// 返回状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动consumer.start();return consumer;}}
@RestControllerpublic class RockerController {@Resourceprivate DefaultMQProducer defaultMQProducer;@Resourceprivate RocketMqPojo rocketMqPojo;@GetMapping("/SendMessage")public String sendMessage() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {// 实例化信息类Message message = new Message();// 配置信息内容message.setBody("信息内容".getBytes());// 设置标题message.setTopic(rocketMqPojo.getTopicName());message.setTags(rocketMqPojo.getTopicTag());// 发送信息defaultMQProducer.send(message);return "success";}}
参考1:https://www.jianshu.com/p/cc5c10221aa1
参考2:https://www.jianshu.com/p/694d6d2676ff
参考3:https://juejin.im/post/5d3bef91f265da1b725c4b3d
参考4:https://blog.csdn.net/lihongtai/article/details/84642817
TransactionListener 事务回查监听器
@Componentpublic class PointTransactionListener implements TransactionListener {/*** 根据消息发送的结果 判断是否执行本地事务* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.UNKNOW;}/*** RocketMQ 回调 根据本地事务是否执行成功 告诉broker 此消息是否投递成功* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.COMMIT_MESSAGE;}}
/*** 事务回查监听器*/@Resourceprivate PointTransactionListener pointTransactionListener;@Beanpublic TransactionMQProducer transactionMQProducer() throws MQClientException {// 创建事务生产者,制定其生产者组TransactionMQProducer producer = new TransactionMQProducer("T"+rocketMqPojo.getProducerGroupName());//设置服务器地址producer.setNamesrvAddr(rocketMqPojo.getNameServerAddr());// 设置事务决断处理类producer.setTransactionListener(pointTransactionListener);// 启动producer.start();return producer;}