天天看點

Python線程——單線程和多線程的建立以及測試1、Python單線程2、Python多線程

1、Python單線程

python的線程需要導入threading包,一般預設都有的,具體建立單線程并測試代碼如下

import threading
import time

def test(hostpool):
    for i in range(5):
        print("test:",i," ",hostpool)
        time.sleep(1)
    
thread=threading.Thread(target=test,kwargs={"hostpool":"def"})
# target是線程調用的函數,kwargs是傳給target函數的參數名稱字典

thread.start()
# 開啟線程
           

2、Python多線程

由于測試的需要,需要在多線程内加入定時任務,這裡采用的是schedule子產品來輔助實作相關功能

測試代碼如下:

import schedule
import time
import datetime
import threading

def test(hostpool):
	# 實際被調用的函數
    print("------",hostpool,":",datetime.datetime.now())

def test_predict(hostpool):
    # 定時任務
    schedule.every(10).seconds.do(test,{"hostpool":hostpool})
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == '__main__':
    f=open("hostpool.txt")
    data=f.readlines()
    hostpools=[]
    for d in data:
        hostpool=d.split("-")[0]
        hostpools.append(hostpool)
    
    # 建立線程池并啟動
    threadpool=[]
    for hostpool in hostpools:
        th=threading.Thread(target=test_predict,kwargs={"hostpool":hostpool})
        threadpool.append(th)
    for th in threadpool:
        th.start()
    for th in threadpool:
        threading.Thread.join(th)
    

           

展示結果:

Python線程——單線程和多線程的建立以及測試1、Python單線程2、Python多線程