1、Python單線程
python的線程需要導入threading包,一般預設都有的,具體建立單線程并測試代碼如下
import threading
import time
def test(hostpool):
for i in range(5):
print("test:",i," ",hostpool)
time.sleep(1)
thread=threading.Thread(target=test,kwargs={"hostpool":"def"})
# target是線程調用的函數,kwargs是傳給target函數的參數名稱字典
thread.start()
# 開啟線程
2、Python多線程
由于測試的需要,需要在多線程内加入定時任務,這裡采用的是schedule子產品來輔助實作相關功能
測試代碼如下:
import schedule
import time
import datetime
import threading
def test(hostpool):
# 實際被調用的函數
print("------",hostpool,":",datetime.datetime.now())
def test_predict(hostpool):
# 定時任務
schedule.every(10).seconds.do(test,{"hostpool":hostpool})
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
f=open("hostpool.txt")
data=f.readlines()
hostpools=[]
for d in data:
hostpool=d.split("-")[0]
hostpools.append(hostpool)
# 建立線程池并啟動
threadpool=[]
for hostpool in hostpools:
th=threading.Thread(target=test_predict,kwargs={"hostpool":hostpool})
threadpool.append(th)
for th in threadpool:
th.start()
for th in threadpool:
threading.Thread.join(th)
展示結果: