Kanon (3): A Deep Dive into EventLoop

The Reactor post touched on the event loop but did not look at how it is built. Broadly, EventLoop:

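  • (Private) provides APIs to register and unregister a Channel (forwarded to the poller_ member)
  • (Private) provides an API that checks whether the caller is on the thread the loop was bound to at construction, plus an assertion API (to guarantee One loop per thread)
  • provides APIs to start and quit the event loop
  • provides APIs to queue asynchronous calls (executed in phase 3)
  • provides APIs to register and cancel timers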

Guaranteeing One loop per thread

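Checking whether a call is running on the thread where the event loop was constructed is necessary to maintain the One loop per thread invariant.
If the calls on a TcpConnection ended up running on different threads, we would get thread-safety problems and would also fail to make good use of multiple cores, so this must be avoided.
Concretely, when the event loop is constructed it records the current thread ID (tid) in owner_thread_id_. The tid is read from CurrentThread::t_tid, a TLS (thread-local storage) variable, so each thread sees its own value.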

EventLoop::EventLoop()
  : owner_thread_id_{ CurrentThread::t_tid }
  //, ..
{
  //..
}

void EventLoop::AssertInThread() noexcept {
  if (!this->IsLoopInThread())
    this->AbortNotInThread();
}

bool EventLoop::IsLoopInThread() noexcept {
  return CurrentThread::t_tid == owner_thread_id_;
}

void EventLoop::AbortNotInThread() noexcept
{ LOG_FATAL << "The policy of \"One Loop Per Thread\" has been violated!"; }

LOG_FATAL here eventually calls abort().
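To see why a thread_local tid is enough for this check, here is a minimal, Linux-only sketch. It is not Kanon's code: CachedTid() is a hypothetical stand-in for CurrentThread::t_tid (it lazily caches syscall(SYS_gettid) in a thread_local), and OwnerCheckedLoop is a hypothetical class that keeps only EventLoop's owner-thread bookkeeping.

```cpp
#include <cassert>
#include <thread>
#include <utility>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>

namespace sketch {

// Hypothetical stand-in for CurrentThread::t_tid: each thread caches its own
// kernel thread id in a thread_local the first time it asks (Linux-only).
inline pid_t CachedTid() {
  thread_local pid_t t_tid = static_cast<pid_t>(::syscall(SYS_gettid));
  return t_tid;
}

// Hypothetical loop that keeps only the owner-thread check.
class OwnerCheckedLoop {
 public:
  OwnerCheckedLoop() : owner_thread_id_(CachedTid()) {}

  bool IsLoopInThread() const { return CachedTid() == owner_thread_id_; }

 private:
  pid_t owner_thread_id_;  // tid of the constructing thread
};

// Returns {answer on the owner thread, answer on a second thread}.
inline std::pair<bool, bool> CheckFromTwoThreads() {
  OwnerCheckedLoop loop;  // owner = calling thread
  bool in_owner = loop.IsLoopInThread();
  bool in_other = true;
  std::thread other([&] { in_other = loop.IsLoopInThread(); });
  other.join();
  return {in_owner, in_other};
}

}  // namespace sketch
```

CheckFromTwoThreads() returns {true, false}: the same loop object answers differently depending on which thread asks, which is exactly what an AssertInThread()-style check relies on.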

Quitting the event loop

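StartLoop() is normally called synchronously, on the loop's own thread, so thread safety is not a concern there. Quit(), by contrast, may be called from another thread, so quit_ is declared as std::atomic<bool> to make the update visible to the loop thread.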

void EventLoop::Quit() noexcept {
  quit_ = true;

  LOG_DEBUG_KANON << "EventLoop is quitting";

  // Calling Wakeup() is unnecessary in the IO thread, because Quit() can only
  // be reached in a few ways:
  // * In the loop itself, e.g. from a timer callback: the loop is not blocked
  //   in poll, so it checks quit_ once the current iteration finishes.
  // * From another thread: the call is asynchronous and the loop may be
  //   blocked in poll, so Wakeup() is needed to avoid an empty poll, as in
  //   QueueToLoop().
  if (! this->IsLoopInThread())
    this->Wakeup();
}

If Quit() is indeed called from another thread, the event loop may be blocked in poll, so it has to be woken up.
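The following Linux-only sketch shows why the wakeup matters. ToyLoop is hypothetical and far simpler than EventLoop: it polls only an eventfd with an infinite timeout, and its Quit() always writes to the eventfd (Kanon skips this on the loop thread). Without that write, a loop blocked in poll() would never notice quit_.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <thread>
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

namespace sketch {

// Hypothetical toy loop: blocks in poll() on an eventfd, then checks quit_.
class ToyLoop {
 public:
  ToyLoop() : evfd_(::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)) { assert(evfd_ >= 0); }
  ~ToyLoop() { ::close(evfd_); }
  ToyLoop(const ToyLoop&) = delete;
  ToyLoop& operator=(const ToyLoop&) = delete;

  // Runs until quit_ is observed; returns how many times poll() woke up.
  int StartLoop() {
    int wakeups = 0;
    while (!quit_.load()) {
      pollfd pfd{evfd_, POLLIN, 0};
      int n = ::poll(&pfd, 1, -1);  // infinite timeout: an idle loop blocks here
      if (n > 0 && (pfd.revents & POLLIN)) {
        uint64_t dummy = 0;
        ssize_t r = ::read(evfd_, &dummy, sizeof dummy);  // drain the counter
        (void)r;
        ++wakeups;
      }
    }
    return wakeups;
  }

  // Callable from any thread: publish the flag first, then wake the poller.
  void Quit() {
    quit_.store(true);
    uint64_t one = 1;
    ssize_t w = ::write(evfd_, &one, sizeof one);
    (void)w;
  }

 private:
  int evfd_;
  std::atomic<bool> quit_{false};
};

// Starts the loop on another thread and quits it from the calling thread.
inline int RunAndQuitFromOtherThread() {
  ToyLoop loop;
  int wakeups = -1;
  std::thread io([&] { wakeups = loop.StartLoop(); });
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  loop.Quit();
  io.join();
  return wakeups;
}

}  // namespace sketch
```

Because the eventfd counter stays non-zero until it is read, poll() reports the fd as readable whenever it is entered, so the wakeup is not lost even if Quit() runs after the loop checks quit_ but before it calls poll(). RunAndQuitFromOtherThread() returns 1 in the usual case, or 0 if Quit() happened before the loop began its first iteration.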

As mentioned earlier, Wakeup() is implemented with eventfd (see man eventfd(2) for details). Here is the wrapped API:

namespace detail {

/**
 * Event fd API
 */
static inline int CreateEventFd() noexcept {
  int evfd = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

  LOG_TRACE_KANON << "eventfd: " << evfd << " created";

  if (evfd < 0) {
    LOG_SYSERROR << "eventfd() error occurred";
  }

  return evfd;
}

/**
 * Eventfd maintains a 64-bit counter.
 * write() adds the 8-byte integer value to the counter.
 * read() returns the counter and resets it to zero; if the counter is zero,
 * read() blocks (or fails with EAGAIN, since the fd is created with EFD_NONBLOCK).
 * The fd is readable only while the counter is non-zero, so the value
 * written to wake up the loop must not be zero.
 * \see man eventfd(2)
 */
static inline void ReadEventFd(int evfd) noexcept {
  uint64_t dummy;
  if (sizeof dummy != ::read(evfd, &dummy, sizeof dummy))
    LOG_SYSERROR << "ReadEventFd() error occurred";
}

static inline void WriteEventFd(int evfd) noexcept {
  uint64_t dummy = 1;
  if (sizeof dummy != ::write(evfd, &dummy, sizeof dummy))
    LOG_SYSERROR << "WriteEventFd() error occurred";
}

} // namespace detail

void EventLoop::EvRead() noexcept {
  detail::ReadEventFd(ev_channel_->GetFd());
}

void EventLoop::Wakeup() noexcept {
  detail::WriteEventFd(ev_channel_->GetFd());
}

EventLoop::EventLoop()
  : ev_channel_{ kanon::make_unique<Channel>(this, detail::CreateEventFd()) }
  //, ..
{
  // ...
  ev_channel_->SetReadCallback([this](TimeStamp receive_time){
    LOG_TRACE_KANON << "EventFd receive_time: " << receive_time.ToFormattedString(true);
    this->EvRead();
  });

  ev_channel_->SetWriteCallback([this](){
    this->Wakeup();
  });

  ev_channel_->EnableReading();
  // ...
}

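Put simply, writing to the eventfd makes it readable, which triggers a read event that pulls the event loop out of its blocking wait. The loop can then move on to the next iteration and check whether quit_ has been set.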
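A small Linux-only experiment makes the counter semantics concrete. DemoEventFdCounter() and EventFdDemo are hypothetical helpers, not part of Kanon: the function writes 1 three times, then reads twice from a nonblocking eventfd created with the same flags as CreateEventFd().

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <sys/eventfd.h>
#include <unistd.h>

namespace sketch {

struct EventFdDemo {
  uint64_t first_read;           // value returned by the first read()
  bool second_read_would_block;  // true if the second read() failed with EAGAIN
};

inline EventFdDemo DemoEventFdCounter() {
  int evfd = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  assert(evfd >= 0);

  // Three wakeups arrive before the loop gets around to reading.
  for (int i = 0; i < 3; ++i) {
    uint64_t one = 1;
    ssize_t n = ::write(evfd, &one, sizeof one);
    assert(n == static_cast<ssize_t>(sizeof one));
    (void)n;
  }

  EventFdDemo demo{};
  // Without EFD_SEMAPHORE, read() returns the whole counter and resets it to 0.
  ssize_t n = ::read(evfd, &demo.first_read, sizeof demo.first_read);
  assert(n == static_cast<ssize_t>(sizeof demo.first_read));

  // The counter is now 0: a nonblocking read fails with EAGAIN instead of blocking.
  uint64_t dummy = 0;
  n = ::read(evfd, &dummy, sizeof dummy);
  demo.second_read_would_block = (n < 0 && errno == EAGAIN);

  ::close(evfd);
  return demo;
}

}  // namespace sketch
```

The first read returns 3 and the second fails with EAGAIN. In other words, several Wakeup() calls made before the loop reacts coalesce into a single read event, and one EvRead() drains them all.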

Handling asynchronous calls

The APIs involved in asynchronous calls are QueueToLoop(), CallFunctors(), and RunInLoop().

Since these APIs store calls submitted from other threads, thread safety has to be considered.

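The functors_ container is protected by a mutex. In CallFunctors(), the loop first creates an empty container of the same type and swaps it with functors_. The swapped-out functors are then private to the loop thread (unless they are handed off to another stack), while other threads can keep registering calls into the new, empty functors_. In other words, only the swap itself has to be atomic, i.e. done under the lock. As a side benefit, the functors run without holding the lock, so a functor that itself calls QueueToLoop() never tries to acquire lock_ while it is already held.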

You can think of it as a front/back relationship: CallFunctors() is the back end and QueueToLoop() is the front end.
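The swap technique can be sketched on its own with the standard library. FunctorQueue is a hypothetical class that uses std::mutex and std::function in place of Kanon's lock_, MutexGuard, and FunctorCallback; Push() plays the role of QueueToLoop() without the wakeup, and RunPending() the role of CallFunctors().

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical pending-functor queue illustrating the swap technique.
class FunctorQueue {
 public:
  using Functor = std::function<void()>;

  // Front end: any thread may enqueue; the lock only guards the push.
  void Push(Functor cb) {
    std::lock_guard<std::mutex> guard(lock_);
    functors_.push_back(std::move(cb));
  }

  // Back end: take the whole batch under the lock, run it without the lock.
  // Returns how many functors were in this batch.
  std::size_t RunPending() {
    std::vector<Functor> batch;
    {
      std::lock_guard<std::mutex> guard(lock_);
      batch.swap(functors_);  // functors_ is now empty and free for producers
    }
    for (auto& f : batch) {
      if (f) f();  // may call Push() again without deadlocking
    }
    return batch.size();
  }

 private:
  std::mutex lock_;
  std::vector<Functor> functors_;
};

}  // namespace sketch
```

A functor that pushes another functor does not deadlock, and the newly pushed functor lands in the next batch rather than the current one. That is the case the calling_functors_ check in QueueToLoop() exists for.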

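QueueToLoop() may also be called from another thread, so it must wake up the event loop; otherwise the registered call could sit in the queue while the loop stays blocked. Registration from the loop thread itself can need a wakeup too: for example, if a queued functor calls QueueToLoop() during phase 3, the new functor lands in functors_ after the swap and will not run until the next CallFunctors(). The code below uses calling_functors_ to detect this case.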

void EventLoop::QueueToLoop(FunctorCallback cb) {
  {
    MutexGuard dummy(lock_);
    functors_.emplace_back(std::move(cb));
  }

  // If we are not in the IO thread (asynchronous call) and no other event
  // occurs, the loop stays blocked in poll. That is wrong, since the functor
  // is expected to run promptly.

  // If QueueToLoop() is called during the "call functors" phase (phase 3),
  // we cannot guarantee that the next poll is not an empty one that blocks,
  // so Wakeup() the eventfd to avoid it. The other two phases are fine,
  // since phase 3 comes after them.
  if (!IsLoopInThread() || calling_functors_) {
    Wakeup();
  }
}

void EventLoop::CallFunctors() {
  decltype(functors_) functors;
  {
    MutexGuard dummy{ lock_ };
    functors.swap(functors_);
  }

  calling_functors_ = true;
  for (auto& func : functors) {
    try {
      if (func) {
        func();
      }
    } catch(std::exception const& ex) {
      LOG_ERROR << "std::exception caught in CallFunctors()";
      LOG_ERROR << "Reason: " << ex.what();
      calling_functors_ = false;
      KANON_RETHROW;
    } catch(...) {
      LOG_ERROR << "Unknown exception caught in CallFunctors()";
      calling_functors_ = false;
      KANON_RETHROW;
    }
  }

  calling_functors_ = false;
}
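The wakeup condition and the flag handling can be exercised in a single-process sketch. ToyLoop2 is hypothetical: it counts wakeups instead of writing an eventfd, and it clears calling_functors_ with a small RAII guard rather than resetting it in each catch block, which is a design variant, not what Kanon does. As in the original, the short-circuit in `!IsLoopInThread() || calling_functors_` means other threads never read the non-atomic flag.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

namespace sketch {

class ToyLoop2 {
 public:
  using Functor = std::function<void()>;

  ToyLoop2() : owner_(std::this_thread::get_id()) {}

  bool IsLoopInThread() const { return std::this_thread::get_id() == owner_; }

  void QueueToLoop(Functor cb) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      functors_.push_back(std::move(cb));
    }
    // Same condition as above: another thread, or queued during phase 3.
    if (!IsLoopInThread() || calling_functors_) {
      ++wakeups_;  // stands in for writing the eventfd
    }
  }

  void CallFunctors() {
    std::vector<Functor> batch;
    {
      std::lock_guard<std::mutex> guard(lock_);
      batch.swap(functors_);
    }
    FlagGuard flag_guard(calling_functors_);  // sets the flag, clears it on any exit
    for (auto& f : batch) {
      if (f) f();
    }
  }

  int wakeups() const { return wakeups_.load(); }
  bool calling_functors() const { return calling_functors_; }

 private:
  // Sets a flag for the guard's lifetime, including exception unwinding.
  struct FlagGuard {
    explicit FlagGuard(bool& flag) : flag_(flag) { flag_ = true; }
    ~FlagGuard() { flag_ = false; }
    FlagGuard(const FlagGuard&) = delete;
    FlagGuard& operator=(const FlagGuard&) = delete;
    bool& flag_;
  };

  std::thread::id owner_;
  std::mutex lock_;
  std::vector<Functor> functors_;
  bool calling_functors_ = false;  // only touched on the loop thread
  std::atomic<int> wakeups_{0};
};

}  // namespace sketch
```

The RAII guard gives the same guarantee as the two catch blocks in CallFunctors() (the flag is false once the function exits, normally or by exception) without repeating the reset.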

RunInLoop()QueueToLoop()在同一线程时语义不同:
前者会立即调用而后者会先缓存起来。不同线程两者是等效的。
一般调用RunInLoop()就可以保证One loop per thread成立。但是有些场景,不得不调用QueueToLoop()将同一线程的调用推迟到第三阶段,比如TcpConnectionCloseHandle()
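The difference on the loop thread can be shown with a toy loop. ToyLoop3 is hypothetical; its RunInLoop() follows the semantics described above (run now on the loop thread, otherwise queue), and eventfd wakeups are omitted because nothing blocks here.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical toy loop: shows only the ordering difference between
// RunInLoop() and QueueToLoop(); wakeups are left out.
class ToyLoop3 {
 public:
  using Functor = std::function<void()>;

  ToyLoop3() : owner_(std::this_thread::get_id()) {}

  bool IsLoopInThread() const { return std::this_thread::get_id() == owner_; }

  // On the loop thread: run now. Elsewhere: same as QueueToLoop().
  void RunInLoop(Functor cb) {
    if (IsLoopInThread()) {
      cb();
    } else {
      QueueToLoop(std::move(cb));
    }
  }

  // Always defers to the next CallFunctors() (phase 3).
  void QueueToLoop(Functor cb) {
    std::lock_guard<std::mutex> guard(lock_);
    functors_.push_back(std::move(cb));
  }

  void CallFunctors() {
    std::vector<Functor> batch;
    {
      std::lock_guard<std::mutex> guard(lock_);
      batch.swap(functors_);
    }
    for (auto& f : batch) {
      if (f) f();
    }
  }

 private:
  std::thread::id owner_;
  std::mutex lock_;
  std::vector<Functor> functors_;
};

}  // namespace sketch
```

On the loop thread, a RunInLoop() callback runs before a QueueToLoop() callback registered earlier; from another thread, RunInLoop() is deferred just like QueueToLoop(). Queued work only runs when CallFunctors() runs, which is what lets a same-thread call be pushed past the current event handling.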

Timer APIs

The timer APIs are all forwarded to the timer_queue_ member (a TimerQueue): registering one-shot and repeating timers, and cancelling them.

TimerId EventLoop::RunAt(TimerCallback cb, TimeStamp expiration) {
  return timer_queue_->AddTimer(std::move(cb), expiration, 0.0);
}

TimerId EventLoop::RunEvery(TimerCallback cb, TimeStamp expiration, double interval) {
  return timer_queue_->AddTimer(std::move(cb), expiration, interval);
}

void EventLoop::CancelTimer(TimerId timer_id) {
  timer_queue_->CancelTimer(timer_id);
}

Since they are thin wrappers, I won't go into more detail here.
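Although TimerQueue itself is out of scope here, the contract of the three forwarding functions can be illustrated with a toy. ToyTimerQueue is hypothetical: it uses integer ticks instead of TimeStamp, a std::map ordered by (expiration, id) instead of whatever structure Kanon uses, and an explicit Expire(now) call instead of being driven by the event loop. As with RunAt() above, an interval of 0 means one-shot.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <limits>
#include <map>
#include <utility>
#include <vector>

namespace sketch {

class ToyTimerQueue {
 public:
  using TimerCallback = std::function<void()>;
  using TimerId = std::uint64_t;

  // interval == 0: one-shot (like RunAt); interval > 0: repeating (like RunEvery).
  TimerId AddTimer(TimerCallback cb, std::int64_t expiration, std::int64_t interval) {
    TimerId id = next_id_++;
    timers_.emplace(Key{expiration, id}, Entry{std::move(cb), interval});
    return id;
  }

  void CancelTimer(TimerId id) {
    for (auto it = timers_.begin(); it != timers_.end(); ++it) {
      if (it->first.second == id) {
        timers_.erase(it);
        return;
      }
    }
  }

  // Fires every timer with expiration <= now; repeating ones are
  // rescheduled at now + interval.
  void Expire(std::int64_t now) {
    auto end = timers_.upper_bound(Key{now, std::numeric_limits<TimerId>::max()});
    std::vector<std::pair<Key, Entry>> expired;
    for (auto it = timers_.begin(); it != end; ++it) {
      expired.emplace_back(it->first, std::move(it->second));
    }
    timers_.erase(timers_.begin(), end);  // remove before running callbacks

    for (auto& [key, entry] : expired) {
      if (entry.cb) entry.cb();
      if (entry.interval > 0) {
        timers_.emplace(Key{now + entry.interval, key.second}, std::move(entry));
      }
    }
  }

 private:
  using Key = std::pair<std::int64_t, TimerId>;  // (expiration, id)
  struct Entry {
    TimerCallback cb;
    std::int64_t interval;
  };

  std::map<Key, Entry> timers_;
  TimerId next_id_ = 1;
};

}  // namespace sketch
```

A repeating timer is rescheduled relative to now, so a late Expire() fires it once rather than catching up on missed intervals. The sketch does not handle a timer that cancels itself from its own callback, which a real TimerQueue has to deal with.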