Kanon(二):Reactor模式

概念

Reactor直译为反应器/堆,之所以这么起名我想是因为Reactor模式是一个对触发事件反应的事件处理设计模式
Reactor通过事件解多路复用器获取多个准备就绪的输入源,然后根据每个输入源请求(即触发的事件)分发到具体的事件处理器
因此Reactor模式的核心就是事件分发

Reactor可以说是网络同步并发模型最广泛被使用的,比如libeventredis均采用了该模型。与之相对的是”真正“异步Proactor模式,这个被ASIO采用。
对于Unix/Linux等Unix-like平台而言,更为偏爱Reactor,因为它们提供的多路复用API就是同步的,而对于WindowsIOCP而言,更适合于Proactor

由于kanon暂时不考虑跨平台,所以采用Reactor不需要考虑如何兼容WindowsIOCP

reactor_1.png

组件

根据Wikipedia,Reactor模式由以下构成:

  • Resources:提供输入或消费输出的资源,对应的是Socket fd
  • Synchronous Event Demultiplexer:对应的是select/poll/epoll这些多路复用API,它们会阻塞当前进程,等待有资源可用(事件触发),然后送往事件分发器。
  • Dispatcher:主要负责注册和注销事件处理器,以及根据触发的事件分发到具体的事件处理器。
  • Request Handler:具体的事件处理器,一个事件处理器只会关联一个资源。

当然,还得有个组件提供注册感兴趣的事件注销事件的功能。
从OS提供的API的角度来看待,要注册或注销事件必须通过struct pollfdepoll_ctl(),也就是说解复用器提供这些功能的API。
为了简化解复用器的负责工作,由事件分发器提供事件的注册和注销:转发给解复用器(在下面你会看到kanon是怎么做的)。从语义上来看,事件分发器提供这些服务也是容易接受的。

kanon的对应设施

kanon中可以对应为:

  • Resources - Channel中的socket fd
  • Demultipler - (E)Poller
  • Dispatcher - Channel
  • Request Handler - 各种回调

由于事件不止触发一次,后面仍然有事件触发,需要不断循环,即所谓的EventLoop

event_loop.png
kanon的事件循环分为三个阶段:

  • 等待事件阶段:通过poll()epoll_wait()阻塞当前进程,等待有事件触发,进入下一个阶段
  • 事件处理阶段Channel根据具体的事件类型分发到各个回调,处理各种事件
  • 异步回调处理阶段:维护一个存储异步回调(std::function<void>)的容器,在该阶段调用它们

前面两个阶段已经构成了事件循环,出于以下两个目的,设置第三阶段:

  • 不同线程的异步调用,如果要保证线程安全的话,大部分场景下需要加锁,通过在一个特定的阶段统一调用可以实现无锁串行化
  • 推迟调用(尽管同一线程,也是异步),比如TcpConnection在事件处理阶段需要销毁自身需要一同销毁Channel,但仍在事件处理阶段,这样做是危险的,因此需要推迟。

能够这样添加第三阶段是因为事件循环本身是阻塞的,这点和一般的应用不同。

由于事件循环本身是阻塞的,当有异步调用添加到容器中时,需要唤醒(wakeup)事件循环,这个是通过eventfd实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void EventLoop::StartLoop() {
assert(!looping_);
AssertInThread();

looping_ = true;

std::vector<Channel*> activeChannels;

while (!quit_) {
auto receive_time = poller_->Poll(POLLTIME, activeChannels);

for (auto& channel : activeChannels) {
channel->HandleEvents(receive_time);
}

CallFunctors();

activeChannels.clear();
}

looping_ = false;
}

通过类图来描述Reactor组件之间的关系会更清晰:
reactor.drawio.png

基本上网络组件都有EventLoop*成员,出于两个原因:

  • 使用其Public方法,比如Channel会使用EventLoopXXXChannel()
  • 保证One loop per thread不变式成立(通过AssertInThread()

多线程Reactor(Mutithread-Reactor)

显然,单线程Reactor处理IO事件在大多数场景是足够的,但是在多核机器上没有充分利用多核优势。
我们将处理IO事件的线程称为IO线程,在单线程模式下,主线程就是唯一的IO线程,而多线程Reactor模式下,我们将除主线程外的其他线程作为IO线程去处理IO事件,主线程接受连接并通过特定的调度策略将接受到的连接分发到不同的IO线程中去,这样就利用了多核优势,每个IO线程处理一定数量的连接,可以认为起到了均衡负载器的作用。
这就是所谓的One loop per thread策略:通过多个IO线程处理各自的IO事件,可以在同一时间响应多个客户端请求(并发执行)。
mutithread_reactor.png

一般IO线程的分配需要根据机器的核数服务类型来确定。

  • IO密集型服务,由于主要是IO操作,可以让出cpu给其他线程,cpu占用率低,创建超过机器核数倍数的IO线程可以使多个线程被充分利用
  • CPU密集型服务或计算密集型服务,由于需要进行大量运算,cpu占用率高,创建的线程数不超过机器核数最好,不然会带来多余的上下文切换。

PS:多线程Reactor只有服务端可以启动