Messaging with RabbitMQ - Spring Boot
通过 Spring Initializr 创建项目,并添加如下依赖:
- Spring for RabbitMQ
- Docker Compose Support
需要注意以下几点:
- 默认的 compose.yaml 的
ports
为5672
,需要手动修改为'5672:5672'
- rabbitmq:latest 镜像如果拉不下来可以使用 registry.cn-hangzhou.aliyuncs.com/pusher/rabbitmq:3.9
示例代码如下:
yaml
services:
rabbitmq:
image: 'registry.cn-hangzhou.aliyuncs.com/pusher/rabbitmq:3.9'
environment:
- 'RABBITMQ_DEFAULT_PASS=secret'
- 'RABBITMQ_DEFAULT_USER=myuser'
- 'TZ=Asia/Shanghai'
ports:
- '5672:5672'
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>me.liujiajia</groupId>
<artifactId>rabbitmq-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-example</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-docker-compose</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
java
package me.liujiajia.rabbitmq_example;
import java.util.concurrent.CountDownLatch;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
private final CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
java
package me.liujiajia.rabbitmq_example;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws Exception {
System.out.println("Sending message...");
rabbitTemplate.convertAndSend(RabbitmqExampleApplication.topicExchangeName, "foo.bar.baz", "Hello from RabbitMQ!");
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
java
package me.liujiajia.rabbitmq_example;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitmqExampleApplication {
static final String topicExchangeName = "spring-boot-exchange";
static final String queueName = "spring-boot";
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
public static void main(String[] args) {
SpringApplication.run(RabbitmqExampleApplication.class, args).close();
}
}
properties
spring.application.name=rabbitmq-example
spring.rabbitmq.password=secret
spring.rabbitmq.username=myuser
txt
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.4.4)
2025-04-01T13:57:29.575+08:00 INFO 31592 --- [rabbitmq-example] [ main] m.l.r.RabbitmqExampleApplication : Starting RabbitmqExampleApplication using Java 22.0.2 with PID 31592 (D:\projects\gitee\ryukaka\example\rabbitmq-example\target\classes started by 佳佳 in D:\projects\gitee\ryukaka\example\rabbitmq-example)
2025-04-01T13:57:29.580+08:00 INFO 31592 --- [rabbitmq-example] [ main] m.l.r.RabbitmqExampleApplication : No active profile set, falling back to 1 default profile: "default"
2025-04-01T13:57:29.706+08:00 INFO 31592 --- [rabbitmq-example] [ main] .s.b.d.c.l.DockerComposeLifecycleManager : Using Docker Compose file D:\projects\gitee\ryukaka\example\rabbitmq-example\compose.yaml
2025-04-01T13:57:33.456+08:00 INFO 31592 --- [rabbitmq-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container rabbitmq-example-rabbitmq-1 Recreate
2025-04-01T13:57:33.615+08:00 INFO 31592 --- [rabbitmq-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container rabbitmq-example-rabbitmq-1 Recreated
2025-04-01T13:57:33.620+08:00 INFO 31592 --- [rabbitmq-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container rabbitmq-example-rabbitmq-1 Starting
2025-04-01T13:57:34.072+08:00 INFO 31592 --- [rabbitmq-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container rabbitmq-example-rabbitmq-1 Started
2025-04-01T13:57:34.072+08:00 INFO 31592 --- [rabbitmq-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container rabbitmq-example-rabbitmq-1 Waiting
2025-04-01T13:57:34.594+08:00 INFO 31592 --- [rabbitmq-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container rabbitmq-example-rabbitmq-1 Healthy
2025-04-01T13:57:48.997+08:00 INFO 31592 --- [rabbitmq-example] [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2025-04-01T13:57:49.102+08:00 INFO 31592 --- [rabbitmq-example] [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#404eca05:0/SimpleConnection@68e02f33 [delegate=amqp://myuser@127.0.0.1:5672/, localPort=50853]
2025-04-01T13:57:49.121+08:00 INFO 31592 --- [rabbitmq-example] [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (spring-boot) durable:false, auto-delete:false, exclusive:false. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost.
2025-04-01T13:57:49.276+08:00 INFO 31592 --- [rabbitmq-example] [ main] m.l.r.RabbitmqExampleApplication : Started RabbitmqExampleApplication in 20.658 seconds (process running for 21.607)
Sending message...
Received <Hello from RabbitMQ!>
2025-04-01T13:57:49.344+08:00 INFO 31592 --- [rabbitmq-example] [ container-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2025-04-01T13:57:49.347+08:00 INFO 31592 --- [rabbitmq-example] [ container-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
已与地址为 ''127.0.0.1:50803',传输: '套接字'' 的目标虚拟机断开连接
进程已结束,退出代码为 0