天天看點

python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作

python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作

前言的前言

同學們,别光點贊收藏,記得關注我的專欄哦,每周都有更新!

前言

這篇文章我們再次回到協程。協程的技術實際上是非常重要的,

比如微信的背景就大量的使用協程來進行并發量的提高

,隻用多線程和多程序很難低成本、高效地滿足全球數十億使用者的各種操作請求。協程的底層實作可以定制的,微信團隊就開源了一個他們實作的協程庫。直接分析他們的實作比較困難,是以我們先分析python官方的協程庫asyncio的底層實作。

Tencent/libco​github.com

python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作

這篇文章主要基于之前的兩篇文章。在協程文章中,主要介紹了協程的了解和特點,在Redis單線程請求響應文章中,主要介紹了IO複用技術,以及Redis是如何利用IO複用來實作定時任務(TimeEvent)和IO任務(FileEvent)的排程。本篇在前兩篇文章的基礎上,來研究Python的協程排程到底是如何實作的。

大龍:Python線程協程探究(二)——揭開協程的神秘面紗​zhuanlan.zhihu.com

python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作

大龍:Redis詳解(2)——Redis是如何做到單線程服務10000用戶端的​zhuanlan.zhihu.com

python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作
本篇假定讀者對asyncio的使用以及python協程有基本的熟悉

并主要涉及到如下的内容:

  • 内容回顧: 首先對上兩篇文章進行一個簡單的回顧,回顧對協程的了解和特點以及Redis事件驅動的實作
  • Python的asyncio架構中事件循環的實作和任務排程的整體流程
    • asyncio實作中的三個重要概念及協程排程:TimeHandler, Eventloop,Task
    • 以asyncio.sleep()調用和實作為例介紹

内容回顧

協程回顧

我們曾總結協程具有兩大特點

  • 可保留運作時的狀态資料
  • 可出讓自己的執行權,當重新獲得執行權時從上一次暫停的位置繼續執行

總結而言就是協程相比較正常的函數,是可以被打斷以及恢複執行流程的。我們可以将之了解為使用者級的線程,核心級的線程的排程執行是由作業系統來決定負責的,而協程的排程執行是由線程來負責的。

需要再次提醒的一點是,正常情況下,同一個程序的線程之間是可以并行的(python多線程除外,具體詳見專欄文章),但是無論怎麼樣,同一個線程内的各個協程不是并行的,隻能并發執行,具體解釋見專欄中的協程介紹文章。
python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作

協程即為使用者級線程

既然協程的排程執行需要線程來負責,那麼我們就需要實作一個排程器來實作協程的排程,在asyncio中,eventloop我們可以認為就是一個協程排程器。

python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作

排程器實作線程排程

Redis事件(任務)排程

事件驅動程式設計最重要的兩個元素就是

  • 事件循環(即Redis中和asyncio中的eventloop)
  • 事件在事件循環的注冊及發生事件時的回調函數綁定

Redis事件排程中主要排程

IO事件和定時事件

,每一個事件都會指定一個回調函數,當該事件發生的時候執行指定回調函數。同時我們還介紹

IO事件的發生與否是托管給作業系統進行管理,但時間事件的管理是Redis自己來管理的

。Redis事件循環執行一次的邏輯如下,首先檢查是否有時間事件的發生,然後根據即将最早發生的時間事件的時間戳與目前時間戳的比較來決定epoll調用(我們假定底層采用epoll)的等待時長,接着使用epoll系統調用從核心拿到已經發生的IO事件,然後處理已發生的檔案事件和預定時間已經到達的定時事件。

python 協程可以嵌套協程嗎_Python線程、協程探究(3)——協程的排程實作

Redis事件循環單次執行邏輯

asyncio協程排程

asyncio裡協程的排程就是基于事件排程實作的。那麼和Redis一樣,asyncio中的事件分為IO事件(主要用于socket檔案事件的監控)和定時事件,

asyncio的協程排程和Redis的事件排程二者從頂層邏輯看也是幾乎一緻的

。即都有一個事件循環,每次執行流程和上圖也幾乎一緻。

但協程與函數最大的不同在于協程本身的執行流程是可以被打斷的,那麼如果一個協程的執行流程被打斷了,該如何恢複其排程?

答案仍然是回調函數。接下來我們一步步展開asyncio是如何實作協程排程的,以及下面這些代碼執行每一行時底層實作到底發生了什麼。

# 代碼片段1
           

asyncio實作關鍵概念

為了友善之後的源碼分析,我們先介紹asyncio實作中的幾個關鍵概念。

需要說明的是為了友善在說明,如下的代碼并不是嚴格的asyncio底層的實作代碼,我将不影響了解的代碼全部進行了删除,隻留了一些核心邏輯代碼。強烈下面的代碼認真閱讀,代碼加了詳細的注釋比較好懂,且對協程的排程了解非常有幫助。 核心概念1:TimeHandler
#代碼片段2
           

TimeHandler是時間事件的句柄,實際上就是一個時間事件上面封裝了一層,支援指定事件發生的時間。執行個體化一個TimeHandler時,需要指明該事件指定的發生時間以及該事件的回調函數。當執行一個TimeHandler對象的_run()方法時,執行該時間事件的回調函數。

核心概念2:Eventloop

事件循環的概念我們應該不陌生,在Redis中每一次事件循環都會執行asProcessEvent()的函數,對應的在asyncio中,每一次事件循環我們都會執行_run_once()函數。

asyncio中所有就緒的任務/事件都放在self._ready中,所有待發生的時間事件都放在self._scheduled中進行存儲管理,其中self._scheduled是一個按照時間事件的發生時間來構成的最小堆。
#代碼片段3
           

每個任務加入到事件循環中時,會執行個體化一個對應的TimeHandler對象,ready和scheduled中存儲的就是一個個TimeHandler對象。為了友善使用者指定TimeHandler中的when參數,eventloop提供了三個接口讓使用者把時間事件加入到事件循環中,他們分别是

call_later(), call_at(), call_soon()

, 其中call_soon()是直接将回調函數加到ready對列中,然後在下一次run_once函數被執行時該回調函數被執行,而call_later和call_soon将定時任務加入到schduled中。asyncio中的eventloop會循環執行run_once函數,run_once函數的執行邏輯如下:

  1. 檢查ready隊列中是否有任務, 如果ready隊列中已經有任務 ,則設定timeout為0,不擷取IO事件,然後擷取所有預計發生時間小于目前時間的的時間事件加入到ready隊列,并執行ready隊列中的事件。
  2. 如果ready隊列中沒有任務 ,就根據最早發生的時間事件的時間與目前的時間的比較結果來确定self.selector.select()函數的逾時時間。(不失一般性,我們可以認為self.selector.select()底層使用的是epoll系統調用)。如果擷取到了IO事件,則首先進行IO事件的處理,然後擷取該發生的時間事件加入到ready隊列中,最後執行所有的ready隊列中的任務。

上述的代碼流程我認為是比較簡單易懂的,沒什麼特殊之處。核心的過程在于擷取發生的事件,并執行事件的回調函數。使用epoll系統調用中獲得IO事件并執行其回調函數,從scheduled中擷取時間事件,并執行其回調函數。

需要注意的是,在run_once函數中,隻有最後的部分才是執行回調函數的部分,前半部分做的各種事件檢查都是檢查事件的回調函數是否能加入到ready隊列中,即最終隻執行ready隊列中的回調函數

核心概念3:Task

Task是協程排程的核心實作,所有的協程都是一個Task,對協程的執行、挂起、切換、恢複執行等都是在Task中進行。Task的實作代碼中集合核心方法如下,代碼我都加了詳細的注釋,還比較易懂。

還是需要重申一遍的是,下面的代碼并不是嚴格asyncio底層實作代碼,為了友善介紹,我将Python的實作源碼的很多部分進行了删除,以及将Task從Future類繼承的方法加入到了Task類的方法中。
class 
           

在Task中實際上完成了協程函數的執行、挂起、切換、結束之後調用其回調函數。

對于一個Task,很重要的一點在于可以對其加入回調函數,即該Task的協程運作結束後,會排程執行所有的回調函數

。我們按照一個協程被建立為一個Task到Task執行完畢的執行過程來分析上述的代碼。

  1. __init__() 函數:使用協程建立一個Task,我們命名為Task1,同時在__init__函數的最後,将Task1的__step()函數加入了事件循環的就緒隊列中,下一次執行run_once函數時,這個_step函數就會被執行。
  2. __step() 函數:實際上為一個Task的執行函數, 首先會調用目前協程的send()函數進行目前協程的執行
  3. 如果目前協程中沒有使用yield/yield from/await,則會順利的執行完畢(因為不會被打斷)。 協程執行結束時會raise一個stopIteration的異常,__step()中捕獲到後,就執行Task1的結果設定函數set_result()
    1. set_result() : 在set_result()函數中,設定了result結果,同時執行了所有回調函數的排程函數。(可以對一個Task對象重複調用add_done_callback()加入多個回調函數,這些回調函數被放在一個清單中維護。)
    2. __schedule_callbacks() :在回調函數排程中,将回調函數清單中的所有的回調函數加入到事件循環的ready隊列中,下次run_once函數被運作時,這些回調函數會被執行。
  4. 如果目前協程中使用了yield from或者await等待另一個協程的執行完畢, 則就把目前Task的喚醒函數_wakeup()加入到被執行協程的回調函數
    1. _wakeup() :當被等待寫成執行完畢,執行其回調函數wakeup時,wakeup函數重新執行step函數

我們發現有了Task類的輔助,協程的運作、切換、挂起、喚醒變得非常的容易實作和了解。即

  1. 如果一個Task的協程正常執行完,我們就設定Task的結果屬性,然後執行Task的回調函數。
  2. 如果該Task的協程由于中間使用了yield/yield from/ await進而被打斷了執行流程,則将目前 Task的協程的喚醒函數作為被調用的協程所屬的Task的回調函數 。在被調用協程順利執行完畢後,按照情況1的執行流程,目前Task的喚醒函數會被作為回調函數執行,進而又可以繼續執行目前的協程。

這基本就是協程切換、執行的核心内容了,應該還是比較好懂的。接下來我們以一段示例代碼來将上面的所有内容串起來,研究每一步底層到底都執行了什麼。

以asyncio.sleep()調用為例

本節我們以asyncio.sleep()調用為例,來追蹤每一步到底發生了什麼,調用官方的代碼應該是下面的代碼這樣。但是為了更好的研究,我将asyncio.sleep()函數的實作進行了少量的修改,把實作代碼都放在同一個代碼檔案中,是以我們跳過這個代碼片段,直接研究下一個代碼片段。

#一個協程執行流程被打斷的最主要的原因就是在協程内部調用了另一個協程函數
           

我們按照整個函數的執行流程來研究其每一步到底發生了什麼。

# 代碼片段1
           
  1. 首先通過get_event_loop()獲得一個事件循環 event_loop。
  2. 使用協程 cor1() 建立一個Task對象(為友善指代,我們命名為Task1) 在Task1的建立函數的最後,Task1的_step函數被加入到事件循環的ready隊列中,進而在下一次 run_once 函數被執行時Task1的協程會被排程執行。(asyncio.create_task()就是執行Task執行個體化,然後傳回Task對象,Task對象初始化時最後一步将step函數加入到事件循環的ready隊列中)
  3. 對Task1注冊一個回調函數call_back(),當Task1執行結束後,在 set_result() 函數中會将所有的回調函數加入到事件循環的ready隊列中。
  4. 開始執行事件循環,每次事件循環都會執行 run_once 函數。進入到run_once函數中,首先由于第2步在ready隊列中加入了Task的 step 函數,是以開始執行step函數,而step函數會通過協程的send()函數觸發cor1()協程的執行。
  5. cor1() 函數中遇到 await ,于是将cor1對應的Task1的 wakeup 函數加入到 dalong_sleep() 協程對應的Task的回調函數中(将dalong_sleep()對應的Task命名為Task2
  6. 如果調用dalong_sleep時delay參數為0。
    1. 則d along_sleep 中直接調用 yield, dalong_sleep執行流程被打斷,但按照step函數中的邏輯,由于Task2并沒有等待任何其他的協程執行完畢,是以Task2的step函數會被重新加入到事件循環的ready隊列中,然後再下一次run_once函數被執行時再次執行Task2的step函數。
    2. 當dalong_sleep協程被再次繼續執行時,就從yield的下一句開始執行,由于緊接着就是return語句,是以協程執行完畢正常退出并執行其set_result函數。在該函數中設定結果并執行回調函數即Task1的wakeup函數,于是Task1被繼續執行。
    3. Task1的step函數被繼續執行,在step函數中使用send()恢複cor1協程的執行流程。由于後續沒有遇到任何的yield/await/yield from,是以協程順利執行到結束。cor1結束時,調用Task1的set_result函數,設定result屬性并将其回調函數加入到事件循環的ready隊列中。在下一次的run_once函數被執行時,其回調函數被執行。
  7. 如果delay不為0。
    1. dalong_sleep中首先建立一個future對象(不影響了解,可以近似認為就是建立了一個Task對象,我們将之命名為Task3雖然實作上Future為Task的父類)。指定其發生的時間為delay時長之後的時間戳,然後将其加入到事件循環的schedule堆中,并綁定回調函數為_set_result_unless_cancelled, 在該時間事件發生後該回調函數被執行。
    2. 繼續向下執行遇到yield from等待Task3,按照協程的切換執行流程。會将Task2的喚醒函數wakeup注冊為Task3的回調函數。此時Task3有兩個回調函數,一個是set_result_unless_cancelled,一個是Task2的喚醒函數。
    3. 當睡眠的時間到達後,在run_once函數中就會執行Task3的回調函數。執行Task2的喚醒函數之後,Task2可以被繼續執行。
    4. Task2遇到dalong_sleep協程中的return語句,故Task2執行結束,調用其set_result函數并将其回調函數加入到事件循環的ready隊列中,Task2的其中一個回調函數就是Task1的喚醒函數
    5. Task1被喚醒之後被繼續執行到結束。然後将其回調函數加入到ready隊列中
    6. Task1的回調函數被執行。

總結

本篇中我們主要介紹了python的asyncio中協程的排程的底層實作,我們發現asyncio事件循環的run_once函數的執行邏輯與Redis事件循環的aeProcessEvent()函數幾乎一緻。與Redis不同的是,在協程的排程執行中,我們需要關注協程被打斷執行流之後如何恢複其執行。而在asyncio的實作中我們也發現:asyncio借助了Task類,通過将目前協程所對應的Task的喚醒函數設定為被調用協程的回調函數進而實作了協程的切換、挂起以及喚醒。最後我們通過一個函數代碼示例來詳細的分析函數的每一步中協程排程底層到底發生了什麼,示例代碼雖然較短,卻也幾乎包含了所有協程中常用的概念和使用方法,希望這篇文章對大家更深入的了解協程有所幫助。

當然了,協程的排程我們是研究清楚了,但是協程還有一個最關鍵的特點是:可以被打斷執行流程和恢複執行流程。那麼如何實作在協程被打斷時儲存上下文,并在下一次恢複執行時再恢複執行流程?

這個内容就會在後續的部落格介紹。感興趣的話可以關注我的專欄~

後記

最近的兩篇部落格都涉及到比較多的源碼分析,之後的文章應該會主要交替進行。一些文章專注于一些系統設計、概念了解的内容,一些部落格專注于這些系統和概念背後的具體實作代碼。