歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> 關於Linux >> Linux多線程實踐(五 )Posix信號量和互斥鎖解決生產者消費者問題

Linux多線程實踐(五 )Posix信號量和互斥鎖解決生產者消費者問題

日期:2017/3/1 12:18:34   编辑:關於Linux

Posix信號量和System V信號量的一點區別:

system v 信號量只能用於進程間同步,而posix 信號量除了可以進程間同步,還可以線程間同步。system v 信號量每次PV操作可以是N,但Posix 信號量每次PV只能是1。除此之外,posix 信號量還有命名和匿名之分(man 7 sem_overview):

Posix 信號量

有名信號量

無名信號量

sem_open

sem_init

sem_close

sem_destroy

sem_unlink

sem_wait

sem_post

有名信號量

#include            /* For O_* constants */  
#include         /* For mode constants */  
#include   
sem_t *sem_open(const char *name, int oflag);  
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);  
int sem_close(sem_t *sem);  
int sem_unlink(const char *name);  

與Posix類IPC用法類似: 名字以/somename形式標識,且只能有一個/ ,並且總長不能超過NAME_MAX-4 (i.e., 251)。

Posix有名信號量需要用sem_open 函數創建或打開,PV操作分別是sem_wait 和 sem_post,可以使用sem_close 關閉,刪除用sem_unlink。

有名信號量用於不需要共享內存的進程間同步(可以通過名字訪問), 類似System V 信號量。

匿名信號量

#include   
int sem_init(sem_t *sem, int pshared, unsigned int value);  
int sem_destroy(sem_t *sem); 

匿名信號量只存在於內存中, 並要求使用信號量的進程必須可以訪問內存; 這意味著他們只能應用在同一進程中的線程, 或者不同進程中已經映射相同內存內容到它們的地址空間中的線程.

存放在一塊共享內存中,如果是線程共享,這塊區域可以是全局變量;如果是進程共享,可以是system v 共享內存(shmget 創建,shmat 映射),也可以是 posix 共享內存(shm_open 創建,mmap 映射)。

匿名信號量必須用sem_init 初始化,sem_init 函數其中一個參數pshared決定了線程共享(pshared=0)還是進程共享(pshared!=0),也可以用sem_post 和sem_wait 進行操作,在共享內存釋放前,匿名信號量要先用sem_destroy 銷毀。

Posix信號量PV操作
int sem_wait(sem_t *sem);   //P操作  
int sem_post(sem_t *sem);   //V操作 

wait操作實現對信號量的減1, 如果信號量計數原先為0則會發生阻塞;

post操作將信號量加1, 在調用sem_post時, 如果在調用sem_wait中發生了進程阻塞, 那麼進程會被喚醒並且sem_post增1的信號量計數會再次被sem_wait減1;

Posix互斥鎖

#include   
int pthread_mutex_init(pthread_mutex_t *mutex,   
                       const pthread_mutexattr_t *mutexattr);       //互斥鎖初始化, 注意:函數成功執行後,互斥鎖被初始化為未鎖住狀態。  
int pthread_mutex_lock(pthread_mutex_t *mutex); //互斥鎖上鎖  
int pthread_mutex_trylock(pthread_mutex_t *mutex);  //互斥鎖判斷上鎖  
int pthread_mutex_unlock(pthread_mutex_t *mutex);   //互斥鎖解鎖  
int pthread_mutex_destroy(pthread_mutex_t *mutex);  //消除互斥鎖 

互斥鎖是用一種簡單的加鎖方法來控制對共享資源的原子操作。這個互斥鎖只有兩種狀態,也就是上鎖/解鎖,可以把互斥鎖看作某種意義上的全局變量。在同一時刻只能有一個線程掌握某個互斥鎖,擁有上鎖狀態的線程能夠對共享資源進行操作。若其他線程希望上鎖一個已經被上鎖的互斥鎖,則該線程就會阻塞,直到上鎖的線程釋放掉互斥鎖為止。可以說,這把互斥鎖保證讓每個線程對共享資源按順序進行原子操作。

其中,互斥鎖可以分為快速互斥鎖(默認互斥鎖)、遞歸互斥鎖和檢錯互斥鎖。這三種鎖的區別主要在於其他未占有互斥鎖的線程在希望得到互斥鎖時是否需要阻塞等待。快速鎖是指調用線程會阻塞直至擁有互斥鎖的線程解鎖為止。遞歸互斥鎖能夠成功地返回,並且增加調用線程在互斥上加鎖的次數,而檢錯互斥鎖則為快速互斥鎖的非阻塞版本,它會立即返回並返回一個錯誤信息。

解決生產者消費者問題

關於這個經典問題就不再敘述了,自行baidu吧。我們使用偽代碼理一下流程,然後使用上面的API實現就很容易了。

簡單概述: 一組生產者,一組消費者,公用n個環形緩沖區 。

在這個問題中,不僅生產者與消費者之間要同步,而且各個生產者之間、各個消費者之間還必須互斥地訪問緩沖區。

定義四個信號量:

empty——表示緩沖區是否為空,初值為n。

full——表示緩沖區中是否為滿,初值為0。

mutex1——生產者之間的互斥信號量,初值為1。

mutex2——消費者之間的互斥信號量,初值為1。

設緩沖區的編號為1~n-1,定義兩個指針in和out,分別是生產者進程和消費者進程使用的指針,指向下一個可用的緩沖區。

生產者進程 while(TRUE)
{       
     生產一個產品; 
     P(empty); 
     P(mutex1); 
     產品送往buffer(in);  
 in=(in+1)mod n;  
 V(mutex1);  
 V(full); 
}

消費者進程 
while(TRUE)
{   
   P(full)  
   P(mutex2);  
   從buffer(out)中取出產品;
out=(out+1)mod n;  
  V(mutex2);  
  V(empty);  
  消費該產品;
}
#include 
#include 
#include 
#include 

#include 
#include 
#include 
#include 

#define ERR_EXIT(m) \
        do \
        { \
                perror(m); \
                exit(EXIT_FAILURE); \
        } while(0)

#define CONSUMERS_COUNT 1
#define PRODUCERS_COUNT 1
#define BUFFSIZE 10

int g_buffer[BUFFSIZE];

unsigned short in = 0;
unsigned short out = 0;
unsigned short produce_id = 0;
unsigned short consume_id = 0;

sem_t g_sem_full;
sem_t g_sem_empty;
pthread_mutex_t g_mutex;

pthread_t g_thread[CONSUMERS_COUNT + PRODUCERS_COUNT];

void *consume(void *arg)
{
    int i;
    int num = (int)arg;
    while (1)
    {
        printf("%d wait buffer not empty\n", num);
        sem_wait(&g_sem_empty);
        pthread_mutex_lock(&g_mutex);

        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == out)
                printf("\t<--consume");

            printf("\n");
        }
        consume_id = g_buffer[out];
        printf("%d begin consume product %d\n", num, consume_id);
        g_buffer[out] = -1;
        out = (out + 1) % BUFFSIZE;
        printf("%d end consume product %d\n", num, consume_id);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_full);
        sleep(1);
    }
    return NULL;
}

void *produce(void *arg)
{
    int num = (int)arg;
    int i;
    while (1)
    {
        printf("%d wait buffer not full\n", num);
        sem_wait(&g_sem_full);
        pthread_mutex_lock(&g_mutex);
        for (i = 0; i < BUFFSIZE; i++)
        {
            printf("%02d ", i);
            if (g_buffer[i] == -1)
                printf("%s", "null");
            else
                printf("%d", g_buffer[i]);

            if (i == in)
                printf("\t<--produce");

            printf("\n");
        }

        printf("%d begin produce product %d\n", num, produce_id);
        g_buffer[in] = produce_id;
        in = (in + 1) % BUFFSIZE;
        printf("%d end produce product %d\n", num, produce_id++);
        pthread_mutex_unlock(&g_mutex);
        sem_post(&g_sem_empty);
        sleep(5);
    }
    return NULL;
}

int main(void)
{
    int i;
    for (i = 0; i < BUFFSIZE; i++)
        g_buffer[i] = -1;

    sem_init(&g_sem_full, 0, BUFFSIZE);
    sem_init(&g_sem_empty, 0, 0);

    pthread_mutex_init(&g_mutex, NULL);


    for (i = 0; i < CONSUMERS_COUNT; i++)
        pthread_create(&g_thread[i], NULL, consume, (void *)i);

    for (i = 0; i < PRODUCERS_COUNT; i++)
        pthread_create(&g_thread[CONSUMERS_COUNT + i], NULL, produce, (void *)i);

    for (i = 0; i < CONSUMERS_COUNT + PRODUCERS_COUNT; i++)
        pthread_join(g_thread[i], NULL);

    sem_destroy(&g_sem_full);
    sem_destroy(&g_sem_empty);
    pthread_mutex_destroy(&g_mutex);

    return 0;
}
這裡是演示線程間同步,現在上述程序生產者消費者各一個線程,但生產者睡眠時間是消費者的5倍,故消費者會經常阻塞在sem_wait(&g_sem_empty) 上面,因為緩沖區經常為空,可以將PRODUCTORS_COUNT 改成5,即有5個生產者線程和1個消費者線程,而且生產者睡眠時間還是消費者的5倍,從動態輸出可以看出,基本上就動態平衡了,即5個生產者一下子生產了5份東西,消費者1s消費1份,剛好在生產者繼續生產前消費完。

自旋鎖和讀寫鎖

(1)自旋鎖(Spin lock)

自旋鎖與互斥鎖有點類似,只是自旋鎖不會引起調用者睡眠,如果自旋鎖已經被別的執行單元保持,調用者就一直循環在那裡看是 否該自旋鎖的保持者已經釋放了鎖,"自旋"一詞就是因此而得名。其作用是為了解決某項資源的互斥使用。因為自旋鎖不會引起調用者睡眠,所以自旋鎖的效率遠 高於互斥鎖。雖然它的效率比互斥鎖高,但是它也有些不足之處:
1、自旋鎖一直占用CPU,他在未獲得鎖的情況下,一直運行--自旋,所以占用著CPU,如果不能在很短的時 間內獲得鎖,這無疑會使CPU效率降低。
2、在用自旋鎖時有可能造成死鎖,當遞歸調用時有可能造成死鎖,調用有些其他函數也可能造成死鎖,如 copy_to_user()、copy_from_user()、kmalloc()等。
因此我們要慎重使用自旋鎖,自旋鎖只有在內核可搶占式或SMP的情況下才真正需要,在單CPU且不可搶占式的內核下,自旋鎖的操作為空操作。自旋鎖適用於鎖使用者保持鎖時間比較短的情況下。
自旋鎖的用法如下:
首先定義:spinlock_t x;
然後初始化:spin_lock_init(spinlock_t *x); //自旋鎖在真正使用前必須先初始化
在2.6.11內核中將定義和初始化合並為一個宏:DEFINE_SPINLOCK(x)

獲得自旋鎖:spin_lock(x); //只有在獲得鎖的情況下才返回,否則一直“自旋”
spin_trylock(x); //如立即獲得鎖則返回真,否則立即返回假
釋放鎖: spin_unlock(x);

(2)讀寫鎖

1、只要沒有線程持有給定的讀寫鎖用於寫,那麼任意數目的線程可以持有讀寫鎖用於讀
2、僅當沒有線程持有某個給定的讀寫鎖用於讀或用於寫時,才能分配讀寫鎖用於寫
3、讀寫鎖用於讀稱為共享鎖,讀寫鎖用於寫稱為排它鎖
pthread_rwlock_init
pthread_rwlock_destroy
int pthread_rwlock_rdlock
int pthread_rwlock_wrlock
int pthread_rwlock_unlock

關於linux的鎖的相關問題可以參考IBM的文檔:

http://www.ibm.com/developerworks/cn/linux/l-cn-mthreadps/index.html

Copyright © Linux教程網 All Rights Reserved