天天看點

多任務-程序

閱讀目錄

1.程序以及狀态

2.程序的建立-multiprocessing

3.程序、線程對比

4.程序間通信-Queue

5.程序的建立-程序池Pool

6.案例:檔案夾拷貝器(多程序版)

一.程序以及狀态(五狀态模型)

1. 程序

程式:例如xxx.py這是程式,是一個靜态的

程序:一個程式運作起來後,代碼+用到的資源 稱之為程序,它是作業系統配置設定資源的基本單元。

不僅可以通過線程完成多任務,程序也是可以的

2. 程序的狀态

工作中,任務數往往大于cpu的核數,即一定有一些任務正在執行,而另外一些任務在等待cpu進行執行,是以導緻了有了不同的狀态

多任務-程式
  • 就緒态:運作的條件都已經慢去,正在等在cpu執行
  • 執行态:cpu正在執行其功能
  • 等待态:等待某些條件滿足,例如一個程式sleep了,此時就處于等待态

二.程序的建立-multiprocessing

multiprocessing子產品就是跨平台版本的多程序子產品,提供了一個Process類來代表一個程序對象,這個對象可以了解為是一個獨立的程序,可以執行另外的事情

1. 2個while循環一起執行

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time


def run_proc():
    """子程序要執行的代碼"""
    while True:
        print("----2----")
        time.sleep(1)


if __name__=='__main__':
    p = Process(target=run_proc)
    p.start()
    while True:
        print("----1----")
        time.sleep(1)      

說明

  • 建立子程序時,隻需要傳入一個執行函數和函數的參數,建立一個Process執行個體,用start()方法啟動

2. 程序pid

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

def run_proc():
    """子程序要執行的代碼"""
    print('子程序運作中,pid=%d...' % os.getpid())  # os.getpid擷取目前程序的程序号
    print('子程序将要結束...')

if __name__ == '__main__':
    print('父程序pid: %d' % os.getpid())  # os.getpid擷取目前程序的程序号
    p = Process(target=run_proc)
    p.start()      

3. Process文法結構如下:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target:如果傳遞了函數的引用,可以任務這個子程序就執行這裡的代碼
  • args:給target指定的函數傳遞的參數,以元組的方式傳遞
  • kwargs:給target指定的函數傳遞命名參數
  • name:給程序設定一個名字,可以不設定
  • group:指定程序組,大多數情況下用不到

Process建立的執行個體對象的常用方法:

  • start():啟動子程序執行個體(建立子程序)
  • is_alive():判斷程序子程序是否還在活着
  • join([timeout]):是否等待子程序執行結束,或等待多少秒
  • terminate():不管任務是否完成,立即終止子程序

Process建立的執行個體對象的常用屬性:

  • name:目前程序的别名,預設為Process-N,N為從1開始遞增的整數
  • pid:目前程序的pid(程序号)

4. 給子程序指定的函數傳遞參數

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
from time import sleep


def run_proc(name, age, **kwargs):
    for i in range(10):
        print('子程序運作中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid()))
        print(kwargs)
        sleep(0.2)

if __name__=='__main__':
    p = Process(target=run_proc, args=('test',18), kwargs={"m":20})
    p.start()
    sleep(1)  # 1秒中之後,立即結束子程序
    p.terminate()
    p.join()      

運作結果:

子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}
子程序運作中,name= test,age=18 ,pid=45097...
{'m': 20}      

5. 程序間不同享全局變量

# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time

nums = [11, 22]

def work1():
    """子程序要執行的代碼"""
    print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))
    for i in range(3):
        nums.append(i)
        time.sleep(1)
        print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))

def work2():
    """子程序要執行的代碼"""
    print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))

if __name__ == '__main__':
    p1 = Process(target=work1)
    p1.start()
    p1.join()

    p2 = Process(target=work2)
    p2.start()      

運作結果:

in process1 pid=11349 ,nums=[11, 22]
in process1 pid=11349 ,nums=[11, 22, 0]
in process1 pid=11349 ,nums=[11, 22, 0, 1]
in process1 pid=11349 ,nums=[11, 22, 0, 1, 2]
in process2 pid=11350 ,nums=[11, 22]      

三.程序、線程對比

功能

  • 程序,能夠完成多任務,比如 在一台電腦上能夠同時運作多個QQ
  • 線程,能夠完成多任務,比如 一個QQ中的多個聊天視窗
多任務-程式

定義的不同

  • 程序是系統進行資源配置設定和排程的一個獨立機關.
  • 線程是程序的一個實體,是CPU排程和分派的基本機關,它是比程序更小的能獨立運作的基本機關.線程自己基本上不擁有系統資源,隻擁有一點在運作中必不可少的資源(如程式計數器,一組寄存器和棧),但是它可與同屬一個程序的其他的線程共享程序所擁有的全部資源.

差別

  • 一個程式至少有一個程序,一個程序至少有一個線程.
  • 線程的劃分尺度小于程序(資源比程序少),使得多線程程式的并發性高。
  • 程序在執行過程中擁有獨立的記憶體單元,而多個線程共享記憶體,進而極大地提高了程式的運作效率
多任務-程式
  • 線程不能夠獨立執行,必須依存在程序中
  • 可以将程序了解為工廠中的一條流水線,而其中的線程就是這個流水線上的勞工
多任務-程式

優缺點

線程和程序在使用上各有優缺點:線程執行開銷小,但不利于資源的管理和保護;而程序正相反。

四.程序間通信-Queue

Process之間有時需要通信,作業系統提供了很多機制來實作程序間的通信。

1. Queue的使用

可以使用multiprocessing子產品的Queue實作多程序之間的資料傳遞,Queue本身是一個消息列隊程式,首先用一個小執行個體來示範一下Queue的工作原理:

#coding=utf-8
from multiprocessing import Queue
q=Queue(3) #初始化一個Queue對象,最多可接收三條put消息
q.put("消息1") 
q.put("消息2")
print(q.full())  #False
q.put("消息3")
print(q.full()) #True

#因為消息列隊已滿下面的try都會抛出異常,第一個try會等待2秒後再抛出異常,第二個Try會立刻抛出異常
try:
    q.put("消息4",True,2)
except:
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())

try:
    q.put_nowait("消息4")
except:
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())

#推薦的方式,先判斷消息列隊是否已滿,再寫入
if not q.full():
    q.put_nowait("消息4")

#讀取消息時,先判斷消息列隊是否為空,再讀取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())      

運作結果:

False
True
消息列隊已滿,現有消息數量:3
消息列隊已滿,現有消息數量:3
消息1
消息2
消息3      
說明

初始化Queue()對象時(例如:q=Queue()),若括号中沒有指定最大可接收的消息數量,或數量為負值,那麼就代表可接受的消息數量沒有上限(直到記憶體的盡頭);

  • Queue.qsize():傳回目前隊列包含的消息數量;
  • Queue.empty():如果隊列為空,傳回True,反之False ;
  • Queue.full():如果隊列滿了,傳回True,反之False;
  • Queue.get([block[, timeout]]):擷取隊列中的一條消息,然後将其從列隊中移除,block預設值為True;

1)如果block使用預設值,且沒有設定timeout(機關秒),消息列隊如果為空,此時程式将被阻塞(停在讀取狀态),直到從消息列隊讀到消息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何消息,則抛出"Queue.Empty"異常;

2)如果block值為False,消息列隊如果為空,則會立刻抛出"Queue.Empty"異常;

  • Queue.get_nowait():相當Queue.get(False);
  • Queue.put(item,[block[, timeout]]):将item消息寫入隊列,block預設值為True;

1)如果block使用預設值,且沒有設定timeout(機關秒),消息列隊如果已經沒有空間可寫入,此時程式将被阻塞(停在寫入狀态),直到從消息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則抛出"Queue.Full"異常;

2)如果block值為False,消息列隊如果沒有空間可寫入,則會立刻抛出"Queue.Full"異常;

  • Queue.put_nowait(item):相當Queue.put(item, False);

2. Queue執行個體

我們以Queue為例,在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料:

from multiprocessing import Process, Queue
import os, time, random

# 寫資料程序執行的代碼:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 讀資料程序執行的代碼:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    # 父程序建立Queue,并傳給各個子程序:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子程序pw,寫入:
    pw.start()    
    # 等待pw結束:
    pw.join()
    # 啟動子程序pr,讀取:
    pr.start()
    pr.join()
    # pr程序裡是死循環,無法等待其結束,隻能強行終止:
    print('')
    print('所有資料都寫入并且讀完')      

運作結果:

多任務-程式

五.程序池Pool

當需要建立的子程序數量不多時,可以直接利用multiprocessing中的Process動态成生多個程序,但如果是上百甚至上千個目标,手動的去建立程序的工作量巨大,此時就可以用到multiprocessing子產品提供的Pool方法。

初始化Pool時,可以指定一個最大程序數,當有新的請求送出到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,請看下面的執行個體:

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s開始執行,程序号為%d" % (msg,os.getpid()))
    # random.random()随機生成0~1之間的浮點數
    time.sleep(random.random()*2) 
    t_stop = time.time()
    print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))

po = Pool(3)  # 定義一個程序池,最大程序數3
for i in range(0,10):
    # Pool().apply_async(要調用的目标,(傳遞給目标的參數元祖,))
    # 每次循環将會用空閑出來的子程序去調用目标
    po.apply_async(worker,(i,))

print("----start----")
po.close()  # 關閉程序池,關閉後po不再接收新的請求
po.join()  # 等待po中所有子程序執行完成,必須放在close語句之後
print("-----end-----")      

運作結果:

----start----
0開始執行,程序号為21466
1開始執行,程序号為21468
2開始執行,程序号為21467
0 執行完畢,耗時1.01
3開始執行,程序号為21466
2 執行完畢,耗時1.24
4開始執行,程序号為21467
3 執行完畢,耗時0.56
5開始執行,程序号為21466
1 執行完畢,耗時1.68
6開始執行,程序号為21468
4 執行完畢,耗時0.67
7開始執行,程序号為21467
5 執行完畢,耗時0.83
8開始執行,程序号為21466
6 執行完畢,耗時0.75
9開始執行,程序号為21468
7 執行完畢,耗時1.03
8 執行完畢,耗時1.05
9 執行完畢,耗時1.69
-----end-----      

multiprocessing.Pool常用函數解析:

  • apply_async(func[, args[, kwds]]) :使用非阻塞方式調用func(并行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),args為傳遞給func的參數清單,kwds為傳遞給func的關鍵字參數清單;
  • close():關閉Pool,使其不再接受新的任務;
  • terminate():不管任務是否完成,立即終止;
  • join():主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用;

程序池中的Queue

如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的執行個體示範了程序池中的程序如何通信:

# -*- coding:utf-8 -*-

# 修改import中的Queue為Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue擷取到消息:%s" % q.get(True))

def writer(q):
    print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid()))
    for i in "itcast":
        q.put(i)

if __name__=="__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())      

運作結果:

(11095) start
writer啟動(11097),父程序為(11095)
reader啟動(11098),父程序為(11095)
reader從Queue擷取到消息:i
reader從Queue擷取到消息:t
reader從Queue擷取到消息:c
reader從Queue擷取到消息:a
reader從Queue擷取到消息:s
reader從Queue擷取到消息:t
(11095) End      

六.應用:檔案夾copy器(多程序版)

import multiprocessing
import os
import time
import random


def copy_file(queue, file_name,source_folder_name,  dest_folder_name):
    """copy檔案到指定的路徑"""
    f_read = open(source_folder_name + "/" + file_name, "rb")
    f_write = open(dest_folder_name + "/" + file_name, "wb")
    while True:
        time.sleep(random.random())
        content = f_read.read(1024)
        if content:
            f_write.write(content)
        else:
            break
    f_read.close()
    f_write.close()

    # 發送已經拷貝完畢的檔案名字
    queue.put(file_name)


def main():
    # 擷取要複制的檔案夾
    source_folder_name = input("請輸入要複制檔案夾名字:")

    # 整理目标檔案夾
    dest_folder_name = source_folder_name + "[副本]"

    # 建立目标檔案夾
    try:
        os.mkdir(dest_folder_name)
    except:
        pass  # 如果檔案夾已經存在,那麼建立會失敗

    # 擷取這個檔案夾中所有的普通檔案名
    file_names = os.listdir(source_folder_name)

    # 建立Queue
    queue = multiprocessing.Manager().Queue()

    # 建立程序池
    pool = multiprocessing.Pool(3)

    for file_name in file_names:
        # 向程序池中添加任務
        pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name))

    # 主程序顯示進度
    pool.close()

    all_file_num = len(file_names)
    while True:
        file_name = queue.get()
        if file_name in file_names:
            file_names.remove(file_name)

        copy_rate = (all_file_num-len(file_names))*100/all_file_num
        print("\r%.2f...(%s)" % (copy_rate, file_name) + " "*50, end="")
        if copy_rate >= 100:
            break
    print()


if __name__ == "__main__":
    main()