socketserver框架主要是使用 ServerClass 跟 RequestHandlerClass 两大类。
ServerClass 处理服务端与客户端的通讯
RequestHandlerClass 处理数据的解析,接收和发送;主要的业务逻辑
ServerClass
BaseServer 抽象基类
TCPServer 处理流式套接字
UnixStreamServer 处理本地处理流式套接字,只适用UNIX平台
UDPServer 处理数据报套接字
UnixDatagramServer 处理本地处理数据报套接字,只适用UNIX平台
RequestHandlerClass
BaseRequestHandler 处理基类
StreamRequestHandler 处理流式套接字
DatagramRequestHandler 处理数据报套接字
通过对通讯和数据处理的解耦,可以实现不同组合的服务器用于处理不同的套接字,但这样的服务器,仅仅是同步处理请求,如果要实现异步处理请求,
还需使用 MixINClass
MixINClass
ForkingMixIn 用于扩展server,利用多进程实现异步,接收一个请求后,fork 一个进程进行响应处理
ThreadingMixIn 用于扩展server,利用多线程实现异常,接收一个请求后,使用一个线程进行响应处理
Server 类的继承关系
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
UDPServer 继承 TCPServer,TCPServer 继承 BaseServer,UnixStreamServer 继承自 TCPServer,UnixDatagramServer 继承自 UDPServer。
if hasattr(socket, 'AF_UNIX'):
class UnixStreamServer(TCPServer):
address_family = socket.AF_UNIX
class UnixDatagramServer(UDPServer):
address_family = socket.AF_UNIX
从源码看,UnixStreamServer 跟 TCPServer,UnixDatagramServer 跟 UDPServer,仅仅是改变了 address_family。
if hasattr(os, "fork"):
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
而多进程服务器跟多线程服务器,也仅仅是把 TCPServer、UDPServer 跟 ForkingMixIn、ThreadingMixIn 组合在一起而已。
RequestHandler 类的继承关系
+--------------------+ +----------------------+
| BaseRequestHandler |------->| StreamRequestHandler |
+--------------------+ +----------------------+
|
v
+------------------------+
| DatagramRequestHandler |
+------------------------+
StreamRequestHandler 跟 DatagramRequestHandler 是继承 BaseRequestHandler,它们都重写了 setup 跟 finish,本质上是用缓存对套接字提供服务。
调用流程
以 示例 来看调用流程
import socketserver
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("{} wrote:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())
if __name__ == "__main__":
HOST, PORT = "localhost", 9999
with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
server.serve_forever()
这里定义了一个继承自 BaseRequestHandler 的 MyTCPHandler,重写了 handle 方法,然后主机、端口一同传入 TCPServer。
class BaseServer:
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
class TCPServer(BaseServer):
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
def server_bind(self):
"""Called by constructor to bind the socket.
May be overridden.
"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()
def server_activate(self):
"""Called by constructor to activate the server.
May be overridden.
"""
self.socket.listen(self.request_queue_size)
1.TCPServer 实例化,调用了自身 server_bind() 方法,绑定了传入的 server_address 参数,而 server_activate() 方法是重写自 BaseServer类,主要是请求队列大小的设置。
2.启动服务 ,调用 BaseServer 类的 serve_forever() 方法,用于不断监听请求,当有请求时调用 _handle_request_noblock() 方法完成请求处理,然后调用 service_actions() 处理后的操作;这个方法在 ForkingMixIn 类会被重写,用于解决完成请求处理的进程。
class BaseServer:
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
_handle_request_noblock() 方法处理请求时,会调用下列方法
get_request() 方法在 TCPServer 类中定义,调用socket.accept() 方法,返回 request 参数和 client_address 参数
verify_request(request, client_address) 方法在 BaseServer 类中定义,可以在子类中重写做一些验证处理
process_request(request, client_address) 方法请求处理函数,用于调用 finish_request() 方法,在 ForkingMixIn 类和 ThreadingMixIn 类中会被重写,用进程或线程发起处理
finish_request(request, client_address) 方法具体的请求处理过程,这里是自定义 MyTCPHandler 类实例化,该类继承了 BaseRequestHandler 类,遵传它的定义,当自己被实例化时,会调用 handle() 方法
shutdown_request(request) 方法在 TCPServer 类中定义,会先 request.shutdown(socket.SHUT_WR) ,出错再调用 close_request(request) 方法做 request.close(),而在 UDPServer 类中,该函数会重写 pass 掉
class BaseServer:
def get_request(self):
"""Get the request and client address from the socket.
May be overridden.
"""
return self.socket.accept()
def _handle_request_noblock(self):
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
class ThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""
# Decides how threads will act upon termination of the
# main process
daemon_threads = False
# If true, server_close() waits until all non-daemonic threads terminate.
block_on_close = True
# For non-daemonic threads, list of threading.Threading objects
# used by server_close() to wait for all threads completion.
_threads = None
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
if not t.daemon and self.block_on_close:
if self._threads is None:
self._threads = []
self._threads.append(t)
t.start()
class BaseServer:
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
RequestHandlerClass继承自BaseRequestHandler,会执行BaseRequestHandler的init方法,调用init中的handle方法
class BaseRequestHandler:
"""Base class for request handler classes.
This class is instantiated for each request to be handled. The
constructor sets the instance variables request, client_address
and server, and then calls the handle() method. To implement a
specific service, all you need to do is to derive a class which
defines a handle() method.
The handle() method can find the request as self.request, the
client address as self.client_address, and the server (in case it
needs access to per-server information) as self.server. Since a
separate instance is created for each request, the handle() method
can define other arbitrary instance variables.
"""
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self): #
pass
def finish(self):
pass
3、关闭服务,要结束服务,可以手动调用 BaseServer 类中的 shutdown() 方法,示例说直接用 Ctrl-C
def shutdown(self):
self.__shutdown_request = True
self.__is_shut_down.wait()