Python中的SocketServer子產品
socket程式設計過于底層,程式設計雖然有套路,但是想要寫出健壯的代碼還是比較困難的,是以很多語言都對socket底層 API進行封裝,Python的封裝就是——socketserver子產品。它是網絡服務程式設計架構,便于企業級快速開發。
- 類的繼承關系
+------------+
| BaseServer |
+------------+
|
v
+-----------+ +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+ +------------------+
|
v
+-----------+ +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+ +--------------------+
- SocketServer簡化了網絡伺服器的編寫
- 它有4個同步類:
- TCPServer
- UDPServer
- UnixStreamServer
- UnixDatagramServer
- 兩個Mixin類,用來支援異步。
- ForkingMixIn
- ThreadingMixIn
- 組合得到
- class ForkingUDPServer(ForkingMixIn,UDPServer):pass
- class ForkingTCPServer(ForkingMixIn,TCPServer):pass
- class ThreadingUDPServer(ThreadingMixIn,UDPServer):pass
- class ThreadingTCPServer(ThreadingMixIn,TCPServer):pass
- fork是建立多程序,thread是建立多線程。
- fork需要作業系統支援,Windows不支援。
- ThreadingUDPServer與ThreadingTCPServer類中的特有屬性:
- daemon_threads=False #預設值是False表示建立的線程都不是daemon線程,改為True表示建立的所有線程都是daemon線程
- block_on_close=False #預設值為Fasle,如果為True,可以設定為守護線程3.7版本可以用
程式設計接口
- socketserver.BaseServer(server_address,RequestHandlerClass) #執行個體化一個伺服器
- server_address #伺服器綁定的位址資訊,是一個元組(ip,prost)
- RequestHandlerClass #必須是BaseRequestHandler的一個子類。
- 在BaseServer中原碼代碼如下:
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
# 處理請求的方法,會執行個體化一個RequestHandlerClass對象
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
- BaseServer中定義的接口常用屬性和方法
方法 | 含義 |
server_address | 伺服器正在監聽的位址和端口,在不同協定格式不一樣。Internet協定上是一個元組(“127.0.0.1”,80) |
socket | 伺服器正在監聽的套接字對象。socket |
request_queue_size | 請求隊列的大小。如果處理單個請求需要很長時間,那麼在伺服器繁忙時到達的任何請求都會被放入隊列中,直到request_queue_size請求為止。一旦隊列滿了,來自客戶機的進一步請求将得到一個“連接配接被拒絕”錯誤。預設值通常是5,但是可以被子類覆寫。 |
address_family | 伺服器套接字所屬的協定族。常見的例子是套接字。AF_INET socket.AF_UNIX。 |
socket_type | 伺服器使用的套接字類型;套接字。SOCK_STREAM套接字。SOCK_DGRAM是兩個常見的值。 |
timeout | 逾時持續時間,以秒為機關度量,如果不需要逾時,則為None。如果handle_request()在逾時期間沒有收到傳入的請求,則調用handle_timeout()方法。 |
handle_request() | 處理單個請求,同步執行 這個函數按順序調用以下方法:get_request()、verify_request()和process_request()。如果處理程式類的使用者提供的handle()方法引發異常,将調用伺服器的handle_error()方法。如果在逾時秒内沒有收到任何請求,那麼将調用handle_timeout()并傳回handle_request()。 |
server_forever(poll_interval=0.5) | 異步執行,處理請求。每隔poll_interval秒輪詢一次。 忽略timeout屬性,還會調用service_actions(),在ForkingMixIn的子類中定義,可以用來清理僵屍程序。 |
shutdown() | 告訴serve_forever循環停止。并等待他結束。 |
server_close() | 關閉伺服器 |
finish_request(request,client_address) | 通過執行個體化RequestHandlerClass并調用它的handle()方法來處理請求 |
server_bind() | 由伺服器的構造函數調用,以将套接字綁定到所需的位址。可能會被覆寫。 |
verify_request(request,client_address) |
BaseRequestHandler類
- 是和使用者連接配接的使用者請求處理類的基類,
-
#構造函數BaseRequestHandler(request,client_address,server)
- request #是和用戶端的連接配接的socket對象
- client_address #是用戶端位址
- server #是TCPServer執行個體本身
- 服務端Server執行個體接收使用者請求後,最後會執行個體化這個類。它被初始化時,送入3個構造參數:request, client_address, server自身 以後就可以在BaseRequestHandler類的執行個體上使用以下屬性:
- self.request是和用戶端的連接配接的socket對象
- self.server是TCPServer執行個體本身
- self.client_address是用戶端位址
- 這個類在初始化的時候,它會依次調用3個方法。子類可以覆寫這些方法。
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self): #每一個連接配接初始化
pass
def handle(self): #每一次請求處理
pass
def finish(self): #每一個連接配接清理
pass
- 測試代碼
import socketserver
import socket
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
def setup(self):
super().setup() #可以不調用父類的setup()方法,父類的setup方法什麼都沒做
print("----setup方法被執行-----")
def handle(self):
super().handle() #可以不調用父類的handler(),方法,父類的handler方法什麼都沒做
print("-------handler方法被執行----")
print(self.server)
print(self.request) #服務
print(self.client_address) #用戶端位址
print(self.__dict__)
print("- "*30)
print(self.server.__dict__)
print("- "*30)
sk:socket.socket = self.request
data = sk.recv(1024)
print(data)
sk.send("{}-{}".format(sk.getpeername(),data).encode())
print("----------handler end ----------")
def finish(self):
super().finish() #可以不調用父類的finish(),方法,父類的finish方法什麼都沒做
print("--------finish方法被執行---")
laddr = "127.0.0.1",3999
tcpserver = socketserver.TCPServer(laddr,MyBaseRequestHandle) #注意:參數是MyBaseRequestHandle
tcpserver.handle_request() #隻接受一個用戶端連接配接
# tcpserver.serve_forever() #永久循環執行,可以接受多個用戶端連接配接
- 每個不同的連接配接上的請求過來後,生成這個連接配接的socket對象即self.request,用戶端位址是self.client_address。
- 将ThreadingTCPServer換成TCPServer,當每個用戶端連接配接進來就會建立一個新的線程。
- ThreadingTCPServer是異步的,可以同時處理多個連接配接。
- TCPServer是同步的,一個連接配接處理完了,即一個連接配接的handle方法執行完了,才能處理另一個連接配接,且隻有主線程。
總結
- 建立伺服器需要幾個步驟:
- 從BaseRequestHandler類派生出子類,并覆寫其handle()方法來建立請求處理程式類,此方法将處理傳入請求
- 執行個體化一個伺服器類,傳參伺服器的位址和請求處理類
- 調用伺服器執行個體的handle_request()或serve_forever()方法
- 調用server_close()關閉套接字
實作EchoServer
- Echo:來聲明消息,回顯什麼消息,即用戶端發來什麼消息,傳回什麼消息
import logging
import sys
import socketserver
import socket
import threading
logging.basicConfig(format="%(asctime)s %(thread)d %(threadName)s %(message)s",stream=sys.stdout,level=logging.INFO)
class Handler(socketserver.BaseRequestHandler):
def setup(self):
super().setup()
self.event = threading.Event()
logging.info("新加入了一個連接配接{}".format(self.client_address))
def handle(self):
super().handle()
sk:socket.socket = self.request
while not self.event.is_set():
try:
data = sk.recv(1024).decode()
except Exception as e:
logging.info(e)
break
logging.info(data)
msg = "{}-{}".format(self.client_address,data).encode()
sk.send(msg)
def finish(self):
super().finish()
self.event.set()
self.request.close()
if __name__ == "__main__":
server = socketserver.ThreadingTCPServer(("127.0.0.1",3999),Handler)
threading.Thread(target=server.serve_forever,name="server").start()
while True:
cmd = input(">>>")
if cmd.strip() == "quit":
server.server_close()
break
logging.info(threading.enumerate())
實戰,使用socketserver實作群聊的server
- 使用ThreadingTCPServer改寫ChatServer
- 使用BaseRequestHandler定義Handler
import logging
import sys
import socketserver
import socket
import threading
logging.basicConfig(format="%(asctime)s %(thread)d %(threadName)s %(message)s",stream=sys.stdout,level=logging.INFO)
log = logging.getLogger()
class Handler(socketserver.BaseRequestHandler):
lock = threading.Lock()
clients = {}
def setup(self):
super().setup()
self.event = threading.Event()
with self.lock:
self.clients[self.client_address] = self.request
log.info("新加入了一個連接配接{}".format(self.client_address))
def handle(self):
super().handle()
sock:socket.socket = self.request
while not self.event.is_set():
try:
data = sock.recv(1024)
except Exception as e:
log.error(e)
data = b""
log.info(data)
if data == b"by" or data == b"":
break
msg = "service:{}-->{}".format(self.client_address, data).encode()
expc = [] # 記錄sock出錯時對應的clients
with self.lock:
for c, sk in self.clients.items():
try:
sk.send(msg) # 可能在發送消息是就出錯
except:
expc.append(c)
for c in expc:
self.clients.pop(c)
def finish(self):
super().finish()
self.event.set()
with self.lock:
if self.client_address in self.clients:
self.clients.pop(self.client_address)
self.request.close()
log.info("{}退出了".format(self.client_address))
if __name__ == "__main__":
server = socketserver.ThreadingTCPServer(("127.0.0.1",3999),Handler)
server.daemon_threads = True #設定所有建立的線程都為Daemo線程
threading.Thread(target=server.serve_forever,name="server",daemon=True).start()
while True:
cmd = input(">>>")
if cmd.strip() == "quit":
server.shutdown() #告訴serve_forever循環停止。
server.server_close()
break
logging.info(threading.enumerate())
- 使用StreamRequestHandler定義handler
import logging
import sys
import socketserver
import socket
import threading
logging.basicConfig(format="%(asctime)s %(thread)d %(threadName)s %(message)s",stream=sys.stdout,level=logging.INFO)
log = logging.getLogger()
class Handler(socketserver.StreamRequestHandler):
lock = threading.Lock()
clients = {}
def setup(self):
super().setup()
self.event = threading.Event()
with self.lock:
self.clients[self.client_address] = self.request
log.info("新加入了一個連接配接{}".format(self.client_address))
def handle(self):
super().handle()
import io
rfile:io.TextIOWrapper= self.rfile
while not self.event.is_set():
try:
data = rfile.read1(1024) #類似于sock.recv(1024)
# data = rfile.readline() #行讀取
except Exception as e:
log.error(e)
data = b""
log.info(data)
if data == b"by" or data == b"":
break
msg = "service:{}-->{}".format(self.client_address, data).encode()
expc = [] # 記錄sock出錯時對應的clients
with self.lock:
for c, sk in self.clients.items():
try:
sk.send(msg) # 可能在發送消息是就出錯
except:
expc.append(c)
for c in expc:
self.clients.pop(c)
def finish(self):
super().finish()
self.event.set()
with self.lock:
if self.client_address in self.clients:
self.clients.pop(self.client_address)
self.request.close()
log.info("{}退出了".format(self.client_address))
if __name__ == "__main__":
server = socketserver.ThreadingTCPServer(("127.0.0.1",3999),Handler)
server.daemon_threads = True #設定所有建立的線程都為Daemo線程
threading.Thread(target=server.serve_forever,name="server",daemon=True).start()
while True:
cmd = input(">>>")
if cmd.strip() == "quit":
server.shutdown() #告訴serve_forever循環停止。
server.server_close()
break
logging.info(threading.enumerate())
- 總結
- 為每一個連接配接提供RequestHandlerClass類執行個體,依次調用setup、handle、finish方法,且使用了try…finally結構 保證finish方法一定能被調用。這些方法依次執行完成,如果想維持這個連接配接和用戶端通信,就需要在handle函數 中使用循環。
- socketserver子產品提供的不同的類,但是程式設計接口是一樣的,即使是多程序、多線程的類也是一樣,大大減少了編 程的難度。
- 将socket程式設計簡化,隻需要程式員關注資料處理本身,實作Handler類就行了。這種風格在Python十分常見。