天天看点

socketserver TCPServer调用过程以及源码分析ServerClassRequestHandlerClassMixINClassServer 类的继承关系RequestHandler 类的继承关系调用流程参考

socketserver框架主要是使用 ServerClass 跟 RequestHandlerClass 两大类。

ServerClass 处理服务端与客户端的通讯

RequestHandlerClass 处理数据的解析,接收和发送;主要的业务逻辑

ServerClass

BaseServer 抽象基类

TCPServer 处理流式套接字

UnixStreamServer 处理本地处理流式套接字,只适用UNIX平台

UDPServer 处理数据报套接字

UnixDatagramServer 处理本地处理数据报套接字,只适用UNIX平台

RequestHandlerClass

BaseRequestHandler 处理基类

StreamRequestHandler 处理流式套接字

DatagramRequestHandler 处理数据报套接字

通过对通讯和数据处理的解耦,可以实现不同组合的服务器用于处理不同的套接字,但这样的服务器,仅仅是同步处理请求,如果要实现异步处理请求,

还需使用 MixINClass

MixINClass

ForkingMixIn 用于扩展server,利用多进程实现异步,接收一个请求后,fork 一个进程进行响应处理

ThreadingMixIn 用于扩展server,利用多线程实现异常,接收一个请求后,使用一个线程进行响应处理

Server 类的继承关系

+------------+
        | BaseServer |
        +------------+
              |
              v
        +-----------+        +------------------+
        | TCPServer |------->| UnixStreamServer |
        +-----------+        +------------------+
              |
              v
        +-----------+        +--------------------+
        | UDPServer |------->| UnixDatagramServer |
        +-----------+        +--------------------+           

UDPServer 继承 TCPServer,TCPServer 继承 BaseServer,UnixStreamServer 继承自 TCPServer,UnixDatagramServer 继承自 UDPServer。

if hasattr(socket, 'AF_UNIX'):

    class UnixStreamServer(TCPServer):
        address_family = socket.AF_UNIX

    class UnixDatagramServer(UDPServer):
        address_family = socket.AF_UNIX           

从源码看,UnixStreamServer 跟 TCPServer,UnixDatagramServer 跟 UDPServer,仅仅是改变了 address_family。

if hasattr(os, "fork"):
    class ForkingUDPServer(ForkingMixIn, UDPServer): pass
    class ForkingTCPServer(ForkingMixIn, TCPServer): pass

class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
           

而多进程服务器跟多线程服务器,也仅仅是把 TCPServer、UDPServer 跟 ForkingMixIn、ThreadingMixIn 组合在一起而已。

RequestHandler 类的继承关系

+--------------------+        +----------------------+
          | BaseRequestHandler |------->| StreamRequestHandler |
          +--------------------+        +----------------------+
                    |
                    v
        +------------------------+       
        | DatagramRequestHandler |
        +------------------------+            

StreamRequestHandler 跟 DatagramRequestHandler 是继承 BaseRequestHandler,它们都重写了 setup 跟 finish,本质上是用缓存对套接字提供服务。

调用流程

以 示例 来看调用流程

import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{} wrote:".format(self.client_address[0]))
        print(self.data)
        # just send back the same data, but upper-cased
        self.request.sendall(self.data.upper())

if __name__ == "__main__":
    HOST, PORT = "localhost", 9999

    with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
        server.serve_forever()           

这里定义了一个继承自 BaseRequestHandler 的 MyTCPHandler,重写了 handle 方法,然后主机、端口一同传入 TCPServer。

class BaseServer:
    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

class TCPServer(BaseServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
        """Constructor.  May be extended, do not override."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if bind_and_activate:
            try:
                self.server_bind()
                self.server_activate()
            except:
                self.server_close()
                raise

 def server_bind(self):
        """Called by constructor to bind the socket.

        May be overridden.

        """
        if self.allow_reuse_address:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.server_address)
        self.server_address = self.socket.getsockname()

def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        self.socket.listen(self.request_queue_size)
           

1.TCPServer 实例化,调用了自身 server_bind() 方法,绑定了传入的 server_address 参数,而 server_activate() 方法是重写自 BaseServer类,主要是请求队列大小的设置。

2.启动服务 ,调用 BaseServer 类的 serve_forever() 方法,用于不断监听请求,当有请求时调用 _handle_request_noblock() 方法完成请求处理,然后调用 service_actions() 处理后的操作;这个方法在 ForkingMixIn 类会被重写,用于解决完成请求处理的进程。

class BaseServer:

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()
           

_handle_request_noblock() 方法处理请求时,会调用下列方法

get_request() 方法在 TCPServer 类中定义,调用socket.accept() 方法,返回 request 参数和 client_address 参数

verify_request(request, client_address) 方法在 BaseServer 类中定义,可以在子类中重写做一些验证处理

process_request(request, client_address) 方法请求处理函数,用于调用 finish_request() 方法,在 ForkingMixIn 类和 ThreadingMixIn 类中会被重写,用进程或线程发起处理

finish_request(request, client_address) 方法具体的请求处理过程,这里是自定义 MyTCPHandler 类实例化,该类继承了 BaseRequestHandler 类,遵传它的定义,当自己被实例化时,会调用 handle() 方法

shutdown_request(request) 方法在 TCPServer 类中定义,会先 request.shutdown(socket.SHUT_WR) ,出错再调用 close_request(request) 方法做 request.close(),而在 UDPServer 类中,该函数会重写 pass 掉

class BaseServer:
    def get_request(self):
        """Get the request and client address from the socket.

        May be overridden.

        """
        return self.socket.accept()     

    def _handle_request_noblock(self):
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

class ThreadingMixIn:
    """Mix-in class to handle each request in a new thread."""

    # Decides how threads will act upon termination of the
    # main process
    daemon_threads = False
    # If true, server_close() waits until all non-daemonic threads terminate.
    block_on_close = True
    # For non-daemonic threads, list of threading.Threading objects
    # used by server_close() to wait for all threads completion.
    _threads = None

    def process_request_thread(self, request, client_address):
        """Same as in BaseServer but as a thread.

        In addition, exception handling is done here.

        """
        try:
            self.finish_request(request, client_address)
        except Exception:
            self.handle_error(request, client_address)
        finally:
            self.shutdown_request(request)

    def process_request(self, request, client_address):
        """Start a new thread to process the request."""
        t = threading.Thread(target = self.process_request_thread,
                             args = (request, client_address))
        t.daemon = self.daemon_threads
        if not t.daemon and self.block_on_close:
            if self._threads is None:
                self._threads = []
            self._threads.append(t)
        t.start()

class BaseServer:
    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)  
           

RequestHandlerClass继承自BaseRequestHandler,会执行BaseRequestHandler的init方法,调用init中的handle方法

class BaseRequestHandler:

    """Base class for request handler classes.

    This class is instantiated for each request to be handled.  The
    constructor sets the instance variables request, client_address
    and server, and then calls the handle() method.  To implement a
    specific service, all you need to do is to derive a class which
    defines a handle() method.

    The handle() method can find the request as self.request, the
    client address as self.client_address, and the server (in case it
    needs access to per-server information) as self.server.  Since a
    separate instance is created for each request, the handle() method
    can define other arbitrary instance variables.

    """

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):  #
        pass

    def finish(self):
        pass           

3、关闭服务,要结束服务,可以手动调用 BaseServer 类中的 shutdown() 方法,示例说直接用 Ctrl-C

def shutdown(self):
        self.__shutdown_request = True
        self.__is_shut_down.wait()           

参考