CH13-UDP 广播
本章介绍
- UDP 介绍
- ChannelHandler, Decoder, 和 Encoder
- 引导基于 Netty 的应用
前面的章节都是在示例中使用 TCP 协议,这一章,我们将使用UDP。UDP是一种无连接协议,若需要很高的性能和对数据的完成性没有严格要求,那使用 UDP 是一个很好的方法。最著名的基于UDP协议的是用来域名解析的DNS。这一章将给你一个好的理解的无连接协议所以你能够做出明智的决定何时使用 UDP 在您的应用程序。
我们将首先从一个 UDP 的概述,其特点和局限性开始讲解。之后,我们将在本章描述了示例应用程序的开发。
UDP 基础
面向连接的传输协议(如TCP)管理建立一个两个网络端点之间调用(或“连接”),命令和可靠的消息传输在调用的生命周期期间,最后有序在调用终止时终止。与此相反,在这样一个无连接协议 UDP 没有持久连接的概念,每个消息(UDP 数据报)是一个独立的传播。
此外,UDP 没有 TCP 的纠错机制,其中每个对等承认它接收的数据包并由发送方传送包。
以此类推,一个 TCP 连接就像一个电话交谈,一系列的命令消息流在两个方向上。UDP,另一方面,就像把一堆明信片丢进信箱。我们不能知道他们到达目的地的顺序,以及他们是否能够到达。
虽然 UDP 存在某些方面的的局限性,这也解释了为什么它是如此远远快于TCP:所有的握手和消息管理的开销已被消灭。显然,UDP 是一种只适合应用程序可以处理或容忍丢失消息,而不是例如处理金钱交易。
UDP 广播
我们所有的例子这一点利用传输方式称为“单播”:“将消息发送给一个网络拥有唯一地址的目的地”,这种模式支持连接和无连接协议。
然而,UDP 提供了额外的传输模式对多个接收者发送消息:
- 多播:传送给一组主机
- 广播:传送到网络上的所有主机(或子网)
示例应用程序在本章将说明使用 UDP 广播发送消息,可以接收到所有主机在同一网络。为此我们将使用特殊的“有限广播”或“零”网络地址255.255.255.255。消息发送到这个地址是规定要在本地网络(0.0.0.0)的所有主机和从不转发到其他网络通过路由器。
下一节将讨论示例应用程序的设计。
UDP 示例
我们的示例应用程序将打开一个文件,将每一行作为消息通过 UDP 发到指定的端口。如果你熟悉类 UNIX 操作系统,可以认为这是一个非常标准的简化版本 “syslog(系统日志)”。“UDP ,是一个完美的适合这样的应用程序,因为偶尔丢失一行日志文件可以被容忍,因为文件本身存储在文件系统中。此外,应用程序提供了非常有价值的能力有效地处理大量的数据。
UDP 广播使添加新事件“监视器”接收日志消息一样简单开始一个指定的端口上侦听器程序。然而,这种轻松的访问也提出了一个潜在的安全问题,指出为什么 UD P广播往往是在安全的环境中使用。还要注意广播消息可能只能在本地网络,因为路由器经常阻止他们。
Publish/Subscribe(发布/订阅)
应用程序,如 syslog 通常归类为“发布/订阅”;生产者或服务发布事件和多个订阅者可以收到它们。
整体看下这个应用,如下图:
- 应用监听新文件内容
- 事件通过 UDP 广播
- 事件监视器监听并显示内容
Figure 13.1 Application overview
应用程序有两个组件:广播器和监视器或(可能有多个实例)。为了简单起见我们不会添加身份验证、验证、加密。
在下一节中我们将开始探索实现中,我们还将讨论 UDP 和 TCP 应用程序开发之间的差异。
EventLog 的 POJO
在消息应用里面,数据一般以 POJO 形式呈现。这可能保存配置或处理信息除了实际的消息数据。在这个应用程序里,消息的单元是一个“事件”。由于数据来自一个日志文件,我们将称之为 LogEvent。
清单13.1显示了这个简单的POJO的细节。
Listing 13.1 LogEvent message
public final class LogEvent {
public static final byte SEPARATOR = (byte) ':';
private final InetSocketAddress source;
private final String logfile;
private final String msg;
private final long received;
public LogEvent(String logfile, String msg) { //1
this(null, -1, logfile, msg);
}
public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { //2
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource() { //3
return source;
}
public String getLogfile() { //4
return logfile;
}
public String getMsg() { //5
return msg;
}
public long getReceivedTimestamp() { //6
return received;
}
}
- 构造器用于出站消息
- 构造器用于入站消息
- 返回发送 LogEvent 的 InetSocketAddress 的资源
- 返回用于发送 LogEvent 的日志文件的名称
- 返回消息的内容
- 返回 LogEvent 接收到的时间
写广播器
本节,我们将写一个广播器。下图展示了广播一个 DatagramPacket 在每个日志实体里面的方法
- 日志文件
- 日志文件中的日志实体
- 一个 DatagramPacket 保持一个单独的日志实体
Figure 13.2 Log entries sent with DatagramPackets
图13.3表示一个 LogEventBroadcaster 的 ChannelPipeline 的高级视图,说明了 LogEvent 是如何流转的。
Figure 13.3 LogEventBroadcaster: ChannelPipeline and LogEvent flow
正如我们所看到的,所有的数据传输都封装在 LogEvent 消息里。LogEventBroadcaster 写这些通过在本地端的管道,发送它们通过ChannelPipeline 转换(编码)为一个定制的 ChannelHandler 的DatagramPacket 信息。最后,他们通过 UDP 广播并被远程接收。
编码器和解码器
编码器和解码器将消息从一种格式转换为另一种,深度探讨在第7章中进行。我们探索 Netty 提供的基础类来简化和实现自定义 ChannelHandler 如 LogEventEncoder 在这个应用程序中。
下面展示了 编码器的实现
Listing 13.2 LogEventEncoder
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
public LogEventEncoder(InetSocketAddress remoteAddress) { //1
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8); //2
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(file.length + msg.length + 1);
buf.writeBytes(file);
buf.writeByte(LogEvent.SEPARATOR); //3
buf.writeBytes(msg); //4
out.add(new DatagramPacket(buf, remoteAddress)); //5
}
}
- LogEventEncoder 创建了 DatagramPacket 消息类发送到指定的 InetSocketAddress
- 写文件名到 ByteBuf
- 添加一个 SEPARATOR
- 写一个日志消息到 ByteBuf
- 添加新的 DatagramPacket 到出站消息
为什么使用 MessageToMessageEncoder?
当然我们可以编写自己的自定义 ChannelOutboundHandler 来转换 LogEvent 对象到 DatagramPackets。但是继承自MessageToMessageEncoder 为我们简化和做了大部分的工作。
为了实现 LogEventEncoder,我们只需要定义服务器的运行时配置,我们称之为“bootstrapping(引导)”。这包括设置各种 ChannelOption 并安装需要的 ChannelHandler 到 ChannelPipeline 中。完成的 LogEventBroadcaster 类,如清单13.3所示。
Listing 13.3 LogEventBroadcaster
public class LogEventBroadcaster {
private final Bootstrap bootstrap;
private final File file;
private final EventLoopGroup group;
public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address)); //1
this.file = file;
}
public void run() throws IOException {
Channel ch = bootstrap.bind(0).syncUninterruptibly().channel(); //2
System.out.println("LogEventBroadcaster running");
long pointer = 0;
for (;;) {
long len = file.length();
if (len < pointer) {
// file was reset
pointer = len; //3
} else if (len > pointer) {
// Content was added
RandomAccessFile raf = new RandomAccessFile(file, "r");
raf.seek(pointer); //4
String line;
while ((line = raf.readLine()) != null) {
ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath(), line)); //5
}
pointer = raf.getFilePointer(); //6
raf.close();
}
try {
Thread.sleep(1000); //7
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException();
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
Integer.parseInt(args[0])), new File(args[1])); //8
try {
broadcaster.run();
} finally {
broadcaster.stop();
}
}
}
- 引导 NioDatagramChannel 。为了使用广播,我们设置 SO_BROADCAST 的 socket 选项
- 绑定管道。注意当使用 Datagram Channel 时,是没有连接的
- 如果需要,可以设置文件的指针指向文件的最后字节
- 设置当前文件的指针,这样不会把旧的发出去
- 写一个 LogEvent 到管道用于保存文件名和文件实体。(我们期望每个日志实体是一行长度)
- 存储当前文件的位置,这样,我们可以稍后继续
- 睡 1 秒。如果其他中断退出循环就重新启动它。
- 构造一个新的实例 LogEventBroadcaster 并启动它
这就是程序的完整的第一部分。可以使用 “netcat” 程序查看程序的结果。在 UNIX/Linux 系统,可以使用 “nc”, 在 Windows 环境下,可以在 http://nmap.org/ncat找到
Netcat 是完美的第一个测试我们的应用程序;它只是监听指定的端口上接收并打印所有数据到标准输出。将其设置为在端口 9999 上监听 UDP 数据如下:
$ nc -l -u 9999
现在我们需要启动 LogEventBroadcaster。清单13.4显示了如何使用 mvn 编译和运行广播器。pom的配置。pom.xml 配置指向一个文件/var/log/syslog
(假设是UNIX / Linux环境)和端口设置为 9999。文件中的条目将通过 UDP 广播到端口,在你开始 netcat 后打印到控制台。
Listing 13.4 Compile and start the LogEventBroadcaster
$ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------------------------------------------------------
[INFO] Building netty-in-action 0.1-SNAPSHOT
[INFO] --------------------------------------------------------------------
...
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: /Users/norman/Documents/workspace-intellij/netty-in-actionprivate/
target/netty-in-action-0.1-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action -
LogEventBroadcaster running
当调用 mvn 时,在系统属性中改变文件和端口值,指定你想要的。清单13.5 设置日志文件 到 /var/log/mail.log
和端口 8888。
Listing 13.5 Compile and start the LogEventBroadcaster
$ mvn clean package exec:exec -Pchapter13-LogEventBroadcaster /
-Dlogfile=/var/log/mail.log -Dport=8888 -....
....
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action -
LogEventBroadcaster running
当看到 “LogEventBroadcaster running” 说明程序运行成功了。
netcat 只用于测试,但不适合生产环境中使用。
写监视器
这一节我们编写一个监视器:EventLogMonitor ,也就是用来接收事件的程序,用来代替 netcat 。EventLogMonitor 做下面事情:
- 接收 LogEventBroadcaster 广播的 UDP DatagramPacket
- 解码 LogEvent 消息
- 输出 LogEvent 消息
和之前一样,将实现自定义 ChannelHandler 的逻辑。图13.4描述了LogEventMonitor 的 ChannelPipeline 并表明了 LogEvent 的流经情况。
Figure 13.4 LogEventMonitor
图中显示我们的两个自定义 ChannelHandlers,LogEventDecoder 和 LogEventHandler。首先是负责将网络上接收到的 DatagramPacket 解码到 LogEvent 消息。清单13.6显示了实现。
Listing 13.6 LogEventDecoder
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
ByteBuf data = datagramPacket.content(); //1
int i = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR); //2
String filename = data.slice(0, i).toString(CharsetUtil.UTF_8); //3
String logMsg = data.slice(i + 1, data.readableBytes()).toString(CharsetUtil.UTF_8); //4
LogEvent event = new LogEvent(datagramPacket.recipient(), System.currentTimeMillis(),
filename,logMsg); //5
out.add(event);
}
}
- 获取 DatagramPacket 中数据的引用
- 获取 SEPARATOR 的索引
- 从数据中读取文件名
- 读取数据中的日志消息
- 构造新的 LogEvent 对象并将其添加到列表中
第二个 ChannelHandler 将执行一些首先创建的 LogEvent 消息。在这种情况下,我们只会写入 system.out。在真实的应用程序可能用到一个单独的日志文件或放到数据库。
下面的清单显示了 LogEventHandler。
Listing 13.7 LogEventHandler
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //1
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); //2
ctx.close();
}
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder(); //3
builder.append(event.getReceivedTimestamp());
builder.append(" [");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg());
System.out.println(builder.toString()); //4
}
}
- 继承 SimpleChannelInboundHandler 用于处理 LogEvent 消息
- 在异常时,输出消息并关闭 channel
- 建立一个 StringBuilder 并构建输出
- 打印出 LogEvent 的数据
LogEventHandler 打印出 LogEvent 的一个易读的格式,包括以下:
- 收到时间戳以毫秒为单位
- 发送方的 InetSocketAddress,包括IP地址和端口
- LogEvent 生成绝对文件名
- 实际的日志消息,代表在日志文件中一行
现在我们需要安装处理程序到 ChannelPipeline ,如图13.4所示。下一个清单显示了这是如何实现 LogEventMonitor 类的一部分。
Listing 13.8 LogEventMonitor
public class LogEventMonitor {
private final Bootstrap bootstrap;
private final EventLoopGroup group;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group) //1
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LogEventDecoder()); //2
pipeline.addLast(new LogEventHandler());
}
}).localAddress(address);
}
public Channel bind() {
return bootstrap.bind().syncUninterruptibly().channel(); //3
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
}
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0]))); //4
try {
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().await();
} finally {
monitor.stop();
}
}
}
- 引导 NioDatagramChannel。设置 SO_BROADCAST socket 选项。
- 添加 ChannelHandler 到 ChannelPipeline
- 绑定的通道。注意,在使用 DatagramChannel 是没有连接,因为这些 无连接
- 构建一个新的 LogEventMonitor
运行 LogEventBroadcaster 和 LogEventMonitor
如上所述,我们将使用 Maven 来运行应用程序。这一次你需要打开两个控制台窗口给每个项目。用 Ctrl-C 可以停止它。
首先我们将启动 LogEventBroadcaster 如清单13.4所示,除了已经构建项目以下命令即可(使用默认值):
$ mvn exec:exec -Pchapter13-LogEventBroadcaster
和之前一样,这将通过 UDP 广播日志消息。
现在,在一个新窗口,构建和启动 LogEventMonitor 接收和显示广播消息。
Listing 13.9 Compile and start the LogEventBroadcaster
$ mvn clean package exec:exec -Pchapter13-LogEventMonitor
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------------------------------------------------------
[INFO] Building netty-in-action 0.1-SNAPSHOT
[INFO] --------------------------------------------------------------------
...
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ netty-in-action ---
[INFO] Building jar: /Users/norman/Documents/workspace-intellij/netty-in-actionprivate/
target/netty-in-action-0.1-SNAPSHOT.jar
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:exec (default-cli) @ netty-in-action ---
LogEventMonitor running
当看到 “LogEventMonitor running” 说明程序运行成功了。
控制台将显示任何事件被添加到日志文件中,如下所示。消息的格式是由LogEventHandler 创建。
Listing 13.10 LogEventMonitor output
1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08 dev-linux
dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254 port 67
1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08 dev-linux
dhclient: DHCPACK of 192.168.0.50 from 192.168.0.254
1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:55:08 dev-linux
dhclient: bound to 192.168.0.50 -- renewal in 270 seconds.
1364217299382 [/192.168.0.38:63182] [[/var/log/messages] : Mar 25 13:59:38 dev-linux
dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254 port 67
1364217299382 [/192.168.0.38:63182] [/[/var/log/messages] : Mar 25 13:59:38 dev-linux
dhclient: DHCPACK of 192.168.0.50 from 192.168.0.254
1364217299382 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 13:59:38 dev-linux
dhclient: bound to 192.168.0.50 -- renewal in 259 seconds.
1364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57 dev-linux
dhclient: DHCPREQUEST of 192.168.0.50 on eth2 to 192.168.0.254 port 67
1364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57 dev-linux
dhclient: DHCPACK of 192.168.0.50 from 192.168.0.254
1364217299383 [/192.168.0.38:63182] [/var/log/messages] : Mar 25 14:03:57 dev-linux
dhclient: bound to 192.168.0.50 -- renewal in 285 seconds.
若你没有访问 UNIX syslog 的权限,可以创建 自定义的文件,手动填入内容。下面是 UNIX 命令用 touch 创建一个空文件
$ touch ~/mylog.log
再次启动 LogEventBroadcaster,设置系统属性
$ mvn exec:exec -Pchapter13-LogEventBroadcaster -Dlogfile=~/mylog.log
当 LogEventBroadcaster 运行时,你可以手动的添加消息到文件来查看广播到 LogEventMonitor 控制台的内容。使用 echo 和输出的文件
$ echo ’Test log entry’ >> ~/mylog.log
你可以启动任意个监视器实例,他们都会收到相同的消息。
总结
本章提供了一个无连接的传输协议,如UDP的介绍。我们看到,在 Netty的您可以从 TCP 切换到 UDP 的同时使用相同的 API。您还了解了如何通过专门的 ChannelHandler 来组织处理逻辑。我们通过独立的解码器的逻辑来处理消息对象。
在下一章中我们将探讨用 Netty 实现可重用的编解码器。
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.