天天看點

python ctypes 多線程_Python多線程一覽

【前言】最近一段時間在學習使用Python進行異步程式設計,與C++、Java等編譯型語言類型類似,Python也提供了一些可用于異步程式設計的内模組化塊。例如,支援多線程處理的若處理的任務屬于CPU密集型,則應使用多程序;若處理的任務屬于I/O密集型,則應使用多線程。本文着重介紹使用Python多線程進行異步程式設計,以後有時間的話,會另寫一篇使用Python多程序進行異步程式設計的文章。

本篇文章也假設讀者已經明白“阻塞”、“異步”、“鎖”等術語背後的基本原理,本篇文章的所有示例均基于threading 子產品。

本篇文章的解釋内容包括:建立線程

python ctypes 多線程_Python多線程一覽

線程鎖

python ctypes 多線程_Python多線程一覽

關閉/中斷線程

python ctypes 多線程_Python多線程一覽

基于線程實作取消服務。

【建立線程】Python中可通過調用threading.Thread直接建立線程,如下所示,調用threading.Thread通常需要傳入兩個參數:target為“線程函數”;args為傳遞給“線程函數”的參數,必須為tuple類型。

import threading

def worker(name):

print("thread %s is running" % name)

worker = threading.Thread(target=worker, args=('worker',))

worker.start()

worker.join()也可通過繼承threading.Thread類建立線程,如下所示。其中super(MyThread, self).__init__()表示對繼承自父類(threading.Thread)的屬性進行初始化,并用父類的初始化方法來初始化繼承的屬性。此外,繼承自threading.Thread類的子類必須具有run()方法。線程啟動時,自動調用每個線程執行個體的run()方法。

import threading

import time

class MyThread(threading.Thread):

def __init__(self, delay):

super(MyThread, self).__init__()

self.delay = delay

def run(self):

while True:

print("sleep %ds" % self.delay)

time.sleep(self.delay)

delays = [2, 4, 6]

threads = []

for delay in delays:

threads.append(MyThread(delay))

for t in threads:

t.start()

for t in threads:

t.join()

運作結果如下所示

如果多個線程同時對某個資源進行“寫操作”,則可能會出現不可預料的結果。考慮這樣一種情況:一個清單裡所有元素都是0,線程"set"從後向前把所有元素改成1,而線程"print"負責從前往後讀取清單并列印。那麼,可能線程"set"開始改的時候,線程"print"便來列印清單了,輸出就成了一半0一半1,這就是資料的不同步。是以,為了避免這種情況即為了保證資源的正确性,需要引入鎖的概念。鎖有兩種狀态——鎖定和未鎖定。每當一個線程比如"set"要通路共享資料時,必須先獲得鎖定;如果已經有别的線程比如"print"獲得鎖定了,那麼就讓線程"set"暫停,也就是同步阻塞;等到線程"print"通路完畢,釋放鎖以後,再讓線程"set"繼續。

使用 Thread 對象的 Lock 和 Rlock 可以實作簡單的線程同步,這兩個對象都有 acquire 方法和 release 方法,對于那些每次隻允許一個線程操作的資源,可以将對其進行的操作放到 acquire 和 release 方法之間,如下所示。

import threading

import time

class myThread (threading.Thread):

def __init__(self, threadID, name, counter):

#super(myThread, self).__init__()

threading.Thread.__init__(self)

self.threadID = threadID

self.name = name

self.counter = counter

def run(self):

print ("開啟線程: " + self.name)

# 擷取鎖,用于線程同步

threadLock.acquire()

print_time(self.name, self.counter, 3)

# 釋放鎖,開啟下一個線程

threadLock.release()

def print_time(threadName, delay, counter):

while counter:

time.sleep(delay)

print ("%s: %s" % (threadName, time.ctime(time.time())))

counter -= 1

threadLock = threading.Lock()

threads = []

# 建立新線程

thread1 = myThread(1, "Thread-1", 1)

thread2 = myThread(2, "Thread-2", 2)

# 開啟新線程

thread1.start()

thread2.start()

# 添加線程到線程清單

threads.append(thread1)

threads.append(thread2)

# 等待所有線程完成

for t in threads:

t.join()

print ("退出主線程")

運作結果如下所示

當然,Python 的 Queue 子產品中提供了同步的線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列 PriorityQueue。這些隊列都實作了鎖原語,能夠在多線程中直接使用,可以使用隊列來實作線程間的同步。此處,不再贅述。

【關閉/中斷線程】借助

def _async_raise(tid, exctype):

"""Raises an exception in the threads with id tid"""

if not inspect.isclass(exctype):

raise TypeError("Only types can be raised (not instances)")

res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype))

if res == 0:

raise ValueError("invalid thread id")

elif res != 1:

# """if it returns a number greater than one, you're in trouble,

# and you should call it again with exc=NULL to revert the effect"""

ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)

raise SystemError("PyThreadState_SetAsyncExc failed")

def stop_thread(thread):

_async_raise(thread.ident, SystemExit)更合理的方式是使用threading.Event()建立一個事件管理标記flag,然後線程在運作過程中以極短的時間間隔通路flag的狀态值,一旦flag的狀态值滿足“關閉線程”,立刻執行關閉線程操作。

class MyThread(threading.Thread):

"""Thread class with a stop() method. The thread itself has to check

regularly for the stopped() condition."""

def __init__(self, *args, **kwargs):

super(MyThread, self).__init__(*args, **kwargs)

self._stop_event = threading.Event()

def stop(self):

self._stop_event.set()

def stopped(self):

return self._stop_event.is_set()

def run(self):

while True:

#做任務

print("sleep 1s")

time.sleep(1)

if self.stopped():

# 做一些必要的收尾工作

break

【基于線程實作取消服務】

考慮這樣一個場景:排程器A和執行器B之間通過web通信,排程器A給執行器B傳入“執行”參數,執行器B基于單線程或多線程開始執行任務;之後,排程器A給執行器B傳入了“取消”參數,執行器B取消目前正在執行的任務。那如何實作“取消”服務呢?

解決思路:當排程器A給執行器B傳遞參數後,執行器B端首先判斷參數為“執行”操作還是“取消”操作。若為“執行”操作,執行器B端建立一個“線程池”并開始執行任務;若為“取消”操作,執行器B端将“線程池”中的線程資源利用“threading.Event()”的方式合理關閉,并清理或重新初始化“線程池”。需要注意的是,執行器B端的“線程池”對象需要為全局變量。Demo代碼如下所示。

模拟排程器A端代碼

import requests

#json_data = {"cancel": "yes"}

json_data = {"nums": 2, "cancel": "no"}

r = requests.post("http://127.0.0.1:5000/access_api", json=json_data)

print(r.text, type(r.text))

模拟執行器B端封裝web API代碼

import json

from flask import Flask, request

from test_API.test_process import MyThread

app = Flask(__name__)

threads = []

@app.route('/')

def hello_world():

return 'hello world'

@app.route('/access_api', methods=['POST'])

def access_api():

#print(request.headers)

global threads#聲明變量類型為全局變量

state = None

try:

print(threads)

cancel = request.json['cancel']

if cancel == "no":

print("for testing of performing")

nums = request.json['nums']

for i in range(nums):

threads.append(MyThread(i+1))

for t in threads:

t.start()

elif cancel == "yes":

print('for testing of canceling')

for t in threads:

t.stop()#MyThread.stop()

threads = []

state = 'Successful'

except:

state = 'Failed'

finally:

return state

if __name__ == '__main__':

app.run(host='0.0.0.0', port=5000, debug=True)

#本地通路ip為127.0.0.1

#區域網路其它電腦通路:提供本電腦的ip位址

模拟執行器B端執行邏輯代碼

import json

import time

import threading

class MyThread(threading.Thread):

"""Thread class with a stop() method. The thread itself has to check

regularly for the stopped() condition."""

def __init__(self, delay):

super(MyThread, self).__init__()

self._stop_event = threading.Event()

self.delay = delay

def stop(self):

self._stop_event.set()

def stopped(self):

return self._stop_event.is_set()

def run(self):

print("begin run the child thread")

while True:

print("sleep %ds" % self.delay)

time.sleep(self.delay)

if self.stopped():

# 做一些必要的收尾工作

break

當cancel值為no時,測試結果如下所示

重新傳入cancel值為yes時,測試結果如下

再重新傳入cancel值為no時,測試結果如下小結1:基于以上測試結果,demo代碼可實作了“取消”服務,而且“取消”方式也不是采用抛出異常的方式強制關閉線程,而是采用設定線程狀态标記值的方式合理關閉。但請注意,采用設定線程狀态标記值的方式關閉線程,隻能在目前正在運作的這次循環結束之後起作用。也就是說,在執行下次循環中的print時,線程檢測到标記值已被重置,是以退出循環。這種行為的典型應用場景是伺服器監聽用戶端的通路這種類型!

小結2:當然,如果線程之間不涉及到“鎖”,采用抛出異常的方式也可!該種方式适用于單次且耗時任務的執行!比如,從mysql資料庫遷移資料至oracle資料庫。整個過程涉及到資料庫讀、資料庫寫,比較耗時。此時,若想立刻結束該服務,應采用抛出異常的方式立即中斷線程。但應做好資料庫的復原等後續處理,盡量降低“不可預期行為”發生的可能性。

小結3:對“線程池”資源threads粗暴地進行重新指派的方式并不可取,應該對“線程池”資源進行合理釋放并重新初始化。

就寫這麼多吧,歡迎讨論指正!!!