1 - Linux IO/线程 模型

用户空间与内核空间

我们知道现在的操作系统都是采用虚拟存储器,那么对 32 位操作系统来说,它的寻址空间即虚拟存储空间为 4G,2 的 32 次方。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核,保证内核的的安全,操作系统将虚拟内存空间划分为两部分,一部分是内核空间,一部分是用户空间。

针对 Linux 操作系统而言,将最高的 1G 字节,即从虚拟地址 0xC0000000 到 0xFFFFFFFF 供内核使用,称为内核空间。而较低的 3G 字节,即从虚拟地址 0x00000000 到 0xBFFFFFFF,供进程使用,称为用户空间。每个进程都可以通过系统调用进入内核,因此 Linux 内核由系统内的所有进程共享。于是,从具体进程的角度看,每个进程可以拥有 4G 字节的虚拟空间

有了用户空间和内核空间,整个 Linux 内部结构可以分为三个部分,从最底层到最上层依次是:硬件、内核空间、用户空间

NAME

需要注意的细节是,从上图可以看出内核的组成:

  • 内核空间中存放的是内核代码和数据,而进程的用户空间存放的是用户程序的代码和数据。不管是内核空间还是用户空间,都处于虚拟空间之中。
  • Linux 使用两级保护机制:0 级供内核使用,3 级供用户程序使用。

服务端处理网络请求的流程

为了 OS 的安全性等考虑,进程是无法直接操作 IO 设备的,其必须通过系统调用来请求内核以协助完成 IO 动作,而内核会为每个 IO 设备维护一个 buffer

整个请求过程为:

  1. 用户进程发起请求;
  2. 内核接收到请求后;
  3. 从 IO 设备中获取数据到 buffer 中
  4. 再将 buffer 中的数据 copy 到用户进程的地址空间
  5. 该用户进程获取到数据后再响应客户端。

服务端处理网络请求的典型流程图如下:

NAME

在请求过程中,数据从 IO 设备输入至 buffer 需要时间,从 buffer 复制将数据复制到用户进程也需要时间。因此根据在这两段时间内等待方式的不同,IO 动作可以分为以下五种

  • 阻塞 IO,Blocking IO
  • 非阻塞 IO,Non-Blocking IO
  • IO 复用,IO Multiplexing
  • 信号驱动的 IO,Signal Driven IO
  • 异步 IO,Asynchrnous IO

更多细节参考 <Unix 网络编程>,6.2 节 “IO Models”。

设计服务端并发模型时,主要有如下两个关键点:

  • 服务器如何管理连接,获取请求数据。
  • 服务器如何处理请求。

以上两个关键点最终都与操作系统的 I/O 模型以及线程(进程)模型相关,下面详细介绍这两个模型。

阻塞/非阻塞、同步/异步

阻塞/非阻塞

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

区别:

  • 两者的最大区别在于被调用方在收到请求到返回结果之前的这段时间内,调用方是否一直在等待。
  • 阻塞是指调用方一直在等待而且别的事情什么都不做;非阻塞是指调用方先去忙别的事情。

同步/异步

  • 同步处理是指被调用方得到最终结果之后才返回给调用方;
  • 异步处理是指被调用方先返回应答,然后再计算调用结果,计算完最终结果后再通知并返回给调用方。

区别与联系

阻塞、非阻塞和同步、异步其实针对的对象是不一样的

  • 阻塞、非阻塞的讨论对象是调用者。
  • 同步、异步的讨论对象是被调用者。

Linux 网络 I/O 模型

recvfrom 函数

recvfrom 函数(经 Socket 接收数据),这里把它视为系统调用。一个输入操作通常包括两个不同的阶段:

  • 等待数据准就绪。
  • 从内核向应用进程复制数据。

对于一个套接字上的输入操作,第一步通常涉及等待数据从网络中到达。当所等待分组到达时,它被复制到内核中的某个缓冲区。第二步就是把数据从内核缓冲区复制到应用进程缓冲区。

实际应用程序在通过系统调用完成上面的 2 步操作时,调用方式的阻塞、非阻塞,操作系统在处理应用程序请求时处理方式的同步、异步,可以分为 5 种 I/O 模型。

阻塞式 IO

NAME

在阻塞式 IO 模型中,应用程序从调用 recvfrom 开始到它返回有数据报准备好这段时间是阻塞的,recvfrom 返回成功后,应用程序开始处理数据报。

  • 优点:程序实现简单,在阻塞等待数据期间,进程、线程挂起,基本不会占用 CPU 资源。
  • 每个连接需要独立的进程、线程单独处理,当并发请求量大时为了维护程序,内存、线程切换开销很大,这种模型在实际生产中很少使用。

非阻塞 IO

NAME

在非阻塞 IO 模型中,应用程序把一个套接口设置为非阻塞,就是告诉内核,当所有请求的 IO 操作无法完成时,不要将进程睡眠。

而是返回一个错误,应用程序基于 IO 操作函数,将会不断的轮询数据是否已经准备就绪,直到数据准备就绪。

  • 优点:不会阻塞在内核的等待数据过程,每次发起的 IO 请求可以立即返回,不会阻塞等待,实时性比较好。
  • 缺点:轮询将会不断的询问内核,这将占用大量的 CPU 时间,系统资源利用率较低,所以一般 Web 服务器不会使用这种 IO 模型。

IO 多路复用

NAME

在 IO 复用模型中,会用到 Select、Poll、Epoll 函数,这些函数会使进程阻塞,但是和阻塞 IO 有所不同。

这些函数可以同时阻塞多个 IO 操作,而且可以同时对多个读、写操作的 IO 函数进行检测,直到有数据可读或可写时,才会真正调用 IO 操作函数。

  • 优点:可以基于一个阻塞对象,同时在多个描述符上等待就绪,而不是使用多个线程(每个文件描述符一个线程),这样可以大大节省系统资源。
  • 当连接数较少时效率比“多线程+阻塞IO”的模式效率低,可能延迟更大,因为单个连接处理需要 2 次系统调用,占用时间会增加。

信号驱动 IO

NAME

在信号驱动 IO 模型中,应用程序使用套接口进行信号驱动 IO,并安装一个信号处理函数,进程继续运行并不阻塞。

当数据准备好时,进程会收到一个 SIGIO 信号,可以在信号处理函数中调用 IO 操作函数处理数据。

  • 优点:线程没有在等待数据时被阻塞,可以提高资源利用率。
  • 缺点:信号 IO 模式在大量 IO 操作时可能会因为信号队列溢出而导致无法通知。

信号驱动 IO 尽管对于处理 UDP 套接字来说有用,即这种信号通知意味着到达了一个数据报,或者返回一个异步错误。

但是,对于 TCP 而言,信号驱动 IO 方式近乎无用。因为导致这种通知的条件为数众多,逐个进行判断会消耗很大的资源,与前几种方式相比优势尽失。

异步 IO

NAME

由 POSIX 规范定义,应用程序告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核拷贝到应用程序的缓冲区)通知应用程序。

这种模型与信号驱动模型的主要区别在于:信号驱动 IO 是由内核通知应用程序合适启动一个 IO 操作,而异步 IO 模型是由内核通知应用程序 IO 操作合适完成。

  • 优点:异步 IO 能够充分利用 DMA 特性,让 IO 操作与计算重叠。
  • 缺点:需要实现真正的异步 IO,操作系统需要做大量的工作。当前 Windows 下通过 IOCP 实现了真正的异步 IO。

而在 Linux 系统下直到 2.6 版本才引入,目前 AIO 并不完善,因此在 Linux 下实现并发网络编程时都是以 IO 复用模型为主。

IO 模型对比

NAME

从上图可以看出,越往后,阻塞越少,理论上效率也最优。

这五种模型中,前四种属于同步 IO,因为其中真正的 IO 操作(recvfrom 函数调用)将阻塞进程/线程,只有异步 IO 模型才与 POSIX 定义的异步 IO 相匹配。

进程/线程模型

介绍完服务器如何基于 IO 模型管理连接、获取输入数据,下面介绍服务器如何基于进程、线程模型来处理请求

传统阻塞 IO 服务模型

NAME

特点:

  • 采用阻塞式 IO 模型获取输入数据。
  • 每个连接都需要独立的线程完成数据输入的读取、业务处理、数据返回操作。

存在问题:

  • 当请求的并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
  • 当连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。

Reactor 模式

针对传统阻塞 IO 服务模型的 2 个缺点,比较常见的有如下解决方案:

  • 基于 IO 复用模型,多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。
    • 当某条连接有新的数据可处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
  • 基于线程池复用线程资源,不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以多个连接的业务。

IO 复用模式结合线程池,就是 Reactor 模式的基本设计思想,如下图:

NAME

Reactor 模式,是指通过一个或多个输入同时传递给服务器来处理服务请求的事件驱动处理模式。

服务端程序处理传入的多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。

即 IO 多路复用以统一的方式监听事件,收到事件后分发(Dispatch 给某线程),是编写高性能服务器的必备技术之一。

Reactor 模式有两个关键组件构成:

  • Reactor:在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序对 IO 事件做出反应。它就像公司的电话接线员,接听来自客户的电话并将线路转移给适当的联系人。
  • Handlers:处理程序执行 IO 事件需要完成的实际组件,类似于客户想要与之交谈的客服坐席。Reactor 通过调度适当的处理程序来响应 IO 事件,处理程序执行非阻塞操作。

根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:

  • 单 Reactor 单线程
  • 单 Reactor 多线程
  • 主从 Reactor 多线程

单 Reactor 单线程

NAME

其中,Select 是前面 IO 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞多向监听多路连接请求,其他方案的示意图也类似。

方案说明:

  • Reactor 对象通过 Select 监听客户端请求事件,收到事件后通过 Dispatch 进行分发。

  • 如果是“建立连接”请求事件,则由 Acceptor 通过 Accept 处理连接请求,同时创建一个 Handler 对象来处理连接完成后的后续业务处理。

  • 如果不是“建立连接”事件,则 Reactor 会分发调用“连接”对应的 Handler 来响应。

  • Handler 会完成 “Read->业务处理->Send” 的完整业务流程。

  • 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。

  • 缺点:性能问题,只有一个线程,无法完全发挥多个 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。

可靠性问题、线程意外跑飞、进入死循环,或导致整个系统的通信模块不可用,不能接收或处理外部消息,造成节点故障。

应用场景:客户端的数量有限,业务处理非常快,比如 Redis,业务处理的时间复杂度为 O(1)。

单 Reactor 多线程

NAME
  • Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发。

  • 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,同时创建一个 Handler 对象处理连接完成后续的各种事件。

  • 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应。

  • Handler 只负责响应事件,不做具体业务处理,通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理。

  • Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理。

  • Handler 收到响应结果后通过 Send 将响应结果返回给 Client。

  • 优点:可以充分利用多核 CPU 的处理能力。

  • 缺点:

    • 多线程数据共享和访问比较复杂;
    • Reactor 承担所有事件的监听和响应,在单线程中运行,高并发场景下容易成为性能瓶颈。

主从 Reactor 多线程

针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。

NAME
  • Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor 接收,处理建立连接事件。

  • Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进行处理。

  • SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件。

  • 当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应。

  • Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理。

  • Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理。

  • Handler 收到响应结果后通过 Send 将响应结果返回给 Client。

  • 优点:父线程与子线程的数据交互简单、职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。

父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传递给子线程即可,子线程无需返回数据。

这种模型在很多项目中广泛使用,包括 Nginx 主从 Reactor 多线程模型,Memcached 主从多线程。

Reactor 模式总结

三种模式可以用一个比喻来理解:餐厅常常雇佣接待员负责迎接顾客,当顾客入座后,侍应生专门为这张桌子服务。

  • 单 Reactor 单线程:接待员和侍应生是同一个人,全程为顾客服务。
  • 单 Reactor 多线程:一个接待员、多个侍应生,接待员只负责接待。
  • 主从 Reactor:多个接待员,多个侍应生。

Reactor 模式具有如下的优点:

  • 响应快:不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的。
  • 编程相对简单:可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程的切换开销。
  • 可扩展性:可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源。
  • 可复用性:Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性。

Proactor 模型

在 Reactor 模式中,Reactor 等待某个事件、可应用或操作的状态发生(比如文件描述符可读、Socket 可读写)。

然后把该事件传递给事先注册的 Handler(事件处理函数或回调函数),由后者来做实际的读写操作。

其中的读写操作都需要应用程序同步操作,所以 Reactor 是非阻塞同步网络模型

如果把 IO 操作改为异步,即交给操作系统来完成 IO 操作,就能进一步提升性能,这就是异步网络模型 Proactor。

NAME

Proactor 是和异步 I/O 相关的,详细方案如下:

  • ProactorInitiator 创建 Proactor 和 Handler 对象,并将 Proactor 和 Handler 都通过 AsyOptProcessor(Asynchronous Operation Processor) 注册到内核。
  • AsyOptProcessor 处理注册请求,并处理 I/O 操作。
  • AsyOptProcessor 完成 I/O 操作后通知 Proactor。
  • Proactor 根据不同的事件类型回调不同的 Handler 进行业务处理。
  • Handler 完成业务处理。

可以看出 Proactor 和 Reactor 的区别:

  • Reactor 是在事件发生时就通知事先注册的事件(读写在应用程序线程中处理完成)。
  • Proactor 是在事件发生时基于异步 I/O 完成读写操作(由内核完成),待 I/O 操作完成后才回调应用程序的处理器来进行业务处理。

理论上 Proactor 比 Reactor 效率更高,异步 I/O 更加充分发挥 DMA(Direct Memory Access,直接内存存取)的优势,但是有如下缺点:

  • 编程复杂性:由于异步操作流程的事件的初始化和事件完成在时间和空间上都是相互分离的,因此开发异步应用程序更加复杂。应用程序还可能因为反向的流控而变得更加难以 Debug。
  • 内存使用:缓冲区在读或写操作的时间段内必须保持住,可能造成持续的不确定性,并且每个并发操作都要求有独立的缓存,相比 Reactor 模式,在 Socket 已经准备好读或写前,是不要求开辟缓存的。
  • 操作系统支持,Windows 下通过 IOCP 实现了真正的异步 I/O,而在 Linux 系统下,Linux 2.6 才引入,目前异步 I/O 还不完善。

因此在 Linux 下实现高并发网络编程都是以 Reactor 模型为主。

2 - Linux IO 零拷贝

CPU 并不执行将数据从一个存储区域拷贝到另一个存储区域这样的任务。通常用于在网络传输文件时节省 CPU 周期和内存带宽。

缓存 IO

缓存 IO 又被称为标准 IO,大多数文件系统的默认 IO 操作都是缓存 IO。在 Linux 的缓存 IO 机制中,操作系统会将 IO 的数据缓存在文件系统的页缓存(page cache)中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 IO 的缺点:数据在传输过程中需要在应用程序地址空间和内核间进行多次数据复制操作,这些数据复制所带来的 CPU 及内存开销是非常大的。

零拷贝技术分类

零拷贝技术的发展很多样化,现有的零拷贝技术种类也非常多,而当前并没有一个适合于所有场景的零拷贝技术出现。对于 Linux 来说,现有的零拷贝技术也比较多,这些零拷贝技术大部分存在于不同的 Linux 内核版本,有些旧的技术在不同的 Linux 内核版本间得到了很大的发展或者已经渐渐被新的技术所代替。

本文针对这些零拷贝技术所适用的不同场景对它们进行了划分。概括起来,Linux 中的零拷贝技术主要有下面这几种:

  • 直接 I/O:对于这种数据传输方式来说,应用程序可以直接访问硬件存储,操作系统内核只是辅助数据传输:这类零拷贝技术针对的是操作系统内核并不需要对数据进行直接处理的情况,数据可以在应用程序地址空间的缓冲区和磁盘之间直接进行传输,完全不需要 Linux 操作系统内核提供的页缓存的支持。
  • 在数据传输的过程中,避免数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间进行拷贝。有的时候,应用程序在数据进行传输的过程中不需要对数据进行访问,那么,将数据从 Linux 的页缓存拷贝到用户进程的缓冲区中就可以完全避免,传输的数据在页缓存中就可以得到处理。在某些特殊的情况下,这种零拷贝技术可以获得较好的性能。Linux 中提供类似的系统调用主要有 mmap(),sendfile() 以及 splice()。
  • 对数据在 Linux 的页缓存和用户进程的缓冲区之间的传输过程进行优化。该零拷贝技术侧重于灵活地处理数据在用户进程的缓冲区和操作系统的页缓存之间的拷贝操作。这种方法延续了传统的通信方式,但是更加灵活。在Linux 中,该方法主要利用了写时复制技术。

前两类方法的目的主要是为了避免应用程序地址空间和操作系统内核地址空间这两者之间的缓冲区拷贝操作。这两类零拷贝技术通常适用在某些特殊的情况下,比如要传送的数据不需要经过操作系统内核的处理或者不需要经过应用程序的处理。第三类方法则继承了传统的应用程序地址空间和操作系统内核地址空间之间数据传输的概念,进而针对数据传输本身进行优化。我们知道,硬件和软件之间的数据传输可以通过使用 DMA 来进行,DMA 进行数据传输的过程中几乎不需要CPU参与,这样就可以把 CPU 解放出来去做更多其他的事情,但是当数据需要在用户地址空间的缓冲区和 Linux 操作系统内核的页缓存之间进行传输的时候,并没有类似DMA 这种工具可以使用,CPU 需要全程参与到这种数据拷贝操作中,所以这第三类方法的目的是可以有效地改善数据在用户地址空间和操作系统内核地址空间之间传递的效率。

NAME

当应用程序访问某块数据时,操作系统首先会检查,是不是最近访问过此文件,文件内容是否缓存在内核缓冲区,如果是,操作系统则直接根据read系统调用提供的buf地址,将内核缓冲区的内容拷贝到buf所指定的用户空间缓冲区中去。如果不是,操作系统则首先将磁盘上的数据拷贝的内核缓冲区,这一步目前主要依靠DMA来传输,然后再把内核缓冲区上的内容拷贝到用户缓冲区中。 接下来,write系统调用再把用户缓冲区的内容拷贝到网络堆栈相关的内核缓冲区中,最后socket再把内核缓冲区的内容发送到网卡上。

从上图中可以看出,共产生了四次数据拷贝,即使使用了DMA来处理了与硬件的通讯,CPU仍然需要处理两次数据拷贝,与此同时,在用户态与内核态也发生了多次上下文切换,无疑也加重了CPU负担。

在此过程中,我们没有对文件内容做任何修改,那么在内核空间和用户空间来回拷贝数据无疑就是一种浪费,而零拷贝主要就是为了解决这种低效性。

mmap:让数据传输不需要经过user space

我们减少拷贝次数的一种方法是调用mmap()来代替read调用:

buf = mmap(diskfd, len);
write(sockfd, buf, len);

应用程序调用 mmap(),磁盘上的数据会通过 DMA被拷贝的内核缓冲区,接着操作系统会把这段内核缓冲区与应用程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝。应用程序再调用 write(),操作系统直接将内核缓冲区的内容拷贝到 socket缓冲区中,这一切都发生在内核态,最后, socket缓冲区再把数据发到网卡去。

如下图:

NAME

使用mmap替代read很明显减少了一次拷贝,当拷贝数据量很大时,无疑提升了效率。但是使用 mmap是有代价的。当你使用 mmap时,你可能会遇到一些隐藏的陷阱。例如,当你的程序 map了一个文件,但是当这个文件被另一个进程截断(truncate)时, write系统调用会因为访问非法地址而被 SIGBUS信号终止。 SIGBUS信号默认会杀死你的进程并产生一个 coredump,如果你的服务器这样被中止了,那会产生一笔损失。

通常我们使用以下解决方案避免这种问题:

  • 为SIGBUS信号建立信号处理程序:当遇到 SIGBUS信号时,信号处理程序简单地返回, write系统调用在被中断之前会返回已经写入的字节数,并且 errno会被设置成success,但是这是一种糟糕的处理办法,因为你并没有解决问题的实质核心。
  • 使用文件租借锁:通常我们使用这种方法,在文件描述符上使用租借锁,我们为文件向内核申请一个租借锁,当其它进程想要截断这个文件时,内核会向我们发送一个实时的 RT_SIGNAL_LEASE信号,告诉我们内核正在破坏你加持在文件上的读写锁。这样在程序访问非法内存并且被 SIGBUS杀死之前,你的 write系统调用会被中断。 write会返回已经写入的字节数,并且置 errno为success。 我们应该在 mmap文件之前加锁,并且在操作完文件后解锁:
if(fcntl(diskfd, F_SETSIG, RT_SIGNAL_LEASE) == -1){
	perror("kernel lease set signal");
    return -1;
}
/*  l_type can be F_RDLCK_F_WRLCK 加锁 */
/*  l_type can be F_UNLCK 解锁 */
if(fcntl(diskfd, F_SETLEASE, l_type)){
	perror("kernel lease set_type");
    return -1;
}

Reference

3 - Netty IO 模型

本文基于Netty4.1展开介绍相关理论模型,使用场景,基本组件、整体架构,知其然且知其所以然,希望给大家在实际开发实践、学习开源项目提供参考。

简介

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

JDK原生NIO程序的问题

JDK原生也有一套网络应用程序API,但是存在一系列问题,主要如下:

  • NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
  • 需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序
  • 可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大
  • JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决

Netty 的特点

Netty对JDK自带的NIO的API进行封装,解决上述问题,主要特点有:

  • 设计优雅
    • 适用于各种传输类型的统一API - 阻塞和非阻塞Socket
    • 基于灵活且可扩展的事件模型,可以清晰地分离关注点
    • 高度可定制的线程模型 - 单线程,一个或多个线程池
    • 真正的无连接数据报套接字支持(自3.1起)
  • 使用方便
    • 详细记录的Javadoc,用户指南和示例
    • 没有其他依赖项,JDK 5(Netty 3.x)或6(Netty 4.x)就足够了
  • 高性能
    • 吞吐量更高,延迟更低
    • 减少资源消耗
    • 最小化不必要的内存复制
  • 安全
    • 完整的SSL / TLS和StartTLS支持
  • 社区活跃,不断更新
    • 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会被加入

Netty常见使用场景

Netty常见的使用场景如下:

  • 互联网行业
    • 在分布式系统中,各个节点之间需要远程服务调用,高性能的RPC框架必不可少,Netty作为异步高新能的通信框架,往往作为基础通信组件被这些RPC框架使用。
    • 典型的应用有:阿里分布式服务框架Dubbo的RPC框架使用Dubbo协议进行节点间通信,Dubbo协议默认使用Netty作为基础通信组件,用于实现各进程节点之间的内部通信。
  • 游戏行业
    • 无论是手游服务端还是大型的网络游戏,Java语言得到了越来越广泛的应用。Netty作为高性能的基础通信组件,它本身提供了TCP/UDP和HTTP协议栈。
    • 非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过Netty进行高性能的通信
  • 大数据领域
    • 经典的Hadoop的高性能通信和序列化组件Avro的RPC框架,默认采用Netty进行跨界点通信,它的Netty Service基于Netty框架二次封装实现

Netty 的高性能设计

Netty作为异步事件驱动的网络,高性能之处主要来自于其I/O模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据

IO 模型

用什么样的通道将数据发送给对方,BIO、NIO或者AIO,I/O模型在很大程度上决定了框架的性能

阻塞 IO

传统阻塞型I/O(BIO)可以用下图表示:

NAME

特点:

  • 每个请求都需要独立的线程完成数据read,业务处理,数据write的完整操作

问题:

  • 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
  • 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在read操作上,造成线程资源浪费

IO 复用模型

NAME

在I/O复用模型中,会用到select,这个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作,而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。

Netty的非阻塞I/O的实现关键是基于I/O复用模型,这里用Selector对象表示:

NAME

Netty的IO线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端连接。当线程从某客户端Socket通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。

由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致的线程挂起,一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

基于 Buffer

传统的I/O是面向字节流或字符流的,以流式的方式顺序地从一个Stream 中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。

在NIO中, 抛弃了传统的 I/O流, 而是引入了Channel和Buffer的概念. 在NIO中, 只能从Channel中读取数据到Buffer中或将数据 Buffer 中写入到 Channel。

基于buffer操作不像传统IO的顺序操作, NIO 中可以随意地读取任意位置的数据。

线程模型

数据报如何读取?读取之后的编解码在哪个线程进行,编解码后的消息如何派发,线程模型的不同,对性能的影响也非常大。

事件驱动模型

通常,我们设计一个事件处理模型的程序有两种思路

  • 轮询方式
    • 线程不断轮询访问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑。
  • 事件驱动方式
    • 发生事件,主线程把事件放入事件队列,在另外线程不断循环消费事件列表中的事件,调用事件对应的处理逻辑处理事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。

以GUI的逻辑处理为例,说明两种逻辑的不同:

  • 轮询方式
    • 线程不断轮询是否发生按钮点击事件,如果发生,调用处理逻辑
  • 事件驱动方式
    • 发生点击事件把事件放入事件队列,在另外线程消费的事件列表中的事件,根据事件类型调用相关事件处理逻辑

这里借用O’Reilly 大神关于事件驱动模型解释图

NAME

主要包括4个基本组件:

  • 事件队列(event queue):接收事件的入口,存储待处理事件
  • 分发器(event mediator):将不同的事件分发到不同的业务逻辑单元
  • 事件通道(event channel):分发器与处理器之间的联系渠道
  • 事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作

可以看出,相对传统轮询模式,事件驱动有如下优点:

  • 可扩展性好,分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑
  • 高性能,基于队列暂存事件,能方便并行异步处理事件

Reactor 线程模型

Reactor是反应堆的意思,Reactor模型,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。 服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor模式也叫Dispatcher模式,即I/O多了复用统一监听事件,收到事件后分发(Dispatch给某进程),是编写高性能网络服务器的必备技术之一。

Reactor模型中有2个关键组成:

  • Reactor
    • Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人
  • Handlers
    • 处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作
NAME

取决于Reactor的数量和Hanndler线程数量的不同,Reactor模型有3个变种

  • 单Reactor单线程
  • 单Reactor多线程
  • 主从Reactor多线程

可以这样理解,Reactor就是一个执行while (true) { selector.select(); …}循环的线程,会源源不断的产生新的事件,称作反应堆很贴切。

Netty 线程模型

Netty主要基于主从Reactors多线程模型(如下图)做了一定的修改,其中主从Reactor多线程模型有多个Reactor:MainReactor和SubReactor:

  • MainReactor负责客户端的连接请求,并将请求转交给SubReactor
  • SubReactor负责相应通道的IO读写请求
  • 非IO请求(具体逻辑处理)的任务则会直接写入队列,等待worker threads进行处理

这里引用Doug Lee大神的Reactor介绍:Scalable IO in Java里面关于主从Reactor多线程模型的图

NAME

特别说明的是: 虽然Netty的线程模型基于主从Reactor多线程,借用了MainReactor和SubReactor的结构,但是实际实现上,SubReactor和Worker线程在同一个线程池中:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)

上面代码中的bossGroup 和workerGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程池

  • bossGroup线程池则只是在bind某个端口后,获得其中一个线程作为MainReactor,专门处理端口的accept事件,每个端口对应一个boss线程
  • workerGroup线程池会被各个SubReactor和worker线程充分利用

异步处理

异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

Netty中的I/O操作是异步的,包括bind、write、connect等操作会简单的返回一个ChannelFuture,调用者并不能立刻获得结果,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

当future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操,常见有如下操作:

  • 通过isDone方法来判断当前操作是否完成
  • 通过isSuccess方法来判断已完成的当前操作是否成功
  • 通过getCause方法来获取已完成的当前操作失败的原因
  • 通过isCancelled方法来判断已完成的当前操作是否被取消
  • 通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果future对象已完成,则理解通知指定的监听器

例如下面的的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑

serverBootstrap.bind(port).addListener(future -> {
       if (future.isSuccess()) {
           System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
       } else {
           System.err.println("端口[" + port + "]绑定失败!");
       }
   });

相比传统阻塞I/O,执行I/O操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。

Netty 的架构设计

功能特性

NAME

模块组件

Bootstrap、ServerBootstrap

Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。

Future、ChannelFuture

正如前面介绍,在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

Channel

Netty网络通信的组件,能够用于执行网络I/O操作。

Channel为用户提供:

  • 当前网络连接的通道的状态(例如是否打开?是否已连接?)

  • 网络连接的配置参数 (例如接收缓冲区大小)

  • 提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I / O调用都将立即返回,并且不保证在调用结束时所请求的I / O操作已完成。调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I / O操作成功、失败或取消时回调通知调用方。

  • 支持关联I/O操作与对应的处理程序

    不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型

  • NioSocketChannel,异步的客户端 TCP Socket 连接

  • NioServerSocketChannel,异步的服务器端 TCP Socket 连接

  • NioDatagramChannel,异步的 UDP 连接

  • NioSctpChannel,异步的客户端 Sctp 连接

  • NioSctpServerChannel,异步的 Sctp 服务器端连接

这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO.

Selector

Netty基于Selector对象实现I/O多路复用,通过 Selector, 一个线程可以监听多个连接的Channel事件, 当向一个Selector中注册Channel 后,Selector 内部的机制就可以自动不断地查询(select) 这些注册的Channel是否有已就绪的I/O事件(例如可读, 可写, 网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

NioEventLoop

NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

  • I/O任务 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
  • 非IO任务 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。

两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

NioEventLoopGroup

NioEventLoopGroup,主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel上的事件,而一个Channel只对应于一个线程。

ChannelHandler

ChannelHandler是一个接口,处理I / O事件或拦截I / O操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序。

ChannelHandler本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

  • ChannelInboundHandler用于处理入站I / O事件
  • ChannelOutboundHandler用于处理出站I / O操作

或者使用以下适配器类:

  • ChannelInboundHandlerAdapter用于处理入站I / O事件

  • ChannelOutboundHandlerAdapter用于处理出站I / O操作

  • ChannelDuplexHandler用于处理入站和出站事件

ChannelHandlerContext

保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象

ChannelPipline

保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作。 ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互。

下图引用Netty的Javadoc4.1中ChannelPipline的说明,描述了ChannelPipeline中ChannelHandler通常如何处理I/O事件。 I/O事件由ChannelInboundHandler或ChannelOutboundHandler处理,并通过调用ChannelHandlerContext中定义的事件传播方法(例如ChannelHandlerContext.fireChannelRead(Object)和ChannelOutboundInvoker.write(Object))转发到其最近的处理程序。

NAME

入站事件由自下而上方向的入站处理程序处理,如图左侧所示。 入站Handler处理程序通常处理由图底部的I / O线程生成的入站数据。 通常通过实际输入操作(例如SocketChannel.read(ByteBuffer))从远程读取入站数据。

出站事件由上下方向处理,如图右侧所示。 出站Handler处理程序通常会生成或转换出站传输,例如write请求。 I/O线程通常执行实际的输出操作,例如SocketChannel.write(ByteBuffer)。

在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系如下:

NAME

一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler互不干扰。

Netty 工作流程

初始化并启动Netty服务端过程如下:

public static void main(String[] args) {
       // 创建mainReactor
       NioEventLoopGroup boosGroup = new NioEventLoopGroup();
       // 创建工作线程组
       NioEventLoopGroup workerGroup = new NioEventLoopGroup();

       final ServerBootstrap serverBootstrap = new ServerBootstrap();
       serverBootstrap 
                // 组装NioEventLoopGroup 
               .group(boosGroup, workerGroup)
                // 设置channel类型为NIO类型
               .channel(NioServerSocketChannel.class)
               // 设置连接配置参数
               .option(ChannelOption.SO_BACKLOG, 1024)
               .childOption(ChannelOption.SO_KEEPALIVE, true)
               .childOption(ChannelOption.TCP_NODELAY, true)
               // 配置入站、出站事件handler
               .childHandler(new ChannelInitializer<NioSocketChannel>() {
                   @Override
                   protected void initChannel(NioSocketChannel ch) {
                       // 配置入站、出站事件channel
                       ch.pipeline().addLast(...);
                       ch.pipeline().addLast(...);
                   }
   });

       // 绑定端口
       int port = 8080;
       serverBootstrap.bind(port).addListener(future -> {
           if (future.isSuccess()) {
               System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
           } else {
               System.err.println("端口[" + port + "]绑定失败!");
           }
       });
}

基本过程如下:

  1. 初始化创建2个NioEventLoopGroup,其中boosGroup用于Accetpt连接建立事件并分发请求, workerGroup用于处理I/O读写事件和业务逻辑

  2. 基于ServerBootstrap(服务端启动引导类),配置EventLoopGroup、Channel类型,连接参数、配置入站、出站事件handler

  3. 绑定端口,开始工作

结合上面的介绍的Netty Reactor模型,介绍服务端Netty的工作架构图:

NAME

server端包含1个Boss NioEventLoopGroup和1个Worker NioEventLoopGroup,NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环NioEventLoop,每个NioEventLoop包含1个selector和1个事件循环线程。

每个Boss NioEventLoop循环执行的任务包含3步:

  • 1 轮询accept事件
  • 2 处理accept I/O事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上 *3 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。

每个Worker NioEventLoop循环执行的任务包含3步:

  • 1 轮询read、write事件;
  • 2 处I/O事件,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理
  • 3 处理任务队列中的任务,runAllTasks。

其中任务队列中的task有3种典型使用场景

  • 1 用户程序自定义的普通任务

    ctx.channel().eventLoop().execute(new Runnable() {
       @Override
       public void run() {
           //...
       }
    });
    
  • 2 非当前reactor线程调用channel的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景。最终的write会提交到任务队列中后被异步消费。

  • 3 用户自定义定时任务

    ctx.channel().eventLoop().schedule(new Runnable() {
       @Override
       public void run() {
    
       }
    }, 60, TimeUnit.SECONDS);
    

总结

现在稳定推荐使用的主流版本还是Netty4,Netty5 中使用了 ForkJoinPool,增加了代码的复杂度,但是对性能的改善却不明显,所以这个版本不推荐使用,官网也没有提供下载链接。

Netty 入门门槛相对较高,其实是因为这方面的资料较少,并不是因为他有多难,大家其实都可以像搞透 Spring 一样搞透 Netty。在学习之前,建议先理解透整个框架原理结构,运行过程,可以少走很多弯路。

4 - Redis IO 模型

Redis 是一个事件驱动的内存数据库,服务器需要处理两种类型的事件。

  • 文件事件
  • 时间事件

文件事件(FileEvent)

Redis 服务器通过 socket 实现与客户端(或其他redis服务器)的交互,文件事件就是服务器对 socket 操作的抽象。 Redis 服务器,通过监听这些 socket 产生的文件事件并处理这些事件,实现对客户端调用的响应。

Reactor

Redis 基于 Reactor 模式开发了自己的事件处理器。

这里就先展开讲一讲 Reactor 模式。看下图:

NAME

“I/O 多路复用模块”会监听多个 FD ,当这些FD产生,accept,read,write 或 close 的文件事件。会向“文件事件分发器(dispatcher)”传送事件。

文件事件分发器(dispatcher)在收到事件之后,会根据事件的类型将事件分发给对应的 handler。

我们顺着图,从上到下的逐一讲解 Redis 是怎么实现这个 Reactor 模型的。

I/O 多路复用模块

Redis 的 I/O 多路复用模块,其实是封装了操作系统提供的 select,epoll,avport 和 kqueue 这些基础函数。向上层提供了一个统一的接口,屏蔽了底层实现的细节。

操作系统I/O多路复用
Solarisavport
LINUXepoll
Mackqueue
Otherselect

下面以Linux epoll为例,看看使用 Redis 是怎么利用 linux 提供的 epoll 实现I/O 多路复用。

Redis 对文件事件,封装epoll向上提供的接口:

/*
 * 事件状态
 */
typedef struct aeApiState {
    // epoll_event 实例描述符
    int epfd;
    // 事件槽
    struct epoll_event *events;
} aeApiState;

/*
 * 创建一个新的 epoll 
 */
static int  aeApiCreate(aeEventLoop *eventLoop)
/*
 * 调整事件slot的大小
 */
static int  aeApiResize(aeEventLoop *eventLoop, int setsize)
/*
 * 释放epoll实例和事件slot
 */
static void aeApiFree(aeEventLoop *eventLoop)
/*
 * 关联给定事件到fd
 */
static int  aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
/*
 * 从fd中删除给定事件
 */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
/*
 * 获取可执行事件
 */
static int  aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

所以看看这个ae_peoll.c 如何对 epoll 进行封装的:

  • aeApiCreate() 是对 epoll.epoll_create() 的封装。
  • aeApiAddEvent()aeApiDelEvent() 是对 epoll.epoll_ctl()的封装。
  • aeApiPoll() 是对 epoll_wait()的封装。

这样 Redis 的利用 epoll 实现的 I/O 复用器就比较清晰了。

再往上一层次我们需要看看 ae.c 是怎么封装的?

首先需要关注的是事件处理器的数据结构:

typedef struct aeFileEvent {

    // 监听事件类型掩码,
    // 值可以是 AE_READABLE 或 AE_WRITABLE ,
    // 或者 AE_READABLE | AE_WRITABLE
    int mask; /* one of AE_(READABLE|WRITABLE) */
    // 读事件处理器
    aeFileProc *rfileProc;
    // 写事件处理器
    aeFileProc *wfileProc;
    // 多路复用库的私有数据
    void *clientData;
} aeFileEvent;

mask 就是可以理解为事件的类型。

除了使用 ae_epoll.c 提供的方法外, ae.c 还增加 “增删查” 的几个 API。

  • 增:aeCreateFileEvent
  • 删:aeDeleteFileEvent
  • 查: 查包括两个维度 aeGetFileEvents 获取某个 fd 的监听类型和aeWait等待某个fd 直到超时或者达到某个状态。

事件分发器(dispatcher)

Redis 的事件分发器 ae.c/aeProcessEvents 不但处理文件事件还处理时间事件,所以这里只贴与文件分发相关的出部分代码,dispather 根据 mask 调用不同的事件处理器。

//从 epoll 中获关注的事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
    // 从已就绪数组中获取事件
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd;
    int rfired = 0;

    // 读事件
    if (fe->mask & mask & AE_READABLE) {
        // rfired 确保读/写事件只能执行其中一个
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
    }
    // 写事件
    if (fe->mask & mask & AE_WRITABLE) {
        if (!rfired || fe->wfileProc != fe->rfileProc)
            fe->wfileProc(eventLoop,fd,fe->clientData,mask);
    }

    processed++;
}

可以看到这个分发器,根据 mask 的不同将事件分别分发给了读事件和写事件。

文件事件处理器的类型

Redis 有大量的事件处理器类型,我们就讲解处理一个简单命令涉及到的3个处理器:

  • acceptTcpHandler 连接应答处理器,负责处理连接相关的事件,当有client 连接到Redis的时候们就会产生 AE_READABLE 事件。引发它执行。
  • readQueryFromClient 命令请求处理器,负责读取通过 sokect 发送来的命令。
  • sendReplyToClient 命令回复处理器,当Redis处理完命令,就会产生 AE_WRITEABLE 事件,将数据回复给 client。

文件事件实现总结

我们按照开始给出的 Reactor 模型,从上到下讲解了文件事件处理器的实现,下面将会介绍时间时间的实现。

时间事件(TimeEvent)

Reids 有很多操作需要在给定的时间点进行处理,时间事件就是对这类定时任务的抽象。

先看时间事件的数据结构:

/* Time event structure
 *
 * 时间事件结构
 */
typedef struct aeTimeEvent {

    // 时间事件的唯一标识符
    long long id; /* time event identifier. */

    // 事件的到达时间
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */

    // 事件处理函数
    aeTimeProc *timeProc;

    // 事件释放函数
    aeEventFinalizerProc *finalizerProc;

    // 多路复用库的私有数据
    void *clientData;

    // 指向下个时间事件结构,形成链表
    struct aeTimeEvent *next;

} aeTimeEvent;

看见 next 我们就知道这个 aeTimeEvent 是一个链表结构。看图:

NAME

注意这是一个按照id倒序排列的链表,并没有按照事件顺序排序。

processTimeEvent

Redis 使用这个函数处理所有的时间事件,我们整理一下执行思路:

  1. 记录最新一次执行这个函数的时间,用于处理系统时间被修改产生的问题。
  2. 遍历链表找出所有 when_sec 和 when_ms 小于现在时间的事件。
  3. 执行事件对应的处理函数。
  4. 检查事件类型,如果是周期事件则刷新该事件下一次的执行事件。
  5. 否则从列表中删除事件。

综合调度器(aeProcessEvents)

综合调度器是 Redis 统一处理所有事件的地方。我们梳理一下这个函数的简单逻辑:

// 1. 获取离当前时间最近的时间事件
shortest = aeSearchNearestTimer(eventLoop);
// 2. 获取间隔时间
timeval = shortest - nowTime;
// 如果timeval 小于 0,说明已经有需要执行的时间事件了。
if(timeval < 0){
    timeval = 0
}
// 3. 在 timeval 时间内,取出文件事件。
numevents = aeApiPoll(eventLoop, timeval);
// 4.根据文件事件的类型指定不同的文件处理器
if (AE_READABLE) {
    // 读事件
    rfileProc(eventLoop,fd,fe->clientData,mask);
}
    // 写事件
if (AE_WRITABLE) {
    wfileProc(eventLoop,fd,fe->clientData,mask);
}

以上的伪代码就是整个 Redis 事件处理器的逻辑。

我们可以再看看谁执行了这个 aeProcessEvents:

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        // 如果有需要在事件处理前执行的函数,那么运行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 开始处理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

然后我们再看看是谁调用了 aeMain:

int main(int argc, char **argv) {
    //一些配置和准备
    ...
    aeMain(server.el);
    
    //结束后的回收工作
    ...
}

我们在 Redis 的 main 方法中找个了它。

这个时候我们整理出的思路就是:

  • Redis 的 main() 方法执行了一些配置和准备以后就调用 aeMain() 方法。
  • eaMain() while(true) 的调用 aeProcessEvents()

所以我们说 Redis 是一个事件驱动的程序,期间我们发现,Redis 没有 fork 过任何线程。所以也可以说 Redis 是一个基于事件驱动的单线程应用。

5 - Nginx IO 模型

Nginx以其高性能,稳定性,丰富的功能,简单的配置和低资源消耗而闻名。本文从底层原理分析Nginx为什么这么快!

进程模型

NAME

Nginx 服务器在运行过程中:

  • 多进程:一个 Master 进程、多个 Worker 进程。
  • Master进程:管理 Worker 进程。
    • 对外接口:接收外部的操作(信号);
    • 对内转发:根据外部的操作的不同,通过信号管理 Worker;
    • 监控:监控 Worker 进程的运行状态,Worker 进程异常终止后,自动重启 Worker 进程。
  • Worker进程:所有 Worker 进程都是平等的。
    • 处理网络请求,由Worker进程处理。
    • Worker进程数量:在nginx.conf中配置,一般设置为核心数,充分利用 CPU 资源。
    • 同时,避免进程数量过多,避免进程竞争 CPU 资源,增加上下文切换的损耗。

思考

  • 请求是连接到 Nginx,Master 进程负责处理和转发?
  • 如何选定哪个 Worker 进程处理请求?请求的处理结果,是否还要经过 Master 进程?
NAME

请求处理过程

HTTP 连接建立和请求处理过程如下:

  • Nginx 启动时,Master 进程,加载配置文件。
  • Master 进程,初始化监听的 Socket。
  • Master 进程,Fork 出多个 Worker 进程。
  • Worker 进程,竞争新的连接,获胜方通过三次握手,建立 Socket 连接,并处理请求。

高性能高并发

Nginx 为什么拥有高性能并且能够支撑高并发?

  • Nginx采用多进程+异步非阻塞方式(IO 多路复用 Epoll)。
  • 请求的完整过程:建立连接→读取请求→解析请求→处理请求→响应请求。
  • 请求的完整过程对应到底层就是:读写Socket事件。

事件处理模型

Request:Nginx中HTTP请求。

基本的HTTP Web Server工作模式:

  • 接收请求:逐行读取请求行和请求头,判断段有请求体后,读取请求体。
  • 处理请求:获取对应的资源
  • 返回响应:根据处理结果,生成相应的 HTTP 请求(响应行、响应头、响应体)。

Nginx也是这个套路,整体流程一致:

NAME

模块化体系结构

NAME

Nginx的模块根据其功能基本上可以分为以下几种类型:

  • event module
    • 搭建了独立于操作系统的事件处理机制的框架,及提供了各具体事件的处理。包括 ngx_events_module,ngx_event_core_module 和 ngx_epoll_module 等。
    • Nginx 具体使用何种事件处理模块,这依赖于具体的操作系统和编译选项。
  • phase handler
    • 此类型的模块也被直接称为 handler 模块。主要负责处理客户端请求并产生待响应内容,比如 ngx_http_static_module 模块,负责客户端的静态页面请求处理并将对应的磁盘文件准备为响应内容输出。
  • output filter
    • 也称为 filter 模块,主要是负责对输出的内容进行处理,可以对输出进行修改。
    • 例如: 可以实现对输出的所有 html 页面增加预定义的 footbar 一类的工作,或者对输出的图片的 URL 进行替换之类的工作。
  • upstream
    • upstream 模块实现反向代理的功能,将真正的请求转发到后端服务器上,并从后端服务器上读取响应,发回客户端。
    • upstream 模块是一种特殊的 handler,只不过响应内容不是真正由自己产生的,而是从后端服务器上读取的。
  • load-balancer
    • 负载均衡模块,实现特定的算法,在众多的后端服务器中,选择一个服务器出来作为某个请求的转发服务器。

Nginx vs Apache

Nginx:

  • IO 多路复用,Epoll(freebsd 上是 kqueue)
  • 高性能
  • 高并发
  • 占用系统资源少

Apache:

  • 阻塞+多进程/多线程
  • 更稳定,Bug 少
  • 模块更丰富

最大连接数

基础背景:

  • Nginx 是多进程模型,Worker 进程用于处理请求。
  • 单个进程的连接数(文件描述符 fd),有上限(nofile):ulimit -n。
  • Nginx 上配置单个 Worker 进程的最大连接数:worker_connections 上限为 nofile。
  • Nginx 上配置 Worker 进程的数量:worker_processes。

因此,Nginx 的最大连接数:

  • Nginx 作为通用服务器时,最大的连接数:Worker进程数量 * 单个Worker进程的最大连接数。

  • Nginx 作为反向代理服务器时,能够服务的最大连接数:(Worker 进程数量 * 单个 Worker 进程的最大连接数)/ 2。

    Nginx 反向代理时,会建立 Client 的连接和后端 Web Server 的连接,占用 2 个连接。

6 - MySQL 模型

MySQL启动Socket监听

看源码,首先就需要找到其入口点,mysqld的入口点为mysqld_main,跳过了各种配置文件的加载 之后,我们来到了network_init初始化网络环节,如下图所示:

NAME

下面是其调用栈:

mysqld_main (MySQL Server Entry Point)
	|-network_init (初始化网络)
		/* 建立tcp套接字 */
		|-create_socket (AF_INET)
		|-mysql_socket_bind (AF_INET)
		|-mysql_socket_listen (AF_INET)
		/* 建立UNIX套接字*/
		|-mysql_socket_socket (AF_UNIX)
		|-mysql_socket_bind (AF_UNIX)
		|-mysql_socket_listen (AF_UNIX)

值得注意的是,在tcp socket的初始化过程中,考虑到了ipv4/v6的两种情况:

// 首先创建ipv4连接
ip_sock= create_socket(ai, AF_INET, &a);
// 如果无法创建ipv4连接,则尝试创建ipv6连接
if(mysql_socket_getfd(ip_sock) == INVALID_SOCKET)
 	ip_sock= create_socket(ai, AF_INET6, &a);

如果我们以很快的速度 stop/start mysql,会出现上一个mysql的listen port没有被release导致无法当前mysql的socket无法bind的情况,在此种情况下mysql会循环等待,其每次等待时间为当前重试次数retry * retry/3 +1秒,一直到设置的–port-open-timeout(默认为0)为止,如下图所示:

NAME

MySQL新建连接处理循环

通过 handle_connections_sockets 处理 MySQL 的新建连接循环,根据操作系统的配置通过 poll/select 处理循环(非epoll,这样可移植性较高,且mysql瓶颈不在网络上)。 MySQL通过线程池的模式处理连接(一个连接对应一个线程,连接关闭后将线程归还到池中)

如下图所示:

NAME

对应的调用栈如下所示:

handle_connections_sockets
	|->poll/select
	|->new_sock=mysql_socket_accept(...sock...) /*从listen socket中获取新连接*/
	|->new THD 连接线程上下文 /* 如果获取不到足够内存,则shutdown new_sock*/
	|->mysql_socket_getfd(sock) 从socket中获取
		/** 设置为NONBLOCK和环境有关 **/
	|->fcntl(mysql_socket_getfd(sock), F_SETFL, flags | O_NONBLOCK);
	|->mysql_socket_vio_new
		|->vio_init (VIO_TYPE_TCPIP)
			|->(vio->write = vio_write)
			/* 默认用的是vio_read */
			|->(vio->read=(flags & VIO_BUFFERED_READ) ?vio_read_buff :vio_read;)
			|->(vio->viokeepalive = vio_keepalive) /*tcp层面的keepalive*/
			|->.....
	|->mysql_net_init
		|->设置超时时间,最大packet等参数
	|->create_new_thread(thd) /* 实际是从线程池拿,不够再新建pthread线程 */
		|->最大连接数限制
		|->create_thread_to_handle_connection
			|->首先看下线程池是否有空闲线程
				|->mysql_cond_signal(&COND_thread_cache) /* 有则发送信号 */
			/** 这边的hanlde_one_connection是mysql连接的主要处理函数 */
			|->mysql_thread_create(...handle_one_connection...)		

MySQL 的 VIO

如上图代码中,每新建一个连接,都随之新建一个 vio(mysql_socket_vio_new->vio_init),在vio_init的过程中,初始化了一堆回掉函数,如下图所示:

NAME

我们关注点在vio_read和vio_write上,如上面代码所示,在笔者所处机器的环境下将MySQL连接的socket设置成了非阻塞模式(O_NONBLOCK)模式。所以在vio的代码里面采用了nonblock代码的编写模式,如下面源码所示:

vio_read

size_t vio_read(Vio *vio, uchar *buf, size_t size)
{
  while ((ret= mysql_socket_recv(vio->mysql_socket, (SOCKBUF_T *)buf, size, flags)) == -1)
  {
    ......
    // 如果上面获取的数据为空,则通过select的方式去获取读取事件,并设置超时timeout时间
    if ((ret= vio_socket_io_wait(vio, VIO_IO_EVENT_READ)))
        break;
  }
}

即通过while循环去读取socket中的数据,如果读取为空,则通过vio_socket_io_wait去等待(借助于select的超时机制),其源码如下所示:

vio_socket_io_wait
	|->vio_io_wait
		|-> (ret= select(fd + 1, &readfds, &writefds, &exceptfds, 
              (timeout >= 0) ? &tm : NULL))

笔者在jdk源码中看到java的connection time out也是通过这,select(…wait_time)的方式去实现连接超时的。 由上述源码可以看出,这个mysql的read_timeout是针对每次socket recv(而不是整个packet的),所以可能出现超过read_timeout MySQL仍旧不会报错的情况,如下图所示:

NAME

vio_write

vio_write实现模式和vio_read一致,也是通过select来实现超时时间的判定,如下面源码所示:

size_t vio_write(Vio *vio, const uchar* buf, size_t size)
{
  while ((ret= mysql_socket_send(vio->mysql_socket, (SOCKBUF_T *)buf, size, flags)) == -1)
  {
    int error= socket_errno;

    /* The operation would block? */
    // 处理EAGAIN和EWOULDBLOCK返回,NON_BLOCK模式都必须处理
    if (error != SOCKET_EAGAIN && error != SOCKET_EWOULDBLOCK)
      break;

    /* Wait for the output buffer to become writable.*/
    if ((ret= vio_socket_io_wait(vio, VIO_IO_EVENT_WRITE)))
      break;
  }
}

MySQL 的连接线程处理

从上面的代码:

mysql_thread_create(...handle_one_connection...)

可以发现,MySQL每个线程的处理函数为handle_one_connection,其过程如下图所示:

NAME

代码如下所示:

for(;;){
	// 这边做了连接的handshake和auth的工作
	rc= thd_prepare_connection(thd);
	// 和通常的线程处理一样,一个无限循环获取连接请求
	while(thd_is_connection_alive(thd))
	{
		if(do_command(thd))
			break;
	}
	// 出循环之后,连接已经被clientdu端关闭或者出现异常
	// 这边做了连接的销毁动作
	end_connection(thd);
end_thread:
	...
	// 这边调用end_thread做清理动作,并将当前线程返还给线程池重用
	// end_thread对应为one_thread_per_connection_end
	if (MYSQL_CALLBACK_ELSE(thread_scheduler, end_thread, (thd, 1), 0))
		return;	
	...
	// 这边current_thd是个宏定义,其实是current_thd();
	// 主要是从线程上下文中获取新塞进去的thd
	// my_pthread_getspecific_ptr(THD*,THR_THD);
	thd= current_thd;
	...
}

mysql的每个woker线程通过无限循环去处理请求。

线程的归还过程

MySQL通过调用one_thread_per_connection_end(即上面的end_thread)去归还连接。

MYSQL_CALLBACK_ELSE(...end_thread)
	one_thread_per_connection_end
		|->thd->release_resources()
		|->......
		|->block_until_new_connection

线程在新连接尚未到来之前,等待在信号量上(下面代码是C/C++ mutex condition的标准使用模式):

static bool block_until_new_connection()
{	
	mysql_mutex_lock(&LOCK_thread_count);
	......
    while (!abort_loop && !wake_pthread && !kill_blocked_pthreads_flag)
      mysql_cond_wait(&x1, &LOCK_thread_count);
   ......
   // 从等待列表中获取需要处理的THD
   thd= waiting_thd_list->front();
   waiting_thd_list->pop_front();
   ......
   // 将thd放入到当前线程上下文中
   // my_pthread_setspecific_ptr(THR_THD,  this)    
   thd->store_globals();
   ......
   mysql_mutex_unlock(&LOCK_thread_count);
   .....
}

整个过程如下图所示:

NAME

由于MySQL的调用栈比较深,所以将thd放入线程上下文中能够有效的在调用栈中减少传递参数的数量。

总结

MySQL的网络IO模型采用了经典的线程池技术,虽然性能上不及reactor模型,但好在其瓶颈并不在网络IO上,采用这种方法无疑可以节省大量的精力去专注于处理sql等其它方面的优化。