文章目錄
-
-
-
- 1 線程基本概念
-
- 1.1 線程是什麼?
- 1.2 線程和程序關系?
- 2 Python線程子產品
- 3 線程間同步
- 4 線程池
-
- 4.1 傳統多線程問題?
- 4.2 線程池基本原理:
- 5 協程
-
- 5.1 從yield說起
- 5.2 Send來了
- 6. python 進行并發程式設計
-
- 6.1 使用asyncio
- 6.2 使用async/await
- 7 小結
-
-
1 線程基本概念
1.1 線程是什麼?
線程是指程序内的一個執行單元,也是程序内的可排程實體.
與程序的差別:
(1) 位址空間:程序内的一個執行單元;程序至少有一個線程;它們共享程序的位址空間;而程序有自己獨立的位址空間;
(2) 資源擁有:程序是資源配置設定和擁有的機關,同一個程序内的線程共享程序的資源
(3) 線程是處理器排程的基本機關,但程序不是.
(4) 二者均可并發執行.
簡而言之,一個程式至少有一個程序,一個程序至少有一個線程.
線程的劃分尺度小于程序,使得多線程程式的并發性高。
另外,程序在執行過程中擁有獨立的記憶體單元,而多個線程共享記憶體,進而極大地提高了程式的運作效率。
1.2 線程和程序關系?
程序就是一個應用程式在處理機上的一次執行過程,它是一個動态的概念,而線程是程序中的一部分,程序包含多個線程在運作。
多線程可以共享全局變量,多程序不能。多線程中,所有子線程的程序号相同;多程序中,不同的子程序程序号不同。
程序是具有一定獨立功能的程式關于某個資料集合上的一次運作活動, 程序是系統進行資源配置設定和排程的一個獨立機關.
線程是程序的一個實體,是CPU排程和分派的基本機關,它是比程序更小的能獨立運作的基本機關.線程自己基本上不擁有系統資源,隻擁有一點在運作中必不可少的資源(如程式計數器,一組寄存器和棧),但是它可與同屬一個程序的其他的線程共享程序所擁有的全部資源.
一個線程可以建立和撤銷另一個線程;同一個程序中的多個線程之間可以并發執行.
2 Python線程子產品
python主要是通過thread和threading這兩個子產品來實作多線程支援。python的thread子產品是比較底層的子產品,python的threading子產品是對thread做了一些封裝,可以更加友善的被使用。但是python(cpython)由于GIL的存在無法使用threading充分利用CPU資源,如果想充分發揮多核CPU的計算能力需要使用multiprocessing子產品(Windows下使用會有諸多問題)。
python3.x中已經摒棄了Python2.x中采用函數式thread子產品中的start_new_thread()函數來産生新線程方式。
python3.x中通過threading子產品建立新的線程有兩種方法:一種是通過threading.Thread(Target=executable Method)-即傳遞給Thread對象一個可執行方法(或對象);第二種是繼承threading.Thread定義子類并重寫run()方法。第二種方法中,唯一必須重寫的方法是run()
(1)通過threading.Thread進行建立多線程
import threading
import time
def target():
print("the current threading %s is runing"
%(threading.current_thread().name))
time.sleep(1)
print("the current threading %s is ended"%(threading.current_thread().name))
print("the current threading %s is runing"%(threading.current_thread().name))
## 屬于線程t的部分
t = threading.Thread(target=target)
t.start()
## 屬于線程t的部分
t.join() # join是阻塞目前線程(此處的目前線程時主線程) 主線程直到Thread-1結束之後才結束
print("the current threading %s is ended"%(threading.current_thread().name))
(2)通過繼承threading.Thread定義子類建立多線程
使用Threading子產品建立線程,直接從threading.Thread繼承,然後重寫__init__方法和run方法:
import threading
import time
class myThread(threading.Thread): # 繼承父類threading.Thread
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self): # 把要執行的代碼寫到run函數裡面 線程在建立後會直接運作run函數
print("Starting " + self.name)
print_time(self.name, self.counter, 5)
print("Exiting " + self.name)
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print("%s process at: %s" % (threadName, time.ctime(time.time())))
counter -= 1
# 建立新線程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟線程
thread1.start()
thread2.start()
# 等待線程結束
thread1.join()
thread2.join()
print("Exiting Main Thread")
通過以上案例可以知道,thread1和thread2執行順序是亂序的。要使之有序,需要進行線程同步
3 線程間同步
如果多個線程共同對某個資料修改,則可能出現不可預料的結果,為了保證資料的正确性,需要對多個線程進行同步。
使用Thread對象的Lock和Rlock可以實作簡單的線程同步,這兩個對象都有acquire方法和release方法,對于那些需要每次隻允許一個線程操作的資料,可以将其操作放到acquire和release方法之間。
需要注意的是,Python有一個GIL(Global Interpreter Lock)機制,任何線程在運作之前必須擷取這個全局鎖才能執行,每當執行完100條位元組碼,全局鎖才會釋放,切換到其他線程執行。
多線程實作同步有四種方式:
鎖機制,信号量,條件判斷和同步隊列。
下面我主要關注兩種同步機制:鎖機制和同步隊列。
(1)鎖機制
threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖
import threading
import time
class myThread(threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print("Starting " + self.name)
# 獲得鎖,成功獲得鎖定後傳回True
# 可選的timeout參數不填時将一直阻塞直到獲得鎖定
# 否則逾時後将傳回False
threadLock.acquire()
print_time(self.name, self.counter, 5)
# 釋放鎖
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# 建立新線程
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# 開啟新線程
thread1.start()
thread2.start()
# 添加線程到線程清單
threads.append(thread1)
threads.append(thread2)
# 等待所有線程完成
for t in threads:
t.join()
print("Exiting Main Thread")
(2) 線程同步隊列queue
python2.x中提供的Queue, Python3.x中提供的是queue
見import queue.
Python的queue子產品中提供了同步的、線程安全的隊列類,包括FIFO(先入先出)隊列Queue,LIFO(後入先出)隊列LifoQueue,和優先級隊列PriorityQueue。這些隊列都實作了鎖原語,能夠在多線程中直接使用。可以使用隊列來實作線程間的同步。
queue子產品中的常用方法:
- queue.qsize() 傳回隊列的大小
- queue.empty() 如果隊列為空,傳回True,反之False
- queue.full() 如果隊列滿了,傳回True,反之False
- queue.full 與 maxsize 大小對應
- queue.get([block[, timeout]])擷取隊列,timeout等待時間
- queue.get_nowait() 相當Queue.get(False)
- queue.put(item) 寫入隊列,timeout等待時間
- queue.put_nowait(item) 相當Queue.put(item, False)
- queue.task_done() 在完成一項工作之後,Queue.task_done()函數向任務已經完成的隊列發送一個信号
- queue.join() 實際上意味着等到隊列為空,再執行别的操作
案例1:
import queue
import threading
import time
exitFlag = 0
class myThread(threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print("Starting " + self.name)
process_data(self.name, self.q)
print("Exiting " + self.name)
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print("%s processing %s" % (threadName, data))
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1
# 建立新線程
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# 填充隊列
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# 等待隊列清空
while not workQueue.empty():
pass
# 通知線程是時候退出
exitFlag = 1
# 等待所有線程完成
for t in threads:
t.join()
print("Exiting Main Thread")
案例2:
import time
import threading
import queue
class Worker(threading.Thread):
def __init__(self, name, queue):
threading.Thread.__init__(self)
self.queue = queue
self.start() #執行run()
def run(self):
#循環,保證接着跑下一個任務
while True:
# 隊列為空則退出線程
if self.queue.empty():
break
# 擷取一個隊列資料
foo = self.queue.get()
# 延時1S模拟你要做的事情
time.sleep(1)
# 列印
print(self.getName() + " process " + str(foo))
# 任務完成
self.queue.task_done()
# 隊列
queue = queue.Queue()
# 加入100個任務隊列
for i in range(100):
queue.put(i)
# 開10個線程
for i in range(10):
threadName = 'Thread' + str(i)
Worker(threadName, queue)
# 所有線程執行完畢後關閉
queue.join()
4 線程池
4.1 傳統多線程問題?
傳統多線程方案會使用“即時建立, 即時銷毀”的政策。盡管與建立程序相比,建立線程的時間已經大大的縮短,但是如果送出給線程的任務是執行時間較短,而且執行次數極其頻繁,那麼伺服器将處于不停的建立線程,銷毀線程的狀态。
一個線程的運作時間可以分為3部分:線程的啟動時間、線程體的運作時間和線程的銷毀時間。在多線程處理的情景中,如果線程不能被重用,就意味着每次建立都需要經過啟動、銷毀和運作3個過程。這必然會增加系統相應的時間,降低了效率。
有沒有一種高效的解決方案呢? —— 線程池
4.2 線程池基本原理:
我們把任務放進隊列中去,然後開N個線程,每個線程都去隊列中取一個任務,執行完了之後告訴系統說我執行完了,然後接着去隊列中取下一個任務,直至隊列中所有任務取空,退出線程。
使用線程池:
由于線程預先被建立并放入線程池中,同時處理完目前任務之後并不銷毀而是被安排處理下一個任務,是以能夠避免多次建立線程,進而節省線程建立和銷毀的開銷,能帶來更好的性能和系統穩定性。
線程池要設定為多少?
伺服器CPU核數有限,能夠同時并發的線程數有限,并不是開得越多越好,以及線程切換是有開銷的,如果線程切換過于頻繁,反而會使性能降低
線程執行過程中,計算時間分為兩部分:
- CPU計算,占用CPU
-
不需要CPU計算,不占用CPU,等待IO傳回,比如recv(), accept(), sleep()等操作,具體操作就是比如
通路cache、RPC調用下遊service、通路DB,等需要網絡調用的操作
那麼如果計算時間占50%, 等待時間50%,那麼為了使用率達到最高,可以開2個線程:
假如工作時間是2秒, CPU計算完1秒後,線程等待IO的時候需要1秒,此時CPU空閑了,這時就可以切換到另外一個線程,讓CPU工作1秒後,線程等待IO需要1秒,此時CPU又可以切回去,第一個線程這時剛好完成了1秒的IO等待,可以讓CPU繼續工作,就這樣循環的在兩個線程之前切換操作。
那麼如果計算時間占20%, 等待時間80%,那麼為了使用率達到最高,可以開5個線程:
可以想象成完成任務需要5秒,CPU占用1秒,等待時間4秒,CPU線上程等待時,可以同時再激活4個線程,這樣就把CPU和IO等待時間,最大化的重疊起來
抽象一下,計算線程數設定的公式就是:
N核伺服器,通過執行業務的單線程分析出本地計算時間為x,等待時間為y,則工作線程數(線程池線程數)設定為 N*(x+y)/x,能讓CPU的使用率最大化。
由于有GIL的影響,python隻能使用到1個核,是以這裡設定N=1
import queue
import threading
import time
# 聲明線程池管理類
class WorkManager(object):
def __init__(self, work_num=1000, thread_num=2):
self.work_queue = queue.Queue() # 任務隊列
self.threads = [] # 線程池
self.__init_work_queue(work_num) # 初始化任務隊列,添加任務
self.__init_thread_pool(thread_num) # 初始化線程池,建立線程
"""
初始化線程池
"""
def __init_thread_pool(self, thread_num):
for i in range(thread_num):
# 建立工作線程(線程池中的對象)
self.threads.append(Work(self.work_queue))
"""
初始化工作隊列
"""
def __init_work_queue(self, jobs_num):
for i in range(jobs_num):
self.add_job(do_job, i)
"""
添加一項工作入隊
"""
def add_job(self, func, *args):
self.work_queue.put((func, list(args))) # 任務入隊,Queue内部實作了同步機制
"""
等待所有線程運作完畢
"""
def wait_allcomplete(self):
for item in self.threads:
if item.isAlive(): item.join()
class Work(threading.Thread):
def __init__(self, work_queue):
threading.Thread.__init__(self)
self.work_queue = work_queue
self.start()
def run(self):
# 死循環,進而讓建立的線程在一定條件下關閉退出
while True:
try:
do, args = self.work_queue.get(block=False) # 任務異步出隊,Queue内部實作了同步機制
do(args)
self.work_queue.task_done() # 通知系統任務完成
except:
break
# 具體要做的任務
def do_job(args):
time.sleep(0.1) # 模拟處理時間
print(threading.current_thread())
print(list(args))
if __name__ == '__main__':
start = time.time()
work_manager = WorkManager(100, 10) # 或者work_manager = WorkManager(10000, 20)
work_manager.wait_allcomplete()
end = time.time()
print("cost all time: %s" % (end - start))
5 協程
在python GIL之下,同一時刻隻能有一個線程在運作,那麼對于CPU計算密集的程式來說,線程之間的切換開銷就成了拖累,而以I/O為瓶頸的程式正是協程所擅長的:
Python中的協程經曆了很長的一段發展曆程。其大概經曆了如下三個階段:
- 最初的生成器變形yield/send
- 引入@asyncio.coroutine和yield from
- 在最近的Python3.5版本中引入async/await關鍵字
5.1 從yield說起
先看一段普通的計算斐波那契續列的代碼
newlist =[1]
def newfib(n):
a=0
b=1
while n-1:
a,b=b,a+b
n =n-1
newlist.append(b)
return newlist
print(newfib(10))
# [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
如果我們僅僅是需要拿到斐波那契序列的第n位,或者僅僅是希望依此産生斐波那契序列,那麼上面這種傳統方式就會比較耗費記憶體。
這時,yield就派上用場了。
def fib(n):
a = 0
b = 1
while n:
yield b
a, b = b, a + b
n-=1
for fib_res in fib(20):
print(fib_res)
**當一個函數中包含yield語句時,python會自動将其識别為一個生成器。**這時fib(20)并不會真正調用函數體,而是以函數體生成了一個生成器對象執行個體。
yield在這裡可以保留fib函數的計算現場,暫停fib的計算并将b傳回。而将fib放入for…in循環中時,每次循環都會調用next(fib(20)),喚醒生成器,執行到下一個yield語句處,直到抛出StopIteration異常。此異常會被for循環捕獲,導緻跳出循環。
5.2 Send來了
從上面的程式中可以看到,目前隻有資料從fib(20)中通過yield流向外面的for循環;如果可以向fib(20)發送資料,那不是就可以在Python中實作協程了嘛。
于是,Python中的生成器有了send函數,yield表達式也擁有了傳回值。
我們用這個特性,模拟一個慢速斐波那契數列的計算:
import time,random
def stupid_fib(n):
a = 0
b = 1
while n:
sleep_cnt = yield b
print('let me think {0} secs'.format(sleep_cnt))
time.sleep(sleep_cnt)
a, b = b, a + b
n-= 1
print('-' * 10 + 'test yield send' + '-' * 10)
N = 20
sfib = stupid_fib(N)
fib_res = next(sfib)
while True:
print(fib_res)
try:
fib_res = sfib.send(random.uniform(0, 0.5))
except StopIteration:
break
6. python 進行并發程式設計
在Python 2的時代,高性能的網絡程式設計主要是使用Twisted、Tornado和Gevent這三個庫,但是它們的異步代碼互相之間既不相容也不能移植。
asyncio是Python 3.4版本引入的标準庫,直接内置了對異步IO的支援。
asyncio
的程式設計模型就是一個消息循環。我們從
asyncio
子產品中直接擷取一個
EventLoop
的引用,然後把需要執行的協程扔到
EventLoop
中執行,就實作了異步IO。
Python的在3.4中引入了協程的概念,可是這個還是以生成器對象為基礎。
Python 3.5添加了async和await這兩個關鍵字,分别用來替換
asyncio.coroutine
和
yield from
。
python3.5則确定了協程的文法。下面将簡單介紹asyncio的使用。實作協程的不僅僅是asyncio,tornado和gevent, vloop都實作了類似的功能。
6.1 使用asyncio
用
asyncio
實作
Hello world
代碼如下:
import asyncio
@asyncio.coroutine
def hello():
print("Hello world!")
# 異步調用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")
# 擷取EventLoop:
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()
@asyncio.coroutine
把一個generator标記為coroutine類型,然後,我們就把這個
coroutine
扔到
EventLoop
中執行。 hello()
會首先列印出
Hello world!
,然後,
yield from
文法可以讓我們友善地調用另一個
generator
。由于
asyncio.sleep()
也是一個
coroutine
,是以線程不會等待
asyncio.sleep()
,而是直接中斷并執行下一個消息循環。當
asyncio.sleep()
傳回時,線程就可以從
yield from
拿到傳回值(此處是
None
),然後接着執行下一行語句。
把
asyncio.sleep(1)
看成是一個耗時1秒的IO操作,在此期間,主線程并未等待,而是去執行
EventLoop
中其他可以執行的
coroutine
了,是以可以實作并發執行。
我們用Task封裝兩個
coroutine
試試:
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
觀察執行過程:
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暫停約1秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
由列印的目前線程名稱可以看出,兩個
coroutine
是由同一個線程并發執行的。
如果把
asyncio.sleep()
換成真正的IO操作,則多個
coroutine
就可以由一個線程并發執行。
asyncio案例實戰
我們用
asyncio
的異步網絡連接配接來擷取sina、sohu和163的網站首頁:
async_wget.py
import asyncio
@asyncio.coroutine
def wget(host):
print('wget %s...' % host)
connect = asyncio.open_connection(host, 80)
reader, writer = yield from connect
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
writer.write(header.encode('utf-8'))
yield from writer.drain()
while True:
line = yield from reader.readline()
if line == b'\r\n':
break
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
# Ignore the body, close the socket
writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
結果資訊如下:
wget www.sohu.com...
wget www.sina.com.cn...
wget www.163.com...
(等待一段時間)
(列印出sohu的header)
www.sohu.com header > HTTP/1.1 200 OK
www.sohu.com header > Content-Type: text/html
...
(列印出sina的header)
www.sina.com.cn header > HTTP/1.1 200 OK
www.sina.com.cn header > Date: Wed, 20 May 2015 04:56:33 GMT
...
(列印出163的header)
www.163.com header > HTTP/1.0 302 Moved Temporarily
www.163.com header > Server: Cdn Cache Server V2.0
...
可見3個連接配接由一個線程通過
coroutine
并發完成。
6.2 使用async/await
import asyncio
import re
async def browser(host, port=80):
# 連接配接host
reader, writer = await asyncio.open_connection(host, port)
print(host, port, '連接配接成功!')
# 發起 / 首頁請求(HTTP協定)
# 發送請求頭必須是兩個空行
index_get = 'GET {} HTTP/1.1\r\nHost:{}\r\n\r\n'.format('/', host)
writer.write(index_get.encode())
await writer.drain() # 等待向連接配接寫完資料(請求發送完成)
# 開始讀取響應的資料報頭
while True:
line = await reader.readline() # 等待讀取響應資料
if line == b'\r\n':
break
print(host, '<header>', line)
# 讀取響應的資料body
body = await reader.read()
print(encoding)
print(host, '<content>', body)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
tasks = [browser(host) for host in ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('---over---')
7 小結
asyncio
提供了完善的異步IO支援;
異步操作需要在
coroutine
中通過
yield from
完成;
多個
coroutine
可以封裝成一組Task然後并發執行。