歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> 關於Linux >> Linux多線程實踐(六)使用Posix條件變量解決生產者消費者問題

Linux多線程實踐(六)使用Posix條件變量解決生產者消費者問題

日期:2017/3/1 12:18:07   编辑:關於Linux

前面的一片文章我們已經講過使用信號量解決生產者消費者問題,那麼什麼情況下我們需要引入條件變量呢?

假設有共享的資源sum,與之相關聯的mutex 是lock_s.假設每個線程對sum的操作很簡單的,與sum的狀態無關,比如只是sum++.那麼只用mutex足夠了.程序員只要確保每個線程操作前,取得lock,然後sum++,再unlock即可.每個線程的代碼將像這樣:

add()
{
   pthread_mutex_lock(lock_s);
   sum++;
   pthread_mutex_unlock(lock_s);
}

如果操作比較復雜,假設線程t0,t1,t2的操作是sum++,而線程t3則是在sum到達100的時候,打印出一條信息,並對sum清零. 這種情況下,如果只用mutex, 則t3需要一個循環,每個循環裡先取得lock_s,然後檢查sum的狀態,如果sum>=100,則打印並清零,然後unlock.如果sum<100,則unlock,並sleep()本線程合適的一段時間。

這個時候,t0,t1,t2的代碼不變,t3的代碼如下:

print()
{
   while (1)
  {
      pthread_mutex_lock(lock_s);
      if(sum<100)
     {
         printf(“sum reach 100!”);
         pthread_mutex_unlock(lock_s);
     }
  else
  {
       pthread_mutex_unlock(lock_s);
       my_thread_sleep(100);
       return OK;
   }
 }
}

這種辦法有兩個問題

1) sum在大多數情況下不會到達100,那麼對t3的代碼來說,大多數情況下,走的是else分支,只是lock和unlock,然後sleep().這浪費了CPU處理時間.

2) 為了節省CPU處理時間,t3會在探測到sum沒到達100的時候sleep()一段時間.這樣卻又帶來另外一個問題,亦即t3響應速度下降.可能在sum到達200的時候,t4才會醒過來.

3) 這樣,程序員在設置sleep()時間的時候陷入兩難境地,設置得太短了節省不了資源,太長了又降低響應速度.真是難辦啊!

這個時候,condition variable,從天而降,拯救了焦頭爛額的你.

你首先定義一個condition variable.

pthread_cond_t cond_sum_ready=PTHREAD_COND_INITIALIZER;

t0,t1,t2的代碼只要後面加兩行,像這樣:

add()
{
   pthread_mutex_lock(lock_s);
   sum++;
   pthread_mutex_unlock(lock_s);
   if(sum>=100)
   pthread_cond_signal(&cond_sum_ready);
}
而t3的代碼則是
print
{
   pthread_mutex_lock(lock_s);
   while(sum<100)
   pthread_cond_wait(&cond_sum_ready, &lock_s);
   printf(“sum is over 100!”);
   sum=0;
   pthread_mutex_unlock(lock_s);
   return OK;
}

注意兩點:

1) 在thread_cond_wait()之前,必須先lock相關聯的mutex, 因為假如目標條件未滿足,pthread_cond_wait()實際上會unlock該mutex, 然後block,在目標條件滿足後再重新lock該mutex, 然後返回.

2) 為什麼是while(sum<100),而不是if(sum<100) ?這是因為在pthread_cond_signal()和pthread_cond_wait()返回之間,有時間差,假設在這個時間差內,還有另外一個線程t4又把sum減少到100以下了,那麼t3在pthread_cond_wait()返回之後,顯然應該再檢查一遍sum的大小.這就是用 while的用意


線程間的同步技術,主要以互斥鎖和條件變量為主,條件變量和互斥所的配合使用可以很好的處理對於條件等待的線程間的同步問題

Posix條件變量常用API:

int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);  
int pthread_cond_destroy(pthread_cond_t *cond);  
  
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);  
int pthread_cond_timedwait(pthread_cond_t *cond,  pthread_mutex_t  *mutex,  const  struct timespec *abstime);  
  
int pthread_cond_signal(pthread_cond_t *cond);  
int pthread_cond_broadcast(pthread_cond_t *cond); 

通常條件變量需要和互斥鎖同時使用, 利用互斥量保護條件變量;條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個線程自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個線程改變了條件,它就發送信號給關聯的條件變量, 並喚醒一個或多個等待在該條件變量上的線程,這些線程將重新獲得互斥鎖,重新評價條件。如果將條件變量放到共享內存中, 而兩進程可共享讀寫這段內存,則條件變量可以被用來實現兩進程間的線程同步。

條件變量的使用規范:

(一)、等待條件代碼
pthread_mutex_lock(&mutex);
while (條件為假)
pthread_cond_wait(cond, mutex);
修改條件
pthread_mutex_unlock(&mutex);


(二)、給條件發送通知代碼
pthread_mutex_lock(&mutex);
設置條件為真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
注意是while而不是if,原因是在信號的中斷後還能正常運行。

解決生產者消費者問題(無界緩沖區):

#include 
#include 
#include 
#include 

#include 
#include 
#include 
#include 

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 2
#define PRODUCERS_COUNT 1

pthread_mutex_t g_mutex;
pthread_cond_t g_cond;

pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];

int nready = 0;

void *consume(void *arg)
{
    int num = (int)arg;
    while (1)
    {
        pthread_mutex_lock(&g_mutex);
        while (nready == 0)
        {
            printf("%d begin wait a condtion ...\n", num);
            pthread_cond_wait(&g_cond, &g_mutex);
        }

        printf("%d end wait a condtion ...\n", num);
        printf("%d begin consume product ...\n", num);
        --nready;
        printf("%d end consume product ...\n", num);
        pthread_mutex_unlock(&g_mutex);
        sleep(1);
    }
    return NULL;
}

void *produce(void *arg)
{
    int num = (int)arg;
    while (1)
    {
        pthread_mutex_lock(&g_mutex);
        printf("%d begin produce product ...\n", num);
        ++nready;
        printf("%d end produce product ...\n", num);
        pthread_cond_signal(&g_cond);
        printf("%d signal ...\n", num);
        pthread_mutex_unlock(&g_mutex);
        sleep(1);
    }
    return NULL;
}

int main(void)
{
    int i;

    pthread_mutex_init(&g_mutex, NULL);
    pthread_cond_init(&g_cond, NULL);


    for (i = 0; i < CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void *)i);

    sleep(1);

    for (i = 0; i < PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)i);

    for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

    pthread_mutex_destroy(&g_mutex);
    pthread_cond_destroy(&g_cond);

    return 0;
}
Copyright © Linux教程網 All Rights Reserved