天天看點

Python記錄檔、加密、發送郵件、線程、生産者消費者

1、記錄檔

logging.basicConfig:日志的統一處理器,對日志的輸出格式和方式做配置

日志級别等級CRITICAL > ERROR > WARNING > INFO > EDBUG

level設定級别以及以上級别的才會列印,這裡注意大小寫!

列印日志資訊在控制台或輸出在一個檔案示例:

1 import logging
 2 import os
 3 
 4 # log_file = os.path.join(os.getcwd(),'wlog.log')
 5 log_format = "%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s: %(message)s"
 6 '''
 7 如果不寫filename和filemode參數則預設列印到console
 8 '''
 9 logging.basicConfig(level=logging.WARNING,format=log_format)
10 # logging.basicConfig(level=logging.WARNING,format=log_format,filename=log_file,filemode='w')
11 
12 logging.warning("waring message")
13 logging.error("error message")      

輸出在控制台資訊如下:

2017-03-20 21:41:07,756 3.19.py [line:24] WARNING: waring message

2017-03-20 21:41:07,756 3.19.py [line:25] ERROR: error message

同時在控制台和輸出到檔案代碼示例:

1 # 建立一個logger
 2 logger = logging.getLogger("mylogger")
 3 logger.setLevel(logging.INFO)
 4 
 5 # 建立一個handler,将log寫入檔案中
 6 fh = logging.FileHandler('D:/pycharm workspace/practice/log.txt','a')
 7 fh.setLevel(logging.INFO)
 8 
 9 # 再建立一個handler,将log輸出在控制台
10 ch = logging.StreamHandler()
11 ch.setLevel(logging.CRITICAL)
12 
13 # 設定輸出格式
14 log_format = "%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s: %(message)s"
15 formatter = logging.Formatter(log_format)
16 fh.setFormatter(formatter)
17 ch.setFormatter(formatter)
18 
19 #把handler添加到logger裡,其實可以了解為彙報給大上司
20 logger.addHandler(fh)
21 logger.addHandler(ch)
22 
23 logger.error("今天天氣陰")      

控制台設定為CRITICAL不會有輸出,因為列印的是error資訊

輸出到檔案設定為INFO,列印的是error資訊,會輸出在檔案中

如果設定成不一樣的實際是沒有任何意義。一般都設定為INFO。

另:

将執行腳本的日志儲存在一個檔案中

1     dirfile = os.listdir("D:\\")
2     for i in dirfile:
3         s=i.split('.')[1]
4         print(s)
5         if s == "py":
6             os.system("D:\\%s 1>>log.txt 2>&1" %i)      

2、加密

#DES加密

# pyDes.des(key,[mode],[IV],[pad],[pdamode])

# 參數的意思分别如下:

# key:加密密鑰,長度為8位。必選

# mode:加密方式。ECB(預設)、CBC(安全性好于前者)

# IV:初始位元組數(長度為8位),如果選擇的加密方式為CBC必須有這個參數。否則可以沒有

# pad:加密時,将該字元添加到資料塊的結尾;解密時,将删除從最後一個往前的8位

# padmode:PAD_NORMAL、PAD_PKCSS,當選擇前者時必須設定pad

md5、sha、des加密代碼示例:

1 import hashlib     #md5 sha
 2 import base64      #des
 3 from pyDes import *
 4 
 5 def md5_encode(data):
 6     m = hashlib.md5()
 7     m.update(data.encode('utf-8'))
 8     return m.hexdigest()     #經過特殊處理之後以字元串形式傳回
 9 
10 def sha1_encode(data):
11     sha1 = hashlib.sha1()
12     sha1.update(data.encode('utf-8'))
13     return sha1.hexdigest()
14 
15 def des_encode(data):
16     k = des("xqtest66",padmode=PAD_PKCS5)
17     # k = des('xqtest66',CBC,'goodluck',pad='hahahaha',padmode=PAD_NORMAL)
18 
19     #encrypt來加密我的資料,然後進行base64編碼
20     encodeStrr = base64.b64encode(k.encrypt(data))
21     return encodeStrr
22 
23 data = "wo"
24 print('md5加密結果:',md5_encode(data))
25 print('sha加密結果:',sha1_encode(data))
26 print('des加密結果:',des_encode(data))      

3、發送郵件

1 import smtplib
 2 import email.mime.multipart
 3 import email.mime.text
 4 
 5 from email.mime.application import MIMEApplication
 6 
 7 class SendMail:
 8     def send_mail(self,title):
 9         msg = email.mime.multipart.MIMEMultipart() #生成包含多個郵件體的對象
10         msg['from'] = 'jiayan****@126.com'
11         msg['to'] = '[email protected]'
12         msg['subject'] = title
13         content = '''
14         這是郵件的正文部分
15         '''
16 
17         #郵件正文
18         txt = email.mime.text.MIMEText(content)
19         msg.attach(txt)
20 
21         #excel附件
22         # xlsxpart = MIMEApplication(open('send_mail_excel.xlsx', 'rb').read())
23         # xlsxpart.add_header('Content-Disposition', 'attachment', filename='send_mail_excel.xlsx')
24         # msg.attach(xlsxpart)
25 
26         #jpg圖檔附件
27         jpgpart = MIMEApplication(open('Aaron.png', 'rb').read())
28         jpgpart.add_header('Content-Disposition', 'attachment', filename='Aaron.png')   #需要圖檔檔案在代碼相應的目錄下
29         msg.attach(jpgpart)
30 
31         #發送郵件
32         smtp=smtplib
33         smtp=smtplib.SMTP()
34         smtp.set_debuglevel(1)            #設定為調試模式,console中顯示
35         smtp.connect('smtp.126.com','25') #連結伺服器,smtp位址+端口
36         smtp.login('jiayan****@126.com','Jiaxxxxxxxx')                   #登入,使用者名+密碼
37         smtp.sendmail('jiayan****@126.com','[email protected]',str(msg)) #發送,from+to+内容
38         smtp.quit()
39 
40 mail = SendMail()
41 mail.send_mail('python自動化測試')      

 查找最進時間修改的檔案,代碼如下:

1 os.path.listdir  #以清單的形式展示檔案
 2 os.path.getmtime #最後修改的時間
 3 os.path.join     #路徑拼接
 4 
 5 import os
 6 filenames = "D:\\pycharm workspace\\appiumframework\\report"
 7 lists = os.listdir(filenames)
 8 print(lists)
 9 lists.sort(key=lambda fn:os.path.getmtime(filenames+"\\"+fn))
10 print(lists[-1])
11 file = os.path.join(filenames,lists[-1])
12 print(file)      
4、程序與線程的差別:
程序不共享空間,線程共享位址空間

線程共享空間優缺點:
優點:多線程給使用者的體驗好些,處理速度快些
缺點:共享位址空間互相影響

      
1 import threading
 2 import time
 3 
 4 class Mythreading(threading.Thread):
 5     def __init__(self,threadID,name,counter):
 6         threading.Thread.__init__(self)    #固定格式
 7         self.threadID = threadID
 8         self.name = name
 9         self.counter = counter
10         print("初始化完成")
11     def run(self):                         #由cpu來處理決定線程間的執行順序
12         print("開始"+self.name)
13         print_time(self.name,self.counter,5)
14         print("結束"+self.name)
15 
16 def print_time(threasName,counter,delay):
17     while counter:
18         time.sleep(delay)
19         print("%s:%s"%(threasName,time.ctime(time.time())))
20         counter -= 1
21 
22 #建立線程
23 thread1 = Mythreading(1,"thread1",1)
24 thread2 = Mythreading(2,"thread2",2)
25 
26 #開啟線程
27 thread1.start()
28 thread2.start()      
1 import threading
 2 import time
 3 
 4 class Mythreading(threading.Thread):
 5     def __init__(self,threadID,name,counter):
 6         threading.Thread.__init__(self)    #固定格式
 7         self.threadID = threadID
 8         self.name = name
 9         self.counter = counter
10         print("初始化完成")
11     def run(self):                         #由cpu來處理決定線程間的執行順序
12         threadLock.acquire()               #獲得鎖,成功獲得鎖定後傳回True,可選的參數timeout不填時将一直阻塞直到獲得鎖定
13         print_time(self.name,self.counter,3)
14         threadLock.release()               #釋放鎖,開始下一個線程
15 
16 def print_time(threasName,counter,delay):
17     while counter:
18         time.sleep(delay)
19         print("%s:%s"%(threasName,time.ctime(time.time())))
20         counter -= 1
21 
22 threadLock = threading.Lock()
23 threads = []
24 
25 #建立線程
26 thread1 = Mythreading(1,"thread1",1)
27 thread2 = Mythreading(2,"thread2",2)
28 
29 #開啟線程
30 thread1.start()
31 thread2.start()
32 
33 # thread1.join()
34 # thread2.join()
35 threads.append(thread1)
36 threads.append(thread2)
37 for t in threads:
38     t.join()       #後邊的代碼必須等待,等線程運作完成才會往後運作代碼
39 
40 print("我的的花兒也謝了")      

為什麼下圖左為串行,下圖右為并行運作呢?

Python記錄檔、加密、發送郵件、線程、生産者消費者

圖左love啟動後分别執行start和join,啟動了join後邊代碼就需要等待前邊代碼運作完成。總共18s

圖右同時啟動love和hate,運作所需要執行的時間然後停止。總共10s

 超級播放器示例,如下:

1 import threading
 2 from time import sleep, ctime
 3 def music(func):
 4     for i in range(2):
 5         print ("I was listening to %s! %s" %(func,ctime()))
 6         sleep(4)
 7 def move(func):
 8     for i in range(2):
 9         print ("I was at the %s! %s" %(func,ctime()))
10         sleep(5)
11 
12 def player(name):
13     r = name.split('.')[1]
14     if r=="mp3":
15         music(name)
16     elif r=="mp4":
17         move(name)
18     else:
19         print("%s is error!"%name)
20 
21 lists = ["love.mp3","hate.mp4","cuicui.mp3","nnnn.mp4"]
22 
23 threads = []
24 files = range(len(lists))
25 for i in files:
26     t = threading.Thread(target=player,args=(lists[i],))
27     threads.append(t)
28 
29 if __name__ == '__main__':
30     for i in files:
31         threads[i].start()
32     for i in files:
33         threads[i].join()
34     print ('all end: %s' %ctime())      

5、生産者與消費者示例:

1 import threading
 2 class Produce(threading.Thread):
 3 
 4     def __init__(self,name):
 5         threading.Thread.__init__(self)
 6         self.name = name
 7     def run(self):
 8         global x
 9         tt.acquire()
10         if x > 0 :
11 
12             print("我不生産了")
13         else:
14             for i in range(5):
15                 x += 1
16                 print("%s在生産中,第%d個"%(self.name,x))
17         tt.release()
18 
19 class Consume(threading.Thread):
20     def __init__(self,name):
21         threading.Thread.__init__(self)
22         self.name = name
23     def run(self):
24         global x
25         tt.acquire()
26         if x == 0:
27 
28             print("我不消費了")
29         else:
30             for i in range(5):
31                 x -= 1
32                 print("%s在消費中,第%d個"%(self.name,x+1))
33         tt.release()
34 x = 0
35 tt = threading.Lock()
36 # tt = threading.Condition
37 
38 p = Produce("produce")
39 c = Consume("consume")
40 
41 p.start()
42 c.start()
43 
44 p.join()
45 c.join()      
上一篇: 階段沖刺9
下一篇: 階段沖刺10