歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux綜合 >> Linux內核 >> Linux內核中的同步和互斥分析報告

Linux內核中的同步和互斥分析報告

日期:2017/2/27 9:23:55   编辑:Linux內核

  先看進程間的互斥。在Linux內核中主要通過semaphore機制和spin_lock機制實現。主要的區別是在semaphore機制中,進不了臨界區時會進行進程的切換,而spin_lock剛執行忙等(在SMP中)。先看內核中的semaphore機制。前提是對引用計數count增減的原子性操作。內核用atomic_t的數據結構和在它上面的一系列操作如atomic_add()、atomic_sub()等等實現。(定義在atomic.h中)semaphone機制主要通過up()和down()兩個操作實現。semaphone的結構為:

strUCt semaphore { atomic_t count; int sleepers; wait_queue_head_t wait; };

  相應的down()函數為:

static inline void down(struct semaphore*sem) { /* 1 */sem->count--; //為原子操作 if(sem->count<0) { struct task_struct *tsk = current; DECLARE_WAITQUEUE(wait, tsk); tsk->state = TASK_UNINTERRUPTIBLE; add_wait_queue_exclusive(&sem->wait, &wait); spin_lock_irq(&semaphore_lock); /* 2 */ sem->sleepers++; for (;;) { int sleepers = sem->sleepers; /* * Add "everybody else" into it. They aren't * playing, because we own the spinlock. */ /* 3 */ if (!atomic_add_negative(sleepers - 1, &sem->count)) { /* 4 */ sem->sleepers = 0; //這時sem->count=0 break; } /* 4 */ sem->sleepers = 1; /* us - see -1 above */ // 這時sem ->count =-1 spin_unlock_irq(&semaphore_lock); schedule(); tsk->state = TASK_UNINTERRUPTIBLE; spin_lock_irq(&semaphore_lock); } spin_unlock_irq(&semaphore_lock); remove_wait_queue(&sem->wait, &wait); tsk->state = TASK_RUNNING; wake_up(&sem->wait); } }

  相應的up()函數為:

void up(struct semaphore*sem) { sem->count++; //為原子操作 if(sem->count<=0) { //喚醒等待隊列中的一個符合條件的進程(因為每個進程都加了TASK_EXCLUSIVE標志) 。 };

  假設開始時,count=1;sleepers=0。當進程A執行down()時,引用計數count--,如果這時它的值大於等於0,則從down()中直接返回。如果count少於0,則A的state改為TASK_INTERRUPTIBLE後進入這個信號量的等待隊列中,同時使sleepers++;然後重新計算count=sleepers - 1 + count,若這時引用計數仍小於0(一般情況下應為-1,因為count = - sleepers,不過在SMP結構中,期間別的進程可能執行了up()和down()從而使得引用計數的值可能變化),則執行進程切換。   當進程A又獲得機會運行時,它先執行wake_up(&sem->wait)操作,喚醒等待隊列裡的一個進程,接著它進入臨界區,從臨界區出來時執行up()操作,使sem->count++,(如果進程A是從down()中直接返回,因為這時等待隊列一定為空,所以它不用執行wake_up()操作,直接進入臨界區,在從臨界區出來時一樣執行up()操作,使 sem->count++)。這時如果count的值小於等於0,這表明在它在臨界區期間又有一個進程(可能就是它進入臨界區時喚醒的那個進程)進入睡眠了,則執行wake_up()操作,反之,如果count的值已經大於0,這表明在它在臨界區期間沒有別的進程(包括在它進入臨界區時被它喚醒過的那個進程)進入睡眠,那麼它就可以直接返回了。

  從被喚醒的那個進程看看,如果在喚醒它的進程沒執行up()之前它就得到了運行機會,這時它又重新計算count=sleepers - 1 + count=-1;從而sleepers被賦值1;這時它又必須進行調度讓出運行的機會給別的進程,自己去睡眠。這正是發生在喚醒它的進程在臨界區時運行的時候。如果是在喚醒它的進程執行了up()操作後它才得到了運行機會,而且在喚醒它的進程在臨界區期間時沒別的進程執行down(),則count的值在進程執行up()之前依然為0,這時在up()裡面就不必要再執行wake_up()函數了。可以通過一個例子來說明具體的實現。設開始sem->count=sem->sleepers=0。也就是有鎖但無等待隊列 (一個進程已經在運行中)。先後分別進行3個down()操作,和3個up()操作,如下:為了闡述方便,只保留了一些會改變sleepers和count值的步驟,並且遵循從左到右依次進行的原則。

down1: count(0->-1),sleepers(0->1),sleepers-1+count(-1),count(-1),sleepers(1),調度 down2: count(-1->-2),sleepers(1->2),sleepers-1+count(-1),count(-1),sleepers(1),調度 down3: count(-1->-2),sleepers(1->2),sleepers-1+count(-1),count(-1),sleepers(1),調度

  

up1: count(-1->0),喚醒一個睡眠進程(設為1),(進程1得到機會運行) sleepers-1+count(0),count(0),sleepers(0),break, 喚醒另一個睡眠進程(設為2),(進程2得到機會運行) sleepers-1+count(-1),count(-1),sleepers(1), 調度(沒達到條件,又得睡覺)也可能是這樣的: up1`: count(-1->0),喚醒一個睡眠進程(設為1),(進程1得到機會運行) sleepers-1+count(0),count(0),sleepers(0),break, 喚醒另一個睡眠進程(設為2),進程2在以後才得到機會運行) up2: count(-1->0),(因為count<=0)喚醒一個睡眠進程(設為2),進程2得到機會運行) sleepers-+count(0) , count(0) , sleepers(0) ,break, 喚醒另一個睡眠進程(設為3),進程3得到機會運行) sleepers-1+count(-1),count(-1),sleepers(1), 調度(沒達到條件,又得睡覺)對應上面的1`: up2`: count(0->1),(因為count>0,所以直接返回)進程2得到機會運行) sleepers-1+count(0),count(0),sleepers(0),break, 喚醒另一個睡眠進程,(設為3) up3: count(-1->0),(因為count<=0)喚醒一個睡眠進程(設為3),進程3得到機會運行) sleepers-1+count(0),count(0),sleepers(0),break, 喚醒另一個睡眠進程(這時隊列裡沒進程了) 進程3運行結束,執行up(), 使count =1 ,這時變成沒鎖狀態 ) 對應上邊的2`: up3`: count(0->1),(因為count>0,所以直接返回)進程3得到機會運行) sleepers-1+count(0),count(0),sleepers(0),break, 喚醒另一個睡眠進程(這時隊列裡沒進程了) 進程3運行結束,執行up(), 使count =1 ,這時變成沒鎖狀態 )

  當然,還有另一種情況,就是up()操作和down()操作是交*出現的,一般的規律就是,如果進程在臨界區期間又有進程(無論是哪個進程,新來的還是剛被喚醒的那個)進入睡眠,就會令count的值從0變為-1,從而進程在從臨界區出來執行up()裡就必須執行一次wake_up(),以確保所有的進程都能被喚醒,因為多喚醒幾個是沒關系的。如果進程在臨界區期間沒有別的進程進入睡眠,則從臨界區出來執行up()時就用不著去執行wake_up()了(當然,執行了也沒什麼影響,不過多余罷了)。而為什麼要把wake_up()和count++分開呢,可以從上面的up1看出來,例如,進程2第一次得到機會運行時,本來這時喚醒它的進程還沒執行up()的,但有可能其它進程執行了up()了,所以真有可能會發現count==1的情況,這時它就真的不用睡覺了,令count=sleepers=0,就可以接著往下執行了。還可看出一點,一般的,( count ,sleepers)的值的取值范圍為(n ,0)[n>0] 和(0 ,0)和 (1 ,-1)。下邊看看spin_lock機制。

  Spin_lock采用的方式是讓一個進程運行,另外的進程忙等待,由於在只有一個cpu的機器(UP)上微觀上只有一個進程在運行。所以在UP中,spin_lock和spin_unlock就都是空的了。在SMP中,spin_lock()和spin_unlock()定義如下:

typedef struct { volatile unsigned int lock; } spinlock_t; static inline void spin_lock(spinlock_t *lock) { __asm__ __volatile__( "\n1:\t" "lock ; decb %0\n\t" "js 2f\n" //lock->lock< 0 ,jmp 2 forward ".section .text.lock,\"ax\"\n" "2:\t" "cmpb $0,%0\n\t" //wait lock->lock==1 "rep;nop\n\t" "jle 2b\n\t" "jmp 1b\n" ".previous" :"=m" (lock->lock) : : "memory"); } static inline void spin_unlock(spinlock_t *lock) { __asm__ __volatile__( "movb $1,%0" :"=m" (lock->lock) : : "memory"); //lock->lock=1 }

  一般是如此使用:

#define SPIN_LOCK_UNLOCKED (spinlock_t) { 1 } spinlock_t xxx_lock = SPIN_LOCK_UNLOCKED; spin_lock_(&xxx_lock) ... critical section ... spin_unlock (&xxx_lock)   可以看出,它和semaphore機制解決的都是兩個進程的互斥問題,都是讓一個進程退出臨界區後另一個進程才進入的方法,不過sempahore機制實行的是讓進程暫時讓出CPU,進入等待隊列等待的策略,而spin_lock實行的卻是卻進程在原地空轉,等著另一個進程結束的策略。

  下邊考慮中斷對臨界區的影響。要互斥的還有進程和中斷服務程序之間。當一個進程在執行一個臨界區的代碼時,可能發生中斷,而中斷函數可能就會調用這個臨界區的代碼,不讓它進入的話就會產生死鎖。這時一個有效的方法就是關中斷了。

#define local_irq_save(x) __asm__ __volatile__("pushfl ; popl %0 ; cli": "=g" (x): /* no input */ :"memory") #define local_irq_restore(x) __asm__ __volatile__("pushl %0 ; popfl": /* no output */ :"g" (x):"memory") #define local_irq_disable() __asm__ __volatile__("cli": : :"memory") #define local_irq_enable() __asm__ __volatile__("sti": : :"memory") #define cpu_bh_disable(cpu) do { local_bh_count(cpu)++; barrier(); } while (0) #define cpu_bh_enable(cpu) do { barrier(); local_bh_count(cpu)--; } while (0) #define local_bh_disable() cpu_bh_disable(smp_processor_id()) #define local_bh_enable() cpu_bh_enable(smp_processor_id())

  對於UP來說,上面已經是足夠了,不過對於SMP來說,還要防止來自其它cpu的影響,這時解決的方法就可以把上面的spin_lock機制也綜合進來了。

#define spin_lock_irqsave(lock, flags) do { local_irq_save(flags); sp in_lock(lock); } while (0) #define spin_lock_irq(lock) do { local_irq_disable(); spin_lock(lo ck); } while (0) #define spin_lock_bh(lock) do { local_bh_disable(); spin_lock(loc k); } while (0) #define spin_unlock_irqrestore(lock, flags) do { spin_unlock(lock); local_i rq_restore(flags); } while (0) #define spin_unlock_irq(lock) do { spin_unlock(lock); local_irq_enable(); } while (0) #define spin_unlock_bh(lock) do { spin_unlock(lock); local_bh_enable(); } while (0)

  前面說過,對於UP來說,spin_lock()是空的,所以以上的定義就一起適用於UP 和SMP的情形了。

read_lock_irqsave(lock, flags) , read_lock_irq(lock), read_lock_bh(lock) 和 write_lock_irqsave(lock, flags) , write_lock_irq(lock), write_lock_bh(lock )

  就是spin_lock的一個小小的變型而己了。




Copyright © Linux教程網 All Rights Reserved