天天看點

python 消息隊列_Python基礎-多線程(線程建立/線程同步/死鎖問題/消息隊列)(19)

多線程

概念

線程是程序中一個“單一的連續控制流程”/執行路徑

  • 線程被稱為輕量級程序
  • 一個程序可以擁有多個并行的線程
  • 一個程序中的線程共享相同的記憶體單元/記憶體位址空間(可以通路相同的變量和對象),而且他們從同一個堆中配置設定對象(通信,資料交換,同步操作)
  • 由于線程間的通信是在同一位址空間上進行的,是以不需要額外的通信機制,這使得通信更簡便并且資訊傳遞的速度更快。

線程幾種狀态

多線程程式的執行順序是不确定的,當執行到sleep時,線程将被阻塞(bloceed),到sleep結束後,線程進入就緒(runable)狀态等待排程。而線程排程将自行選擇一個線程執行。

python 消息隊列_Python基礎-多線程(線程建立/線程同步/死鎖問題/消息隊列)(19)

在這裡插入圖檔描述

多線程建立方式

py中的thread子產品是比較底層的子產品,而threading子產品是對thread子產品做了一些包裝,使之可以更友善的被調用。

threading子產品

  • threading.enumrate(): 檢視目前程序線程數量

threading.Thread類

  • 初始化參數:和多程序Process的初始化方法作用一緻
  • is_alive():是否存活
  • join(): 等待建立的線程執行完
  • run():線程建立後需要執行的方法,可以被重寫
  • start():啟動線程

建立線程方式一:傳入方法運作

import os,time,random,threadingdef say(name):    print('%sSorry'%name)if __name__=='__main__':    t=threading.Thread(target=say,args=('MM',))    t.start()    t.join()'''MMSorry'''
           

建立線程方式二:子類化

import os,time,random,threadingclass Thread_Test(threading.Thread):    def __init__(self,name):        #注意:初始化要放在參數指派前面        threading.Thread.__init__(self)        self.name=name    def run(self):        print('目前程序名%s'%(self.name,))if __name__=='__main__':    t=Thread_Test('MM')    t.start()    t.join()'''目前程序名MM'''
           

線程共享全局變量問題

  • 優點:在一個程序内的所有線程共享全局變量,能夠在不适用其它方式的前提下完成多線程之間的資料共享
  • 缺點:線程對全局變量的随意改變可能造成多線程對全局變量的混亂。

共享全局變量程式設計:使用global關鍵字

當然也可以使用給方法傳參的方式

import os,time,random,threadingnum=100def fun1():    #聲明num為全局變量    global num    for i in range(3):        num+=1        print(str(num),'>>>fun1')def fun2():    global num    print(num,'>>>fun2')t1=threading.Thread(target=fun1)t1.start()time.sleep(1)t2=threading.Thread(target=fun2)t2.start()'''通過結果可以看出,兩個線程共享num變量101 >>>fun1102 >>>fun1103 >>>fun1103 >>>fun2'''
           

ThreadLocal變量

  • 一個ThreadLocal變量雖然是全局變量,但是每個線程都隻能讀寫自己線程的獨立副本。這樣就解決了參數在一個線程中各個參數之間的傳遞問題。
  • ThreadLocal最常用的操作就是為每一個線程綁定一個資料庫連接配接,HTTP請求,使用者身份資訊等,這樣一個線程的所有調用函數都可以友善的通路資源。
import os,time,random,threadingdef fun1(name):    local.name=name    fun2()def fun2():    name=local.name    print(name)#建立全局的ThreadLocal對象local=threading.local()for i in range(5):    threading.Thread(target=fun1,args=(i,)).start()
           

線程同步問題(加鎖)

概念

  • 多個線程幾乎在同時修改某一共享資料時,需要進行同步控制
  • 線程同步能夠保證多個線程安全通路競争資源,最簡單的同步機制就是引入互斥鎖
  • 互斥鎖保證每次隻有一個線程能夠程序寫操作,進而保證資料的準确性
  • threading子產品定義了lock類來進行鎖操作

lock類

  • Lock():建立鎖
  • acquire([blocking]): 進入同步狀态,如果blocking為True,則目前線程會阻塞,直到擷取鎖,False則不會阻塞。預設為True。
  • release():釋放鎖

同步程式設計

import os,time,random,threadingnum=100def fun1():    #聲明num為全局變量    global num    #擷取鎖    lock.acquire()    try:        for i in range(100):            num += 1            print(str(num), '>>>fun1')    finally:        #釋放鎖        lock.release()def fun2():    global num    lock.acquire()    try:        for i in range(100):            num += 1            print(str(num), '>>>fun2')    finally:        lock.release()lock=threading.Lock()t1=threading.Thread(target=fun1)t1.start()time.sleep(1)t2=threading.Thread(target=fun2)t2.start()
           

死鎖問題

線程間共享多個資源時,如果兩個線程分别占有一部分資源并且同時等待對方的資源,就會進入死鎖狀态。

import os,time,random,threadingnum=100def fun1():    #聲明num為全局變量    global num    #擷取鎖    lock1.acquire()    try:        print('>>>fun1')        time.sleep(1)        lock2.acquire()        for i in range(100):            num += 1            print(str(num), '>>>fun1')        lock2.release()    finally:        #釋放鎖        lock1.release()def fun2():    global num    lock2.acquire()    print('>>>fun2')    time.sleep(1)    try:        lock1.acquire()        for i in range(100):            num += 1            print(str(num), '>>>fun2')        lock1.release()    finally:        lock2.release()lock1=threading.Lock()lock2=threading.Lock()t1=threading.Thread(target=fun1)t2=threading.Thread(target=fun2)t1.start()t2.start()'''>>>fun1>>>fun2'''
           

線程間消息隊列

py的Queue子產品提供了同步的,線程安全的隊列類

  • FIFO(先進先出):Queue
  • LIFO(先進後出): LifoQueue ,棧模式
  • 優先級隊列:PriorityQueue

這些隊列都是實作了鎖原語(原子操作),可以在多線程中直接使用。

生産者消費者

from queue import Queue,LifoQueue,PriorityQueueimport os,time,random,threadingclass Producer(threading.Thread):    global queue    def run(self):        count=0        while True:            if queue.qsize()<1000:                count+=1                msg='生産者'+str(count)                queue.put(msg)                print(msg)class Consumer(threading.Thread):    def run(self):        global queue        while True:            if queue.qsize()>0:                msg=self.name+'消費了'+queue.get()                print(msg)#queue=Queue()queue=PriorityQueue()#queue=LifoQueue()for i in range(5):    Producer().start()for i in range(5):    Consumer().start()