【前言】最近一段時間在學習使用Python進行異步程式設計,與C++、Java等編譯型語言類型類似,Python也提供了一些可用于異步程式設計的内模組化塊。例如,支援多線程處理的若處理的任務屬于CPU密集型,則應使用多程序;若處理的任務屬于I/O密集型,則應使用多線程。本文着重介紹使用Python多線程進行異步程式設計,以後有時間的話,會另寫一篇使用Python多程序進行異步程式設計的文章。
本篇文章也假設讀者已經明白“阻塞”、“異步”、“鎖”等術語背後的基本原理,本篇文章的所有示例均基于threading 子產品。
本篇文章的解釋内容包括:建立線程
線程鎖
關閉/中斷線程
基于線程實作取消服務。
【建立線程】Python中可通過調用threading.Thread直接建立線程,如下所示,調用threading.Thread通常需要傳入兩個參數:target為“線程函數”;args為傳遞給“線程函數”的參數,必須為tuple類型。
import threading
def worker(name):
print("thread %s is running" % name)
worker = threading.Thread(target=worker, args=('worker',))
worker.start()
worker.join()也可通過繼承threading.Thread類建立線程,如下所示。其中super(MyThread, self).__init__()表示對繼承自父類(threading.Thread)的屬性進行初始化,并用父類的初始化方法來初始化繼承的屬性。此外,繼承自threading.Thread類的子類必須具有run()方法。線程啟動時,自動調用每個線程執行個體的run()方法。
import threading
import time
class MyThread(threading.Thread):
def __init__(self, delay):
super(MyThread, self).__init__()
self.delay = delay
def run(self):
while True:
print("sleep %ds" % self.delay)
time.sleep(self.delay)
delays = [2, 4, 6]
threads = []
for delay in delays:
threads.append(MyThread(delay))
for t in threads:
t.start()
for t in threads:
t.join()
運作結果如下所示
如果多個線程同時對某個資源進行“寫操作”,則可能會出現不可預料的結果。考慮這樣一種情況:一個清單裡所有元素都是0,線程"set"從後向前把所有元素改成1,而線程"print"負責從前往後讀取清單并列印。那麼,可能線程"set"開始改的時候,線程"print"便來列印清單了,輸出就成了一半0一半1,這就是資料的不同步。是以,為了避免這種情況即為了保證資源的正确性,需要引入鎖的概念。鎖有兩種狀态——鎖定和未鎖定。每當一個線程比如"set"要通路共享資料時,必須先獲得鎖定;如果已經有别的線程比如"print"獲得鎖定了,那麼就讓線程"set"暫停,也就是同步阻塞;等到線程"print"通路完畢,釋放鎖以後,再讓線程"set"繼續。
使用 Thread 對象的 Lock 和 Rlock 可以實作簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對于那些每次隻允許一個線程操作的資源,可以将對其進行的操作放到 acquire 和 release 方法之間,如下所示。
import threading
import time
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
#super(myThread, self).__init__()
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("開啟線程: " + self.name)
# 擷取鎖,用于線程同步
threadLock.acquire()
print_time(self.name, self.counter, 3)
# 釋放鎖,開啟下一個線程
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# 建立新線程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟新線程
thread1.start()
thread2.start()
# 添加線程到線程清單
threads.append(thread1)
threads.append(thread2)
# 等待所有線程完成
for t in threads:
t.join()
print ("退出主線程")
運作結果如下所示
當然,Python 的 Queue 子產品中提供了同步的線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue。這些隊列都實作了鎖原語,能夠在多線程中直接使用,可以使用隊列來實作線程間的同步。此處,不再贅述。
【關閉/中斷線程】借助
def _async_raise(tid, exctype):
"""Raises an exception in the threads with id tid"""
if not inspect.isclass(exctype):
raise TypeError("Only types can be raised (not instances)")
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
_async_raise(thread.ident, SystemExit)更合理的方式是使用threading.Event()建立一個事件管理标記flag,然後線程在運作過程中以極短的時間間隔通路flag的狀态值,一旦flag的狀态值滿足“關閉線程”,立刻執行關閉線程操作。
class MyThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, *args, **kwargs):
super(MyThread, self).__init__(*args, **kwargs)
self._stop_event = threading.Event()
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def run(self):
while True:
#做任務
print("sleep 1s")
time.sleep(1)
if self.stopped():
# 做一些必要的收尾工作
break
【基于線程實作取消服務】
考慮這樣一個場景:排程器A和執行器B之間通過web通信,排程器A給執行器B傳入“執行”參數,執行器B基于單線程或多線程開始執行任務;之後,排程器A給執行器B傳入了“取消”參數,執行器B取消目前正在執行的任務。那如何實作“取消”服務呢?
解決思路:當排程器A給執行器B傳遞參數後,執行器B端首先判斷參數為“執行”操作還是“取消”操作。若為“執行”操作,執行器B端建立一個“線程池”并開始執行任務;若為“取消”操作,執行器B端将“線程池”中的線程資源利用“threading.Event()”的方式合理關閉,并清理或重新初始化“線程池”。需要注意的是,執行器B端的“線程池”對象需要為全局變量。Demo代碼如下所示。
模拟排程器A端代碼
import requests
#json_data = {"cancel": "yes"}
json_data = {"nums": 2, "cancel": "no"}
r = requests.post("http://127.0.0.1:5000/access_api", json=json_data)
print(r.text, type(r.text))
模拟執行器B端封裝web API代碼
import json
from flask import Flask, request
from test_API.test_process import MyThread
app = Flask(__name__)
threads = []
@app.route('/')
def hello_world():
return 'hello world'
@app.route('/access_api', methods=['POST'])
def access_api():
#print(request.headers)
global threads#聲明變量類型為全局變量
state = None
try:
print(threads)
cancel = request.json['cancel']
if cancel == "no":
print("for testing of performing")
nums = request.json['nums']
for i in range(nums):
threads.append(MyThread(i+1))
for t in threads:
t.start()
elif cancel == "yes":
print('for testing of canceling')
for t in threads:
t.stop()#MyThread.stop()
threads = []
state = 'Successful'
except:
state = 'Failed'
finally:
return state
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
#本地通路ip為127.0.0.1
#區域網路其它電腦通路:提供本電腦的ip位址
模拟執行器B端執行邏輯代碼
import json
import time
import threading
class MyThread(threading.Thread):
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__(self, delay):
super(MyThread, self).__init__()
self._stop_event = threading.Event()
self.delay = delay
def stop(self):
self._stop_event.set()
def stopped(self):
return self._stop_event.is_set()
def run(self):
print("begin run the child thread")
while True:
print("sleep %ds" % self.delay)
time.sleep(self.delay)
if self.stopped():
# 做一些必要的收尾工作
break
當cancel值為no時,測試結果如下所示
重新傳入cancel值為yes時,測試結果如下
再重新傳入cancel值為no時,測試結果如下小結1:基于以上測試結果,demo代碼可實作了“取消”服務,而且“取消”方式也不是采用抛出異常的方式強制關閉線程,而是采用設定線程狀态标記值的方式合理關閉。但請注意,采用設定線程狀态标記值的方式關閉線程,隻能在目前正在運作的這次循環結束之後起作用。也就是說,在執行下次循環中的print時,線程檢測到标記值已被重置,是以退出循環。這種行為的典型應用場景是伺服器監聽用戶端的通路這種類型!
小結2:當然,如果線程之間不涉及到“鎖”,采用抛出異常的方式也可!該種方式适用于單次且耗時任務的執行!比如,從mysql資料庫遷移資料至oracle資料庫。整個過程涉及到資料庫讀、資料庫寫,比較耗時。此時,若想立刻結束該服務,應采用抛出異常的方式立即中斷線程。但應做好資料庫的復原等後續處理,盡量降低“不可預期行為”發生的可能性。
小結3:對“線程池”資源threads粗暴地進行重新指派的方式并不可取,應該對“線程池”資源進行合理釋放并重新初始化。
就寫這麼多吧,歡迎讨論指正!!!