天天看點

python多程序程式設計

序. multiprocessing

python中的多線程其實并不是真正的多線程,如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。Python提供了非常好用的多程序包multiprocessing,隻需要定義一個函數,Python會完成其他所有事情。借助這個包,可以輕松完成從單程序到并發執行的轉換。multiprocessing支援子程序、通信和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等元件。

1. Process

建立程序的類:Process([group [, target [, name [, args [, kwargs]]]]]),target表示調用對象,args表示調用對象的位置參數元組。kwargs表示調用對象的字典。name為别名。group實質上不使用。

方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()啟動某個程序。

屬性:authkey、daemon(要通過start()設定)、exitcode(程序在運作時為None、如果為–N,表示被信号N結束)、name、pid。其中daemon是父程序終止後自動終止,且自己不能産生新程序,必須在start()之前設定。

例1.1:建立函數并将其作為單個程序

import multiprocessing
import time
def worker(interval):
    n = 5
    while n > 0:
        print("The time is {0}".format(time.ctime()))
        time.sleep(interval)
        n -= 1
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "p.pid:", p.pid
    print "p.name:", p.name
    print "p.is_alive:", p.is_alive()      

結果

p.pid: 8736
p.name: Process-1
p.is_alive: True
The time is Tue Apr 21 20:55:12 2015
The time is Tue Apr 21 20:55:15 2015
The time is Tue Apr 21 20:55:18 2015
The time is Tue Apr 21 20:55:21 2015
The time is Tue Apr 21 20:55:24 2015      

例1.2:建立函數并将其作為多個程序

import multiprocessing
import time
def worker_1(interval):
    print "worker_1"
    time.sleep(interval)
    print "end worker_1"
def worker_2(interval):
    print "worker_2"
    time.sleep(interval)
    print "end worker_2"
def worker_3(interval):
    print "worker_3"
    time.sleep(interval)
    print "end worker_3"
if __name__ == "__main__":
    p1 = multiprocessing.Process(target = worker_1, args = (2,))
    p2 = multiprocessing.Process(target = worker_2, args = (3,))
    p3 = multiprocessing.Process(target = worker_3, args = (4,))
    p1.start()
    p2.start()
    p3.start()
    print("The number of CPU is:" + str(multiprocessing.cpu_count()))
    for p in multiprocessing.active_children():
        print("child   p.name:" + p.name + "\tp.id" + str(p.pid))
    print "END!!!!!!!!!!!!!!!!!"      
The number of CPU is:4
child   p.name:Process-3    p.id7992
child   p.name:Process-2    p.id4204
child   p.name:Process-1    p.id6380
END!!!!!!!!!!!!!!!!!
worker_1
worker_3
worker_2
end worker_1
end worker_2
end worker_3      

例1.3:将程序定義為類

import multiprocessing
import time
class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval
    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1
if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()      

注:程序p調用start()時,自動調用run()

the time is Tue Apr 21 20:31:30 2015
the time is Tue Apr 21 20:31:33 2015
the time is Tue Apr 21 20:31:36 2015
the time is Tue Apr 21 20:31:39 2015
the time is Tue Apr 21 20:31:42 2015      

例1.4:daemon程式對比結果

#1.4-1 不加daemon屬性

import multiprocessing
import time
def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.start()
    print "end!"      
end!
work start:Tue Apr 21 21:29:10 2015
work end:Tue Apr 21 21:29:13 2015      

#1.4-2 加上daemon屬性

import multiprocessing
import time
def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    print "end!"      

end!

注:因子程序設定了daemon屬性,主程序結束,它們就随着結束了。

#1.4-3 設定daemon執行完結束的方法

import multiprocessing
import time
def worker(interval):
    print("work start:{0}".format(time.ctime()));
    time.sleep(interval)
    print("work end:{0}".format(time.ctime()));
if __name__ == "__main__":
    p = multiprocessing.Process(target = worker, args = (3,))
    p.daemon = True
    p.start()
    p.join()
    print "end!"      
work start:Tue Apr 21 22:16:32 2015
work end:Tue Apr 21 22:16:35 2015
end!      

2. Lock

當多個程序需要通路共享資源的時候,Lock可以用來避免通路的沖突。

import multiprocessing
import sys
def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()
        
def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()
    
if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print "end"      

結果(輸出檔案)

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

3. Semaphore

Semaphore用來控制對共享資源的通路數量,例如池的最大連接配接數。

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()      
Process-1acquire
Process-1release
 
Process-2acquire
Process-3acquire
Process-2release
 
Process-5acquire
Process-3release
 
Process-4acquire
Process-5release
 
Process-4release      

4. Event

Event用來實作程序間同步通信。

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")      
wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True      

5. Queue

Queue是多程序安全的隊列,可以使用Queue實作多程序之間的資料傳遞。put方法用以插入資料到隊列中,put方法還有兩個可選參數:blocked和timeout。如果blocked為True(預設值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該隊列有剩餘的空間。如果逾時,會抛出Queue.Full異常。如果blocked為False,但該Queue已滿,會立即抛出Queue.Full異常。

get方法可以從隊列讀取并且删除一個元素。同樣,get方法有兩個可選參數:blocked和timeout。如果blocked為True(預設值),并且timeout為正值,那麼在等待時間内沒有取到任何元素,會抛出Queue.Empty異常。如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即傳回該值,否則,如果隊列為空,則立即抛出Queue.Empty異常。Queue的一段示例代碼:

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):      
    try:         
        print q.get(block = False) 
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()      
1      

6. Pipe

Pipe方法傳回(conn1,

conn2)代表一個管道的兩個端。Pipe方法有duplex參數,如果duplex參數為True(預設值),那麼這個管道是全雙工模式,也就是說conn1和conn2均可收發。duplex為False,conn1隻負責接受消息,conn2隻負責發送消息。

send和recv方法分别是發送和接受消息的方法。例如,在全雙工模式下,可以調用conn1.send發送消息,conn1.recv接收消息。如果沒有消息可接收,recv方法會一直阻塞。如果管道已經被關閉,那麼recv方法會抛出EOFError。

import multiprocessing
import time

def proc1(pipe):
    while True:
        for i in xrange(10000):
            print "send: %s" %(i)
            pipe.send(i)
            time.sleep(1)

def proc2(pipe):
    while True:
        print "proc2 rev:", pipe.recv()
        time.sleep(1)

def proc3(pipe):
    while True:
        print "PROC3 rev:", pipe.recv()
        time.sleep(1)

if __name__ == "__main__":
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))
    p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))
    #p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))

    p1.start()
    p2.start()
    #p3.start()

    p1.join()
    p2.join()
    #p3.join()      
python多程式程式設計

7. Pool

在利用Python進行系統管理的時候,特别是同時操作多個檔案目錄,或者遠端控制多台主機,并行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動态成生多個程序,十幾個還好,但如果是上百個,上千個目标,手動的去限制程序數量卻又太過繁瑣,此時可以發揮程序池的功效。

Pool可以提供指定數量的程序,供使用者調用,當有新的請求送出到pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到規定最大值,那麼該請求就會等待,直到池中有程序結束,才會建立新的程序來它。

例7.1:使用程序池(非阻塞)

#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會添加新的程序進去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的程序加入到pool,join函數等待所有子程序結束
    print "Sub-process(es) done."      

一次執行結果

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
 
msg: hello 1
msg: hello 2
end
msg: hello 3
end
end
end
Sub-process(es) done.      

函數解釋:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(了解差別,看例1例2結果差別)
  • close()    關閉pool,使其不在接受新的任務。
  • terminate()    結束工作程序,不在處理未完成的任務。
  • join()    主程序阻塞,等待子程序的退出, join方法要在close或terminate之後使用。

執行說明:建立一個程序池pool,并設定程序的數量為3,xrange(4)會相繼産生四個對象[0, 1, 2,

4],四個對象被送出到pool中,因pool指定程序數為3,是以0、1、2會直接送到程序中執行,當其中一個執行完事後才空出一個程序處理對象3,是以會出現輸出“msg:

hello 3”出現在"end"後。因為為非阻塞,主函數會自己執行自個的,不搭理程序的執行,是以運作完for循環後直接輸出“mMsg:

hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程式在pool.join()處等待各個程序的結束。

例7.2:使用程序池(阻塞)

python多程式程式設計
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply(func, (msg, ))   #維持執行的程序總數為processes,當一個程序執行完畢後會添加新的程序進去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #調用join之前,先調用close函數,否則會出錯。執行完close後不會有新的程序加入到pool,join函數等待所有子程序結束
    print "Sub-process(es) done."      
python多程式程式設計

一次執行的結果

msg: hello 0
end
msg: hello 1
end
msg: hello 2
end
msg: hello 3
end
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done.      

例7.3:使用程序池,并關注結果

import multiprocessing
import time
def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"
    return "done" + msg
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []
    for i in xrange(3):
        msg = "hello %d" %(i)
        result.append(pool.apply_async(func, (msg, )))
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()
    print "Sub-process(es) done."      
msg: hello 0
msg: hello 1
msg: hello 2
end
end
end
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.      

例7.4:使用多個程序池

#coding: utf-8
import multiprocessing
import os, time, random
def Lee():
    print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()擷取目前的程序的ID
    start = time.time()
    time.sleep(random.random() * 10) #random.random()随機生成0-1之間的小數
    end = time.time()
    print 'Task Lee, runs %0.2f seconds.' %(end - start)
def Marlon():
    print "\nRun task Marlon-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 40)
    end=time.time()
    print 'Task Marlon runs %0.2f seconds.' %(end - start)
def Allen():
    print "\nRun task Allen-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 30)
    end = time.time()
    print 'Task Allen runs %0.2f seconds.' %(end - start)
def Frank():
    print "\nRun task Frank-%s" %(os.getpid())
    start = time.time()
    time.sleep(random.random() * 20)
    end = time.time()
    print 'Task Frank runs %0.2f seconds.' %(end - start)
        
if __name__=='__main__':
    function_list=  [Lee, Marlon, Allen, Frank] 
    print "parent process %s" %(os.getpid())
    pool=multiprocessing.Pool(4)
    for func in function_list:
        pool.apply_async(func)     #Pool執行函數,apply執行函數,當有一個程序執行完畢後,會添加一個新的程序到pool中
    print 'Waiting for all subprocesses done...'
    pool.close()
    pool.join()    #調用join之前,一定要先調用close() 函數,否則會出錯, close()執行後不會有新的程序加入到pool,join函數等待素有子程序結束
    print 'All subprocesses done.'      
parent process 7704
 
Waiting for all subprocesses done...
Run task Lee-6948
 
Run task Marlon-2896
 
Run task Allen-7304
 
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.