作者近日在研读Netty的源码,Netty本身内部采用Reactor模式来作为它的核心处理模型,故此想更加深入的了解Reactor模型的产生,以及他解决了什么问题。本文的内容是对Douglas C. Schmidt的《Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events》[1]一文的大致翻译,补充了一份可以实际运行的模型代码[3]。在参考文献一栏提供了文件的下载地址。
一、问题
在一个分布式场景下的日志服务器如图一所示。客户端应用将会连接到日志服务器,记录客户端应用自身的状态,debug日志以及性能报告等数据。日志记录被发送到中心的日志服务器上,后续可以转发到不同种类的输出设备上,例如控制台、打印机,文件或者是数据库中。日志服务器需要并发处理连接请求和日志记录。 客户端应用和日志服务器之间的通讯协议采用面向连接的协议,例如TCP,这也就意味着如果客户端要于日志服务器发生通讯需要先发送连接请求。日志服务器需要监听一个 固定的地址等待连接请求的到达。当连接建立完成后服务器还要在新建一个句柄来与客户端发生通讯,等待后续客户端应用的服务请求。一旦客户端连接后,他们将会并发发送 日志记录。
为了处理并发请求,通常的想法是开多线程处理。例如图2,这种方法同步接受请求然后“每一个连接一个线程”去处理客户端的日志记录。看起来是挺不错的,但是在多线程下 需要去思考如下这些问题:
- 性能。由于上下文切换、同步和数据移动,线程可能会导致性能不佳
- 编程难度。多线程模式下可能需要复杂的并发控制。
- 可移植性。某些操作系统可能不支持多线程。 在单线程模式下,传统的IO操作必须等连接建立完成,等待日志记录完成后才能进行处理下一个连接请求。期间在队列中的请求将会一直被阻塞。
实现一个上述的分布式下的日志服务器应该要满足些什么要求呢?
- 可用性:即使服务器正在等待其他请求到达,它也必须能够处理传入的请求。特别是,服务器不能在排除其他事件源的情况下无限期地阻止处理任何单个事件源,因为这可能会显着延迟对其他客户端的响应。
- 性能:服务器应该低延迟,高吞吐,避免CPU空转
- 编程难度:服务器设计应该足够简单,能够适用于并发场景
- 要能够适应变化:当添加新的功能,例如添加一个服务端的消息格式转换应该要减少对已存在代码的影响,提供代码的可维护性。
- 可移植性:将程序迁移到其他操作系统上不需要做太多的额外处理。
二、解决方案
集成同步事件解复用和处理事件的相应事件处理程序的分派。此外,将特定于应用程序的服务分派和实现与通用事件多路分解和分派机制分离。
对于应用程序提供的每项服务,引入一个单独的Event Handler
来处理某些类型的事件。所有Event Handler
都实现相同的接口。事件处理程序向Initiation Dispatcher
注册,该分派器使用同步事件解复用(Synchronous Event Demultiplexer
)来等待事件发生。当事件发生时,同步事件解复用器通知Initiation Dispatcher
,Initiation Dispatcher
同步回调与事件关联的Event Handler
。然后Event Handler
将事件分派给实现所请求服务的方法。
三、Reactor的结构
组件
句柄(Handles)
标识被操作系统管理的资源,例如网络连接,打开的文件等。在日志服务器中使用句柄来标识套接字,以便于同步解复用器在等待事件上的通知。日志服务器中感兴趣的事件是连接事件和读取事件。
同步事件解复用器(Synchronized Demultiplexer)
阻塞并等待一组句柄的事件触发。当有可进行的操作时将会解除阻塞。一个常见的事件解复用器是select,在linux和window平台下都能提供。
分发器(InitiationDispatcher)
定义Event Handler
的注册,移除以及事件的分发。当事件触发时,同步解复用器将会通知相关线程,返回可操作的句柄以及触发的事件,Initiation Dispatcher
将会回调对应的Event Handler`执行业务逻辑。
事件处理(Event Handler
)
Event Handler
是一个接口,指定了一个钩子方法。当Initiation Dispatcher
上触发事件以后将会调用相关的Event Handler
。
具体的事件处理器(Correct Event Handler)
是Event Handler
的具体实现,对指定的事件做对应的处理。例如上述的日志服务器的例子实现Event Handler
分别处理连接事件和日志记录的事件。
这些组件的关系如下图所示。
Reactor各组件之间的协作
当Event Handler
注册到Initiation Dispatcher
表示希望当相关联的句柄上触发事件的时候通知事件对应的Event Handler
。一个句柄和一个Event Handler
关联。
当所有的Correct Event Handler
全部都注册到Initiation Dispatcher
上后,Initiation Dispatcher
需要先提取Event Handler
对应的句柄注册到同步解复用器上,用于等待通知,然后Initiation Dispatcher
开始调用handle_events()
方法开启事件轮询。同步解复用器会等待相关句柄事件的触发。在TCP协议层使用select的同步解复用器等待客户端的连接和已建立连接的sockets 句柄的记录日志请求(在TCP层表现为读事件)。
当同步解复用器上有事件触发的时候将会解除阻塞,然后循环调用Event Handler
的handle_event
方法实现事件的分发处理。
具体流程如下图所示:
四、协作场景
以下将会描述将Reactor模式应用到日志服务器的应用。假定日志服务器只有两个简单的操作,接受连接以及分发日志记录请求。
客户端连接到日志服务器
操作序列如下:
- 日志服务器注册一个
Logging Acceptor
到Initiation Dispatcher
上用于处理连接请求。 - 日志服务器调用
handle_events()
方法 Initiation Dispatcher
调用同步事件解复用器等待连接请求- 客户端发起到日志服务器的连接。
- 日志服务器上同步事件解复用器解除阻塞,
Logging Acceptor
被Initiation Dispatcher
通知。 Logging Acceptor
接受一个新的连接Logging Acceptor
创建一个Logging Handler
,新的连接与其绑定。Logging Handler
注册到Initiation Dispatcher
,Initiation Dispatcher
提取Logging Handler
的绑定句柄,并注册到同步事件解复用器
客户端发送日志记录到日志服务器
操作步骤如下所示:
- 客户端发送一个日志记录
Initiation Dispatcher
的同步事件解复用器解除阻塞,通知指定的句柄可读,Initiation Dispatcher
调用Logging Handler
Logging Handler
非阻塞的完成数据的接收Logging Handler
将数据写到目标输出。Logging Handler
调用返回,Initiation Dispatcher
在完成对所有Event Handler
的调用后将会开始新的一次事件轮询
五、日志服务器的Reactor模型的实现
同步事件解复用器
Initiation Dispatcher
使用同步事件解复用器去同步等待多个事件的触发。在Linux或者是Window平台都可以使用调用例如select
的同步事件解复用器。在Linux2.6版本发布以后现在还可以使用Epoll。select
是有句柄限制的,受到FD_SETSIZE
的影响。具体可以参考select
的文档,见参考文献[2]。
Initiation Dispatcher
实现Event Handler表存储
Initiation Dispatcher
需要使用一个容器去存储实际的Concrete Event Handlers
.Initiation Dispatcher
上的register
和remove
操作都需要对该容器进行修改。实现该容器的方法很多,例如hash表,线性表,以及基于索引的数组都可以完成。特别要注意的一点是需要处理并发的场景。保证该容器是线程安全的。
实现Event Loop
按照图3提供的类结构图,Initiation Dispatcher
的重点在于Event loop
,Event Loop
的入口点是handle_events方法。handle_events的首先会等待同步事件解复用器。当事件触发以后需要轮询所有的Event Handler
,提取Event Handler
上的句柄检测事件当前句柄是否有事件触发,然后对有事件的句柄调用handle_event
。在正常情况下,当完成所有的handle_event的调用后将会进行下一次Event Loop
等待在同步事件解复用器的事件触发。
必要的同步设计
在多线程模式下,Initiation Dispatcher
需要保证线程安全(确保共享变量的修改不会有问题),并且避免不必要的线程竞争。例如对存储Event Handler
的修改。同步的方式有多种,例如使用信号量和互斥变量都可以完成。为了避免自身死锁(即自身获取锁的情况,然后又调用了加锁,这样会导致自身死锁),可以使用递归锁,也可以叫做重入锁(Reentrant mutex
)。这一特性是非常重要的。
public class InitiationDispatcher {
private Selector selector;
private static InitiationDispatcher self = new InitiationDispatcher();
private InitiationDispatcher() {
}
public static InitiationDispatcher getInstance() {
return self;
}
void handle_events() throws IOException {
// open selector
openSelector();
// do event loop
for (; ; ) {
try {
// wait event occur
if (selector.select() > 0) {
// get notified handle
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
// for handle
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
// callback Event Handler
EventHandler handler = (EventHandler) key.attachment();
handler.handle_event(key.readyOps());
}
}
} catch (Throwable e) {
e.printStackTrace();
}
}
}
// register Event Handler to dispatcher
public boolean register_handler(EventHandler eventHandler, int ops) throws IOException {
openSelector();
eventHandler.get_handle().register(selector(), ops, eventHandler);
return true;
}
// remove Event Handler from dispatcher
public boolean remove_handler(EventHandler eventHandler, int ops) {
if (selector == null) {
return false;
}
try {
eventHandler.get_handle().unRegister(selector());
return true;
} catch (Throwable e) {
e.printStackTrace();
return false;
}
}
private Selector selector() {
return selector;
}
private void openSelector() throws IOException {
if (selector == null || !selector.isOpen()) {
// close old selector
if (selector != null) {
selector.close();
}
selector = Selector.open();
}
}
}
决定分发目标的类型
有两类Event Handler
可以关联到Handle
服务于Initiation Dispatcher
的处理逻辑。
Event Handler Objects
一种通用的方式是一个Event Handler
和一个句柄作为一个整体注册。如上述提到的图3中描述的。使用一个对象作为通知的目标一语代码的重用和扩展。此外,一个对象还能够整合一系列的状态和方法。Netty使用的是这一种方式。
Event Handler functions
另一种方式是直接关联一个Handler直接注册一个函数到Initiation Dispatcher
。这样的有点是不需要对Event Handler进行实现,比较简单。
可以使用适配器模式来整合两种场景。例如,一个适配器可以定义使用一个事件处理对象来持有一个事件处理函数的指针event handler function
。当handle_event
被调用的时候转发到该对象持有的事件处理函数上。
定义Event Handler接口
假定使用Event Handler 对象来做为通知目标,下一步需要Event Handler
接口。这个也有两种方法。
单方法接口
这种方法如图3所示。一个Correct Event Handler
只需要实现一个handle_event。这种情况下需要传入事件类型,在handle_event中做判断。
多方法接口
即在Event Handler接口中定义不同的方法,当触发不同的事件的时候调用不同方法完成处理。以日志服务器为例,可以定义handle_accept,handle_read等方法。Netty使用的是这一种方式
public interface EventHandler {
void handle_event(int readyOps);
Handle get_handle();
}
多方法接口的优势是可以有选择的覆盖基类的方法避免进一步的调用。例如在单方法handle_event中使用switch或者if。缺点是需要开发者需要有预见性提前知道需要有哪些Event Handler
,避免后续改动。其实可以选用工厂模式来对分发代码的问题。
决定Initiation Dispatchers的数量
正常情况下Initiation Dispatchers可以是单例的,但是因为操作系统的限制,例如上文提到的select能操作的句柄数是有限制的。因此考虑使用多线程来解决这个问题。
Concrete Event Handlers的实现
开发者需要去决定何种事件能调用何种Event Handler的handle_event.以下是Event Handler的在日志服务器上的两种实现。一个是Logging Acceptor用于连接的建立,一种是Logging Handler用于数据的接收。
Logging Acceptor class
Logging Acceptor主动接收客户端的请求连接,然后创建一个于客户端配对的Logging Handler对象,用于接收和处理客户端的日志记录,然后将它注册到Inititation Dispatcher
。关键代码如下:
public class LoggingAcceptor implements EventHandler {
private Handle handle;
public LoggingAcceptor(Handle handle) {
this.handle = handle;
}
@Override
public void handle_event(int readyOps) {
if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
try {
// accept new connection
Handle childHandle = handle.accept();
// create new Event Handler
EventHandler eventHandler = new LoggingHandler(childHandle);
// register to Initiation Dispatcher
InitiationDispatcher.getInstance().register_handler(eventHandler,SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public Handle get_handle() {
return handle;
}
}
Logging Handler class
当Logging Handler
注册到Inititation Dispatcher
只关注读事件和连接关闭事件。当对应的Socket handle的读事件触发时候Inititation Dispatcher
会调用Loging Handler
的handle_event
进行数据读取和写入到其他输出中。触发连接关闭事件时需要关闭连接,并且释放相关的资源。
public class LoggingHandler implements EventHandler {
private Handle handle;
public LoggingHandler(Handle handle) {
this.handle = handle;
}
@Override
public void handle_event(int readyOps) {
if ((readyOps & SelectionKey.OP_READ) != 0) {
try {
byte[] data = handle.read();
if (data != null) {
// write to other device
System.out.println(new String(data));
} else {
// socket is close
InitiationDispatcher.getInstance().remove_handler(this, SelectionKey.OP_READ);
System.out.println("client close!");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public Handle get_handle() {
return handle;
}
}
封装Handle的句柄操作
public class Handle {
private final SelectableChannel selectableChannel;
private SelectionKey selectionKey;
public Handle(SelectableChannel selectableChannel) {
this.selectableChannel = selectableChannel;
}
public void register(Selector selector, int ops, Object attachment) throws ClosedChannelException {
if (selectableChannel.isOpen()) {
selectionKey = selectableChannel.register(selector, ops, attachment);
selector.wakeup();
}
}
public void unRegister(Selector selector) {
selectionKey.cancel();
selector.wakeup();
}
public Handle accept() throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectableChannel;
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
return new Handle(channel);
}
public byte[] read() throws IOException {
SocketChannel socketChannel = (SocketChannel) selectableChannel;
byte[] rcv = new byte[100 * 1024];
ByteBuffer bf = ByteBuffer.allocate(1024);
int len, pos = 0;
while ((len = socketChannel.read(bf)) > 0) {
bf.flip();
System.arraycopy(bf.array(), 0, rcv, pos, len);
pos += len;
bf.clear();
}
if (len == -1) {
return null;
}
byte[] result = new byte[pos];
System.arraycopy(rcv, 0, result, 0, pos);
return result;
}
}
启动器
public class ExampleServer {
public static void main(String[] args) {
try {
// open socket listener
ServerSocketChannel serverSocketChannel = SelectorProvider.provider().openServerSocketChannel();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(2000));
// create a handle
Handle handle = new Handle(serverSocketChannel);
// create a event_handler
EventHandler acceptor = new LoggingAcceptor(handle);
// register a handler
InitiationDispatcher.getInstance().register_handler(acceptor, SelectionKey.OP_ACCEPT);
// open event loop
InitiationDispatcher.getInstance().handle_events();
} catch (IOException e) {
e.printStackTrace();
}
}
}
以上的实现内容是可以在Java环境下正常执行,模拟了一个简单的LoggerServer的功能。
六、Reactor的优缺点
优势
Reactor模型有以下有点:
- 关注点分离:Reactor 模式分离实际的业务处理代码以及事件解复用,解复用事件开发人员只需要关心系统中可能存在的触发事件源以及完成事件触发的动作,而业务开发人员只需要关注业务的实现不需要关系具体的事件怎么分派。
- 提高事件驱动应用程序的模块化、可重用性和可配置性: 该模式将应用程序功能分离到单独的类中。例如,日志服务器中有两个独立的类:一个用于建立连接,另一个用于接收和处理日志记录。这种解耦使得不同类型的面向连接的服务(如文件传输、远程登录和视频点播)可以重用连接建立类。因此,修改或扩展日志服务器的功能只会影响日志处理程序类的实现
- 提高应用程序的可移植性:
Initiation Dispatcher
的接口可以独立于执行事件多路分解的 OS 系统调用而重用。在不同的OS下可以选择不同的事件解复用器,Initiation Dispatcher
而无需发生变化。例如在类Unix平台下可以使用select 或者poll,windows下可以使用Win32API完成事件解复用。 - 提供粗粒度的并发控制: Reactor 模式在进程或线程内的事件多路分解和分派级别序列化事件处理程序的调用。 Initiation Dispatcher 级别的序列化通常消除了对应用程序进程中更复杂的同步或锁定的需要
缺点
- 限制适用性:Reactor 模式只有在操作系统支持句柄的情况下才能有效地应用。可以使用 Initiation Dispatcher 中的多个线程来模拟 Reactor 模式的语义,例如每个句柄一个线程。每当句柄上有可用的事件时,其关联的线程将读取该事件并将其放置在由启动调度程序按顺序处理的队列中。然而,这种设计通常非常低效,因为它序列化了所有事件处理程序,从而增加了同步和上下文切换开销,而没有增强并行性。
- 非抢占式:在单线程应用程序进程中,事件处理程序在执行时不会被抢占。这意味着事件处理程序不应在单个 Handle 上执行阻塞 I/O,因为这将阻塞整个进程并阻碍连接到其他 Handle 的客户端的响应。一个主动对象使用多线程或多处理来与
Event Handler
的主事件循环并行完成其任务。 - 难调试:使用 Reactor 模式编写的应用程序可能难以调试,调试过程中可能会在框架代码和应用程序之间跳动。如果开发人员不了解框架代码的情况下将会无从着手。
七、个人理解于评价
Reactor模型实际是IO多路复用和多线程技术的结合。Reactor模型通过使用IO多路复用技术来解决传统多线程IO并发模型的问题,IO多路复用下,单个线程可以记录每个I/O流的状态,使得单个线程可以对多个IO流的管理。单个线程已经可以完成多个I/O流的管理,如果在此之上添加新的线程将能够管理更多的I/O流,同时所需要的线程数量也远远小于传统的I/O并发模型,也进一步减少了资源的消耗。
参考文献
- [1] [Reactor,An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events],By Douglas C. Schmidt
- [2] 可执行的Reactor模型Demo,By AlvinKwok
- [3] I/O多路复用,By 罗志宇