CH11-Websocket

本章涵盖了如下内容:

  • WebSockets
  • ChannelHandler, Decoder 和 Encoder
  • 引导你的应用程序

real-time web(实时web)是一组技术和实践,使用户能够实时地接收 到作者发布的信息,而不需要用户用他们的软件定期检查更新源。

HTTP 的请求/响应的设计并不能满足实时的需求,而 WebSocket 协议从设计以来就提供双向数据传输,允许客户和服务器在任何时间发送消息,并要求它们能够异步处理消息。最新的浏览器都将 WebSockets 作为HTML5的一种客户端API来支持的。

Netty 中对于 WebSocket 的支持包括正在使用的所有主要的实现,所以在你的下一个应用程序中采用它会非常简单。像往常使用Netty一样,你可以充分利用这种协议,而不必担心其内部实现细节。 我们将通过开发基于 WebSocket 的实时聊天应用证明这一点。

WebSocket 程序示例

为了说明实时功能的特点,我们使用 WebSocket 协议来实现一个基于浏览器的实时聊天程序,就像你在 Facebook 中用文字聊天一样。但是我们这里要更进一步,我们要让不同的用户可以同时互相交谈。

程序逻辑如图 11.1 所示

NAME

#1客户端/用户连接到服务器,并且是聊天的一部分

#2聊天消息通过 WebSocket 进行交换

#3消息双向发送

#4服务器处理所有的客户端/用户

逻辑很简单:

  • 1.客户端发送一个消息。
  • 2.消息被广播到所有其他连接的客户端。

这正如你所想的聊天室的工作方式:每个人都可以跟其他人聊天。此例子将仅提供服务器端,浏览器充当客户端,通过访问网页来聊天。正如您接下来要看到的,WebSocket 让这一切变得简单。

添加 WebSocket 支持

WebSocket 使用一种被称作“Upgrade handshake(升级握手)”的机制将标准的 HTTP 或HTTPS 协议转为 WebSocket。因此,使用 WebSocket 的应用程序将始终以 HTTP/S 开始,然后进行升级。这种升级发生在什么时候取决于具体的应用;可以在应用启动的时候,或者当一个特定的 URL 被请求的时候。

在我们的应用中,仅当 URL 请求以“/ws”结束时,我们才升级协议为WebSocket。否则,服务器将使用基本的 HTTP/S。一旦连接升级,之后的数据传输都将使用 WebSocket 。

下面看下服务器的逻辑图

Figure 11.2 Server logic

NAME

#1客户端/用户连接到服务器并加入聊天

#2 HTTP 请求页面或 WebSocket 升级握手

#3服务器处理所有客户端/用户

#4响应 URI “/”的请求,转到 index.html

#5如果访问的是 URI“/ws” ,处理 WebSocket 升级握手

#6升级握手完成后 ,通过 WebSocket 发送聊天消息

处理 HTTP 请求

本节我们将实现此应用中用于处理 HTTP 请求的组件,这个组件托管着可供客户端访问的聊天室页面,并且显示客户端发送的消息。

下面就是这个 HttpRequestHandler 的代码,它是一个用来处理 FullHttpRequest 消息的 ChannelInboundHandler 的实现类。注意看它是怎么实现忽略符合 “/ws” 格式的 URI 请求的。

Listing 11.1 HTTPRequestHandler

public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {    //1
    private final String wsUri;
    private static final File INDEX;

    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
            ctx.fireChannelRead(request.retain());                    //2
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);                                //3
            }

            RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4

            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");

            boolean keepAlive = HttpHeaders.isKeepAlive(request);

            if (keepAlive) {                                        //5
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);                    //6

            if (ctx.pipeline().get(SslHandler.class) == null) {        //7
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);            //8
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);        //9
            }
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

1.扩展 SimpleChannelInboundHandler 用于处理 FullHttpRequest信息

2.如果请求是一次升级了的 WebSocket 请求,则递增引用计数器(retain)并且将它传递给在 ChannelPipeline 中的下个 ChannelInboundHandler

3.处理符合 HTTP 1.1的 “100 Continue” 请求

4.读取 index.html

5.判断 keepalive 是否在请求头里面

6.写 HttpResponse 到客户端

7.写 index.html 到客户端,根据 ChannelPipeline 中是否有 SslHandler 来决定使用 DefaultFileRegion 还是 ChunkedNioFile

8.写并刷新 LastHttpContent 到客户端,标记响应完成

9.如果 请求头中不包含 keepalive,当写完成时,关闭 Channel

HttpRequestHandler 做了下面几件事,

  • 如果该 HTTP 请求被发送到URI “/ws”,则调用 FullHttpRequest 上的 retain(),并通过调用 fireChannelRead(msg) 转发到下一个 ChannelInboundHandler。retain() 的调用是必要的,因为 channelRead() 完成后,它会调用 FullHttpRequest 上的 release() 来释放其资源。 (请参考我们先前在第6章中关于 SimpleChannelInboundHandler 的讨论)
  • 如果客户端发送的 HTTP 1.1 头是“Expect: 100-continue” ,则发送“100 Continue”的响应。
  • 在 头被设置后,写一个 HttpResponse 返回给客户端。注意,这不是 FullHttpResponse,这只是响应的第一部分。另外,这里我们也不使用 writeAndFlush(), 这个是在留在最后完成。
  • 如果传输过程既没有要求加密也没有要求压缩,那么把 index.html 的内容存储在一个 DefaultFileRegion 里就可以达到最好的效率。这将利用零拷贝来执行传输。出于这个原因,我们要检查 ChannelPipeline 中是否有一个 SslHandler。如果是的话,我们就使用 ChunkedNioFile。
  • 写 LastHttpContent 来标记响应的结束,并终止它
  • 如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 对象的最后写入,并关闭连接。注意,这里我们调用 writeAndFlush() 来刷新所有以前写的信息。

这里展示了应用程序的第一部分,用来处理纯的 HTTP 请求和响应。接下来我们将处理 WebSocket 的 frame(帧),用来发送聊天消息。

WebSocket frame

WebSockets 在“帧”里面来发送数据,其中每一个都代表了一个消息的一部分。一个完整的消息可以利用了多个帧。

处理 WebSocket frame

WebSocket “Request for Comments” (RFC) 定义了六种不同的 frame; Netty 给他们每个都提供了一个 POJO 实现 ,见下表:

Table 11.1 WebSocketFrame types

名称描述
BinaryWebSocketFramecontains binary data
TextWebSocketFramecontains text data
ContinuationWebSocketFramecontains text or binary data that belongs to a previous BinaryWebSocketFrame or TextWebSocketFrame
CloseWebSocketFramerepresents a CLOSE request and contains close status code and a phrase
PingWebSocketFramerequests the transmission of a PongWebSocketFrame
PongWebSocketFramesent as a response to a PingWebSocketFrame

我们的程序只需要使用下面4个帧类型:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

在这里我们只需要处理 TextWebSocketFrame,其他的会由 WebSocketServerProtocolHandler 自动处理。

下面代码展示了 ChannelInboundHandler 处理 TextWebSocketFrame,同时也将跟踪在 ChannelGroup 中所有活动的 WebSocket 连接

Listing 11.2 Handles Text frames

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //1
    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    //2
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {

            ctx.pipeline().remove(HttpRequestHandler.class);    //3

            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));//4

            group.add(ctx.channel());    //5
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        group.writeAndFlush(msg.retain());    //6
    }
}

1.扩展 SimpleChannelInboundHandler 用于处理 TextWebSocketFrame 信息

2.覆写userEventTriggered() 方法来处理自定义事件

3.如果接收的事件表明握手成功,就从 ChannelPipeline 中删除HttpRequestHandler ,因为接下来不会接受 HTTP 消息了

4.写一条消息给所有的已连接 WebSocket 客户端,通知它们建立了一个新的 Channel 连接

5.添加新连接的 WebSocket Channel 到 ChannelGroup 中,这样它就能收到所有的信息

6.保留收到的消息,并通过 writeAndFlush() 传递给所有连接的客户端。

上面显示了 TextWebSocketFrameHandler 仅作了几件事:

  • 当WebSocket 与新客户端已成功握手完成,通过写入信息到 ChannelGroup 中的 Channel 来通知所有连接的客户端,然后添加新 Channel 到 ChannelGroup
  • 如果接收到 TextWebSocketFrame,调用 retain() ,并将其写、刷新到 ChannelGroup,使所有连接的 WebSocket Channel 都能接收到它。和以前一样,retain() 是必需的,因为当 channelRead0()返回时,TextWebSocketFrame 的引用计数将递减。由于所有操作都是异步的,writeAndFlush() 可能会在以后完成,我们不希望它访问无效的引用。

由于 Netty 在其内部处理了其余大部分功能,唯一剩下的需要我们去做的就是为每一个新创建的 Channel 初始化 ChannelPipeline 。要完成这个,我们需要一个ChannelInitializer

初始化 ChannelPipeline

接下来,我们需要安装我们上面实现的两个 ChannelHandler 到 ChannelPipeline。为此,我们需要继承 ChannelInitializer 并且实现 initChannel()。看下面 ChatServerInitializer 的代码实现

Listing 11.3 Init the ChannelPipeline

public class ChatServerInitializer extends ChannelInitializer<Channel> {    //1
    private final ChannelGroup group;

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {            //2
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpRequestHandler("/ws"));
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        pipeline.addLast(new TextWebSocketFrameHandler(group));
    }
}

1.扩展 ChannelInitializer

2.添加 ChannelHandler 到 ChannelPipeline

initChannel() 方法用于设置所有新注册的 Channel 的ChannelPipeline,安装所有需要的 ChannelHandler。总结如下:

Table 11.2 ChannelHandlers for the WebSockets Chat server

ChannelHandler职责
HttpServerCodecDecode bytes to HttpRequest, HttpContent, LastHttpContent.Encode HttpRequest, HttpContent, LastHttpContent to bytes.
ChunkedWriteHandlerWrite the contents of a file.
HttpObjectAggregatorThis ChannelHandler aggregates an HttpMessage and its following HttpContents into a single FullHttpRequest or FullHttpResponse (depending on whether it is being used to handle requests or responses).With this installed the next ChannelHandler in the pipeline will receive only full HTTP requests.
HttpRequestHandlerHandle FullHttpRequests (those not sent to “/ws” URI).
WebSocketServerProtocolHandlerAs required by the WebSockets specification, handle the WebSocket Upgrade handshake, PingWebSocketFrames,PongWebSocketFrames and CloseWebSocketFrames.
TextWebSocketFrameHandlerHandles TextWebSocketFrames and handshake completion events

该 WebSocketServerProtocolHandler 处理所有规定的 WebSocket 帧类型和升级握手本身。如果握手成功所需的 ChannelHandler 被添加到管道,而那些不再需要的则被去除。管道升级之前的状态如下图。这代表了 ChannelPipeline 刚刚经过 ChatServerInitializer 初始化。

Figure 11.3 ChannelPipeline before WebSockets Upgrade

NAME

握手升级成功后 WebSocketServerProtocolHandler 替换HttpRequestDecoder 为 WebSocketFrameDecoder,HttpResponseEncoder 为WebSocketFrameEncoder。 为了最大化性能,WebSocket 连接不需要的 ChannelHandler 将会被移除。其中就包括了 HttpObjectAggregator 和 HttpRequestHandler

下图,展示了 ChannelPipeline 经过这个操作完成后的情况。注意 Netty 目前支持四个版本 WebSocket 协议,每个通过其自身的方式实现类。选择正确的版本WebSocketFrameDecoder 和 WebSocketFrameEncoder 是自动进行的,这取决于在客户端(在这里指浏览器)的支持(在这个例子中,我们假设使用版本是 13 的 WebSocket 协议,从而图中显示的是 WebSocketFrameDecoder13 和 WebSocketFrameEncoder13)。

Figure 11.4 ChannelPipeline after WebSockets Upgrade

NAME

引导

最后一步是 引导服务器,设置 ChannelInitializer

public class ChatServer {

    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);//1
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap  = new ServerBootstrap(); //2
        bootstrap.group(group)
                .channel(NioServerSocketChannel.class)
                .childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }

    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {        //3
       return new ChatServerInitializer(group);
    }

    public void destroy() {        //4
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);

        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

1.创建 DefaultChannelGroup 用来 保存所有连接的的 WebSocket channel

2.引导 服务器

3.创建 ChannelInitializer

4.处理服务器关闭,包括释放所有资源

测试程序

使用下面命令启动服务器:

mvn -PChatServer clean package exec:exec

其中项目中的 pom.xml 是配置了 9999 端口。你也可以通过下面的方法修改属性

mvn -PChatServer -Dport=1111 clean package exec:exec

下面是控制台的主要输出(删除了部分行)

Listing 11.5 Compile and start the ChatServer

[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building ChatServer 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: D:/netty-in-action/chapter11/target/chat-server-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ chat-server ---
Starting ChatServer on port 9999

可以在浏览器中通过 http://localhost:9999 地址访问程序。图11.5展示了此程序在Chrome浏览器下的用户界面。

Figure 11.5 WebSockets ChatServer demonstration

NAME

图中显示了两个已经连接了的客户端。第一个客户端是通过上面的图形界面连接的,第二个是通过Chrome浏览器底部的命令行连接的。 你可以注意到,这两个客户端都在发送消息,每条消息都会显示在两个客户端上。

如何加密?

在实际场景中,加密是必不可少的。在Netty中实现加密并不麻烦,你只需要向 ChannelPipeline 中添加 SslHandler ,然后配置一下即可。如下:

Listing 11.6 Add encryption to the ChannelPipeline

public class SecureChatServerIntializer extends ChatServerInitializer {    //1
    private final SslContext context;

    public SecureChatServerIntializer(ChannelGroup group, SslContext context) {
        super(group);
        this.context = context;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        super.initChannel(ch);
        SSLEngine engine = context.newEngine(ch.alloc());
        engine.setUseClientMode(false);
        ch.pipeline().addFirst(new SslHandler(engine)); //2
    }
}

1.扩展 ChatServerInitializer 来实现加密

2.向 ChannelPipeline 中添加SslHandler

最后修改 ChatServer,使用 SecureChatServerInitializer 并传入 SSLContext

Listing 11.7 Add encryption to the ChatServer

public class SecureChatServer extends ChatServer {//1

    private final SslContext context;

    public SecureChatServer(SslContext context) {
        this.context = context;
    }

    @Override
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        return new SecureChatServerIntializer(group, context);    //2
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);
        SelfSignedCertificate cert = new SelfSignedCertificate();
        SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
        final SecureChatServer endpoint = new SecureChatServer(context);
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

1.扩展 ChatServer

2.返回先前创建的 SecureChatServerInitializer 来启用加密

这样,就在所有的通信中使用了 SSL/TLS 加密。和前面一样,你可以使用Maven拉取应用需要的所有依赖,并启动它,如下所示。

Listing 11.8 Start the SecureChatServer

$ mvn -PSecureChatServer clean package exec:exec
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building ChatServer 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: D:/netty-in-action/chapter11/target/chat-server-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ chat-server ---
Starting SecureChatServer on port 9999

现在你可以通过 HTTPS 地址: https://localhost:9999 来访问SecureChatServer 了。

总结

在本章中,我们学习了如何使用 Netty 中的 WebSocket 来管理 Web 应用程序中的实时数据。我们讲了所支持的数据类型,并讨论了你可能会遇到的问题。虽然 WebSockets 并不能在所有情况下使用,但应该清楚,它代表了 web 技术发展上的一个重要进步。

接下来我们来谈谈“Web2.0”开发中的另一项技术。也许你还没有听说过“SPDY”,但只要你读了下一章,你就很可能在你将来的开发中很好的运用这门技术了。