Forwarding WebSocket Through Spring Cloud Gateway and Changing the Message Size Limit


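Preface

In production, to keep business data safe, we usually deploy business services on an internal network and expose only the interfaces that external callers need through a gateway service. This holds for HTTP requests and equally for WebSocket. This post explains how to use Spring Cloud Gateway to proxy and forward WebSocket requests, and how to solve the message size problem that comes with it.

Following the diagram above, I prepared two sample services: a Gateway service and a WebSocket service. The code is as follows: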

Gateway

Project structure

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>spring-boot-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.1.3.RELEASE</version>
    </parent>

    <groupId>org.example</groupId>
    <artifactId>sample-gateway</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <spring.cloud.version>Greenwich.SR3</spring.cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-gateway</artifactId>
        </dependency>
    </dependencies>

</project>

Application.java

package com.my.gateway;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yjy
 * Created at 2022/1/2 3:24 PM
 */
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.properties

spring.application.name=sample-gateway
server.port=7070
 
spring.cloud.gateway.routes[0].id = test-websocket
spring.cloud.gateway.routes[0].uri = ws://127.0.0.1:18080
spring.cloud.gateway.routes[0].predicates[0] = Path=/websocket
spring.cloud.gateway.routes[0].predicates[1] = Header=Connection,Upgrade
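
The route above only matches when both predicates hold: the path must be /websocket and the Connection header must match Upgrade. The sketch below imitates that combination in plain Java to make the behaviour easier to reason about. It is not the gateway's code: the class and method names are hypothetical, and it assumes the Header predicate performs a full regular-expression match against each header value. Under that assumption a client that sends a combined value such as "keep-alive, Upgrade" would not match this route.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RoutePredicateSketch {

    // Hypothetical stand-in for the Header predicate.
    // ASSUMPTION: each header value must fully match the regex.
    public static boolean headerMatches(List<String> values, String regex) {
        return values.stream().anyMatch(value -> value.matches(regex));
    }

    // Hypothetical stand-in for the whole route: both predicates must hold.
    public static boolean routeMatches(String path, List<String> connectionValues) {
        return "/websocket".equals(path) && headerMatches(connectionValues, "Upgrade");
    }

    public static void main(String[] args) {
        // Path and header both match: prints true
        System.out.println(routeMatches("/websocket", Arrays.asList("Upgrade")));
        // Combined header value is not a full match for "Upgrade": prints false
        System.out.println(routeMatches("/websocket", Arrays.asList("keep-alive, Upgrade")));
        // Wrong path: prints false
        System.out.println(routeMatches("/other", Arrays.asList("Upgrade")));
        // No Connection header at all: prints false
        System.out.println(routeMatches("/websocket", Collections.<String>emptyList()));
    }
}
```

If some clients do send a combined Connection value, a looser regular expression in the Header predicate, or dropping that predicate, may be worth testing.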
 

WebSocket service

Project structure

 

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.1.3.RELEASE</version>
        <relativePath/>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>example-spring-cloud-simple-websocket</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    </dependencies>

</project>

Application.java

package com.idanchuang.example.spring.cloud.simple.websocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Bean
    public WebsocketServer websocketServer() {
        return new WebsocketServer();
    }

}

WebsocketServer.java 

package com.idanchuang.example.spring.cloud.simple.websocket;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;

@ServerEndpoint("/websocket")
public class WebsocketServer {

    private void sendMessage(Session session, String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }

    @OnOpen
    public void onOpen(Session session) throws IOException {
        sendMessage(session, "Request received, connection established");
    }

    @OnClose
    public void onClose(Session session){
        System.out.println(session.getId() + " > disconnected");
    }

    @OnMessage
    public void onMessage(String message, Session session) throws IOException{
        message = message.length() > 100 ? message.substring(0, 100) + "..." : message;
        message = "Received message: " + message;
        System.out.println(message);
        sendMessage(session, message);
    }

    @OnError
    public void onError(Session session, Throwable throwable){
        throwable.printStackTrace();
    }

} 

application.yml

server.port: 18080
spring.application.name: example-simple-websocket
 

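The problem

Normally the configuration above works fine, but there is a hidden limit: the length of a WebSocket message that can be received or sent is capped at about 64K by default (the Spring source quoted later in this post sets the default frame payload length to 65536 bytes). A larger message makes the connection fail and close.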

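Solution

WebSocket service configuration

First, raise the message size limit of the WebSocket service. This is done by setting the maxMessageSize attribute of the @OnMessage annotation, for example to 3000000 bytes (about 3 MB):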
    @OnMessage(maxMessageSize = 3000000)
    public void onMessage(String message, Session session) throws IOException{
        message = message.length() > 100 ? message.substring(0, 100) + "..." : message;
        message = "Received message: " + message;
        System.out.println(message);
        sendMessage(session, message);
    }
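After this change, the message size problem is solved for clients that connect directly to the WebSocket server. If the connection goes through the Gateway, however, the gateway must be configured as well.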
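
Both the 64K default and the 3000000 value are byte counts, not character counts, so it can help to measure a payload before choosing a limit. The sketch below is only an illustration under stated assumptions: the class and method names are hypothetical, the text is measured as UTF-8 (the encoding WebSocket text messages use), and the whole message is assumed to travel as a single frame, which depends on the client.

```java
import java.nio.charset.StandardCharsets;

public class PayloadSizeSketch {

    // 65536 (64K) is the default stated in the Spring Javadoc quoted in this post.
    public static final int DEFAULT_LIMIT = 65536;
    // The raised limit used as the example value in this post.
    public static final int RAISED_LIMIT = 3000000;

    // Size of the message once encoded as UTF-8.
    public static int utf8Bytes(String message) {
        return message.getBytes(StandardCharsets.UTF_8).length;
    }

    // True if the encoded message does not exceed the given byte limit.
    public static boolean fits(String message, int limit) {
        return utf8Bytes(message) <= limit;
    }

    // Java 8 compatible helper that repeats one character.
    public static String repeat(char c, int count) {
        StringBuilder sb = new StringBuilder(count);
        for (int i = 0; i < count; i++) {
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String ascii = repeat('a', 70000);        // 1 byte per character
        String chinese = repeat('\u4e2d', 30000); // 3 bytes per character in UTF-8

        System.out.println(utf8Bytes(ascii));             // 70000
        System.out.println(utf8Bytes(chinese));           // 90000
        System.out.println(fits(ascii, DEFAULT_LIMIT));   // false
        System.out.println(fits(chinese, DEFAULT_LIMIT)); // false
        System.out.println(fits(chinese, RAISED_LIMIT));  // true
    }
}
```

Note that 30000 Chinese characters already exceed the default even though the character count is well below 65536.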

Gateway configuration

If you are using the H release of Spring Cloud (Hoxton.???)

In that case a single line of gateway configuration is enough:
spring.cloud.gateway.httpclient.websocket.max-frame-payload-length=3000000

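If you are using a Spring Cloud release older than H

Suppose we are on the G release and cannot upgrade to H. In that case the problem can be solved by overriding the source code. Because the application's own classes typically come before the library JARs on the classpath, a class with the same fully qualified name in our project is loaded instead of the original.

Create two classes in the Gateway project. Note that each class must live in exactly the package given by its fully qualified name:

org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient
org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy

The adjusted project structure is as follows

The code of both classes is pasted below. Every place where the original source was changed is marked with a one-line comment: // Modified

ReactorNettyWebSocketClient.java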
package org.springframework.web.reactive.socket.client;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.websocket.WebsocketInbound;

import java.net.URI;

/**
 * {@link WebSocketClient} implementation for use with Reactor Netty.
 *
 * @author Rossen Stoyanchev
 * @since 5.0
 */
public class ReactorNettyWebSocketClient implements WebSocketClient {

    private static final Log logger = LogFactory.getLog(ReactorNettyWebSocketClient.class);


    private final HttpClient httpClient;

    /**
     * Default constructor.
     */
    public ReactorNettyWebSocketClient() {
        this(HttpClient.create());
    }

    /**
     * Constructor that accepts an existing {@link HttpClient} builder.
     * @since 5.1
     */
    public ReactorNettyWebSocketClient(HttpClient httpClient) {
        Assert.notNull(httpClient, "HttpClient is required");
        this.httpClient = httpClient;
    }


    /**
     * Return the configured {@link HttpClient}.
     */
    public HttpClient getHttpClient() {
        return this.httpClient;
    }


    @Override
    public Mono<Void> execute(URI url, WebSocketHandler handler) {
        return execute(url, new HttpHeaders(), handler);
    }

    @Override
    public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
        return getHttpClient()
                .headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
                .websocket(StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols()))
                .uri(url.toString())
                .handle((inbound, outbound) -> {
                    HttpHeaders responseHeaders = toHttpHeaders(inbound);
                    String protocol = responseHeaders.getFirst("Sec-WebSocket-Protocol");
                    HandshakeInfo info = new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol);
                    NettyDataBufferFactory factory = new NettyDataBufferFactory(outbound.alloc());
                    // Modified
                    WebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, info, factory, 3000000);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Started session '" + session.getId() + "' for " + url);
                    }
                    return handler.handle(session);
                })
                .doOnRequest(n -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Connecting to " + url);
                    }
                })
                .next();
    }

    private void setNettyHeaders(HttpHeaders httpHeaders, io.netty.handler.codec.http.HttpHeaders nettyHeaders) {
        httpHeaders.forEach(nettyHeaders::set);
    }

    private HttpHeaders toHttpHeaders(WebsocketInbound inbound) {
        HttpHeaders headers = new HttpHeaders();
        io.netty.handler.codec.http.HttpHeaders nettyHeaders = inbound.headers();
        nettyHeaders.forEach(entry -> {
            String name = entry.getKey();
            headers.put(name, nettyHeaders.getAll(name));
        });
        return headers;
    }

}
ReactorNettyRequestUpgradeStrategy.java
/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.reactive.socket.server.upgrade;

import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.server.reactive.AbstractServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerResponse;

import java.util.function.Supplier;

/**
 * A {@link RequestUpgradeStrategy} for use with Reactor Netty.
 *
 * @author Rossen Stoyanchev
 * @since 5.0
 */
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {

    // Modified
    private int maxFramePayloadLength = 3000000;
//    private int maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;


    /**
     * Configure the maximum allowable frame payload length. Setting this value
     * to your application's requirement may reduce denial of service attacks
     * using long data frames.
     * <p>Corresponds to the argument with the same name in the constructor of
     * {@link io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory
     * WebSocketServerHandshakerFactory} in Netty.
     * <p>By default set to 65536 (64K).
     * @param maxFramePayloadLength the max length for frames.
     * @since 5.1
     */
    public void setMaxFramePayloadLength(Integer maxFramePayloadLength) {
        this.maxFramePayloadLength = maxFramePayloadLength;
    }

    /**
     * Return the configured max length for frames.
     * @since 5.1
     */
    public int getMaxFramePayloadLength() {
        return this.maxFramePayloadLength;
    }

    @Override
    public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler,
            @Nullable String subProtocol, Supplier<HandshakeInfo> handshakeInfoFactory) {

        ServerHttpResponse response = exchange.getResponse();
        HttpServerResponse reactorResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
        HandshakeInfo handshakeInfo = handshakeInfoFactory.get();
        NettyDataBufferFactory bufferFactory = (NettyDataBufferFactory) response.bufferFactory();

        return reactorResponse.sendWebsocket(subProtocol, this.maxFramePayloadLength,
                (in, out) -> {
                    ReactorNettyWebSocketSession session =
                            new ReactorNettyWebSocketSession(
                                    in, out, handshakeInfo, bufferFactory, this.maxFramePayloadLength);
                    return handler.handle(session);
                });
    }

}

Verifying the result
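
One way to check the change is to send a message larger than 64K through the gateway and see whether the echo comes back. The client below is a minimal sketch, not part of the original projects: it assumes JDK 11 or later (for java.net.http.WebSocket), the gateway address ws://127.0.0.1:7070/websocket taken from the configuration in this post, and a hypothetical class name and payload size of 100000 characters.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LargeMessageClient {

    // Builds an ASCII payload of the requested length (1 byte per character).
    public static String buildPayload(int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append('a');
        }
        return sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // ASSUMPTION: the gateway from this post is listening on port 7070.
        String url = args.length > 0 ? args[0] : "ws://127.0.0.1:7070/websocket";
        CountDownLatch finished = new CountDownLatch(1);

        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                System.out.println("reply (" + data.length() + " chars): " + data);
                ws.request(1); // ask for the next message
                return null;
            }

            @Override
            public CompletionStage<?> onClose(WebSocket ws, int statusCode, String reason) {
                System.out.println("closed: " + statusCode + " " + reason);
                finished.countDown();
                return null;
            }

            @Override
            public void onError(WebSocket ws, Throwable error) {
                System.out.println("error: " + error);
                finished.countDown();
            }
        };

        try {
            WebSocket ws = HttpClient.newHttpClient().newWebSocketBuilder()
                    .buildAsync(URI.create(url), listener).join();
            // Send one text message well above the 64K default limit.
            ws.sendText(buildPayload(100_000), true).join();
            // Wait a few seconds for replies; close normally if nothing went wrong.
            if (!finished.await(5, TimeUnit.SECONDS)) {
                ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
            }
        } catch (CompletionException e) {
            System.out.println("request failed: " + e.getCause());
        }
    }
}
```

With the limits raised on both the WebSocket service and the gateway, the expectation is a greeting followed by an echoed reply; if either limit is still too low, the client should instead report an error or an abnormal close.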