天天看點

Python Tornado架構(TCP層)

Tornado在TCP層裡的工作機制

上一節是關于應用層的協定 HTTP,它依賴于傳輸層協定 TCP,例如伺服器是如何綁定端口的?HTTP 伺服器的 handle_stream 是在什麼時候被調用的呢?本節聚焦在 TCP 層次的實作,以便和上節的程式流程銜接起來。

首先是關于 TCP 協定。這是一個面向連接配接的可靠傳遞的協定。由于是面向連接配接,是以在伺服器端需要配置設定記憶體來記憶用戶端連接配接,同樣用戶端也需要記錄伺服器。由于保證可靠傳遞,是以引入了很多保證可靠性的機制,比如定時重傳機制,SYN/ACK 機制等,相當複雜。是以,在每個系統裡的 TCP 協定軟體都是相當複雜的,本文不打算深入談這些(我也談不了多少,呵呵)。但我們還是得對 TCP 有個了解。先上一張圖(UNIX 網絡程式設計)-- 狀态轉換圖。

Python Tornado架構(TCP層)

除外,來一段TCP伺服器端程式設計經典三段式代碼(C實作):

// 建立監聽socket
int sfd = socket(AF_INET, SOCK_STREAM, 0);  
// 綁定socket到位址-端口, 并在該socket上開始監聽。listen的第二個參數叫backlog,和連接配接隊列有關
bind(sfd,(struct sockaddr *)(&s_addr), sizeof(struct sockaddr)) && listen(sfd, 10); 
while(1) cfd = accept(sfd, (struct sockaddr *)(&cli_addr), &addr_size);      

以上,忽略所有錯誤處理和變量聲明,顧名思義吧…… 更多詳細,可以搜 Linux TCP 伺服器程式設計。是以,對于 TCP 程式設計的總結就是:建立一個監聽 socket,然後把它綁定到端口和位址上并開始監聽,然後不停 accept。這也是 tornado 的 TCPServer 要做的工作。

TCPServer 類的定義在 tcpserver.py。它有兩種用法:bind+start 或者 listen。

第一種用法可用于多線程,但在 TCP 方面兩者是一樣的。就以 listen 為例吧。TCPServer 的__init__沒什麼注意的,就是記住了 ioloop 這個單例,這個下節再分析(它是tornado異步性能的關鍵)。listen 方法接收兩個參數端口和位址,代碼如下

def listen(self, port, address=""):
	"""Starts accepting connections on the given port.

	This method may be called more than once to listen on multiple ports.
	`listen` takes effect immediately; it is not necessary to call
	`TCPServer.start` afterwards.  It is, however, necessary to start
	the `.IOLoop`.
	"""
	sockets = bind_sockets(port, address=address)
	self.add_sockets(sockets)
           

以上。首先 bind_sockets 方法接收位址和端口建立 sockets 清單并綁定位址端口并監聽(完成了TCP三部曲的前兩部),add_sockets 在這些 sockets 上注冊 read/timeout 事件。有關高性能并發伺服器程式設計可以參照UNIX網絡程式設計裡給的幾種程式設計模型,tornado 可以看作是單線程事件驅動模式的伺服器,TCP 三部曲中的第三部就被分隔到了事件回調裡,是以肯定要在所有的檔案 fd(包括sockets)上監聽事件。在做完這些事情後就可以安心的調用 ioloop 單例的 start 方法開始循環監聽事件了。具體細節可以參照現代高性能 web 伺服器(nginx/lightttpd等)的事件模型,後面也會涉及一點。

簡言之,基于事件驅動的伺服器(tornado)要幹的事就是:建立 socket,綁定到端口并 listen,然後注冊事件和對應的回調,在回調裡accept 新請求。

bind_sockets 方法在 netutil 裡被定義,沒什麼難的,建立監聽 socket 後為了異步,設定 socket 為非阻塞(這樣由它 accept 派生的socket 也是非阻塞的),然後綁定并監聽之。add_sockets 方法接收 socket 清單,對于清單中的 socket,用 fd 作鍵記錄下來,并調用add_accept_handler 方法。它也是在 netutil 裡定義的,代碼如下:

def add_accept_handler(sock, callback, io_loop=None):
    """Adds an `.IOLoop` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    `.IOLoop` handlers.
    """
    if io_loop is None:
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)
           

需要注意的一個參數是 callback,現在指向的是 TCPServer 的 _handle_connection 方法。add_accept_handler 方法的流程:首先是確定ioloop對象。然後調用 add_handler 向 loloop 對象注冊在fd上的read事件和回調函數accept_handler。該回調函數是現成定義的,屬于IOLoop層次的回調,每當事件發生時就會調用。回調内容也就是accept得到新socket和用戶端位址,然後調用callback向上層傳遞事件。從上面的分析可知,當read事件發生時,accept_handler被調用,進而callback=_handle_connection被調用。

_handle_connection就比較簡單了,跳過那些ssl的處理,簡化為兩句stream = IOStream(connection, io_loop=self.io_loop)和self.handle_stream()。這裡IOStream代表了IO層,以後再說,反正讀寫是不愁了。接着是調用handle_stream。我們可以看到,不論應用層是什麼協定(或者自定義協定),當有新連接配接到來時走的流程是差不多的,都要經曆一番上訴的回調,不同之處就在于這個handle_stream方法。這個方法是由子類自定義覆寫的,它的HTTP實作已經在上一節看過了。

到此,和上節的代碼流程接上軌了。當事件發生時是如何回調的呢?app.py裡的IOLoop.instance().start()又是怎樣的流程呢?明天繼續,看tornado異步高性能的根本所在

Tornado TCPServer類的設計解讀

前文已經說過,HTTPServer是派生自TCPServer,從協定層次上講,這再自然不過。

從TCPServer的實作上看,它是一個通用的server架構,基本是按照BSD socket的思想設計的。create-bind-listen三段式一個都不少。

從helloworld.py往下追,可以看到:

  1. helloworld.py中的main函數建立了HTTPServer.
  2. HTTPServer繼承自TCPServer,在HTTPServer的構造函數中直接調用了TCPServer的構造函數。

接下來我們就去看看TCPServer這個類的實作,它的代碼放在tornado/tcpserver.py中。tcpserver.py隻有兩百多行,不算多。所有代碼都是在實作TCPServer這個類。

TCPServer

在TCPServer類的注釋中,首先強調了它是一個non-blocking, single-threaded TCP Server。

怎麼了解呢? 

non-blocking,就是說,這個伺服器沒有使用阻塞式API。

什麼是阻塞式設計?舉個例子,在BSD Socket裡,recv函數預設是阻塞式的。使用recv讀取用戶端資料時,如果對方并未發送資料,則這個API就會一直阻塞那裡不傳回。這樣伺服器的設計不得不使用多線程或者多程序方式,避免因為一個API的阻塞導緻伺服器沒法做其它事。阻塞式API是很常見的,我們可以簡單認為,阻塞式設計就是“不管有沒有資料,伺服器都派API去讀,讀不到,API就不會回來交差”。

而非阻塞,對recv來說,差別在于沒有資料可讀時,它不會在那死等,它直接就傳回了。你可能會認為這辦法比阻塞式還要矬,因為伺服器無法預知有沒有資料可讀,不得不反複派recv函數去讀。這不是浪費大量的CPU資源麼?

當然不會這麼傻。tornado這裡說的非阻塞要進階得多,基本上是另一種思路:伺服器并不主動讀取資料,它和作業系統合作,實作了一種“螢幕”,TCP連接配接就是它的監視對象。當某個連接配接上有資料到來時,作業系統會按事先的約定通知伺服器:某某号連接配接上有資料到來,你去處理一下。伺服器這時候才派API去取資料。伺服器不用建立大量線程來阻塞式的處理每個連接配接,也不用不停派API去檢查連接配接上有沒有資料,它隻需要坐那裡等作業系統的通知,這保證了recv API出手就不會落空。

tornado另一個被強調的特征是single-threaded,這是因為我們的“螢幕”非常高效,可以在一個線程裡監視成千上萬個連接配接的狀态,基本上不需要再動用線程來分流。實測表明,它比阻塞式多線程或者多程序設計更加高效——當然,這依賴于作業系統的大力配合,現在主流作業系統都提供了非常高端大氣上檔次的“螢幕”機制,比如epoll、kqueue。

作者提到這個類一般不直接被執行個體化,而是由它派生出子類,再用子類執行個體化。

為了強化這個設計思想,作者定義了一個未直接實作的接口,叫handle_stream()。

def handle_stream(self, stream, address):
    """Override to handle a new `.IOStream` from an incoming connection."""
    raise NotImplementedError()
           

這倒是個不錯的技巧,強制讓子類覆寫本方法,不然就報錯給你看!

TCPServer是支援SSL的。由于Python的強大,支援SSL一點都不費事。要啟動一個支援SSL的TCPServer,隻需要告訴它你的certifile和keyfile就行。

TCPServer(ssl_options={"certfile": os.path.join(data_dir, "mydomain.crt"),
	"keyfile": os.path.join(data_dir, "mydomain.key"),})
           

關于這兩個檔案的來龍去脈,可以去Google“數字證書原理”這篇文章。

TCPServer的三種形式

TCPServer的初始化有三種形式。

1. 單程序形式

server = TCPServer()
server.listen(8888)
IOLoop.instance().start()
           

 我們在helloworld.py中看到的就是這種用法,不再贅述。

2. 多程序形式。

server = TCPServer()
server.bind(8888)
server.start(0)  # Forks multiple sub-processes
IOLoop.instance().start(
           

差別主要在server.start(0)這裡。後面分析listen()與start()兩個成員函數時,就會看到它們是怎麼跟程序結合的。

注意:這種模式啟動時,不能把IOLoop對象傳遞給TCPServer的構造函數,這樣會導緻TCPServer直接按單程序啟動。

3. 進階多程序形式。

sockets = bind_sockets(8888)
tornado.process.fork_processes(0)
server = TCPServer()
server.add_sockets(sockets)
IOLoop.instance().start()
           

進階意味着複雜。從上面代碼看,雖然隻多了一兩行,實際裡面的流程有比較大的差别。

這種方式的主要優點就是 tornado.process.fork_processes(0)這句,它為程序的建立提供了更多的靈活性。當然現在說了也是糊塗,後面鑽進這些代碼後,我們再來驗證這裡的說法。

以上内容都是TCPServer類的doc string中提到的。後面小節開始看code。

從代碼分析TCPServer類的機制

TCPServer的__init__函數很簡單,僅儲存了參數而已。

唯一要注意的是,它可以接受一個io_loop為參數。實際上io_loop對TCPServer來說并不是可有可無,它是必須的。不過TCPServer提供了多種管道來與一個io_loop綁定,初始化參數隻是其中一種綁定方式而已。

listen

接下來我們看一下listen函數,在helloworld.py中,httpserver執行個體建立之後,它被第一個調用。

TCPServer類的listen函數是開始接受指定端口上的連接配接。注意,這個listen與BSD Socket中的listen并不等價,它做的事比BSD socket()+bind()+listen()還要多。

注意在函數注釋中提到的一句話:你可以在一個server的執行個體中多次調用listen,以實作一個server偵聽多個端口。

怎麼了解?在BSD Socket架構裡,我們不可能在一個socket上同時偵聽多個端口。反推之,不難想到,TCPServer的listen函數内部一定是執行了全套的BSD Socket三段式(create socket->bind->listen),使得每調用一次listen實際上是建立了一個新的socket。

代碼很好的符合了我們的猜想:

def listen(self, port, address=""):
	sockets = bind_sockets(port, address=address)
	self.add_sockets(sockets)
           

兩步走,先建立了一個socket,然後把它加到自己的偵聽隊列裡。

bind_socket

bind_socket函數并不是TCPServer的成員,它定義在netutil.py中,原型:

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128, flags=None):
           

它也有大段的注釋。

bind_socket完成的工作包括:建立socket,綁定socket到指定的位址和端口,開啟偵聽。

解釋一下參數:

  • port不用說,端口号嘛。
  • address可以是IP位址,如“192.168.1.100”,也可以是hostname,比如“localhost”。如果是hostname,則可以監聽該hostname對應的所有IP。如果address是空字元串(“”)或者None,則會監聽主機上的所有接口。
  • family是指網絡層協定類型。可以選AF_INET和AF_INET6,預設情況下則兩者都會被啟用。這個參數就是在BSD Socket建立時的那個sockaddr_in.sin_family參數哈。
  • backlog就是指偵聽隊列的長度,即BSD listen(n)中的那個n。
  • flags參數是一些位标志,它是用來傳遞給socket.getaddrinfo()函數的。比如socket.AI_PASSIVE等。

另外要注意,在IPV6和IPV4混用的情況下,這個函數的傳回值可以是一個socket清單,因為這時候一個address參數可能對應一個IPv4位址和一個IPv6位址,它們的socket是不通用的,會各自獨立建立。

現在來一行一行看下bind_socket的代碼

sockets = []
if address == "":
	address = None
if not socket.has_ipv6 and family == socket.AF_UNSPEC:
	# Python can be compiled with --disable-ipv6, which causes
	# operations on AF_INET6 sockets to fail, but does not
	# automatically exclude those results from getaddrinfo
	# results.
	# http://bugs.python.org/issue16208
	family = socket.AF_INET
if flags is None:
	flags = socket.AI_PASSIVE
           

這一段平淡無奇,基本上都是前面講到的參數指派。

接下來就是一個大的循環:

for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,0, flags)):
           

鬧半天,前面解釋的參數全都被socket.getaddrinfo()這個函數吃下去了。

socket.getaddrinfo()是python标準庫中的函數,它的作用是将所接收的參數重組為一個結構res,res的類型将可以直接作為socket.socket()的參數。跟BSD Socket中的getaddrinfo差不多嘛。

之是以用了一個循環,正如前面講到的,因為IPv6和IPv4混用的情況下,getaddrinfo會傳回多個位址的資訊。參見python文檔中的說明和示例:

The function returns a list of 5-tuples with the following structure: (family, type, proto, canonname, sockaddr)

>>> socket.getaddrinfo("www.python.org", 80, proto=socket.SOL_TCP)
[(2, 1, 6, '', ('82.94.164.162', 80)),
 (10, 1, 6, '', ('2001:888:2000:d::a2', 80, 0, 0))]
           

 接下來的代碼在循環體中,是針對單個位址的。循環體内一開始就如我們猜想,直接拿getaddrinfo的傳回值來建立socket。

af, socktype, proto, canonname, sockaddr = res
try:
	sock = socket.socket(af, socktype, proto)
except socket.error as e:
	if e.args[0] == errno.EAFNOSUPPORT:
		continue
raise
           

 先從tuple中拆出5個參數,然後揀需要的來建立socket。

set_close_exec(sock.fileno())
           

這行是設定程序退出時對sock的操作。lose_on_exec 是一個程序所有檔案描述符(檔案句柄)的位圖示志,每個比特位代表一個打開的檔案描述符,用于确定在調用系統調用execve()時需要關閉的檔案句柄(參見include/fcntl.h)。當一個程式使用fork()函數建立了一個子程序時,通常會在該子程序中調用execve()函數加載執行另一個新程式。此時子程序将完全被新程式替換掉,并在子程序中開始執行新程式。若一個檔案描述符在close_on_exec中的對應比特位被設定,那麼在執行execve()時該描述符将被關閉,否則該描述符将始終處于打開狀态。

當打開一個檔案時,預設情況下檔案句柄在子程序中也處于打開狀态。是以sys_open()中要複位對應比特位

if os.name != 'nt':
	sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
           

 對非NT的核心,需要額外設定一個SO_REUSEADDR參數。有些系統的設計裡,伺服器程序結束後端口也會被核心保持一段時間,若我們迅速的重新開機伺服器,可能會遇到“端口已經被占用”的情況。這個标志就是通知核心不要保持了,程序一關,立馬放手,便于後來者重用。

if af == socket.AF_INET6:
	 # On linux, ipv6 sockets accept ipv4 too by default,
	 # but this makes it impossible to bind to both
	 # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
	 # separate sockets *must* be used to listen for both ipv4
	 # and ipv6.  For consistency, always disable ipv4 on our
	 # ipv6 sockets and use a separate ipv4 socket when needed.
	 #
	 # Python 2.x on windows doesn't have IPPROTO_IPV6.
	 if hasattr(socket, "IPPROTO_IPV6"):
		 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
           

 這段代碼的說明已經很清楚了

sock.setblocking(0)
sock.bind(sockaddr)
sock.listen(backlog)
sockets.append(sock)
           

前面經常提BSD Socket的這幾個家夥,現在它們終于出現了。“非阻塞”性質也是在這裡決定的。

每建立一個socket都将它加入到前面定義的清單裡,最後函數結束時,将清單傳回。其實這個函數蠻簡單的。為什麼它不是TCPServer的成員函數?

轉載于:https://www.cnblogs.com/jasonwang-2016/p/5950106.html