[关闭]
@boothsun 2017-09-06T12:45:44.000000Z 字数 3291 阅读 1398

ActiveMQ基于JMS的pub/sub传播机制

消息队列


  1. 原文地址:[ActiveMQ实战]基于JMS的pub/sub传播机制

发布订阅模型

就像订阅报纸,我们可以选择一份或者多份报纸。比如:北京日报、人民日报。这些报纸就相当于发布订阅模型中的topic。如果有很多人订阅了相同的报纸,那我们就在同一个topic中注册,对于报纸发行方,它就和所有的订阅者形成了一对多的关系。并且,当你开始订阅报纸之后,正常情况下,你订阅之前的报纸你是拿不到的,除非有人给你留存了;

pom.xml文件内容

  1. <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core -->
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-core</artifactId>
  5. <version>5.5.1</version>
  6. </dependency>

消息生产者

  1. /**
  2. * 基于发布模式的 消息生产者
  3. */
  4. public class JMSPubProducer {
  5. //默认连接用户名
  6. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  7. //默认连接密码
  8. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  9. //默认的连接地址
  10. private static final String BROKEURL = "tcp://xx.xx.xx.xx:61616";
  11. //发送的消息数量
  12. private static final int SENNUM = 10;
  13. public static void main(String[] args){
  14. ConnectionFactory factory ; //连接工厂
  15. Connection connection = null ; //连接
  16. Session session ; //会话,接收或者发送消息的线程
  17. Destination destination; //消息的目的地
  18. MessageProducer messageProducer; //消息生产者
  19. //实例化连接工厂
  20. factory = new ActiveMQConnectionFactory(JMSPubProducer.USERNAME, JMSPubProducer.PASSWORD, JMSPubProducer.BROKEURL);
  21. //通过连接工厂获取connection
  22. try {
  23. connection = factory.createConnection();
  24. connection.start(); //启动连接
  25. //创建session
  26. session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  27. //创建主题
  28. destination = session.createTopic("topic1");
  29. //创建消息发布者
  30. messageProducer = session.createProducer(destination);
  31. //发送消息
  32. sendMessage(session, messageProducer);
  33. session.commit();
  34. } catch (JMSException e) {
  35. e.printStackTrace();
  36. }finally{
  37. if (connection != null) {
  38. try {
  39. connection.close();
  40. } catch (JMSException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }
  45. }
  46. /**
  47. * 发送消息
  48. * @param session
  49. * @param mp
  50. * @throws JMSException
  51. */
  52. public static void sendMessage(Session session, MessageProducer mp) throws JMSException{
  53. for(int i = 0 ; i < JMSPubProducer.SENNUM;i++){
  54. TextMessage message = session.createTextMessage("ActiveMq 发布的消息 ---> " + i);
  55. System.out.println("发布消息:" + "ActiveMq 发布的消息" + i);
  56. mp.send(message);
  57. }
  58. }
  59. }

消息消费者(可以有多个)

  1. import org.apache.activemq.ActiveMQConnection;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. /**
  5. * Created by boothsun on 2017/9/6.
  6. */
  7. public class JMSSubConsumer {
  8. //默认连接用户名
  9. private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
  10. //默认连接密码
  11. private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
  12. //默认的连接地址
  13. private static final String BROKEURL = "tcp://119.29.182.145:61616";
  14. public static void main(String[] args) {
  15. ConnectionFactory factory ; //连接工厂
  16. Connection connection = null ; //连接
  17. Session session ; //会话,接收或者发送消息的线程
  18. Destination destination; //消息的目的地
  19. MessageConsumer messageConsumer; //消息消费者
  20. //实例化连接工厂
  21. factory = new ActiveMQConnectionFactory(JMSSubConsumer.USERNAME, JMSSubConsumer.PASSWORD, JMSSubConsumer.BROKEURL);
  22. //通过连接工厂获取connection
  23. try {
  24. connection = factory.createConnection();
  25. connection.start(); //启动连接
  26. //创建session
  27. session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
  28. //创建连接消息队列,消息到达的目的地
  29. destination = session.createTopic("topic1");
  30. //创建消费者
  31. messageConsumer = session.createConsumer(destination);
  32. //注册消息监听
  33. messageConsumer.setMessageListener(new Listener1());
  34. }catch(Exception e){
  35. e.printStackTrace();
  36. }
  37. }
  38. }
  39. /**
  40. * 订阅者1的监听
  41. * 消息监听类
  42. * @author xx
  43. */
  44. class Listener1 implements MessageListener {
  45. @Override
  46. public void onMessage(Message message) {
  47. try {
  48. System.out.println("订阅者一收到的消息:" + ((TextMessage)message).getText());
  49. } catch (JMSException e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. }

测试类

对于订阅发布模型,要先启动订阅者,订阅者先订阅topic,再发布消息;否则先发布的消息将无法被后启动的订阅者消费。就像是上面说的订阅报纸,订阅之前的报纸,正常情况下,订阅者是拿不到的。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注