RabbitMQ- 从基础到实战(6)— 与 Spring 集成

本贴最后更新于 2329 天前,其中的信息可能已经沧海桑田

1.简介

原计划这章应该讲 RabbitMQ 的 RPC 调用的,后来想想,这个场景应该用的不多,现在比较火的微服务,要么用 dubbo,要么用 spring cloud,用 RabbitMQ 做 RPC 比较少见,所以就先跳过了,有需要再补充。

其实网上 RabbitMQ 和 Spring 集成的教程有不少,我也大致看了看,大部分都是言简意赅,代码配置一贴,然后就可以用了,而我希望我的教程能多和大家一起探讨一些“为什么”。

2.Spring AMQP

Spring AMQP 中有两个单词,Spring 都知道,那 AMQP 是什么?

中文意思是,高级消息队列协议,然后用蹩脚的英语猜一下,advance message queue protocol,差不多了,advance 变成形容词高级的-advanced,queue 变成 queuing(排队论,学术一点),所以,AMQP 就是 Advanced Message Queuing Protocal。

AMQP 0-9-1 是 RabbitMQ 支持的协议之一,0-9-1 是个版本号,正常情况下推荐使用,它所表现出的形式,就是前面几张介绍的内容

RabbitMQ 还支持其他版本的协议,具体可以参考这里

Spring AMQP 的定义如下

The Spring AMQP project applies core Spring concepts to the development of AMQP-based messaging solutions. It provides a "template" as a high-level abstraction for sending and receiving messages. It also provides support for Message-driven POJOs with a "listener container". These libraries facilitate management of AMQP resources while promoting the use of dependency injection and declarative configuration

意外的发现谷歌翻译的很通顺

Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。 它提供了一个“模板”作为发送和接收消息的高级抽象。 它还通过“侦听器容器”为消息驱动的 POJO 提供支持。 这些库促进 AMQP 资源的管理,同时促进使用依赖注入和声明性配置

这里有很多资料

3.main 方法集成

先不用 Spring,只在 Main 方法中集成 Spring AMQP,方便分析。

在 pom 文件中加入如下配置,不需要再引入 RabbitMQ 的包,Spring AMQP 已经包含了 4.0.x 的客户端

Spring AMQP now uses the new 4.0.x version of the amqp-client library provided by the RabbitMQ team. This client has auto recovery configured by default

org.springframework.amqpgroupId> spring-rabbitartifactId> 1.7.1.RELEASEversion> dependency> ch.qos.logbackgroupId> logback-classicartifactId> 1.2.1version> dependency>

新建一个类,

1 package com.liyang.ticktock.rabbitmq; 2
3 import org.springframework.amqp.core.BindingBuilder; 4 import org.springframework.amqp.core.Queue; 5 import org.springframework.amqp.core.TopicExchange; 6 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 7 import org.springframework.amqp.rabbit.connection.ConnectionFactory; 8 import org.springframework.amqp.rabbit.core.RabbitAdmin; 9 import org.springframework.amqp.rabbit.core.RabbitTemplate; 10 import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 11 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; 12
13
14 public class App 15 { 16 public static void main( String[] args ) throws InterruptedException 17 { 18 //获取一个连接工厂,用户默认是 guest/guest(只能使用部署在本机的 RabbitMQ) 19 //是 Spring 实现的对 com.rabbitmq.client.Connection 的包装
20 ConnectionFactory cf = new CachingConnectionFactory("localhost"); 21
22 //对 AMQP 0-9-1 的实现
23 RabbitAdmin admin = new RabbitAdmin(cf); 24 //声明一个队列
25 Queue queue = new Queue("myQueue"); 26 admin.declareQueue(queue); 27 //声明一个 exchange
28 TopicExchange exchange = new TopicExchange("myExchange"); 29 admin.declareExchange(exchange); 30 //绑定队列到 exchange,加上 routingKey foo.*
31 admin.declareBinding(BindingBuilder.bind(queue).to(exchange).with("foo.*")); 32
33 //监听容器
34 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf); 35 //监听者对象
36 Object listener = new Object() { 37 @SuppressWarnings("unused") 38 public void handleMessage(String foo) { 39 System.out.println(foo); 40 } 41 }; 42 //通过这个适配器代理 listener
43 MessageListenerAdapter adapter = new MessageListenerAdapter(listener); 44 //把适配器(listener)设置给 Container
45 container.setMessageListener(adapter); 46 //设置该容器监听的队列名,可以传多个,public void setQueueNames(String... queueName) {
47 container.setQueueNames("myQueue"); 48 //开始监听
49 container.start(); 50
51 //发送模版,设置上连接工厂
52 RabbitTemplate template = new RabbitTemplate(cf); 53 //发送消息
54 template.convertAndSend("myExchange", "foo.bar", "Hello, world!"); 55
56 Thread.sleep(1000); 57 container.stop(); 58 } 59 }

运行结果如下:

以上代码中

ConnectionFactory 包装了所有物理连接信息,然后传递给 RabbitAdmin 创建了 RabbitMQ 支持协议的连接(AMQP 0-9-1);

声明队列和交换中心后,通过 BindingBuilder 把队列的绑定关系声明到 admin 上;

创建一个消息处理类,用一个适配器(MessageListenerAdapter)包装它,并注册到监听容器中,启动监听;

最后通过连接信息创建一个 Rabbit 模板,调用发送方法。

  这里引申出一个问题,为什么要用监听容器(SimpleMessageListenerContainer),我们点开它的 outline,如下,可以看到,它是对监听这个动作的抽象,一个容器可以有多个 Consumer,并且可以控制如超时时间等配置。

  

可以看出,比起直接使用 RabbitMQ 客户端,以上代码已经简化了一部分,最明显的部分就是,不需要手动去关 Channel、Connection 了,对以上概念的介绍都穿插在前几章,这里不再赘述。

这些代码用 Spring 的方式注入将会更加简洁

4.传统 Spring 方式集成

首先,POM 文件中要把 Spring 引进来

org.springframeworkgroupId> spring-contextartifactId> 4.3.7.RELEASEversion> dependency>

然后增加 applicatioContext.xml

xml version="1.0" encoding="UTF-8"?>

<context:annotation-config/>
<context:component-scan base-package="com.liyang.ticktock.rabbitmq.listener"/>


<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" username="guest" password="guest" port="5672" />


<rabbit:admin connection-factory="connectionFactory" />


<rabbit:queue name="myQueue" />


<rabbit:topic-exchange name="myExchange">
    <rabbit:bindings>
        <rabbit:binding queue="myQueue" pattern="foo.*" />
        ****
    rabbit:bindings>
rabbit:topic-exchange>


<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="myExchange"/>


<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="myListener" method="listen" queue-names="myQueue" />
    ****
rabbit:listener-container>

beans>

这个配置文件中,我们使用了 context 的自动扫描装配,不用配 bean 了,还使用了 rabbit 的命名空间 xmlns:rabbit

这个东西可能有些童鞋不知道怎么配的,其实没什么神秘的,我们打开根路径看看里面有什么“http://www.springframework.org/schema/

和本地目录一样,就是个存东西的地方,rabbit 命名空间,在这个路径后面加上 rabbit 对应的目录就行了

再进入 rabbit 目录,发现里面是一些不同版本的 xsd 文件,所以,在 schemaLocation 中,配上使用的具体版本的 xsd 就可以了

言归正传,上面的配置文件,把大量原来在代码中实现的东西挪到了 xml 中,一目了然,方便修改。注意注释加粗的三处,现在我们可以用 Spring 实现我们原来用原生 RabbitMQ 客户端实现的所有功能了

在发送消息之前,还需要写一个类用来处理消息,并且这个类要有 linsten 方法,和配置文件中声明的 method="listen" 对应

1 package com.liyang.ticktock.rabbitmq.listener; 2
3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.stereotype.Component; 6
7 @Component
8 public class MyListener { 9 Logger logger = LoggerFactory.getLogger(MyListener.class); 10
11 public void listen(String message){ 12 logger.debug("received:"+message); 13 } 14
15 }

剩下在 java 中的代码就非常简洁了,监听在 Spring 环境启动后就自动开始了,我们只需要发消息就行,代码如下

直接一个 main 方法,省的还要捉一只汤姆猫

1 public static void main(String[] args) throws InterruptedException { 2 //启动 Spring 环境
3 AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
4 //假装是 Autowired 的
5 RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
6 //设置 routingKey
7 template.setRoutingKey("foo.bar");
8 //发送,exchange,routingKey 什么的都配好了
9 template.convertAndSend("Hello, world!"); 10
11 //关掉环境
12 Thread.sleep(1000); 13 ctx.destroy(); 14 }

运行结果如下:

5.采用 Spring Boot 集成

不是很重要,先欠着,有空补上

6.结束语

最近实在太忙,这篇博客利用工作的碎片时间,写了有一个多星期,囧。

在写这篇系列教程的同时,我也获益良多,后面会继续为大家介绍一些 RabbitMQ 实用的高级特性,请多多支持

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 392 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...