[关闭]
@adamhand 2019-01-07T15:03:19.000000Z 字数 30959 阅读 1114

RabbitMQ系列(4)--Spring 整合RabbitMQ


简介

Spring整合RabbitMQ需要添加以下Maven依赖:

  1. <dependency>
  2. <groupId>org.springframework.amqp</groupId>
  3. <artifactId>spring-rabbit</artifactId>
  4. <version>2.0.2.RELEASE</version>
  5. </dependency>

使用Spring-rabbitmq时主要有三个API比较重要:

  1. MessageListenerContainer: 用来监听容器,为消息入队提供异步处理
  2. RabbitAdmin: 提供了一些列方法,比如declareExchange()、declareQueue()、declareBinding()用于声明交换机、队列、绑定等
  3. RabbitTemplate: 用于RabbitMQ消息的发送和接收

下面分别讲解Spring AMQP手动声明Spring AMQP自动声明Spring AMQP自定义消息处理器Spring AMQP消息转换器

Spring AMQP手动声明

手动声明的意思是要用户手动声明交换器、队列和绑定关系,主要步骤如下:

创建生产者配置类

在生产者配置类中将RabbitAdmin、RabbitTemplate纳入Spring管理,通过Spring自动扫描Bean的方式可以得到RabbitAdmin和RabbitTemplate对象。

  1. @Configuration
  2. public class ProducerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. @Bean
  9. public CachingConnectionFactory connectionFactory(){
  10. ConnectionFactory factory = new ConnectionFactory();
  11. /**
  12. * 配置连接信息
  13. */
  14. factory.setHost(IPADDRESS);
  15. factory.setPort(PORT);
  16. factory.setVirtualHost(VIRTUALHOST);
  17. factory.setUsername(USERNAME);
  18. factory.setPassword(PASSWORD);
  19. /**
  20. * 网络异常自动回复,每隔10秒重连一次
  21. */
  22. factory.setAutomaticRecoveryEnabled(true);
  23. factory.setNetworkRecoveryInterval(1000);
  24. /**
  25. * factory的属性
  26. */
  27. Map<String, Object> propertiesMap = new HashMap<>();
  28. propertiesMap.put("prindipal", "adamhand");
  29. propertiesMap.put("description", "Order System");
  30. propertiesMap.put("email", "adaihand@163.com");
  31. factory.setClientProperties(propertiesMap);
  32. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  33. return cachingConnectionFactory;
  34. }
  35. /**
  36. * 用于声明交换机 队列 绑定等
  37. * @param factory
  38. * @return
  39. */
  40. @Bean
  41. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  42. return new RabbitAdmin(factory);
  43. }
  44. /**
  45. * 用于RabbitMQ消息的发送和接收
  46. * @param factory
  47. * @return
  48. */
  49. @Bean
  50. public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory){
  51. return new RabbitTemplate(factory);
  52. }
  53. }

生产者启动类

在生产者启动类中声明交换器、队列和绑定关系(注意@ComponentScan扫描的包不能错)。

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.ManualDeclare.Producer")
  2. public class ProducerBootstrap {
  3. private static final String EXCHANGE_NAME = "adamhand.order";
  4. private static final String ROUTING_KEUY = "add";
  5. private static final String CONTENT_TYPE = "UTF-8";
  6. public static void main(String[] args) {
  7. AnnotationConfigApplicationContext context =
  8. new AnnotationConfigApplicationContext(ProducerBootstrap.class);
  9. RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
  10. RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
  11. /**
  12. * 声明交换机
  13. */
  14. rabbitAdmin.declareExchange(
  15. new DirectExchange(EXCHANGE_NAME, true, false,
  16. new HashMap<String, Object>()));
  17. /**
  18. * 声明要发送的消息
  19. */
  20. MessageProperties msgProperties = new MessageProperties();
  21. msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  22. msgProperties.setContentType(CONTENT_TYPE);
  23. Message msg = new Message("order information".getBytes(), msgProperties);
  24. /**
  25. * 发布消息
  26. */
  27. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEUY, msg);
  28. }
  29. }

消费者配置类

创建消费者配置类,将RabbitAdmin纳入Spring管理,并在MessageListenerContainer类中定义了消息消费的逻辑。

  1. @Configuration
  2. public class ConsumerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. private static final String QUEUE_NAMES = "adamhand.order.add";
  9. private static final int CONSUMER_NUMBER = 5;
  10. private static final int MAX_CONSUMER_NUMBER = 10;
  11. @Bean
  12. public CachingConnectionFactory connectionFactory(){
  13. ConnectionFactory factory = new ConnectionFactory();
  14. factory.setHost(IPADDRESS);
  15. factory.setPort(PORT);
  16. factory.setVirtualHost(VIRTUALHOST);
  17. factory.setUsername(USERNAME);
  18. factory.setPassword(PASSWORD);
  19. factory.setAutomaticRecoveryEnabled(true);
  20. factory.setNetworkRecoveryInterval(1000);
  21. Map<String, Object> propertiesMap = new HashMap<>();
  22. propertiesMap.put("prindipal", "adamhand");
  23. propertiesMap.put("description", "Order System");
  24. propertiesMap.put("email", "adaihand@163.com");
  25. factory.setClientProperties(propertiesMap);
  26. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  27. return cachingConnectionFactory;
  28. }
  29. @Bean
  30. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  31. return new RabbitAdmin(factory);
  32. }
  33. /**
  34. * 监听容器 为消息入队提供异步处理
  35. * @param factory
  36. * @return
  37. */
  38. @Bean
  39. public MessageListenerContainer messageListenerContainer(CachingConnectionFactory factory){
  40. SimpleMessageListenerContainer msgLIstenerContainer = new SimpleMessageListenerContainer();
  41. msgLIstenerContainer.setConnectionFactory(factory);
  42. msgLIstenerContainer.setQueueNames(QUEUE_NAMES);
  43. /**
  44. * 设置消费者线程数和最大线程数
  45. */
  46. msgLIstenerContainer.setConcurrentConsumers(CONSUMER_NUMBER);
  47. msgLIstenerContainer.setMaxConcurrentConsumers(MAX_CONSUMER_NUMBER);
  48. /**
  49. * 设置消费者属性
  50. */
  51. Map<String, Object> argsMap = new HashMap<>();
  52. msgLIstenerContainer.setConsumerArguments(argsMap);
  53. /**
  54. * 设置消费者标签
  55. */
  56. msgLIstenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
  57. @Override
  58. public String createConsumerTag(String s) {
  59. return "Consumers of Order System";
  60. }
  61. });
  62. /**
  63. * 选择手动设置消费时机
  64. */
  65. msgLIstenerContainer.setAutoStartup(false);
  66. /**
  67. * 消息后置处理器。接收到消息之后打印出来。
  68. */
  69. msgLIstenerContainer.setMessageListener(new MessageListener() {
  70. @Override
  71. public void onMessage(Message message) {
  72. try {
  73. System.out.println(new String(message.getBody(), "UTF-8"));
  74. System.out.println(message.getMessageProperties());
  75. } catch (UnsupportedEncodingException e) {
  76. e.printStackTrace();
  77. }
  78. }
  79. });
  80. return msgLIstenerContainer;
  81. }
  82. }

创建消费者启动类

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.ManualDeclare.Consumer")
  2. public class ConsumerBootstrap {
  3. private static final String EXCHANGE_NAME = "adamhand.order";
  4. private static final String QUEUE_NAME = "adamhand.order.add";
  5. private static final String ROUTING_KEY = "add";
  6. public static void main(String[] args) {
  7. AnnotationConfigApplicationContext context =
  8. new AnnotationConfigApplicationContext(ConsumerBootstrap.class);
  9. RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
  10. MessageListenerContainer msgListenerContainer =
  11. context.getBean(MessageListenerContainer.class);
  12. /**
  13. * 声明一个direct类型的、持久化、非排他的交换器
  14. */
  15. rabbitAdmin.declareExchange(new DirectExchange(EXCHANGE_NAME,
  16. true, false,
  17. new HashMap<String, Object>()));
  18. /**
  19. * 声明一个持久化、非排他、非自动删除的队列
  20. */
  21. rabbitAdmin.declareQueue(new Queue(QUEUE_NAME,
  22. true, false, false,
  23. new HashMap<String, Object>()));
  24. /**
  25. * 将交换器和队列绑定
  26. */
  27. rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_NAME)).
  28. to(new DirectExchange(EXCHANGE_NAME)).
  29. with(ROUTING_KEY));
  30. /**
  31. * 开始监听
  32. */
  33. msgListenerContainer.start();
  34. }
  35. }

结果

打开rabbitmq-server(cmd 中输入rabbitmq-server.bat,如果安装了rabbitmq-service,可以通过rabbitmq-service start的方式打开)。接着打开消费者和she

Spring AMQP自动声明

Spring AMQP还提供了自动声明方式交换机、队列和绑定。可以直接把要自动声明的组件纳入Spring容器中管理即可。

自动声明发生在RabbitMQ第一次连接创建的时候,自动声明支持单个和批量自动声明。使用自动声明需要符合如下条件(下面的几个条件定义在RabbitAdmin的afterPropertiesSet方法中):

创建生产者配置类

将RabbitAdmin、RabbitTemplate纳入Spring管理。

  1. @Configuration
  2. public class ProducerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. @Bean
  9. public CachingConnectionFactory connectionFactory(){
  10. ConnectionFactory factory = new ConnectionFactory();
  11. /**
  12. * 配置连接信息
  13. */
  14. factory.setHost(IPADDRESS);
  15. factory.setPort(PORT);
  16. factory.setVirtualHost(VIRTUALHOST);
  17. factory.setUsername(USERNAME);
  18. factory.setPassword(PASSWORD);
  19. /**
  20. * 网络异常自动回复,每隔10秒重连一次
  21. */
  22. factory.setAutomaticRecoveryEnabled(true);
  23. factory.setNetworkRecoveryInterval(1000);
  24. /**
  25. * factory的属性
  26. */
  27. Map<String, Object> propertiesMap = new HashMap<>();
  28. propertiesMap.put("prindipal", "adamhand");
  29. propertiesMap.put("description", "Order System");
  30. propertiesMap.put("email", "adaihand@163.com");
  31. factory.setClientProperties(propertiesMap);
  32. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  33. return cachingConnectionFactory;
  34. }
  35. /**
  36. * 用于声明交换机 队列 绑定等
  37. * @param factory
  38. * @return
  39. */
  40. @Bean
  41. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  42. return new RabbitAdmin(factory);
  43. }
  44. /**
  45. * 用于RabbitMQ消息的发送和接收
  46. * @param factory
  47. * @return
  48. */
  49. @Bean
  50. public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory){
  51. return new RabbitTemplate(factory);
  52. }
  53. }

创建生产者启动类

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.AutoDeclare.Producer")
  2. public class ProducerBootstrap {
  3. private static final String EXCHANGE_NAME = "adamhand.order";
  4. private static final String ROUTING_KEUY = "add";
  5. private static final String CONTENT_TYPE = "UTF-8";
  6. public static void main(String[] args) {
  7. AnnotationConfigApplicationContext context =
  8. new AnnotationConfigApplicationContext(ProducerBootstrap.class);
  9. RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
  10. RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
  11. /**
  12. * 声明交换机
  13. */
  14. rabbitAdmin.declareExchange(
  15. new DirectExchange(EXCHANGE_NAME, true, false,
  16. new HashMap<String, Object>()));
  17. /**
  18. * 声明要发送的消息
  19. */
  20. MessageProperties msgProperties = new MessageProperties();
  21. msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  22. msgProperties.setContentType(CONTENT_TYPE);
  23. Message msg = new Message("order information".getBytes(), msgProperties);
  24. /**
  25. * 发布消息
  26. */
  27. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEUY, msg);
  28. }
  29. }

创建消费者配置类

将RabbitAdmin纳入Spring管理,并在MessageListenerContainer类中定义了消息消费的逻辑,并且在该配置类中声明交换机,队列,绑定。

  1. @Configuration
  2. public class ConsumerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. private static final String QUEUE_NAMES = "adamhand.order.add";
  9. private static final String EXCHANGE_NAMES = "adamhand.order";
  10. private static final String ROUTING_KEY = "add";
  11. private static final int CONSUMER_NUMBER = 5;
  12. private static final int MAX_CONSUMER_NUMBER = 10;
  13. @Bean
  14. public CachingConnectionFactory connectionFactory(){
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost(IPADDRESS);
  17. factory.setPort(PORT);
  18. factory.setVirtualHost(VIRTUALHOST);
  19. factory.setUsername(USERNAME);
  20. factory.setPassword(PASSWORD);
  21. factory.setAutomaticRecoveryEnabled(true);
  22. factory.setNetworkRecoveryInterval(1000);
  23. Map<String, Object> propertiesMap = new HashMap<>();
  24. propertiesMap.put("prindipal", "adamhand");
  25. propertiesMap.put("description", "Order System");
  26. propertiesMap.put("email", "adaihand@163.com");
  27. factory.setClientProperties(propertiesMap);
  28. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  29. return cachingConnectionFactory;
  30. }
  31. @Bean
  32. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  33. return new RabbitAdmin(factory);
  34. }
  35. /**
  36. * 自动声明队列
  37. * @return
  38. */
  39. @Bean
  40. public Queue queue(){
  41. return new Queue(QUEUE_NAMES, true , false, false,
  42. new HashMap<String, Object>());
  43. }
  44. @Bean
  45. public Exchange exchange(){
  46. return new DirectExchange(EXCHANGE_NAMES, true, false,
  47. new HashMap<String, Object>());
  48. }
  49. @Bean
  50. public Binding binding(){
  51. return new Binding(QUEUE_NAMES, Binding.DestinationType.QUEUE,
  52. EXCHANGE_NAMES, ROUTING_KEY, new HashMap<String, Object>());
  53. }
  54. /**
  55. * 监听容器 为消息入队提供异步处理
  56. * @param factory
  57. * @return
  58. */
  59. @Bean
  60. public MessageListenerContainer messageListenerContainer(CachingConnectionFactory factory){
  61. SimpleMessageListenerContainer msgLIstenerContainer = new SimpleMessageListenerContainer();
  62. msgLIstenerContainer.setConnectionFactory(factory);
  63. msgLIstenerContainer.setQueueNames(QUEUE_NAMES);
  64. /**
  65. * 设置消费者线程数和最大线程数
  66. */
  67. msgLIstenerContainer.setConcurrentConsumers(CONSUMER_NUMBER);
  68. msgLIstenerContainer.setMaxConcurrentConsumers(MAX_CONSUMER_NUMBER);
  69. /**
  70. * 设置消费者属性
  71. */
  72. Map<String, Object> argsMap = new HashMap<>();
  73. msgLIstenerContainer.setConsumerArguments(argsMap);
  74. /**
  75. * 设置消费者标签
  76. */
  77. msgLIstenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
  78. @Override
  79. public String createConsumerTag(String s) {
  80. return "Consumers of Order System";
  81. }
  82. });
  83. /**
  84. * 选择自动设置消费时机
  85. */
  86. msgLIstenerContainer.setAutoStartup(true);
  87. /**
  88. * 消息后置处理器。接收到消息之后打印出来。
  89. */
  90. msgLIstenerContainer.setMessageListener(new MessageListener() {
  91. @Override
  92. public void onMessage(Message message) {
  93. try {
  94. System.out.println(new String(message.getBody(), "UTF-8"));
  95. System.out.println(message.getMessageProperties());
  96. } catch (UnsupportedEncodingException e) {
  97. e.printStackTrace();
  98. }
  99. }
  100. });
  101. return msgLIstenerContainer;
  102. }
  103. }

创建消费者启动类

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.AutoDeclare.Consumer")
  2. public class ConsumerBootstrap {
  3. public static void main(String[] args) {
  4. new AnnotationConfigApplicationContext(ConsumerBootstrap.class);
  5. }
  6. }

结果

启动消费者和生产者,控制台打印结果如下:

  1. order information
  2. MessageProperties [headers={}, contentType=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=adamhand.order, receivedRoutingKey=add, deliveryTag=1, consumerTag=Consumers of Order System, consumerQueue=adamhand.order.add]

自定义消息处理器--MessageListenerAdapte

上面的两个例子都是使用MessageListenerContainer中传递了MessageListener的方式来处理消费者的消息处理逻辑的,但是这样有一个缺点:不好扩展。如果想再多加上一些消息处理的逻辑,势必要修改MessageListener的代码。

SpringAMQP提供了一种消息处理器适配器的功能,它可以把一个纯POJO类适配成一个可以处理消息的处理器,默认处理消息的方法为handleMessage,可以通过setDefaultListenerMethod方法进行修改。

创建生产者配置类

  1. @Configuration
  2. public class ProducerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. @Bean
  9. public CachingConnectionFactory connectionFactory(){
  10. ConnectionFactory factory = new ConnectionFactory();
  11. /**
  12. * 配置连接信息
  13. */
  14. factory.setHost(IPADDRESS);
  15. factory.setPort(PORT);
  16. factory.setVirtualHost(VIRTUALHOST);
  17. factory.setUsername(USERNAME);
  18. factory.setPassword(PASSWORD);
  19. /**
  20. * 网络异常自动回复,每隔10秒重连一次
  21. */
  22. factory.setAutomaticRecoveryEnabled(true);
  23. factory.setNetworkRecoveryInterval(1000);
  24. /**
  25. * factory的属性
  26. */
  27. Map<String, Object> propertiesMap = new HashMap<>();
  28. propertiesMap.put("prindipal", "adamhand");
  29. propertiesMap.put("description", "Order System");
  30. propertiesMap.put("email", "adaihand@163.com");
  31. factory.setClientProperties(propertiesMap);
  32. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  33. return cachingConnectionFactory;
  34. }
  35. /**
  36. * 用于声明交换机 队列 绑定等
  37. * @param factory
  38. * @return
  39. */
  40. @Bean
  41. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  42. return new RabbitAdmin(factory);
  43. }
  44. /**
  45. * 用于RabbitMQ消息的发送和接收
  46. * @param factory
  47. * @return
  48. */
  49. @Bean
  50. public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory){
  51. return new RabbitTemplate(factory);
  52. }
  53. }

创建生产者启动类

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.MsgListenerAdapte.Producer")
  2. public class ProducerBootstrap {
  3. private static final String EXCHANGE_NAME = "adamhand.order";
  4. private static final String ROUTING_KEUY = "add";
  5. private static final String CONTENT_TYPE = "UTF-8";
  6. public static void main(String[] args) {
  7. AnnotationConfigApplicationContext context =
  8. new AnnotationConfigApplicationContext(ProducerBootstrap.class);
  9. RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
  10. RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
  11. /**
  12. * 声明交换机
  13. */
  14. rabbitAdmin.declareExchange(
  15. new DirectExchange(EXCHANGE_NAME, true, false,
  16. new HashMap<String, Object>()));
  17. /**
  18. * 声明要发送的消息
  19. */
  20. MessageProperties msgProperties = new MessageProperties();
  21. msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  22. msgProperties.setContentType(CONTENT_TYPE);
  23. Message msg = new Message("order information".getBytes(), msgProperties);
  24. /**
  25. * 发布消息
  26. */
  27. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEUY, msg);
  28. }
  29. }

创建消费者消息处理器类

它可是是纯POJO类。

  1. public class MessageHandle {
  2. public void add(byte[] message){
  3. try {
  4. System.out.println(new String(message,"UTF-8"));
  5. } catch (UnsupportedEncodingException e) {
  6. e.printStackTrace();
  7. }
  8. }
  9. }

创建消费者配置类

配置自定义消息处理器(将adamhand.order.add队列使用自定义消息处理类的add方法进行处理)。

  1. @Configuration
  2. public class ConsumerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. private static final String QUEUE_NAMES = "adamhand.order.add";
  9. private static final String EXCHANGE_NAMES = "adamhand.order";
  10. private static final String ROUTING_KEY = "add";
  11. private static final int CONSUMER_NUMBER = 5;
  12. private static final int MAX_CONSUMER_NUMBER = 10;
  13. @Bean
  14. public CachingConnectionFactory connectionFactory(){
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost(IPADDRESS);
  17. factory.setPort(PORT);
  18. factory.setVirtualHost(VIRTUALHOST);
  19. factory.setUsername(USERNAME);
  20. factory.setPassword(PASSWORD);
  21. factory.setAutomaticRecoveryEnabled(true);
  22. factory.setNetworkRecoveryInterval(1000);
  23. Map<String, Object> propertiesMap = new HashMap<>();
  24. propertiesMap.put("prindipal", "adamhand");
  25. propertiesMap.put("description", "Order System");
  26. propertiesMap.put("email", "adaihand@163.com");
  27. factory.setClientProperties(propertiesMap);
  28. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  29. return cachingConnectionFactory;
  30. }
  31. @Bean
  32. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  33. return new RabbitAdmin(factory);
  34. }
  35. /**
  36. * 自动声明队列
  37. * @return
  38. */
  39. @Bean
  40. public Exchange exchange(){
  41. return new DirectExchange(EXCHANGE_NAMES, true, false,
  42. new HashMap<String, Object>());
  43. }
  44. @Bean
  45. public Binding binding(){
  46. return new Binding(QUEUE_NAMES, Binding.DestinationType.QUEUE,
  47. EXCHANGE_NAMES, ROUTING_KEY, new HashMap<String, Object>());
  48. }
  49. /**
  50. * 监听容器 为消息入队提供异步处理
  51. * @param factory
  52. * @return
  53. */
  54. @Bean
  55. public MessageListenerContainer messageListenerContainer(CachingConnectionFactory factory){
  56. SimpleMessageListenerContainer msgLIstenerContainer = new SimpleMessageListenerContainer();
  57. msgLIstenerContainer.setConnectionFactory(factory);
  58. msgLIstenerContainer.setQueueNames(QUEUE_NAMES);
  59. /**
  60. * 设置消费者线程数和最大线程数
  61. */
  62. msgLIstenerContainer.setConcurrentConsumers(CONSUMER_NUMBER);
  63. msgLIstenerContainer.setMaxConcurrentConsumers(MAX_CONSUMER_NUMBER);
  64. /**
  65. * 设置消费者属性
  66. */
  67. Map<String, Object> argsMap = new HashMap<>();
  68. msgLIstenerContainer.setConsumerArguments(argsMap);
  69. /**
  70. * 设置消费者标签
  71. */
  72. msgLIstenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
  73. @Override
  74. public String createConsumerTag(String s) {
  75. return "Consumers of Order System";
  76. }
  77. });
  78. /**
  79. * 选择自动设置消费时机
  80. */
  81. msgLIstenerContainer.setAutoStartup(true);
  82. /**
  83. * 创建消息适配器
  84. */
  85. MessageListenerAdapter msgListenerAdapter = new MessageListenerAdapter(new MessageHandle());
  86. msgListenerAdapter.setDefaultListenerMethod("handleMessage");
  87. Map<String, String> map = new HashMap<>();
  88. map.put(QUEUE_NAMES, ROUTING_KEY);
  89. msgListenerAdapter.setQueueOrTagToMethodName(map);
  90. msgLIstenerContainer.setMessageListener(msgListenerAdapter);
  91. return msgLIstenerContainer;
  92. }
  93. }

创建消费者启动类

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.MsgListenerAdapte.Consumer")
  2. public class ConsumerBootstrap {
  3. public static void main(String[] args) {
  4. new AnnotationConfigApplicationContext(ConsumerBootstrap.class);
  5. }
  6. }

结果

启动消费者和生产者,打印结果如下:

  1. order information

如上Demo说明我们可以将一个纯POJO类定义为消息处理器,并且不用去扩展MessageListener/ChannelAwareMessageListener接口,关于自定义处理器方法的参数默认情况下为byte[]类型,这是由Spring AMQP默认消息转换器(SimpleMessageConverter)决定的,接下来将介绍Spring AMQP的消息转换器功能。

Spring AMQP消息转换器

在上面自定义的MessageHandle函数中,add方法的参数为byte[],但是有时候我们往RabbitMQ中发送的是一个JSON对象,我们希望在处理消息的时候它已经自动帮我们转为JAVA对象;又或者我们往RabbitMQ中发送的是一张图片或其他格式的文件,我们希望在处理消息的时候它已经自动帮我们转成文件格式,我们可以手动设置MessageConverter来实现如上需求,如果未设置MessageConverter则使用Spring AMQP默认提供的SimpleMessageConverter

以下例子使用MessageConverter实现了当生产者往RabbitMQ发送不同类型的数据的时候,使用MessageHandle不同的方法进行处理,需要注意的是当生产者在发送JSON数据的时候,需要指定这个JSON是哪个对象,用于Spring AMQP转换,规则如下:

  1. 当发送普通对象的JSON数据时,需要在消息的header中增加一个__TypeId__的属性告知消费者是哪个对象
  2. 当发送List集合对象的JSON数据时,需要在消息的header中将__TypeId__指定为java.util.List,并且需要额外指定属性__ContentTypeId__用户告知消费者List集合中的对象类型
  3. 当发送Map集合对象的JSON数据时,需要在消息的header中将__TypeId__指定为java.util.Map,并且需要额外指定属性__KeyTypeId__用于告知客户端Mapkey的类型,__ContentTypeId__用于告知客户端MapValue的类型

创建订单实体类

使用lombok管理getter和setter方法。

  1. @Data
  2. public class Order {
  3. private String id;
  4. private float price;
  5. public Order() {
  6. }
  7. public Order(String id, float price) {
  8. this.id = id;
  9. this.price = price;
  10. }
  11. }

自定义转换器

自定义文件转换器。

  1. public class FileMessageConverter implements MessageConverter {
  2. @Override
  3. public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
  4. return null;
  5. }
  6. @Override
  7. public Object fromMessage(Message message) throws MessageConversionException {
  8. String exName = (String) message.getMessageProperties().getHeaders().get("_extName");
  9. byte[] data = message.getBody();
  10. String fileName = UUID.randomUUID().toString();
  11. String filePath = System.getProperty("java.io.temdir")+ fileName +"." + exName;
  12. File temFile = new File(filePath);
  13. try {
  14. FileCopyUtils.copy(data, temFile);
  15. } catch (IOException e) {
  16. throw new MessageConversionException("file message convert fails", e);
  17. }
  18. return temFile;
  19. }
  20. }

自定义字符串转换器。

  1. public class StringMessageConverter implements MessageConverter {
  2. @Override
  3. public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
  4. return null;
  5. }
  6. @Override
  7. public Object fromMessage(Message message) throws MessageConversionException {
  8. try {
  9. return new String(message.getBody(), "UTF-8");
  10. } catch (UnsupportedEncodingException e) {
  11. throw new MessageConversionException("string message convert fails", e);
  12. }
  13. }
  14. }

自定义消息处理类。

  1. public class MessageHandle {
  2. public void add(byte[] data){
  3. System.out.println("use byte[] to handle");
  4. System.out.println(data.toString());
  5. }
  6. public void add(String msg){
  7. System.out.println("use String to handle");
  8. System.out.println(msg);
  9. }
  10. public void add(File file){
  11. System.out.println("use File to handle");
  12. System.out.println(file.length() +" "+ file.getName()+ " " + file.getAbsolutePath());
  13. }
  14. public void add(Order order){
  15. System.out.println("use Order to handle");
  16. System.out.println(order.getId()+" "+ order.getPrice());
  17. }
  18. public void add(List<Order> list){
  19. System.out.println("use list to handle");
  20. System.out.println(list.size());
  21. for (Order o : list){
  22. System.out.println(o.getId()+" "+o.getPrice());
  23. }
  24. }
  25. public void add(Map<String, Order> map){
  26. System.out.println("use Map to handle");
  27. for(Map.Entry<String, Order> entry : map.entrySet()){
  28. System.out.println(entry.getKey() +" "+ entry.getValue());
  29. }
  30. }
  31. }

创建生产者配置类

  1. @Configuration
  2. public class ProducerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. @Bean
  9. public CachingConnectionFactory connectionFactory(){
  10. ConnectionFactory factory = new ConnectionFactory();
  11. /**
  12. * 配置连接信息
  13. */
  14. factory.setHost(IPADDRESS);
  15. factory.setPort(PORT);
  16. factory.setVirtualHost(VIRTUALHOST);
  17. factory.setUsername(USERNAME);
  18. factory.setPassword(PASSWORD);
  19. /**
  20. * 网络异常自动回复,每隔10秒重连一次
  21. */
  22. factory.setAutomaticRecoveryEnabled(true);
  23. factory.setNetworkRecoveryInterval(1000);
  24. /**
  25. * factory的属性
  26. */
  27. Map<String, Object> propertiesMap = new HashMap<>();
  28. propertiesMap.put("prindipal", "adamhand");
  29. propertiesMap.put("description", "Order System");
  30. propertiesMap.put("email", "adaihand@163.com");
  31. factory.setClientProperties(propertiesMap);
  32. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  33. return cachingConnectionFactory;
  34. }
  35. /**
  36. * 用于声明交换机 队列 绑定等
  37. * @param factory
  38. * @return
  39. */
  40. @Bean
  41. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  42. return new RabbitAdmin(factory);
  43. }
  44. /**
  45. * 用于RabbitMQ消息的发送和接收
  46. * @param factory
  47. * @return
  48. */
  49. @Bean
  50. public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory){
  51. return new RabbitTemplate(factory);
  52. }
  53. }

创建生产者启动方法

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.MessageConverter.Producer")
  2. public class ProducerBootstrap {
  3. private static final String EXCHANGE_NAME = "adamhand.order";
  4. private static final String ROUTING_KEY = "add";
  5. private static final String CONTENT_TYPE_TEXT = "text/plain";
  6. private static final String CONTENT_TYPE_JSON = "application/json";
  7. private static final String CONTENT_TYPE_IMAGE = "image/jpg";
  8. public static void main(String[] args) throws Exception {
  9. AnnotationConfigApplicationContext context =
  10. new AnnotationConfigApplicationContext(ProducerBootstrap.class);
  11. RabbitAdmin rabbitAdmin = context.getBean(RabbitAdmin.class);
  12. RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
  13. /**
  14. * 声明交换机
  15. */
  16. rabbitAdmin.declareExchange(
  17. new DirectExchange(EXCHANGE_NAME, true, false,
  18. new HashMap<String, Object>()));
  19. /**
  20. * 发送String
  21. */
  22. sendString(rabbitTemplate);
  23. /**
  24. * 发送单个对象JSON
  25. */
  26. sendSingle(rabbitTemplate);
  27. /**
  28. * 发送List集合JSON
  29. */
  30. sendList(rabbitTemplate);
  31. /**
  32. * 发送Map集合JSON
  33. */
  34. sendMap(rabbitTemplate);
  35. /**
  36. * 发送图片
  37. */
  38. sendImage(rabbitTemplate);
  39. }
  40. public static void sendString(RabbitTemplate rabbitTemplate){
  41. MessageProperties msgProperties = new MessageProperties();
  42. msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  43. msgProperties.setContentType(CONTENT_TYPE_TEXT);
  44. Message msg = new Message("order information".getBytes(), msgProperties);
  45. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEY, msg);
  46. }
  47. public static void sendSingle(RabbitTemplate rabbitTemplate) throws JsonProcessingException {
  48. Order order = new Order("OD00000000001", (float) 5555.5555);
  49. ObjectMapper mapper = new ObjectMapper();
  50. MessageProperties msgProperties = new MessageProperties();
  51. msgProperties.getHeaders().put("_TypeId_", "order");
  52. msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  53. msgProperties.setContentType(CONTENT_TYPE_JSON);
  54. Message msg = new Message(mapper.writeValueAsString(order).getBytes(), msgProperties);
  55. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEY, msg);
  56. }
  57. public static void sendList(RabbitTemplate rabbitTemplate) throws JsonProcessingException {
  58. Order order1 = new Order("OD00000000001", (float) 5555.5555);
  59. Order order2 = new Order("OD00000000002", (float) 3333.3333);
  60. List<Order> orderList = Arrays.asList(order1, order2);
  61. ObjectMapper mapper = new ObjectMapper();
  62. MessageProperties msgProperties = new MessageProperties();
  63. msgProperties.getHeaders().put("__TypeId__", "java.util.List");
  64. msgProperties.getHeaders().put("__ContentTypeId__", "order");
  65. msgProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  66. msgProperties.setContentType(CONTENT_TYPE_JSON);
  67. Message msg = new Message(mapper.writeValueAsString(orderList).getBytes(), msgProperties);
  68. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEY, msg);
  69. }
  70. public static void sendMap(RabbitTemplate rabbitTemplate) throws Exception {
  71. Order order1 = new Order("OD0000001", (float)1111.1111);
  72. Order order2 = new Order("OD0000002", (float)2222.2222);
  73. Map<String, Order> orderMap = new HashMap<>();
  74. orderMap.put(order1.getId(), order1);
  75. orderMap.put(order2.getId(), order2);
  76. ObjectMapper objectMapper = new ObjectMapper();
  77. // 声明消息 (消息体, 消息属性)
  78. MessageProperties messageProperties = new MessageProperties();
  79. messageProperties.getHeaders().put("__TypeId__", "java.util.Map");
  80. messageProperties.getHeaders().put("__KeyTypeId__", "java.lang.String");
  81. messageProperties.getHeaders().put("__ContentTypeId__", "order");
  82. messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  83. messageProperties.setContentType(CONTENT_TYPE_JSON);
  84. Message message = new Message(objectMapper.writeValueAsString(orderMap).getBytes(), messageProperties);
  85. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEY, message);
  86. }
  87. public static void sendImage(RabbitTemplate rabbitTemplate) throws Exception {
  88. File file = new File(System.getProperty("user.dir")+"\\naruto.jpg"); //程序运行之前该图片要存在
  89. FileInputStream fileInputStream = new FileInputStream(file);
  90. ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(1024);
  91. int length;
  92. byte[] b = new byte[1024];
  93. while ((length = fileInputStream.read(b)) != -1) {
  94. byteArrayOutputStream.write(b, 0, length);
  95. }
  96. fileInputStream.close();
  97. byteArrayOutputStream.close();
  98. byte[] buffer = byteArrayOutputStream.toByteArray();
  99. // 声明消息 (消息体, 消息属性)
  100. MessageProperties messageProperties = new MessageProperties();
  101. messageProperties.getHeaders().put("_extName", "jpg");
  102. messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  103. messageProperties.setContentType(CONTENT_TYPE_IMAGE);
  104. Message message = new Message(buffer, messageProperties);
  105. rabbitTemplate.send(EXCHANGE_NAME, ROUTING_KEY, message);
  106. }
  107. }

创建消费者配置类

  1. @Configuration
  2. public class ConsumerConfig {
  3. private static final String IPADDRESS = "192.168.1.107";
  4. private static final int PORT = 5672;
  5. private static final String VIRTUALHOST = "/";
  6. private static final String USERNAME = "root";
  7. private static final String PASSWORD = "root";
  8. private static final String QUEUE_NAMES = "adamhand.order.add";
  9. private static final String EXCHANGE_NAMES = "adamhand.order";
  10. private static final String ROUTING_KEY = "add";
  11. private static final int CONSUMER_NUMBER = 5;
  12. private static final int MAX_CONSUMER_NUMBER = 10;
  13. @Bean
  14. public CachingConnectionFactory connectionFactory(){
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost(IPADDRESS);
  17. factory.setPort(PORT);
  18. factory.setVirtualHost(VIRTUALHOST);
  19. factory.setUsername(USERNAME);
  20. factory.setPassword(PASSWORD);
  21. factory.setAutomaticRecoveryEnabled(true);
  22. factory.setNetworkRecoveryInterval(1000);
  23. Map<String, Object> propertiesMap = new HashMap<>();
  24. propertiesMap.put("prindipal", "adamhand");
  25. propertiesMap.put("description", "Order System");
  26. propertiesMap.put("email", "adaihand@163.com");
  27. factory.setClientProperties(propertiesMap);
  28. CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(factory);
  29. return cachingConnectionFactory;
  30. }
  31. @Bean
  32. public RabbitAdmin rabbitAdmin(CachingConnectionFactory factory){
  33. return new RabbitAdmin(factory);
  34. }
  35. /**
  36. * 自动声明队列
  37. * @return
  38. */
  39. @Bean
  40. public Exchange exchange(){
  41. return new DirectExchange(EXCHANGE_NAMES, true, false,
  42. new HashMap<String, Object>());
  43. }
  44. @Bean
  45. public Binding binding(){
  46. return new Binding(QUEUE_NAMES, Binding.DestinationType.QUEUE,
  47. EXCHANGE_NAMES, ROUTING_KEY, new HashMap<String, Object>());
  48. }
  49. /**
  50. * 监听容器 为消息入队提供异步处理
  51. * @param factory
  52. * @return
  53. */
  54. @Bean
  55. public MessageListenerContainer messageListenerContainer(CachingConnectionFactory factory){
  56. SimpleMessageListenerContainer msgLIstenerContainer = new SimpleMessageListenerContainer();
  57. msgLIstenerContainer.setConnectionFactory(factory);
  58. msgLIstenerContainer.setQueueNames(QUEUE_NAMES);
  59. /**
  60. * 设置消费者线程数和最大线程数
  61. */
  62. msgLIstenerContainer.setConcurrentConsumers(CONSUMER_NUMBER);
  63. msgLIstenerContainer.setMaxConcurrentConsumers(MAX_CONSUMER_NUMBER);
  64. /**
  65. * 设置消费者属性
  66. */
  67. Map<String, Object> argsMap = new HashMap<>();
  68. msgLIstenerContainer.setConsumerArguments(argsMap);
  69. /**
  70. * 设置消费者标签
  71. */
  72. msgLIstenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
  73. @Override
  74. public String createConsumerTag(String s) {
  75. return "Consumers of Order System";
  76. }
  77. });
  78. /**
  79. * 选择自动设置消费时机
  80. */
  81. msgLIstenerContainer.setAutoStartup(true);
  82. //注意这个MessageHandle()一定要写对
  83. MessageListenerAdapter msgListenerAdapter = new MessageListenerAdapter(new MessageHandle());
  84. msgListenerAdapter.setDefaultListenerMethod("handleMessage");
  85. Map<String, String> map = new HashMap<>();
  86. map.put(QUEUE_NAMES, ROUTING_KEY);
  87. msgListenerAdapter.setQueueOrTagToMethodName(map);
  88. msgLIstenerContainer.setMessageListener(msgListenerAdapter);
  89. /**
  90. * 设置消息转换器
  91. */
  92. ContentTypeDelegatingMessageConverter converter =
  93. new ContentTypeDelegatingMessageConverter();
  94. StringMessageConverter stringMessageConverter = new StringMessageConverter();
  95. FileMessageConverter fileMessageConverter = new FileMessageConverter();
  96. Jackson2JsonMessageConverter jackson2JsonMessageConverter = new
  97. Jackson2JsonMessageConverter();
  98. Map<String, Class<?>> idClassMapping = new HashMap<>();
  99. idClassMapping.put("order", Order.class);
  100. DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
  101. javaTypeMapper.setIdClassMapping(idClassMapping);
  102. jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
  103. /**
  104. * 设置text/html text/plain 使用StringMessageConverter
  105. */
  106. converter.addDelegate("text/html", stringMessageConverter);
  107. converter.addDelegate("text/plain", stringMessageConverter);
  108. /**
  109. * 设置application/json 使用Jackson2JsonMessageConverter
  110. */
  111. converter.addDelegate("application/json", jackson2JsonMessageConverter);
  112. /**
  113. * 设置image/jpg image/png 使用FileMessageConverter
  114. */
  115. converter.addDelegate("image/jpg", fileMessageConverter);
  116. converter.addDelegate("image/png", fileMessageConverter);
  117. msgListenerAdapter.setMessageConverter(converter);
  118. return msgLIstenerContainer;
  119. }
  120. }

创建消费者启动类

  1. @ComponentScan(basePackages = "com.rabbitmq.SpringAMQP.MessageConverter.Consumer")
  2. public class ConsumerBootstrap {
  3. public static void main(String[] args) {
  4. new AnnotationConfigApplicationContext(ConsumerBootstrap.class);
  5. }
  6. }

结果

启动消费者启动类和生产者启动类,结果如下:

  1. use File to handle
  2. 45550 nulldc5f8104-e9cd-48f7-a8a1-2082556565ab.jpg D:\Prom\rabbitmq-master\nulldc5f8104-e9cd-48f7-a8a1-2082556565ab.jpg
  3. use Map to handle
  4. id OD00000000001
  5. price 5555.5557
  6. use list to handle
  7. 2
  8. use Map to handle
  9. OD00000000001 5555.5557
  10. OD00000000002 3333.3333
  11. OD0000001 Order(id=OD0000001, price=1111.1111)
  12. OD0000002 Order(id=OD0000002, price=2222.2222)

参考

(三) RabbitMQ实战教程(面向Java开发人员)之Spring整合RabbitMQ
《分布式消息中间件实践》

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注