天天看點

20.2、python程序間通信——隊列和管道

程序間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)

程序間通信

IPC(Inter-Process Communication)

隊列 

概念介紹

建立共享的程序隊列,Queue是多程序安全的隊列,可以使用Queue實作多程序之間的資料傳遞。 

Queue([maxsize])

建立共享的程序隊列。

參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。

底層隊列使用管道和鎖定實作。

方法介紹

建立共享的程序隊列。maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。底層隊列使用管道和鎖定實作。另外,還需要運作支援線程以便隊列中的資料傳輸到底層管道中。

Queue的執行個體q具有以下方法:

q.get( [ block [ ,timeout ] ] )

傳回q中的一個項目。如果q為空,此方法将阻塞,直到隊列中有項目可用為止。block用于控制阻塞行為,預設為True. 如果設定為False,将引發Queue.Empty異常(定義在Queue子產品中)。timeout是可選逾時時間,用在阻塞模式中。如果在制定的時間間隔内沒有項目變為可用,将引發Queue.Empty異常。

q.get_nowait( )

同q.get(False)方法。

q.put(item [, block [,timeout ] ] )

将item放入隊列。如果隊列已滿,此方法将阻塞至有空間可用為止。block控制阻塞行為,預設為True。如果設定為False,将引發Queue.Empty異常(定義在Queue庫子產品中)。timeout指定在阻塞模式中等待可用空間的時間長短。逾時後将引發Queue.Full異常。

q.qsize()

傳回隊列中目前項目的正确數量。此函數的結果并不可靠,因為在傳回結果和在稍後程式中使用結果之間,隊列中可能添加或删除了項目。在某些系統上,此方法可能引發NotImplementedError異常。

q.empty()

如果調用此方法時 q為空,傳回True。如果其他程序或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在傳回和使用結果之間,隊列中可能已經加入新的項目。

q.full()

如果q已滿,傳回為True. 由于線程的存在,結果也可能是不可靠的(參考q.empty()方法)。。

其他方法(了解)

q.close()

關閉隊列,防止隊列中加入更多資料。調用此方法時,背景線程将繼續寫入那些已入隊列但尚未寫入的資料,但将在此方法完成時馬上關閉。如果q被垃圾收集,将自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的資料結束信号或異常。例如,如果某個使用者正被阻塞在get()操作上,關閉生産者中的隊列不會導緻get()方法傳回錯誤。

q.cancel_join_thread()

不會再程序退出時自動連接配接背景線程。這可以防止join_thread()方法阻塞。

q.join_thread()

連接配接隊列的背景線程。此方法用于在調用q.close()方法後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始建立者的所有程序調用。調用q.cancel_join_thread()方法可以禁止這種行為。

代碼執行個體

單看隊列用法

'''

multiprocessing子產品支援程序間通信的兩種主要形式:管道和隊列

都是基于消息傳遞實作的,但是隊列接口

'''from multiprocessing import Queue

q=Queue(3)#put ,get ,put_nowait,get_nowait,full,emptyq.put(3)

q.put(3)

q.put(3)# q.put(3)  # 如果隊列已經滿了,程式就會停在這裡,等待資料被别人取走,再将資料放入隊列。          # 如果隊列中的資料一直不被取走,程式就會永遠停在這裡。try:

    q.put_nowait(3) # 可以使用put_nowait,如果隊列滿了不會阻塞,但是會因為隊列滿了而報錯。except: # 是以我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去,但是會丢掉這個消息。    print('隊列已經滿了')# 是以,我們再放入資料之前,可以先看一下隊列的狀态,如果已經滿了,就不繼續put了。print(q.full()) #滿了print(q.get())print(q.get())print(q.get())# print(q.get()) # 同put方法一樣,如果隊列已經空了,那麼繼續取就會出現阻塞。try:

    q.get_nowait(3) # 可以使用get_nowait,如果隊列滿了不會阻塞,但是會因為沒取到值而報錯。except: # 是以我們可以用一個try語句來處理這個錯誤。這樣程式不會一直阻塞下去。    print('隊列已經空了')print(q.empty()) #空了

上面這個例子還沒有加入程序通信,隻是先來看看隊列為我們提供的方法,以及這些方法的使用和現象。

import timefrom multiprocessing import Process, Queuedef f(q):

    q.put([time.asctime(), 'from Eva', 'hello'])  #調用主函數中p程序傳遞過來的程序參數 put函數為向隊列中添加一條資料。if __name__ == '__main__':

    q = Queue() #建立一個Queue對象    p = Process(target=f, args=(q,)) #建立一個程序    p.start()

    print(q.get())

    p.join()

批量生産資料放入隊列再批量擷取結果 x

上面是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最先進入的資料。 接下來看一個稍微複雜一些的例子:

import osimport timeimport multiprocessing# 向queue中輸入資料的函數def inputQ(queue):

    info = str(os.getpid()) + '(put):' + str(time.asctime())

    queue.put(info)# 向queue中輸出資料的函數def outputQ(queue):

    info = queue.get()

    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))# Mainif __name__ == '__main__':

    multiprocessing.freeze_support()

    record1 = []  # store input processes    record2 = []  # store output processes    queue = multiprocessing.Queue(3)

    # 輸入程序    for i in range(10):

        process = multiprocessing.Process(target=inputQ,args=(queue,))

        process.start()

        record1.append(process)

    # 輸出程序    for i in range(10):

        process = multiprocessing.Process(target=outputQ,args=(queue,))

        record2.append(process)

    for p in record1:

        p.join()

    for p in record2:

生産者消費者模型

在并發程式設計中使用生産者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生産線程和消費線程的工作能力來提高程式的整體處理資料的速度。

為什麼要使用生産者和消費者模式

線上程世界裡,生産者就是生産資料的線程,消費者就是消費資料的線程。在多線程開發當中,如果生産者處理速度很快,而消費者處理速度很慢,那麼生産者就必須等待消費者處理完,才能繼續生産資料。同樣的道理,如果消費者的處理能力大于生産者,那麼消費者就必須等待生産者。為了解決這個問題于是引入了生産者和消費者模式。

什麼是生産者消費者模式

生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。

基于隊列實作生産者消費者模型

from multiprocessing import Process,Queueimport time,random,osdef consumer(q):

    while True:

        res=q.get()

        time.sleep(random.randint(1,3))

        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(q):

    for i in range(10):

        res='包子%s' %i

        q.put(res)

        print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))if __name__ == '__main__':

    q=Queue()

    #生産者們:即廚師們    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們    c1=Process(target=consumer,args=(q,))

    #開始    p1.start()

    c1.start()

    print('主')

此時的問題是主程序永遠不會結束,原因是:生産者p在生産完後就結束了,但是消費者c在取空了q之後,則一直處于死循環中且卡在q.get()這一步。

解決方式無非是讓生産者在生産完畢後,往隊列中再發一個結束信号,這樣消費者在接收到結束信号後就可以break出死循環。

改良版——生産者消費者模型

        if res is None:break #收到結束信号則結束        time.sleep(random.randint(1,3))

        print('\033[44m%s 生産了 %s\033[0m' %(os.getpid(),res))

    q.put(None) #發送結束信号if __name__ == '__main__':

注意:結束信号None,不一定要由生産者發,主程序裡同樣可以發,但主程序需要等生産者結束後才應該發送該信号

主程序在生産者生産完畢後發送結束信号None

    for i in range(2):

    p1.join()

    q.put(None) #發送結束信号    print('主')

但上述解決方式,在有多個生産者和多個消費者時,我們則需要用一個很low的方式去解決

多個消費者的例子:有幾個消費者就需要發送幾次結束信号

        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))def producer(name,q):

        res='%s%s' %(name,i)

    #生産者們:即廚師們    p1=Process(target=producer,args=('包子',q))

    p2=Process(target=producer,args=('骨頭',q))

    p3=Process(target=producer,args=('泔水',q))

    c2=Process(target=consumer,args=(q,))

    p2.start()

    p3.start()

    p1.join() #必須保證生産者全部生産完畢,才應該發送結束信号    p2.join()

    p3.join()

    q.put(None) #有幾個消費者就應該發送幾次結束信号None    q.put(None) #發送結束信号    print('主')

JoinableQueue([maxsize]) 

建立可連接配接的共享程序隊列。這就像是一個Queue對象,但隊列允許項目的使用者通知生産者項目已經被成功處理。通知程序是使用共享的信号和條件變量來實作的。 

JoinableQueue的執行個體p除了與Queue對象相同的方法之外,還具有以下方法:

q.task_done()

使用者使用此方法發出信号,表示q.get()傳回的項目已經被處理。如果調用此方法的次數大于從隊列中删除的項目數量,将引發ValueError異常。

q.join()

生産者将使用此方法進行阻塞,直到隊列中所有項目均被處理。阻塞将持續到為隊列中的每個項目均調用q.task_done()方法為止。

下面的例子說明如何建立永遠運作的程序,使用和處理隊列上的項目。生産者将項目放入隊列,并等待它們被處理。

from multiprocessing import Process,JoinableQueueimport time,random,osdef consumer(q):

        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

        q.task_done() #向q.join()發送一次信号,證明一個資料已經被取走了def producer(name,q):

    q.join() #生産完畢,使用此方法進行阻塞,直到隊列中所有項目均被處理。if __name__ == '__main__':

    q=JoinableQueue()

    c1.daemon=True

    c2.daemon=True

    #開始    p_l=[p1,p2,p3,c1,c2]

    for p in p_l:

        p.start()

    p2.join()

    #主程序等--->p1,p2,p3等---->c1,c2    #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到隊列的資料    #因而c1,c2也沒有存在的價值了,不需要繼續阻塞在程序中影響主程序了。應該随着主程序的結束而結束,是以設定成守護程序就可以了。

管道(了解)

介紹

#建立管道的類:Pipe([duplex]):在程序之間建立一條管道,并傳回元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接配接對象,強調一點:必須在産生Process對象之前産生管道#參數介紹:dumplex:預設管道是全雙工的,如果将duplex射成False,conn1隻能用于接收,conn2隻能用于發送。#主要方法:    conn1.recv():接收conn2.send(obj)發送的對象。如果沒有消息可接收,recv方法會一直阻塞。如果連接配接的另外一端已經關閉,那麼recv方法會抛出EOFError。

    conn1.send(obj):通過連接配接發送對象。obj是與序列化相容的任意對象

#其他方法:conn1.close():關閉連接配接。如果conn1被垃圾回收,将自動調用此方法

conn1.fileno():傳回連接配接使用的整數檔案描述符

conn1.poll([timeout]):如果連接配接上的資料可用,傳回True。timeout指定等待的最長時限。如果省略此參數,方法将立即傳回結果。如果将timeout射成None,操作将無限期地等待資料到達。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的位元組消息。maxlength指定要接收的最大位元組數。如果進入的消息,超過了這個最大值,将引發IOError異常,并且在連接配接上無法進行進一步讀取。如果連接配接的另外一端已經關閉,再也不存在任何資料,将引發EOFError異常。

conn.send_bytes(buffer [, offset [, size]]):通過連接配接發送位元組資料緩沖區,buffer是支援緩沖區接口的任意對象,offset是緩沖區中的位元組偏移量,而size是要發送位元組數。結果資料以單條消息的形式發出,然後調用c.recv_bytes()函數進行接收   

conn1.recv_bytes_into(buffer [, offset]):接收一條完整的位元組消息,并把它儲存在buffer對象中,該對象支援可寫入的緩沖區接口(即bytearray對象或類似的對象)。offset指定緩沖區中放置消息處的位元組位移。傳回值是收到的位元組數。如果消息長度大于可用的緩沖區空間,将引發BufferTooShort異常。

pipe初使用

from multiprocessing import Process, Pipedef f(conn):

    conn.send("Hello The_Third_Wave")

    conn.close()if __name__ == '__main__':

    parent_conn, child_conn = Pipe()

    p = Process(target=f, args=(child_conn,))

    p.start()

    print(parent_conn.recv())

應該特别注意管道端點的正确管理問題。如果是生産者或消費者中都沒有使用管道的某個端點,就應将它關閉。這也說明了為何在生産者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。如果忘記執行這些步驟,程式可能在消費者中的recv()操作上挂起。管道是由作業系統進行引用計數的,必須在所有程序中關閉管道後才能生成EOFError異常。是以,在生産者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。 

from multiprocessing import Process, Pipedef f(parent_conn,child_conn):

    #parent_conn.close() #不寫close将不會引發EOFError    while True:

        try:

            print(child_conn.recv())

        except EOFError:

            child_conn.close()if __name__ == '__main__':

    p = Process(target=f, args=(parent_conn,child_conn,))

    child_conn.close()

    parent_conn.send('hello')

    parent_conn.close()

pipe實作生産者消費者模型

from multiprocessing import Process,Pipedef consumer(p,name):

    produce, consume=p

    produce.close()

            baozi=consume.recv()

            print('%s 收到包子:%s' %(name,baozi))

            breakdef producer(seq,p):

    consume.close()

    for i in seq:

        produce.send(i)if __name__ == '__main__':

    produce,consume=Pipe()

    c1=Process(target=consumer,args=((produce,consume),'c1'))

    seq=(i for i in range(10))

    producer(seq,(produce,consume))

    c1.join()

    print('主程序')

多個消費之之間的競争問題帶來的資料不安全問題

from multiprocessing import Process,Pipe,Lockdef consumer(p,name,lock):

        lock.acquire()

        baozi=consume.recv()

        lock.release()

        if baozi:

        else:

            consume.close()

            breakdef producer(p,n):

    for i in range(n):

        produce.send(i)

    produce.send(None)

    produce.close()if __name__ == '__main__':

    lock = Lock()

    c1=Process(target=consumer,args=((produce,consume),'c1',lock))

    c2=Process(target=consumer,args=((produce,consume),'c2',lock))

    p1=Process(target=producer,args=((produce,consume),10))

    c2.start()

    p1.start()

    c2.join()

程序之間的資料共享

展望未來,基于消息傳遞的并發程式設計是大勢所趨

即便是使用線程,推薦做法也是将程式設計為大量獨立的線程集合,通過消息隊列交換資料。

這樣極大地減少了對使用鎖定和其他同步手段的需求,還可以擴充到分布式系統中。

但程序間應該盡量避免通信,即便需要通信,也應該選擇程序安全的工具來避免加鎖帶來的問題。

以後我們會嘗試使用資料庫來解決現在程序之間的資料共享問題。

Manager子產品介紹

程序間資料是獨立的,可以借助于隊列或管道實作通信,二者都是基于消息傳遞的

雖然程序間資料獨立,但可以通過Manager實作資料共享,事實上Manager的功能遠不止于此

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.

Manager例子

from multiprocessing import Manager,Process,Lockdef work(d,lock):

    with lock: #不加鎖而操作共享的資料,肯定會出現資料錯亂        d['count']-=1if __name__ == '__main__':

    lock=Lock()

    with Manager() as m:

        dic=m.dict({'count':100})

        p_l=[]

        for i in range(100):

            p=Process(target=work,args=(dic,lock))

            p_l.append(p)

            p.start()

        for p in p_l:

            p.join()

        print(dic)

程序池和multiprocess.Pool子產品

程序池

為什麼要有程序池?程序池的概念。

在程式實際處理問題過程中,忙時會有成千上萬的任務需要被執行,閑時可能隻有零星任務。那麼在成千上萬個任務需要被執行的時候,我們就需要去建立成千上萬個程序麼?首先,建立程序需要消耗時間,銷毀程序也需要消耗時間。第二即便開啟了成千上萬的程序,作業系統也不能讓他們同時執行,這樣反而會影響程式的效率。是以我們不能無限制的根據任務開啟或者結束程序。那麼我們要怎麼做呢?

在這裡,要給大家介紹一個程序池的概念,定義一個池子,在裡面放上固定數量的程序,有需求來了,就拿一個池中的程序來處理任務,等到處理完畢,程序并不關閉,而是将程序再放回程序池中繼續等待任務。如果有很多任務需要執行,池中的程序數量不夠,任務就要等待之前的程序執行任務完畢歸來,拿到空閑程序才能繼續執行。也就是說,池中程序的數量是固定的,那麼同一時間最多有固定數量的程序在運作。這樣不會增加作業系統的排程難度,還節省了開閉程序的時間,也一定程度上能夠實作并發效果。

multiprocess.Pool子產品

Pool([numprocess  [,initializer [, initargs]]]):建立程序池

參數介紹

1 numprocess:要建立的程序數,如果省略,将預設使用cpu_count()的值2 initializer:是每個工作程序啟動時要執行的可調用對象,預設為None3 initargs:是要傳給initializer的參數組

主要方法

1 p.apply(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後傳回結果。2 '''需要強調的是:此操作并不會在所有池工作程序中并執行func函數。如果要通過不同參數并發地執行func函數,必須從不同線程調用p.apply()函數或者使用p.apply_async()'''3 4 p.apply_async(func [, args [, kwargs]]):在一個池工作程序中執行func(*args,**kwargs),然後傳回結果。5 '''此方法的結果是AsyncResult類的執行個體,callback是可調用對象,接收輸入參數。當func的結果變為可用時,将了解傳遞給callback。callback禁止執行任何阻塞操作,否則将接收其他異步操作中的結果。'''6    7 p.close():關閉程序池,防止進一步操作。如果所有操作持續挂起,它們将在工作程序終止前完成8 9 P.jion():等待所有工作程序退出。此方法隻能在close()或teminate()之後調用

1 方法apply_async()和map_async()的傳回值是AsyncResul的執行個體obj。執行個體具有以下方法2 obj.get():傳回結果,如果有必要則等待結果到達。timeout是可選的。如果在指定時間内還沒有到達,将引發一場。如果遠端操作中引發了異常,它将在調用此方法時再次被引發。3 obj.ready():如果調用完成,傳回True4 obj.successful():如果調用完成且沒有引發異常,傳回True,如果在結果就緒之前調用此方法,引發異常5 obj.wait([timeout]):等待結果變為可用。6 obj.terminate():立即終止所有工作程序,同時不執行任何清理或結束任何挂起工作。如果p被垃圾回收,将自動調用此函數

同步和異步

程序池的同步調用

import os,timefrom multiprocessing import Pooldef work(n):

    print('%s run' %os.getpid())

    time.sleep(3)

    return n**2if __name__ == '__main__':

    p=Pool(3) #程序池中從無到有建立三個程序,以後一直是這三個程序在執行任務    res_l=[]

        res=p.apply(work,args=(i,)) # 同步調用,直到本次任務執行完畢拿到res,等待任務work執行的過程中可能有阻塞也可能沒有阻塞                                    # 但不管該任務是否存在阻塞,同步調用都會在原地等着    print(res_l)

程序池的異步調用

import osimport timeimport randomfrom multiprocessing import Pooldef work(n):

    time.sleep(random.random())

        res=p.apply_async(work,args=(i,)) # 異步運作,根據程序池中有的程序數,每次最多3個子程序在異步執行                                          # 傳回結果之後,将結果放入清單,歸還程序,之後再執行新的任務                                          # 需要注意的是,程序池中的三個程序不會同時開啟或者同時結束                                          # 而是執行完一個就釋放一個程序,這個程序就去接收新的任務。          res_l.append(res)

    # 異步apply_async用法:如果使用異步送出的任務,主程序需要使用jion,等待程序池内任務都處理完,然後可以用get收集結果    # 否則,主程序結束,程序池可能還沒來得及執行,也就跟着一起結束了    p.close()

    for res in res_l:

        print(res.get()) #使用get來擷取apply_aync的結果,如果是apply,則沒有get方法,因為apply是同步執行,立刻擷取結果,也根本無需get

server:程序池版socket并發聊天

#Pool内的程序數預設是cpu核數,假設為4(檢視方法os.cpu_count())

#開啟6個用戶端,會發現2個用戶端處于等待狀态

#在每個程序内檢視pid,會發現pid使用為4個,即多個用戶端公用4個程序from socket import *from multiprocessing import Poolimport os

server=socket(AF_INET,SOCK_STREAM)

server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

server.bind(('127.0.0.1',8080))

server.listen(5)def talk(conn):

    print('程序pid: %s' %os.getpid())

            msg=conn.recv(1024)

            if not msg:break            conn.send(msg.upper())

        except Exception:

            breakif __name__ == '__main__':

    p=Pool(4)

        conn,*_=server.accept()

        p.apply_async(talk,args=(conn,))

        # p.apply(talk,args=(conn,client_addr)) #同步的話,則同一時間隻有一個用戶端能通路

client

from socket import *

client=socket(AF_INET,SOCK_STREAM)

client.connect(('127.0.0.1',8080))while True:

    msg=input('>>: ').strip()

    if not msg:continue    client.send(msg.encode('utf-8'))

    msg=client.recv(1024)

    print(msg.decode('utf-8'))

發現:并發開啟多個用戶端,服務端同一時間隻有4個不同的pid,隻能結束一個用戶端,另外一個用戶端才會進來.

回調函數

需要回調函數的場景:程序池中任何一個任務一旦處理完了,就立即告知主程序:我好了額,你可以處理我的結果了。主程序則調用一個函數去處理該結果,該函數即回調函數

我們可以把耗時間(阻塞)的任務放到程序池中,然後指定回調函數(主程序負責執行),這樣主程序在執行回調函數時就省去了I/O的過程,直接拿到的是任務的結果。

使用多程序請求多個url來減少網絡等待浪費的時間

from multiprocessing import Poolimport requestsimport jsonimport osdef get_page(url):

    print('<程序%s> get %s' %(os.getpid(),url))

    respone=requests.get(url)

    if respone.status_code == 200:

        return {'url':url,'text':respone.text}def pasrse_page(res):

    print('<程序%s> parse %s' %(os.getpid(),res['url']))

    parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))

    with open('db.txt','a') as f:

        f.write(parse_res)if __name__ == '__main__':

    urls=[

        'https://www.baidu.com',

        'https://www.python.org',

        'https://www.openstack.org',

        'https://help.github.com/',

        'http://www.sina.com.cn/'    ]

    p=Pool(3)

    res_l=[]

    for url in urls:

        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)

        res_l.append(res)

    p.close()

    print([res.get() for res in res_l]) #拿到的是get_page的結果,其實完全沒必要拿該結果,該結果已經傳給回調函數處理了'''

列印結果:

<程序3388> get https://www.baidu.com

<程序3389> get https://www.python.org

<程序3390> get https://www.openstack.org

<程序3388> get https://help.github.com/

<程序3387> parse https://www.baidu.com

<程序3389> get http://www.sina.com.cn/

<程序3387> parse https://www.python.org

<程序3387> parse https://help.github.com/

<程序3387> parse http://www.sina.com.cn/

<程序3387> parse https://www.openstack.org

[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>\r\n...',...}]

爬蟲執行個體

import refrom urllib.request import urlopenfrom multiprocessing import Pooldef get_page(url,pattern):

    response=urlopen(url).read().decode('utf-8')

    return pattern,responsedef parse_page(info):

    pattern,page_content=info

    res=re.findall(pattern,page_content)

    for item in res:

        dic={

            'index':item[0].strip(),

            'title':item[1].strip(),

            'actor':item[2].strip(),

            'time':item[3].strip(),

        }

        print(dic)if __name__ == '__main__':

    regex = r'<dd>.*?<.*?class="board-index.*?>(\d+)</i>.*?title="(.*?)".*?class="movie-item-info".*?<p class="star">(.*?)</p>.*?<p class="releasetime">(.*?)</p>'    pattern1=re.compile(regex,re.S)

    url_dic={

        'http://maoyan.com/board/7':pattern1,

    }

    p=Pool()

    for url,pattern in url_dic.items():

        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)

    for i in res_l:

        i.get()

如果在主程序中等待程序池中所有任務都執行完畢後,再統一處理結果,則無需回調函數

無需回調函數

from multiprocessing import Poolimport time,random,osdef work(n):

    time.sleep(1)

        res=p.apply_async(work,args=(i,))