歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> 關於Linux >> UDP的epoll並發框架-UDP Listener解決OpenVPN的並發問題

UDP的epoll並發框架-UDP Listener解決OpenVPN的並發問題

日期:2017/3/1 12:17:52   编辑:關於Linux
UDP具有是一種很好的封裝協議,比如OpenVPN使用UDP封裝會比TCP好很多,現在越來越多的業務采用UDP傳輸,然後自己定義按序到達以及流控邏輯,然而就我個人的使用經驗來看,UDP太難做並發,大多數情況下,使用UDP會讓epoll等高性能event機制優勢全無。本文以OpenVPN為例,說明一下我是怎麼解決UDP並發問題的。

異步並發模型與epoll

和apache相比,nginx采用異步的處理方式,也就是說,一個線程可以處理多個連接,基於event模型,來了個數據包就讀,可能依次到達的數據不屬於同一個連接,但是沒關系,只要能將可讀的socket描述符和具體的連接對應上即可。這樣會使得在大並發場景下,讓CPU逼近其極限運轉,因為它幾乎沒有時間閒著,它會一直處理到達的數據包。apache的模型就不是這樣,它會讓一個連接單獨占有一個線程,如果有大量的連接就會有大量的線程,然而對於每一個線程而言,其數據讀寫的壓力並不是很大,這就會導致大量線程之間頻繁切換,而切換會導致cache的刷新等副作用...因此在同樣的硬件配置情形下,nginx的異步模型要比apache好很多。

我們已經知道,異步處理是搞定大並發的根本,接下來的問題是,如何讓一個就緒的socket和一個業務邏輯連接對應起來,這個問題在同步模型下並不存在,因為一個線程只處理一個連接。曾經的event機制比如select,poll,它們只能告訴你socket n就緒了,你不得不自己去通過數據結構來組織socket n和該連接信息之間的關系,典型的如下:

struct conn {
    int sd;
    void *others;
};

list conns;

一個鏈表conns囊括了該線程負責的所有連接,如果select/poll告訴你socket n就緒了,你不得不遍歷這個conns鏈表,比較誰的sd是n,然後取出conn來處理,雖然可以用更加高效的數據結構,但是查找是必不可少的。然而epoll解決了這個問題。

在調用epoll_ctrl將一個socket加入到epoll中時,API會為你提供一個指針,讓你直接綁定一個socket描述符和一個指針,一旦socket就緒,取出的是一個結構體,其中包含了與該socket對應的指針,因此你便可以這麼做:

conn.sd = sd;
conn.others = all;
ev.events = EPOLLIN;
ev.data.ptr = &conn;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, sd, &ev);
while (1) {      
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        conn = events[n].data.ptr;
        recv(conn.sd, ....);
        ....
    }
}

conn會一下子取出來。這是合理的方式。畢竟,內核中已經經過socket查找了,一個5元組唯一代表了一個連接,為何要在用戶態程序再找一次呢?因此除了epoll不需要遍歷所有的被監視socket之外,可以保存用戶的指針也是其相對於select/poll的一大優勢。nginx正是用的這種方式。我們回到OpenVPN。

使用TCP的OpenVPN

使用TCP的OpenVPN跟nginx幾乎是一模一樣,其核心處理邏輯如下:

/* 加入偵聽socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN網卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == context) {
            child_sd = accept(context.sd, remote_addr....);
            multi_instance *mi = create_mi(child_sd, remote_addr, ...);
            entry.ptr = mi;
            entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            ....
        } else if (events[n].data.ptr.type == SOCKET){
            multi_instance *mi = events[n].data.ptr;
            data = read_from_socket(mi);
            // 這裡簡化了處理,因為並不是每一個數據包都是需要加密解密的,還有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

以上就是TCP模式下的OpenVPN全部邏輯,可以看到,如果socket可讀,那麼就可以直接取到multi_instance,然後順序處理就是了。我記得去年我就把OpenVPN改成多線程了,但是現在看來那是個失敗的做法。如果使用TCP,從上述邏輯可以看到,就算使用多線程,在socket-to-tun這個路徑上也不用加鎖,因此multi_instance直接通過epoll_wait就可以取的到。

使用UDP的OpenVPN

然而對於UDP而言,OpenVPN的處理邏輯根上面TCP的邏輯就截然不同了。因為全程只有一個UDP socket,接受所有客戶端的連接,此時根本不存在什麼多路復用的問題,充其量也就是那唯一的UDP socket和tun網卡字符設備二者之間的兩路復用,使用epoll完全沒有必要。為了定位了具體的multi_instance,你不得不先去read唯一的那個UDP socket,然後根據recvfrom返回參數中的sockaddr結構體來構造4元組,然後根據這4元組在全局的multi_instance hash表中去查找具體multi_instance實例。其邏輯如下所示:

/* 加入唯一的UDP socket */
context.sd = udp_sd;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN網卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {  //實際上nfds最多也就是2
        if (events[n].data.ptr == context) {
            data = recvfrom(context.sd, remote_addr....);
            lock(mi_hashtable);   //如果多線程,這個鎖將會成為瓶頸,即便是RW鎖也一樣
            multi_instance *mi = lookup_mi(child_sd, remote_addr, ...);  //再好的hash算法,也不是0成本的!
            unlock(mi_hashtable);
            // 這裡簡化了處理,因為並不是每一個數據包都是需要加密解密的,還有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);  
            ....
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

可見,TCP的OpenVPN和UDP的OpenVPN處理方式完全不同,UDP的問題在於,完全沒有充分利用epoll的多路復用機制,不得不根據數據包的recvfrom返回地址來查找multi_instance...

讓UDP socket也Listen起來

如果UDP也能像TCP一樣,每一個用戶接進來就為之創建一個單獨的socket為其專門服務該多好,這樣在大並發的時候,就可以充分復用內核UDP層的socket查找結論加上epoll的通知機制了。理論上這是可行的,因為UDP的4元組可以唯一識別一個與之通信的客戶端,雖然UDP生成無連接,不可靠,但是為每一個連接的客戶端創建一個socket並沒有破壞UDP的語義,只是改變了UDP的編程模型而已,內核協議棧依然不會去刻意維護一個UDP連接,也不會進行任何的數據確認。
需要說明的是,這種方案僅僅對“長連接”的UDP有意義,比如OpenVPN這類。因為UDP是沒有連接的,那麼你也就不知道一個客戶端什麼時候會永遠停止發送數據,因此必然要通過定時器來定時關閉那些在一定時間段內沒有數據的socket。
為了驗證可行性,我先在用戶態做實驗,也就是說,接受一個客戶端的“連接請求”(其實就是一個數據包)時,我手工為其創建一個socket,然後bind本地地址,並且connect從recvfrom返回的對端地址,這樣理論上對於後續的數據包,epoll都應該觸發這個新的socket,畢竟它更精確。事實是不是這樣呢?以下的程序可以證明:

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define SO_REUSEPORT    15

#define MAXBUF 10240
#define MAXEPOLLSIZE 100

int flag = 0;

int read_data(int sd)
{
    char recvbuf[MAXBUF + 1];
    int  ret;
    struct sockaddr_in client_addr;
    socklen_t cli_len=sizeof(client_addr);

    bzero(recvbuf, MAXBUF + 1);
  
    ret = recvfrom(sd, recvbuf, MAXBUF, 0, (struct sockaddr *)&client_addr, &cli_len);
    if (ret > 0) {
        printf("read[%d]: %s  from  %d\n", ret, recvbuf, sd);
    } else {
        printf("read err:%s  %d\n", strerror(errno), ret);
      
    }
    fflush(stdout);
}

int udp_accept(int sd, struct sockaddr_in my_addr)
{
    int new_sd = -1;
    int ret = 0;
    int reuse = 1;
    char buf[16];
    struct sockaddr_in peer_addr;
    socklen_t cli_len = sizeof(peer_addr);

    ret = recvfrom(sd, buf, 16, 0, (struct sockaddr *)&peer_addr, &cli_len);
    if (ret > 0) {
    }

    if ((new_sd = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("child socket");
        exit(1);
    } else {
        printf("parent:%d  new:%d\n", sd, new_sd);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEADDR, &reuse,sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(new_sd, SOL_SOCKET, SO_REUSEPORT, &reuse, sizeof(reuse));
    if (ret) {
        exit(1);
    }

    ret = bind(new_sd, (struct sockaddr *) &my_addr, sizeof(struct sockaddr));
    if (ret){
        perror("chid bind");
        exit(1);
    } else {
    }

    peer_addr.sin_family = PF_INET;
    printf("aaa:%s\n", inet_ntoa(peer_addr.sin_addr));
    if (connect(new_sd, (struct sockaddr *) &peer_addr, sizeof(struct sockaddr)) == -1) {
        perror("chid connect");
        exit(1);
    } else {
    }

out:
    return new_sd;
}

int main(int argc, char **argv)
{
    int listener, kdpfd, nfds, n, curfds;
    socklen_t len;
    struct sockaddr_in my_addr, their_addr;
    unsigned int port;
    struct epoll_event ev;
    struct epoll_event events[MAXEPOLLSIZE];
    int opt = 1;;
    int ret = 0;

    port = 1234;
  
    if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1) {
        perror("socket");
        exit(1);
    } else {
        printf("socket OK\n");
    }

    ret = setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
    if (ret) {
        exit(1);
    }

    ret = setsockopt(listener, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));
    if (ret) {
        exit(1);
    }
  
    bzero(&my_addr, sizeof(my_addr));
    my_addr.sin_family = PF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = INADDR_ANY;
    if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1) {
        perror("bind");
        exit(1);
    } else {
        printf("IP bind OK\n");
    }
  
    kdpfd = epoll_create(MAXEPOLLSIZE);

    ev.events = EPOLLIN|EPOLLET;
    ev.data.fd = listener;

    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0) {
        fprintf(stderr, "epoll set insertion error: fd=%dn", listener);
        return -1;
    } else {
        printf("ep add OK\n");
    }
 
    while (1) {
      
        nfds = epoll_wait(kdpfd, events, 10000, -1);
        if (nfds == -1) {
            perror("epoll_wait");
            break;
        }
      
        for (n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listener) {
                printf("listener:%d\n", n);
                int new_sd;               
                struct epoll_event child_ev;

                new_sd = udp_accept(listener, my_addr);
                child_ev.events = EPOLLIN;
                child_ev.data.fd = new_sd;
                if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, new_sd, &child_ev) < 0) {
                    fprintf(stderr, "epoll set insertion error: fd=%dn", new_sd);
                    return -1;
                }
            } else {
                read_data(events[n].data.fd);
            }
        }
    }
    close(listener);
    return 0;
}

需要說明的是,REUSEPORT是必要的,因為在connect之前,你必須為新建的socket bind跟listener一樣的IP地址和端口,因此就需要這個socket選項。
此時,如果你用多個udp客戶端去給這個服務端發數據,會發現完全實現了想要的效果。

內核中的UDP Listener

雖然在用戶態可以實現效果,但是編程模型並不太好用,為了創建一個socket,你不得不先去recvfrom一下數據,好得到對端的地址,雖然使用PEEK標志可以讓創建好child socket後再讀一次,但是仔細想想,最徹底的方案還是直接擴展內核,我基於3.9.6內核,對__udp4_lib_rcv這個UDP協議棧接收函數作了以下的修改:

int __udp4_lib_rcv(struct sk_buff *skb, struct udp_table *udptable,
                   int proto)
{
......................
        sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable);

        if (sk != NULL) {
                int ret;
#if 1
                // 這個UDP_LISTEN,通過setsockopt來設置
                if (sk->sk_state == UDP_LISTEN) {
                        // 如果是UDP的listener,創建一個數據socket
                        struct sock *newsk = inet_udp_clone_lock(sk, skb, GFP_ATOMIC);
                        if (newsk) {
                                struct inet_sock *newinet;
                                // 為這個數據傳輸socket根據skb來填充4元組信息
                                newinet               = inet_sk(newsk);
                                newinet->inet_daddr   = ip_hdr(skb)->saddr;
                                newinet->inet_rcv_saddr = ip_hdr(skb)->daddr;
                                newinet->inet_saddr           = ip_hdr(skb)->daddr;
                                rcu_assign_pointer(newinet->inet_opt, NULL);

                                newinet->mc_index     = inet_iif(skb);
                                newinet->mc_ttl       = ip_hdr(skb)->ttl;
                                newinet->rcv_tos      = ip_hdr(skb)->tos;
                                newinet->inet_id = 0xffffffff ^ jiffies;
                                inet_sk_rx_dst_set(newsk, skb);
                                // sock結構體新增csk變量,類似TCP的accept queue,但是為了簡單,目前每個Listen socket只能持有一個csk,即child sock。
                                sk->csk = newsk;

                                // 將新的數據傳輸socket排入全局的UDP socket hash表
                                if (newsk->sk_prot->get_port(newsk, newinet->inet_num)) {
                                        printk("[UDP listen] get port error\n");
                                        release_sock(newsk);
                                        err = -2;
                                        goto out_go;
                                }
                                ret = udp_queue_rcv_skb(newsk, skb);
                                // 喚醒epoll,讓epoll返回UDP Listener
                                sk->sk_data_ready(sk, 0);
                                sock_put(newsk);
                        } else {
                                printk("[UDP listen] create new error\n");
                                sock_put(sk);
                                return -1;
                        }
out_go:
                        sock_put(sk);
                        if (ret > 0)
                                return -ret;
                        return 0;
                }
#endif
                ret = udp_queue_rcv_skb(sk, skb);
                sock_put(sk);
......................
}

我只是測試,因此並沒有擴展UDP的accept方法,只是簡單的用getsocketopt來獲得這個新的socket描述符並為task安裝該文件描述符,setsockopt可以設置一個UDP socket為listener。這樣用戶態的編程模型就很簡單了。

使用新的Listen UDP來改造OpenVPN

有必要重構一下OpenVPN了,現如今它的邏輯變成了:

listen = 1;
listener = socket(PF_INET, SOCK_DGRAM, 0);
setsockopt(new_sd, SOL_SOCKET, SO_UDPLISTEN, &listen,sizeof(listen));

/* 加入偵聽socket */
context.sd = listener;
context.others = dont_care;
listen_ev.events = EPOLLIN;
listen_ev.data.ptr = context;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &listen_ev);

/* 加入TUN網卡 */
tun.sd = tun;
tun.others = dont_care;
entry.ptr = tun;
entry.type = TUN;
tun_ev.events = EPOLLIN;
tun_ev.data.ptr = entry;
epoll_ctl(kdpfd, EPOLL_CTL_ADD, tun, &tun_ev);

while(1) {
    nfds = epoll_wait(kdpfd, events, 10000, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.ptr == context) {
            getsockopt(context.sd, SOL_SOCKET, &newsock_info....);
            child_sd = newsock_info.sd;
            multi_instance *mi = create_mi(child_sd, newsock_info.remote_addr, ...);
            entry.ptr = mi;
            entry.type = SOCKET;
            new_ev.events = EPOLLIN;
            new_ev.data.ptr = entry;
            epoll_ctl(kdpfd, EPOLL_CTL_ADD, child_sd, &new_ev);
            // 這是UDP,內核除了通知Listener之外,還會將數據排入child_sd,因此需要去讀取,可以參考TCP的Fastopen邏輯
            data = recvfrom(child_sd, ....);
            ....
        } else if (events[n].data.ptr.type == SOCKET){
            multi_instance *mi = events[n].data.ptr;
            data = read_from_socket(mi);
            // 這裡簡化了處理,因為並不是每一個數據包都是需要加密解密的,還有控制通道的包
            decrypt(mi, data);
            write_to_tun(data);
        } else {
            tun *tun = events[n].data.ptr.ptr;
            packet = read_from_tun(tun);
            lock(mi_hashtable);
            multi_instance *mi = lookup_multi_instance_from(packet);
            unlock(mi_hashtable);
            encrypt(packet);
            write_to_socket(packet, mi);
        }
    }
    ...
}

除了把accept改成了getsockopt之外,別的幾乎和TCP的OpenVPN完全一致了。
如此一來,2014年改造的OpenVPN多線程版本就完美了,用戶態根本不需要再使用recvfrom返回的address信息來定位multi_instance了,一個multi_instance唯一和一個socket綁定,而每一個socket都由epoll來管理,大大降低了用戶態查找multi_instance的開銷,同時也避免了鎖定。
Copyright © Linux教程網 All Rights Reserved