Messaging with Redis - Spring Boot
通过 Spring Initializr 创建项目,并添加如下依赖:
- Spring Data Redis
- Docker Compose Support
需要注意以下几点:
- 默认的 compose.yaml 的
ports
为6379
,需要手动修改为'6379:6379'
- redis:latest 镜像如果拉不下来可以使用 registry.cn-hangzhou.aliyuncs.com/pusher/redis:7.4.2
示例代码
yaml
services:
redis:
image: 'registry.cn-hangzhou.aliyuncs.com/pusher/redis:7.4.2'
environment:
- 'TZ=Asia/Shanghai'
ports:
- '6379:6379'
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>me.liujiajia</groupId>
<artifactId>redis-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>redis-example</name>
<description>Demo project for Spring Boot</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-docker-compose</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
java
package me.liujiajia.redis_example;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Receiver {
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private final AtomicInteger counter = new AtomicInteger();
public void receiveMessage(String message) {
LOGGER.info("Received <" + message + ">");
counter.incrementAndGet();
}
public int getCount() {
return counter.get();
}
}
java
package me.liujiajia.redis_example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@SpringBootApplication
public class RedisExampleApplication {
private static final Logger LOGGER = LoggerFactory.getLogger(RedisExampleApplication.class);
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Bean
StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
return new StringRedisTemplate(connectionFactory);
}
public static void main(String[] args) throws InterruptedException {
ApplicationContext ctx = SpringApplication.run(RedisExampleApplication.class, args);
StringRedisTemplate template = ctx.getBean(StringRedisTemplate.class);
Receiver receiver = ctx.getBean(Receiver.class);
while (receiver.getCount() == 0) {
LOGGER.info("Sending message...");
template.convertAndSend("chat", "Hello from Redis!");
Thread.sleep(500L);
}
System.exit(0);
}
}
properties
spring.application.name=redis-example
txt
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.4.4)
2025-04-01T14:35:14.401+08:00 INFO 29672 --- [redis-example] [ main] m.l.r.RedisExampleApplication : Starting RedisExampleApplication using Java 22.0.2 with PID 29672 (D:\projects\gitee\ryukaka\example\redis-example\target\classes started by 佳佳 in D:\projects\gitee\ryukaka\example\redis-example)
2025-04-01T14:35:14.405+08:00 INFO 29672 --- [redis-example] [ main] m.l.r.RedisExampleApplication : No active profile set, falling back to 1 default profile: "default"
2025-04-01T14:35:14.527+08:00 INFO 29672 --- [redis-example] [ main] .s.b.d.c.l.DockerComposeLifecycleManager : Using Docker Compose file D:\projects\gitee\ryukaka\example\redis-example\compose.yaml
2025-04-01T14:35:19.922+08:00 INFO 29672 --- [redis-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container redis-example-redis-1 Created
2025-04-01T14:35:19.929+08:00 INFO 29672 --- [redis-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container redis-example-redis-1 Starting
2025-04-01T14:35:20.707+08:00 INFO 29672 --- [redis-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container redis-example-redis-1 Started
2025-04-01T14:35:20.707+08:00 INFO 29672 --- [redis-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container redis-example-redis-1 Waiting
2025-04-01T14:35:21.217+08:00 INFO 29672 --- [redis-example] [utReader-stderr] o.s.boot.docker.compose.core.DockerCli : Container redis-example-redis-1 Healthy
2025-04-01T14:35:23.451+08:00 INFO 29672 --- [redis-example] [ main] .s.d.r.c.RepositoryConfigurationDelegate : Multiple Spring Data modules found, entering strict repository configuration mode
2025-04-01T14:35:23.454+08:00 INFO 29672 --- [redis-example] [ main] .s.d.r.c.RepositoryConfigurationDelegate : Bootstrapping Spring Data Redis repositories in DEFAULT mode.
2025-04-01T14:35:23.506+08:00 INFO 29672 --- [redis-example] [ main] .s.d.r.c.RepositoryConfigurationDelegate : Finished Spring Data repository scanning in 20 ms. Found 0 Redis repository interfaces.
2025-04-01T14:35:25.752+08:00 INFO 29672 --- [redis-example] [ main] m.l.r.RedisExampleApplication : Started RedisExampleApplication in 12.299 seconds (process running for 13.239)
2025-04-01T14:35:25.758+08:00 INFO 29672 --- [redis-example] [ main] m.l.r.RedisExampleApplication : Sending message...
2025-04-01T14:35:25.785+08:00 INFO 29672 --- [redis-example] [ container-1] me.liujiajia.redis_example.Receiver : Received <Hello from Redis!>
已与地址为 ''127.0.0.1:54492',传输: '套接字'' 的目标虚拟机断开连接
进程已结束,退出代码为 0
Redis 的 PUBLISH & SUBSCRIBE 命令
Redis 的 PUBLISH
命令是 Redis 发布/订阅(Pub/Sub)功能的一部分,用于实现消息的发布和订阅机制。它允许客户端将消息发送到指定的频道(channel),而订阅了该频道的其他客户端可以接收到这些消息。
PUBLISH 命令的基本语法
bash
PUBLISH channel message
channel
:消息发布的频道名称,是一个字符串。message
:要发布的消息内容,也是一个字符串。
工作原理
发布者(Publisher):
- 使用
PUBLISH
命令向指定的频道发送消息。 - 消息不会被持久化,只会实时传递给当前订阅该频道的客户端。
- 使用
订阅者(Subscriber):
- 使用
SUBSCRIBE
或PSUBSCRIBE
命令订阅一个或多个频道。 - 当有消息发布到订阅的频道时,订阅者会立即收到消息。
- 使用
消息传递模型:
- Redis 的 Pub/Sub 是一种基于消息广播的模型。
- 消息的传递是“即发即弃”(fire-and-forget)的,Redis 不会保存未被接收的消息。
示例
订阅频道
在一个客户端中,使用
SUBSCRIBE
命令订阅频道news
:bashSUBSCRIBE news
此时,该客户端会进入订阅模式,等待接收来自
news
频道的消息。发布消息
在另一个客户端中,使用
PUBLISH
命令向news
频道发布消息:bashPUBLISH news "Hello, Redis Pub/Sub!"
接收消息
订阅了
news
频道的客户端会立即收到消息:bash1) "message" 2) "news" 3) "Hello, Redis Pub/Sub!"
返回值
PUBLISH
命令返回一个整数,表示接收到该消息的订阅者数量。例如:
bash
PUBLISH news "Hello, Redis!"
(integer) 1
这表明有一个客户端订阅了 news
频道,并成功接收到了消息。
特点与限制
实时性:
- 消息是实时传递的,适合需要快速传播信息的场景。
无持久化:
- 如果没有订阅者在线,消息会被丢弃,不会存储在 Redis 中。
解耦:
- 发布者和订阅者之间完全解耦,双方不需要知道对方的存在。
扩展性:
- 支持多频道订阅和模式匹配订阅(通过
PSUBSCRIBE
实现)。
- 支持多频道订阅和模式匹配订阅(通过
不适合任务队列:
- 因为消息无法持久化,也不支持确认机制,所以不适合用作任务队列。如果需要持久化消息队列,可以考虑 Redis Streams。
应用场景
实时通知:
- 如聊天应用、实时更新的新闻推送等。
事件驱动架构:
- 在微服务架构中,不同服务之间可以通过 Redis Pub/Sub 进行事件通信。
日志收集:
- 将日志信息发布到特定频道,供监控系统实时消费。
广播消息:
- 向多个客户端广播相同的消息。
注意事项
- Redis 的 Pub/Sub 是单向的,订阅者只能被动接收消息,无法向发布者发送响应。
- 如果需要更复杂的消息队列功能(如消息持久化、确认机制等),可以结合 Redis Streams 或其他消息队列系统(如 Kafka、RabbitMQ)。
总之,PUBLISH
是 Redis 提供的一个轻量级、高效的实时消息传递工具,适用于简单的发布/订阅场景。