條件變量是線程同步的另一種方式,實際上,條件變量是信號量的底層實現,這也就意味著,使用條件變量可以擁有更大的自由度,同時也就需要更加小心的進行同步操作。條件變量使用的條件本身是需要使用互斥量進行保護的,線程在改變條件狀態之前必須首先鎖住互斥量,其他線程在獲得互斥量之前不會察覺到這種改變,因為互斥量必須在鎖定之後才能計算條件。
#include<pthread.h>
pthread_t cond //准備條件變量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; //初始化靜態的條件變量
pthread_cond_init() //初始化一個動態的條件變量
pthread_cond_wait() //等待條件變量變為真
pthread_cond_timedwait() //等待條件變量變為真,等待有時間限制。
pthread_cond_signal() //至少喚醒一個等待該條件的線程
pthread_cond_broadcast() //喚醒等待該條件的所有線程
pthread_cond_destroy() //銷毀一個條件變量
//初始化一個動態的條件變量
//成功返回0,失敗返回error number
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
cond:條件變量指針,這裡使用了restrict關鍵字
attr:條件變量屬性指針,默認屬性賦NULL
//等待條件變量為真。收到pthread_cond_broadcast()或pthread_cond_signal()就喚醒
//成功返回0,失敗返回error number
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
條件變量的使在多線程的程序中,因為各個線程都可以訪問大部分的進程資源,所以我們為了保證公有資源的使用是可以控制的,在一個線程開始使用公有資源之前要嘗試獲取互斥鎖,在使用完畢之後要釋放互斥鎖,這樣就能保證每個公共資源在一個時刻都只能被一個線程使用,在一定程度上得到了控制,但這並不能解決同步的問題,考慮如下兩個線程:
Q=create_queue();
pthread_t mutex
//線程A,入隊
while(1){
lock(mutex);
in_queue(Q);
unlock(mutex);
}
//線程B,出隊
while(1){
lock(mutex);
out_queue(Q);
unlock(mutex);
}
上述代碼可以實現兩個線程的互斥,即同一時刻只有一個線程在使用公有資源-隊列。但如果線程B獲取了鎖,但隊列中是空的,它的out_queue()也就是沒有意義的,所以我們這裡更需要一種方法將兩個線程進行同步:只有當隊列中有數據的時候才進行出隊。
我們設計這樣一種邏輯:
//線程B,出隊
while(1){
lock(mutex);
if(隊列是空,線程不應該執行){
釋放鎖;
continue;
}
out_queue(Q);
unlock(mutex);
}
這個程序就解決了上述的問題,即便線程B搶到了互斥鎖,但是如果隊列是空的,他就釋放鎖讓兩個線程重新搶鎖,希望這次線程A能搶到並往裡放一個數據。
但這個邏輯還有一個問題,就是多線程並發的問題,很有可能發生的一種情況是:線程B搶到了鎖,發現沒有數據,釋放鎖->線程A立即搶到了鎖並往裡放了一個數據->線程B執行continue,顯然,這種情況下是不應該continue的,因為線程B想要的條件在釋放鎖之後立即就被滿足了,它錯過了條件。
So,我們想一種反過來的邏輯:
//線程B,出隊
while(1){
lock(mutex);
if(隊列是空,線程不應該執行){
continue;
釋放鎖;
}
out_queue(Q);
unlock(mutex);
}
顯然這種方法有個致命的問題:一旦continue了,線程B自己獲得鎖就沒有被釋放,這樣線程A不可能搶到鎖,而B繼續加鎖就會形成死鎖!
Finaly,我們希望看到一個函數fcn,如果條件不滿足,能同時釋放鎖+停止執行線程,如果條件滿足,自己當時獲得的鎖還在
//線程B,出隊
while(1){
lock(mutex);
fcn(當前線程不應該執行,mutex) //if(當前線程不應該執行){釋放鎖“同時” continue;}
out_queue(Q);
unlock(mutex);
}
OK,這個就是pthread_cond_wait()的原理了,只不過它把continue變成了"休眠"這種由OS負責的操作,可以大大的節約資源。
當然,線程執行的條件是很難當作參數傳入一個函數的,POSIX多線程的模型使用系統提供的"條件變量"+"我們自己定義的具體條件" 來確定一個線程是否應該執行接下來的內容。"條件變量"只有真和假,所以一種典型的多線程同步的結構如下
//線程B,出隊
while(1){
lock(mutex);
while(條件不滿足)
pthread_cond_wait(cond,mutex)
//獲得互斥鎖可以同時保護while裡的條件和cond的判斷,二者總是用一把鎖保護,並一同釋放
//cond為假,就休眠同時釋放鎖,等待被cond為真喚醒,把自己獲得的鎖拿回來
//拿回自己的鎖再檢查線程執行條件,條件不滿足繼續循環,直到條件滿足跳出循環
//這個函數是帶著"線程的執行條件為真"+"cond為真"走出循環的
//這個函數返回後cond被重新設置為0
out_queue(Q);
unlock(mutex);
}
//使條件變量為真並喚醒wait中的線程,前者喚醒所有wait的,後者喚醒一個
//成功返回0,失敗返回error number
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
//銷毀條件變量
//成功返回0,失敗返回error number
int pthread_cond_destroy(pthread_cond_t *cond);
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread.h
#ifndef __THREAD_H__
#define __THREAD_H__
#define THREAD_NUM 3
#define TASK_NUM 100
typedef struct{//每個節點的封裝格式,fcn是用戶自定義函數,arg是用戶自定義函數參數指針,保證通用性聲明為void
void* (*fcn)(void* arg);
void* arg;
}task_t;
typedef struct{ //用戶自定義函數的參數結構體
int x;
}argfcn_t;
//#define LQ_DATA_T task_t*
#define LQ_DATA_T task_t
#endif //__THREAD_H__
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//lqueue.c
#include"thread.h"
#include"lqueue.h"
#include<stdlib.h>
...
\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
//thread_pool.c
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include"thread.h"
#include"lqueue.h"
//互斥量和條件變量
pthread_mutex_t lock;
pthread_cond_t cond;
//全局的表
lqueue_t* Q;
//每個線程的任務,必須是這個形式的函數指針
void* do_task(void* p){
task_t data;
int ret=0;
while(1){
pthread_mutex_lock(&lock);
while(is_empty_lqueue(Q)){ //大家收到廣播,因為延遲,可能醒了好幾個,要判斷一下是不是自己
pthread_cond_wait(&cond,&lock); //先搶到鎖再醒
}
ret=out_lqueue(Q,&data);
pthread_mutex_unlock(&lock);
data.fcn(data.arg);
}
}
//創建線程池
void create_pool(void){
//初始化隊列
Q=create_lqueue();
//初始化互斥量
pthread_mutex_init(&lock,NULL);
//初始化條件變量
pthread_cond_init(&cond,NULL);
int i=THREAD_NUM;
pthread_t tid[THREAD_NUM];
while(i--)
pthread_create(&tid[i],NULL,do_task,NULL);
}
//准備函數
void* fcn(void* parg){ //用戶自定義的需要線程執行的函數
argfcn_t* i=(argfcn_t*)parg;
printf("this is task1\n");
printf("task1:%d\n",i->x);
}
//添加任務
void pool_add_task(void*(*pfcn)(void*parg),void*arg){
task_t task;
task.fcn=pfcn;
task.arg=arg;
in_lqueue(Q,task);
pthread_cond_signal(&cond); //添加了一個任務,用signal更好
}
int main(int argc, const char *argv[])
{
//創建線程池
create_pool();
//准備參數
argfcn_t argfcn;
argfcn.x=5;
//添加任務
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pool_add_task(fcn,(void*)&argfcn);
pause();
return 0;
}