天天看點

day 7-8 協程

不能無限的開程序,不能無限的開線程
最常用的就是開程序池,開線程池。其中回調函數非常重要
回調函數其實可以作為一種程式設計思想,誰好了誰就去調

      
隻要你用并發,就會有鎖的問題,但是你不能一直去自己加鎖吧
那麼我們就用QUEUE,這樣還解決了自動加鎖的問題
由Queue延伸出的一個點也非常重要的概念。以後寫程式也會用到
這個思想。就是生産者與消費者問題      

一、Python标準子產品--concurrent.futures(并發未來)

concurent.future子產品需要了解的
1.concurent.future子產品是用來建立并行的任務,提供了更進階别的接口,
為了異步執行調用
2.concurent.future這個子產品用起來非常友善,它的接口也封裝的非常簡單
3.concurent.future子產品既可以實作程序池,也可以實作線程池
4.子產品導入程序池和線程池
from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
5.p = ProcessPoolExecutor(max_works)對于程序池如果不寫max_works:預設的是cpu的數目,預設是4個
  p = ThreadPoolExecutor(max_works)對于線程池如果不寫max_works:預設的是cpu的數目*5
6.如果是程序池,得到的結果如果是一個對象。我們得用一個.get()方法得到結果
  但是現在用了concurent.future子產品,我們可以用obj.result方法
  p.submit(task,i)  #相當于apply_async異步方法
  p.shutdown()  #預設有個參數wite=True (相當于close和join)      

二、線程池

程序池:就是在一個程序内控制一定個數的線程
基于concurent.future子產品的程序池和線程池 (他們的同步執行和異步執行是一樣的)
線程池内生成一定數量的線程,當遇到I/O時切換.并不是說有多少個任務,就就多少個線程.
      
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os
import time
import random
# I/O密集型的用線程池,用程序池的話開銷大,效率低
#----------------同步執行-----------------
def task(n):
    print("%s is runing"%os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':
    temp_li=[]
    start = time.time()
    p=ProcessPoolExecutor(max_workers=4)
    for i in range(10):
        obj=p.submit(task,i).result()       #等待結果,相當于apple同步方法.永遠都會隻是4個程序,就算有100個任務,也是4個程序輪流切換
        temp_li.append(obj)
    p.shutdown()  #相當于close和join
    print(temp_li)
    print("耗時:%s"%(time.time()-start))



#---------------異步執行------------------
def task(n):
    print("%s is running"%os.getpid())
    time.sleep(random.randint(1,3))
    return n**2


if __name__ == '__main__':
    temp_li=[]
    start=time.time()
    p=ProcessPoolExecutor(max_workers=4) #如果不填寫max_workers,預設是cpu核數
    for i in range(10):
        obj = p.submit(task,i)        #不等待結果,送出完就走
        print(obj)                    #列印程序狀态的話,會看到有些是running(運作态),有些是pending(就緒).
        temp_li.append(obj)
    p.shutdown()
    print([obj.result() for obj in temp_li])
    print("耗時:%s"%(time.time() - start))      

基于concurrent.future程序池

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os
import time
import random

def task(n):
    print("%s is running"%os.getpid())
    time.sleep(random.randint(1,3))
    return n**2


if __name__ == '__main__':
    temp_li=[]
    start=time.time()
    t=ThreadPoolExecutor() #如果不填寫max_workers,預設是cpu核數*5
    for i in range(10):
        obj =t.submit(task,i)        #不等待結果,送出完就走
        print(obj)                    #列印程序狀态的話,會看到有些是running(運作态),有些是pending(就緒).
        temp_li.append(obj)
    t.shutdown()
    print([obj.result() for obj in temp_li])
    print("耗時:%s"%(time.time() - start))    #3.003171682357788      

基于concurrent.future線程池

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import time
import os

def get_page(url):
    print("%s is getting %s"%(os.getpid(),url))
    response = requests.get(url)
    if response.status_code==200:#200表示下載下傳成功的狀态碼
        return {"url":url,"text":response.text}

def pares_page(res):
    res = res.result()
    print("%s is getting %s"%(os.getpid(),res["url"]))
    with open("db1.txt","a",encoding="utf-8") as f:
        pares_res = "url:%s size:%s 
" %(res["url"],len(res["url"]))
        f.write(pares_res)
        
        
if __name__ == '__main__':
    p = ProcessPoolExecutor()
    # p = ThreadPoolExecutor()
    li =[
        "http://www.baidu.com",
        "http://www.google.com",
        "http://www.youporn.com"
    ]

    for url in li:
        res = p.submit(get_page,url).add_done_callback(pares_page)#回調函數.
    p.shutdown()
    print("main",os.getpid())      

線程池應用

 map函數的應用 

# map函數舉例
obj= map(lambda x:x**2 ,range(10))
print(list(obj))

#運作結果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]      
#! -*- coding:utf-8 -*-
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os
import time
import random
def task(n):
    print("%s is running"%os.getpid())
    time.sleep(random.randint(1,3))
    return n**2


if __name__ == '__main__':
    temp_li=[]
    start=time.time()
    t=ThreadPoolExecutor() #如果不填寫max_workers,預設是cpu核數*5
    obj = t.map(task,range(10))
    t.shutdown()

    print(list(obj))
    print("耗時:%s"%(time.time() - start))       

map函數

三,協程

  協程是一種使用者态的輕量級線程,協程的排程完全由使用者控制。協程擁有自己的寄存器上下文和棧。協程排程切換時,将寄存器上下文和棧儲存到其他地方,在切回來的時候,恢複先前儲存的寄存器上下文和棧,直接操作棧則基本沒有核心切換的開銷,可以不加鎖的通路全局變量,是以上下文的切換非常快。

  優點: 

1. 協程的切換開銷更小,屬于程式級别的切換,作業系統完全感覺不到,因而更加輕量級
2. 單線程内就可以實作并發的效果,最大限度地利用cpu      

  缺點:

 1. 協程的本質是單線程下,無法利用多核,可以是一個程式開啟多個程序,每個程序内開啟多個線程,每個線程内開啟協程

 2. 協程指的是單個線程,因而一旦協程出現阻塞,将會阻塞整個線程

 

  總結協程特點:

  1. 必須在隻有一個單線程裡實作并發
  2. 修改共享資料不需加鎖
  3. 使用者程式裡自己儲存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操作自動切換到其它協程(如何實作檢測IO,yield、greenlet都無法實作,就用到了gevent子產品(select機制))

四,greenlet子產品

  如果我們現在有一個單線程,裡面有10個任務,那麼如果我們使用yield生成器來實作的話,太麻煩了(初始化生成器,調用send).這就用到了greenlet子產品了.

  Greenlet子產品和yield沒有什麼差別,就隻是單純的切,跟效率無關。隻不過比yield更好一點,切的時候友善一點。但是仍然沒有解決效率.Greenlet可以讓你在多個任務之間來回的切.

安裝子產品:

pip3 install greenlet

#! -*- coding:utf-8 -*-
from greenlet import greenlet


def eat(name):
    print("%s is eating 1"%name)
    g2.switch("jack")
    print("%s is eating 2"%name)
    g2.switch()


def running(name):
    print("%s is running"%name)
    g1.switch()
    print("%s is not running"%name)


if __name__ == '__main__':
    g1 = greenlet(eat)
    g2= greenlet(running)

    g1.switch("alex")      

greenlet例子

單純的切換(在沒有io的情況下或者沒有重複開辟記憶體空間的操作),反而會降低程式的執行速度.

import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()#10.354592561721802



from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 51.55694890022278      

單純的切換,反而降低效率

greenlet隻是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。單線程裡的這10個任務的代碼通常會既有計算操作又有阻塞操作,我們完全可以在執行任務1時遇到阻塞,就利用阻塞的時間去執行任務2。。。。如此,才能提高效率,這就用到了Gevent子產品。

五,gevnet子產品

  安裝:

pip3 install gevent      

  Gevent是一個第三方庫,可以輕松通過gevent實作并發同步或異步程式設計,在gevent中用到的主要模式是Greenlet,它是以C擴充子產品形式接入Python的輕量級協程。Greenlet全部運作在主程式作業系統程序的内部,但它們被協作式地排程。

  用法:  

import gevent

def test(name):
    print("%s is eating"%name)
    return 1111

def test2(name):
    print("%s is going"%name)
    return 3333

# g1 = gevent.spawn(函數名,位置參數(*args),關鍵字參數(**kwargs))
g1 = gevent.spawn(test,"jack")
g2 = gevent.spawn(test2,"alex")
gevent.joinall([g1,g2])     #等待g1,g2結束,也可以寫成單個g1.join(),g2.join()
# g1.join()
print(g1.value)     #拿到傳回值
print(g2.value)      

gevnet的一些方法:

# from gevent import monkey;monkey.patch_all()
import gevent
import time

def eat(name):
    print("%s is eat"%name)
    # time.sleep(1.5)  #模拟IO阻塞
    """
    如果使用time.sleep()表示時間等待的話,需要在代碼頂部加入一行 from gevent import monkey;monkey.patch_all()
    如果使用gevent.sleep()則無需加入代碼
    
    """
    gevent.sleep(1.5)
    print("%s is eat 1"%name)
    return "eat"

def play(name):
    print('%s play 1'%name)
    # time.sleep(3)
    gevent.sleep(3)
    print('%s play 2'%name)
    return 'paly'  # 當有傳回值的時候,gevent子產品也提供了傳回結果的操作

start_time = time.time()
g1 = gevent.spawn(eat,"jack")
g2 = gevent.spawn(play,"Lucy")
gevent.joinall([g1,g2])
print("main",time.time()-start_time)
print(g1.value)
print(g2.value)      

注意time.sleep()和gevent.sleep()

注意:

gevent.sleep(1.5)模拟的是gevent可以識别的io阻塞,

而time.sleep(1.5)或其他的阻塞,gevent是不能直接識别的需要用下面一行代碼,打更新檔,就可以識别了

from gevent import monkey;monkey.patch_all()必須放到被打更新檔者的前面,如time,socket子產品之前

或者我們幹脆記憶成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到檔案的開頭

from gevent import monkey;monkey.patch_all()
import gevent
import time


def task(pid):
    time.sleep(0.5)
    print("Task %s is done" % pid)


def synchronous(): #同步
    for i in range(10):
        task(i)


def asynchronoues():    #異步
    g_l = [gevent.spawn(task,i) for i in range(10)]
    print(g_l)
    gevent.joinall(g_l)

if __name__ == '__main__':
    print("sync")
    synchronous()
    print("async")
    asynchronoues()

#上面程式的重要部分是将task函數封裝到Greenlet内部線程的gevent.spawn。 初始化的greenlet清單存放在數組threads中,此數組被傳給gevent.joinall 函數,後者阻塞目前流程,并執行所有給定的greenlet。執行流程隻會在 所有greenlet執行完後才會繼續向下走。      

同步異步