天天看點

Python 多線程與多程序

原文位址:http://www.cnblogs.com/whatisfantasy/p/6440585.html

1 概念梳理:

1.1 線程

1.1.1 什麼是線程

線程是作業系統能夠進行運算排程的最小機關。它被包含在程序之中,是程序中的實際運作機關。一條線程指的是程序中一個單一順序的控制流,一個程序中可以并發多個線程,每條線程并行執行不同的任務。一個線程是一個execution context(執行上下文),即一個cpu執行時所需要的一串指令。

1.1.2 線程的工作方式

假設你正在讀一本書,沒有讀完,你想休息一下,但是你想在回來時恢複到當時讀的具體進度。有一個方法就是記下頁數、行數與字數這三個數值,這些數值就是execution context。如果你的室友在你休息的時候,使用相同的方法讀這本書。你和她隻需要這三個數字記下來就可以在交替的時間共同閱讀這本書了。

線程的工作方式與此類似。CPU會給你一個在同一時間能夠做多個運算的幻覺,實際上它在每個運算上隻花了極少的時間,本質上CPU同一時刻隻幹了一件事。它能這樣做就是因為它有每個運算的execution context。就像你能夠和你朋友共享同一本書一樣,多任務也能共享同一塊CPU。

1.2 程序

一個程式的執行執行個體就是一個程序。每一個程序提供執行程式所需的所有資源。(程序本質上是資源的集合)

一個程序有一個虛拟的位址空間、可執行的代碼、作業系統的接口、安全的上下文(記錄啟動該程序的使用者和權限等等)、唯一的程序ID、環境變量、優先級類、最小和最大的工作空間(記憶體空間),還要有至少一個線程。

每一個程序啟動時都會最先産生一個線程,即主線程。然後主線程會再建立其他的子線程。

與程序相關的資源包括:
  • 記憶體頁(同一個程序中的所有線程共享同一個記憶體空間)
  • 檔案描述符(e.g. open sockets)
  • 安全憑證(e.g.啟動該程序的使用者ID)

1.3 程序與線程差別

1.同一個程序中的線程共享同一記憶體空間,但是程序之間是獨立的。

2.同一個程序中的所有線程的資料是共享的(程序通訊),程序之間的資料是獨立的。

3.對主線程的修改可能會影響其他線程的行為,但是父程序的修改(除了删除以外)不會影響其他子程序。

4.線程是一個上下文的執行指令,而程序則是與運算相關的一簇資源。

5.同一個程序的線程之間可以直接通信,但是程序之間的交流需要借助中間代理來實作。

6.建立新的線程很容易,但是建立新的程序需要對父程序做一次複制。

7.一個線程可以操作同一程序的其他線程,但是程序隻能操作其子程序。

8.線程啟動速度快,程序啟動速度慢(但是兩者運作速度沒有可比性)。

2 多線程

2.1 線程常用方法

方法 注釋
start() 線程準備就緒,等待CPU排程
setName() 為線程設定名稱
getName() 擷取線程名稱
setDaemon(True) 設定為守護線程
join() 逐個執行每個線程,執行完畢後繼續往下執行
run() 線程被cpu排程後自動執行線程對象的run方法,如果想自定義線程類,直接重寫run方法就行了
2.1.1 Thread類

1.普通建立方式

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')
    time.sleep(1)
    print('0s')
    time.sleep(1)

t1 = threading.Thread(target=run, args=("t1",))
t2 = threading.Thread(target=run, args=("t2",))
t1.start()
t2.start()

"""
task t1
task t2
2s
2s
1s
1s
0s
0s
"""           

複制

2.繼承threading.Thread來自定義線程類

其本質是重構Thread類中的run方法

import threading
import time


class MyThread(threading.Thread):
    def __init__(self, n):
        super(MyThread, self).__init__()  # 重構run函數必須要寫
        self.n = n

    def run(self):
        print("task", self.n)
        time.sleep(1)
        print('2s')
        time.sleep(1)
        print('1s')
        time.sleep(1)
        print('0s')
        time.sleep(1)


if __name__ == "__main__":
    t1 = MyThread("t1")
    t2 = MyThread("t2")

    t1.start()
    t2.start()           

複制

2.1.2 計算子線程執行的時間

注:sleep的時候是不會占用cpu的,在sleep的時候作業系統會把線程暫時挂起。

join()  #等此線程執行完後,再執行其他線程或主線程
threading.current_thread()      #輸出目前線程           

複制

import threading
import time

def run(n):
    print("task", n,threading.current_thread())    #輸出目前的線程
    time.sleep(1)
    print('3s')
    time.sleep(1)
    print('2s')
    time.sleep(1)
    print('1s')

strat_time = time.time()

t_obj = []   #定義清單用于存放子線程執行個體

for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_obj.append(t)
    
"""
由主線程生成的三個子線程
task t-0 <Thread(Thread-1, started 44828)>
task t-1 <Thread(Thread-2, started 42804)>
task t-2 <Thread(Thread-3, started 41384)>
"""

for tmp in t_obj:
    t.join()            #為每個子線程添加join之後,主線程就會等這些子線程執行完之後再執行。

print("cost:", time.time() - strat_time) #主線程

print(threading.current_thread())       #輸出目前線程
"""
<_MainThread(MainThread, started 43740)>
"""           

複制

2.1.3 統計目前活躍的線程數

由于主線程比子線程快很多,當主線程執行active_count()時,其他子線程都還沒執行完畢,是以利用主線程統計的活躍的線程數num = sub_num(子線程數量)+1(主線程本身)

import threading
import time

def run(n):
    print("task", n)    
    time.sleep(1)       #此時子線程停1s

for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()

time.sleep(0.5)     #主線程停0.5秒
print(threading.active_count()) #輸出目前活躍的線程數

"""
task t-0
task t-1
task t-2
4
"""           

複制

由于主線程比子線程慢很多,當主線程執行active_count()時,其他子線程都已經執行完畢,是以利用主線程統計的活躍的線程數num = 1(主線程本身)

import threading
import time


def run(n):
    print("task", n)
    time.sleep(0.5)       #此時子線程停0.5s


for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()

time.sleep(1)     #主線程停1秒
print(threading.active_count()) #輸出活躍的線程數
"""
task t-0
task t-1
task t-2
1
"""           

複制

此外我們還能發現在python内部預設會等待最後一個程序執行完後再執行exit(),或者說python内部在此時有一個隐藏的join()。

2.2 守護程序

我們看下面這個例子,這裡使用setDaemon(True)把所有的子線程都變成了主線程的守護線程,是以當主程序結束後,子線程也會随之結束。是以當主線程結束後,整個程式就退出了。

import threading
import time

def run(n):
    print("task", n)
    time.sleep(1)       #此時子線程停1s
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

for i in range(3):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.setDaemon(True)   #把子程序設定為守護線程,必須在start()之前設定
    t.start()

time.sleep(0.5)     #主線程停0.5秒
print(threading.active_count()) #輸出活躍的線程數
"""
task t-0
task t-1
task t-2
4

Process finished with exit code 0
"""           

複制

2.3 GIL

在非python環境中,單核情況下,同時隻能有一個任務執行。多核時可以支援多個線程同時執行。但是在python中,無論有多少核,同時隻能執行一個線程。究其原因,這就是由于GIL的存在導緻的。

GIL的全稱是Global Interpreter Lock(全局解釋器鎖),來源是python設計之初的考慮,為了資料安全所做的決定。某個線程想要執行,必須先拿到GIL,我們可以把GIL看作是“通行證”,并且在一個python程序中,GIL隻有一個。拿不到通行證的線程,就不允許進入CPU執行。GIL隻在cpython中才有,因為cpython調用的是c語言的原生線程,是以他不能直接操作cpu,隻能利用GIL保證同一時間隻能有一個線程拿到資料。而在pypy和jpython中是沒有GIL的。

Python多線程的工作過程:

python在使用多線程的時候,調用的是c語言的原生線程。

  1. 拿到公共資料
  2. 申請gil
  3. python解釋器調用os原生線程
  4. os操作cpu執行運算
  5. 當該線程執行時間到後,無論運算是否已經執行完,gil都被要求釋放
  6. 進而由其他程序重複上面的過程
  7. 等其他程序執行完後,又會切換到之前的線程(從他記錄的上下文繼續執行)

    整個過程是每個線程執行自己的運算,當執行時間到就進行切換(context switch)。

  • python針對不同類型的代碼執行效率也是不同的:

    1、CPU密集型代碼(各種循環處理、計算等等),在這種情況下,由于計算工作多,ticks計數很快就會達到門檻值,然後觸發GIL的釋放與再競争(多個線程來回切換當然是需要消耗資源的),是以python下的多線程對CPU密集型代碼并不友好。

    2、IO密集型代碼(檔案處理、網絡爬蟲等涉及檔案讀寫的操作),多線程能夠有效提升效率(單線程下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多線程能線上程A等待時,自動切換到線程B,可以不浪費CPU的資源,進而能提升程式執行效率)。是以python的多線程對IO密集型代碼比較友好。

  • 使用建議?

    python下想要充分利用多核CPU,就用多程序。因為每個程序有各自獨立的GIL,互不幹擾,這樣就可以真正意義上的并行執行,在python中,多程序的執行效率優于多線程(僅僅針對多核CPU而言)。

  • GIL在python中的版本差異:

    1、在python2.x裡,GIL的釋放邏輯是目前線程遇見

    IO操作

    或者

    ticks計數達到100

    時進行釋放。(ticks可以看作是python自身的一個計數器,專門做用于GIL,每次釋放後歸零,這個計數可以通過sys.setcheckinterval 來調整)。而每次釋放GIL鎖,線程進行鎖競争、切換線程,會消耗資源。并且由于GIL鎖存在,python裡一個程序永遠隻能同時執行一個線程(拿到GIL的線程才能執行),這就是為什麼在多核CPU上,python的多線程效率并不高。

    2、在python3.x中,GIL不使用ticks計數,改為使用計時器(執行時間達到門檻值後,目前線程釋放GIL),這樣對CPU密集型程式更加友好,但依然沒有解決GIL導緻的同一時間隻能執行一個線程的問題,是以效率依然不盡如人意。

2.4 線程鎖

由于線程之間是進行随機排程,并且每個線程可能隻執行n條執行之後,當多個線程同時修改同一條資料時可能會出現髒資料,是以,出現了線程鎖,即同一時刻允許一個線程執行操作。線程鎖用于鎖定資源,你可以定義多個鎖, 像下面的代碼, 當你需要獨占某一資源時,任何一個鎖都可以鎖這個資源,就好比你用不同的鎖都可以把相同的一個門鎖住是一個道理。

由于線程之間是進行随機排程,如果有多個線程同時操作一個對象,如果沒有很好地保護該對象,會造成程式結果的不可預期,我們也稱此為“線程不安全”。

#實測:在python2.7、mac os下,運作以下代碼可能會産生髒資料。但是在python3中就不一定會出現下面的問題。

import threading
import time

def run(n):
    global num
    num += 1

num = 0
t_obj = [] 

for i in range(20000):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_obj.append(t)

for t in t_obj:
    t.join()

print "num:", num
"""
産生髒資料後的運作結果:
num: 19999
"""           

複制

2.5 互斥鎖(mutex)

為了方式上面情況的發生,就出現了互斥鎖(Lock)

import threading
import time


def run(n):
    lock.acquire()  #擷取鎖
    global num
    num += 1
    lock.release()  #釋放鎖

lock = threading.Lock()     #執行個體化一個鎖對象

num = 0
t_obj = []  

for i in range(20000):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()
    t_obj.append(t)

for t in t_obj:
    t.join()

print "num:", num           

複制

2.6 遞歸鎖

RLcok類的用法和Lock類一模一樣,但它支援嵌套,,在多個鎖沒有釋放的時候一般會使用使用RLcok類。

import threading
import time
   
gl_num = 0
   
lock = threading.RLock()
   
def Func():
    lock.acquire()
    global gl_num
    gl_num +=1
    time.sleep(1)
    print gl_num
    lock.release()
       
for i in range(10):
    t = threading.Thread(target=Func)
    t.start()           

複制

2.7 信号量(BoundedSemaphore類)

互斥鎖同時隻允許一個線程更改資料,而Semaphore是同時允許一定數量的線程更改資料 ,比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人隻能等裡面有人出來了才能再進去。

import threading
import time


def run(n):
    semaphore.acquire()   #加鎖
    time.sleep(1)
    print("run the thread:%s\n" % n)
    semaphore.release()     #釋放


num = 0
semaphore = threading.BoundedSemaphore(5)  # 最多允許5個線程同時運作

for i in range(22):
    t = threading.Thread(target=run, args=("t-%s" % i,))
    t.start()

while threading.active_count() != 1:
    pass  # print threading.active_count()
else:
    print('-----all threads done-----')           

複制

2.8 事件(Event類)

python線程的事件用于主線程控制其他線程的執行,事件是一個簡單的線程同步對象,其主要提供以下幾個方法:

方法 注釋
clear 将flag設定為“False”
set 将flag設定為“True”
is_set 判斷是否設定了flag
wait 會一直監聽flag,如果沒有檢測到flag就一直處于阻塞狀态

事件處理的機制:全局定義了一個“Flag”,當flag值為“False”,那麼event.wait()就會阻塞,當flag值為“True”,那麼event.wait()便不再阻塞。

#利用Event類模拟紅綠燈
import threading
import time

event = threading.Event()


def lighter():
    count = 0
    event.set()     #初始值為綠燈
    while True:
        if 5 < count <=10 :
            event.clear()  # 紅燈,清除标志位
            print("\33[41;1mred light is on...\033[0m")
        elif count > 10:
            event.set()  # 綠燈,設定标志位
            count = 0
        else:
            print("\33[42;1mgreen light is on...\033[0m")

        time.sleep(1)
        count += 1

def car(name):
    while True:
        if event.is_set():      #判斷是否設定了标志位
            print("[%s] running..."%name)
            time.sleep(1)
        else:
            print("[%s] sees red light,waiting..."%name)
            event.wait()
            print("[%s] green light is on,start going..."%name)

light = threading.Thread(target=lighter,)
light.start()

car = threading.Thread(target=car,args=("MINI",))
car.start()           

複制

2.9 條件(Condition類)

使得線程等待,隻有滿足某條件時,才釋放n個線程

2.10 定時器(Timer類)

定時器,指定n秒後執行某操作

from threading import Timer
 
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()  # after 1 seconds, "hello, world" will be printed           

複制

3 多程序

在linux中,每個程序都是由父程序提供的。每啟動一個子程序就從父程序克隆一份資料,但是程序之間的資料本身是不能共享的。

from multiprocessing import Process
import time
def f(name):
    time.sleep(2)
    print('hello', name)
 
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()           

複制

from multiprocessing import Process
import os
 
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  #擷取父程序id
    print('process id:', os.getpid())   #擷取自己的程序id
    print("\n\n")
 
def f(name):
    info('\033[31;1mfunction f\033[0m')
    print('hello', name)
 
if __name__ == '__main__':
    info('\033[32;1mmain process line\033[0m')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()           

複制

3.1 程序間通信

由于程序之間資料是不共享的,是以不會出現多線程GIL帶來的問題。多程序之間的通信通過Queue()或Pipe()來實作

3.1.1 Queue()

使用方法跟threading裡的queue差不多

from multiprocessing import Process, Queue
 
def f(q):
    q.put([42, None, 'hello'])
 
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()           

複制

3.1.2 Pipe()

Pipe的本質是程序之間的資料傳遞,而不是資料共享,這和socket有點像。pipe()傳回兩個連接配接對象分别表示管道的兩端,每端都有send()和recv()方法。如果兩個程序試圖在同一時間的同一端進行讀取和寫入那麼,這可能會損壞管道中的資料。

from multiprocessing import Process, Pipe
 
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
 
if __name__ == '__main__':
    parent_conn, child_conn = Pipe() 
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()           

複制

3.2 Manager

通過Manager可實作程序間資料的共享。Manager()傳回的manager對象會通過一個服務程序,來使其他程序通過代理的方式操作python對象。manager對象支援 

list

dict

Namespace

Lock

RLock

Semaphore

BoundedSemaphore

Condition

Event

Barrier

Queue

Value

 ,

Array

.

from multiprocessing import Process, Manager
 
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.append(1)
    print(l)
 
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
 
        l = manager.list(range(5))
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
 
        print(d)
        print(l)           

複制

3.3 程序鎖(程序同步)

資料輸出的時候保證不同程序的輸出内容在同一塊螢幕正常顯示,防止資料亂序的情況。

Without using the lock output from the different processes is liable to get all mixed up.

from multiprocessing import Process, Lock
 
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
 
if __name__ == '__main__':
    lock = Lock()
 
    for num in range(10):
        Process(target=f, args=(lock, num)).start()           

複制

3.4 程序池

由于程序啟動的開銷比較大,使用多程序的時候會導緻大量記憶體空間被消耗。為了防止這種情況發生可以使用程序池,(由于啟動線程的開銷比較小,是以不需要線程池這種概念,多線程隻會頻繁得切換cpu導緻系統變慢,并不會占用過多的記憶體空間)

程序池中常用方法:

apply()

 同步執行(串行)

apply_async()

 異步執行(并行)

terminate()

 立刻關閉程序池

join()

 主程序等待所有子程序執行完畢。必須在close或terminate()之後。

close()

 等待所有程序結束後,才關閉程序池。

from  multiprocessing import Process,Pool
import time
 
def Foo(i):
    time.sleep(2)
    return i+100
 
def Bar(arg):
    print('-->exec done:',arg)
 
pool = Pool(5)  #允許程序池同時放入5個程序
 
for i in range(10):
    pool.apply_async(func=Foo, args=(i,),callback=Bar)  #func子程序執行完後,才會執行callback,否則callback不執行(而且callback是由父程序來執行了)
    #pool.apply(func=Foo, args=(i,))
 
print('end')
pool.close()
pool.join() #主程序等待所有子程序執行完畢。必須在close()或terminate()之後。           

複制

程序池内部維護一個程序式列,當使用時,去程序池中擷取一個程序,如果程序池序列中沒有可供使用的程序,那麼程式就會等待,直到程序池中有可用程序為止。在上面的程式中産生了10個程序,但是隻能有5同時被放入程序池,剩下的都被暫時挂起,并不占用記憶體空間,等前面的五個程序執行完後,再執行剩下5個程序。

4 補充:協程

線程和程序的操作是由程式觸發系統接口,最後的執行者是系統,它本質上是作業系統提供的功能。而協程的操作則是程式員指定的,在python中通過yield,人為的實作并發處理。

協程存在的意義:對于多線程應用,CPU通過切片的方式來切換線程間的執行,線程切換時需要耗時。協程,則隻使用一個線程,分解一個線程成為多個“微線程”,在一個線程中規定某個代碼塊的執行順序。

協程的适用場景:當程式中存在大量不需要CPU的操作時(IO)。

常用第三方子產品gevent和greenlet。(本質上,gevent是對greenlet的進階封裝,是以一般用它就行,這是一個相當高效的子產品。)

4.1 greenlet

from greenlet import greenlet

def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()

def test2():
    print(56)
    gr1.switch()
    print(78)

gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()           

複制

實際上,greenlet就是通過switch方法在不同的任務之間進行切換。

4.2 gevent

from gevent import monkey; monkey.patch_all()
import gevent
import requests

def f(url):
    print('GET: %s' % url)
    resp = requests.get(url)
    data = resp.text
    print('%d bytes received from %s.' % (len(data), url))

gevent.joinall([
        gevent.spawn(f, 'https://www.python.org/'),
        gevent.spawn(f, 'https://www.yahoo.com/'),
        gevent.spawn(f, 'https://github.com/'),
])           

複制

通過joinall将任務f和它的參數進行統一排程,實作單線程中的協程。代碼封裝層次很高,實際使用隻需要了解它的幾個主要方法即可。