openresty sse 动态扩展处理


sse (server send events) 是一个单向的服务器消息推送方案,比较适合的业务场景是实时数据显示(而且sse 可以很好的支持数据的多播)

参考图

集成说明

对于业务请求我们基于openresty 灵活的扩展能力,对于需要sse 服务的,我们可以按照租户或者业务系统或者单一订阅id进行请求的分区,我们可以直接利用注册
中心的能力,对于请求认证的安全我们可以直接通过nginx 的access 阶段进行处理,同时在此处结合注册中心可以动态的配置后端sse server,如果需要消息体处理
的我们可以在body filter 阶段处理

简单参考配置

此处后端sse 请求是通过参数传递的,实际上我们应该通过配置处理,以下只是一个简单的演示

 
user root; 
master_process off;
worker_processes 1;
events {
    worker_connections  1024;
}
http {
    include       mime.types;
    default_type  text/html;
    lua_code_cache off;
    lua_package_path '/opt/lua/?.lua;;';
    real_ip_header     X-Forwarded-For;
    resolver 127.0.0.11;
    server {
       listen 80;
       charset utf-8;
       proxy_set_header X-Forwarded-For $remote_addr;
       proxy_buffering off;
       proxy_cache off;
       proxy_set_header Connection '';
       proxy_http_version 1.1;
       chunked_transfer_encoding off;
       default_type text/html;
       location /ssev2 {
           set $myhost localhost;
           set $port 5000;
           access_by_lua_block {
            local token, err = ngx.req.get_headers()["token"]
            local myhost, err = ngx.req.get_uri_args()["myhost"]
            local port, err = ngx.req.get_uri_args()["port"]
            if token == nil then
                ngx.exit(ngx.HTTP_FORBIDDEN)
            end
            ngx.var.myhost= myhost
            ngx.var.port= math.floor(port)
           }
           proxy_pass http://$myhost:$port;
           body_filter_by_lua_block {
               ngx.arg[1] = string.upper(ngx.arg[1]).."dalongdemo"
           }
       }
    }
}

说明

类似的nchan 以及pushpin 都是不错的工具,pushpin 更多是定义了一个标准协议,nchan 是直接在nginx 上的一个扩展模块利用了redis

参考资料


https://github.com/rongfengliang/openresty-sse-proxy
https://github.com/slact/nchan
https://pushpin.org/
https://nchan.io/