天天看點

Python中的SocketServer子產品

Python中的SocketServer子產品

socket程式設計過于底層,程式設計雖然有套路,但是想要寫出健壯的代碼還是比較困難的,是以很多語言都對socket底層 API進行封裝,Python的封裝就是——socketserver子產品。它是網絡服務程式設計架構,便于企業級快速開發。

  1. 類的繼承關系
+------------+
| BaseServer |
+------------+
    |
    v
+-----------+        +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+        +------------------+
    |
    v
+-----------+        +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+        +--------------------+      
  1. SocketServer簡化了網絡伺服器的編寫
  • 它有4個同步類:
  1. TCPServer
  2. UDPServer
  3. UnixStreamServer
  4. UnixDatagramServer
  • 兩個Mixin類,用來支援異步。
  • ForkingMixIn
  • ThreadingMixIn
  • 組合得到
  • class ForkingUDPServer(ForkingMixIn,UDPServer):pass
  • class ForkingTCPServer(ForkingMixIn,TCPServer):pass
  • class ThreadingUDPServer(ThreadingMixIn,UDPServer):pass
  • class ThreadingTCPServer(ThreadingMixIn,TCPServer):pass
  • fork是建立多程序,thread是建立多線程。
  • fork需要作業系統支援,Windows不支援。
  • ThreadingUDPServer與ThreadingTCPServer類中的特有屬性:
  1. daemon_threads=False #預設值是False表示建立的線程都不是daemon線程,改為True表示建立的所有線程都是daemon線程
  2. block_on_close=False #預設值為Fasle,如果為True,可以設定為守護線程3.7版本可以用

程式設計接口

  1. socketserver.BaseServer(server_address,RequestHandlerClass) #執行個體化一個伺服器
  • server_address #伺服器綁定的位址資訊,是一個元組(ip,prost)
  • RequestHandlerClass #必須是BaseRequestHandler的一個子類。
  • 在BaseServer中原碼代碼如下:
def __init__(self, server_address, RequestHandlerClass):
    """Constructor.  May be extended, do not override."""
    self.server_address = server_address
    self.RequestHandlerClass = RequestHandlerClass
    self.__is_shut_down = threading.Event()
    self.__shutdown_request = False

# 處理請求的方法,會執行個體化一個RequestHandlerClass對象
def finish_request(self, request, client_address):
    """Finish one request by instantiating RequestHandlerClass."""
    self.RequestHandlerClass(request, client_address, self)      
  1. BaseServer中定義的接口常用屬性和方法
方法 含義
server_address 伺服器正在監聽的位址和端口,在不同協定格式不一樣。Internet協定上是一個元組(“127.0.0.1”,80)
socket 伺服器正在監聽的套接字對象。socket
request_queue_size 請求隊列的大小。如果處理單個請求需要很長時間,那麼在伺服器繁忙時到達的任何請求都會被放入隊列中,直到request_queue_size請求為止。一旦隊列滿了,來自客戶機的進一步請求将得到一個“連接配接被拒絕”錯誤。預設值通常是5,但是可以被子類覆寫。
address_family 伺服器套接字所屬的協定族。常見的例子是套接字。AF_INET socket.AF_UNIX。
socket_type 伺服器使用的套接字類型;套接字。SOCK_STREAM套接字。SOCK_DGRAM是兩個常見的值。
timeout 逾時持續時間,以秒為機關度量,如果不需要逾時,則為None。如果handle_request()在逾時期間沒有收到傳入的請求,則調用handle_timeout()方法。
handle_request()

處理單個請求,同步執行

這個函數按順序調用以下方法:get_request()、verify_request()和process_request()。如果處理程式類的使用者提供的handle()方法引發異常,将調用伺服器的handle_error()方法。如果在逾時秒内沒有收到任何請求,那麼将調用handle_timeout()并傳回handle_request()。

server_forever(poll_interval=0.5)

異步執行,處理請求。每隔poll_interval秒輪詢一次。

忽略timeout屬性,還會調用service_actions(),在ForkingMixIn的子類中定義,可以用來清理僵屍程序。

shutdown() 告訴serve_forever循環停止。并等待他結束。
server_close() 關閉伺服器
finish_request(request,client_address) 通過執行個體化RequestHandlerClass并調用它的handle()方法來處理請求
server_bind() 由伺服器的構造函數調用,以将套接字綁定到所需的位址。可能會被覆寫。
verify_request(request,client_address)

BaseRequestHandler類

  1. 是和使用者連接配接的使用者請求處理類的基類,
  2. ​BaseRequestHandler(request,client_address,server)​

    ​ #構造函數
  • request #是和用戶端的連接配接的socket對象
  • client_address #是用戶端位址
  • server #是TCPServer執行個體本身
  1. 服務端Server執行個體接收使用者請求後,最後會執行個體化這個類。它被初始化時,送入3個構造參數:request, client_address, server自身 以後就可以在BaseRequestHandler類的執行個體上使用以下屬性:
  • self.request是和用戶端的連接配接的socket對象
  • self.server是TCPServer執行個體本身
  • self.client_address是用戶端位址
  1. 這個類在初始化的時候,它會依次調用3個方法。子類可以覆寫這些方法。
class BaseRequestHandler:

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self): #每一個連接配接初始化
        pass

    def handle(self): #每一次請求處理
        pass

    def finish(self): #每一個連接配接清理
        pass      
  • 測試代碼
import socketserver
import socket

class MyBaseRequestHandle(socketserver.BaseRequestHandler):

    def setup(self):
        super().setup() #可以不調用父類的setup()方法,父類的setup方法什麼都沒做
        print("----setup方法被執行-----")

    def handle(self):
        super().handle() #可以不調用父類的handler(),方法,父類的handler方法什麼都沒做
        print("-------handler方法被執行----")
        print(self.server)
        print(self.request) #服務
        print(self.client_address) #用戶端位址
        print(self.__dict__)
        print("- "*30)
        print(self.server.__dict__)
        print("- "*30)
        sk:socket.socket = self.request
        data = sk.recv(1024)
        print(data)
        sk.send("{}-{}".format(sk.getpeername(),data).encode())

        print("----------handler end ----------")

    def finish(self):
        super().finish() #可以不調用父類的finish(),方法,父類的finish方法什麼都沒做
        print("--------finish方法被執行---")

laddr = "127.0.0.1",3999
tcpserver = socketserver.TCPServer(laddr,MyBaseRequestHandle) #注意:參數是MyBaseRequestHandle
tcpserver.handle_request() #隻接受一個用戶端連接配接
# tcpserver.serve_forever() #永久循環執行,可以接受多個用戶端連接配接      
  • 每個不同的連接配接上的請求過來後,生成這個連接配接的socket對象即self.request,用戶端位址是self.client_address。
  • 将ThreadingTCPServer換成TCPServer,當每個用戶端連接配接進來就會建立一個新的線程。
  • ThreadingTCPServer是異步的,可以同時處理多個連接配接。
  • TCPServer是同步的,一個連接配接處理完了,即一個連接配接的handle方法執行完了,才能處理另一個連接配接,且隻有主線程。

總結

  • 建立伺服器需要幾個步驟:
  1. 從BaseRequestHandler類派生出子類,并覆寫其handle()方法來建立請求處理程式類,此方法将處理傳入請求
  2. 執行個體化一個伺服器類,傳參伺服器的位址和請求處理類
  3. 調用伺服器執行個體的handle_request()或serve_forever()方法
  4. 調用server_close()關閉套接字

實作EchoServer

  • Echo:來聲明消息,回顯什麼消息,即用戶端發來什麼消息,傳回什麼消息
import logging
import sys
import socketserver
import socket
import threading

logging.basicConfig(format="%(asctime)s %(thread)d %(threadName)s %(message)s",stream=sys.stdout,level=logging.INFO)

class Handler(socketserver.BaseRequestHandler):
    def setup(self):
        super().setup()
        self.event = threading.Event()
        logging.info("新加入了一個連接配接{}".format(self.client_address))

    def handle(self):
        super().handle()
        sk:socket.socket = self.request
        while not self.event.is_set():
            try:
                data = sk.recv(1024).decode()
            except Exception as e:
                logging.info(e)
                break

            logging.info(data)
            msg = "{}-{}".format(self.client_address,data).encode()
            sk.send(msg)

    def finish(self):
        super().finish()
        self.event.set()
        self.request.close()

if __name__ == "__main__":
    server = socketserver.ThreadingTCPServer(("127.0.0.1",3999),Handler)
    threading.Thread(target=server.serve_forever,name="server").start()
    while True:
        cmd = input(">>>")
        if cmd.strip() == "quit":
            server.server_close()
            break
        logging.info(threading.enumerate())      

實戰,使用socketserver實作群聊的server

  • 使用ThreadingTCPServer改寫ChatServer
  • 使用BaseRequestHandler定義Handler
import logging
import sys
import socketserver
import socket
import threading

logging.basicConfig(format="%(asctime)s %(thread)d %(threadName)s %(message)s",stream=sys.stdout,level=logging.INFO)
log = logging.getLogger()

class Handler(socketserver.BaseRequestHandler):
    lock = threading.Lock()
    clients = {}

    def setup(self):
        super().setup()
        self.event = threading.Event()
        with self.lock:
            self.clients[self.client_address] = self.request
        log.info("新加入了一個連接配接{}".format(self.client_address))

    def handle(self):
        super().handle()
        sock:socket.socket = self.request
        while not self.event.is_set():
            try:
                data = sock.recv(1024)
            except Exception as e:
                log.error(e)
                data = b""
            log.info(data)
            if data == b"by" or data == b"":
                break
            msg = "service:{}-->{}".format(self.client_address, data).encode()
            expc = []  # 記錄sock出錯時對應的clients
            with self.lock:
                for c, sk in self.clients.items():
                    try:
                        sk.send(msg)  # 可能在發送消息是就出錯
                    except:
                        expc.append(c)
                for c in expc:
                    self.clients.pop(c)

    def finish(self):
        super().finish()
        self.event.set()
        with self.lock:
            if self.client_address in self.clients:
                self.clients.pop(self.client_address)
        self.request.close()
        log.info("{}退出了".format(self.client_address))

if __name__ == "__main__":
    server = socketserver.ThreadingTCPServer(("127.0.0.1",3999),Handler)
    server.daemon_threads = True  #設定所有建立的線程都為Daemo線程
    threading.Thread(target=server.serve_forever,name="server",daemon=True).start()
    while True:
        cmd = input(">>>")
        if cmd.strip() == "quit":
            server.shutdown() #告訴serve_forever循環停止。
            server.server_close()
            break
        logging.info(threading.enumerate())      
  • 使用StreamRequestHandler定義handler
import logging
import sys
import socketserver
import socket
import threading

logging.basicConfig(format="%(asctime)s %(thread)d %(threadName)s %(message)s",stream=sys.stdout,level=logging.INFO)
log = logging.getLogger()

class Handler(socketserver.StreamRequestHandler):
    lock = threading.Lock()
    clients = {}

    def setup(self):
        super().setup()
        self.event = threading.Event()
        with self.lock:
            self.clients[self.client_address] = self.request
        log.info("新加入了一個連接配接{}".format(self.client_address))

    def handle(self):
        super().handle()
        import io
        rfile:io.TextIOWrapper= self.rfile
        while not self.event.is_set():
            try:
                data = rfile.read1(1024) #類似于sock.recv(1024)
                # data = rfile.readline() #行讀取
            except Exception as e:
                log.error(e)
                data = b""
            log.info(data)
            if data == b"by" or data == b"":
                break
            msg = "service:{}-->{}".format(self.client_address, data).encode()
            expc = []  # 記錄sock出錯時對應的clients
            with self.lock:
                for c, sk in self.clients.items():
                    try:
                        sk.send(msg)  # 可能在發送消息是就出錯
                    except:
                        expc.append(c)
                for c in expc:
                    self.clients.pop(c)

    def finish(self):
        super().finish()
        self.event.set()
        with self.lock:
            if self.client_address in self.clients:
                self.clients.pop(self.client_address)
        self.request.close()
        log.info("{}退出了".format(self.client_address))

if __name__ == "__main__":
    server = socketserver.ThreadingTCPServer(("127.0.0.1",3999),Handler)
    server.daemon_threads = True  #設定所有建立的線程都為Daemo線程
    threading.Thread(target=server.serve_forever,name="server",daemon=True).start()
    while True:
        cmd = input(">>>")
        if cmd.strip() == "quit":
            server.shutdown() #告訴serve_forever循環停止。
            server.server_close()
            break
        logging.info(threading.enumerate())      
  • 總結
  1. 為每一個連接配接提供RequestHandlerClass類執行個體,依次調用setup、handle、finish方法,且使用了try…finally結構 保證finish方法一定能被調用。這些方法依次執行完成,如果想維持這個連接配接和用戶端通信,就需要在handle函數 中使用循環。
  2. socketserver子產品提供的不同的類,但是程式設計接口是一樣的,即使是多程序、多線程的類也是一樣,大大減少了編 程的難度。
  3. 将socket程式設計簡化,隻需要程式員關注資料處理本身,實作Handler類就行了。這種風格在Python十分常見。

繼續閱讀