CH14-自定义编解码器
本章介绍:
- Decoder
- Encoder
- 单元测试
本章讲述 Netty 中如何轻松实现定制的编解码器,由于 Netty 架构的灵活性,这些编解码器易于重用和测试。为了更容易实现,使用 Memcached 作为协议例子是因为它更方便我们实现。
Memcached 是来自 Memcached.org 的免费开源、高性能、分布式的内存对象缓存系统,其目的是加速动态 Web 应用程序的响应,减轻数据库负载;Memcache 实际上是一个以 key-value 存储任意数据的内存小块。可能有人会问“为什么使用 Memcached?”,因为 Memcached 协议非常简单,便于讲解。
编解码器的范围
我们将只实现 Memcached 协议的一个子集,这足够我们进行添加、检索、删除对象;在 Memcached 中是通过执行 SET,GET,DELETE 命令来实现的。Memcached 支持很多其他的命令,但我们只使用其中三个命令,简单的东西,我们才会理解的更清楚。
Memcached 有一个二进制和纯文本协议,它们都可以用来与 Memcached 服务器通信,使用什么类型的协议取决于服务器支持哪些协议。本章主要关注实现二进制协议,因为二进制在网络编程中最常用。
实现 Memcached 编解码器
当想要实现一个给定协议的编解码器,我们应该花一些事件来了解它的运作原理。通常情况下,协议本身都有一些详细的记录。在这里你会发现多少细节?幸运的是 Memcached 的二进制协议可以很好的扩展。 在 RFC 中有相应的规范,可以在 https://code.google.com/p/Memcached/wiki/MemcacheBinaryProtocol 找到 。
我们不会实现 Memcached 的所有命令,只会实现三种操作:SET,GET 和 DELETE。这样做事为了让事情变得简单。
Memcached 二进制协议
我们说,要实现 Memcached 的 GET, SET, 和 DELETE 操作。我们仅仅关注这些,但 memcached 协议有一个通用的结构,只有少数参数改变为了改变一个请求或响应的意义。这意味着您可以轻松地扩展实现添加其他命令。一般协议有 24 字节头用于请求和响应。这个头可以分解如下表14.1中。
Table 14.1 Sample Memcached header byte structure
Field | Byte offset | Value |
---|---|---|
Magic | 0 | 0x80 用于请求 0x81 用于响应 |
OpCode | 1 | 0x01…0x1A |
Key length | 2 和 3 | 1…32,767 |
Extra length | 4 | 0x00, x04, 或 0x08 |
Data type | 5 | 0x00 |
Reserved | 6 和 7 | 0x00 |
Total body length | 8-11 | 所有 body 的长度 |
Opaque | 12-15 | 任何带带符号的 32-bit 整数; 这个也包含在响应中,因此更容易将请求映射到响应。 |
CAS | 16-23 | 数据版本检查 |
注意每个部分使用的字节数。这告诉你接下来你应该用什么数据类型。例如,如果字节的偏移量只是 byte 0,那么旧使用一个 Java byte来表示它;如果它是6和7(2字节),你使用一个Java short;如果它是 12-15(4字节),你使用一个Java int,等等。
- 请求(只有显示头)
- 响应
Figure 14.2 Real-world Memcached request and response headers
在图14.2中,高亮显示的第一部分代表请求打到 Memcached (只显示请求头),在这种情况下是告诉 Memcached 来 SET 键是“a”而值是“abc”。第部分是响应。
突出显示的部分中的每一行代表4个字节;因为有6行,这意味着请求头是由24个字节,正如我们之前说的。回顾表14.1中,您可以头在一个真正的请求中看到头文件中的信息。现在,这是所有你需要知道的关于 Memcached 二进制协议。在下一节中,我们需要看看多么我们可以开始制作 Netty 这些请求。
Netty 编码器和解码器
Netty 的是一个复杂和先进的框架,但它并不玄幻。当我们请求一些设置了 key 的给定值时,我们知道 Request 类的一个实例被创建来代表这个请求。但 Netty 并不知道 Request 对象是如何转成 Memcached 所期望的。Memcached 所期望的是字节序列;忽略使用的协议,数据在网络上传输永远是字节序列。
将 Request 对象转为 Memcached 所需的字节序列,Netty 需要用 MemcachedRequest 来编码成另外一种格式。这里所说的另外一种格式不单单是从对象转为字节,也可以是从对象转为对象,或者是从对象转为字符串等。编码器的内容可以详见第七章。
Netty 提供了一个抽象类称为 MessageToByteEncoder。它提供了一个抽象方法,将一条消息(在本例中我们 MemcachedRequest 对象)转为字节。你显示什么信息实现通过使用 Java 泛型可以处理;例如 , MessageToByteEncoder 说这个编码器要编码的对象类型是 MemcachedRequest
MessageToByteEncoder 和 Java 泛型
使用 MessageToByteEncoder 可以绑定特定的参数类型。如果你有多个不同的消息类型,在相同的编码器里,也可以使用MessageToByteEncoder,注意检查消息的类型即可
这也适用于解码器,除了解码器将一系列字节转换回一个对象。 这个 Netty 的提供了 ByteToMessageDecoder 类,而不是提供一个编码方法用来实现解码。在接下来的两个部分你看看如何实现一个 Memcached 解码器和编码器。在你做之前,应该意识到在使用 Netty 时,你不总是需要自己提供编码器和解码器。自所以现在这么做是因为 Netty 没有对 Memcached 内置支持。而 HTTP 以及其他标准的协议,Netty 已经是提供的了。
编码器和解码器
记住,编码器处理出站,而解码器处理入站。这基本上意味着编码器将编码数据,写入远端。解码器将从远端读取处理数据。重要的是要记住,出站和入站是两个不同的方向。
请注意,为了程序简单,我们的编码器和解码器不检查任何值的最大大小。在实际实现中你需要一些验证检查,如果检测到违反协议,则使用 EncoderException 或 DecoderException(或一个子类)。
实现 Memcached 编码器
本节我们将简要介绍编码器的实现。正如我们提到的,编码器负责编码消息为字节序列。这些字节可以通过网络发送到远端。为了发送请求,我们首先创建 MemcachedRequest 类,稍后编码器实现会编码为一系列字节。下面的清单显示了我们的 MemcachedRequest 类
Listing 14.1 Implementation of a Memcached request
public class MemcachedRequest { //1
private static final Random rand = new Random();
private final int magic = 0x80;//fixed so hard coded
private final byte opCode; //the operation e.g. set or get
private final String key; //the key to delete, get or set
private final int flags = 0xdeadbeef; //random
private final int expires; //0 = item never expires
private final String body; //if opCode is set, the value
private final int id = rand.nextInt(); //Opaque
private final long cas = 0; //data version check...not used
private final boolean hasExtras; //not all ops have extras
public MemcachedRequest(byte opcode, String key, String value) {
this.opCode = opcode;
this.key = key;
this.body = value == null ? "" : value;
this.expires = 0;
//only set command has extras in our example
hasExtras = opcode == Opcode.SET;
}
public MemcachedRequest(byte opCode, String key) {
this(opCode, key, null);
}
public int magic() { //2
return magic;
}
public int opCode() { //3
return opCode;
}
public String key() { //4
return key;
}
public int flags() { //5
return flags;
}
public int expires() { //6
return expires;
}
public String body() { //7
return body;
}
public int id() { //8
return id;
}
public long cas() { //9
return cas;
}
public boolean hasExtras() { //10
return hasExtras;
}
}
- 这个类将会发送请求到 Memcached server
- 幻数,它可以用来标记文件或者协议的格式
- opCode,反应了响应的操作已经创建了
- 执行操作的 key
- 使用的额外的 flag
- 表明到期时间
- body
- 请求的 id。这个id将在响应中回显。
- compare-and-check 的值
- 如果有额外的使用,将返回 true
你如果想实现 Memcached 的其余部分协议,你只需要将 client.op*(op* 任何新的操作添加)转换为其中一个方法请求。我们需要两个更多的支持类,在下一个清单所示
Listing 14.2 Possible Memcached operation codes and response statuses
public class Status {
public static final short NO_ERROR = 0x0000;
public static final short KEY_NOT_FOUND = 0x0001;
public static final short KEY_EXISTS = 0x0002;
public static final short VALUE_TOO_LARGE = 0x0003;
public static final short INVALID_ARGUMENTS = 0x0004;
public static final short ITEM_NOT_STORED = 0x0005;
public static final short INC_DEC_NON_NUM_VAL = 0x0006;
}
public class Opcode {
public static final byte GET = 0x00;
public static final byte SET = 0x01;
public static final byte DELETE = 0x04;
}
一个 Opcode 告诉 Memcached 要执行哪些操作。每个操作都由一个字节表示。同样的,当 Memcached 响应一个请求,响应头中包含两个字节代表响应状态。状态和 Opcode 类表示这些 Memcached 的构造。这些操作码可以使用当你构建一个新的 MemcachedRequest 指定哪个行动应该由它引发的。
但现在可以集中精力在编码器上:
Listing 14.3 MemcachedRequestEncoder implementation
public class MemcachedRequestEncoder extends
MessageToByteEncoder<MemcachedRequest> { //1
@Override
protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg,
ByteBuf out) throws Exception { //2
byte[] key = msg.key().getBytes(CharsetUtil.UTF_8);
byte[] body = msg.body().getBytes(CharsetUtil.UTF_8);
//total size of the body = key size + content size + extras size //3
int bodySize = key.length + body.length + (msg.hasExtras() ? 8 : 0);
//write magic byte //4
out.writeByte(msg.magic());
//write opcode byte //5
out.writeByte(msg.opCode());
//write key length (2 byte) //6
out.writeShort(key.length); //key length is max 2 bytes i.e. a Java short //7
//write extras length (1 byte)
int extraSize = msg.hasExtras() ? 0x08 : 0x0;
out.writeByte(extraSize);
//byte is the data type, not currently implemented in Memcached but required //8
out.writeByte(0);
//next two bytes are reserved, not currently implemented but are required //9
out.writeShort(0);
//write total body length ( 4 bytes - 32 bit int) //10
out.writeInt(bodySize);
//write opaque ( 4 bytes) - a 32 bit int that is returned in the response //11
out.writeInt(msg.id());
//write CAS ( 8 bytes)
out.writeLong(msg.cas()); //24 byte header finishes with the CAS //12
if (msg.hasExtras()) {
//write extras (flags and expiry, 4 bytes each) - 8 bytes total //13
out.writeInt(msg.flags());
out.writeInt(msg.expires());
}
//write key //14
out.writeBytes(key);
//write value //15
out.writeBytes(body);
}
}
- 该类是负责编码 MemachedRequest 为一系列字节
- 转换的 key 和实际请求的 body 到字节数组
- 计算 body 大小
- 写幻数到 ByteBuf 字节
- 写 opCode 作为字节
- 写 key 长度z作为 short
- 编写额外的长度作为字节
- 写数据类型,这总是0,因为目前不是在 Memcached,但可用于使用 后来的版本
- 为保留字节写为 short ,后面的 Memcached 版本可能使用
- 写 body 的大小作为 long
- 写 opaque 作为 int
- 写 cas 作为 long。这个是头文件的最后部分,在 body 的开始
- 编写额外的 flag 和到期时间为 int
- 写 key
- 这个请求完成后 写 body。
总结,编码器 使用 Netty 的 ByteBuf 处理请求,编码 MemcachedRequest 成一套正确排序的字节。详细步骤为:
- 写幻数字节。
- 写 opcode 字节。
- 写 key 长度(2字节)。
- 写额外的长度(1字节)。
- 写数据类型(1字节)。
- 为保留字节写 null 字节(2字节)。
- 写 body 长度(4字节- 32位整数)。
- 写 opaque(4个字节,一个32位整数在响应中返回)。
- 写 CAS(8个字节)。
- 写 额外的(flag 和 到期,4字节)= 8个字节
- 写 key
- 写 值
无论你放入什么到输出缓冲区( 调用 ByteBuf) Netty 的将向服务器发送被写入请求。下一节将展示如何进行反向通过解码器工作。
实现 Memcached 编码器
将 MemcachedRequest 对象转为 字节序列,Memcached 仅需将字节转到响应对象返回即可。
先见一个 POJO:
Listing 14.7 Implementation of a MemcachedResponse
public class MemcachedResponse { //1
private final byte magic;
private final byte opCode;
private byte dataType;
private final short status;
private final int id;
private final long cas;
private final int flags;
private final int expires;
private final String key;
private final String data;
public MemcachedResponse(byte magic, byte opCode,
byte dataType, short status,
int id, long cas,
int flags, int expires, String key, String data) {
this.magic = magic;
this.opCode = opCode;
this.dataType = dataType;
this.status = status;
this.id = id;
this.cas = cas;
this.flags = flags;
this.expires = expires;
this.key = key;
this.data = data;
}
public byte magic() { //2
return magic;
}
public byte opCode() { //3
return opCode;
}
public byte dataType() { //4
return dataType;
}
public short status() { //5
return status;
}
public int id() { //6
return id;
}
public long cas() { //7
return cas;
}
public int flags() { //8
return flags;
}
public int expires() { //9
return expires;
}
public String key() { //10
return key;
}
public String data() { //11
return data;
}
}
- 该类,代表从 Memcached 服务器返回的响应
- 幻数
- opCode,这反映了创建操作的响应
- 数据类型,这表明这个是基于二进制还是文本
- 响应的状态,这表明如果请求是成功的
- 惟一的 id
- compare-and-set 值
- 使用额外的 flag
- 表示该值存储的一个有效期
- 响应创建的 key
- 实际数据
下面为 MemcachedResponseDecoder, 使用了 ByteToMessageDecoder 基类,用于将 字节序列转为 MemcachedResponse
Listing 14.4 MemcachedResponseDecoder class
public class MemcachedResponseDecoder extends ByteToMessageDecoder { //1
private enum State { //2
Header,
Body
}
private State state = State.Header;
private int totalBodySize;
private byte magic;
private byte opCode;
private short keyLength;
private byte extraLength;
private short status;
private int id;
private long cas;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) {
switch (state) { //3
case Header:
if (in.readableBytes() < 24) {
return;//response header is 24 bytes //4
}
magic = in.readByte(); //5
opCode = in.readByte();
keyLength = in.readShort();
extraLength = in.readByte();
in.skipBytes(1);
status = in.readShort();
totalBodySize = in.readInt();
id = in.readInt(); //referred to in the protocol spec as opaque
cas = in.readLong();
state = State.Body;
case Body:
if (in.readableBytes() < totalBodySize) {
return; //until we have the entire payload return //6
}
int flags = 0, expires = 0;
int actualBodySize = totalBodySize;
if (extraLength > 0) { //7
flags = in.readInt();
actualBodySize -= 4;
}
if (extraLength > 4) { //8
expires = in.readInt();
actualBodySize -= 4;
}
String key = "";
if (keyLength > 0) { //9
ByteBuf keyBytes = in.readBytes(keyLength);
key = keyBytes.toString(CharsetUtil.UTF_8);
actualBodySize -= keyLength;
}
ByteBuf body = in.readBytes(actualBodySize); //10
String data = body.toString(CharsetUtil.UTF_8);
out.add(new MemcachedResponse( //1
magic,
opCode,
status,
id,
cas,
flags,
expires,
key,
data
));
state = State.Header;
}
}
}
- 类负责创建的 MemcachedResponse 读取字节
- 代表当前解析状态,这意味着我们需要解析的头或 body
- 根据解析状态切换
- 如果不是至少24个字节是可读的,它不可能读整个头部,所以返回这里,等待再通知一次数据准备阅读
- 阅读所有头的字段
- 检查是否足够的数据是可读用来读取完整的响应的 body。长度是从头读取
- 检查如果有任何额外的 flag 用于读,如果是这样做
- 检查如果响应包含一个 expire 字段,有就读它
- 检查响应是否包含一个 key ,有就读它
- 读实际的 body 的 payload
- 从前面读取字段和数据构造一个新的 MemachedResponse
所以在实现发生了什么事?我们知道一个 Memcached 响应有24位头;我们不知道是否所有数据,响应将被包含在输入 ByteBuf ,当解码方法调用时。这是因为底层网络堆栈可能将数据分解成块。所以确保我们只解码当我们有足够的数据,这段代码检查是否可用可读的字节的数量至少是24。一旦我们有24个字节,我们可以确定整个消息有多大,因为这个信息包含在24位头。
当我们解码整个消息,我们创建一个 MemcachedResponse 并将其添加到输出列表。任何对象添加到该列表将被转发到下一个ChannelInboundHandler 在 ChannelPipeline,因此允许处理。
测试编解码器
编码器和解码器完成,但仍有一些缺失:测试。
没有测试你只看到如果编解码器工作对一些真正的服务器运行时,这并不是你应该是依靠什么。第十章所示,为一个自定义编写测试 ChannelHandler通常是通过 EmbeddedChannel。
所以这正是现在做测试我们定制的编解码器,其中包括一个编码器和解码器。让重新开始编码器。后面的清单显示了简单的编写单元测试。
Listing 14.5 MemcachedRequestEncoderTest class
public class MemcachedRequestEncoderTest {
@Test
public void testMemcachedRequestEncoder() {
MemcachedRequest request = new MemcachedRequest(Opcode.SET, "key1", "value1"); //1
EmbeddedChannel channel = new EmbeddedChannel(new MemcachedRequestEncoder()); //2
channel.writeOutbound(request); //3
ByteBuf encoded = (ByteBuf) channel.readOutbound();
Assert.assertNotNull(encoded); //4
Assert.assertEquals(request.magic(), encoded.readUnsignedByte()); //5
Assert.assertEquals(request.opCode(), encoded.readByte()); //6
Assert.assertEquals(4, encoded.readShort());//7
Assert.assertEquals((byte) 0x08, encoded.readByte()); //8
Assert.assertEquals((byte) 0, encoded.readByte());//9
Assert.assertEquals(0, encoded.readShort());//10
Assert.assertEquals(4 + 6 + 8, encoded.readInt());//11
Assert.assertEquals(request.id(), encoded.readInt());//12
Assert.assertEquals(request.cas(), encoded.readLong());//13
Assert.assertEquals(request.flags(), encoded.readInt()); //14
Assert.assertEquals(request.expires(), encoded.readInt()); //15
byte[] data = new byte[encoded.readableBytes()]; //16
encoded.readBytes(data);
Assert.assertArrayEquals((request.key() + request.body()).getBytes(CharsetUtil.UTF_8), data);
Assert.assertFalse(encoded.isReadable()); //17
Assert.assertFalse(channel.finish());
Assert.assertNull(channel.readInbound());
}
}
- 新建 MemcachedRequest 用于编码为 ByteBuf
- 新建 EmbeddedChannel 用于保持 MemcachedRequestEncoder 到测试
- 写请求到 channel 并且判断是否产生了编码的消息
- 检查 ByteBuf 是否 null
- 判断 magic 是否正确写入 ByteBuf
- 判断 opCode (SET) 是否写入正确
- 检查 key 是否写入长度正确
- 检查写入的请求是否额外包含
- 检查数据类型是否写
- 检查是否保留数据插入
- 检查 body 的整体大小 计算方式是 key.length + body.length + extras
- 检查是否正确写入 id
- 检查是否正确写入 Compare and Swap (CAS)
- 检查是否正确的 flag
- 检查是否正确设置到期时间的
- 检查 key 和 body 是否正确
- 检查是否可读
Listing 14.6 MemcachedResponseDecoderTest class
public class MemcachedResponseDecoderTest {
@Test
public void testMemcachedResponseDecoder() {
EmbeddedChannel channel = new EmbeddedChannel(new MemcachedResponseDecoder()); //1
byte magic = 1;
byte opCode = Opcode.SET;
byte[] key = "Key1".getBytes(CharsetUtil.US_ASCII);
byte[] body = "Value".getBytes(CharsetUtil.US_ASCII);
int id = (int) System.currentTimeMillis();
long cas = System.currentTimeMillis();
ByteBuf buffer = Unpooled.buffer(); //2
buffer.writeByte(magic);
buffer.writeByte(opCode);
buffer.writeShort(key.length);
buffer.writeByte(0);
buffer.writeByte(0);
buffer.writeShort(Status.KEY_EXISTS);
buffer.writeInt(body.length + key.length);
buffer.writeInt(id);
buffer.writeLong(cas);
buffer.writeBytes(key);
buffer.writeBytes(body);
Assert.assertTrue(channel.writeInbound(buffer)); //3
MemcachedResponse response = (MemcachedResponse) channel.readInbound();
assertResponse(response, magic, opCode, Status.KEY_EXISTS, 0, 0, id, cas, key, body);//4
}
@Test
public void testMemcachedResponseDecoderFragments() {
EmbeddedChannel channel = new EmbeddedChannel(new MemcachedResponseDecoder()); //5
byte magic = 1;
byte opCode = Opcode.SET;
byte[] key = "Key1".getBytes(CharsetUtil.US_ASCII);
byte[] body = "Value".getBytes(CharsetUtil.US_ASCII);
int id = (int) System.currentTimeMillis();
long cas = System.currentTimeMillis();
ByteBuf buffer = Unpooled.buffer(); //6
buffer.writeByte(magic);
buffer.writeByte(opCode);
buffer.writeShort(key.length);
buffer.writeByte(0);
buffer.writeByte(0);
buffer.writeShort(Status.KEY_EXISTS);
buffer.writeInt(body.length + key.length);
buffer.writeInt(id);
buffer.writeLong(cas);
buffer.writeBytes(key);
buffer.writeBytes(body);
ByteBuf fragment1 = buffer.readBytes(8); //7
ByteBuf fragment2 = buffer.readBytes(24);
ByteBuf fragment3 = buffer;
Assert.assertFalse(channel.writeInbound(fragment1)); //8
Assert.assertFalse(channel.writeInbound(fragment2)); //9
Assert.assertTrue(channel.writeInbound(fragment3)); //10
MemcachedResponse response = (MemcachedResponse) channel.readInbound();
assertResponse(response, magic, opCode, Status.KEY_EXISTS, 0, 0, id, cas, key, body);//11
}
private static void assertResponse(MemcachedResponse response, byte magic, byte opCode, short status, int expires, int flags, int id, long cas, byte[] key, byte[] body) {
Assert.assertEquals(magic, response.magic());
Assert.assertArrayEquals(key, response.key().getBytes(CharsetUtil.US_ASCII));
Assert.assertEquals(opCode, response.opCode());
Assert.assertEquals(status, response.status());
Assert.assertEquals(cas, response.cas());
Assert.assertEquals(expires, response.expires());
Assert.assertEquals(flags, response.flags());
Assert.assertArrayEquals(body, response.data().getBytes(CharsetUtil.US_ASCII));
Assert.assertEquals(id, response.id());
}
}
- 新建 EmbeddedChannel ,持有 MemcachedResponseDecoder 到测试
- 创建一个新的 Buffer 并写入数据,与二进制协议的结构相匹配
- 写缓冲区到 EmbeddedChannel 和检查是否一个新的MemcachedResponse 创建由声明返回值
- 判断 MemcachedResponse 和预期的值
- 创建一个新的 EmbeddedChannel 持有 MemcachedResponseDecoder 到测试
- 创建一个新的 Buffer 和写入数据的二进制协议的结构相匹配
- 缓冲分割成三个片段
- 写的第一个片段 EmbeddedChannel 并检查,没有新的MemcachedResponse 创建,因为并不是所有的数据都是准备好了
- 写第二个片段 EmbeddedChannel 和检查,没有新的MemcachedResponse 创建,因为并不是所有的数据都是准备好了
- 写最后一段到 EmbeddedChannel 和检查新的 MemcachedResponse 是否创建,因为我们终于收到所有数据
- 判断 MemcachedResponse 与预期的值
总结
阅读本章后,您应该能够创建自己的编解码器针对你最喜欢的协议。这包括写编码器和解码器,从字节转换为你的 POJO,反之亦然。这一章展示了如何使用一个协议规范实现和提取所需的信息。
它还向您展示了如何编写单元测试完成你的工作的编码器和解码器,确保一切工作如预期而不需要一个完整的 Memcached 服务器运行。这允许轻松集成测试到构建系统的中。
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.