中斷服務程式一般都是在中斷請求關閉的條件下執行的,以避免嵌套而使中斷控制複雜化。但是,中斷是一個随機事件,它随時會到來,如果關中斷的時間太長,CPU就不能及時響應其他的中斷請求,進而造成中斷的丢失。
是以,核心把中斷處理分為兩部分:上半部(top-half)和下半部(bottom-half),上半部 (就是中斷服務程式)核心立即執行,而下半部(就是一些核心函數)留着稍後處理。
首先:一個快速的“上半部”來處理硬體發出的請求,它必須在一個新的中斷産生之前終止。通常,除了在裝置和一些記憶體緩沖區(如果你的裝置用到了DMA,就不止這些)之間移動或傳送資料,确定硬體是否處于健全的狀态之外,這一部分做的工作很少。
第二:“下半部”運作時是允許中斷請求的,而上半部運作時是關中斷的,這是二者之間的主要差別。
核心到底什麼時候執行下半部,以何種方式組織下半部?
這就是我們要讨論的下半部實作機制,這種機制在核心的演變過程中不斷得到改進,在以前的核心中,這個機制叫做bottom-half(以下簡稱BH)。但是,Linux的這種bottom-half機制有兩個缺點:
1) 在任意一時刻,系統隻能有一個CPU可以執行BH代碼,以防止兩個或多個CPU同時來執行BH函數而互相幹擾。是以BH代碼的執行是嚴格“串行化”的。
2) BH函數不允許嵌套。
這兩個缺點在單CPU系統中是無關緊要的,但在SMP系統中卻是非常緻命的。因為BH機制的嚴格串行化執行顯然沒有充分利用SMP系統的多CPU特點。為此,在2.4以後的版本中有了新的發展和改進,改進的目标使下半部可以在多處理機上并行執行,并有助于驅動程式的開發者進行驅動程式的開發。下面主要介紹3種2.6核心中的“下半部”處理機制:
1) 軟中斷請求(softirq)機制
2) 小任務(tasklet)機制
3) 工作隊列機制
以上三種機制的比較如下圖所示:
Linux的softirq機制是與SMP緊密不可分的。為此,整個softirq機制的設計與實作中自始自終都貫徹了一個思想:“誰觸發,誰執行”(Who marks,Who runs),也即觸發軟中斷的那個CPU負責執行它所觸發的軟中斷,而且每個CPU都有它自己的軟中斷觸發與控制機制。這個設計思想也使得softirq機制充分利用了SMP系統的性能和特點。
Linux在include/linux/interrupt.h頭檔案中定義了資料結構softirq_action,來描述一個軟中斷請求,如下所示:
/* PLEASE, avoid to allocate new softirqs, if you need not _really_ high
frequency threaded job scheduling. For almost all the purposes
tasklets are more than enough. F.e. all serial device BHs et
al. should be converted to tasklets, not to softirqs.
*/
enum
{
HI_SOFTIRQ=0, //用于實作高優先級的軟中斷
TIMER_SOFTIRQ,
NET_TX_SOFTIRQ, // 用于網絡資料的發送
NET_RX_SOFTIRQ, // 用于網絡資料的接收
BLOCK_SOFTIRQ,
BLOCK_IOPOLL_SOFTIRQ,
TASKLET_SOFTIRQ, // 用于實作tasklet軟中斷
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */
NR_SOFTIRQS
};
/* map softirq index to softirq name. update 'softirq_to_name' in
* kernel/softirq.c when adding a new softirq.
extern char *softirq_to_name[NR_SOFTIRQS];
/* softirq mask and active fields moved to irq_cpustat_t in
* asm/hardirq.h to get better cache usage. KAO
struct softirq_action
void (*action)(struct softirq_action *);
asmlinkage void do_softirq(void);
asmlinkage void __do_softirq(void);
其中,函數指針action指向軟中斷請求的服務函數。基于上述軟中斷描述符,Linux在kernel/softirq.c檔案中定義了一個全局的softirq_vec數組:
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
在這裡系統一共定義了10個軟中斷請求描述符。軟中斷向量i(0≤i≤9)所對應的軟中斷請求描述符就是softirq_vec[i]。這個數組是個系統全局數組,即它被所有的CPU所共享。這裡需要注意的一點是:每個CPU雖然都有它自己的觸發和控制機制,并且隻執行他自己所觸發的軟中斷請求,但是各個CPU所執行的軟中斷服務例程卻是相同的,也即都是執行softirq_vec[ ]數組中定義的軟中斷服務函數。Linux在kernel/softirq.c中的相關代碼如下:
/*
- No shared variables, all the data are CPU local.
- If a softirq needs serialization, let it serialize itself
by its own spinlocks.
- Even if softirq is serialized, only local cpu is marked for
execution. Hence, we get something sort of weak cpu binding.
Though it is still not clear, will it result in better locality
or will not.
Examples:
- NET RX softirq. It is multithreaded and does not require
any global serialization.
- NET TX softirq. It kicks software netdevice queues, hence
it is logically serialized per device, but this serialization
is invisible to common code.
- Tasklets: serialized wrt itself.
#ifndef __ARCH_IRQ_STAT
irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
EXPORT_SYMBOL(irq_stat);
#endif
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
DEFINE_PER_CPU(struct task_struct *, ksoftirqd);
char *softirq_to_name[NR_SOFTIRQS] = {
"HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "BLOCK_IOPOLL",
"TASKLET", "SCHED", "HRTIMER", "RCU"
IPI: 處理器間的中斷(Inter-Processor Interrupts)
#define NR_IPI 6
typedef struct {
unsigned int __softirq_pending;
#ifdef CONFIG_LOCAL_TIMERS
unsigned int local_timer_irqs;
#ifdef CONFIG_SMP
unsigned int ipi_irqs[NR_IPI];
} ____cacheline_aligned irq_cpustat_t;
中斷處理的相關宏如下:
#define __inc_irq_stat(cpu, member) __IRQ_STAT(cpu, member)++
#define __get_irq_stat(cpu, member) __IRQ_STAT(cpu, member)
#define __IRQ_STAT(cpu, member) (irq_stat[cpu].member)
irq_cpustat_tirq_stat[NR_CPUS] ____cacheline_aligned;
1) NR_CPUS:為系統中CPU個數。
2) 這樣,每個CPU都隻操作它自己的中斷統計資訊結構。假設有一個編号為id的CPU,那麼它隻能操作它自己的中斷統計資訊結構irq_stat[id](0≤id≤NR_CPUS-1),進而使各CPU之間互不影響。
1) 觸發軟中斷函數:
void raise_softirq(unsigned int nr);// nr為中斷号
2) 設定軟中斷服務函數:
void open_softirq(int nr, void (*action)(struct softirq_action *)); // nr為中斷号, action為中斷處理函數
void __init softirq_init(void)
int cpu;
for_each_possible_cpu(cpu) {
int i;
per_cpu(tasklet_vec, cpu).tail =
&per_cpu(tasklet_vec, cpu).head;
per_cpu(tasklet_hi_vec, cpu).tail =
&per_cpu(tasklet_hi_vec, cpu).head;
for (i = 0; i < NR_SOFTIRQS; i++)
INIT_LIST_HEAD(&per_cpu(softirq_work_list[i], cpu));
}
register_hotcpu_notifier(&remote_softirq_cpu_notifier);
open_softirq(TASKLET_SOFTIRQ, tasklet_action); //設定軟中斷服務函數
open_softirq(HI_SOFTIRQ, tasklet_hi_action); //設定軟中斷服務函數
}
函數do_softirq()負責執行數組softirq_vec[i]中設定的軟中斷服務函數。每個CPU都是通過執行這個函數來執行軟中斷服務的。由于同一個CPU上的軟中斷服務例程不允許嵌套,是以,do_softirq()函數一開始就檢查目前CPU是否已經正出在中斷服務中,如果是則do_softirq()函數立即傳回。舉個例子,假設CPU0正在執行do_softirq()函數,執行過程産生了一個高優先級的硬體中斷,于是CPU0轉去執行這個高優先級中斷所對應的中斷服務程式。衆所周知,所有的中斷服務程式最後都要跳轉到do_IRQ()函數并由它來依次執行中斷服務隊列中的ISR,這裡我們假定這個高優先級中斷的ISR請求觸發了一次軟中斷,于是do_IRQ()函數在退出之前看到有軟中斷請求,進而調用do_softirq()函數來服務軟中斷請求。是以,CPU0再次進入do_softirq()函數(也即do_softirq()函數在CPU0上被重入了)。但是在這一次進入do_softirq()函數時,它馬上發現CPU0此前已經處在中斷服務狀态中了,是以這一次do_softirq()函數立即傳回。于是,CPU0回到該開始時的do_softirq()函數繼續執行,并為高優先級中斷的ISR所觸發的軟中斷請求補上一次服務。從這裡可以看出,do_softirq()函數在同一個CPU上的執行是串行的。
asmlinkage void do_softirq(void)
__u32 pending;
unsigned long flags;
if (in_interrupt())
return;
local_irq_save(flags);
pending = local_softirq_pending();
if (pending)
__do_softirq();
local_irq_restore(flags);
tasklet機制是一種較為特殊的軟中斷。
tasklet一詞的原意是“小片任務”的意思,這裡是指一小段可執行的代碼,且通常以函數的形式出現。軟中斷向量HI_SOFTIRQ和TASKLET_SOFTIRQ均是用tasklet機制來實作的。
從某種程度上講,tasklet機制是Linux核心對BH機制的一種擴充。在2.4核心引入了softirq機制後,原有的BH機制正是通過tasklet機制這個橋梁來将softirq機制納入整體架構中的。正是由于這種曆史的延伸關系,使得tasklet機制與一般意義上的軟中斷有所不同,而呈現出以下兩個顯著的特點:
1) 與一般的軟中斷不同,某一段tasklet代碼在某個時刻隻能在一個CPU上運作,而不像一般的軟中斷服務函數(即softirq_action結構中的action函數指針)那樣——在同一時刻可以被多個CPU并發地執行。
2) 與BH機制不同,不同的tasklet代碼在同一時刻可以在多個CPU上并發地執行,而不像BH機制那樣必須嚴格地串行化執行(也即在同一時刻系統中隻能有一個CPU執行BH函數)。
Linux用資料結構tasklet_struct來描述一個tasklet,每個結構代表一個獨立的小任務。該資料結構定義在include/linux/interrupt.h頭檔案中。如下所示:
/* Tasklets --- multithreaded analogue of BHs.
Main feature differing them of generic softirqs: tasklet
is running only on one CPU simultaneously.
Main feature differing them of BHs: different tasklets
may be run simultaneously on different CPUs.
Properties:
* If tasklet_schedule() is called, then tasklet is guaranteed
to be executed on some cpu at least once after this.
* If the tasklet is already scheduled, but its execution is still not
started, it will be executed only once.
* If this tasklet is already running on another CPU (or schedule is called
from tasklet itself), it is rescheduled for later.
* Tasklet is strictly serialized wrt itself, but not
wrt another tasklets. If client needs some intertask synchronization,
he makes it with spinlocks.
struct tasklet_struct
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
• next: 指向下一個tasklet的指針
• state: 定義了這個tasklet的目前狀态。這一個32位的無符号長整數,目前隻使用了bit[1]和bit[0]兩個狀态位。其中,bit[1]=1 表示這個tasklet目前正在某個CPU上被執行,它僅對SMP系統才有意義,其作用就是為了防止多個CPU同時執行一個tasklet的情形出現;bit[0]=1表示這個tasklet已經被排程去等待執行了。
對這兩個狀态位的宏定義如下所示(interrupt.h):
TASKLET_STATE_SCHED, /* Tasklet is scheduled for execution */
TASKLET_STATE_RUN /* Tasklet is running (SMP only) */
• count: 子計數count,對這個tasklet的引用計數值。
注:隻有當count等于0時,tasklet代碼段才能執行,也即此時tasklet是被使能的;如果count非零,則這個tasklet是被禁止的。任何想要執行一個tasklet代碼段的人都首先必須先檢查其count成員是否為0。
• func:指向以函數形式表現的可執行tasklet代碼段。
• data:函數func的參數。這是一個32位的無符号整數,其具體含義可供func函數自行解釋,比如将其解釋成一個指向某個使用者自定義資料結構的位址值。
Linux在interrupt.h頭檔案中又定義了兩個用來定義tasklet_struct結構變量的輔助宏:
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
顯然,從上述源代碼可以看出,用DECLARE_TASKLET宏定義的tasklet在初始化時是被使能的(enabled),因為其count成員為0。而用DECLARE_TASKLET_DISABLED宏定義的tasklet在初始時是被禁止的(disabled),因為其count等于1。
在這裡,tasklet狀态指兩個方面:
1) state:成員所表示的運作狀态;
2) count:成員決定的使能/禁止狀态。
state成員中的bit[0]表示一個tasklet是否已被排程去等待執行,bit[1]表示一個tasklet是否正在某個CPU上執行。對于state變量中某位的改變必須是一個原子操作,是以可以用定義在include/asm/bitops.h頭檔案中的位操作來進行。
由于bit[1]這一位(即TASKLET_STATE_RUN)僅僅對于SMP系統才有意義,是以Linux在Interrupt.h頭檔案中顯示地定義了對TASKLET_STATE_RUN位的操作。如下所示:
static inline int tasklet_trylock(struct tasklet_struct *t)
return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
static inline void tasklet_unlock(struct tasklet_struct *t)
smp_mb__before_clear_bit();
clear_bit(TASKLET_STATE_RUN, &(t)->state);
static inline void tasklet_unlock_wait(struct tasklet_struct *t)
while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
#else
#define tasklet_trylock(t) 1
#define tasklet_unlock_wait(t) do { } while (0)
#define tasklet_unlock(t) do { } while (0)
顯然,在SMP系統同,tasklet_trylock()宏将把一個tasklet_struct結構變量中的state成員中的bit[1]位設定成1,同時還傳回bit[1]位的非。是以,如果bit[1]位原有值為1(表示另外一個CPU正在執行這個tasklet代碼),那麼tasklet_trylock()宏将傳回值0,也就表示上鎖不成功。如果bit[1]位的原有值為0,那麼tasklet_trylock()宏将傳回值1,表示加鎖成功。而在單CPU系統中,tasklet_trylock()宏總是傳回為1。
任何想要執行某個tasklet代碼的程式都必須首先調用宏tasklet_trylock()來試圖對這個tasklet進行上鎖(即設定TASKLET_STATE_RUN位),且隻能在上鎖成功的情況下才能執行這個tasklet。建議!即使你的程式隻在CPU系統上運作,你也要在執行tasklet之前調用tasklet_trylock()宏,以便使你的代碼獲得良好可移植性。
宏tasklet_unlock()用來對一個tasklet進行解鎖操作,也即将TASKLET_STATE_RUN位清零。在單CPU系統中,這是一個空操作。
使能與禁止操作往往總是成對地被調用的,tasklet_disable()函數如下(interrupt.h):
static inline void tasklet_disable(struct tasklet_struct *t)
tasklet_disable_nosync(t);
tasklet_unlock_wait(t);
smp_mb();
函數tasklet_disable_nosync()也是一個靜态inline函數,它簡單地通過原子操作将count成員變量的值減1。如下所示(interrupt.h):
static inline void tasklet_disable_nosync(struct tasklet_struct *t)
atomic_inc(&t->count);
smp_mb__after_atomic_inc();
函數tasklet_enable()用于使能一個tasklet,如下所示(interrupt.h):
static inline void tasklet_enable(struct tasklet_struct *t)
smp_mb__before_atomic_dec();
atomic_dec(&t->count);
函數tasklet_init()用來初始化一個指定的tasklet描述符,其源碼如下所示(kernel/softirq.c):
void tasklet_init(struct tasklet_struct *t,
void (*func)(unsigned long), unsigned long data)
t->next = NULL;
t->state = 0;
atomic_set(&t->count, 0);
t->func = func;
t->data = data;
函數tasklet_kill()用來将一個已經被排程了的tasklet殺死,即将其恢複到未排程的狀态。其源碼如下所示(kernel/softirq.c):
void tasklet_kill(struct tasklet_struct *t)
printk("Attempt to kill tasklet from interrupt\n");
while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
do {
yield();
} while (test_bit(TASKLET_STATE_SCHED, &t->state));
clear_bit(TASKLET_STATE_SCHED, &t->state);
多個tasklet可以通過tasklet描述符中的next成員指針連結成一個單向對列。為此,Linux專門在頭檔案include/linux/interrupt.h中定義了資料結構tasklet_head來描述一個tasklet對列的頭部指針。如下所示:
* Tasklets
struct tasklet_head
struct tasklet_struct *head;
struct tasklet_struct **tail;
盡管tasklet機制是特定于軟中斷向量HI_SOFTIRQ和TASKLET_SOFTIRQ的一種實作,但是tasklet機制仍然屬于softirq機制的整體架構範圍内的,是以,它的設計與實作仍然必須堅持“誰觸發,誰執行”的思想。為此,Linux為系統中的每一個CPU都定義了一個tasklet對列頭部,來表示應該有各個CPU負責執行的tasklet對列。如下所示(kernel/softirq.c):
#define DEFINE_PER_CPU_SECTION(type, name, sec) \
__PCPU_ATTRS(sec) PER_CPU_DEF_ATTRIBUTES \
__typeof__(type) name
#define DEFINE_PER_CPU(type, name) \
DEFINE_PER_CPU_SECTION(type, name, "")
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);
即:struct tasklet_head tasklet_vec[NR_CPUS] __cacheline_aligned;
struct tasklet_head tasklet_hi_vec[NR_CPUS] __cacheline_aligned;
其中,tasklet_vec[]數組用于軟中斷向量TASKLET_SOFTIRQ,而tasklet_hi_vec[]數組則用于軟中斷向量HI_SOFTIRQ。也即,如果CPUi(0≤i≤NR_CPUS-1)觸發了軟中斷向量TASKLET_SOFTIRQ,那麼對列tasklet_vec[i]中的每一個tasklet都将在CPUi服務于軟中斷向量TASKLET_SOFTIRQ時被CPUi所執行。同樣地,如果CPUi(0≤i≤NR_CPUS-1)觸發了軟中斷向量HI_SOFTIRQ,那麼隊列tasklet_hi_vec[i]中的每一個tasklet都将CPUi在對軟中斷向量HI_SOFTIRQ進行服務時被CPUi所執行。
隊列tasklet_vec[I]和tasklet_hi_vec[I]中的各個tasklet是怎樣被所CPUi所執行的呢?其關鍵就是軟中斷向量TASKLET_SOFTIRQ和HI_SOFTIRQ的軟中斷服務程式——tasklet_action()函數和tasklet_hi_action()函數。下面我們就來分析這兩個函數。
Linux為軟中斷向量TASKLET_SOFTIRQ和HI_SOFTIRQ實作了專用的觸發函數和軟中斷服務函數。
• 專用的觸發函數
tasklet_schedule()函數和tasklet_hi_schedule()函數分别用來在目前CPU上觸發軟中斷向量TASKLET_SOFTIRQ和HI_SOFTIRQ,并把指定的tasklet加入目前CPU所對應的tasklet隊列中去等待執行。
• 專用的軟中斷服務函數
tasklet_action()函數和tasklet_hi_action()函數則分别是軟中斷向量TASKLET_SOFTIRQ和HI_SOFTIRQ的軟中斷服務函數。在初始化函數softirq_init()中,這兩個軟中斷向量對應的描述符softirq_vec[0]和softirq_vec[6]中的action函數指針就被分别初始化成指向函數tasklet_hi_action()和函數tasklet_action()。
該函數實作在include/linux/interrupt.h頭檔案中,是一個inline函數。其源碼如下所示:
void __tasklet_schedule(struct tasklet_struct *t)
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
raise_softirq_irqoff(TASKLET_SOFTIRQ); // 觸發軟中斷TASKLET_SOFTIRQ
static inline void tasklet_schedule(struct tasklet_struct *t)
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
__tasklet_schedule(t);
• 調用test_and_set_bit()函數将待排程的tasklet的state成員變量的bit[0]位(也即TASKLET_STATE_SCHED位)設定為1,該函數同時還傳回TASKLET_STATE_SCHED位的原有值。是以如果bit[0]為的原有值已經為1,那就說明這個tasklet已經被排程到另一個CPU上去等待執行了。由于一個tasklet在某一個時刻隻能由一個CPU來執行,是以tasklet_schedule()函數什麼也不做就直接傳回了。否則,就繼續下面的排程操作。
• 首先,調用local_irq_save()函數來關閉目前CPU的中斷,以保證下面的步驟在目前CPU上原子地被執行。
• 然後,将待排程的tasklet添加到目前CPU對應的tasklet隊列的尾部。
• 接着,調用raise_softirq_irqoff函數在目前CPU上觸發軟中斷請求TASKLET_SOFTIRQ。
• 最後,調用local_irq_restore()函數來開目前CPU的中斷。
函數tasklet_action()是tasklet機制與軟中斷向量TASKLET_SOFTIRQ的聯系紐帶。正是該函數将目前CPU的tasklet隊列中的各個tasklet放到目前CPU上來執行的。該函數實作在kernel/softirq.c檔案中,其源代碼如下:
static void tasklet_action(struct softirq_action *a)
struct tasklet_struct *list;
local_irq_disable();
list = __this_cpu_read(tasklet_vec.head);
__this_cpu_write(tasklet_vec.head, NULL);
__this_cpu_write(tasklet_vec.tail, &__get_cpu_var(tasklet_vec).head);
local_irq_enable();
while (list) {
struct tasklet_struct *t = list;
list = list->next;
if (tasklet_trylock(t)) {
if (!atomic_read(&t->count)) {
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
local_irq_disable();
t->next = NULL;
*__this_cpu_read(tasklet_vec.tail) = t;
__this_cpu_write(tasklet_vec.tail, &(t->next));
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();
• 首先,在目前CPU關中斷的情況下,“原子”地讀取目前CPU的tasklet隊列頭部指針,将其儲存到局部變量list指針中,然後将目前CPU的tasklet隊列頭部指針設定為NULL,以表示理論上目前CPU将不再有tasklet需要執行(但最後的實際結果卻并不一定如此,下面将會看到)。
• 然後,用一個while{}循環來周遊由list所指向的tasklet隊列,隊列中的各個元素就是将在目前CPU上執行的tasklet。循環體的執行步驟如下:
• 用指針t來表示目前隊列元素,即目前需要執行的tasklet。
• 更新list指針為list->next,使它指向下一個要執行的tasklet。
• 用tasklet_trylock()宏試圖對目前要執行的tasklet(由指針t所指向)進行加鎖,如果加鎖成功(目前沒有任何其他CPU正在執行這個tasklet),則用原子讀函數atomic_read()進一步判斷count成員的值。如果count為0,說明這個tasklet是允許執行的,于是:
(1) 先清除TASKLET_STATE_SCHED位;
(2) 然後,調用這個tasklet的可執行函數func;
(3) 調用宏tasklet_unlock()來清除TASKLET_STATE_RUN位
(4) 最後,執行continue語句跳過下面的步驟,回到while循環繼續周遊隊列中的下一個元素。如果count不為0,說明這個tasklet是禁止運作的,于是調用tasklet_unlock()清除前面用tasklet_trylock()設定的TASKLET_STATE_RUN位。
1) 聲明和使用小任務大多數情況下,為了控制一個常用的硬體裝置,小任務機制是實作下半部的最佳選擇。小任務可以動态建立,使用友善,執行起來也比較快。我們既可以靜态地建立小任務,也可以動态地建立它。選擇那種方式取決于到底是想要對小任務進行直接引用還是一個間接引用。如果準備靜态地建立一個小任務(也就是對它直接引用),使用下面兩個宏中的一個:
DECLARE_TASKLET(name,func, data)
DECLARE_TASKLET_DISABLED(name,func, data)
這兩個宏都能根據給定的名字靜态地建立一個tasklet_struct結構。當該小任務被排程以後,給定的函數func會被執行,它的參數由data給出。這兩個宏之間的差別在于引用計數器的初始值設定不同。第一個宏把建立的小任務的引用計數器設定為0,是以,該小任務處于激活狀态。另一個把引用計數器設定為1,是以該小任務處于禁止狀态。例如:
DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev);
這行代碼其實等價于
struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0),
tasklet_handler,dev};
這樣就建立了一個名為my_tasklet的小任務,其處理程式為tasklet_handler,并且已被激活。當處理程式被調用的時候,dev就會被傳遞給它。
2) 編寫自己的小任務處理程式小任務處理程式必須符合如下的函數類型:
void tasklet_handler(unsigned long data)
由于小任務不能睡眠,是以不能在小任務中使用信号量或者其它産生阻塞的函數。但是小任務運作時可以響應中斷。
3) 排程自己的小任務通過調用tasklet_schedule()函數并傳遞給它相應的tasklt_struct指針,該小任務就會被排程以便适當的時候執行:
tasklet_schedule(&my_tasklet); /*把my_tasklet标記為挂起 */
在小任務被排程以後,隻要有機會它就會盡可能早的運作。在它還沒有得到運作機會之前,如果一個相同的小任務又被排程了,那麼它仍然隻會運作一次。
可以調用tasklet_disable()函數來禁止某個指定的小任務。如果該小任務目前正在執行,這個函數會等到它執行完畢再傳回。調用tasklet_enable()函數可以激活一個小任務,如果希望把以DECLARE_TASKLET_DISABLED()建立的小任務激活,也得調用這個函數,如:
tasklet_disable(&my_tasklet); /*小任務現在被禁止,這個小任務不能運作*/
tasklet_enable(&my_tasklet); /* 小任務現在被激活*/
也可以調用tasklet_kill()函數從挂起的隊列中去掉一個小任務。該函數的參數是一個指向某個小任務的tasklet_struct的長指針。在小任務重新排程它自身的時候,從挂起的隊列中移去已排程的小任務會很有用。這個函數首先等待該小任務執行完畢,然後再将它移去。
4.tasklet的簡單用法
下面是tasklet的一個簡單應用,以子產品的形成加載。
#include <linux/module.h>
#include <linux/init.h>
#include <linux/fs.h>
#include <linux/kdev_t.h>
#include <linux/cdev.h>
#include <linux/kernel.h>
#include <linux/interrupt.h>
static struct t asklet_struct my_tasklet;
static void tasklet_handler (unsigned long d ata)
printk(KERN_ALERT,"tasklet_handler is running./n");
static int __init test_init(void)
tasklet_init(&my_tasklet,tasklet_handler,0);
tasklet_schedule(&my_tasklet);
return0;
static void __exit test_exit(void)
tasklet_kill(&tasklet);
printk(KERN_ALERT,"test_exit is running./n");
MODULE_LICENSE("GPL");
module_init(test_init);
module_exit(test_exit);
從這個例子可以看出,所謂的小任務機制是為下半部函數的執行提供了一種執行機制,也就是說,推遲處理的事情是由tasklet_handler實作,何時執行,經由小任務機制封裝後交給核心去處理。
工作隊列(work queue)是另外一種将工作推後執行的形式,它和前面讨論的tasklet有所不同。工作隊列可以把工作推後,交由一個核心線程去執行,也就是說,這個下半部分可以在程序上下文中執行。這樣,通過工作隊列執行的代碼能占盡程序上下文的所有優勢。最重要的就是工作隊列允許被重新排程甚至是睡眠。
那麼,什麼情況下使用工作隊列,什麼情況下使用tasklet。如果推後執行的任務需要睡眠,那麼就選擇工作隊列;如果推後執行的任務不需要睡眠,那麼就選擇tasklet。另外,如果需要用一個可以重新排程的實體來執行你的下半部處理,也應該使用工作隊列。它是唯一能在程序上下文運作的下半部實作的機制,也隻有它才可以睡眠。這意味着在需要獲得大量的記憶體時、在需要擷取信号量時,在需要執行阻塞式的I/O操作時,它都會非常有用。如果不需要用一個核心線程來推後執行工作,那麼就考慮使用tasklet。
如前所述,我們把推後執行的任務叫做工作(work),描述它的資料結構為work_struct,這些工作以隊列結構組織成工作隊列(workqueue),其資料結構為workqueue_struct,而工作線程就是負責執行工作隊列中的工作。系統預設的工作者線程為events,自己也可以建立自己的工作者線程。表示工作的資料結構用<linux/workqueue.h>中定義的work_struct結構表示:
typedef void (*work_func_t)(struct work_struct *work);
struct work_struct {
atomic_long_t data;
struct list_head entry; // 連接配接所有工作的連結清單
work_func_t func; // 要執行的函數
#ifdef CONFIG_LOCKDEP
struct lockdep_map lockdep_map;
這些結構被連接配接成連結清單。當一個工作者線程被喚醒時,它會執行它的連結清單上的所有工作。工作被執行完畢,它就将相應的work_struct對象從連結清單上移去。當連結清單上不再有對象的時候,它就會繼續休眠。
表示工作隊列的資料結構用<kernel/workqueue.c>中定義的workqueue_struct:
* The externally visible workqueue abstraction is an array of
* per-CPU workqueues:
struct workqueue_struct {
unsigned int flags; /* I: WQ_* flags */
union {
struct cpu_workqueue_struct __percpu *pcpu;
struct cpu_workqueue_struct *single;
unsigned long v;
} cpu_wq; /* I: cwq's */
struct list_head list; /* W: list of all workqueues */
struct mutex flush_mutex; /* protects wq flushing */
int work_color; /* F: current work color */
int flush_color; /* F: current flush color */
atomic_t nr_cwqs_to_flush; /* flush in progress */
struct wq_flusher *first_flusher; /* F: first flusher */
struct list_head flusher_queue; /* F: flush waiters */
struct list_head flusher_overflow; /* F: flush overflow list */
mayday_mask_t mayday_mask; /* cpus requesting rescue */
struct worker *rescuer; /* I: rescue worker */
int saved_max_active; /* W: saved cwq max_active */
const char *name; /* I: workqueue name */
struct lockdep_map lockdep_map;
要使用工作隊列,首先要做的是建立一些需要推後完成的工作。可以通過DECLARE_WORK在編譯時靜态地建該結構:
DECLARE_WORK(name, func);
其定義如下:
#define DECLARE_WORK(n, f) \
struct work_struct n = __WORK_INITIALIZER(n, f)
舉例如下:
static void do_poweroff(struct work_struct *dummy)
kernel_power_off();
static DECLARE_WORK(poweroff_work, do_poweroff);
即建立了一個全局靜态變量:static work_struct poweroff_work,且被初始化了,其執行函數為do_poweroff。
先定義一具struct work_struct 變量,在需要使用時調用INIT_WORK進行初始化,然後便可以使用。
#define INIT_WORK(_work, _func) \
do { \
__INIT_WORK((_work), (_func), 0); \
} while (0)
舉例如下:
void __cfg80211_scan_done(struct work_struct *wk)
struct cfg80211_registered_device *rdev;
rdev = container_of(wk, struct cfg80211_registered_device,
scan_done_wk);
cfg80211_lock_rdev(rdev);
___cfg80211_scan_done(rdev, false);
cfg80211_unlock_rdev(rdev);
struct cfg80211_registered_device {
struct work_struct scan_done_wk;
struct work_struct sched_scan_results_wk;
struct work_struct conn_work;
struct work_struct event_work;
struct cfg80211_wowlan *wowlan;
struct cfg80211_registered_device *rdev;
rdev = kzalloc(alloc_size, GFP_KERNEL);
INIT_WORK(&rdev->scan_done_wk, __cfg80211_scan_done); // 其執行函數為: __cfg80211_scan_done
INIT_WORK(&rdev->sched_scan_results_wk, __cfg80211_sched_scan_results);
現在工作已經被建立,我們可以排程它了。想要把給定工作的待處理函數送出給預設的events工作線程,隻需調用: int schedule_work(struct work_struct *work);
它把work放入全局工作隊列:system_wq,其定義如下:
struct workqueue_struct *system_wq __read_mostly;
int schedule_work(struct work_struct *work)
return queue_work(system_wq, work);
queue_work:把一個工作放入工作隊列:
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
int ret;
ret = queue_work_on(get_cpu(), wq, work);
put_cpu();
return ret;
把work放入工作隊列,work馬上就會被排程,一旦其所在的處理器上的工作者線程被喚醒,它就會被執行。有時候并不希望工作馬上就被執行,而是希望它經過一段延遲以後再執行。在這種情況下,可以排程它在指定的時間執行:
struct delayed_work {
struct work_struct work;
struct timer_list timer;
int schedule_delayed_work(struct delayed_work *dwork, unsigned long delay/*jiffies*/)
return queue_delayed_work(system_wq, dwork, delay);
#define DECLARE_DELAYED_WORK(n, f) \
struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f)
#define INIT_DELAYED_WORK(_work, _func) \
INIT_WORK(&(_work)->work, (_func)); \
init_timer(&(_work)->timer); \
工作放入工作隊列之後,由管理此工作隊列的工作者來執行這些work,通過alloc_workqueue或create_singlethread_workqueue來建立工作者線程,它最後調用kthread_create建立線程,其線程名為alloc_workqueue中指定的name,其舉例如下:
static int __init init_workqueues(void)
unsigned int cpu;
int i;
...
system_wq = alloc_workqueue("events", 0, 0);
system_long_wq = alloc_workqueue("events_long", 0, 0);
system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
WQ_UNBOUND_MAX_ACTIVE);
system_freezable_wq = alloc_workqueue("events_freezable",
WQ_FREEZABLE, 0);
BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
!system_unbound_wq || !system_freezable_wq);
return 0;
#define create_singlethread_workqueue(name) \
alloc_workqueue((name), WQ_UNBOUND | WQ_MEM_RECLAIM, 1)
#define alloc_workqueue(name, flags, max_active) \
__alloc_workqueue_key((name), (flags), (max_active), NULL, NULL)
如:cfg80211_wq = create_singlethread_workqueue("cfg80211");建立了一個名為cfg80211的kernel線程。