socketserver架構主要是使用 ServerClass 跟 RequestHandlerClass 兩大類。
ServerClass 處理服務端與用戶端的通訊
RequestHandlerClass 處理資料的解析,接收和發送;主要的業務邏輯
ServerClass
BaseServer 抽象基類
TCPServer 處理流式套接字
UnixStreamServer 處理本地處理流式套接字,隻适用UNIX平台
UDPServer 處理資料報套接字
UnixDatagramServer 處理本地處理資料報套接字,隻适用UNIX平台
RequestHandlerClass
BaseRequestHandler 處理基類
StreamRequestHandler 處理流式套接字
DatagramRequestHandler 處理資料報套接字
通過對通訊和資料處理的解耦,可以實作不同組合的伺服器用于處理不同的套接字,但這樣的伺服器,僅僅是同步處理請求,如果要實作異步處理請求,
還需使用 MixINClass
MixINClass
ForkingMixIn 用于擴充server,利用多程序實作異步,接收一個請求後,fork 一個程序進行響應處理
ThreadingMixIn 用于擴充server,利用多線程實作異常,接收一個請求後,使用一個線程進行響應處理
Server 類的繼承關系
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
UDPServer 繼承 TCPServer,TCPServer 繼承 BaseServer,UnixStreamServer 繼承自 TCPServer,UnixDatagramServer 繼承自 UDPServer。
if hasattr(socket, 'AF_UNIX'):
class UnixStreamServer(TCPServer):
address_family = socket.AF_UNIX
class UnixDatagramServer(UDPServer):
address_family = socket.AF_UNIX
從源碼看,UnixStreamServer 跟 TCPServer,UnixDatagramServer 跟 UDPServer,僅僅是改變了 address_family。
if hasattr(os, "fork"):
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
而多程序伺服器跟多線程伺服器,也僅僅是把 TCPServer、UDPServer 跟 ForkingMixIn、ThreadingMixIn 組合在一起而已。
RequestHandler 類的繼承關系
+--------------------+ +----------------------+
| BaseRequestHandler |------->| StreamRequestHandler |
+--------------------+ +----------------------+
|
v
+------------------------+
| DatagramRequestHandler |
+------------------------+
StreamRequestHandler 跟 DatagramRequestHandler 是繼承 BaseRequestHandler,它們都重寫了 setup 跟 finish,本質上是用緩存對套接字提供服務。
調用流程
以 示例 來看調用流程
import socketserver
class MyTCPHandler(socketserver.BaseRequestHandler):
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
print("{} wrote:".format(self.client_address[0]))
print(self.data)
# just send back the same data, but upper-cased
self.request.sendall(self.data.upper())
if __name__ == "__main__":
HOST, PORT = "localhost", 9999
with socketserver.TCPServer((HOST, PORT), MyTCPHandler) as server:
server.serve_forever()
這裡定義了一個繼承自 BaseRequestHandler 的 MyTCPHandler,重寫了 handle 方法,然後主機、端口一同傳入 TCPServer。
class BaseServer:
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
class TCPServer(BaseServer):
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
"""Constructor. May be extended, do not override."""
BaseServer.__init__(self, server_address, RequestHandlerClass)
self.socket = socket.socket(self.address_family,
self.socket_type)
if bind_and_activate:
try:
self.server_bind()
self.server_activate()
except:
self.server_close()
raise
def server_bind(self):
"""Called by constructor to bind the socket.
May be overridden.
"""
if self.allow_reuse_address:
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)
self.server_address = self.socket.getsockname()
def server_activate(self):
"""Called by constructor to activate the server.
May be overridden.
"""
self.socket.listen(self.request_queue_size)
1.TCPServer 執行個體化,調用了自身 server_bind() 方法,綁定了傳入的 server_address 參數,而 server_activate() 方法是重寫自 BaseServer類,主要是請求隊列大小的設定。
2.啟動服務 ,調用 BaseServer 類的 serve_forever() 方法,用于不斷監聽請求,當有請求時調用 _handle_request_noblock() 方法完成請求處理,然後調用 service_actions() 處理後的操作;這個方法在 ForkingMixIn 類會被重寫,用于解決完成請求處理的程序。
class BaseServer:
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
# XXX: Consider using another file descriptor or connecting to the
# socket to wake this up instead of polling. Polling reduces our
# responsiveness to a shutdown request and wastes cpu at all other
# times.
with _ServerSelector() as selector:
selector.register(self, selectors.EVENT_READ)
while not self.__shutdown_request:
ready = selector.select(poll_interval)
# bpo-35017: shutdown() called during select(), exit immediately.
if self.__shutdown_request:
break
if ready:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
_handle_request_noblock() 方法處理請求時,會調用下列方法
get_request() 方法在 TCPServer 類中定義,調用socket.accept() 方法,傳回 request 參數和 client_address 參數
verify_request(request, client_address) 方法在 BaseServer 類中定義,可以在子類中重寫做一些驗證處理
process_request(request, client_address) 方法請求處理函數,用于調用 finish_request() 方法,在 ForkingMixIn 類和 ThreadingMixIn 類中會被重寫,用程序或線程發起處理
finish_request(request, client_address) 方法具體的請求處理過程,這裡是自定義 MyTCPHandler 類執行個體化,該類繼承了 BaseRequestHandler 類,遵傳它的定義,當自己被執行個體化時,會調用 handle() 方法
shutdown_request(request) 方法在 TCPServer 類中定義,會先 request.shutdown(socket.SHUT_WR) ,出錯再調用 close_request(request) 方法做 request.close(),而在 UDPServer 類中,該函數會重寫 pass 掉
class BaseServer:
def get_request(self):
"""Get the request and client address from the socket.
May be overridden.
"""
return self.socket.accept()
def _handle_request_noblock(self):
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
self.shutdown_request(request)
except:
self.shutdown_request(request)
raise
else:
self.shutdown_request(request)
class ThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""
# Decides how threads will act upon termination of the
# main process
daemon_threads = False
# If true, server_close() waits until all non-daemonic threads terminate.
block_on_close = True
# For non-daemonic threads, list of threading.Threading objects
# used by server_close() to wait for all threads completion.
_threads = None
def process_request_thread(self, request, client_address):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
try:
self.finish_request(request, client_address)
except Exception:
self.handle_error(request, client_address)
finally:
self.shutdown_request(request)
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
if not t.daemon and self.block_on_close:
if self._threads is None:
self._threads = []
self._threads.append(t)
t.start()
class BaseServer:
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
RequestHandlerClass繼承自BaseRequestHandler,會執行BaseRequestHandler的init方法,調用init中的handle方法
class BaseRequestHandler:
"""Base class for request handler classes.
This class is instantiated for each request to be handled. The
constructor sets the instance variables request, client_address
and server, and then calls the handle() method. To implement a
specific service, all you need to do is to derive a class which
defines a handle() method.
The handle() method can find the request as self.request, the
client address as self.client_address, and the server (in case it
needs access to per-server information) as self.server. Since a
separate instance is created for each request, the handle() method
can define other arbitrary instance variables.
"""
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):
pass
def handle(self): #
pass
def finish(self):
pass
3、關閉服務,要結束服務,可以手動調用 BaseServer 類中的 shutdown() 方法,示例說直接用 Ctrl-C
def shutdown(self):
self.__shutdown_request = True
self.__is_shut_down.wait()