天天看點

深入了解Tornado——一個異步web伺服器

本人的第一次翻譯,轉載請注明出處:http://www.cnblogs.com/yiwenshengmei/archive/2011/06/08/understanding_tornado.html

原文位址:http://golubenco.org/?p=16

這篇文章的目的在于對Tornado這個異步伺服器軟體的底層進行一番探索。

我采用自底向上的方式進行介紹,從輪巡開始,向上一直到應用層,指出我認為有趣的部分。

是以,如果你有打算要閱讀Tornado這個web架構的源碼,又或者是你對一個異步web伺服器是如何工作的感興趣,我可以在這成為你的指導。

通過閱讀這篇文章,你将可以:

自己寫一個Comet架構程式的伺服器端部分,即使你是從拷貝别人的代碼開始。

如果你想在Tornado架構上做開發,通過這篇文章你将更好的了解Tornado web架構。

在Tornado和Twisted的争論上,你将更有見解。

一、介紹

假設你還不知道Tornado是什麼也不知道為什麼應該對它感興趣,那我将用簡短的話來介紹Tornado這個項目。

如果你已經對它有了興趣,你可以跳去看下一節内容。

Tornado是一個用Python編寫的異步HTTP伺服器,同時也是一個web開發架構。

該架構服務于FriendFeed網站,最近Facebook也在使用它。

FriendFeed網站有使用者數多和應用實時性強的特點,是以性能和可擴充性是很受重視的。

由于現在它是開源的了(這得歸功于Facebook),我們可以徹底的對它是如何工作的一探究竟。

我覺得對非阻塞式IO (nonblocking IO) 和異步IO (asynchronous IO  AIO)很有必要談一談。

如果你已經完全知道他們是什麼了,可以跳去看下一節。我盡可能的使用一些例子來說明它們是什麼。

讓我們假設你正在寫一個需要請求一些來自其他伺服器上的資料(比如資料庫服務,再比如新浪微網誌的open api)的應用程式,

然後呢這些請求将花費一個比較長的時間,假設需要花費5秒鐘。大多數的web開發架構中處理請求的代碼大概長這樣:

def handler_request(self, request):

    answ = self.remote_server.query(request) # this takes 5 seconds

    request.write_response(answ)

如果這些代碼運作在單個線程中,你的伺服器隻能每5秒接收一個用戶端的請求。

在這5秒鐘的時間裡,伺服器不能幹其他任何事情,是以,你的服務效率是每秒0.2個請求,哦,這太糟糕了。 

當然,沒人那麼天真,大部分伺服器會使用多線程技術來讓伺服器一次接收多個用戶端的請求,

我們假設你有20個線程,你将在性能上獲得20倍的提高,是以現在你的伺服器效率是每秒接受4個請求,

但這還是太低了,當然,你可以通過不斷地提高線程的數量來解決這個問題,

但是,線程在記憶體和排程方面的開銷是昂貴的,我懷疑如果你使用這種提高線程數量的方式将永遠不可能達到每秒100個請求的效率。

如果使用AIO,達到每秒上千個請求的效率是非常輕松的事情。伺服器請求處理的代碼将被改成這樣:

def handler_request(self, request):

    self.remote_server.query_async(request, self.response_received)     

def response_received(self, request, answ):    # this is called 5 seconds later

    request.write(answ)

AIO的思想是當我們在等待結果的時候不阻塞,

轉而我們給架構一個回調函數作為參數,讓架構在有結果的時候通過回調函數通知我們。

這樣,伺服器就可以被解放去接受其他用戶端的請求了。

然而這也是AIO不太好的地方:代碼有點不直覺了。

還有,如果你使用像Tornado這樣的單線程AIO伺服器軟體,你需要時刻小心不要去阻塞什麼,

因為所有本該在目前傳回的請求都會像上述處理那樣被延遲傳回。

關于異步IO,比目前這篇過分簡單的介紹更好的學習資料請看 The C10K problem(http://www.kegel.com/c10k.html)

二、源代碼

該項目由github托管,你可以通過如下指令獲得,雖然通過閱讀這篇文章你也可以不需要它是吧。

git clone git://github.com/facebook/tornado.git

在tornado的子目錄中,每個子產品都應該有一個.py檔案,你可以通過檢查他們來判斷你是否從已經從代碼倉庫中完整的遷出了項目。

在每個源代碼的檔案中,你都可以發現至少一個大段落的用來解釋該子產品的doc string,

doc string中給出了一到兩個關于如何使用該子產品的例子。

三、IOLoop子產品

讓我們通過檢視ioloop.py檔案直接進入伺服器的核心。這個子產品是異步機制的核心。

它包含了一系列已經打開的檔案描述符(譯者:也就是檔案指針)和每個描述符的處理器(handlers)。

它的功能是選擇那些已經準備好讀寫的檔案描述符,然後調用它們各自的處理器

(一種IO多路複用的實作,其實就是socket衆多IO模型中的select模型,在Java中就是NIO,譯者注)。

可以通過調用add_handler()方法将一個socket加入IO循環中:

def add_handler(self, fd, handler, events):

    """Registers the given handler to receive the given events for fd."""

    self._handlers[fd] = handler

    self._impl.register(fd, events | self.ERROR)

_handlers這個字典類型的變量儲存着檔案描述符(其實就是socket,譯者注)到當該檔案描述符準備好時需要調用的方法的映射

(在Tornado中,該方法被稱為處理器)。然後,檔案描述符被注冊到epoll(unix中的一種IO輪詢機制,貌似,譯者注)清單中。

Tornado關心三種類型的事件(指發生在檔案描述上的事件,譯者注):READ,WRITE 和 ERROR。

正如你所見,ERROR是預設為你自動添加的。

self._impl是select.epoll()和selet.select()兩者中的一個。我們稍後将看到Tornado是如何在它們之間進行選擇的。

現在讓我們來看看實際的主循環,不知何故,這段代碼被放在了start()方法中:

def start(self):

    """Starts the I/O loop.

    The loop will run until one of the I/O handlers calls stop(), which

    will make the loop stop after the current event iteration completes.

    """

    self._running = True

    while True:

    [ ... ]

        if not self._running:

            break

        [ ... ]

        try:

            event_pairs = self._impl.poll(poll_timeout)

        except Exception, e:

            if e.args == (4, "Interrupted system call"):

                logging.warning("Interrupted system call", exc_info=1)

                continue

            else:

                raise

        # Pop one fd at a time from the set of pending fds and run

        # its handler. Since that handler may perform actions on

        # other file descriptors, there may be reentrant calls to

        # this IOLoop that update self._events

        self._events.update(event_pairs)

        while self._events:

            fd, events = self._events.popitem()

            try:

                self._handlers[fd](fd, events)

            except KeyboardInterrupt:

                raise

            except OSError, e:

                if e[0] == errno.EPIPE:

                    # Happens when the client closes the connection

                    pass

                else:

                    logging.error("Exception in I/O handler for fd %d",

                                  fd, exc_info=True)

            except:

                logging.error("Exception in I/O handler for fd %d",

                              fd, exc_info=True)

poll()方法傳回一個形如(fd: events)的鍵值對,并指派給event_pairs變量。

由于當一個信号在任何一個事件發生前到來時,C函數庫中的poll()方法會傳回EINTR(實際是一個值為4的數值),

是以"Interrupted system call"這個特殊的異常需要被捕獲。更詳細的請檢視man poll。

在内部的while循環中,event_pairs中的内容被一個一個的取出,然後相應的處理器會被調用。

pipe 異常在這裡預設不進行處理。為了讓這個類适應更一般的情況,

在http處理器中處理這個異常是一個更好的方案,但是選擇現在這樣處理或許是因為更容易一些。

注釋中解釋了為什麼使用字典的popitem()方法,而不是使用更普遍一點的下面這種做法(指使用疊代,譯者注):

  for fd, events in self._events.items():

原因很簡單,在主循環期間,這個_events字典變量可能會被處理器所修改。

比如remove_handler()處理器。

這個方法把fd(即檔案描述符,譯者注)從_events字典中取出(extracts,意思是取出并從_events中删除,譯者注),

是以即使fd被選擇到了,它的處理器也不會被調用(作者的意思是,如果使用for疊代循環_events,

那麼在疊代期間_events就不能被修改,否則會産生不可預計的錯誤,

比如,明明調用了remove_handler()方法删除了某個<fd, handler="">鍵值對,但是該handler還是被調用了,譯者注)。

四、(意義不大的)循環結束技巧

怎麼讓這個主循環停止是很有技巧性的。

self._running變量被用來在運作時從主循環中跳出,處理器可以通過調用stop()方法把它設定為False。

通常情況下,這就能讓主循環停止了,但是stop()方法還能被一個信号處理器所調用,

是以,如果

1)主循環正阻塞在poll()方法處,

2)服務端沒有接收到任何來自用戶端的請求

3)信号沒有被OS投遞到正确的線程中,你将不得不等待poll()方法出現逾時情況後才會傳回。

考慮到這些情況并不時常發生,還有poll()方法的預設逾時時間隻不過是0.2秒,是以這種讓主循環停止的方式還算過得去。

但不管怎樣,Tornado的開發者為了讓主循環停止,還是額外的建立了一個沒有名字的管道和對應的處理器,

并把管道的一端放在了輪詢檔案描述符清單中。

當需要停止時,在管道的另一端随便寫點什麼,

這能高效率的(意思是馬上,譯者注)喚醒主循環在poll()方法處的阻塞(貌似Java NIO的Windows實作就用了這種方法,譯者注)。

這裡節選了一些代碼片段:

def __init__(self, impl=None):

    [...]

    # Create a pipe that we send bogus data to when we want to wake

    # the I/O loop when it is idle

    r, w = os.pipe()

    self._set_nonblocking(r)

    self._set_nonblocking(w)

    self._waker_reader = os.fdopen(r, "r", 0)

    self._waker_writer = os.fdopen(w, "w", 0)

    self.add_handler(r, self._read_waker, self.WRITE)

def _wake(self):

    try:

        self._waker_writer.write("x")

    except IOError:

        pass

實際上,上述代碼中存在一個bug:

那個隻讀檔案描述符r,雖然是用來讀的,但在注冊時卻附加上了WRITE類型的事件,這将導緻該注冊實際不會被響應。

正如我先前所說的,用不用專門找個方法其實沒什麼的,是以我對他們沒有發現這個方法不起作用的事實并不感到驚訝。

我在mail list中報告了這個情況,但是尚未收到答複。

五、定時器

另外一個在IOLoop子產品中很有特點的設計是對定時器的簡單實作。

一系列的定時器會被以是否過期的形式來維護和儲存,這用到了python的bisect子產品:

def add_timeout(self, deadline, callback):

    """Calls the given callback at the time deadline from the I/O loop."""

    timeout = _Timeout(deadline, callback)

    bisect.insort(self._timeouts, timeout)

    return timeout

在主循環中,所有過期了的定時器的回調會按照過期的順序被觸發。

poll()方法中的逾時時間會動态的進行調整,調整的結果就是如果沒有新的用戶端請求,

那麼下一個定時器就好像沒有延遲一樣的被觸發(意思是如果沒有新的用戶端的請求,

poll()方法将被阻塞直到逾時,這個逾時時間的設定會根據下一個定時器與目前時間之間的間隔進行調整,

調整後,逾時的時間會等同于距離下一個定時器被觸發的時間,這樣在poll()阻塞完後,下一個定時器剛好過期,譯者注)。

六、選擇select方案

讓我們現在快速的看一下poll和select這兩種select方案的實作代碼。

Python已經在版本2.6的标準庫中支援了epoll,你可以通過在select子產品上使用hasattr()方法檢測目前Python是否支援epoll。

如果python版本小于2.6,Tornado将用它自己的基于C的epoll子產品。

你可以在tornado/epoll.c檔案中找到它源代碼。如果最後這也不行(因為epoll不是每個Linux都有的),

它将回退到selec._Select并把_EPoll類包裝成和select.epoll一樣的api接口。

在你做性能測試之前,請确定你能使用epoll,因為select在有大量檔案描述符情況下的效率非常低。

# Choose a poll implementation. Use epoll if it is available, fall back to

# select() for non-Linux platforms

if hasattr(select, "epoll"):

    # Python 2.6+ on Linux

    _poll = select.epoll

else:

    try:

        # Linux systems with our C module installed

        import epoll

        _poll = _EPoll

    except:

        # All other systems

        import sys

        if "linux" in sys.platform:

            logging.warning("epoll module not found; using select()")

        _poll = _Select

通過上述閱讀,我們的介紹已經涵蓋了大部分IOLoop子產品。正如廣告中介紹的那樣,它是一段優雅而又簡單的代碼。

七、從sockets到流

讓我們來看看IOStream子產品。它的目的是提供一個對非阻塞式sockets的輕量級抽象,它提供了三個方法:

  .read_until(),從socket中讀取直到遇到指定的字元串。這為在讀取HTTP頭時遇到空行分隔符自動停止提供了友善。

  .read_bytes(),從socket中讀取指定數量的位元組。這為讀取HTTP消息的body部分提供了友善。

  .write(),     将指定的buffer寫入socket并持續監測直到這個buffer被發送。

所有上述的方法都可以通過異步方式在它們完成時觸發回調函數。

write()方法提供了将調用者提供的資料加以緩沖直到IOLoop調用了它的(指write方法的,譯者注)處理器的功能,

因為到那時候就說明socket已經為寫資料做好了準備:

def write(self, data, callback=None):

    """Write the given data to this stream.

    If callback is given, we call it when all of the buffered write

    data has been successfully written to the stream. If there was

    previously buffered write data and an old write callback, that

    callback is simply overwritten with this new callback.

    """

    self._check_closed()

    self._write_buffer += data

    self._add_io_state(self.io_loop.WRITE)

    self._write_callback = callback

該方法隻是用socket.send()來處理WRITE類型的事件,直到EWOULDBLOCK異常發生或者buffer被發送完畢。

讀資料的方法和上述過程正好相反。讀事件的處理器持續讀取資料直到緩沖區被填滿為止。

這就意味着要麼讀取指定數量的位元組(如果調用的是read_bytes()),

要麼讀取的内容中包含了指定的分隔符(如果調用的是read_util()):

def _handle_read(self):

    try:

        chunk = self.socket.recv(self.read_chunk_size)

    except socket.error, e:

        if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):

            return

        else:

            logging.warning("Read error on %d: %s",

                            self.socket.fileno(), e)

            self.close()

            return

    if not chunk:

        self.close()

        return

    self._read_buffer += chunk

    if len(self._read_buffer) >= self.max_buffer_size:

        logging.error("Reached maximum read buffer size")

        self.close()

        return

    if self._read_bytes:

        if len(self._read_buffer) >= self._read_bytes:

            num_bytes = self._read_bytes

            callback = self._read_callback

            self._read_callback = None

            self._read_bytes = None

            callback(self._consume(num_bytes))

    elif self._read_delimiter:

        loc = self._read_buffer.find(self._read_delimiter)

        if loc != -1:

            callback = self._read_callback

            delimiter_len = len(self._read_delimiter)

            self._read_callback = None

            self._read_delimiter = None

            callback(self._consume(loc + delimiter_len))

如下所示的_consume方法是為了確定在要求的傳回值中不會包含多餘的來自流的資料,

并且保證後續的讀操作會從目前位元組的下一個位元組開始(先将流中的資料讀到self.read_buffer中,

然後根據要求進行切割,傳回切割掉的資料,保留切割後的資料供下一次的讀取,譯者注):

def _consume(self, loc):

    result = self._read_buffer[:loc]

    self._read_buffer = self._read_buffer[loc:]

    return result

還值得注意的是在上述_handle_read()方法中read buffer的上限——self.max_buffer_size。

預設值是100MB,這似乎對我來說是有點大了。

舉個例子,如果一個攻擊者和服務端建立了100個連接配接,并持續發送不帶頭結束分隔符的頭資訊,

那麼Tornado需要10GB的記憶體來處理這些請求。即使記憶體ok,這種數量級資料的複制操作

(比如像上述_consume()方法中的代碼)很可能使伺服器超負荷。

我們還注意到在每次疊代中_handle_read()方法是如何在這個buffer中搜尋分隔符的,

是以如果攻擊者以小塊形式發送大量的資料,服務端不得不做很多次搜尋工作。

歸根結底,你應該想要将這個參數和諧掉,除非你真的很希望那樣

(Bottom of line, you might want to tune this parameter unless you really expect requests that big 

不大明白怎麼翻譯,譯者注)并且你有足夠的硬體條件。

八、HTTP 伺服器

有了IOLoop子產品和IOStream子產品的幫助,寫一個異步的HTTP伺服器隻差一步之遙,

這一步就在httpserver.py中完成。

HTTPServer類它自己隻負責處理将接收到的新連接配接的socket添加到IOLoop中。

該監聽型的socket自己也是IOLoop的一部分,正如在listen()方法中見到的那樣:

def listen(self, port, address=""):

    assert not self._socket

    self._socket = socket.(socket.AF_INET, socket.SOCK_STREAM, 0)

    flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD)

    flags |= fcntl.FD_CLOEXEC

    fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags)

    self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    self._socket.setblocking(0)

    self._socket.bind((address, port))

    self._socket.listen(128)

    self.io_loop.add_handler(self._socket.fileno(), self._handle_events,

                             self.io_loop.READ)

除了綁定給定的位址和端口外,上述代碼還設定了"close on exec"和"reuse address"這兩個标志位。

前者在應用程式建立子程序的時候特别有用。在這種情況下,我們不想讓套接字保持打開的狀态

(任何設定了"close on exec"标志位的檔案描述符,都不能被使用exec函數方式建立的子程序讀寫,

因為該檔案描述符在exec函數調用前就會被自動釋放,譯者注)。

後者用來避免在伺服器重新開機的時候發生“該位址以被使用”這種錯誤時很有用。

正如你所見到的,後備連接配接所允許的最大數目是128(注意,listen方法并不是你想象中的“開始在128端口上監聽”的意思,譯者注)。

這意味着如果有128個連接配接正在等待被accept,那麼直到伺服器有時間将前面128個連接配接中的某幾個accept了

,新的連接配接都将被拒絕。我建議你在做性能測試的時候将該參數調高,因為當新的連接配接被抛棄的時候将直接影響你做測試的準确性。

在上述代碼中注冊的_handle_events()處理器用來accept新連接配接,并建立相關的IOStream對象和初始化一個HTTPConnection對象,

HTTPConnection對象負責處理剩下的互動部分:

def _handle_events(self, fd, events):

    while True:

        try:

            connection, address = self._socket.accept()

        except socket.error, e:

            if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):

                return

            raise

        try:

            stream = iostream.IOStream(connection, io_loop=self.io_loop)

            HTTPConnection(stream, address, self.request_callback,

                           self.no_keep_alive, self.xheaders)

        except:

            logging.error("Error in connection callback", exc_info=True)

可以看到這個方法在一次疊代中accept了所有正在等待處理的連接配接。

也就是說直到EWOULDBLOCK異常發生while True循環才會退出,

這也就意味着目前沒有需要處理accept的連接配接了。

HTTP頭的部分的解析工作開始于HTTPConnection類的構造函數__init()__():

def __init__(self, stream, address, request_callback, no_keep_alive=False,

             xheaders=False):

    self.stream = stream

    self.address = address

    self.request_callback = request_callback

    self.no_keep_alive = no_keep_alive

    self.xheaders = xheaders

    self._request = None

    self._request_finished = False

    self.stream.read_until("rnrn", self._on_headers)

如果你很想知道xheaders參數的意義,請看這段注釋:

如果xheaders為True,我們将支援把所有請求的HTTP頭解析成X-Real-Ip和X-Scheme格式,

而原先我們将HTTP頭解析成remote IP和HTTP scheme格式。

這種格式的HTTP頭在Tornado運作于反向代理或均衡負載伺服器的後端時将非常有用。

_on_headers()回調函數實際用來解析HTTP頭,并在有請求内容的情況下通過使用read_bytes()來讀取請求的内容部分。

_on_request_body()回調函數用來解析POST的參數并調用應用層提供的回調函數:

def _on_headers(self, data):

    eol = data.find("rn")

    start_line = data[:eol]

    method, uri, version = start_line.split(" ")

    if not version.startswith("HTTP/"):

        raise Exception("Malformed HTTP version in HTTP Request-Line")

    headers = HTTPHeaders.parse(data[eol:])

    self._request = HTTPRequest(

        connection=self, method=method, uri=uri, version=version,

        headers=headers, remote_ip=self.address[0])

    content_length = headers.get("Content-Length")

    if content_length:

        content_length = int(content_length)

        if content_length > self.stream.max_buffer_size:

            raise Exception("Content-Length too long")

        if headers.get("Expect") == "100-continue":

            self.stream.write("HTTP/1.1 100 (Continue)rnrn")

        self.stream.read_bytes(content_length, self._on_request_body)

        return

    self.request_callback(self._request)

def _on_request_body(self, data):

    self._request.body = data

    content_type = self._request.headers.get("Content-Type", "")

    if self._request.method == "POST":

        if content_type.startswith("application/x-www-form-urlencoded"):

            arguments = cgi.parse_qs(self._request.body)

            for name, values in arguments.iteritems():

                values = [v for v in values if v]

                if values:

                    self._request.arguments.setdefault(name, []).extend(

                        values)

        elif content_type.startswith("multipart/form-data"):

            boundary = content_type[30:]

            if boundary: self._parse_mime_body(boundary, data)

    self.request_callback(self._request)

将結果寫回用戶端的工作在HTTPRequest類中處理,你可以在上面的_on_headers()方法中看到具體的實作。

HTTPRequest類僅僅将寫回的工作代理給了stream對象。

def write(self, chunk):

    assert self._request, "Request closed"

    self.stream.write(chunk, self._on_write_complete)

未完待續?

通過這篇文章,我已經涵蓋了從socket到應用層的所有方面。這應該能給你關于Tornado是如何工作的一個清晰的了解。

總之,我認為Tornado的代碼是非常友好的,我希望你也這樣認為。

Tornado架構還有很大一部分我們沒有探索,比如wep.py(應該是web.py,譯者注)

這個實際與你應用打交道的子產品,又或者是template engine子產品。如果我有足夠興趣的話,

我也會介紹這些部分。可以通過訂閱我的RSS feed來鼓勵我。

轉載于:https://www.cnblogs.com/DjangoBlog/p/5409206.html