CH08-内置 ChannelHandler 与 Codec
本章介绍
- 使用 SSL/TLS 加密 Netty 程序
- 构建 Netty HTTP/HTTPS 程序
- 处理空闲连接和超时
- 解码分隔符和基于长度的协议
- 写大数据
- 序列化数据
Netty 提供了很多共同协议的编解码器和处理程序,您可以几乎“开箱即用”的使用他们,而无需花在相当乏味的基础设施问题。在这一章里,我们将探索这些工具和他们的好处。这包括支持 SSL/TLS,WebSocket 和 谷歌SPDY,通过数据压缩使 HTTP 有更好的性能。
使用 SSL/TLS 加密 Netty 程序
今天数据隐私是一个十分关注的问题,作为开发人员,我们需要准备好解决这个问题。至少我们需要熟悉加密协议 SSL 和 TLS 等之上的其他协议实现数据安全。作为一个 HTTPS 网站的用户,你是安全。当然,这些协议是广泛不基于 http 的应用程序,例如安全SMTP(SMTPS)邮件服务,甚至关系数据库系统。
为了支持 SSL/TLS,Java 提供了 javax.net.ssl API 的类SslContext 和 SslEngine 使它相对简单的实现解密和加密。Netty 的利用该 API 命名 SslHandler 的 ChannelHandler 实现, 有一个内部 SslEngine 做实际的工作。
图8.1显示了一个使用 SslHandler 数据流图。
- 加密的入站数据被 SslHandler 拦截,并被解密
- 前面加密的数据被 SslHandler 解密
- 平常数据传过 SslHandler
- SslHandler 加密数据并它传递出站
Figure 8.1 Data flow through SslHandler for decryption and encryption
如清单8.1所示一个 SslHandler 使用 ChannelInitializer 添加到 ChannelPipeline。(回想一下,当 Channel 注册时 ChannelInitializer 用于设置 ChannelPipeline 。)
Listing 8.1 Add SSL/TLS support
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean startTls;
public SslChannelInitializer(SslContext context,
boolean client, boolean startTls) { //1
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
SSLEngine engine = context.newEngine(ch.alloc()); //2
engine.setUseClientMode(client); //3
ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls)); //4
}
}
- 使用构造函数来传递 SSLContext 用于使用(startTls 是否启用)
- 从 SslContext 获得一个新的 SslEngine 。给每个 SslHandler 实例使用一个新的 SslEngine
- 设置 SslEngine 是 client 或者是 server 模式
- 添加 SslHandler 到 pipeline 作为第一个处理器
在大多数情况下,SslHandler 将成为 ChannelPipeline 中的第一个 ChannelHandler 。这将确保所有其他 ChannelHandler 应用他们的逻辑到数据后加密后才发生,从而确保他们的变化是安全的。
SslHandler 有很多有用的方法,如表8.1所示。例如,在握手阶段两端相互验证,商定一个加密方法。您可以配置 SslHandler 修改其行为或提供 在SSL/TLS 握手完成后发送通知,这样所有数据都将被加密。 SSL/TLS 握手将自动执行。
Table 8.1 SslHandler methods
名称 | 描述 |
---|---|
setHandshakeTimeout(…) setHandshakeTimeoutMillis(…) getHandshakeTimeoutMillis() | Set and get the timeout, after which the handshake ChannelFuture is notified of failure. |
setCloseNotifyTimeout(…) setCloseNotifyTimeoutMillis(…) getCloseNotifyTimeoutMillis() | Set and get the timeout after which the close notify will time out and the connection will close. This also results in having the close notify ChannelFuture fail. |
handshakeFuture() | Returns a ChannelFuture that will be notified once the handshake is complete. If the handshake was done before it will return a ChannelFuture that contains the result of the previous handshake. |
close(…) | Send the close_notify to request close and destroy the underlying SslEngine. |
构建 Netty HTTP/HTTPS 应用
HTTP/HTTPS 是最常见的一种协议,在智能手机里广泛应用。虽然每家公司都有一个主页,您可以通过HTTP或HTTPS访问,这不是它唯一的使用。许多组织通过 HTTP(S) 公开 WebService API ,旨在用于缓解独立的平台带来的弊端 。 让我们看一下 Netty 提供的 ChannelHandler,是如何允许您使用 HTTP 和 HTTPS 而无需编写自己的编解码器。
HTTP Decoder, Encoder 和 Codec
HTTP 是请求-响应模式,客户端发送一个 HTTP 请求,服务就响应此请求。Netty 提供了简单的编码、解码器来简化基于这个协议的开发工作。图8.2和图8.3显示 HTTP 请求和响应的方法是如何生产和消费的
- HTTP Request 第一部分是包含的头信息
- HttpContent 里面包含的是数据,可以后续有多个 HttpContent 部分
- LastHttpContent 标记是 HTTP request 的结束,同时可能包含头的尾部信息
- 完整的 HTTP request
Figure 8.2 HTTP request component parts
- HTTP response 第一部分是包含的头信息
- HttpContent 里面包含的是数据,可以后续有多个 HttpContent 部分
- LastHttpContent 标记是 HTTP response 的结束,同时可能包含头的尾部信息
- 完整的 HTTP response
Figure 8.3 HTTP response component parts
如图8.2和8.3所示的 HTTP 请求/响应可能包含不止一个数据部分,它总是终止于 LastHttpContent 部分。FullHttpRequest 和FullHttpResponse 消息是特殊子类型,分别表示一个完整的请求和响应。所有类型的 HTTP 消息(FullHttpRequest ,LastHttpContent 以及那些如清单8.2所示)实现 HttpObject 接口。
表8.2概述 HTTP 解码器和编码器的处理和生产这些消息。
Table 8.2 HTTP decoder and encoder
名称 | 描述 |
---|---|
HttpRequestEncoder | Encodes HttpRequest , HttpContent and LastHttpContent messages to bytes. |
HttpResponseEncoder | Encodes HttpResponse, HttpContent and LastHttpContent messages to bytes. |
HttpRequestDecoder | Decodes bytes into HttpRequest, HttpContent and LastHttpContent messages. |
HttpResponseDecoder | Decodes bytes into HttpResponse, HttpContent and LastHttpContent messages. |
清单8.2所示的是将支持 HTTP 添加到您的应用程序是多么简单。仅仅添加正确的 ChannelHandler 到 ChannelPipeline 中
Listing 8.2 Add support for HTTP
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("decoder", new HttpResponseDecoder()); //1
pipeline.addLast("encoder", new HttpRequestEncoder()); //2
} else {
pipeline.addLast("decoder", new HttpRequestDecoder()); //3
pipeline.addLast("encoder", new HttpResponseEncoder()); //4
}
}
}
- client: 添加 HttpResponseDecoder 用于处理来自 server 响应
- client: 添加 HttpRequestEncoder 用于发送请求到 server
- server: 添加 HttpRequestDecoder 用于接收来自 client 的请求
- server: 添加 HttpResponseEncoder 用来发送响应给 client
HTTP消息聚合
安装 ChannelPipeline 中的初始化之后,你能够对不同 HttpObject 消息进行操作。但由于 HTTP 请求和响应可以由许多部分组合而成,你需要聚合他们形成完整的消息。为了消除这种繁琐任务, Netty 提供了一个聚合器,合并消息部件到 FullHttpRequest 和 FullHttpResponse 消息。这样您总是能够看到完整的消息内容。
这个操作有一个轻微的成本,消息段需要缓冲,直到完全可以将消息转发到下一个 ChannelInboundHandler 管道。但好处是,你不必担心消息碎片。
实现自动聚合只需添加另一个 ChannelHandler 到 ChannelPipeline。清单8.3显示了这是如何实现的。
Listing 8.3 Automatically aggregate HTTP message fragments
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpAggregatorInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("codec", new HttpClientCodec()); //1
} else {
pipeline.addLast("codec", new HttpServerCodec()); //2
}
pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); //3
}
}
- client: 添加 HttpClientCodec
- server: 添加 HttpServerCodec 作为我们是 server 模式时
- 添加 HttpObjectAggregator 到 ChannelPipeline, 使用最大消息值是 512kb
HTTP 压缩
使用 HTTP 时建议压缩数据以减少传输流量,压缩数据会增加 CPU 负载,现在的硬件设施都很强大,大多数时候压缩数据时一个好主意。Netty 支持“gzip”和“deflate”,为此提供了两个 ChannelHandler 实现分别用于压缩和解压。看下面代码:
HTTP Request Header
客户端可以通过提供下面的头显示支持加密模式。然而服务器不是,所以不得不压缩它发送的数据。
GET /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding: gzip, deflate
下面是一个例子
Listing 8.4 Automatically compress HTTP messages
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpAggregatorInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec()); //1
pipeline.addLast("decompressor",new HttpContentDecompressor()); //2
} else {
pipeline.addLast("codec", new HttpServerCodec()); //3
pipeline.addLast("compressor",new HttpContentCompressor()); //4
}
}
}
- client: 添加 HttpClientCodec
- client: 添加 HttpContentDecompressor 用于处理来自服务器的压缩的内容
- server: HttpServerCodec
- server: HttpContentCompressor 用于压缩来自 client 支持的 HttpContentCompressor
压缩与依赖
注意,Java 6或者更早版本,如果要压缩数据,需要添加 jzlib 到 classpath
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
<version>1.1.3</version>
</dependency>
使用 HTTPS
启用 HTTPS,只需添加 SslHandler
Listing 8.5 Using HTTPS
public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean client;
public HttpsCodecInitializer(SslContext context, boolean client) {
this.context = context;
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine engine = context.newEngine(ch.alloc());
pipeline.addFirst("ssl", new SslHandler(engine)); //1
if (client) {
pipeline.addLast("codec", new HttpClientCodec()); //2
} else {
pipeline.addLast("codec", new HttpServerCodec()); //3
}
}
}
- 添加 SslHandler 到 pipeline 来启用 HTTPS
- client: 添加 HttpClientCodec
- server: 添加 HttpServerCodec ,如果是 server 模式的话
上面的代码就是一个很好的例子,解释了 Netty 的架构是如何让“重用”变成了“杠杆”。我们可以添加一个新的功能,甚至是一样重要的加密支持,几乎没有工作量,只需添加一个ChannelHandler 到 ChannelPipeline。
WebSocket
HTTP 是不错的协议,但是如果需要实时发布信息怎么做?有个做法就是客户端一直轮询请求服务器,这种方式虽然可以达到目的,但是其缺点很多,也不是优秀的解决方案,为了解决这个问题,便出现了 WebSocket。
WebSocket 允许数据双向传输,而不需要请求-响应模式。早期的WebSocket 只能发送文本数据,然后现在不仅可以发送文本数据,也可以发送二进制数据,这使得可以使用 WebSocket 构建你想要的程序。下图是WebSocket 的通信示例图:
WebSocket 规范及其实现是为了一个更有效的解决方案。简单的说, 一个WebSocket 提供一个 TCP 连接两个方向的交通。结合 WebSocket API 它提供了一个替代 HTTP 轮询双向通信从页面到远程服务器。
也就是说,WebSocket 提供真正的双向客户机和服务器之间的数据交换。 我们不会对内部太多的细节,但我们应该提到,虽然最早实现仅限于文本数据,但现在不再是这样,WebSocket可以用于任意数据,就像一个正常的套接字。
图8.4给出了一个通用的 WebSocket 协议。在这种情况下的通信开始于普通 HTTP ,并“升级”为双向 WebSocket。
- Client (HTTP) 与 Server 通讯
- Server (HTTP) 与 Client 通讯
- Client 通过 HTTP(s) 来进行 WebSocket 握手,并等待确认
- 连接协议升级至 WebSocket
Figure 8.4 WebSocket protocol
添加应用程序支持 WebSocket 只需要添加适当的客户端或服务器端WebSocket ChannelHandler 到管道。这个类将处理特殊 WebSocket 定义的消息类型,称为“帧。“如表8.3所示,这些可以归类为“数据”和“控制”帧。
Table 8.3 WebSocketFrame types
名称 | 描述 |
---|---|
BinaryWebSocketFrame | Data frame: binary data |
TextWebSocketFrame | Data frame: text data |
ContinuationWebSocketFrame | Data frame: text or binary data that belongs to a previous BinaryWebSocketFrame or TextWebSocketFrame |
CloseWebSocketFrame | Control frame: a CLOSE request, close status code and a phrase |
PingWebSocketFrame | Control frame: requests the send of a PongWebSocketFrame |
PongWebSocketFrame | Control frame: sent as response to a PingWebSocketFrame |
由于 Netty 的主要是一个服务器端技术重点在这里创建一个 WebSocket server 。清单8.6使用 WebSocketServerProtocolHandler 提出了一个简单的例子。该类处理协议升级握手以及三个“控制”帧 Close, Ping 和 Pong。Text 和 Binary 数据帧将被传递到下一个处理程序(由你实现)进行处理。
Listing 8.6 Support WebSocket on the server
public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536), //1
new WebSocketServerProtocolHandler("/websocket"), //2
new TextFrameHandler(), //3
new BinaryFrameHandler(), //4
new ContinuationFrameHandler()); //5
}
public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// Handle text frame
}
}
public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
// Handle binary frame
}
}
public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
// Handle continuation frame
}
}
}
- 添加 HttpObjectAggregator 用于提供在握手时聚合 HttpRequest
- 添加 WebSocketServerProtocolHandler 用于处理色好给你寄握手如果请求是发送到"/websocket." 端点,当升级完成后,它将会处理Ping, Pong 和 Close 帧
- TextFrameHandler 将会处理 TextWebSocketFrames
- BinaryFrameHandler 将会处理 BinaryWebSocketFrames
- ContinuationFrameHandler 将会处理ContinuationWebSocketFrames
加密 WebSocket 只需插入 SslHandler 到作为 pipline 第一个 ChannelHandler
详见 [Chapter 11 WebSocket]。
SPDY
SPDY(读作“SPeeDY”)是Google 开发的基于 TCP 的应用层协议,用以最小化网络延迟,提升网络速度,优化用户的网络使用体验。SPDY 并不是一种用于替代 HTTP 的协议,而是对 HTTP 协议的增强。SPDY 实现技术:
- 压缩报头
- 加密所有
- 多路复用连接
- 提供支持不同的传输优先级
SPDY 主要有5个版本:
- 1 - 初始化版本,但没有使用
- 2 - 新特性,包含服务器推送
- 3 - 新特性包含流控制和更新压缩
- 3.1 - 会话层流程控制
- 4.0 - 流量控制,并与 HTTP 2.0 更加集成
SPDY 被很多浏览器支持,包括 Google Chrome, Firefox, 和 Opera
Netty 支持 版本 2 和 3 (包含3.1)的支持。这些版本被广泛应用,可以支持更多的用户。更多内容详见 [Chapter 12]。
空闲连接以及超时
检测空闲连接和超时是为了及时释放资源。常见的方法发送消息用于测试一个不活跃的连接来,通常称为“心跳”,到远端来确定它是否还活着。(一个更激进的方法是简单地断开那些指定的时间间隔的不活跃的连接)。
处理空闲连接是一项常见的任务,Netty 提供了几个 ChannelHandler 实现此目的。表8.4概述。
Table 8.4 ChannelHandlers for idle connections and timeouts
名称 | 描述 |
---|---|
IdleStateHandler | 如果连接闲置时间过长,则会触发 IdleStateEvent 事件。在 ChannelInboundHandler 中可以覆盖 userEventTriggered(…) 方法来处理 IdleStateEvent。 |
ReadTimeoutHandler | 在指定的时间间隔内没有接收到入站数据则会抛出 ReadTimeoutException 并关闭 Channel。ReadTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。 |
WriteTimeoutHandler | WriteTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。 |
详细看下 IdleStateHandler,下面是一个例子,当超过60秒没有数据收到时,就会得到通知,此时就发送心跳到远端,如果没有回应,连接就关闭。
Listing 8.7 Sending heartbeats
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); //1
pipeline.addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1)); //2
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); //3
} else {
super.userEventTriggered(ctx, evt); //4
}
}
}
}
- IdleStateHandler 将通过 IdleStateEvent 调用 userEventTriggered ,如果连接没有接收或发送数据超过60秒钟
- 心跳发送到远端
- 发送的心跳并添加一个侦听器,如果发送操作失败将关闭连接
- 事件不是一个 IdleStateEvent 的话,就将它传递给下一个处理程序
总而言之,这个例子说明了如何使用 IdleStateHandler 测试远端是否还活着,如果不是就关闭连接释放资源。
解码分隔符和基于长度的协议
使用 Netty 时会遇到需要解码以分隔符和长度为基础的协议,本节讲解Netty 如何解码这些协议。
分隔符协议
经常需要处理分隔符协议或创建基于它们的协议,例如SMTP、POP3、IMAP、Telnet等等。Netty 附带的解码器可以很容易的提取一些序列分隔:
Table 8.5 Decoders for handling delimited and length-based protocols
名称 | 描述 |
---|---|
DelimiterBasedFrameDecoder | 接收ByteBuf由一个或多个分隔符拆分,如NUL或换行符 |
LineBasedFrameDecoder | 接收ByteBuf以分割线结束,如"\n"和"\r\n" |
下图显示了使用"\r\n"分隔符的处理:
- 字节流
- 第一帧
- 第二帧
Figure 8.5 Handling delimited frames
下面展示了如何用 LineBasedFrameDecoder 处理
Listing 8.8 Handling line-delimited frames
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(65 * 1024)); //1
pipeline.addLast(new FrameHandler()); //2
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { //3
// Do something with the frame
}
}
}
- 添加一个 LineBasedFrameDecoder 用于提取帧并把数据包转发到下一个管道中的处理程序,在这种情况下就是 FrameHandler
- 添加 FrameHandler 用于接收帧
- 每次调用都需要传递一个单帧的内容
使用 DelimiterBasedFrameDecoder 可以方便处理特定分隔符作为数据结构体的这类情况。如下:
- 传入的数据流是一系列的帧,每个由换行(“\n”)分隔
- 每帧包括一系列项目,每个由单个空格字符分隔
- 一帧的内容代表一个“命令”:一个名字后跟一些变量参数
清单8.9中显示了的实现的方式。定义以下类:
- 类 Cmd - 存储帧的内容,其中一个 ByteBuf 用于存名字,另外一个存参数
- 类 CmdDecoder - 从重写方法 decode() 中检索一行,并从其内容中构建一个 Cmd 的实例
- 类 CmdHandler - 从 CmdDecoder 接收解码 Cmd 对象和对它的一些处理。
所以关键的解码器是扩展了 LineBasedFrameDecoder
Listing 8.9 Decoder for the command and the handler
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CmdDecoder(65 * 1024));//1
pipeline.addLast(new CmdHandler()); //2
}
public static final class Cmd { //3
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf name() {
return name;
}
public ByteBuf args() {
return args;
}
}
public static final class CmdDecoder extends LineBasedFrameDecoder {
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer); //4
if (frame == null) {
return null; //5
}
int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), (byte) ' '); //6
return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index +1, frame.writerIndex())); //7
}
}
public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
// Do something with the command //8
}
}
}
- 添加一个 CmdDecoder 到管道;将提取 Cmd 对象和转发到在管道中的下一个处理器
- 添加 CmdHandler 将接收和处理 Cmd 对象
- 命令也是 POJO
- super.decode() 通过结束分隔从 ByteBuf 提取帧
- frame 是空时,则返回 null
- 找到第一个空字符的索引。首先是它的命令名;接下来是参数的顺序
- 从帧先于索引以及它之后的片段中实例化一个新的 Cmd 对象
- 处理通过管道的 Cmd 对象
基于长度的协议
基于长度的协议协议在帧头文件里定义了一个帧编码的长度,而不是结束位置用一个特殊的分隔符来标记。表8.6列出了 Netty 提供的两个解码器,用于处理这种类型的协议。
Table 8.6 Decoders for length-based protocols
名称 | 描述 |
---|---|
FixedLengthFrameDecoder | 提取固定长度 |
LengthFieldBasedFrameDecoder | 读取头部长度并提取帧的长度 |
如下图所示,FixedLengthFrameDecoder 的操作是提取固定长度每帧8字节
- 字节流 stream
- 4个帧,每个帧8个字节
大部分时候帧的大小被编码在头部,这种情况可以使用LengthFieldBasedFrameDecoder,它会读取头部长度并提取帧的长度。下图显示了它是如何工作的:
- 长度 “0x000C” (12) 被编码在帧的前两个字节
- 后面的12个字节就是内容
- 提取没有头文件的帧内容
Figure 8.7 Message that has frame size encoded in the header
LengthFieldBasedFrameDecoder 提供了几个构造函数覆盖各种各样的头长字段配置情况。清单8.10显示了使用三个参数的构造函数是maxFrameLength,lengthFieldOffset lengthFieldLength。在这 情况下,帧的长度被编码在帧的前8个字节。
Listing 8.10 Decoder for the command and the handler
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(65 * 1024)); //1
pipeline.addLast(new FrameHandler()); //2
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// Do something with the frame //3
}
}
}
- 添加一个 LengthFieldBasedFrameDecoder ,用于提取基于帧编码长度8个字节的帧。
- 添加一个 FrameHandler 用来处理每帧
- 处理帧数据
总而言之,本部分探讨了 Netty 提供的编解码器支持协议,包括定义特定的分隔符的字节流的结构或协议帧的长度。这些编解码器非常有用。
写大型数据
由于网络的原因,如何有效的写大数据在异步框架是一个特殊的问题。因为写操作是非阻塞的,即便是在数据不能写出时,只是通知 ChannelFuture 完成了。当这种情况发生时,你必须停止写操作或面临内存耗尽的风险。所以写时,会产生大量的数据,我们需要做好准备来处理的这种情况下的缓慢的连接远端导致延迟释放内存的问题你。作为一个例子让我们考虑写一个文件的内容到网络。
在我们的讨论传输(见4.2节)时,我们提到了 NIO 的“zero-copy(零拷贝)”功能,消除移动一个文件的内容从文件系统到网络堆栈的复制步骤。所有这一切发生在 Netty 的核心,因此所有所需的应用程序代码是使用 interface FileRegion 的实现,在 Netty 的API 文档中定义如下为一个通过 Channel 支持 zero-copy 文件传输的文件区域。
下面演示了通过 zero-copy 将文件内容从 FileInputStream 创建 DefaultFileRegion 并写入 使用 Channel
Listing 8.11 Transferring file contents with FileRegion
FileInputStream in = new FileInputStream(file); //1
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //2
channel.writeAndFlush(region).addListener(new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause(); //4
// Do something
}
}
});
- 获取 FileInputStream
- 创建一个新的 DefaultFileRegion 用于文件的完整长度
- 发送 DefaultFileRegion 并且注册一个 ChannelFutureListener
- 处理发送失败
只是看到的例子只适用于直接传输一个文件的内容,没有执行的数据应用程序的处理。在相反的情况下,将数据从文件系统复制到用户内存是必需的,您可以使用 ChunkedWriteHandler。这个类提供了支持异步写大数据流不引起高内存消耗。
这个关键是 interface ChunkedInput,实现如下:
名称 | 描述 |
---|---|
ChunkedFile | 当你使用平台不支持 zero-copy 或者你需要转换数据,从文件中一块一块的获取数据 |
ChunkedNioFile | 与 ChunkedFile 类似,处理使用了NIOFileChannel |
ChunkedStream | 从 InputStream 中一块一块的转移内容 |
ChunkedNioStream | 从 ReadableByteChannel 中一块一块的转移内容 |
清单 8.12 演示了使用 ChunkedStream,实现在实践中最常用。 所示的类被实例化一个 File 和一个 SslContext。当 initChannel() 被调用来初始化显示的处理程序链的通道。
当通道激活时,WriteStreamHandler 从文件一块一块的写入数据作为ChunkedStream。最后将数据通过 SslHandler 加密后传播。
Listing 8.12 Transfer file content with FileRegion
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;
public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SslHandler(sslCtx.createEngine()); //1
pipeline.addLast(new ChunkedWriteHandler());//2
pipeline.addLast(new WriteStreamHandler());//3
}
public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
- 添加 SslHandler 到 ChannelPipeline.
- 添加 ChunkedWriteHandler 用来处理作为 ChunkedInput 传进的数据
- 当连接建立时,WriteStreamHandler 开始写文件的内容
- 当连接建立时,channelActive() 触发使用 ChunkedInput 来写文件的内容 (插图显示了 FileInputStream;也可以使用任何 InputStream )
ChunkedInput 所有被要求使用自己的 ChunkedInput 实现,是安装ChunkedWriteHandler 在管道中
在本节中,我们讨论
- 如何采用zero-copy(零拷贝)功能高效地传输文件
- 如何使用 ChunkedWriteHandler 编写大型数据而避免 OutOfMemoryErrors 错误。
在下一节中我们将研究几种不同方法来序列化 POJO。
序列化数据
JDK 提供了 ObjectOutputStream 和 ObjectInputStream 通过网络将原始数据类型和 POJO 进行序列化和反序列化。API并不复杂,可以应用到任何对象,支持 java.io.Serializable 接口。但它也不是非常高效的。在本节中,我们将看到 Netty 所提供的。
JDK 序列化
如果程序与端对端间的交互是使用 ObjectOutputStream 和 ObjectInputStream,并且主要面临的问题是兼容性,那么, JDK 序列化 是不错的选择。
表8.8列出了序列化类,Netty 提供了与 JDK 的互操作。
Table 8.8 JDK Serialization codecs
名称 | 描述 |
---|---|
CompatibleObjectDecoder | 该解码器使用 JDK 序列化,用于与非 Netty 进行互操作。 |
CompatibleObjectEncoder | 该编码器使用 JDK 序列化,用于与非 Netty 进行互操作。 |
ObjectDecoder | 基于 JDK 序列化来使用自定义序列化解码。外部依赖被排除在外时,提供了一个速度提升。否则选择其他序列化实现 |
ObjectEncoder | 基于 JDK 序列化来使用自定义序列化编码。外部依赖被排除在外时,提供了一个速度提升。否则选择其他序列化实现 |
JBoss Marshalling 序列化
如果可以使用外部依赖 JBoss Marshalling 是个明智的选择。比 JDK 序列化快3倍且更加简练。
JBoss Marshalling 是另一个序列化 API,修复的许多 JDK序列化 API 中发现的问题,它与 java.io.Serializable 完全兼容。并添加了一些新的可调参数和附加功能,所有这些都可插入通过工厂配置外部化,类/实例查找表,类决议,对象替换,等等)
下表展示了 Netty 支持 JBoss Marshalling 的编解码器。
Table 8.9 JBoss Marshalling codecs
名称 | 描述 |
---|---|
CompatibleMarshallingDecoder | 为了与使用 JDK 序列化的端对端间兼容。 |
CompatibleMarshallingEncoder | 为了与使用 JDK 序列化的端对端间兼容。 |
MarshallingDecoder | 使用自定义序列化用于解码,必须使用 |
MarshallingEncoder MarshallingEncoder | 使用自定义序列化用于编码,必须使用 MarshallingDecoder
下面展示了使用 MarshallingDecoder 和 MarshallingEncoder
Listing 8.13 Using JBoss Marshalling
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;
public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider,
MarshallerProvider marshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
pipeline.addLast(new MarshallingEncoder(marshallerProvider));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception {
// Do something
}
}
}
ProtoBuf 序列化
ProtoBuf 来自谷歌,并且开源了。它使编解码数据更加紧凑和高效。它已经绑定各种编程语言,使它适合跨语言项目。
下表展示了 Netty 支持 ProtoBuf 的 ChannelHandler 实现。
Table 8.10 ProtoBuf codec
名称 | 描述 |
---|---|
ProtobufDecoder | 使用 ProtoBuf 来解码消息 |
ProtobufEncoder | 使用 ProtoBuf 来编码消息 |
ProtobufVarint32FrameDecoder | 在消息的整型长度域中,通过 “Base 128 Varints“将接收到的 ByteBuf 动态的分割 |
用法见下面
Listing 8.14 Using Google Protobuf
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
private final MessageLite lite;
public ProtoBufInitializer(MessageLite lite) {
this.lite = lite;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtobufDecoder(lite));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Do something with the object
}
}
}
- 添加 ProtobufVarint32FrameDecoder 用来分割帧
- 添加 ProtobufEncoder 用来处理消息的编码
- 添加 ProtobufDecoder 用来处理消息的解码
- 添加 ObjectHandler 用来处理解码了的消息
本章在这最后一节中,我们探讨了 Netty 支持的不同的序列化的专门的解码器和编码器。这些是标准 JDK 序列化 API,JBoss Marshalling 和谷歌ProtoBuf。
总结
Netty 的提供了编解码器和处理程序,可以组合和扩展来实现一个非常广泛的处理场景。此外,他们在许多大型系统被证明是健壮的组件。 请注意我们只介绍最常见的例子。API文档提供完整的描述。
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.