天天看點

python并發程式設計補充

一、程序通信

1、信号量

互斥鎖:同時隻允許一個線程更改資料,而Semaphore是同時允許一定數量的線程更改資料

        如果指定信号量為3,那麼來一個人獲得一把鎖,計數加1,當計數等于3時,後面的人均需要等待。一旦釋放,就有人可以獲得一把鎖,信号量與程序池的概念很像,但是要區分開,信号量涉及到加鎖的概念。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

<code>from</code> <code>multiprocessing </code><code>import</code> <code>Process,Semaphore</code>

<code>import</code> <code>time,random</code>

<code>def</code> <code>action(sem,user):</code>

<code>    </code><code>sem.acquire()</code>

<code>    </code><code>print</code><code>(</code><code>'%s 占一個位置'</code> <code>%</code><code>user)</code>

<code>    </code><code>time.sleep(random.randint(</code><code>0</code><code>,</code><code>3</code><code>))           </code><code>#模拟程序執行時間</code>

<code>    </code><code>sem.release()</code>

<code>if</code> <code>__name__ </code><code>=</code><code>=</code> <code>'__main__'</code><code>:</code>

<code>    </code><code>sem</code><code>=</code><code>Semaphore(</code><code>5</code><code>)</code>

<code>    </code><code>p_l</code><code>=</code><code>[]                          </code><code>#存放啟動的程序</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>13</code><code>):</code>

<code>        </code><code>p</code><code>=</code><code>Process(target</code><code>=</code><code>action,args</code><code>=</code><code>(sem,</code><code>'user%s'</code> <code>%</code><code>i,))</code>

<code>        </code><code>p.start()</code>

<code>        </code><code>p_l.append(p)</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>p_l:</code>

<code>        </code><code>i.join()                     </code><code>#保證是以子程序執行完畢</code>

<code>    </code><code>print</code><code>(</code><code>'============》'</code><code>)</code>

2、事件

python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。

    事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。

    clear:将“Flag”設定為False

    set:将“Flag”設定為True

3、程序池

在利用Python進行系統管理的時候,特别是同時操作多個檔案目錄,或者遠端控制多台主機,并行操作可以節約大量的時間。多程序是實作并發的手段之一,需要注意的問題是:

(1)很明顯需要并發執行的任務通常要遠大于核數

(2)一個作業系統不可能無限開啟程序,通常有幾個核就開幾個程序

(3)程序開啟過多,效率反而會下降(開啟程序是需要占用系統資源的,而且開啟多餘核數目的程序也無法做到并行)

    我們就可以通過維護一個程序池來控制程序數目

    對于遠端過程調用的進階應用程式而言,應該使用程序池,Pool可以提供指定數量的程序,供使用者調用,當有新的請求送出到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,就重用程序池中的程序。

建立程序池的類:如果指定numprocess為3,則程序池會從無到有建立三個程序,然後自始至終使用這三個程序去執行所有任務,不會開啟其他程序.

4、使用程序池維護固定數目的程序

# 開啟6個用戶端,會發現2個用戶端處于等待狀态

# 服務端

18

19

20

21

22

<code>from</code> <code>socket </code><code>import</code> <code>*</code>

<code>from</code> <code>multiprocessing </code><code>import</code> <code>Pool</code>

<code>import</code> <code>os</code>

<code>server</code><code>=</code><code>socket(AF_INET,SOCK_STREAM)</code>

<code>server.setsockopt(SOL_SOCKET,SO_REUSEADDR,</code><code>1</code><code>)</code>

<code>server.bind((</code><code>'127.0.0.1'</code><code>,</code><code>8080</code><code>))</code>

<code>server.listen(</code><code>5</code><code>)</code>

<code>def</code> <code>talk(conn,client_addr):</code>

<code>    </code><code>print</code><code>(</code><code>'程序pid: %s'</code> <code>%</code><code>os.getpid())</code>

<code>    </code><code>while</code> <code>True</code><code>:</code>

<code>        </code><code>try</code><code>:</code>

<code>            </code><code>msg</code><code>=</code><code>conn.recv(</code><code>1024</code><code>)</code>

<code>            </code><code>if</code> <code>not</code> <code>msg:</code><code>break</code>

<code>            </code><code>conn.send(msg.upper())</code>

<code>        </code><code>except</code> <code>Exception:</code>

<code>            </code><code>break</code>

<code>    </code><code>p</code><code>=</code><code>Pool()                               </code><code>#Pool内的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())</code>

<code>        </code><code>conn,client_addr</code><code>=</code><code>server.accept()</code>

<code>        </code><code>p.apply_async(talk,args</code><code>=</code><code>(conn,client_addr))</code>

<code>        </code><code># p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間隻有一個用戶端能通路</code>

# 用戶端

<code>client</code><code>=</code><code>socket(AF_INET,SOCK_STREAM)</code>

<code>client.connect((</code><code>'127.0.0.1'</code><code>,</code><code>8080</code><code>))</code>

<code>while</code> <code>True</code><code>:</code>

<code>    </code><code>msg</code><code>=</code><code>input</code><code>(</code><code>'&gt;&gt;: '</code><code>).strip()</code>

<code>    </code><code>if</code> <code>not</code> <code>msg:</code><code>continue</code>

<code>    </code><code>client.send(msg.encode(</code><code>'utf-8'</code><code>))</code>

<code>    </code><code>msg</code><code>=</code><code>client.recv(</code><code>1024</code><code>)</code>

<code>    </code><code>print</code><code>(msg.decode(</code><code>'utf-8'</code><code>))</code>

5、回調函數

需要回調函數的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序可以處理我的結果了。主程序則調用一個函數去處理該結果,該函數即回調函數

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回調函數(主程序負責執行),這樣主程序在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

<code># 爬蟲案例</code>

<code>import</code> <code>requests</code>

<code>import</code> <code>re</code>

<code>def</code> <code>get_page(url,pattern):</code>

<code>    </code><code>response</code><code>=</code><code>requests.get(url)</code>

<code>    </code><code>if</code> <code>response.status_code </code><code>=</code><code>=</code> <code>200</code><code>:</code>

<code>        </code><code>return</code> <code>(response.text,pattern)</code>

<code>def</code> <code>parse_page(info):</code>

<code>    </code><code>page_content,pattern</code><code>=</code><code>info</code>

<code>    </code><code>res</code><code>=</code><code>re.findall(pattern,page_content)</code>

<code>    </code><code>for</code> <code>item </code><code>in</code> <code>res:</code>

<code>        </code><code>dic</code><code>=</code><code>{</code>

<code>            </code><code>'index'</code><code>:item[</code><code>0</code><code>],</code>

<code>            </code><code>'title'</code><code>:item[</code><code>1</code><code>],</code>

<code>            </code><code>'actor'</code><code>:item[</code><code>2</code><code>].strip()[</code><code>3</code><code>:],</code>

<code>            </code><code>'time'</code><code>:item[</code><code>3</code><code>][</code><code>5</code><code>:],</code>

<code>            </code><code>'score'</code><code>:item[</code><code>4</code><code>]</code><code>+</code><code>item[</code><code>5</code><code>]</code>

<code>        </code><code>}</code>

<code>        </code><code>print</code><code>(dic)</code>

<code>    </code><code>pattern1</code><code>=</code><code>re.</code><code>compile</code><code>(r</code><code>'&lt;dd&gt;.*?board-index.*?&gt;(\d+)&lt;.*?title="(.*?)".*?star.*?&gt;(.*?)&lt;.*?releasetime.*?&gt;(.*?)&lt;.*?integer.*?&gt;(.*?)&lt;.*?fraction.*?&gt;(.*?)&lt;'</code><code>,re.S)</code>

<code>    </code><code>url_dic</code><code>=</code><code>{</code>

<code>        </code><code>'http://maoyan.com/board/7'</code><code>:pattern1,</code>

<code>    </code><code>}</code>

<code>    </code><code>p</code><code>=</code><code>Pool()</code>

<code>    </code><code>res_l</code><code>=</code><code>[]</code>

<code>    </code><code>for</code> <code>url,pattern </code><code>in</code> <code>url_dic.items():</code>

<code>        </code><code>res</code><code>=</code><code>p.apply_async(get_page,args</code><code>=</code><code>(url,pattern),callback</code><code>=</code><code>parse_page)</code>

<code>        </code><code>res_l.append(res)</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>res_l:</code>

<code>        </code><code>i.get()</code>

<code>    </code><code># res=requests.get('http://maoyan.com/board/7')</code>

<code>    </code><code># print(re.findall(pattern,res.text))</code>

<code>'''結果:</code>

<code>{'index': '1', 'title': '神秘巨星', 'actor': '阿米爾·汗,塞伊拉·沃西,梅·維賈', 'time': '2018-01-19', 'score': '9.5'}</code>

<code>{'index': '2', 'title': '奇迹男孩', 'actor': '雅各布·特瑞布雷,朱莉娅·羅伯茨,歐文·威爾遜', 'time': '2018-01-19', 'score': '9.3'}</code>

<code>{'index': '3', 'title': '小狗奶瓶', 'actor': '奶瓶,康潇諾,魏子涵', 'time': '2018-02-02', 'score': '9.3'}</code>

<code>{'index': '4', 'title': '公牛曆險記', 'actor': '約翰·塞納,莉莉·戴,凱特·邁克金農', 'time': '2018-01-19', 'score': '9.2'}</code>

<code>{'index': '5', 'title': '前任3:再見前任', 'actor': '韓庚,鄭恺,于文文', 'time': '2017-12-29', 'score': '9.2'}</code>

<code>{'index': '6', 'title': '一個人的課堂', 'actor': '孫海英,韓三明,王乃訓', 'time': '2018-01-16', 'score': '9.2'}</code>

<code>{'index': '7', 'title': '芳華', 'actor': '黃軒,苗苗,鐘楚曦', 'time': '2017-12-15', 'score': '9.1'}</code>

<code>{'index': '8', 'title': '南極之戀', 'actor': '趙又廷,楊子姗', 'time': '2018-02-01', 'score': '9.0'}</code>

<code>{'index': '9', 'title': '馬戲之王', 'actor': '休·傑克曼,紮克·埃夫隆,米歇爾·威廉姆斯', 'time': '2018-02-01', 'score': '9.0'}</code>

<code>{'index': '10', 'title': '小馬寶莉大電影', 'actor': '奧卓·阿杜巴,艾米莉·布朗特,克裡斯汀·肯諾恩斯', 'time': '2018-02-02', 'score': '8.9'}</code>

<code>'''</code>

6、如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回調函數

<code>import</code> <code>time,random,os</code>

<code>def</code> <code>work(n):</code>

<code>    </code><code>time.sleep(</code><code>1</code><code>)</code>

<code>    </code><code>return</code> <code>n</code><code>*</code><code>*</code><code>2</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>10</code><code>):</code>

<code>        </code><code>res</code><code>=</code><code>p.apply_async(work,args</code><code>=</code><code>(i,))</code>

<code>    </code><code>p.close()</code>

<code>    </code><code>p.join()                </code><code>#等待程序池中所有程序執行完畢</code>

<code>    </code><code>nums</code><code>=</code><code>[]</code>

<code>    </code><code>for</code> <code>res </code><code>in</code> <code>res_l:</code>

<code>        </code><code>nums.append(res.get()) </code><code>#拿到所有結果</code>

<code>    </code><code>print</code><code>(nums)             </code><code>#主程序拿到所有的處理結果,可以在主程序中進行統一進行處理</code>

二、通信線程

1、死鎖現象與遞歸鎖

所謂死鎖: 是指兩個或兩個以上的程序或線程在執行過程中,因争奪資源而造成的一種互相等待的現象,若無外力作用,它們都将無法推進下去。此時稱系統處于死鎖狀态或系統産生了死鎖,這些永遠在互相等待的程序稱為死鎖程序

解決方法,遞歸鎖,在Python中為了支援在同一線程中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock内部維護着一個Lock和一個counter變量,counter記錄了acquire的次數,進而使得資源可以被多次require。直到一個線程所有的acquire都被release,其他的線程才能獲得資源。

<code>from</code> <code>threading </code><code>import</code> <code>Thread,RLock</code>

<code>import</code> <code>time</code>

<code>mutexA</code><code>=</code><code>mutexB</code><code>=</code><code>RLock()</code>

<code>class</code> <code>MyThread(Thread):</code>

<code>    </code><code>def</code> <code>run(</code><code>self</code><code>):</code>

<code>        </code><code>self</code><code>.func1()</code>

<code>        </code><code>self</code><code>.func2()</code>

<code>    </code><code>def</code> <code>func1(</code><code>self</code><code>):</code>

<code>        </code><code>mutexA.acquire()</code>

<code>        </code><code>print</code><code>(</code><code>'\033[41m%s 拿到A鎖\033[0m'</code> <code>%</code><code>self</code><code>.name)</code>

<code>        </code><code>mutexB.acquire()</code>

<code>        </code><code>print</code><code>(</code><code>'\033[42m%s 拿到B鎖\033[0m'</code> <code>%</code><code>self</code><code>.name)</code>

<code>        </code><code>mutexB.release()</code>

<code>        </code><code>mutexA.release()</code>

<code>    </code><code>def</code> <code>func2(</code><code>self</code><code>):</code>

<code>        </code><code>print</code><code>(</code><code>'\033[43m%s 拿到B鎖\033[0m'</code> <code>%</code><code>self</code><code>.name)</code>

<code>        </code><code>time.sleep(</code><code>2</code><code>)</code>

<code>        </code><code>print</code><code>(</code><code>'\033[44m%s 拿到A鎖\033[0m'</code> <code>%</code><code>self</code><code>.name)</code>

<code>        </code><code>t</code><code>=</code><code>MyThread()</code>

<code>        </code><code>t.start()</code>

2、信号量Semaphore

同程序的一樣

Semaphore管理一個内置的計數器,每當調用acquire()時内置計數器-1;調用release() 時内置計數器+1;

計數器不能小于0;當計數器為0時,acquire()将阻塞線程直到其他線程調用release()。

<code>from</code> <code>threading </code><code>import</code> <code>Thread,Semaphore,current_thread</code>

<code>import</code> <code>threading</code>

<code>def</code> <code>func():</code>

<code>    </code><code>with sm:</code>

<code>        </code><code>print</code><code>(</code><code>'%s get sm'</code> <code>%</code><code>current_thread().getName())</code>

<code>        </code><code>time.sleep(random.randint(</code><code>1</code><code>,</code><code>3</code><code>))</code>

<code>    </code><code>sm</code><code>=</code><code>Semaphore(</code><code>5</code><code>)</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>20</code><code>):</code>

<code>        </code><code>t</code><code>=</code><code>Thread(target</code><code>=</code><code>func)</code>

3、Event

線程的一個關鍵特性是每個線程都是獨立運作且狀态不可預測。如果程式中的其他線程需要通過判斷某個線程的狀态來确定自己下一步的操作,這時線程同步問題就會變得非常棘手。

    是以我們需要使用threading庫中的Event對象。對象包含一個可由線程設定的信号标志,它允許線程等待某些事件的發生。在初始情況下,Event對象中的信号标志被

為假。如果有線程等待一個Event對象, 而這個Event對象的标志為假,那麼這個線程将會被一直阻塞直至該标志為真。一個線程如果将一個Event對象的信号标志設定為真,

它将喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設定為真的Event對象,那麼它将忽略這個事件, 繼續執行。

event.isSet():傳回event的狀态值;

event.wait():如果 event.isSet()==False将阻塞線程;

event.set(): 設定event的狀态值為True,所有阻塞池的線程激活進入就緒狀态, 等待作業系統排程;

event.clear():恢複event的狀态值為False。

<code># 模拟連接配接mysql</code>

<code>from</code> <code>threading </code><code>import</code> <code>Thread,Event</code>

<code>def</code> <code>conn_mysql():</code>

<code>    </code><code>count</code><code>=</code><code>1</code>

<code>    </code><code>while</code> <code>not</code> <code>event.is_set():</code>

<code>        </code><code>if</code> <code>count &gt; </code><code>3</code><code>:                            </code><code>#超過次數就抛出異常連結逾時</code>

<code>            </code><code>raise</code> <code>TimeoutError(</code><code>'連結逾時'</code><code>)</code>

<code>        </code><code>print</code><code>(</code><code>'&lt;%s&gt;第%s次嘗試連結'</code> <code>%</code> <code>(threading.current_thread().getName(), count))</code>

<code>        </code><code>event.wait(</code><code>1</code><code>)                                    </code><code>#等待1秒後接着嘗試連接配接</code>

<code>        </code><code>count</code><code>+</code><code>=</code><code>1</code>

<code>    </code><code>print</code><code>(</code><code>'&lt;%s&gt;連結成功'</code> <code>%</code><code>threading.current_thread().getName())</code>

<code>def</code> <code>check_mysql():</code>

<code>    </code><code>print</code><code>(</code><code>'\033[45m[%s]正在檢查mysql\033[0m'</code> <code>%</code> <code>threading.current_thread().getName())</code>

<code>    </code><code>time.sleep(random.randint(</code><code>2</code><code>,</code><code>4</code><code>))</code>

<code>    </code><code>event.</code><code>set</code><code>()</code>

<code>    </code><code>event</code><code>=</code><code>Event()</code>

<code>    </code><code>conn1</code><code>=</code><code>Thread(target</code><code>=</code><code>conn_mysql)</code>

<code>    </code><code>conn2</code><code>=</code><code>Thread(target</code><code>=</code><code>conn_mysql)</code>

<code>    </code><code>check</code><code>=</code><code>Thread(target</code><code>=</code><code>check_mysql)</code>

<code>    </code><code>conn1.start()</code>

<code>    </code><code>conn2.start()</code>

<code>    </code><code>check.start()</code>

4、定時器

# 驗證碼定時器

<code>from</code> <code>threading </code><code>import</code> <code>Timer</code>

<code>import</code> <code>random,time</code>

<code>class</code> <code>Code:</code>

<code>    </code><code>def</code> <code>__init__(</code><code>self</code><code>):</code>

<code>        </code><code>self</code><code>.make_cache()</code>

<code>    </code><code>def</code> <code>make_cache(</code><code>self</code><code>,interval</code><code>=</code><code>5</code><code>):</code>

<code>        </code><code>self</code><code>.cache</code><code>=</code><code>self</code><code>.make_code()</code>

<code>        </code><code>print</code><code>(</code><code>self</code><code>.cache)</code>

<code>        </code><code>self</code><code>.t</code><code>=</code><code>Timer(interval,</code><code>self</code><code>.make_cache)</code>

<code>        </code><code>self</code><code>.t.start()</code>

<code>    </code><code>def</code> <code>make_code(</code><code>self</code><code>,n</code><code>=</code><code>4</code><code>):</code>

<code>        </code><code>res</code><code>=</code><code>''</code>

<code>        </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(n):</code>

<code>            </code><code>s1</code><code>=</code><code>str</code><code>(random.randint(</code><code>0</code><code>,</code><code>9</code><code>))</code>

<code>            </code><code>s2</code><code>=</code><code>chr</code><code>(random.randint(</code><code>65</code><code>,</code><code>90</code><code>))</code>

<code>            </code><code>res</code><code>+</code><code>=</code><code>random.choice([s1,s2])</code>

<code>        </code><code>return</code> <code>res</code>

<code>    </code><code>def</code> <code>check(</code><code>self</code><code>):</code>

<code>        </code><code>while</code> <code>True</code><code>:</code>

<code>            </code><code>inp</code><code>=</code><code>input</code><code>(</code><code>'&gt;&gt;: '</code><code>).strip()</code>

<code>            </code><code>if</code> <code>inp.upper() </code><code>=</code><code>=</code>  <code>self</code><code>.cache:</code>

<code>                </code><code>print</code><code>(</code><code>'驗證成功'</code><code>,end</code><code>=</code><code>'\n'</code><code>)</code>

<code>                </code><code>self</code><code>.t.cancel()</code>

<code>                </code><code>break</code>

<code>    </code><code>obj</code><code>=</code><code>Code()</code>

<code>    </code><code>obj.check()</code>

5、線程queue

(1)先進先出

import queue

q=queue.Queue()

q.put('first')

q.put('second')

q.put('third')

print(q.get())       #first

print(q.get())       #second

print(q.get())       #third

(2)堆棧(後進先出)

q=queue.LifoQueue()

(3)按照優先級取值

q=queue.PriorityQueue()

#put進入一個元組,元組的第一個元素是優先級(通常是數字,也可以是非數字之間的比較),數字越小優先級越高

q.put((20,'a'))

q.put((-5,'b'))

q.put((30,'c'))

print(q.get())         #(-5, 'b')

print(q.get())         #(20, 'a')

print(q.get())         #(30, 'c')

三、Python标準子產品--concurrent.futures

1、介紹

concurrent.futures子產品提供了高度封裝的異步調用接口

ThreadPoolExecutor:線程池,提供異步調用

ProcessPoolExecutor: 程序池,提供異步調用

2、ProcessPoolExecutor用法

<code>from</code> <code>concurrent.futures </code><code>import</code> <code>ThreadPoolExecutor,ProcessPoolExecutor</code>

<code>import</code> <code>os,time,random</code>

<code>def</code> <code>task(n):</code>

<code>    </code><code>print</code><code>(</code><code>'%s is runing'</code> <code>%</code><code>os.getpid())</code>

<code>    </code><code>time.sleep(random.randint(</code><code>1</code><code>,</code><code>3</code><code>))</code>

<code>    </code><code>executor</code><code>=</code><code>ProcessPoolExecutor(max_workers</code><code>=</code><code>3</code><code>)</code>

<code>    </code><code>futures</code><code>=</code><code>[]</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>11</code><code>):</code>

<code>        </code><code>future</code><code>=</code><code>executor.submit(task,i)         </code><code>#異步送出任務</code>

<code>        </code><code>futures.append(future)</code>

<code>    </code><code>executor.shutdown(</code><code>True</code><code>)                   </code><code>#wait=True,等待池内所有任務執行完畢回收完資源後才繼續,wait=False,立即傳回,并不會等待池内的任務執行完畢</code>

<code>    </code><code>print</code><code>(</code><code>'+++&gt;'</code><code>)</code>

<code>    </code><code>for</code> <code>future </code><code>in</code> <code>futures:</code>

<code>        </code><code>print</code><code>(future.result())                 </code><code>#取得結果</code>

3、map的用法

<code>    </code><code>executor</code><code>=</code><code>ThreadPoolExecutor(max_workers</code><code>=</code><code>3</code><code>)</code>

<code>    </code><code># for i in range(11):</code>

<code>    </code><code>#     future=executor.submit(task,i)</code>

<code>    </code><code>executor.</code><code>map</code><code>(task,</code><code>range</code><code>(</code><code>1</code><code>,</code><code>10</code><code>))               </code><code>#map取代for循環submit的操作</code>

4、回調函數

<code>import</code> <code>json</code>

<code>def</code> <code>get_page(url):</code>

<code>    </code><code>print</code><code>(</code><code>'&lt;程序%s&gt; get %s'</code> <code>%</code><code>(os.getpid(),url))</code>

<code>    </code><code>respone</code><code>=</code><code>requests.get(url)</code>

<code>    </code><code>if</code> <code>respone.status_code </code><code>=</code><code>=</code> <code>200</code><code>:</code>

<code>        </code><code>return</code> <code>{</code><code>'url'</code><code>:url,</code><code>'text'</code><code>:respone.text}</code>

<code>def</code> <code>parse_page(res):</code>

<code>    </code><code>res</code><code>=</code><code>res.result()</code>

<code>    </code><code>print</code><code>(</code><code>'&lt;程序%s&gt; parse %s'</code> <code>%</code><code>(os.getpid(),res[</code><code>'url'</code><code>]))</code>

<code>    </code><code>parse_res</code><code>=</code><code>'url:&lt;%s&gt; size:[%s]\n'</code> <code>%</code><code>(res[</code><code>'url'</code><code>],</code><code>len</code><code>(res[</code><code>'text'</code><code>]))</code>

<code>    </code><code>with </code><code>open</code><code>(</code><code>'db.txt'</code><code>,</code><code>'a'</code><code>) as f:</code>

<code>        </code><code>f.write(parse_res)</code>

<code>    </code><code>urls</code><code>=</code><code>[</code>

<code>        </code><code>'https://www.baidu.com'</code><code>,</code>

<code>        </code><code>'https://www.python.org'</code><code>,</code>

<code>        </code><code>'https://www.openstack.org'</code><code>,</code>

<code>        </code><code>'https://help.github.com/'</code><code>,</code>

<code>        </code><code>'http://www.sina.com.cn/'</code>

<code>    </code><code>]</code>

<code>    </code><code># p=Pool(3)</code>

<code>    </code><code># for url in urls:</code>

<code>    </code><code>#     p.apply_async(get_page,args=(url,),callback=pasrse_page)</code>

<code>    </code><code># p.close()</code>

<code>    </code><code># p.join()</code>

<code>    </code><code>p</code><code>=</code><code>ProcessPoolExecutor(</code><code>3</code><code>)</code>

<code>    </code><code>for</code> <code>url </code><code>in</code> <code>urls:</code>

<code>        </code><code>p.submit(get_page,url).add_done_callback(parse_page) </code><code>#parse_page拿到的是一個future對象obj,需要用obj.result()拿到結果</code>

5、同步調用和異步調用

送出任務的兩種方式:

    同步調用:送出完任務後,就在原地等待,等待任務執行完畢,拿到任務的傳回值,才能繼續下一行代碼,導緻程式串行執行

    異步調用+回調機制:送出完任務後,不在原地等待,任務一旦執行完畢就會觸發回調函數的執行, 程式是并發執行

#同步調用示例:

<code>    </code><code>print</code><code>(</code><code>'%s is ruuning'</code> <code>%</code><code>os.getpid())</code>

<code>def</code> <code>handle(res):</code>

<code>    </code><code>print</code><code>(</code><code>'handle res %s'</code> <code>%</code><code>res)</code>

<code>    </code><code>pool</code><code>=</code><code>ProcessPoolExecutor(</code><code>2</code><code>)</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>5</code><code>):</code>

<code>        </code><code>res</code><code>=</code><code>pool.submit(task,i).result()</code>

<code>        </code><code>handle(res)</code>

<code>    </code><code>pool.shutdown(wait</code><code>=</code><code>True</code><code>)</code>

<code>    </code><code># pool.submit(task,33333)</code>

<code>    </code><code>print</code><code>(</code><code>'主'</code><code>)</code>

# 異步調用示例:

<code>    </code><code># res=n**2</code>

<code>    </code><code># handle(res)</code>

<code>        </code><code>obj</code><code>=</code><code>pool.submit(task,i)</code>

<code>        </code><code>obj.add_done_callback(handle)</code>

四、協程

1、原理

基于單線程來實作并發,即隻用一個主線程(很明顯可利用的cpu隻有一個)情況下實作并發,這就要用到協程

    對于單線程下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該線程能夠最大限度地處于就緒态,即随時都可以被cpu執行的狀态,相當于我們在使用者程式級别将自己的io操作最大限度地隐藏起來,進而可以迷惑作業系統,讓其以為該線程好像是一直在計算,io比較少,進而更多的将cpu的執行權限配置設定給我們的線程,提高程式的運作效率。

協程的本質就是在單線程下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。

是以需要同時滿足以下條件的解決方案:

    (1)可以控制多個任務之間的切換,切換之前将任務的狀态儲存下來,以便重新運作時,可以基于暫停的位置繼續執行。

    (2)作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換

2、介紹

協程:是單線程下的并發,又稱微線程,纖程。協程是一種使用者态的輕量級線程,即協程是由使用者程式自己控制排程的

對比作業系統控制線程的切換,使用者在單線程内控制協程的切換有什麼優缺點?

    優點如下:

        #1. 協程的切換開銷更小,屬于程式級别的切換,作業系統完全感覺不到,因而更加輕量級

        #2. 單線程内就可以實作并發的效果,最大限度地利用cpu

    缺點如下:

        #1. 協程的本質是單線程下,無法利用多核,可以是一個程式開啟多個程序,每個程序内開啟多個線程,每個線程内開啟協程

        #2. 協程指的是單個線程,因而一旦協程出現阻塞,将會阻塞整個線程

3、總結協程特點

(1)必須在隻有一個單線程裡實作并發

(2)修改共享資料不需加鎖

(3)使用者程式裡自己儲存多個控制流的上下文棧

(4)一個協程遇到IO操作自動切換到其它協程(如何實作檢測IO,就用到了gevent子產品(select機制))

4、greenlet子產品

<code>#pip3 install greenlet           #安裝greenlet子產品</code>

<code>from</code> <code>greenlet </code><code>import</code> <code>greenlet</code>

<code>def</code> <code>eat(name):</code>

<code>    </code><code>print</code><code>(</code><code>'%s eat 1'</code> <code>%</code><code>name)</code>

<code>    </code><code>time.sleep(</code><code>100</code><code>)</code>

<code>    </code><code>g2.switch(</code><code>'wang'</code><code>)</code>

<code>    </code><code>print</code><code>(</code><code>'%s eat 2'</code> <code>%</code><code>name)</code>

<code>    </code><code>g2.switch()</code>

<code>def</code> <code>play(name):</code>

<code>    </code><code>print</code><code>(</code><code>'%s play 1'</code> <code>%</code> <code>name)</code>

<code>    </code><code>g1.switch()</code>

<code>    </code><code>print</code><code>(</code><code>'%s play 2'</code> <code>%</code> <code>name)</code>

<code>g1</code><code>=</code><code>greenlet(eat)</code>

<code>g2</code><code>=</code><code>greenlet(play)</code>

<code>g1.switch(</code><code>'wang'</code><code>)</code>

<code>使用greenlet子產品可以非常簡單地實作多個任務的切換,當切到一個任務執行時如果遇到io,那就原地阻塞,這仍然沒有解決遇到IO自動切換來提升效率的問題</code>

5、Gevent子產品(遇到IO阻塞時會自動切換任務)

#pip3 install gevent                       #安裝Gevent子產品

(1)用法

<code>g1</code><code>=</code><code>gevent.spawn(func,</code><code>1</code><code>,,</code><code>2</code><code>,</code><code>3</code><code>,x</code><code>=</code><code>4</code><code>,y</code><code>=</code><code>5</code><code>)   </code><code>#建立一個協程對象g1,spawn括号内第一個參數是函數名,後面可以有多個位置實參或關鍵字實參,都是傳給函數的</code>

<code>g2</code><code>=</code><code>gevent.spawn(func2)</code>

<code>g1.join()                     </code><code>#等待g1結束</code>

<code>g2.join()                     </code><code>#等待g2結束</code>

<code>#或者上述兩步合作一步:gevent.joinall([g1,g2])</code>

<code>g1.value                      </code><code>#拿到func1的傳回值</code>

(2)遇到IO阻塞時會自動切換任務

<code>from</code> <code>gevent </code><code>import</code> <code>monkey;monkey.patch_all()</code>

<code>import</code> <code>gevent</code>

<code>    </code><code># gevent.sleep(3)</code>

<code>    </code><code>time.sleep(</code><code>3</code><code>)</code>

<code>    </code><code># gevent.sleep(2)</code>

<code>g1</code><code>=</code><code>gevent.spawn(eat,</code><code>'wang'</code><code>)</code>

<code>g2</code><code>=</code><code>gevent.spawn(play,</code><code>'li'</code><code>)</code>

<code># gevent.sleep(1)</code>

<code># g1.join()</code>

<code># g2.join()</code>

<code>gevent.joinall([g1,g2])</code>

<code>我們可以用threading.current_thread().getName()來檢視每個g1和g2,檢視的結果為DummyThread</code><code>-</code><code>n,即假線程</code>

(3)爬蟲

<code>    </code><code>print</code><code>(</code><code>'GET: %s'</code> <code>%</code><code>url)</code>

<code>        </code><code>print</code><code>(</code><code>'%d bytes received from %s'</code> <code>%</code><code>(</code><code>len</code><code>(response.text),url))  </code><code>#統計内容的長度</code>

<code>start_time</code><code>=</code><code>time.time()</code>

<code>gevent.joinall([</code>

<code>    </code><code>gevent.spawn(get_page,</code><code>'https://www.python.org/'</code><code>),</code>

<code>    </code><code>gevent.spawn(get_page,</code><code>'https://www.yahoo.com/'</code><code>),</code>

<code>    </code><code>gevent.spawn(get_page,</code><code>'https://github.com/'</code><code>),</code>

<code>])</code>

<code>stop_time</code><code>=</code><code>time.time()</code>

<code>print</code><code>(</code><code>'run time is %s'</code> <code>%</code><code>(stop_time</code><code>-</code><code>start_time))</code>

(4)通過gevent實作單線程下的socket并發(from gevent import monkey;monkey.patch_all()一定要放到導入socket子產品之前,否則gevent無法識别socket的阻塞)

<code>#如果不想用money.patch_all()打更新檔,可以用gevent自帶的socket</code>

<code># from gevent import socket</code>

<code># s=socket.socket()</code>

<code>def</code> <code>server(server_ip,port):</code>

<code>    </code><code>s</code><code>=</code><code>socket(AF_INET,SOCK_STREAM)</code>

<code>    </code><code>s.setsockopt(SOL_SOCKET,SO_REUSEADDR,</code><code>1</code><code>)</code>

<code>    </code><code>s.bind((server_ip,port))</code>

<code>    </code><code>s.listen(</code><code>5</code><code>)</code>

<code>        </code><code>conn,addr</code><code>=</code><code>s.accept()</code>

<code>        </code><code>gevent.spawn(talk,conn,addr)</code>

<code>def</code> <code>talk(conn,addr):</code>

<code>    </code><code>try</code><code>:</code>

<code>            </code><code>res</code><code>=</code><code>conn.recv(</code><code>1024</code><code>)</code>

<code>            </code><code>print</code><code>(</code><code>'client %s:%s msg: %s'</code> <code>%</code><code>(addr[</code><code>0</code><code>],addr[</code><code>1</code><code>],res))</code>

<code>            </code><code>conn.send(res.upper())</code>

<code>    </code><code>except</code> <code>Exception as e:</code>

<code>        </code><code>print</code><code>(e)</code>

<code>    </code><code>finally</code><code>:</code>

<code>        </code><code>conn.close()</code>

<code>    </code><code>server(</code><code>'127.0.0.1'</code><code>,</code><code>8080</code><code>)</code>

<code>#_*_coding:utf-8_*_</code>

# 多線程并發多個用戶端

<code>from</code> <code>threading </code><code>import</code> <code>Thread</code>

<code>def</code> <code>client(server_ip,port):</code>

<code>    </code><code>c</code><code>=</code><code>socket(AF_INET,SOCK_STREAM) </code><code>#套接字對象一定要加到函數内,即局部名稱空間内,放在函數外則被所有線程共享,那麼用戶端端口永遠一樣了</code>

<code>    </code><code>c.connect((server_ip,port))</code>

<code>    </code><code>count</code><code>=</code><code>0</code>

<code>        </code><code>c.send((</code><code>'%s say hello %s'</code> <code>%</code><code>(threading.current_thread().getName(),count)).encode(</code><code>'utf-8'</code><code>))</code>

<code>        </code><code>msg</code><code>=</code><code>c.recv(</code><code>1024</code><code>)</code>

<code>        </code><code>print</code><code>(msg.decode(</code><code>'utf-8'</code><code>))</code>

<code>    </code><code>for</code> <code>i </code><code>in</code> <code>range</code><code>(</code><code>500</code><code>):</code>

<code>        </code><code>t</code><code>=</code><code>Thread(target</code><code>=</code><code>client,args</code><code>=</code><code>(</code><code>'127.0.0.1'</code><code>,</code><code>8080</code><code>))</code>

本文轉自 宋鵬超 51CTO部落格,原文連結:http://blog.51cto.com/qidian510/2071128,如需轉載請自行聯系原作者