天天看點

wxpython-通過request遠端下載下傳網絡zip檔案,并解壓安裝檔案

  • 明确需求(更新程式)

1.通過wxpython,産生一個窗體,窗體上有一段[文字标簽],一個[進度條],一個[開始按鈕]。

2.點選【開始按鈕】,下載下傳網絡資源檔案

http://example.cn/test.zip

。進度條和文字标簽同時顯示百分比

3.下載下傳完成後,解壓到指定目錄。如果指定目錄下有檔案,則覆寫掉。

  • 設計界面

wxpython-通過request遠端下載下傳網絡zip檔案,并解壓安裝檔案

  • 程式關鍵點

1.wxpython用的進度條控件是wx.guage.定義如下:

self.gauge_1 = wx.Gauge(self, wx.ID_ANY, 100, style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)      

設定進度條的方法如下:msg填的是數字

self.download_gauge.SetValue(msg)      

2.因為下載下傳時間長,是以需要在主線程外再啟用一個線程下載下傳,避免程式假死。

3.通過request.Session().get方法下載下傳比request.get下載下傳要快

requests.Session().get(url, verify=False, stream=True)      

4.通過stream的形式可以獲得下載下傳進度。如下,message是下載下傳進度,例如10%,message為10

for chunk in response.iter_content(chunk_size=512):
    if chunk:
        code.write(chunk)
        code.flush()
    i = i + 512
    number = int(i)
    message = number * 100 / total_length      

5.Zipfile解壓檔案,如果解壓目錄下有同名檔案,則會直接覆寫掉

azip = zipfile.ZipFile(zip_file_path)
azip.extractall(path=unzip_to_path)      
  • 完整代碼(frame窗體和event事件)

updateOTAFrame.py(窗體檔案,用wxglade建立)

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# generated by wxGlade 0.9.0pre on Thu Sep 06 09:41:05 2018
#
import wx
# begin wxGlade: dependencies
# end wxGlade
# begin wxGlade: extracode
# end wxGlade
class MyFrame(wx.Frame):
    def __init__(self, *args, **kwds):
        # begin wxGlade: MyFrame.__init__
        kwds["style"] = kwds.get("style", 0) | wx.DEFAULT_FRAME_STYLE
        wx.Frame.__init__(self, *args, **kwds)
        self.SetSize((475, 137))
        self.label_1 = wx.StaticText(self, wx.ID_ANY, "start")
        self.gauge_1 = wx.Gauge(self, wx.ID_ANY, 100, style=wx.GA_HORIZONTAL | wx.GA_SMOOTH)
        self.button_1 = wx.Button(self, wx.ID_ANY, "start")
        self.__set_properties()
        self.__do_layout()
        # end wxGlade
    def __set_properties(self):
        # begin wxGlade: MyFrame.__set_properties
        self.SetTitle(u"\u5347\u7ea7\u7a0b\u5e8f")
        # end wxGlade
    def __do_layout(self):
        # begin wxGlade: MyFrame.__do_layout
        sizer_1 = wx.BoxSizer(wx.VERTICAL)
        sizer_2 = wx.BoxSizer(wx.VERTICAL)
        sizer_2.Add(self.label_1, 1, 0, 0)
        sizer_2.Add(self.gauge_1, 1, wx.EXPAND, 0)
        sizer_2.Add(self.button_1, 1, wx.ALIGN_CENTER, 0)
        sizer_1.Add(sizer_2, 1, wx.EXPAND, 0)
        self.SetSizer(sizer_1)
        self.Layout()
        # end wxGlade
# end of class MyFrame      

updateOTAevent.py(事件檔案,和窗體檔案分開)

# -*- coding: UTF-8 -*-
import os
import zipfile
from threading import Thread
import requests
import wx
from wx.lib.pubsub import pub
from view.window import updateOTAFrame
class MyApp(wx.App):
    def OnInit(self):
        self.frame = updateOTAFrame.MyFrame(None, wx.ID_ANY, "")
        self.frame.CenterOnScreen()
        self.download_gauge = self.frame.gauge_1
        self.start_button = self.frame.button_1
        self.frame.timer = wx.Timer(self.frame)  # 建立定時器
        # 綁定一個定時器事件,wxpython存在bug,不設定定時器,pub功能不會正常啟用
        self.frame.Bind(wx.EVT_TIMER, self.on_timer, self.frame.timer)
        self.frame.Show()
        self.start_button.Bind(wx.EVT_BUTTON, self.start_event)
        pub.subscribe(self.update_ota, "ota_topic")
        pub.subscribe(self.download_text_topic, "download_text_topic")
        self.download_text = self.frame.label_1
        self.download_text.SetLabelText("點選開始更新按鈕,即刻開始更新")
        self.start_button.SetLabelText("開始更新")
        pub.subscribe(self.close_frame, "close_download_topic")
        return True
    # 下載下傳完成後,關閉視窗
    def close_frame(self, msg):
        self.frame.Destroy()
    # 開始下載下傳按鈕事件
    def start_event(self, event):
        self.start_button.SetLabelText("正在更新")
        self.download_text.SetLabelText("正在下載下傳更新包,請不要關閉程式,目前進度:0%")
        self.start_button.Disable()
        GuageThread()
        event.Skip()
    # 控制下載下傳的時候的文字
    def download_text_topic(self, msg):
        if msg < 100:
            self.download_text.SetLabelText("正在下載下傳更新包,請不要關閉程式,目前進度:" + str(msg) + "%")
        else:
            self.download_text.SetLabelText('下載下傳成功,現在開始解壓,請耐心等待大于10秒')
            self.start_button.SetLabelText('正在解壓')
    # 控制下載下傳的進度條
    def update_ota(self, msg):
        if msg < 100:  # 如果是數字,說明線程正在執行,顯示數字
            self.download_gauge.SetValue(msg)
        else:
            self.download_gauge.SetValue(msg)
    def on_timer(self, evt):  # 定時執行檢查網絡狀态
        pass
# 另外啟動一個線程來控制進度條,不然程式會假死
class GuageThread(Thread):
    def __init__(self):
        # 線程執行個體化時立即啟動
        Thread.__init__(self)
        self.start()
    def run(self):
        # 線程執行的代碼
        self.download_file()
    def download_file(self):
        # url = "http://example.cn/test.zip" 是網絡上的zip壓縮封包件
        url = "http://example.cn/test.zip"
        # 通過Session來下載下傳,速度比直接requests.get快了大約百分之30
        response = requests.Session().get(url, verify=False, stream=True)
        total_length = int(response.headers.get("Content-Length"))
        with open(os.path.abspath(os.getcwd() + "/resource/download/new.zip"), "wb") as code:
            i = 0
            temp = 0
            # 用chunk_size的方法來下載下傳,可以知道目前的下載下傳進度。chunk_size影響每次寫入的記憶體大小
            for chunk in response.iter_content(chunk_size=512):
                if chunk:
                    code.write(chunk)
                    code.flush()
                i = i + 512
                number = int(i)
                # 因為進度條的長度設定成了100,是以這裡要乘以100
                message = number * 100 / total_length
                wx.CallAfter(pub.sendMessage, "ota_topic", msg=message)
                if temp != message:
                    temp = message
                    wx.CallAfter(pub.sendMessage, "download_text_topic", msg=message)
        filepath = os.path.abspath(os.getcwd() + "/resource/download/new.zip")
        # 直接放在程式根目錄下了
        foldpath = os.path.abspath(os.getcwd())
        self.unzip(filepath, foldpath)
        wx.CallAfter(pub.sendMessage, "close_download_topic", msg=1)
    # 解壓檔案用的zipfile,将解壓的檔案,放置到指定路徑下(覆寫複制)
    def unzip(self, zip_file_path, unzip_to_path):
        unzip_flag = False
        try:
            check_zip_flag = zipfile.is_zipfile(zip_file_path)
            if check_zip_flag:
                azip = zipfile.ZipFile(zip_file_path)
                azip.extractall(path=unzip_to_path)
                unzip_flag = True
        except Exception as e:
            print e.message
        finally:
            print "unzip_flag==========", unzip_flag
            return unzip_flag
# end of class MyApp
if __name__ == "__main__":
    app = MyApp(0)
    app.MainLoop()      

注:  把 url = "http://example.cn/test.zip" 修改為自己要下載下傳的網絡zip壓縮封包件。然後在updateOTAevent.py中運作即可。zip檔案會下載下傳完成後解壓至程式根目錄