RabbitMQ(一):Hello World
前言
先决条件
✔️ 正确安装 RabbitMQ 并将其运行在 localhost:5672 上
✔️ 已经了解了 RabbitMQ 中的一些基础概念
在本文中,我们将通过在 Spring Boot 应用中整合 RabbitMQ,并实现一个简单的发送、接收消息的例子来对 RabbitMQ 有一个直观的感受和理解。
Spring Boot 整合
环境:
- RabbitMQ:3.7.4
- Spring Boot:2.0.1.RELEASE
因为有 Starter POMs,在 Spring Boot 中整合 RabbitMQ 是一件非常容易的事,其中的 AMQP 模块就可以很好的支持 RabbitMQ。
我们可以使用 Spring Intializr 或 https://start.spring.io/ 创建一个 Spring Boot 工程,并勾选 RabbitMQ。
或者手动在 pom.xml 文件中加入
在 application.yml 中配置关于 RabbitMQ 的连接和用户信息,如果没有改 RabbitMQ 的默认配置的话,这里零配置即可启动。这里我们还定义了一些额外的配置备用。
生产者
Spring AMQP 让我们用少量的代码就能轻松实现消息的发送和接收。通过注入 AmqpTemplate
接口的实例来实现消息的发送,AmqpTemplate
接口定义了一套针对 AMQP 协议的基础操作。在 Spring Boot 中会根据配置来注入其具体实现 (AmqpTemplate
的默认实现就是 RabbitTemplate
)。
public class Tut1Sender { private AmqpTemplate template; private Queue queue; /** * 用定时任务来模拟生产者定时发送消息 */ 1000, initialDelay = 500) (fixedDelay = public void send() { String message = "Hello World!" + new Date(); template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } }
在该生产者中,我们会产生一个字符串,并发送到名为”hello-world” 的队列中。
消费者
创建消费者 Receiver
。通过 @RabbitListener
注解定义该类对”hello-world” 队列的监听,并用 @RabbitHandler
注解来指定对消息的处理方法。所以,该消费者实现了对”hello-world” 队列的消费,消费操作为输出消息的字符串内容。
"hello-world") (queues = public class Tut1Receiver { public void receive(String in) { System.out.println(" [x] Received '" + in + "'"); } }
配置类
创建一个新的 JavaConfig 文件
"tut1", "hello-world"}) ({ public class Tut1Config { public Queue queue() { return new Queue("hello-world"); } "receiver") ( public Tut1Receiver receiver() { return new Tut1Receiver(); } "sender") ( public Tut1Sender sender() { return new Tut1Sender(); } }
在上面的 JavaConfig 中,我们使用 @Configuration
让 Spring 知道这是一个 Java 配置,并定义了生产者、消费者和一个名为”hello-world” 的队列。并且,我们使用 Spring Profiles 来控制它运行哪个示例,以及它是生产者还是消费者,这样我们就可以简单的通过启动参数传递我们的配置文件来正确的启动应用了。
至于具体的生产者(Tut1Sender)和消费者(Tut1Receiver),我们这里仅先定义出来,稍后再具体实现。
应用主类
再小小的改造一下生成的 RabbitmqTutorialApplication.java
public class RabbitmqTutorialApplication { public static void main(String[] args) { new SpringApplicationBuilder() .sources(RabbitmqTutorialApplication.class) // 设置成非 web 环境 .web(WebApplicationType.NONE) .run(args); } "usage_message") ( public CommandLineRunner usage() { return arg0 -> { System.out.println("This app uses Spring Profiles to control its behavior.\n"); System.out.println("Sample usage: java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=hello-world,sender"); }; } "!usage_message") ( public CommandLineRunner tutorial() { return new RabbitTutorialRunner(); } }
这里我将环境设置为了 WebApplicationType.NONE
,即非 WEB 环境,因为默认的话 Netty 会监听 8080 端口,同时运行的话就会接口冲突导致启动失败(当然,也可以直接在启动时用参数绑定不同的端口以避免冲突)。
其中的 RabbitTutorialRunner 如下
public class RabbitTutorialRunner implements CommandLineRunner { "${tutorial.client.duration:0}") ( private int duration; private ConfigurableApplicationContext ctx; public void run(String... args) throws Exception { System.out.println("Ready ... running for " + duration + "ms"); Thread.sleep(duration); ctx.close(); } }
这个 Runner 主要是为了阻止主线程退出。除了用 Thread.sleep(millisecond)
,也可以用 CountDownLatch
来达到相同的目的。
运行
编译
mvn clean package -Dmaven.test.skip=true
运行
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut1,sender
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut1,receiver
输出
// Sender Ready ... running for 10000ms [x] Sent 'Hello World!Thu Apr 12 16:56:01 CST 2018' [x] Sent 'Hello World!Thu Apr 12 16:56:03 CST 2018' [x] Sent 'Hello World!Thu Apr 12 16:56:04 CST 2018' ... // Receiver Ready ... running for 10000ms [x] Received 'Hello World!Thu Apr 12 16:56:01 CST 2018' [x] Received 'Hello World!Thu Apr 12 16:56:03 CST 2018' [x] Received 'Hello World!Thu Apr 12 16:56:04 CST 2018' ...