Skip to content

Spring Boot WebSocket 示例

这是一个简单的 Spring Boot WebSocket 示例,展示了如何在 Spring Boot 中创建一个基本的 WebSocket 服务器。

示例代码

添加 websocket 依赖

除了 spring-boot-starter-web 之外,还需要添加 spring-boot-starter-websocket 依赖。

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

创建 WebSocket 处理器类

这里负责具体的接收和发送消息处理。

java
package me.liujiajia.example.websocket;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

@Component
public class MyWebSocketHandler extends TextWebSocketHandler {

    private static final Logger log = LoggerFactory.getLogger(MyWebSocketHandler.class);

    private final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sessions.add(session);
        log.info("New session added, session id: {}", session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("Received message: {} from session: {}", message.getPayload(), session.getId());
        // 广播消息给所有连接的客户端
        for (WebSocketSession webSocketSession : sessions) {
            if (webSocketSession.isOpen()) {
                webSocketSession.sendMessage(new TextMessage("Echo: " + message.getPayload()));
            }
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        sessions.remove(session);
        log.info("Session closed, session id: {}", session.getId());
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        sessions.remove(session);
        log.error("Transport error for session: {}, error: {}", session.getId(), exception.getMessage());
    }

    public void broadcast(String message) {
        sessions.forEach(session -> {
            try {
                session.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                log.error("Failed to send message to session: {}, error: {}", session.getId(), e.getMessage());
            }
        });
    }
}

注册 WebSocket 处理器

暴露 WebSocket 端点,配置允许的源。

java
package me.liujiajia.example.websocket;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Autowired
    private MyWebSocketHandler myWebSocketHandler;

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myWebSocketHandler, "/ws")
                .setAllowedOrigins("*");
    }

}

创建一个向所有 WebSocket 连接发送消息的定时任务

这是用来展示向客户端发送消息的定时任务示例:每 5 秒钟会向所有连接的客户端发送当前的服务器时间。

java
package me.liujiajia.example.websocket;

import jakarta.annotation.Resource;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class BroadcastSeverTimeScheduler {

    @Resource
    private MyWebSocketHandler myWebSocketHandler;

    @Scheduled(fixedRate = 5000)
    public void checkSessions() {
        myWebSocketHandler.broadcast("Server time: " + System.currentTimeMillis());
    }
}

SpringBoot 启动类

因为前面使用了定时任务,所以需要添加 @EnableScheduling 注解。

java
package me.liujiajia.example.websocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class WebsocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(WebsocketApplication.class, args);
    }

}

前端测试页面

resources/static 目录下创建一个 index.html 文件,并在其中添加以下内容。

本页面会在创建连接后发送一条测试消息,然后在页面中显示接收到的消息。

html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>WebSocket 测试页面</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 20px;
            line-height: 1.6;
        }
        h1, h2 {
            color: #333;
        }
        pre {
            background-color: #f4f4f4;
            padding: 10px;
            border-radius: 5px;
            height: 80px;
            overflow: scroll;
        }
    </style>
</head>
<body>
<h1>WebSocket 测试页面</h1>
<h2>说明</h2>
<p>
    这个页面展示了如何使用 WebSocket 进行简单的通信,并提供如何在浏览器开发者工具中测试 WebSocket 的步骤。
</p>

<h2>WebSocket 示例</h2>
<pre id="log"></pre>

<script>
    const logElement = document.getElementById('log');

    // 创建 WebSocket 连接
    // 本地启动的 WebSocket 服务器由于没有证书需要使用 ws 协议,生产环境需要使用 wss 协议
    const ws = new WebSocket('ws://localhost:8080/ws');

    // 连接打开事件
    ws.onopen = () => {
        log('连接已打开');
        // 发送一个测试消息
        ws.send('Hello, WebSocket!');
    };

    // 接收消息事件
    ws.onmessage = (event) => {
        log(`收到消息: ${event.data}`);
    };

    // 连接关闭事件
    ws.onclose = () => {
        log('连接已关闭');
    };

    // 连接错误事件
    ws.onerror = (error) => {
        log(`发生错误: ${error.message}`);
    };

    // 日志函数
    function log(message) {
        const time = new Date().toLocaleTimeString();
        const logEntry = `${time} - ${message}\n`;
        logElement.textContent += logEntry;
        logElement.scrollTop = logElement.scrollHeight; // 自动滚动到底部
    }
</script>

<h2>在浏览器开发者工具中查看 WebSocket 请求</h2>
<ol>
    <li>
        打开浏览器(例如 Chrome、Firefox 等)。
    </li>
    <li>
        按 <kbd>F12</kbd> 或右键点击页面然后选择“检查”以打开开发者工具。
    </li>
    <li>
        导航到“网络”(Network)标签页。
    </li>
    <li>
        在过滤器中选择“WebSocket”。
    </li>
    <li>
        刷新此页面,你应该能看到与 `ws://localhost:8080/ws` 的 WebSocket 连接。
    </li>
    <li>
        点击该连接以查看详细信息,包括发送和接收的消息。
    </li>
</ol>

</body>
</html>

创建 WebSocket 连接的过程

本段内容由文心一言提供。

  1. TCP 三次握手

    • 在建立 WebSocket 连接之前,客户端和服务器之间首先需要进行 TCP 的三次握手,以确保双方之间的网络连接是可靠的。
  2. 客户端发送 WebSocket 握手请求

    • 握手请求是一个 HTTP 请求,但包含了特殊的头信息以指示这是一个 WebSocket 连接请求。
    • 请求方式必须是 GET,且 HTTP 的版本必须是 1.1。
    • 请求头中必须包含以下字段:
      • Host:指定服务器的主机名。
      • Connection:其值为Upgrade,表示这是一个升级请求。
      • Upgrade:其值为websocket,指定要升级到的协议是 WebSocket。
      • Sec-WebSocket-Key:一个随机生成的 Base64 编码字符串,用于帮助服务器确认连接请求的合法性,并防止恶意攻击。
      • Sec-WebSocket-Version:表示客户端支持的 WebSocket 版本,常见的是 13(目前的标准版本)。
  3. 服务器响应 WebSocket 握手请求

    • 如果服务器支持 WebSocket 协议,并且能够处理客户端的连接请求,它会返回一个包含 101 状态码的 HTTP 响应。
    • 响应头中包含必要的字段来确认升级协议,如:
      • Upgrade:其值为websocket,确认协议被升级到 WebSocket。
      • Connection:其值为Upgrade,指示当前连接已经从 HTTP 协议升级到 WebSocket。
      • Sec-WebSocket-Accept:这是一个通过特定算法(使用Sec-WebSocket-Key和一个预定义的 GUID 值)计算出来的响应值,用于确保客户端和服务器之间的握手是有效且安全的。
  4. 握手完成,WebSocket 连接建立

    • 一旦握手完成,客户端和服务器之间的连接就已经升级为 WebSocket 协议,此时双方可以开始通过 WebSocket 协议进行双向通信,发送和接收消息。

超时时间

调试跟踪的过程中还发现,Upgrade 成功后,读取和写入的超时时间会设置为 -1。

java
public UpgradeProcessorInternal(SocketWrapperBase<?> wrapper, UpgradeToken upgradeToken,
        UpgradeGroupInfo upgradeGroupInfo) {
    super(upgradeToken);
    this.internalHttpUpgradeHandler = (InternalHttpUpgradeHandler) upgradeToken.getHttpUpgradeHandler();
    /*
        * Leave timeouts in the hands of the upgraded protocol.
        */
    wrapper.setReadTimeout(INFINITE_TIMEOUT);
    wrapper.setWriteTimeout(INFINITE_TIMEOUT);

    internalHttpUpgradeHandler.setSocketWrapper(wrapper);

    // HTTP/2 uses RequestInfo objects so does not provide upgradeInfo
    UpgradeInfo upgradeInfo = internalHttpUpgradeHandler.getUpgradeInfo();
    if (upgradeInfo != null && upgradeGroupInfo != null) {
        upgradeInfo.setGroupInfo(upgradeGroupInfo);
    }
}

之后发送消息时会将写入的超时时间设置为 20s(WsRemoteEndpointImplBase.sendMessageBlock)。

java
void sendMessageBlock(CharBuffer part, boolean last) throws IOException {
    long timeout = getBlockingSendTimeout();
    boolean isDone = false;
    while (!isDone) {
        encoderBuffer.clear();
        CoderResult cr = encoder.encode(part, encoderBuffer, true);
        if (cr.isError()) {
            throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.encoderError", cr));
        }
        isDone = !cr.isOverflow();
        encoderBuffer.flip();
        sendMessageBlock(Constants.OPCODE_TEXT, encoderBuffer, last && isDone, timeout);
    }
    stateMachine.complete(last);
}

private long getBlockingSendTimeout() {
    Object obj = wsSession.getUserProperties().get(Constants.BLOCKING_SEND_TIMEOUT_PROPERTY);
    Long userTimeout = null;
    if (obj instanceof Long) {
        userTimeout = (Long) obj;
    }
    if (userTimeout == null) {
        return Constants.DEFAULT_BLOCKING_SEND_TIMEOUT;
    } else {
        return userTimeout.longValue();
    }
}

// Configuration for blocking sends
public static final String BLOCKING_SEND_TIMEOUT_PROPERTY = "org.apache.tomcat.websocket.BLOCKING_SEND_TIMEOUT";

另外如果发布到生产环境时,服务和客户端之间部署了反向代理(如 Nginx),还需要配置反向代理的超时时间。

nginx
http {
    ...
    upstream backend_websocket_servers {
        server 10.0.0.1:8080;
        server 10.0.0.2:8080;
    }

    server {
        ...
        location /ws {
            proxy_pass http://backend_websocket_servers;

            # WebSocket特有的配置,确保升级请求被正确处理
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
            proxy_set_header Host $host;
            proxy_cache_bypass $http_upgrade;

            # 设置WebSocket请求的超时时间
            proxy_read_timeout 3600s;  # 读取超时时间设置为3600秒(1小时)
            proxy_send_timeout 3600s;  # 发送超时时间设置为3600秒(1小时)
        }

    }

}

Nginx Ingress 的配置可以参考如下清单 [1]

yaml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: nginx-controller-jiajia
    nginx.org/websocket-services: "svc-jiajia-websocket"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
  name: ingress-www-liujiajia-me-websocket
  namespace: jiajia-test
spec:
  rules:
  - host: www.liujiajia.me
    http:
      paths:
      - backend:
          serviceName: svc-jiajia-websocket
          servicePort: 80
        path: /ws/
        pathType: ImplementationSpecific
  tls:
  - hosts:
    - www.liujiajia.me
    secretName: tls-www-liujiajia-me

  1. Nginx Ingress Controller x WebSocket ↩︎

Page Layout Max Width

Adjust the exact value of the page width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the page layout
A ranged slider for user to choose and customize their desired width of the maximum width of the page layout can go.

Content Layout Max Width

Adjust the exact value of the document content width of VitePress layout to adapt to different reading needs and screens.

Adjust the maximum width of the content layout
A ranged slider for user to choose and customize their desired width of the maximum width of the content layout can go.