歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
 Linux教程網 >> Linux編程 >> Linux編程 >> Linux下多線程編程互斥鎖和條件變量的簡單使用

Linux下多線程編程互斥鎖和條件變量的簡單使用

日期:2017/4/19 14:17:00      编辑:Linux編程

Linux下的多線程遵循POSIX線程接口,稱為pthread。編寫Linux下的多線程程序,需要使用頭文件pthread.h,鏈接時需要使用庫libpthread.a。線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位。線程自己基本上不擁有系統資源,只擁有一點在運行中必不可少的資源(如程序計數器、一組寄存器和棧),但是它可與同屬一個進程的其它的線程共享進程所擁有的全部資源。當多個任務可以並行執行時,可以為每個任務啟動一個線程。

線程是並發運行的。在串行程序基礎上引入線程和進程是為了提供程序的並發度,從而提高程序運行效率和響應時間。

與進程相比,線程的優勢:(1)、線程共享相同的內存空間,不同的線程可以存取內存中的同一個變量;(2)、與標准fork()相比,線程帶來的開銷很小,節省了CPU時間,使得線程創建比新進程創建快上十到一百倍。

適應多線程的理由:(1)、和進程相比,它是一種非常“節儉”的多任務操作方式,在Linux系統下,啟動一個新的進程必須分配給它獨立的地址空間,建立眾多的數據表來維護它的代碼段、堆棧段和數據段,這是一種“昂貴”的多任務工作方式。而運行一個進程中的多個線程,它們彼此之間使用相同的地址空間,共享大部分數據,啟動一個線程花費的空間遠遠小於啟動一個進程所花費的空間,而且,線程間彼此切換所需的時間也遠遠小於進程間切換所需要的時間;(2)、線程間方便的通信機制。對不同的進程來說,它們具有獨立的數據空間,要進行數據的傳輸只能通過通信的方式進行,這種方式不僅費時,而且很不方便。線程則不然,同一進程下的線程之間共享數據空間,所以一個線程的數據可以直接為其它線程所用,這不僅快捷,而且方便。

多線程程序作為一種多任務、並發的工作方式,其優點包括:(1)、提供應用程序響應;(2)、使多CPU系統更加有效:操作系統會保證當線程數不大於CPU數目時,不同的線程運行在不同的CPU上;(3)、改善程序結構:一個既長又復雜的進程可以考慮分為多個線程,成為幾個獨立或半獨立的運行部分,這樣的程序利於理解和修改。

pthread_create:用於在調用的進程中創建一個新的線程。它有四個參數,第一個參數為指向線程標識符指針;第二個參數用來設置線程屬性;第三個參數是線程運行函數的起始地址;第四個參數是運行函數的參數。

在一個線程中調用pthread_create函數創建新的線程後,當前線程從pthread_create處繼續往下執行。pthread_create函數的第三個參數為新創建線程的入口函數的起始地址,此函數接收一個參數,是通過第四個參數傳遞給它的,該參數的類型是void*,這個指針按什麼類型解釋由調用者自己定義,入口函數的返回值類型也是void*,這個指針的含義同樣由調用者自己定義,入口函數返回時,這個線程就退出了,其它線程可以調用pthread_join函數得到入口函數的返回值。

pthread_join:線程阻塞函數,用於阻塞當前的線程,直到另外一個線程運行結束;使一個線程等待另一個線程結束;讓主線程阻塞在這個地方等待子線程結束;代碼中如果沒有pthread_join主線程會很快結束從而使整個進程結束,從而使創建的線程沒有機會開始執行就結束了,加入pthread_join後,主線程會一直等待直到等待的線程結束自己才結束,使創建的線程有機會執行。

pthread_create將一個線程拆分為兩個,pthread_join()將兩個線程合並為一個線程。

一個線程實際上就是一個函數,創建後,立即被執行,當函數返回時該線程也就結束了。

線程終止時,一個需要注意的問題是線程間的同步問題。一般情況下,進程中各個線程的運行是相互獨立的,線程的終止並不會相互通知,也不會影響其它線程,終止的線程所占用的資源不會隨著線程的終止而歸還系統,而是仍然為線程所在的進程持有。一個線程僅允許一個線程使用pthread_join等待它的終止,並且被等待的線程應該處於可join狀態,而非DETACHED狀態。一個可”join”的線程所占用的內存僅當有線程對其執行了pthread_join()後才會釋放,因此為了避免內存洩露,所有線程終止時,要麼設為DETACHED,要麼使用pthread_join來回收資源。一個線程不能被多個線程等待。

所有線程都有一個線程號,也就是threadid,其類型為pthread_t,通過調用pthread_self函數可以獲得自身的線程號。

Linux線程同步的幾種基本方式:join、互斥鎖(mutex)、讀寫鎖(read-writelock)、條件變量(condition variables)。mutex的本質是鎖,而條件變量的本質是等待。

互斥:簡單的理解就是,一個線程進入工作區後,如果有其它線程想要進入工作區,它就會進入等待狀態,要等待工作區內的線程結束後才可以進入。

互斥提供線程間資源的獨占訪問控制。它是一個簡單的鎖,只有持有它的線程才可以釋放那個互斥。它確保了它們正在訪問的共享資源的完整性,因為在同一時刻只允許一個線程訪問它。

互斥操作,就是對某段代碼或某個變量修改的時候只能有一個線程在執行這段代碼,其它線程不能同時進入這段代碼或同時修改該變量。這個代碼或變量稱為臨界資源。

通過鎖機制實現線程間的同步,同一時刻只允許一個線程執行一個關鍵部分的代碼。

有兩種方式創建互斥鎖,靜態方式和動態方式。

在默認情況下,Linux下的同一線程無法對同一互斥鎖進行遞歸加鎖,否則將發生死鎖。所謂遞歸加鎖,就是在同一線程中試圖對互斥鎖進行兩次或兩次以上的行為。解決問題的方法就是顯示地在互斥變量初始化時將其設置成recursive屬性。

互斥量是一種用於多線程中的同步訪問的方法,它允許程序鎖住某個對象或者某段代碼,使得每次只能有一個線程訪問它。為了控制對關鍵對象或者代碼的訪問,必須在進入這段代碼之前鎖住一個互斥量,然後在完成操作之後解鎖。

互斥量用pthread_mutex_t數據類型來表示,在使用互斥變量之前,必須首先對它進行初始化,可以把它置為常量PTHREAD_MUTEX_INITIALIZER(只對靜態分配的互斥量),也可以通過調用pthread_mutex_init函數進行初始化。如果動態地分配互斥量(如調用malloc)函數,那麼釋放內存前(free)需要使用pthread_mutex_destroy函數。

對共享資源的訪問,要對互斥量進行加鎖,如果互斥量已經上了鎖,調用線程會阻塞,直到互斥量被解鎖。在完成了對共享資源的訪問後,要對互斥量進行解鎖。

pthread_mutex_init函數:主要用於多線程中互斥鎖的初始化。如果要用默認的屬性初始化互斥量,只需把第二個參數設置為NULL。互斥量的屬性可以分為四種:(1)、PTHREAD_MUTEX_TIMED_NP,這是缺省值,也就是普通鎖,當一個線程加鎖以後,其余請求鎖的線程將形成一個等待隊列,並在解鎖後按優先級獲得鎖,這種鎖策略保證了資源分配的公平性;(2)、PTHREAD_MUTEX_RECURSIVE_NP,嵌套鎖,允許線程多次加鎖,不同線程,解鎖後重新競爭;(3)、PTHREAD_MUTEX_ERRORCHECK_NP,檢錯,如果該互斥量已經被上鎖,那麼後續的上鎖將會失敗而不會阻塞,否則與PTHREAD_MUTEX_TIMED_NP類型相同,這樣就保證當不允許多次加鎖時不會出現最簡單情況下的死鎖;(4)、PTHREAD_MUTEX_ADAPTIVE_NP,適應鎖,動作最簡單的鎖類型,僅等待解鎖後重新競爭。

pthread_mutex_destroy函數:銷毀(注銷)線程互斥鎖;銷毀一個互斥鎖即意味著釋放它所占用的資源,且要求鎖當前處於開放狀態。

pthread_mutex_lock:占有互斥鎖(阻塞操作);互斥鎖被鎖定,如果這個互斥鎖被一個線程鎖定和擁有,那麼另一個線程要調用這個函數就會進入阻塞狀態(即等待狀態),直到互斥鎖被釋放為止;互斥量一旦被上鎖後,其它線程如果想給該互斥量上鎖,那麼就會阻塞在這個操作上,如果在此之前該互斥量已經被其它線程上鎖,那麼該操作將會一直阻塞在這個地方,直到獲得該鎖為止。

pthread_mutex_unlock:釋放互斥鎖;在操作完成後,必須調用該函數給互斥量解鎖,這樣其它等待該鎖的線程才有機會獲得該鎖,否則其它線程將會永遠阻塞。

與互斥鎖不同,條件變量是用來等待而不是用來上鎖的。條件變量用來自動阻塞一個線程,直到某特殊情況發生為止。通常條件變量和互斥鎖同時使用。條件變量分為兩部分:條件和變量。條件本身是由互斥量保護的。線程在改變條件狀態前先要鎖住互斥量。條件變量使我們可以睡眠等待某種條件出現。條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:一個線程等待“條件變量的條件成立”而掛起;另一個線程使“條件成立”(給出條件成立信號)。條件的檢測是在互斥鎖的保護下進行的。如果一個條件為假,一個線程自動阻塞,並釋放等待狀態改變的互斥鎖。如果另一個線程改變了條件,它發信號給關聯的條件變量,喚醒一個或多個等待它的線程,重新獲得互斥鎖,重新評價條件。如果兩線程共享可讀寫的內存,條件變量可以被用來實現這兩線程間的線程同步。

互斥鎖一個明顯的缺點是它只有兩種狀態,鎖定和非鎖定。而條件變量通過允許線程阻塞和等待另一個線程發送信號的方法彌補了互斥鎖的不足,它常和互斥鎖一起使用。使用時,條件變量被用來阻塞一個線程,當條件不滿足時,線程往往解開相應的互斥鎖並等待條件發生變化。一旦其它的某個線程改變了條件變量,它將通知相應的條件變量喚醒一個或多個正被此條件變量阻塞的線程。這些線程將重新鎖定互斥鎖並重新測試條件是否滿足。一般來說,條件變量被用來進行線程間的同步。條件變量只是起阻塞和喚醒線程的作用,具體的判斷條件還需用戶給出。線程被喚醒後,它將重新檢查判斷條件是否滿足,如果還不滿足,一般說來線程應該仍阻塞在這裡,被等待被下一次喚醒。這個過程一般用while語句實現。

條件變量用pthread_cond_t結構體來表示。

pthread_cond_init:初始化一個條件變量,當第二個參數屬性為空指針時,函數創建的是一個缺省的條件變量,���則條件變量的屬性將由第二個參數的屬性值來決定。不能由多個線程同時初始化一個條件變量。當需要重新初始化或釋放一個條件變量時,應用程序必須保證這個條件變量未被使用。

pthread_cond_wait:阻塞在條件變量上,函數將解鎖第二個參數指向的互斥鎖,並使當前線程阻塞在第一個參數指向的條件變量上。被阻塞的線程可以被pthread_cond_signal、pthread_cond_broadcast函數喚醒,也可能在被信號中斷後被喚醒。

一般一個條件表達式都是在一個互斥鎖的保護下被檢查。當條件表達式未被滿足時,線程將仍然阻塞在這個條件變量上。當另一個線程改變了條件的值並向條件變量發出信號時,等待在這個條件變量上的一個線程或所有線程被喚醒,接著都試圖再次占有相應的互斥鎖。阻塞在條件變量上的線程被喚醒以後,直到pthread_cond_wait函數返回之前,條件的值都有可能發生變化。所以函數返回以後,在鎖定相應的互斥鎖之前,必須重新測試條件值。最好的測試方法是循環調用pthread_cond_wait函數,並把滿足條件的表達式置為循環的終止條件。阻塞在同一個條件變量上的不同線程被釋放的次序是不一定的。

pthread_cond_wait函數是退出點,如果在調用這個函數時,已有一個掛起的退出請求,且線程允許退出,這個線程將被終止並開始執行善後處理函數,而這時和條件變量相關的互斥鎖仍將處在鎖定狀態。

pthread_cond_signal:解除在條件變量上的阻塞。此函數被用來釋放被阻塞在指定條件變量上的一個線程。一般在互斥鎖的保護下使用相應的條件變量,否則對條件變量的解鎖有可能發生在鎖定條件變量之前,從而造成死鎖。喚醒阻塞在條件變量上的所有線程的順序由調度策略決定。

pthread_cond_timewait:阻塞直到指定時間。函數到了一定的時間,即使條件未發生也會解除阻塞。這個時間由第三個參數指定。

pthread_cond_broadcast:釋放阻塞的所有線程。函數喚醒所有被pthread_cond_wait函數阻塞在某個條件變量上的線程。當沒有線程阻塞在這個條件變量上時,此函數無效。此函數喚醒所有阻塞在某個條件變量上的線程,這些線程被喚醒後將再次競爭相應的互斥鎖。

pthread_cond_destroy:釋放條件變量。條件變量占用的空間未被釋放。

pthread_cond_wait和pthread_cond_timewait一定要在mutex的鎖定區域內使用;而pthread_cond_signal和pthread_cond_broadcoast無需考慮調用線程是否是mutex的擁有者,可以在lock與unlock以外的區域調用。

一個特定條件只能有一個互斥對象,而且條件變量應該表示互斥數據“內部”的一種特殊的條件更改。一個互斥對象可以有許多條件變量,但每個條件變量只能有一個互斥對象。

以上所有線程相關函數,函數執行成功時返回0,返回其它非0值表示錯誤。

以下是一些測試例子:

1. test_create_thread.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

void* run(void* para)
{
 cout<<"start new thread!"<<endl;
 
 //sleep(5);//suspend 5 s,在正式的代碼中,一般不要用sleep函數
 int* iptr = (int*)((void**)para)[0];
 float* fptr = (float*)((void**)para)[1];
 char* str = (char*)((void**)para)[2];
 cout << *iptr << "    " << *fptr << "    " << str << endl;

 cout<<"end new thread!"<<endl;
 
 return ((void *)0);
}

int main()
{
 pthread_t pid;//thread handle
 int err = -1;
 int ival = 1;
 float fval = 10.0F;
 char buf[] = "func";
 void* para[3] = { &ival, &fval, buf };

 err = pthread_create(&pid, NULL, run, para);
 if (err != 0) {
  cout << "can't create thread!" << endl;
  return -1;
 }

 //新線程創建之後主線程如何運行----主線程按順序繼續執行下一行程序
 cout << "main thread!" << endl;
 
 //新線程結束時如何處理----新線程先停止,然後作為其清理過程的一部分,等待與另一個線程合並或“連接”
 pthread_join(pid, NULL);

 cout << "ok!" << endl;

 return 0;
}

//終端執行:$ g++ -o test_create_thread test_create_thread.cpp -lpthread
//    $ ./test_create_thread

2. test_thread_mutex.cpp:

#include <pthread.h>
#include <iostream>

using namespace std;

pthread_t tid[2];
int counter = 0;
pthread_mutex_t lock;

void* run(void* arg)
{
 pthread_mutex_lock(&lock);

 unsigned long i = 0;
 counter += 1;
 cout << "Job " << counter << " started!" << endl;
 for (i = 0; i<(0xFFFFFFFF); i++);
 cout << "Job " << counter << " finished!" << endl;

 pthread_mutex_unlock(&lock);

 return NULL;
}

int main()
{
 int i = 0, err = -1;

 if (pthread_mutex_init(&lock, NULL) != 0) {
  cout << "mutex init failed" << endl;
  return -1;
 }

 while (i < 2) {
  err = pthread_create(&(tid[i]), NULL, &run, NULL);
  if (err != 0)
   cout << "can't create thread!" << endl;
  i++;
 }

 pthread_join(tid[0], NULL);
 pthread_join(tid[1], NULL);
 pthread_mutex_destroy(&lock);

 cout << "ok!" << endl;
 return 0;
}

//終端執行:$ g++ -o test_thread_mutex test_thread_mutex.cpp -lpthread
//    $ ./test_thread_mutex

3. test_thread_cond.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t count_lock;
pthread_cond_t count_nonzero;
unsigned count = 0;

void* decrement_count(void* arg)
{
 pthread_mutex_lock(&count_lock);
 cout << "decrement_count get count_lock" << endl;

 while (count == 0) {
  cout << "decrement_count count == 0" << endl;

  cout << "decrement_count before cond_wait" << endl;
  pthread_cond_wait(&count_nonzero, &count_lock);
  cout << "decrement_count after cond_wait" << endl;
 }

 count = count + 1;

 pthread_mutex_unlock(&count_lock);

 return NULL;
}

void* increment_count(void* arg)
{
 pthread_mutex_lock(&count_lock);
 cout << "increment_count get count_lock" << endl;

 if (count == 0) {
  cout << "increment_count before cond_signal" << endl;
  pthread_cond_signal(&count_nonzero);
  cout << "increment_count after cond_signal" << endl;
 }

 count = count + 1;

 pthread_mutex_unlock(&count_lock);

 return NULL;
}

int main()
{
 pthread_t tid1, tid2;

 pthread_mutex_init(&count_lock, NULL);
 pthread_cond_init(&count_nonzero, NULL);

 pthread_create(&tid1, NULL, decrement_count, NULL);
 sleep(2);

 pthread_create(&tid2, NULL, increment_count, NULL);
 sleep(2);

 pthread_join(tid1, NULL);
 pthread_join(tid2, NULL);
 pthread_mutex_destroy(&count_lock);
 pthread_cond_destroy(&count_nonzero);

 cout << "ok!" << endl;
}

//終端執行:$ g++ -o test_thread_cond test_thread_cond.cpp -lpthread
//    $ ./test_thread_cond

4. test_thread_cond1.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t counter_lock;
pthread_cond_t counter_nonzero;
int counter = 0;

void* decrement_counter(void* argv);
void* increment_counter(void* argv);

int main()
{
 cout << "counter: " << counter << endl;
 pthread_mutex_init(&counter_lock, NULL);
 pthread_cond_init(&counter_nonzero, NULL);

 pthread_t thd1, thd2;
 int ret = -1;

 ret = pthread_create(&thd1, NULL, decrement_counter, NULL);
 if (ret){
  cout << "create thread1 fail" << endl;
  return -1;
 }

 ret = pthread_create(&thd2, NULL, increment_counter, NULL);
 if (ret){
  cout << "create thread2 fail" << endl;
  return -1;
 }

 int counter = 0;
 while (counter != 10) {
  cout << "counter(main): " << counter << endl;
  sleep(1);
  counter++;
 }

 pthread_join(thd1, NULL);
 pthread_join(thd2, NULL);
 pthread_mutex_destroy(&counter_lock);
 pthread_cond_destroy(&counter_nonzero);

 cout << "ok!" << endl;
}

void* decrement_counter(void* argv)
{
 cout << "counter(decrement): " << counter << endl;

 pthread_mutex_lock(&counter_lock);
 while (counter == 0)
  pthread_cond_wait(&counter_nonzero, &counter_lock); //進入阻塞(wait),等待激活(signal)

 cout << "counter--(decrement, before): " << counter << endl;
 counter--; //等待signal激活後再執行 
 cout << "counter--(decrement, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock);

 return NULL;
}

void* increment_counter(void* argv)
{
 cout << "counter(increment): " << counter << endl;

 pthread_mutex_lock(&counter_lock);
 if (counter == 0)
  pthread_cond_signal(&counter_nonzero); //激活(signal)阻塞(wait)的線程(先執行完signal線程,然後再執行wait線程) 

 cout << "counter++(increment, before): " << counter << endl;
 counter++;
 cout << "counter++(increment, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock);

 return NULL;
}

//終端執行:$ g++ -o test_thread_cond1 test_thread_cond1.cpp -lpthread
//    $ ./test_thread_cond1

5. test_thread_cond2.cpp:

#include <pthread.h>
#include <iostream>
#include <unistd.h>

using namespace std;

pthread_mutex_t counter_lock1, counter_lock2;
pthread_cond_t counter_nonzero1, counter_nonzero2;
int counter = 0;

void* decrement_increment_counter(void* argv);

int main()
{
 cout << "counter: " << counter << endl;
 pthread_mutex_init(&counter_lock1, NULL);
 pthread_mutex_init(&counter_lock2, NULL);
 pthread_cond_init(&counter_nonzero1, NULL);
 pthread_cond_init(&counter_nonzero2, NULL);

 pthread_t thd;
 int ret = -1;

 ret = pthread_create(&thd, NULL, decrement_increment_counter, NULL);
 if (ret){
  cout << "create thread1 fail" << endl;
  return -1;
 }

 int counter = 0;
 while (counter != 10) {
  cout << "counter(main): " << counter << endl;
  sleep(1);
  counter++;
 }

 pthread_join(thd, NULL);
 pthread_mutex_destroy(&counter_lock1);
 pthread_mutex_destroy(&counter_lock2);
 pthread_cond_destroy(&counter_nonzero1);
 pthread_cond_destroy(&counter_nonzero2);

 cout << "ok!" << endl;
}

void* decrement_increment_counter(void* argv)
{
 cout << "start counter: " << counter << endl;

 pthread_mutex_lock(&counter_lock1);
 cout << "counter(decrement): " << counter << endl;
 while (counter == 1)
  pthread_cond_wait(&counter_nonzero1, &counter_lock1); //進入阻塞(wait),等待激活(signal)

 cout << "counter--(decrement, before): " << counter << endl;
 counter--; //等待signal激活後再執行 
 cout << "counter--(decrement, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock1);

 pthread_mutex_lock(&counter_lock2);
 cout << "counter(increment): " << counter << endl;
 if (counter == 0)
  pthread_cond_signal(&counter_nonzero2); //激活(signal)阻塞(wait)的線程(先執行完signal線程,然後再執行wait線程) 

 cout << "counter++(increment, before): " << counter << endl;
 counter++;
 cout << "counter++(increment, after): " << counter << endl;
 pthread_mutex_unlock(&counter_lock2);

 return NULL;
}

//終端執行:$ g++ -o test_thread_cond2 test_thread_cond2.cpp -lpthread
//    $ ./test_thread_cond2

Copyright © Linux教程網 All Rights Reserved