歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> 並發無鎖環形隊列的實現

並發無鎖環形隊列的實現

日期:2017/3/1 9:08:40   编辑:Linux編程

前面在《Linux內核數據結構kfifo詳解》一文中詳細解析了 Linux 內核並發無鎖環形隊列kfifo的原理和實現,kfifo鬼斧神工,博大精深,讓人歎為觀止,但遺憾的是kfifo為內核提供服務,並未開放出來。劍不試則利鈍暗,弓不試則勁撓誣,鷹不試則巧拙惑,馬不試則良驽疑,光說不練是不能學到精髓的,下面就動手實現自己的並發無鎖隊列UnlockQueue(單生產者單消費者)。

一、UnlockQueue聲明

   1: #ifndef _UNLOCK_QUEUE_H
   2: #define _UNLOCK_QUEUE_H
   3:  
   4: class UnlockQueue
   5: {
   6: public:
   7:     UnlockQueue(int nSize);
   8:     virtual ~UnlockQueue();
   9:  
  10:     bool Initialize();
  11:  
  12:     unsigned int Put(const unsigned char *pBuffer, unsigned int nLen);
  13:     unsigned int Get(unsigned char *pBuffer, unsigned int nLen);
  14:  
  15:     inline void Clean() { m_nIn = m_nOut = 0; }
  16:     inline unsigned int GetDataLen() const { return  m_nIn - m_nOut; }
  17:  
  18: private:
  19:     inline bool is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); };
  20:     inline unsigned long roundup_power_of_two(unsigned long val);
  21:  
  22: private:
  23:     unsigned char *m_pBuffer;    /* the buffer holding the data */
  24:     unsigned int   m_nSize;        /* the size of the allocated buffer */
  25:     unsigned int   m_nIn;        /* data is added at offset (in % size) */
  26:     unsigned int   m_nOut;        /* data is extracted from off. (out % size) */
  27: };
  28:  
  29: #endif

UnlockQueue與kfifo 結構相同相同,也是由一下變量組成:

UnlockQueue kfifo 作用 m_pBuffer buffer 用於存放數據的緩存 m_nSize size 緩沖區空間的大小,圓整為2的次冪 m_nIn in 指向buffer中隊頭 m_nOut out 指向buffer中的隊尾 UnlockQueue的設計是用在單生產者單消費者情況下,所以不需要鎖 lock 如果使用不能保證任何時間最多只有一個讀線程和寫線程,必須使用該lock實施同步。

二、UnlockQueue構造函數和初始化

   1: UnlockQueue::UnlockQueue(int nSize)
   2: :m_pBuffer(NULL)
   3: ,m_nSize(nSize)
   4: ,m_nIn(0)
   5: ,m_nOut(0)
   6: {
   7:     //round up to the next power of 2
   8:     if (!is_power_of_2(nSize))
   9:     {
  10:         m_nSize = roundup_power_of_two(nSize);
  11:     }
  12: }
  13:  
  14: UnlockQueue::~UnlockQueue()
  15: {
  16:     if(NULL != m_pBuffer)
  17:     {
  18:         delete[] m_pBuffer;
  19:         m_pBuffer = NULL;
  20:     }
  21: }
  22:  
  23: bool UnlockQueue::Initialize()
  24: {
  25:     m_pBuffer = new unsigned char[m_nSize];
  26:     if (!m_pBuffer)
  27:     {
  28:         return false;
  29:     }
  30:  
  31:     m_nIn = m_nOut = 0;
  32:  
  33:     return true;
  34: }
  35:  
  36: unsigned long UnlockQueue::roundup_power_of_two(unsigned long val)
  37: {
  38:     if((val & (val-1)) == 0)
  39:         return val;
  40:  
  41:     unsigned long maxulong = (unsigned long)((unsigned long)~0);
  42:     unsigned long andv = ~(maxulong&(maxulong>>1));
  43:     while((andv & val) == 0)
  44:         andv = andv>>1;
  45:  
  46:     return andv<<1;
  47: }

1.在構造函數中,對傳入的size進行2的次冪圓整,圓整的好處是可以將m_nIn % m_nSize 可以轉化為 m_nIn & (m_nSize – 1),取模運算”的效率並沒有 “位運算” 的效率高。

2.在構造函數中,未給buffer分配內存,而在Initialize中分配,這樣做的原因是:我們知道在new UnlockQueue的時候有兩步操作,第一步分配內存,第二步調用構造函數,如果將buffer的分配放在構造函數中,那麼就可能 buffer 就可能分配失敗,而後面用到buffer,還需要判空。

三、UnlockQueue入隊和出隊操作

   1: unsigned int UnlockQueue::Put(const unsigned char *buffer, unsigned int len)
   2: {
   3:     unsigned int l;
   4:  
   5:     len = std::min(len, m_nSize - m_nIn + m_nOut);
   6:  
   7:     /*
   8:      * Ensure that we sample the m_nOut index -before- we
   9:      * start putting bytes into the UnlockQueue.
  10:      */
  11:     __sync_synchronize();
  12:  
  13:     /* first put the data starting from fifo->in to buffer end */
  14:     l = std::min(len, m_nSize - (m_nIn  & (m_nSize - 1)));
  15:     memcpy(m_pBuffer + (m_nIn & (m_nSize - 1)), buffer, l);
  16:  
  17:     /* then put the rest (if any) at the beginning of the buffer */
  18:     memcpy(m_pBuffer, buffer + l, len - l);
  19:  
  20:     /*
  21:      * Ensure that we add the bytes to the kfifo -before-
  22:      * we update the fifo->in index.
  23:      */
  24:     __sync_synchronize();
  25:  
  26:     m_nIn += len;
  27:  
  28:     return len;
  29: }
  30:  
  31: unsigned int UnlockQueue::Get(unsigned char *buffer, unsigned int len)
  32: {
  33:     unsigned int l;
  34:  
  35:     len = std::min(len, m_nIn - m_nOut);
  36:  
  37:     /*
  38:      * Ensure that we sample the fifo->in index -before- we
  39:      * start removing bytes from the kfifo.
  40:      */
  41:     __sync_synchronize();
  42:  
  43:     /* first get the data from fifo->out until the end of the buffer */
  44:     l = std::min(len, m_nSize - (m_nOut & (m_nSize - 1)));
  45:     memcpy(buffer, m_pBuffer + (m_nOut & (m_nSize - 1)), l);
  46:  
  47:     /* then get the rest (if any) from the beginning of the buffer */
  48:     memcpy(buffer + l, m_pBuffer, len - l);
  49:  
  50:     /*
  51:      * Ensure that we remove the bytes from the kfifo -before-
  52:      * we update the fifo->out index.
  53:      */
  54:     __sync_synchronize();
  55:  
  56:     m_nOut += len;
  57:  
  58:     return len;
  59: }

入隊和出隊操作與kfifo相同,用到的技巧也完全相同,有不理解的童鞋可以參考前面一篇文章《Linux內核數據結構kfifo詳解》。這裡需要指出的是__sync_synchronize()函數,由於linux並未開房出內存屏障函數,而在gcc4.2以上版本提供This builtin issues a full memory barrier,有興趣同學可以參考Built-in functions for atomic memory access。

四、測試程序

如圖所示,我們設計了兩個線程,一個生產者隨機生成學生信息放入隊列,一個消費者從隊列中取出學生信息並打印,可以看到整個代碼是無鎖的

   1: #include "UnlockQueue.h"
   2: #include <iostream>
   3: #include <algorithm>
   4: #include <pthread.h>
   5: #include <time.h>
   6: #include <stdio.h>
   7: #include <errno.h>
   8: #include <string.h>
   9:  
  10: struct student_info
  11: {
  12:    long stu_id;
  13:    unsigned int age;
  14:    unsigned int score;
  15: };
  16:  
  17: void print_student_info(const student_info *stu_info)
  18: {
  19:     if(NULL == stu_info)
  20:         return;
  21:  
  22:     printf("id:%ld\t",stu_info->stu_id);
  23:     printf("age:%u\t",stu_info->age);
  24:     printf("score:%u\n",stu_info->score);
  25: }
  26:  
  27: student_info * get_student_info(time_t timer)
  28: {
  29:      student_info *stu_info = (student_info *)malloc(sizeof(student_info));
  30:      if (!stu_info)
  31:      {
  32:         fprintf(stderr, "Failed to malloc memory.\n");
  33:         return NULL;
  34:      }
  35:      srand(timer);
  36:      stu_info->stu_id = 10000 + rand() % 9999;
  37:      stu_info->age = rand() % 30;
  38:      stu_info->score = rand() % 101;
  39:      //print_student_info(stu_info);
  40:      return stu_info;
  41: }
  42:  
  43: void * consumer_proc(void *arg)
  44: {
  45:      UnlockQueue* queue = (UnlockQueue *)arg;
  46:      student_info stu_info;
  47:      while(1)
  48:      {
  49:          sleep(1);
  50:          unsigned int len = queue->Get((unsigned char *)&stu_info, sizeof(student_info));
  51:          if(len > 0)
  52:          {
  53:              printf("------------------------------------------\n");
  54:              printf("UnlockQueue length: %u\n", queue->GetDataLen());
  55:              printf("Get a student\n");
  56:              print_student_info(&stu_info);
  57:              printf("------------------------------------------\n");
  58:          }
  59:      }
  60:      return (void *)queue;
  61: }
  62:  
  63: void * producer_proc(void *arg)
  64:  {
  65:       time_t cur_time;
  66:       UnlockQueue *queue = (UnlockQueue*)arg;
  67:       while(1)
  68:       {
  69:           time(&cur_time);
  70:           srand(cur_time);
  71:           int seed = rand() % 11111;
  72:           printf("******************************************\n");
  73:           student_info *stu_info = get_student_info(cur_time + seed);
  74:           printf("put a student info to queue.\n");
  75:           queue->Put( (unsigned char *)stu_info, sizeof(student_info));
  76:           free(stu_info);
  77:           printf("UnlockQueue length: %u\n", queue->GetDataLen());
  78:           printf("******************************************\n");
  79:           sleep(1);
  80:       }
  81:      return (void *)queue;
  82:   }
  83:  
  84:  
  85: int main()
  86: {
  87:     UnlockQueue unlockQueue(1024);
  88:     if(!unlockQueue.Initialize())
  89:     {
  90:         return -1;
  91:     }
  92:  
  93:     pthread_t consumer_tid, producer_tid;
  94:  
  95:     printf("multi thread test.......\n");
  96:  
  97:     if(0 != pthread_create(&producer_tid, NULL, producer_proc, (void*)&unlockQueue))
  98:     {
  99:          fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
 100:                  errno, strerror(errno));
 101:          return -1;
 102:     }
 103:  
 104:     if(0 != pthread_create(&consumer_tid, NULL, consumer_proc, (void*)&unlockQueue))
 105:     {
 106:            fprintf(stderr, "Failed to create consumer thread.errno:%u, reason:%s\n",
 107:                    errno, strerror(errno));
 108:            return -1;
 109:     }
 110:  
 111:     pthread_join(producer_tid, NULL);
 112:     pthread_join(consumer_tid, NULL);
 113:  
 114:     return 0;
 115:  }

運行結果:

Copyright © Linux教程網 All Rights Reserved