muduo的並發模型為one loop per thread+ threadpool。為了方便使用,muduo封裝了EventLoop和Thread為EventLoopThread,為了方便使用線程池,又把EventLoopThread封裝為EventLoopThreadPool。所以這篇博文並沒有涉及到新鮮的技術,但是也有一些封裝和邏輯方面的注意點需要我們去分析和理解。
EventLoopThread
任何一個線程,只要創建並運行了EventLoop,就是一個IO線程。 EventLoopThread類就是一個封裝好了的IO線程。
EventLoopThread的工作流程為:
1、在主線程創建EventLoopThread對象。
2、主線程調用EventLoopThread.start(),啟動EventLoopThread中的線程(稱為IO線程),並且主線程要等待IO線程創建完成EventLoop對象。
3、IO線程調用threadFunc創建EventLoop對象。通知主線程已經創建完成。
4、主線程返回創建的EventLoop對象。
EventLoopThread.h
class EventLoopThread : boost::noncopyable
{
public:
typedef boost::function ThreadInitCallback;
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback());
~EventLoopThread();
EventLoop* startLoop(); // 啟動線程,該線程就成為了IO線程
private:
void threadFunc(); // 線程函數
EventLoop* loop_; // loop_指針指向一個EventLoop對象
bool exiting_;
Thread thread_;
MutexLock mutex_;
Condition cond_;
ThreadInitCallback callback_; // 回調函數在EventLoop::loop事件循環之前被調用
};
EventLoopThread.cc
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb)
: loop_(NULL),
exiting_(false),
thread_(boost::bind(&EventLoopThread::threadFunc, this)),
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
EventLoopThread::~EventLoopThread()
{
exiting_ = true;
loop_->quit(); // 退出IO線程,讓IO線程的loop循環退出,從而退出了IO線程
thread_.join(); //等待線程退出
}
EventLoop* EventLoopThread::startLoop()
{
assert(!thread_.started());
thread_.start();//線程啟動,調用threadFunc()
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait();//需要等待EventLoop對象的創建
}
}
return loop_;
}
void EventLoopThread::threadFunc()
{
EventLoop loop;
if (callback_)
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
// loop_指針指向了一個棧上的對象,threadFunc函數退出之後,這個指針就失效了
// threadFunc函數退出,就意味著線程退出了,EventLoopThread對象也就沒有存在的價值了。
// 因而不會有什麼大的問題
loop_ = &loop;
cond_.notify(); //創建好,發送通知
}
loop.loop();// 會在這裡循環,直到EventLoopThread析構。此後不再使用loop_訪問EventLoop了
//assert(exiting_);
}
測試程序:
#include#include #include using namespace muduo; using namespace muduo::net; void runInThread() { printf("runInThread(): pid = %d, tid = %d\n", getpid(), CurrentThread::tid()); } int main() { printf("main(): pid = %d, tid = %d\n", getpid(), CurrentThread::tid()); EventLoopThread loopThread; EventLoop* loop = loopThread.startLoop(); // 異步調用runInThread,即將runInThread添加到loop對象所在IO線程,讓該IO線程執行 loop->runInLoop(runInThread); sleep(1); // runAfter內部也調用了runInLoop,所以這裡也是異步調用 loop->runAfter(2, runInThread); sleep(3); loop->quit(); printf("exit main().\n"); }
對調用過程進行分析:(查看日志)
主線程調用 loop->runInLoop(runInThread); 由於主線程(不是IO線程)調用runInLoop, 故調用queueInLoop() 將runInThead 添加到隊列,然後wakeup() IO線程,IO線程在doPendingFunctors() 中取loop->runAfter() 要喚醒一下,此時只是執行runAfter() 添加了一個2s的定時器, 2s超時,timerfd_ 可讀,先handleRead()一下然後執行回調函數runInThread()。
那為什麼exit main() 之後wakeupFd_ 還會有可讀事件呢?那是因為EventLoopThead 棧上對象析構,在析構函數內 loop_ ->quit(), 由於不是在IO線程調用quit(),故也需要喚醒一下,IO線程才能從poll 返回,這樣再次循環判斷 while (!quit_) 就能退出IO線程。

muduo的思想時eventLoop+thread pool,為了更方便使用,將EventLoopThread做了封裝。main reactor可以創建sub reactor,並發一些任務分發到sub reactor中去。EventLoopThreadPool的思想比較簡單,用一個main reactor創建EventLoopThreadPool。在EventLoopThreadPool中將EventLoop和Thread綁定,可以返回EventLoop對象來使用EventLoopThreadPool中的Thread。
EventLoopThreadPool.h
class EventLoopThreadPool : boost::noncopyable
{
public:
typedef boost::function ThreadInitCallback;
EventLoopThreadPool(EventLoop* baseLoop);
~EventLoopThreadPool();
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
void start(const ThreadInitCallback& cb = ThreadInitCallback());
EventLoop* getNextLoop();
private:
EventLoop* baseLoop_; // 與Acceptor所屬EventLoop相同
bool started_;
int numThreads_; // 線程數
int next_; // 新連接到來,所選擇的EventLoop對象下標
boost::ptr_vector threads_; // IO線程列表
std::vector loops_; // EventLoop列表
};
EventLoopThreadPool.cc
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
: baseLoop_(baseLoop),
started_(false),
numThreads_(0),
next_(0)
{
}
EventLoopThreadPool::~EventLoopThreadPool()
{
// Don't delete loop, it's stack variable
}
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
EventLoopThread* t = new EventLoopThread(cb);
threads_.push_back(t);
loops_.push_back(t->startLoop()); // 啟動EventLoopThread線程,在進入事件循環之前,會調用cb
}
if (numThreads_ == 0 && cb)
{
// 只有一個EventLoop,在這個EventLoop進入事件循環之前,調用cb
cb(baseLoop_);
}
}
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
EventLoop* loop = baseLoop_;
// 如果loops_為空,則loop指向baseLoop_
// 如果不為空,按照round-robin(RR,輪叫)的調度方式選擇一個EventLoop
if (!loops_.empty())
{
// round-robin
loop = loops_[next_];
++next_;
if (implicit_cast(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
mainReactor關注監聽事件,已連接套接字事件輪詢給線程池中的subReactors 處理,一個新的連接對應一個subReactor
我們采用round-robin(RR,輪叫)的調度方式選擇一個EventLoop,也就是getNextLoop函數。極端情況下,線程池中個數為0時,那麼新的連接交給mainReactor,這樣就退化成單線程的模式。