天天看點

socketserver TCPServer調用過程以及源碼分析ServerClassRequestHandlerClassMixINClassServer 類的繼承關系RequestHandler 類的繼承關系調用流程參考

socketserver架構主要是使用 ServerClass 跟 RequestHandlerClass 兩大類。

ServerClass 處理服務端與用戶端的通訊

RequestHandlerClass 處理資料的解析,接收和發送;主要的業務邏輯

ServerClass

BaseServer 抽象基類

TCPServer 處理流式套接字

UnixStreamServer 處理本地處理流式套接字,隻适用UNIX平台

UDPServer 處理資料報套接字

UnixDatagramServer 處理本地處理資料報套接字,隻适用UNIX平台

RequestHandlerClass

BaseRequestHandler 處理基類

StreamRequestHandler 處理流式套接字

DatagramRequestHandler 處理資料報套接字

通過對通訊和資料處理的解耦,可以實作不同組合的伺服器用于處理不同的套接字,但這樣的伺服器,僅僅是同步處理請求,如果要實作異步處理請求,

還需使用 MixINClass

MixINClass

ForkingMixIn 用于擴充server,利用多程序實作異步,接收一個請求後,fork 一個程序進行響應處理

ThreadingMixIn 用于擴充server,利用多線程實作異常,接收一個請求後,使用一個線程進行響應處理

Server 類的繼承關系

+------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+           

UDPServer 繼承 TCPServer,TCPServer 繼承 BaseServer,UnixStreamServer 繼承自 TCPServer,UnixDatagramServer 繼承自 UDPServer。

if hasattr(socket, 'AF_UNIX'):

    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX           

從源碼看,UnixStreamServer 跟 TCPServer,UnixDatagramServer 跟 UDPServer,僅僅是改變了 address_family。

if hasattr(os, "fork"):
    class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
           

而多程序伺服器跟多線程伺服器,也僅僅是把 TCPServer、UDPServer 跟 ForkingMixIn、ThreadingMixIn 組合在一起而已。

RequestHandler 類的繼承關系

+--------------------+        +----------------------+
          | BaseRequestHandler |------->| StreamRequestHandler |
          +--------------------+        +----------------------+
                    |
                    v
        +------------------------+       
        | DatagramRequestHandler |
        +------------------------+            

StreamRequestHandler 跟 DatagramRequestHandler 是繼承 BaseRequestHandler,它們都重寫了 setup 跟 finish,本質上是用緩存對套接字提供服務。

調用流程

以 示例 來看調用流程

import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
        server.serve_forever()           

這裡定義了一個繼承自 BaseRequestHandler 的 MyTCPHandler,重寫了 handle 方法,然後主機、端口一同傳入 TCPServer。

class BaseServer:
    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

class TCPServer(BaseServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

 def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)
           

1.TCPServer 執行個體化,調用了自身 server_bind() 方法,綁定了傳入的 server_address 參數,而 server_activate() 方法是重寫自 BaseServer類,主要是請求隊列大小的設定。

2.啟動服務 ,調用 BaseServer 類的 serve_forever() 方法,用于不斷監聽請求,當有請求時調用 _handle_request_noblock() 方法完成請求處理,然後調用 service_actions() 處理後的操作;這個方法在 ForkingMixIn 類會被重寫,用于解決完成請求處理的程序。

class BaseServer:

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
           

_handle_request_noblock() 方法處理請求時,會調用下列方法

get_request() 方法在 TCPServer 類中定義,調用socket.accept() 方法,傳回 request 參數和 client_address 參數

verify_request(request, client_address) 方法在 BaseServer 類中定義,可以在子類中重寫做一些驗證處理

process_request(request, client_address) 方法請求處理函數,用于調用 finish_request() 方法,在 ForkingMixIn 類和 ThreadingMixIn 類中會被重寫,用程序或線程發起處理

finish_request(request, client_address) 方法具體的請求處理過程,這裡是自定義 MyTCPHandler 類執行個體化,該類繼承了 BaseRequestHandler 類,遵傳它的定義,當自己被執行個體化時,會調用 handle() 方法

shutdown_request(request) 方法在 TCPServer 類中定義,會先 request.shutdown(socket.SHUT_WR) ,出錯再調用 close_request(request) 方法做 request.close(),而在 UDPServer 類中,該函數會重寫 pass 掉

class BaseServer:
    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()     

    def _handle_request_noblock(self):
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False
    # If true, server_close() waits until all non-daemonic threads terminate.
    block_on_close = True
    # For non-daemonic threads, list of threading.Threading objects
    # used by server_close() to wait for all threads completion.
    _threads = None

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        if not t.daemon and self.block_on_close:
            if self._threads is None:
                self._threads = []
            self._threads.append(t)
        t.start()

class BaseServer:
    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)  
           

RequestHandlerClass繼承自BaseRequestHandler,會執行BaseRequestHandler的init方法,調用init中的handle方法

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):  #
        pass

    def finish(self):
        pass           

3、關閉服務,要結束服務,可以手動調用 BaseServer 類中的 shutdown() 方法,示例說直接用 Ctrl-C

def shutdown(self):
        self.__shutdown_request = True
        self.__is_shut_down.wait()           

參考