歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux技術 >> Linux 設備驅動開發 —— Tasklets 機制淺析

Linux 設備驅動開發 —— Tasklets 機制淺析

日期:2017/3/3 11:49:28   编辑:Linux技術
一 、Tasklets 機制基礎知識點
1、Taklets 機制概念
Tasklets 機制是linux中斷處理機制中的軟中斷延遲機制。通常用於減少中斷處理的時間,將本應該是在中斷服務程序中完成的任務轉化成軟中斷完成。
為了最大程度的避免中斷處理時間過長而導致中斷丟失,有時候我們需要把一些在中斷處理中不是非常緊急的任務放在後面執行,而讓中斷處理程序盡快返回。在老版本的 linux 中通常將中斷處理分為 top half handler 、 bottom half handler 。利用 top half handler 處理中斷必須處理的任務,而 bottom half handler 處理不是太緊急的任務。
但是 linux2.6 以後的 linux 采取了另外一種機制,就是軟中斷來代替 bottom half handler 的處理。而 tasklet 機制正是利用軟中斷來完成對驅動 bottom half 的處理。 Linux2.6 中軟中斷通常只有固定的幾種: HI_SOFTIRQ( 高優先級的 tasklet ,一種特殊的 tasklet) 、 TIMER_SOFTIRQ (定時器)、 NET_TX_SOFTIRQ (網口發送)、 NET_RX_SOFTIRQ (網口接收) 、 BLOCK_SOFTIRQ
(塊設備)、 TASKLET_SOFTIRQ (普通 tasklet )。當然也可以通過直接修改內核自己加入自己的軟中斷,但是一般來說這是不合理的,軟中斷的優先級比較高,如果不是在內核處理頻繁的任務不建議使用。通常驅動用戶使用 tasklet 足夠了。
機制流程:當linux接收到硬件中斷之後,通過 tasklet 函數來設定軟中斷被執行的優先程度從而導致軟中斷處理函數被優先執行的差異性。
特點:tasklet的優先級別較低,而且中斷處理過程中可以被打斷。但被打斷之後,還能進行自我恢復,斷點續運行。
2、Tasklets 解決什麼問題?
a -- tasklet是I/O驅動程序中實現可延遲函數的首選方法
b -- tasklet和工作隊列是延期執行工作的機制,其實現基於軟中斷,但他們更易於使用,因而更適合與設備驅動程序...tasklet是“小進程”,執行一些迷你任務,對這些人物使用全功能進程可能比較浪費。
c -- tasklet是並行可執行(但是是鎖密集型的)軟件中斷和舊下半區的一種混合體,這裡既談不上並行性,也談不上性能。引入tasklet是為了替代原來的下半區。
軟中斷是將操作推遲到未來時刻執行的最有效的方法。但該延期機制處理起來非常復雜。因為多個處理器可以同時且獨立的處理軟中斷,同一個軟中斷的處理程序可以在幾個CPU上同時運行。對軟中斷的效率來說,這是一個關鍵,多處理器系統上的網絡實現顯然受惠於此。但處理程序的設計必須是完全可重入且線程安全的。另外,臨界區必須用自旋鎖保護(或其他IPC機制),而這需要大量審慎的考慮。
我自己的理解,由於軟中斷以ksoftirqd的形式與用戶進程共同調度,這將關系到OS整體的性能,因此軟中斷在Linux內核中也僅僅就幾個(網絡、時鐘、調度以及Tasklet等),在內核編譯時確定。軟中斷這種方法顯然不是面向硬件驅動的,而是驅動更上一層:不關心如何從具體的網卡接收數據包,但是從所有的網卡接收的數據包都要經過內核協議棧的處理。而且軟中斷比較“硬”——數量固定、編譯時確定、操作函數必須可重入、需要慎重考慮鎖的問題,不適合驅動直接調用,因此Linux內核為驅動直接提供了一種使用軟中斷的方法,就是tasklet。
軟中斷和 tasklet 的關系如下圖:

上圖可以看出, ksoftirqd 是一個後台運行的內核線程,它會周期的遍歷軟中斷的向量列表,如果發現哪個軟中斷向量被掛起了( pend ),就執行對應的處理函數,對於 tasklet 來說,此處理函數就是 tasklet_action ,這個處理函數在系統啟動時初始化軟中斷的就掛接了。Tasklet_action 函數,遍歷一個全局的 tasklet_vec 鏈表(此鏈表對於 SMP 系統是每個 CPU 都有一個),此鏈表中的元素為 tasklet_struct 。下面將介紹各個函數
二、tasklet數據結構
tasklet通過軟中斷實現,軟中斷中有兩種類型屬於tasklet,分別是級別最高的HI_SOFTIRQ和TASKLET_SOFTIRQ。
Linux內核采用兩個PER_CPU的數組tasklet_vec[]和tasklet_hi_vec[]維護系統種的所有tasklet(kernel/softirq.c),分別維護TASKLET_SOFTIRQ級別和HI_SOFTIRQ級別的tasklet:
[cpp] view
plain copy





struct tasklet_head
{
struct tasklet_struct *head;
struct tasklet_struct *tail;
};
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

tasklet的核心結構體如下(include/linux/interrupt.h):
[cpp] view
plain copy





struct tasklet_struct
{
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
};
各成員的含義如下:
a -- next指針:指向下一個tasklet的指針。
b -- state:定義了這個tasklet的當前狀態。這一個32位的無符號長整數,當前只使用了bit[1]和bit[0]兩個狀態位。其中,bit[1]=1表示這個tasklet當前正在某個CPU上被執行,它僅對SMP系統才有意義,其作用就是為了防止多個CPU同時執行一個tasklet的情形出現;bit[0]=1表示這個tasklet已經被調度去等待執行了。對這兩個狀態位的宏定義如下所示(interrupt.h)
[cpp] view
plain copy





enum
{
TASKLET_STATE_SCHED,
TASKLET_STATE_RUN
};
TASKLET_STATE_SCHED置位表示已經被調度(掛起),也意味著tasklet描述符被插入到了tasklet_vec和tasklet_hi_vec數組的其中一個鏈表中,可以被執行。TASKLET_STATE_RUN置位表示該tasklet正在某個CPU上執行,單個處理器系統上並不校驗該標志,因為沒必要檢查特定的tasklet是否正在運行。
c -- 原子計數count:對這個tasklet的引用計數值。NOTE!只有當count等於0時,tasklet代碼段才能執行,也即此時tasklet是被使能的;如果count非零,則這個tasklet是被禁止的。任何想要執行一個tasklet代碼段的人都首先必須先檢查其count成員是否為0。
d -- 函數指針func:指向以函數形式表現的可執行tasklet代碼段。
e -- data:函數func的參數。這是一個32位的無符號整數,其具體含義可供func函數自行解釋,比如將其解釋成一個指向某個用戶自定義數據結構的地址值。
三、tasklet操作接口
tasklet對驅動開放的常用操作包括:
a -- 初始化,tasklet_init(),初始化一個tasklet描述符。
b -- 調度,tasklet_schedule()和tasklet_hi_schedule(),將taslet置位TASKLET_STATE_SCHED,並嘗試激活所在的軟中斷。
c -- 禁用/啟動,tasklet_disable_nosync()、tasklet_disable()、task_enable(),通過count計數器實現。
d -- 執行,tasklet_action()和tasklet_hi_action(),具體的執行軟中斷。
e -- 殺死,tasklet_kill()
即驅動程序在初始化時,通過函數task_init建立一個tasklet,然後調用函數tasklet_schedule將這個tasklet放在 tasklet_vec鏈表的頭部,並喚醒後台線程ksoftirqd。當後台線程ksoftirqd運行調用__do_softirq時,會執行在中斷向量表softirq_vec裡中斷號TASKLET_SOFTIRQ對應的tasklet_action函數,然後tasklet_action遍歷 tasklet_vec鏈表,調用每個tasklet的函數完成軟中斷操作。
1、tasklet_int()函數實現如下(kernel/softirq.c)
用來初始化一個指定的tasklet描述符
[cpp] view
plain copy





void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data)
{
t->next = NULL;
t->state = 0;
atomic_set(&t->count, 0);
t->func = func;
t->data = data;
}
2、tasklet_schedule()函數
與tasklet_hi_schedule()函數的實現很類似,這裡只列tasklet_schedule()函數的實現(kernel/softirq.c),都挺明白就不描述了:
[cpp] view
plain copy





static inline void tasklet_schedule(struct tasklet_struct *t)
{
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
}
void __tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;
local_irq_save(flags);
t->next = NULL;
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_restore(flags);
}
該函數的參數t指向要在當前CPU上被執行的tasklet。對該函數的NOTE如下:
a -- 調用test_and_set_bit()函數將待調度的tasklet的state成員變量的bit[0]位(也即TASKLET_STATE_SCHED位)設置為1,該函數同時還返回TASKLET_STATE_SCHED位的原有值。因此如果bit[0]為的原有值已經為1,那就說明這個tasklet已經被調度到另一個CPU上去等待執行了。由於一個tasklet在某一個時刻只能由一個CPU來執行,因此tasklet_schedule()函數什麼也不做就直接返回了。否則,就繼續下面的調度操作。
b -- 首先,調用local_irq_save()函數來關閉當前CPU的中斷,以保證下面的步驟在當前CPU上原子地被執行。
c -- 然後,將待調度的tasklet添加到當前CPU對應的tasklet隊列的首部。
d -- 接著,調用__cpu_raise_softirq()函數在當前CPU上觸發軟中斷請求TASKLET_SOFTIRQ。
e -- 最後,調用local_irq_restore()函數來開當前CPU的中斷。
3、tasklet_disable()函數、task_enable()函數以及tasklet_disable_nosync()函數(include/linux/interrupt.h)
使能與禁止操作往往總是成對地被調用的
[cpp] view
plain copy





static inline void tasklet_disable_nosync(struct tasklet_struct *t)
{
atomic_inc(&t->count);
smp_mb__after_atomic_inc();
}
static inline void tasklet_disable(struct tasklet_struct *t)
{
tasklet_disable_nosync(t);
tasklet_unlock_wait(t);
smp_mb();
}
static inline void tasklet_enable(struct tasklet_struct *t)
{
smp_mb__before_atomic_dec();
atomic_dec(&t->count);
}
4、tasklet_action()函數在softirq_init()函數中被調用:
[cpp] view
plain copy





void __init softirq_init(void)
{
...
open_softirq(TASKLET_SOFTIRQ, tasklet_action);
open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}
tasklet_action()函數
[cpp] view
plain copy





static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;
local_irq_disable();
list = __this_cpu_read(tasklet_vec.head);
__this_cpu_write(tasklet_vec.head, NULL);
__this_cpu_write(tasklet_vec.tail, &__get_cpu_var(tasklet_vec).head);
local_irq_enable();
while (list)
{
struct tasklet_struct *t = list;
list = list->next;
if (tasklet_trylock(t))
{
if (!atomic_read(&t->count))
{
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
local_irq_disable();
t->next = NULL;
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();
}
}
注釋如下:
①首先,在當前CPU關中斷的情況下,“原子”地讀取當前CPU的tasklet隊列頭部指針,將其保存到局部變量list指針中,然後將當前CPU的tasklet隊列頭部指針設置為NULL,以表示理論上當前CPU將不再有tasklet需要執行(但最後的實際結果卻並不一定如此,下面將會看到)。
②然後,用一個while{}循環來遍歷由list所指向的tasklet隊列,隊列中的各個元素就是將在當前CPU上執行的tasklet。循環體的執行步驟如下:
a -- 用指針t來表示當前隊列元素,即當前需要執行的tasklet。
b -- 更新list指針為list->next,使它指向下一個要執行的tasklet。
c -- 用tasklet_trylock()宏試圖對當前要執行的tasklet(由指針t所指向)進行加鎖
如果加鎖成功(當前沒有任何其他CPU正在執行這個tasklet),則用原子讀函atomic_read()進一步判斷count成員的值。如果count為0,說明這個tasklet是允許執行的,於是:
(1)先清除TASKLET_STATE_SCHED位;
(2)然後,調用這個tasklet的可執行函數func;
(3)執行barrier()操作;
(4)調用宏tasklet_unlock()來清除TASKLET_STATE_RUN位。
(5)最後,執行continue語句跳過下面的步驟,回到while循環繼續遍歷隊列中的下一個元素。如果count不為0,說明這個tasklet是禁止運行的,於是調用tasklet_unlock()清除前面用tasklet_trylock()設置的TASKLET_STATE_RUN位。
如果tasklet_trylock()加鎖不成功,或者因為當前tasklet的count值非0而不允許執行時,我們必須將這個tasklet重新放回到當前CPU的tasklet隊列中,以留待這個CPU下次服務軟中斷向量TASKLET_SOFTIRQ時再執行。為此進行這樣幾步操作:
(1)先關CPU中斷,以保證下面操作的原子性。
(2)把這個tasklet重新放回到當前CPU的tasklet隊列的首部;
(3)調用__cpu_raise_softirq()函數在當前CPU上再觸發一次軟中斷請求TASKLET_SOFTIRQ;
(4)開中斷。
c -- 最後,回到while循環繼續遍歷隊列。
5、tasklet_kill()實現
[cpp] view
plain copy





void tasklet_kill(struct tasklet_struct *t)
{
if (in_interrupt())
printk("Attempt to kill tasklet from interruptn");
while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
{
do {
yield();
} while (test_bit(TASKLET_STATE_SCHED, &t->state));
}
tasklet_unlock_wait(t);
clear_bit(TASKLET_STATE_SCHED, &t->state);
}
四、一個tasklet調用例子
找了一個tasklet的例子看一下(drivers/usb/atm,usb攝像頭),在其自舉函數usbatm_usb_probe()中調用了tasklet_init()初始化了兩個tasklet描述符用於接收和發送的“可延遲操作處理”,但此是並沒有將其加入到tasklet_vec[]或tasklet_hi_vec[]中:
[cpp] view
plain copy





tasklet_init(&instance->rx_channel.tasklet,
usbatm_rx_process, (unsigned long)instance);
tasklet_init(&instance->tx_channel.tasklet,
usbatm_tx_process, (unsigned long)instance);
在其發送接口usbatm_atm_send()函數調用tasklet_schedule()函數將所初始化的tasklet加入到當前cpu的tasklet_vec鏈表尾部,並嘗試調用do_softirq_irqoff()執行軟中斷TASKLET_SOFTIRQ:
[cpp] view
plain copy





static int usbatm_atm_send(struct atm_vcc *vcc, struct sk_buff *skb)
{
...
tasklet_schedule(&instance->tx_channel.tasklet);
...
}
在其斷開設備的接口usbatm_usb_disconnect()中調用tasklet_disable()函數和tasklet_enable()函數重新啟動其收發tasklet(具體原因不詳,這個地方可能就是由這個需要,暫時重啟收發tasklet):
[cpp] view
plain copy





void usbatm_usb_disconnect(struct usb_interface *intf)
{
...
tasklet_disable(&instance->rx_channel.tasklet);
tasklet_disable(&instance->tx_channel.tasklet);
...
tasklet_enable(&instance->rx_channel.tasklet);
tasklet_enable(&instance->tx_channel.tasklet);
...
}
在其銷毀接口usbatm_destroy_instance()中調用tasklet_kill()函數,強行將該tasklet踢出調度隊列。
從上述過程以及tasklet的設計可以看出,tasklet整體是這麼運行的:驅動應該在其硬中斷處理函數的末尾調用tasklet_schedule()接口激活該tasklet;內核經常調用do_softirq()執行軟中斷,通過softirq執行tasket,如下圖所示。圖中灰色部分為禁止硬中斷部分,為保護軟中斷pending位圖和tasklet_vec鏈表數組,count的改變均為原子操作,count確保SMP架構下同時只有一個CPU在執行該tasklet:
Copyright © Linux教程網 All Rights Reserved