本文介紹Python程式設計的網絡程式設計進階(IO複用、Socketserver)
一、認證用戶端的連結合法性
如果你想在分布式系統中實作一個簡單的用戶端連結認證功能,又不像SSL那麼複雜,那麼利用hmac+加鹽的方式來實作。
服務端
from socket import *
import hmac,os
secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn):
'''
認證用戶端連結
:param conn:
:return:
'''
print('開始驗證新連結的合法性')
msg=os.urandom(32)
conn.sendall(msg)
h=hmac.new(secret_key,msg)
digest=h.digest()
respone=conn.recv(len(digest))
return hmac.compare_digest(respone,digest)
def data_handler(conn,bufsize=1024):
if not conn_auth(conn):
print('該連結不合法,關閉')
conn.close()
return
print('連結合法,開始通信')
while True:
data=conn.recv(bufsize)
if not data:break
conn.sendall(data.upper())
def server_handler(ip_port,bufsize,backlog=5):
'''
隻處理連結
:param ip_port:
:return:
'''
tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(backlog)
while True:
conn,addr=tcp_socket_server.accept()
print('新連接配接[%s:%s]' %(addr[0],addr[1]))
data_handler(conn,bufsize)
if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
bufsize=1024
server_handler(ip_port,bufsize)
用戶端(合法)
from socket import *
import hmac,os
secret_key=b'linhaifeng bang bang bang'
def conn_auth(conn):
'''
驗證用戶端到伺服器的連結
:param conn:
:return:
'''
msg=conn.recv(32)
h=hmac.new(secret_key,msg)
digest=h.digest()
conn.sendall(digest)
def client_handler(ip_port,bufsize=1024):
tcp_socket_client=socket(AF_INET,SOCK_STREAM)
tcp_socket_client.connect(ip_port)
conn_auth(tcp_socket_client)
while True:
data=input('>>: ').strip()
if not data:continue
if data == 'quit':break
tcp_socket_client.sendall(data.encode('utf-8'))
respone=tcp_socket_client.recv(bufsize)
print(respone.decode('utf-8'))
tcp_socket_client.close()
if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
bufsize=1024
client_handler(ip_port,bufsize)
用戶端(非法:不知道加密方式)
from socket import *
def client_handler(ip_port,bufsize=1024):
tcp_socket_client=socket(AF_INET,SOCK_STREAM)
tcp_socket_client.connect(ip_port)
while True:
data=input('>>: ').strip()
if not data:continue
if data == 'quit':break
tcp_socket_client.sendall(data.encode('utf-8'))
respone=tcp_socket_client.recv(bufsize)
print(respone.decode('utf-8'))
tcp_socket_client.close()
if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
bufsize=1024
client_handler(ip_port,bufsize)
用戶端(非法:不知道secret_key)
from socket import *
import hmac,os
secret_key=b'linhaifeng bang bang bang1111'
def conn_auth(conn):
'''
驗證用戶端到伺服器的連結
:param conn:
:return:
'''
msg=conn.recv(32)
h=hmac.new(secret_key,msg)
digest=h.digest()
conn.sendall(digest)
def client_handler(ip_port,bufsize=1024):
tcp_socket_client=socket(AF_INET,SOCK_STREAM)
tcp_socket_client.connect(ip_port)
conn_auth(tcp_socket_client)
while True:
data=input('>>: ').strip()
if not data:continue
if data == 'quit':break
tcp_socket_client.sendall(data.encode('utf-8'))
respone=tcp_socket_client.recv(bufsize)
print(respone.decode('utf-8'))
tcp_socket_client.close()
if __name__ == '__main__':
ip_port=('127.0.0.1',9999)
bufsize=1024
client_handler(ip_port,bufsize)
二、IO多路複用
I/O多路複用指:通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作。
1.Linux
Linux中的 select,poll,epoll 都是IO多路複用的機制。
(1)select
select最早于1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個檔案描述符的數組,當select()傳回後,該數組中就緒的檔案描述符便會被核心修改标志位,使得程序可以獲得這些檔案描述符進而進行後續的讀寫操作。
select目前幾乎在所有的平台上支援,其良好跨平台支援也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在于單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯核心的方式提升這一限制。
另外,select()所維護的存儲大量檔案描述符的資料結構,随着檔案描述符數量的增大,其複制的開銷也線性增長。同時,由于網絡響應時間的延遲使得大量TCP連接配接處于非活躍狀态,但調用select()會對所有socket進行一次線性掃描,是以這也浪費了一定的開銷。
(2)poll
poll在1986年誕生于System V Release 3,它和select在本質上沒有多大差别,但是poll沒有最大檔案描述符數量的限制。
poll和select同樣存在一個缺點就是,包含大量檔案描述符的數組被整體複制于使用者态和核心的位址空間之間,而不論這些檔案描述符是否就緒,它的開銷随着檔案描述符數量的增加而線性增大。
另外,select()和poll()将就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次調用select()和poll()的時候将再次報告這些檔案描述符,是以它們一般不會丢失就緒的消息,這種方式稱為水準觸發(Level Triggered)。
(3)epoll
直到Linux2.6才出現了由核心直接支援的實作方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。
epoll可以同時支援水準觸發和邊緣觸發(Edge Triggered,隻告訴程序哪些檔案描述符剛剛變為就緒狀态,它隻說一遍,如果我們沒有采取行動,那麼它将不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實作相當複雜。
epoll同樣隻告知那些就緒的檔案描述符,而且當我們調用epoll_wait()獲得就緒檔案描述符時,傳回的不是實際的描述符,而是一個代表就緒描述符數量的值,你隻需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體映射(mmap)技術,這樣便徹底省掉了這些檔案描述符在系統調用時複制的開銷。
另一個本質的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,程序隻有在調用一定的方法後,核心才對所有監視的檔案描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個檔案描述符,一旦基于某個檔案描述符就緒時,核心會采用類似callback的回調機制,迅速激活這個檔案描述符,當程序調用epoll_wait()時便得到通知。
2.Python
Python中有一個select子產品,其中提供了:select、poll、epoll三個方法,分别調用系統的 select,poll,epoll 進而實作IO多路複用。
-
Windows Python:
提供: select
- Mac Python:
-
Linux Python:
提供: select、poll、epoll
注意:網絡操作、檔案操作、終端操作等均屬于IO操作,對于windows隻支援Socket操作,其他系統支援其他IO操作,但是無法檢測普通檔案操作,自動上次讀取是否已經變化。
3.select方法
句柄清單11, 句柄清單22, 句柄清單33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 逾時時間)
參數: 可接受四個參數(前三個必須)
傳回值:三個清單
select方法用來監視檔案句柄,如果句柄發生變化,則擷取該句柄。
- 當 參數1 序列中的句柄發生可讀時(accetp和read),則擷取發生變化的句柄并添加到 傳回值1 序列中
- 當 參數2 序列中含有句柄時,則将該序列中所有的句柄添加到 傳回值2 序列中
- 當 參數3 序列中的句柄發生錯誤時,則将該發生錯誤的句柄添加到 傳回值3 序列中
- 當 逾時時間 未設定,則select會一直阻塞,直到監聽的句柄發生變化
- 當 逾時時間 = 1時,那麼如果監聽的句柄均無任何變化,則select會阻塞 1 秒,之後傳回三個空清單,如果監聽的句柄有變化,則直接執行。
利用select監聽終端操作執行個體
import select
import threading
import sys
while True:
readable, writeable, error = select.select([sys.stdin,],[],[],1)
if sys.stdin in readable:
print 'select get stdin',sys.stdin.readline()
利用select實作僞同時處理多個Socket用戶端請求:服務端
import socket
import select
sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0)
inputs = [sk1,]
while True:
readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
for r in readable_list:
# 當用戶端第一次連接配接服務端時
if sk1 == r:
print 'accept'
request, address = r.accept()
request.setblocking(0)
inputs.append(request)
# 當用戶端連接配接上服務端之後,再次發送資料時
else:
received = r.recv(1024)
# 當正常接收用戶端發送的資料時
if received:
print 'received data:', received
# 當用戶端關閉程式時
else:
inputs.remove(r)
sk1.close()
利用select實作僞同時處理多個Socket用戶端請求:用戶端
import socket
ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port)
while True:
inp = raw_input('please input:')
sk.sendall(inp)
sk.close()
此處的Socket服務端相比與原生的Socket,他支援當某一個請求不再發送資料時,伺服器端不會等待而是可以去處理其他請求的資料。但是,如果每個請求的耗時比較長時,select版本的伺服器端也無法完成同時操作。
基于select實作socket服務端
'''
伺服器的實作 采用select的方式
'''
import select
import socket
import sys
import Queue
#建立套接字并設定該套接字為非阻塞模式
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)
#綁定套接字
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s port %s'% server_address
server.bind(server_address)
#将該socket變成服務模式
#backlog等于5,表示核心已經接到了連接配接請求,但伺服器還沒有調用accept進行處理的連接配接個數最大為5
#這個值不能無限大,因為要在核心中維護連接配接隊列
server.listen(5)
#初始化讀取資料的監聽清單,最開始時希望從server這個套接字上讀取資料
inputs = [server]
#初始化寫入資料的監聽清單,最開始并沒有用戶端連接配接進來,是以清單為空
outputs = []
#要發往用戶端的資料
message_queues = {}
while inputs:
print >>sys.stderr,'waiting for the next event'
#調用select監聽所有監聽清單中的套接字,并将準備好的套接字加入到對應的清單中
readable,writable,exceptional = select.select(inputs,outputs,inputs)#清單中的socket 套接字 如果是檔案呢?
#監控檔案句柄有某一處發生了變化 可寫 可讀 異常屬于Linux中的網絡程式設計
#屬于同步I/O操作,屬于I/O複用模型的一種
#rlist--等待到準備好讀
#wlist--等待到準備好寫
#xlist--等待到一種異常
#處理可讀取的套接字
'''
如果server這個套接字可讀,則說明有新連結到來
此時在server套接字上調用accept,生成一個與用戶端通訊的套接字
并将與用戶端通訊的套接字加入inputs清單,下一次可以通過select檢查連接配接是否可讀
然後在發往用戶端的緩沖中加入一項,鍵名為:與用戶端通訊的套接字,鍵值為空隊列
select系統調用是用來讓我們的程式監視多個檔案句柄(file descrīptor)的狀态變化的。程式會停在select這裡等待,
直到被監視的檔案句柄有某一個或多個發生了狀态改變
'''
'''
若可讀的套接字不是server套接字,有兩種情況:一種是有資料到來,另一種是連結斷開
如果有資料到來,先接收資料,然後将收到的資料填入往用戶端的緩存區中的對應位置,最後
将于用戶端通訊的套接字加入到寫資料的監聽清單:
如果套接字可讀.但沒有接收到資料,則說明用戶端已經斷開。這時需要關閉與用戶端連接配接的套接字
進行資源清理
'''
for s in readable:
if s is server:
connection,client_address = s.accept()
print >>sys.stderr,'connection from',client_address
connection.setblocking(0)#設定非阻塞
inputs.append(connection)
message_queues[connection] = Queue.Queue()
else:
data = s.recv(1024)
if data:
print >>sys.stderr,'received "%s" from %s'% \
(data,s.getpeername())
message_queues[s].put(data)
if s not in outputs:
outputs.append(s)
else:
print >>sys.stderr,'closing',client_address
if s in outputs:
outputs.remove(s)
inputs.remove(s)
s.close()
del message_queues[s]
#處理可寫的套接字
'''
在發送緩沖區中取出響應的資料,發往用戶端。
如果沒有資料需要寫,則将套接字從發送隊列中移除,select中不再監視
'''
for s in writable:
try:
next_msg = message_queues[s].get_nowait()
except Queue.Empty:
print >>sys.stderr,' ',s,getpeername(),'queue empty'
outputs.remove(s)
else:
print >>sys.stderr,'sending "%s" to %s'% \
(next_msg,s.getpeername())
s.send(next_msg)
#處理異常情況
for s in exceptional:
for s in exceptional:
print >>sys.stderr,'exception condition on',s.getpeername()
inputs.remove(s)
if s in outputs:
outputs.remove(s)
s.close()
del message_queues[s]
三、SOCKETSERVER
1.SocketServer子產品
SocketServer内部使用 IO多路複用 以及 “多線程” 和 “多程序” ,進而實作并發處理多個用戶端請求的Socket服務端。即:每個用戶端請求連接配接到伺服器時,Socket服務端都會在伺服器是建立一個“線程”或者“程序” 專門負責處理目前用戶端的所有請求。
(1)ThreadingTCPServer
ThreadingTCPServer實作的Soket伺服器内部會為每個client建立一個 “線程”,該線程用來和用戶端進行互動。
1)ThreadingTCPServer基礎
使用ThreadingTCPServer:
建立一個繼承自 SocketServer.BaseRequestHandler 的類
類中必須定義一個名稱為 handle 的方法
啟動ThreadingTCPServer
SocketServer實作伺服器
import SocketServer
class MyServer(SocketServer.BaseRequestHandler):
def handle(self):
# print self.request,self.client_address,self.server
conn = self.request
conn.sendall('歡迎緻電 10086,請輸入1xxx,0轉人工服務.')
Flag = True
while Flag:
data = conn.recv(1024)
if data == 'exit':
Flag = False
elif data == '0':
conn.sendall('通過可能會被錄音.balabala一大推')
else:
conn.sendall('請重新輸入.')
if __name__ == '__main__':
server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
server.serve_forever()
用戶端
import socket
ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
data = sk.recv(1024)
print 'receive:',data
inp = raw_input('please input:')
sk.sendall(inp)
if inp == 'exit':
break
sk.close()
2)ThreadingTCPServer源碼剖析
ThreadingTCPServer的類圖關系如下:
内部調用流程為:
- 啟動服務端程式
- 執行 TCPServer.__init__ 方法,建立服務端Socket對象并綁定 IP 和 端口
- 執行 BaseServer.__init__ 方法,将自定義的繼承自SocketServer.BaseRequestHandler 的類 MyRequestHandle指派給 self.RequestHandlerClass
- 執行 BaseServer.server_forever 方法,While 循環一直監聽是否有用戶端請求到達 ...
- 當用戶端連接配接到達伺服器
- 執行 ThreadingMixIn.process_request 方法,建立一個 “線程” 用來處理請求
- 執行 ThreadingMixIn.process_request_thread 方法
- 執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass() 即:執行自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)
2.執行個體
import SocketServer
class MyServer(SocketServer.BaseRequestHandler):
def handle(self):
# print self.request,self.client_address,self.server
conn = self.request
conn.sendall('歡迎緻電 10086,請輸入1xxx,0轉人工服務.')
Flag = True
while Flag:
data = conn.recv(1024)
if data == 'exit':
Flag = False
elif data == '0':
conn.sendall('通過可能會被錄音.balabala一大推')
else:
conn.sendall('請重新輸入.')
if __name__ == '__main__':
server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
server.serve_forever()
import socket
ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
data = sk.recv(1024)
print 'receive:',data
inp = raw_input('please input:')
sk.sendall(inp)
if inp == 'exit':
break
sk.close()
3.ForkingTCPServer
ForkingTCPServer和ThreadingTCPServer的使用和執行流程基本一緻,隻不過在内部分别為請求者建立 “線程” 和 “程序”。
基本使用:
import SocketServer
class MyServer(SocketServer.BaseRequestHandler):
def handle(self):
# print self.request,self.client_address,self.server
conn = self.request
conn.sendall('歡迎緻電 10086,請輸入1xxx,0轉人工服務.')
Flag = True
while Flag:
data = conn.recv(1024)
if data == 'exit':
Flag = False
elif data == '0':
conn.sendall('通過可能會被錄音.balabala一大推')
else:
conn.sendall('請重新輸入.')
if __name__ == '__main__':
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
server.serve_forever()
import socket
ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
data = sk.recv(1024)
print 'receive:',data
inp = raw_input('please input:')
sk.sendall(inp)
if inp == 'exit':
break
sk.close()
以上ForkingTCPServer隻是将 ThreadingTCPServer 執行個體中的代碼:
server = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
變更為:
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)
SocketServer的ThreadingTCPServer之是以可以同時處理請求得益于 select 和 os.fork 兩個東西,其實本質上就是在伺服器端為每一個用戶端建立一個程序,目前新建立的程序用來處理對應用戶端的請求,是以,可以支援同時n個用戶端連結(長連接配接)。
4.socketserver實作并發
基于tcp的套接字,關鍵就是兩個循環,一個連結循環,一個通信循環
socketserver子產品中分兩大類:server類(解決連結問題)和request類(解決通信問題)
server類:
request類:
繼承關系:
以下述代碼為例,分析socketserver源碼:
ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()
查找屬性的順序:ThreadingTCPServer->ThreadingMixIn->TCPServer->BaseServer
- 執行個體化得到ftpserver,先找類ThreadingTCPServer的__init__,在TCPServer中找到,進而執行server_bind,server_active
- 找ftpserver下的serve_forever,在BaseServer中找到,進而執行self._handle_request_noblock(),該方法同樣是在BaseServer中
- 執行self._handle_request_noblock()進而執行request, client_address = self.get_request()(就是TCPServer中的self.socket.accept()),然後執行self.process_request(request, client_address)
- 在ThreadingMixIn中找到process_request,開啟多線程應對并發,進而執行process_request_thread,執行self.finish_request(request, client_address)
- 上述四部分完成了連結循環,本部分開始進入處理通訊部分,在BaseServer中找到finish_request,觸發我們自己定義的類的執行個體化,去找__init__方法,而我們自己定義的類沒有該方法,則去它的父類也就是BaseRequestHandler中找....
源碼分析總結:
基于tcp的socketserver我們自己定義的類中的
- self.server即套接字對象
- self.request即一個連結
- self.client_address即用戶端位址
基于udp的socketserver我們自己定義的類中的
- self.request是一個元組(第一個元素是用戶端發來的資料,第二部分是服務端的udp套接字對象),如(b'adsf', <socket.socket fd=200, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=('127.0.0.1', 8080)>)
FtpServer
import socketserver
import struct
import json
import os
class FtpServer(socketserver.BaseRequestHandler):
coding='utf-8'
server_dir='file_upload'
max_packet_size=1024
BASE_DIR=os.path.dirname(os.path.abspath(__file__))
def handle(self):
print(self.request)
while True:
data=self.request.recv(4)
data_len=struct.unpack('i',data)[0]
head_json=self.request.recv(data_len).decode(self.coding)
head_dic=json.loads(head_json)
# print(head_dic)
cmd=head_dic['cmd']
if hasattr(self,cmd):
func=getattr(self,cmd)
func(head_dic)
def put(self,args):
file_path = os.path.normpath(os.path.join(
self.BASE_DIR,
self.server_dir,
args['filename']
))
filesize = args['filesize']
recv_size = 0
print('----->', file_path)
with open(file_path, 'wb') as f:
while recv_size < filesize:
recv_data = self.request.recv(self.max_packet_size)
f.write(recv_data)
recv_size += len(recv_data)
print('recvsize:%s filesize:%s' % (recv_size, filesize))
ftpserver=socketserver.ThreadingTCPServer(('127.0.0.1',8080),FtpServer)
ftpserver.serve_forever()
FtpClient
import socket
import struct
import json
import os
class MYTCPClient:
address_family = socket.AF_INET
socket_type = socket.SOCK_STREAM
allow_reuse_address = False
max_packet_size = 8192
coding='utf-8'
request_queue_size = 5
def __init__(self, server_address, connect=True):
self.server_address=server_address
self.socket = socket.socket(self.address_family,
self.socket_type)
if connect:
try:
self.client_connect()
except:
self.client_close()
raise
def client_connect(self):
self.socket.connect(self.server_address)
def client_close(self):
self.socket.close()
def run(self):
while True:
inp=input(">>: ").strip()
if not inp:continue
l=inp.split()
cmd=l[0]
if hasattr(self,cmd):
func=getattr(self,cmd)
func(l)
def put(self,args):
cmd=args[0]
filename=args[1]
if not os.path.isfile(filename):
print('file:%s is not exists' %filename)
return
else:
filesize=os.path.getsize(filename)
head_dic={'cmd':cmd,'filename':os.path.basename(filename),'filesize':filesize}
print(head_dic)
head_json=json.dumps(head_dic)
head_json_bytes=bytes(head_json,encoding=self.coding)
head_struct=struct.pack('i',len(head_json_bytes))
self.socket.send(head_struct)
self.socket.send(head_json_bytes)
send_size=0
with open(filename,'rb') as f:
for line in f:
self.socket.send(line)
send_size+=len(line)
print(send_size)
else:
print('upload successful')
client=MYTCPClient(('127.0.0.1',8080))
client.run()