歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> 關於Linux >> 使用epoll在linux上開發高性能應用服務器

使用epoll在linux上開發高性能應用服務器

日期:2017/3/3 16:16:41   编辑:關於Linux

概述

epoll是linux提供一種多路復用的技術,類似各個平台都支持的select,只是epoll在內核的實現做了 更多地優化,可以支持比select更多的文件描述符,當然也支持 socket這種網絡的文件描述符。linux上 的大並發的接入服務器,目前的實現方式肯定都通過epoll實現。

epoll和線程

有很多開發人員用epoll的時候,會開多個線程來進行數據通信,比如一個線程專門accept(我個人早 些年在FreeBSD用kqueue的時候,由於對內部機制沒有基本了解也這樣搞),一個線程收發,或者接收和發 送都用各自獨立的線程。

通常情況下,accept獨立線程是沒必要的,accept對於內核而言,就應用程序從內核的未完成的SYN隊 列讀取一點數據而已。具體參見 accept部分:

TCP三次握手過程與對應的Berkeley Socket APIs的介紹

收發獨立成兩個線程也沒有必要,因為大部分的應用服務器,通常情況下,啟動一個線程收發數據, 最大數據的收發量瓶頸在於網卡,而不是CPU;像網游接入服務器配置一個KM的網卡,很少有游戲會申請 1G的帶寬,那一台機器得有多少數據輸入和輸出。所以我們通信線程為epoll服務器就夠了。

epoll的基本原理

為了讓某些朋友能讀得更連慣,我還是說一下epoll基本原理。

epoll外部表現和select是一樣的。他提供READ, WRITE和ERROR等事件。

大致流程像下面這樣:

1. 應用注冊感興趣的事件到內核;

2. 內核在某種條件下,將事件通知應用程序;

3. 應用程序收到事件後,根據事件類型做對應的邏輯處理。

原理部分我再說一下,不容易理解的地方,包括水平觸發和邊緣觸發,WRITE事件的事件利用(這個可 以結合參考文獻1的kqueue的WRITE事件,原理一致的)和修改事件的細節。

水平觸發

READ事件,socket recv buff有數據,將一直向應用程序通知,直到buff為空。

WRITE事件,socket send buff從滿的狀態到可發送數據,將一直通知應用程序,直到buff滿。

邊緣觸發

READ事件,socket recv buff有數據了,只通知應用一次,不管應用程序有沒有調用read api,接下 來都不通知了。

WRITE事件,socket send buff從滿的狀態到可以發送數據,只通知一次。

上面這個解釋不知道大家能否理解,也只能這樣說了。有疑問的做一下試驗。另外,這些細節的東西 ,前幾年固定下來後,這幾年做的項目,是直接用的,也就很少在涉及細節,是憑理解和記憶寫下的文字 ,萬一有錯請指正^-^。

WRITE事件的利用

這個還一下不好描述。大概描述一下,詳細看參考文獻1。大致這樣:

1. 邏輯層寫數據到應用層的發送buff,向epoll注冊一下WRITE事件;

2. 這時epoll會通知應用程序一個WRITE事件;

3. 在WRITE事件響應函數裡,從應用層的發送buff讀數據,然後用socket send api發送。

因為我在很多實際項目中,看到大家沒有利用epoll的WRITE的事件來發數據,特意地說一下。大部分 的項目,是直接輪詢應用程序的發送隊列,我早期項目也是這麼干的。

epoll的修改事件

對於這個我的映像比較深刻。epoll的修改事件比較坑爹,不能單獨修改某個事件!怎麼說呢?比如 epoll裡已經注冊了READ&WRITE事件,你如果想單單重注冊一下WRITE事件而且READ事件不變,epoll 的epoll_ctl API是做不到的,你必須同時注冊READ&WRITE,這個在下面的代碼中可以看到。FreeBSD 的kqueue在這一點完全滿足我們程序員的要求。

抽象epoll API

我把herm socket epoll封裝部分貼出來,讓朋友們參考一下epoll的用法。大部分錯誤拋異常代碼被 我去掉了。

class Multiplexor
{
public:
Multiplexor(int size, int timeout = -1, bool lt = true);
~Multiplexor();
void Run();
void Register(ISockHandler* eh, MultiplexorMask mask);
void Remove(ISockHandler* eh);
void EnableMask(ISockHandler* eh, MultiplexorMask mask);
void DisableMask(ISockHandler* eh, MultiplexorMask mask);
private:
inline bool OperateHandler(int op, ISockHandler* eh, MultiplexorMask mask)
{
struct epoll_event evt;
evt.data.ptr = eh;
evt.events = mask;
return epoll_ctl(m_epfd, op, eh->GetHandle(), &evt) != -1;
}
private:
int m_epfd;
struct epoll_event* m_evts;
int m_size;
int m_timeout;
__uint32_t m_otherMasks;
};
Multiplexor::Multiplexor(int size, int timeout, bool lt)
{
m_epfd = epoll_create(size);
if (m_epfd == -1)
throw HERM_SOCKET_EXCEPTION(ST_OTHER);
m_size = size;
m_evts = new struct epoll_event[size];
m_timeout = timeout;
// sys/epoll.h is no EPOLLRDHUP(0X2000), don't add EPOLLRDHUP
m_otherMasks = EPOLLERR | EPOLLHUP;
if (!lt)
m_otherMasks |= EPOLLET;
}
Multiplexor::~Multiplexor()
{
close(m_epfd);
delete[] m_evts;
}
void Multiplexor::Run()
{
int fds = epoll_wait(m_epfd, m_evts, m_size, m_timeout);
if (fds == -1)
{
if (errno == EINTR)
return;
}
for (int i = 0; i < fds; ++i)
{
__uint32_t evts = m_evts[i].events;
ISockHandler* eh = reinterpret_cast<ISockHandler*>(m_evts[i].data.ptr);
int stateType = ST_SUCCESS;
if (evts & EPOLLIN)
stateType = eh->OnReceive();
if (evts & EPOLLOUT)
stateType = eh->OnSend();
if (evts & EPOLLERR || evts & EPOLLHUP)
stateType = ST_EXCEPT_FAILED;
if (stateType != ST_SUCCESS)
eh->OnError(stateType, errno);
}
}
void Multiplexor::Register(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = mask | m_otherMasks;
OperateHandler(EPOLL_CTL_ADD, eh, masks);
}
void Multiplexor::Remove(ISockHandler* eh)
{
// Delete fd from epoll, don't need masks
OperateHandler(EPOLL_CTL_DEL, eh, ALL_EVENTS_MASK);
}
void Multiplexor::EnableMask(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = mask | Herm::READ_MASK | Herm::WRITE_MASK;
OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks);
}
void Multiplexor::DisableMask(ISockHandler* eh, MultiplexorMask mask)
{
MultiplexorMask masks = (Herm::READ_MASK | Herm::WRITE_MASK) & (~mask);
if (!OperateHandler(EPOLL_CTL_MOD, eh, masks | m_otherMasks))
throw HERM_SOCKET_EXCEPTION(ST_OTHER);
}

上面類就用到epoll_create(), epoll_ctl()和epoll_wait(),以及幾種事件。epoll用起來比select 清爽一些。

大致用法類似下面這樣:

先定義一個Handler

class StreamHandler : public Herm::ISockHandler
{
public:
virtual Herm::Handle GetHandle() const;
virtual int OnReceive(int);
virtual int OnSend(int);
};

在OnReceive()處理收到數據的動作,在OnSend()。。。。

在通信線程中,大概類似這樣的代碼,實際看情況。

Multiplexor multiplexor;
StreamHandler sh;
multiplexor.Register(&sh, READ_EVT);
multiplexor.Run(...);

 

Copyright © Linux教程網 All Rights Reserved