@boothsun
2017-09-06T12:45:44.000000Z
字数 3291
阅读 1641
消息队列
就像订阅报纸,我们可以选择一份或者多份报纸。比如:北京日报、人民日报。这些报纸就相当于发布订阅模型中的topic。如果有很多人订阅了相同的报纸,那我们就在同一个topic中注册,对于报纸发行方,它就和所有的订阅者形成了一对多的关系。并且,当你开始订阅报纸之后,正常情况下,你订阅之前的报纸你是拿不到的,除非有人给你留存了;
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-core --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.5.1</version></dependency>
/*** 基于发布模式的 消息生产者*/public class JMSPubProducer {//默认连接用户名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接密码private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接地址private static final String BROKEURL = "tcp://xx.xx.xx.xx:61616";//发送的消息数量private static final int SENNUM = 10;public static void main(String[] args){ConnectionFactory factory ; //连接工厂Connection connection = null ; //连接Session session ; //会话,接收或者发送消息的线程Destination destination; //消息的目的地MessageProducer messageProducer; //消息生产者//实例化连接工厂factory = new ActiveMQConnectionFactory(JMSPubProducer.USERNAME, JMSPubProducer.PASSWORD, JMSPubProducer.BROKEURL);//通过连接工厂获取connectiontry {connection = factory.createConnection();connection.start(); //启动连接//创建sessionsession = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//创建主题destination = session.createTopic("topic1");//创建消息发布者messageProducer = session.createProducer(destination);//发送消息sendMessage(session, messageProducer);session.commit();} catch (JMSException e) {e.printStackTrace();}finally{if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}/*** 发送消息* @param session* @param mp* @throws JMSException*/public static void sendMessage(Session session, MessageProducer mp) throws JMSException{for(int i = 0 ; i < JMSPubProducer.SENNUM;i++){TextMessage message = session.createTextMessage("ActiveMq 发布的消息 ---> " + i);System.out.println("发布消息:" + "ActiveMq 发布的消息" + i);mp.send(message);}}}
import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** Created by boothsun on 2017/9/6.*/public class JMSSubConsumer {//默认连接用户名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接密码private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认的连接地址private static final String BROKEURL = "tcp://119.29.182.145:61616";public static void main(String[] args) {ConnectionFactory factory ; //连接工厂Connection connection = null ; //连接Session session ; //会话,接收或者发送消息的线程Destination destination; //消息的目的地MessageConsumer messageConsumer; //消息消费者//实例化连接工厂factory = new ActiveMQConnectionFactory(JMSSubConsumer.USERNAME, JMSSubConsumer.PASSWORD, JMSSubConsumer.BROKEURL);//通过连接工厂获取connectiontry {connection = factory.createConnection();connection.start(); //启动连接//创建sessionsession = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);//创建连接消息队列,消息到达的目的地destination = session.createTopic("topic1");//创建消费者messageConsumer = session.createConsumer(destination);//注册消息监听messageConsumer.setMessageListener(new Listener1());}catch(Exception e){e.printStackTrace();}}}/*** 订阅者1的监听* 消息监听类* @author xx*/class Listener1 implements MessageListener {@Overridepublic void onMessage(Message message) {try {System.out.println("订阅者一收到的消息:" + ((TextMessage)message).getText());} catch (JMSException e) {e.printStackTrace();}}}
对于订阅发布模型,要先启动订阅者,订阅者先订阅topic,再发布消息;否则先发布的消息将无法被后启动的订阅者消费。就像是上面说的订阅报纸,订阅之前的报纸,正常情况下,订阅者是拿不到的。