天天看點

python多線程-queue隊列類型優先級隊列,FIFO,LIFO

Queue在python3中重命名為queue,在python2到python3轉換中可以自動轉換

隊列可應用在多個生産者多個消費者的模型中<<詳情參考>>,并且在多線程中可用于線程之間資料資訊的安全交換通信,防止沖突。

在隊列中已經實作多線程的鎖機制

隊列Queue提供三種隊列類型

主要差別是操作順序的不同:

1.class Queue.Queue(maxsize=0)

FIFO隊列,先進先出,maxsize定義隊列長度,若maxsize等于0或者小于0,則該隊列長度為無限,預設是無限

2.class Queue.LifoQueue(maxsize=0)

LIFO 後進先出,類似于堆棧,若maxsize等于0或者小于0,則該隊列長度為無限,預設是無限

3.class Queue.PriorityQueue(maxsize=0)

優先級隊列,值越低則優先級越高,若maxsize等于0或者小于0,則該隊列長度為無限,預設是無限

異常:

1.exception Queue.Empty

當get()或get_nowait() 觸發

2.exception Queue.Full

當put()或put_nowait()觸發

三種隊列均提供如下方法:

1.Queue.qsize() 隊列長度,注:當qsize長度大于0時,get()并不一定成功,當長度小于maxsize時,put()也不一定成功

2.Queue.empty() 是否隊列為空,空True,反之False

3.Queue.full() 隊列是否滿,滿True,反之False

4.Queue.get([block[, timeout]])

擷取隊列一個排隊對象,timeout等待時間,如果get等待timeout時間,仍為空,則抛出異常。預設block為true timeout為null,一直等待有對象傳回

5.Queue.get_nowait() 相當Queue.get(False)

6.Queue.put(item)

将一個對象寫入隊列,timeout等待時間,如果一個put等待了timeout時間,仍為滿,則會抛出異常。預設block為true timeout為null,一直等待對象寫入

7.Queue.put_nowait(item) 相當Queue.put(item, False)

8.Queue.task_done()

9.Queue.join() 相當與threading中将程序挂起,知道将子線程中的所有任務都處理完成後,才退出join,進行後面的步驟

例如模型:

def worker():

while True:

item = q.get()

do_work(item)

q.task_done()

q = Queue()

for i in range(num_worker_threads):

t = Thread(target=worker)

t.daemon = True

t.start()

for item in source():

q.put(item)

q.join()

1.FIFO隊列先進先出:

#coding=utf8
import Queue

queuelist = Queue.Queue()

for i in range():
    if not queuelist.full():
       queuelist.put(i)
       print "put list : %s ,now queue size is %s "%(i,queuelist.qsize())

while not queuelist.empty():
    print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())
           

輸出:

put list :  ,now queue size is  
put list :  ,now queue size is  
put list :  ,now queue size is  
put list :  ,now queue size is  
put list :  ,now queue size is  
get list :  , now queue size is 
get list :  , now queue size is 
get list :  , now queue size is 
get list :  , now queue size is 
get list :  , now queue size is 
           

2.LIFO隊列先進後出:

#coding=utf8
import Queue

queuelist = Queue.LifoQueue()

for i in range():
    if not queuelist.full():
       queuelist.put(i)
       print "put list : %s ,now queue size is %s "%(i,queuelist.qsize())

while not queuelist.empty():
    print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())
           

輸出:

put list :  ,now queue size is  
put list :  ,now queue size is  
put list :  ,now queue size is  
put list :  ,now queue size is  
put list :  ,now queue size is  
get list :  , now queue size is 
get list :  , now queue size is 
get list :  , now queue size is 
get list :  , now queue size is 
get list :  , now queue size is 
           

3.優先隊列PriorityQueue

ps:這個優先隊列有些垃圾,沒有直接傳入優先級的參數

示例:

#coding=utf8
import Queue
import random

queuelist = Queue.PriorityQueue()

for i in range():
    if not queuelist.full():
        x=random.randint(,)
        y=random.randint(,)
        print y,x
        queuelist.put({y:"th-%d"%x})

while not queuelist.empty():
    print "get list : %s , now queue size is %s"%(queuelist.get(),queuelist.qsize())
           

輸出:

get list : {: 'th-19'} , now queue size is 
get list : {: 'th-13'} , now queue size is 
get list : {: 'th-5'} , now queue size is 
get list : {: 'th-20'} , now queue size is 
get list : {: 'th-16'} , now queue size is 
           

優先級優先值傳入的幾種傳入方法:

将queuelist.put({y:”th-%d”%x})可替換為如下:

1.queuelist.put((y,”th-%d”%x))

2.queuelist.put([y,”th-%d”%x])

若不可替換queuelist.put({y:”th-%d”%x})修改為如下put,沒有想要的優先隊列效果:

1.queuelist.put(y,”th-%d”%x)

2.queuelist.put({y,”th-%d”%x})

檢票綜合示例:

模拟檢票過程

排3隊進行檢票,一個隊列一個線程

票有編号

檢票時間有固定視窗,若時間到了通知隊列關閉

#coding=utf8

import Queue
import threading
import time

exitsingle = 

class myThread(threading.Thread):
    def __init__(self, threadname, queuelist):
        threading.Thread.__init__(self)
        self.threadname = threadname
        self.queuelist = queuelist

    def run(self):
        print "Starting queue %s"%self.threadname
        queue_enter(self.threadname, self.queuelist)
        time.sleep()
        print "close  " + self.threadname

def queue_enter(threadname, queuelist):
    while not exitsingle:
        queueLock.acquire()
        if not workQueue.empty():
            data = queuelist.get()
            queueLock.release()
            print "%s check ticket %s" % (threadname, data)
        else:
            queueLock.release()
        time.sleep()


threadList = ["list-1", "list-2", "list-3"]
queueLock = threading.Lock()
workQueue = Queue.Queue()
threads = []

queueLock.acquire()
for num in range(,):
    workQueue.put(num)

queueLock.release()

print "start .."

for name in threadList:
    thread = myThread( name, workQueue)
    thread.start()
    threads.append(thread)

while not workQueue.empty():
     pass

exitsingle = 

for t in threads:
    t.join()
print "stop enter.."
           

輸出:

start ..
Starting queue list-
list- check ticket 
Starting queue list-
list- check ticket 
Starting queue list-
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
list- check ticket 
close  list-
close  list-
close  list-
stop enter..