歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> Linux 多線程條件變量同步

Linux 多線程條件變量同步

日期:2017/2/28 13:47:34   编辑:Linux教程

條件變量是線程同步的另一種方式,實際上,條件變量是信號量的底層實現,這也就意味著,使用條件變量可以擁有更大的自由度,同時也就需要更加小心的進行同步操作。條件變量使用的條件本身是需要使用互斥量進行保護的,線程在改變條件狀態之前必須首先鎖住互斥量,其他線程在獲得互斥量之前不會察覺到這種改變,因為互斥量必須在鎖定之後才能計算條件。

模型

#include<pthread.h>
pthread_t cond              //准備條件變量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;     //初始化靜態的條件變量
pthread_cond_init()         //初始化一個動態的條件變量
pthread_cond_wait()         //等待條件變量變為真
pthread_cond_timedwait()    //等待條件變量變為真,等待有時間限制。
pthread_cond_signal()       //至少喚醒一個等待該條件的線程
pthread_cond_broadcast()    //喚醒等待該條件的所有線程
pthread_cond_destroy()      //銷毀一個條件變量

pthread_cond_init()

//初始化一個動態的條件變量
//成功返回0,失敗返回error number
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

cond:條件變量指針,這裡使用了restrict關鍵字
attr:條件變量屬性指針,默認屬性賦NULL

pthread_cond_wait() / pthread_cond_timedwait()

//等待條件變量為真。收到pthread_cond_broadcast()或pthread_cond_signal()就喚醒
//成功返回0,失敗返回error number
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

條件變量的使在多線程的程序中,因為各個線程都可以訪問大部分的進程資源,所以我們為了保證公有資源的使用是可以控制的,在一個線程開始使用公有資源之前要嘗試獲取互斥鎖,在使用完畢之後要釋放互斥鎖,這樣就能保證每個公共資源在一個時刻都只能被一個線程使用,在一定程度上得到了控制,但這並不能解決同步的問題,考慮如下兩個線程:

Q=create_queue();
pthread_t mutex
//線程A,入隊
while(1){
    lock(mutex);
    in_queue(Q);
    unlock(mutex);
}

//線程B,出隊
while(1){
    lock(mutex);
    out_queue(Q);
    unlock(mutex);
}

上述代碼可以實現兩個線程的互斥,即同一時刻只有一個線程在使用公有資源-隊列。但如果線程B獲取了鎖,但隊列中是空的,它的out_queue()也就是沒有意義的,所以我們這裡更需要一種方法將兩個線程進行同步:只有當隊列中有數據的時候才進行出隊。
我們設計這樣一種邏輯:

//線程B,出隊
while(1){
    lock(mutex);
    if(隊列是空,線程不應該執行){
        釋放鎖;
        continue;
    }
    out_queue(Q);
    unlock(mutex);
}

這個程序就解決了上述的問題,即便線程B搶到了互斥鎖,但是如果隊列是空的,他就釋放鎖讓兩個線程重新搶鎖,希望這次線程A能搶到並往裡放一個數據。
但這個邏輯還有一個問題,就是多線程並發的問題,很有可能發生的一種情況是:線程B搶到了鎖,發現沒有數據,釋放鎖->線程A立即搶到了鎖並往裡放了一個數據->線程B執行continue,顯然,這種情況下是不應該continue的,因為線程B想要的條件在釋放鎖之後立即就被滿足了,它錯過了條件。
So,我們想一種反過來的邏輯:

//線程B,出隊
while(1){
    lock(mutex);
    if(隊列是空,線程不應該執行){
        continue;
        釋放鎖;
    }
    out_queue(Q);
    unlock(mutex);
}

顯然這種方法有個致命的問題:一旦continue了,線程B自己獲得鎖就沒有被釋放,這樣線程A不可能搶到鎖,而B繼續加鎖就會形成死鎖!
Finaly,我們希望看到一個函數fcn,如果條件不滿足,能同時釋放鎖+停止執行線程,如果條件滿足,自己當時獲得的鎖還在

//線程B,出隊
while(1){
    lock(mutex);
    fcn(當前線程不應該執行,mutex)     //if(當前線程不應該執行){釋放鎖“同時” continue;}
    out_queue(Q);
    unlock(mutex);
}

OK,這個就是pthread_cond_wait()的原理了,只不過它把continue變成了"休眠"這種由OS負責的操作,可以大大的節約資源。
當然,線程執行的條件是很難當作參數傳入一個函數的,POSIX多線程的模型使用系統提供的"條件變量"+"我們自己定義的具體條件" 來確定一個線程是否應該執行接下來的內容。"條件變量"只有,所以一種典型的多線程同步的結構如下

//線程B,出隊
while(1){
    lock(mutex);
    while(條件不滿足)
        pthread_cond_wait(cond,mutex)
        //獲得互斥鎖可以同時保護while裡的條件和cond的判斷,二者總是用一把鎖保護,並一同釋放  
        //cond為假,就休眠同時釋放鎖,等待被cond為真喚醒,把自己獲得的鎖拿回來           
        //拿回自己的鎖再檢查線程執行條件,條件不滿足繼續循環,直到條件滿足跳出循環
        //這個函數是帶著"線程的執行條件為真"+"cond為真"走出循環的
        //這個函數返回後cond被重新設置為0
    out_queue(Q);
    unlock(mutex);
}

pthread_cond_braoadcast()/pthread_cond_signal()

//使條件變量為真並喚醒wait中的線程,前者喚醒所有wait的,後者喚醒一個
//成功返回0,失敗返回error number
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

pthread_cond_destroy()

//銷毀條件變量
//成功返回0,失敗返回error number
int pthread_cond_destroy(pthread_cond_t *cond);

例子-線程池

\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100


typedef struct{//每個節點的封裝格式,fcn是用戶自定義函數,arg是用戶自定義函數參數指針,保證通用性聲明為void
    void* (*fcn)(void* arg);
    void* arg;
}task_t;
typedef struct{        //用戶自定義函數的參數結構體
    int x;
}argfcn_t;

//#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t

#endif  //__THREAD_H__
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread_pool.c
#include<stdio.h>   
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h"

//互斥量和條件變量
pthread_mutex_t lock;
pthread_cond_t cond;


//全局的表
lqueue_t* Q;

//每個線程的任務,必須是這個形式的函數指針
void* do_task(void* p){   
    task_t data;
    int ret=0;
    while(1){
        pthread_mutex_lock(&lock);
        while(is_empty_lqueue(Q)){          //大家收到廣播,因為延遲,可能醒了好幾個,要判斷一下是不是自己
            pthread_cond_wait(&cond,&lock);     //先搶到鎖再醒
        }
        ret=out_lqueue(Q,&data);
        pthread_mutex_unlock(&lock);
        data.fcn(data.arg);
    }
}

//創建線程池
void create_pool(void){
    //初始化隊列
    Q=create_lqueue();
    //初始化互斥量
    pthread_mutex_init(&lock,NULL);
    //初始化條件變量
    pthread_cond_init(&cond,NULL);
    int i=THREAD_NUM;
    pthread_t tid[THREAD_NUM];
    while(i--)
        pthread_create(&tid[i],NULL,do_task,NULL);
}



//准備函數
void* fcn(void* parg){                //用戶自定義的需要線程執行的函數
    argfcn_t* i=(argfcn_t*)parg;
    printf("this is task1\n");
    printf("task1:%d\n",i->x);
}

//添加任務
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
    task_t task;
    task.fcn=pfcn;
    task.arg=arg;

    in_lqueue(Q,task);
    pthread_cond_signal(&cond); //添加了一個任務,用signal更好
}

int main(int argc, const char *argv[])
{
    //創建線程池
    create_pool();

    //准備參數
    argfcn_t argfcn;
    argfcn.x=5;

    //添加任務
    pool_add_task(fcn,(void*)&argfcn);  
    pool_add_task(fcn,(void*)&argfcn);  
    pool_add_task(fcn,(void*)&argfcn);  
    pause();    
    return 0;
}
Copyright © Linux教程網 All Rights Reserved