[关闭]
@boothsun 2017-09-09T02:25:31.000000Z 字数 4540 阅读 1876

ActiveMQ 功能(消息消费端特性)

消息队列


说明:官方文档
参考好的博文:ActiveMQ 分析系列(三)

1 异步分发机制(Consumer Dispatch Async)

broker将消息发送给客户端默认是异步的,异步就意味着多线程和上下文切换的消耗。但异步能将资源最大利用,因为不同的客户端访问的环境可能不同,速度有快有慢。如果认为客户端都能足够快,且不想将资源消耗在上下文切换上,可以设置为同步。如下:

  1. //1. connect级别
  2. ((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);
  3. //2. connectFactory级别
  4. ((ActiveMQConnection)connection).setDispatchAsync(false);
  5. //3. 消息级别
  6. queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
  7. consumer = session.createConsumer(queue);
  8. //4. broker级别(直接修改activemq.xml中的配置)
  9. <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" disableAsyncDispatch="true"/>

2 消息消费优先级(Consumer Priority)

消息的发送是可以有权重级别的,同样消息的消费也是可以有优先级的;消息的消费端可以在创建的时候设置级别:

  1. queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
  2. consumer = session.createConsumer(queue);

优先级值的范围是:0~127,高级别能接受低级别的消息。如果在生产者端设置的级别是4,那么3级别的消费者不能马上收到该消息,而在5和6之间,broker会把消息优先发送给6。只有在消息堆积超过broker队列或缓存极限时,才会把消息发给低级别的消费者。

3 Exclusive Consumer

一般来说,消息队列都会保证queue当中的消息消费顺序。然而如果有多个consumer(比如集群环境下多台机器)同时消费同一个queue,那么这时就不能保证消息的消费顺序了。

有时候,消息的消费顺序是非常重要的,比如:退款消息肯定要在支付成功消息之后被消费。为了能顺序的消费消息,我们只能启动一个consumer来消费这个queue,但是这样就存在单点故障的隐患。

那么如何提高consumer的可用性呢?通常的做法是启动多个consumer,让其中一个consumer成为master,其他的为slave,成为master的consumer才让消费queue中的消息(通常的做法是使用zookeeper进行选主)。

但是这仍然没有解决全部问题。假设master在要消费一条消息之前,发了一次很长的gc,zookeeper认为master挂了,重新选主,一个slave被提升为master,并且开始消费这个queue的消息,这时原来的master从gc中恢复,在一定的时间范围内,原master还没有收到zookeeper通知它已经不是主了,这时它会继续消费queue中的消息。也就是说,在这段时刻,我们有2个认为自己是master的consumer。这种现象是分布式系统中所说的脑裂。zookeeper可以用来选主,zookeeper会保证只选出一个主,但是zookee并不能保证在某一时间系统中只有一个主,也就是说用了zookeeper并不能保证脑裂不会发生。zookeeper并不是用来防止脑裂的。

那么如何解决这问题那?很多mq都有exclusive consumer的概念,我们可以用它来解决这个问题。ActiveMQ也不例外。

ActiveMQ能够保证在集群环境下,同一个queue的消息消费行为只绑定到单一的客户端上;但如果被绑定的客户端发生故障宕机,则会自动转移到其他客户端。这其实和zookeeper选主一样,只不过ActiveMQ保证了同一时刻只能有一个Master。ActiveMQ 使用exclusive consumer的方式如下:

  1. queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
  2. consumer = session.createConsumer(queue);

但是,如果使用独家模式,我们就会发现消费者处理消息的能力将会严重下降。即使集群中有多台机器,使用独占模式之后,也就只能有一台机器进行消息处理。

4 管理持久化订阅者(Manage Durable Subscribers)

消息的持久化,保证了消息者离线后,再次进入系统,不会错过消息,但是这样也很耗费系统资源。而且一些应用程序发送的消息是具有时效性的,一旦客户端长时间不消费此消息,我们就需要将这些过期的消息进行删除,防止消息堆积。另外,为了更好的系统性能,我们还需要断开与那些不活跃消费者的连接。

过期消息检测周期配置:

  1. <policyEntry topic=">" expireMessagesPeriod="300000"/>

上面的配置表示每隔5分钟检测删除过期消息。

删除长期不活跃的消费者:

属性 默认 描述
offlineDurableSubscriberTimeout -1 在此之后,我们删除不活动的持久子站的时间(以毫秒为单位)。默认值-1,表示不删除它们
offlineDurableSubscriberTaskSchedule 300000 我们多久检查一次(毫秒)

示例配置如下:

  1. <broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">

上面的配置意味着我们每隔一小时检查一次,并删除已经离线一天的用户。

5 消息分组(Message Groups)

6 消息重发策略(Redelivery Policy)

7 追溯消费者(Retroactive Consumer)

Retroactive Consumer属于非持久化订阅者,但它是消费 非持久化消息的订阅者。(其他非持久化订阅者 可以消费持久化消息)

  1. ActiveMQ Broker可以为各种Topic缓存消息(但不支持temporary topic和advisory topic)。这说明:该机制只针对于Topic而言。
  2. 缓存的消息只会发给Retroactive Consumer,并不会发送给持久化订阅者。

那非持久化订阅者如何成为Retroactive Consumer呢?最简单的方式是在创建Topic的时候指定consumer为retroactive。

  1. topic = new ActiveMQTopic("TEST.Topic?consumer.retroactive=true");
  2. consumer = session.createConsumer(topic);

8 JMS Selectors

  1. 原文地址:深入浅出JMS(四)--ActiveMQ消息选择器Selector

JMS Selectors is a filter.   JMS选择器可以支持对消息的筛选。消息大多数情况都是发送到broker的,在知道Destination的情况下,都可以消费,因此有些情况下需要我们将消息分组、隔离,或者指定A消息,只能由A消费者消费等情况。下面我们举个例子,限制消息生产者A生产的消息只能让A消费者消费,生产者B生产的消息只能让B消费者消费。

发送者代码:

  1. Destination send_destination = session.createQueue("order_queue");
  2. MessageProducer producer = session.createProducer(send_destination);
  3. for(int i =0;i<300;i++){
  4. // 创建一个文本消息
  5. TextMessage message = session.createTextMessage("A-张三-"+i);
  6. // 这里我们分别设置对应的消息信息,当成是一组消息
  7. message.setStringProperty("JMSXGroupID","A");
  8. producer.send(message);
  9. TextMessage message1 = session.createTextMessage("B-李四-"+i);
  10. message1.setStringProperty("JMSXGroupID","B");
  11. producer.send(message1);
  12. }

消费者代码:

  1. Destination destination = session.createQueue("order_queue");
  2. // 创建消费者
  3. MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='A'");
  4. consumer.setMessageListener(new MessageListener() {
  5. @Override
  6. public void onMessage(Message message) {
  7. TextMessage textMessage = (TextMessage) message;
  8. try {
  9. System.out.println("A:"+textMessage.getText());
  10. } catch (JMSException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. });

消费者B代码:

  1. // 指定接收消息的地方
  2. Destination destination = session.createQueue("order_queue");
  3. // 创建消费者
  4. MessageConsumer consumer = session.createConsumer(destination,"JMSXGroupID='B'");
  5. consumer.setMessageListener(new MessageListener() {
  6. @Override
  7. public void onMessage(Message message) {
  8. TextMessage textMessage = (TextMessage) message;
  9. try {
  10. System.out.println("B:"+textMessage.getText());
  11. } catch (JMSException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. });

然后开启A、B消费者监听,启动发送者,那么就能看到消息分别消费了。同时Selector支持一些表达式的过滤,比如可以写成:JMSXGroupID = 'A' or JMSXGroupID = 'B'。

9 慢消费者处理策略

10 订阅恢复逻辑(Subscription Recovery Policy)

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注