FreeRTOS 隊列分析
- FreeRTOS 隊列分析
-
- 隊列建立函數
- 入隊
-
- xQueueGenericSend()
- xQueueGenericSendFromISR ()
- 出隊
文章由 FreeRTOS 系列部落格整理而來,僅為學習記錄,如有不妥,請告知。
FreeRTOS 隊列分析
FreeRTOS 提供了多種任務間通訊方式,包括:
- 任務通知(版本V8.2以及以上版本)
- 隊列
- 二進制信号量
- 計數信号量
- 互斥量
- 遞歸互斥量
其中,二進制信号量、計數信号量、互斥量和遞歸互斥量都是使用隊列來實作的,是以掌握隊列的運作機制,是很有必要的。
隊列是 FreeRTOS 主要的任務間通訊方式。可以在任務與任務間、中斷和任務間傳送資訊。發送到隊列的消息是通過拷貝實作的,這意味着隊列存儲的資料是原資料,而不是原資料的引用。先看一下隊列的資料結構:
typedef struct QueueDefinition
{
int8_t *pcHead; /* 指向隊列存儲區起始位置,即第一個隊列項 */
int8_t *pcTail; /* 指向隊列存儲區結束後的下一個位元組 */
int8_t *pcWriteTo; /* 指向下隊列存儲區的下一個空閑位置 */
union /* 使用聯合體用來確定兩個互斥的結構體成員不會同時出現 */
{
int8_t *pcReadFrom; /* 當結構體用于隊列時,這個字段指向出隊項目中的最後一個. */
UBaseType_t uxRecursiveCallCount;/* 當結構體用于互斥量時,用作計數器,儲存遞歸互斥量被"擷取"的次數. */
} u;
List_t xTasksWaitingToSend; /* 因為等待入隊而阻塞的任務清單,按照優先級順序存儲 */
List_t xTasksWaitingToReceive; /* 因為等待隊列項而阻塞的任務清單,按照優先級順序存儲 */
volatile UBaseType_t uxMessagesWaiting;/*< 目前隊列的隊列項數目 */
UBaseType_t uxLength; /* 隊列項的數目 */
UBaseType_t uxItemSize; /* 每個隊列項的大小 */
volatile BaseType_t xRxLock; /* 隊列上鎖後,存儲從隊列收到的清單項數目,如果隊列沒有上鎖,設定為queueUNLOCKED */
volatile BaseType_t xTxLock; /* 隊列上鎖後,存儲發送到隊列的清單項數目,如果隊列沒有上鎖,設定為queueUNLOCKED */
#if ( configUSE_QUEUE_SETS == 1 )
struct QueueDefinition *pxQueueSetContainer;
#endif
#if ( configUSE_TRACE_FACILITY == 1 )
UBaseType_t uxQueueNumber;
uint8_t ucQueueType;
#endif
#if ( configSUPPORT_STATIC_ALLOCATION == 1 )
uint8_t ucStaticAllocationFlags;
#endif
} xQUEUE;
typedef xQUEUE Queue_t;
下面的所有 API 函數都是圍繞這個資料結構展開,是以資料結構的每個成員都需要了解。如果你是第一次看這篇文章,即使有注釋,可能你對結構體的某些成員還是不了解,不要着急,這是正常的。後面介紹 API 函數的時候,會一一使用這些成員,結合着具體執行個體,會很容了解的,你需要做的,是要反複翻到這裡檢視。
隊列建立函數
在《FreeRTOS(12)—FreeRTOS 信号量API函數》一文中,我們介紹了建立隊列 API 函數
xQueueCreate()
,但其實這是一個宏,隻是定義的像函數而已。真正被執行的函數是
xQueueGenericCreate()
,我們稱這個函數為通用隊列建立函數。 我們來分析一下
xQueueGenericCreate()
函數,函數原型為:
QueueHandle_t xQueueGenericCreate
(
const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t *pucQueueStorage,
StaticQueue_t *pxStaticQueue,
const uint8_t ucQueueType
)
- uxQueueLength:隊列項數目
- uxItemSize:每個隊列項的大小
- pucQueueStorage:使用靜态配置設定隊列時才使用,指向定義隊列存儲空間,如果使用動态配置設定隊列空間(預設),向這個參數傳遞NULL。
- pxStaticQueue:使用靜态配置設定隊列時才使用,指向隊列控制結構體,如果使用動态配置設定隊列空間(預設),向這個參數傳遞NULL。
- ucQueueType:類型。可能的值為:
- queueQUEUE_TYPE_BASE:表示隊列
- queueQUEUE_TYPE_SET:表示隊列集合
- queueQUEUE_TYPE_MUTEX:表示互斥量
- queueQUEUE_TYPE_COUNTING_SEMAPHORE:表示計數信号量
- queueQUEUE_TYPE_BINARY_SEMAPHORE:表示二進制信号量
- queueQUEUE_TYPE_RECURSIVE_MUTEX :表示遞歸互斥量
然而,等下我們看源碼,就會看到,在
xQueueGenericCreate()
函數中,參數 ucQueueType 隻是用來可視化跟蹤調試用。
xQueueGenericCreate()
函數的源碼如下所示:
QueueHandle_t xQueueGenericCreate(
const UBaseType_t uxQueueLength,
const UBaseType_t uxItemSize,
uint8_t *pucQueueStorage,
StaticQueue_t *pxStaticQueue,
const uint8_t ucQueueType )
{
Queue_t *pxNewQueue;
/* 如果使能可視化跟蹤調試,這裡用來消除編譯器警告. */
( void ) ucQueueType;
/*配置設定隊列結構體和隊列項存儲空間.可以靜态也可以動态配置設定,取決于參數值,FreeRTOS預設采取動态配置設定 */
pxNewQueue = prvAllocateQueueMemory( uxQueueLength, uxItemSize, &pucQueueStorage, pxStaticQueue );
if( pxNewQueue != NULL )
{
if( uxItemSize == ( UBaseType_t ) 0 )
{
/* 沒有為隊列項存儲配置設定記憶體,但是pcHead指針不能設定為NULL,因為隊列用作互斥量時,pcHead要設定成NULL.這裡隻是将pcHead指向一個已知的區域 */
pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;
}
else
{
/* 指向隊列項存儲區域*/
pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;
}
/* 初始化隊列結構體成員*/
pxNewQueue->uxLength = uxQueueLength;
pxNewQueue->uxItemSize = uxItemSize;
( void ) xQueueGenericReset( pxNewQueue, pdTRUE );
#if ( configUSE_TRACE_FACILITY == 1 )
{
pxNewQueue->ucQueueType = ucQueueType;
}
#endif /* configUSE_TRACE_FACILITY */
traceQUEUE_CREATE( pxNewQueue );
}
return ( QueueHandle_t ) pxNewQueue;
}
我們以預設的動态配置設定隊列存儲空間方式講述一下隊列建立過程。首先調用函數 prvAllocateQueueMemory 配置設定隊列結構體和隊列項存儲空間,結構體和隊列項在存儲空間上是連續的,如圖1-1所示。
圖1-1:為隊列配置設定的記憶體
如果隊列記憶體申請成功,接下來會初始化隊列結構體成員,先是 pcHead 成員,然後是 uxLength 和 uxItemSize 成員,最後調用函數
xQueueGenericReset()
初始化剩下的結構體成員。
假設我們申請了3個隊列項,每個隊列項占用4位元組存儲空間(即uxLength=3、uxItemSize=4),則經過初始化後的隊列記憶體如圖1-2所示。(這個圖形象的描述了隊列結構體的大部分成員的作用)。
圖1-2:初始化後的隊列項記憶體
入隊
隊列項入隊也稱為投遞(Send),分為帶中斷保護的入隊操作和不帶中斷保護的入隊操作。每種情況下又分為從隊列尾部入隊和從隊列首部入隊兩種操作,從隊列尾部入隊還有一種特殊情況,覆寫式入隊,即隊列滿後自動覆寫最舊的隊列項。如表2-1所示。
表2-1:入隊API接口清單
xQueueGenericSend()
這個函數用于入隊操作,絕不可以用在中斷服務程式中。根據參數的不同,可以從隊列尾入隊、從隊列首入隊和覆寫式入隊。覆寫式入隊用于隻有一個隊列項的場合,入隊時如果隊列已滿,則将之前的隊列項覆寫掉。函數原型為:
BaseType_t xQueueGenericSend
(
QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition
)
- xQueue:隊列句柄
- pvItemToQueue:指針,指向要入隊的項目
- xTicksToWait:如果隊列滿,等待隊列空閑的最大時間,如果隊列滿并且 xTicksToWait 被設定成0,函數立刻傳回。時間機關為系統節拍時鐘周期,宏
可以用來輔助計算真實延時值。如果portTICK_PERIOD_MS
設定成1,并且指定延時為INCLUDE_vTaskSuspend
将引起任務無限阻塞(沒有逾時)。portMAX_DELAY
- xCopyPosition:入隊位置,可以選擇從隊列尾入隊、從隊列首入隊和覆寫式入隊。
這個函數為了獲得最高效率而放寬了編碼标準:有多個傳回點。是以如果純粹以文字方式來講解,我覺得很難達到好的效果,是以我首先給出整理後的源碼(去除調試和隊列集合有關代碼),然後畫出流程圖,對函數的關鍵點做重點描述。
整理後的源碼:
BaseType_t xQueueGenericSend(
QueueHandle_t xQueue,
const void * const pvItemToQueue,
TickType_t xTicksToWait,
const BaseType_t xCopyPosition )
{
BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
TimeOut_t xTimeOut;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
for( ;; )
{
taskENTER_CRITICAL();
{
/* 隊列還有空閑?正在運作的任務一定要比等待通路隊列的任務優先級高.如果使用覆寫式入隊,則不需要關注隊列是否滿*/
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
/*完成資料拷貝工作,分為從隊列尾入隊,從隊列首入隊和覆寫式入隊*/
xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
/* 如果有任務在此等待隊列資料到來,則将該任務解除阻塞*/
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
/*有任務因等待出隊而阻塞,則将任務從隊列等待接收清單中删除,然後加入到就緒清單*/
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 解除阻塞的任務有更高的優先級,則目前任務要讓出CPU,是以觸發一個上下文切換.又因為現在還在臨界區,要等退出臨界區後,才會執行上下文切換.*/
queueYIELD_IF_USING_PREEMPTION();
}
}
else if( xYieldRequired != pdFALSE )
{
/* 這個分支處理特殊情況*/
queueYIELD_IF_USING_PREEMPTION();
}
taskEXIT_CRITICAL();
return pdPASS;
}
else
{
if( xTicksToWait == ( TickType_t ) 0 )
{
/* 如果隊列滿并且沒有設定逾時,則直接退出 */
taskEXIT_CRITICAL();
/* 傳回隊列滿錯誤碼 */
return errQUEUE_FULL;
}
else if( xEntryTimeSet == pdFALSE )
{
/* 隊列滿并且規定了阻塞時間,是以需要配置逾時結構體對象 */
vTaskSetTimeOutState( &xTimeOut );
xEntryTimeSet = pdTRUE;
}
}
}
taskEXIT_CRITICAL();
/* 退出臨界區,至此,中斷和其它任務可以向這個隊列執行入隊(投遞)或出隊(讀取)操作.因為隊列滿,任務無法入隊,下面的代碼将目前任務将阻塞在這個隊列上,在這段代碼執行過程中我們需要挂起排程器,防止其它任務操作隊列事件清單;挂起排程器雖然可以禁止其它任務操作這個隊列,但并不能阻止中斷服務程式操作這個隊列,是以還需要将隊列上鎖,防止中斷程式讀取隊列後,使阻塞在出隊操作其它任務解除阻塞,執行上下文切換(因為排程器挂起後,不允許執行上下文切換) */
vTaskSuspendAll();
prvLockQueue( pxQueue );
/* 檢視任務的逾時時間是否到期 */
if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE )
{
if( prvIsQueueFull( pxQueue ) != pdFALSE )
{
/*逾時時間未到期,并且隊列仍然滿*/
vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToSend ), xTicksToWait );
/* 解除隊列鎖,如果有任務要解除阻塞,則将任務移到挂起就緒清單中(因為目前排程器挂起,是以不能移到就緒清單)*/
prvUnlockQueue( pxQueue );
/* 恢複排程器,将任務從挂起就緒清單移到就緒清單中*/
if( xTaskResumeAll() == pdFALSE )
{
portYIELD_WITHIN_API();
}
}
else
{
/* 隊列有空閑,重試 */
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
}
}
else
{
/* 逾時時間到期,傳回隊列滿錯誤碼*/
prvUnlockQueue( pxQueue );
( void ) xTaskResumeAll();
traceQUEUE_SEND_FAILED( pxQueue );
return errQUEUE_FULL;
}
}
}
程式流程如圖 2-1所示,我們對圖中紅色字型标注的部分做詳解。
圖 2-1:通用入隊操作流程圖
當任務将資料入隊時,如果隊列未滿或者以覆寫式入隊,情況是最簡單的,調用函數
prvCopyDataToQueue()
将要入隊的資料拷貝到隊列。這個函數處理三種入隊情況,第一種是隊列項大小為 0 時(即隊列結構體成員 uxItemSize 為0,比如二進制信号量和計數信号量),不進行資料拷貝工作,而是将隊列項計數器加1(即隊列結構體成員 uxMessagesWaiting++);第二種情況是從隊列尾入隊時,則将資料拷貝到指針 pxQueue->pcWriteTo 指向的地方、更新指針指向的位置、隊列項計數器加 1;第三種情況是從隊列首入隊時,則将資料拷貝到指針 pxQueue->u.pcReadFrom 指向的地方、更新指針指向的位置、隊列項計數器加 1。如果是覆寫式入隊,還會調整隊列項計數器的值。
完成資料入隊操作後,還要檢查是否有任務因為等待出隊而阻塞,因為這次資料入隊,隊列至少有一個隊列項,如果有阻塞任務,則阻塞的最高優先級任務可以解除阻塞了。
因等待出隊而阻塞的任務會将任務的事件清單項(即任務 TCB 結構體成員 xEventListItem,我們在《FreeRTOS(14)—FreeRTOS 任務建立分析》一文中講到過事件清單項,它是任務TCB的一個結構體成員)挂接到隊列的等待出隊清單上(即隊列結構體成員xTasksWaitingToReceive)。現在,因為要解除任務阻塞,我們需要将任務的事件清單項從隊列的等待出隊隊列上删除,并且将任務移動到就緒清單中。這一切,都是調用函數xTaskRemoveFromEventList()實作的。
之後,如果解除阻塞的任務優先級比目前任務優先級更高,則觸發一個PendSV中斷,等退出臨界區後,進行上下文切換。入隊任務完成。
上面讨論了最理想的情況,過程也簡潔明了,但如果任務入隊時,隊列滿并且不允許覆寫入隊,則情況會變得複雜起來。
在這種情況下,先看一個簡單分支:阻塞時間為0的情況。設定阻塞時間為0意味着當隊列滿時,函數立即傳回,傳回一個錯誤代碼,表示隊列滿。
如果阻塞時間不為0,則本任務會因為等待入隊而進入阻塞。在将任務設定為阻塞的過程中,是不希望有其它任務和中斷操作這個隊列的事件清單的(隊列結構體成員 xTasksWaitingToReceive 清單和 xTasksWaitingToSend 清單),因為操作隊列事件清單可能引起其它任務解除阻塞,這可能會發生優先級翻轉。比如任務A的優先級低于本任務,但是在本任務進入阻塞的過程中,任務A卻因為其它原因解除阻塞了,這顯然是要絕對禁止的。是以FreeRTOS使用挂起排程器來簡單粗暴的禁止其它任務操作隊列,因為挂起排程器意味着任務不能切換并且不準調用可能引起任務切換的API函數。
但挂起排程器并不會禁止中斷,中斷服務函數仍然可以操作隊列事件清單,可能會解除任務阻塞、可能會進行上下文切換,這是不允許的。于是,解決辦法是不但挂起排程器,還要給隊列上鎖!
隊列結構體中有兩個成員跟隊列上鎖有關:xRxLock 和 xTxLock。這兩個成員變量為 queueUNLOCKED(宏,定義為-1)時,表示隊列未上鎖;當這兩個成員變量為
queueLOCKED_UNMODIFIED
(宏,定義為0)時,表示隊列上鎖。
給隊列上鎖是調用宏
prvLockQueue()
實作的,代碼很簡單,将隊列結構體成員 xRxLock 和 xTxLock 都設定為queueLOCKED_UNMODIFIED。
我們看一下給隊列上鎖是如何起作用的。當中斷服務程式操作隊列并且導緻阻塞的任務解除阻塞時,會首先判斷該隊列是否上鎖,如果沒有上鎖,則解除被阻塞的任務,還會根據需要設定上下文切換請求标志;如果隊列已經上鎖,則不會解除被阻塞的任務,取而代之的是,将 xRxLock 或 xTxLock 加1,表示隊列上鎖期間出隊或入隊的數目,也表示有任務可以解除阻塞了。這部分代碼在帶中斷保護的入隊和出隊 API 函數中,後面我們會講到,這裡先有個印象。
有将隊列上鎖操作,就會有解除隊列鎖操作。函數
prvUnlockQueue()
用于解除隊列鎖,将可以解除阻塞的任務插入到就緒清單,解除任務的最大數量由 xRxLock 和 xTxLock 指定。
經過一系列的邏輯判斷,發現本任務還是要進入阻塞狀态,則調用函數vTaskPlaceOnEventList()來實作。這個函數将揭示任務因等待特定事件而進入阻塞的詳細步驟,其實非常簡單,隻有兩步:第一步,将任務的事件清單項(任務TCB結構體成員xEventListItem)插入到隊列的等待入隊清單(隊列結構體成員xTasksWaitingToSend)中;第二步,将任務的狀态清單項(任務TCB結構體成員xStateListItem)從就緒清單中删除,然後插入到延時清單中,任務的最大延時時間放入xStateListItem. xItemValue中,每次系統節拍定時器中斷服務函數中,都會檢查這個值,檢測任務是否逾時。
當任務成功阻塞在等待入隊操作後,目前任務就沒有必要再占用CPU了,是以接下來解除隊列鎖、恢複排程器、進行任務切換,下一個處于最高優先級的就緒任務就會被運作了。
xQueueGenericSendFromISR ()
這個函數用于入隊,用于中斷服務程式中。根據參數的不同,可以從隊列尾入隊、從隊列首入隊也可以覆寫式入隊。覆寫式入隊用于隻有一個隊列項的場合,入隊時如果隊列已滿,則将之前的隊列項覆寫掉。函數原型為:
BaseType_t xQueueGenericSendFromISR
(
QueueHandle_t xQueue,
const void * const pvItemToQueue,
BaseType_t * const pxHigherPriorityTaskWoken,
const BaseType_t xCopyPosition
)
- xQueue:隊列句柄。
- pvItemToQueue:指針,指向要入隊的項目。
- pxHigherPriorityTaskWoken:如果入隊導緻一個任務解鎖,并且解鎖的任務優先級高于目前運作的任務,則該函數将*pxHigherPriorityTaskWoken 設定成 pdTRUE。如果
設定這個值為 pdTRUE,則中斷退出前需要一次上下文切換。從 FreeRTOS V7.3.0 起,pxHigherPriorityTaskWoken 稱為一個可選參數,并可以設定為 NULL。xQueueSendFromISR()
- xCopyPosition:入隊位置,可以選擇從隊列尾入隊、從隊列首入隊和覆寫式入隊。
這個函數和
xQueueGenericSend()
很相似,但是當隊列滿時不會阻塞,直接傳回一個錯誤碼,表示隊列滿(相當于阻塞時間為0)。是以,有了分析
xQueueGenericSend()
的基礎,這個函數我們很快就能看完。源碼簡化後如下所示:
BaseType_t xQueueGenericSendFromISR(
QueueHandle_t xQueue,
const void * const pvItemToQueue,
BaseType_t * const pxHigherPriorityTaskWoken,
const BaseType_t xCopyPosition )
{
BaseType_t xReturn;
UBaseType_t uxSavedInterruptStatus;
Queue_t * const pxQueue = ( Queue_t * ) xQueue;
uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
{
if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
{
traceQUEUE_SEND_FROM_ISR( pxQueue );
/*完成資料拷貝工作,分為從隊列尾入隊,從隊列首入隊和覆寫式入隊*/
( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
/*檢查隊列是否上鎖,如果上鎖,則隊列事件清單不能被改變 */
if( pxQueue->xTxLock == queueUNLOCKED )
{ /*隊列沒有上鎖*/
if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
{
if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE )
{
/* 解除阻塞的任務優先級比目前任務高,記錄上下文切換請求,等傳回中斷服務程式後,可以顯示的強制上下文切換 */
if( pxHigherPriorityTaskWoken != NULL )
{
*pxHigherPriorityTaskWoken = pdTRUE;
}
}
}
}
else
{
/* 隊列上鎖,增加鎖計數器,等到任務解除隊列鎖時,使用這個計數器就可以知道有多少資料入隊,可以最多解除多少個因等待從隊列讀資料而阻塞的任務 */
++( pxQueue->xTxLock );
}
xReturn = pdPASS;
}
else
{
xReturn = errQUEUE_FULL;
}
}
portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
return xReturn;
}
因為沒有阻塞,是以代碼簡單了很多,唯一值得注意的是,當成功入隊後,如果有因為等待出隊而阻塞的任務,現在可以将其中最高優先級的任務解除阻塞,在執行解除阻塞操作之前,會判斷隊列是否上鎖。如果沒有上鎖,則解除被阻塞的任務,還會根據需要設定上下文切換請求标志;如果隊列已經上鎖,則不會解除被阻塞的任務,取而代之的是将 xTxLock 加 1,表示隊列上鎖期間入隊的個數,也表示有任務可以解除阻塞了。
出隊
出隊的 API 函數要相對少一些,也分為帶中斷保護的出隊操作和不帶中斷保護的出隊操作。每種出隊情況都可以選擇是否删除隊列項。出隊 API 函數如表 3-1所示。
表 3-1:出隊 API 接口清單
出隊操作和入隊操作有很多相似性,将入隊流程了解透徹,出隊操作不在話下,是以我們不再分析源碼。