# A multi-threaded downloader for large files whose download source is slow. The code can be used as is; just replace the download URL.
import os
import time
import httpx
from tqdm import tqdm
from threading import Thread
class DownloadFile(object):
    """Download a single file over HTTP using several parallel range requests.

    The file is split into ``thread_num`` byte ranges. Each range is fetched by
    its own thread into a ``<file_path>_<index>`` segment file, and the segment
    files are concatenated into ``file_path`` once every thread has finished.
    Segment files that already exist on disk are resumed, not restarted.
    """

    # Seconds to wait before re-issuing a failed segment request.
    retry_delay = 1
    # Number of bytes copied per read while merging segment files.
    merge_chunk_size = 1024 * 1024

    def __init__(self, download_url, data_folder, thread_num):
        """
        :param download_url: URL of the file to download
        :param data_folder: directory the file is stored in
        :param thread_num: number of download threads (and segments)
        :raises ValueError: if ``thread_num`` is smaller than 1
        """
        if thread_num < 1:
            raise ValueError("thread_num must be at least 1")
        self.download_url = download_url
        self.data_folder = data_folder
        self.thread_num = thread_num
        self.file_size = None
        self.cut_size = None
        self.tqdm_obj = None
        self.thread_list = []
        # The last path component of the URL is used as the local file name.
        self.file_path = os.path.join(self.data_folder, download_url.split('/')[-1])

    def _advance(self, num_bytes):
        """Advance the shared progress bar, if one has been created."""
        if self.tqdm_obj is not None and num_bytes:
            self.tqdm_obj.update(num_bytes)

    def downloader(self, etag, thread_index, start_index, stop_index, retry=False):
        """Download one byte range into its segment file, retrying until done.

        :param etag: ETag of the remote file, or an empty string if unknown
        :param thread_index: 1-based segment number, used in the segment file name
        :param start_index: first byte offset of the segment
        :param stop_index: last byte offset (inclusive), or ``'-'`` for "to the end"
        :param retry: when true, bytes already on disk are not added to the
            progress bar again
        :return: None
        """
        sub_path_file = "{}_{}".format(self.file_path, thread_index)
        if stop_index == '-':
            stop_index = ""

        # Last byte this segment has to contain; None when it cannot be derived.
        if self.file_size is None:
            last_byte = stop_index if stop_index != "" else None
        elif stop_index == "":
            last_byte = self.file_size - 1
        else:
            last_byte = min(stop_index, self.file_size - 1)

        count_existing = not retry
        while True:
            # Size of the part already on disk. It is re-read on every attempt,
            # after the previous attempt's file handle has been closed, so that
            # buffered bytes are never requested a second time.
            temp_size = os.path.getsize(sub_path_file) if os.path.exists(sub_path_file) else 0
            if count_existing:
                self._advance(temp_size)
                count_existing = False

            offset = start_index + temp_size
            with open(sub_path_file, 'ab') as down_file:
                if last_byte is not None and offset > last_byte:
                    # Segment is already complete (or empty); the file was still
                    # created above so the merge step finds it.
                    return

                headers = {'Range': 'bytes={}-{}'.format(offset, stop_index)}
                # If-Range requires a strong validator; weak tags must not be sent.
                if etag and not etag.startswith('W/'):
                    headers['If-Range'] = etag

                try:
                    with httpx.stream("GET", self.download_url, headers=headers) as response:
                        response.raise_for_status()
                        whole_file_requested = offset == 0 and stop_index == ""
                        if response.status_code != 206 and not whole_file_requested:
                            # A full-body answer would put the wrong bytes in
                            # this segment, so it must not be written.
                            raise RuntimeError(
                                "expected 206 Partial Content, got {}".format(response.status_code))
                        num_bytes_downloaded = response.num_bytes_downloaded
                        for chunk in response.iter_bytes():
                            if chunk:
                                down_file.write(chunk)
                                self._advance(response.num_bytes_downloaded - num_bytes_downloaded)
                                num_bytes_downloaded = response.num_bytes_downloaded
                    return
                except Exception as e:
                    # Worker-thread boundary: every failure is reported and retried.
                    print("Thread-{}:請求逾時,嘗試重連\n報錯資訊:{}".format(thread_index, e))
            time.sleep(self.retry_delay)

    def get_file_size(self):
        """
        Fetch the size and ETag of the remote file.

        :return: ``(total_size, etag)``; ``etag`` is an empty string when the
            server sends none
        :raises KeyError: if the response has no Content-Length header
        """
        # The response is streamed so that only the headers are read.
        with httpx.stream("GET", self.download_url) as response2:
            response2.raise_for_status()
            total_size = int(response2.headers["Content-Length"])
            # httpx header lookup is case-insensitive, so "etag" matches as well.
            etag = response2.headers.get("ETag", "")
        return total_size, etag

    def cutting(self):
        """
        Split the file into ``thread_num`` consecutive byte ranges.

        :return: ``(cut_info, cut_size)`` where ``cut_info`` maps the 1-based
            segment number to ``[cut_size, start, stop]``. ``start`` and ``stop``
            are inclusive byte offsets; the last segment uses ``'-'`` as ``stop``,
            meaning "to the end of the file".
        """
        cut_info = {}
        cut_size = self.file_size // self.thread_num
        for num in range(1, self.thread_num + 1):
            # Ranges are inclusive: segment 1 ends on byte ``cut_size``, so each
            # later segment starts one byte after the previous segment's end.
            start = cut_size * (num - 1) + (1 if num != 1 else 0)
            stop = '-' if num == self.thread_num else cut_size * num
            cut_info[num] = [cut_size, start, stop]
        return cut_info, cut_size

    def write_file(self):
        """
        Merge the downloaded segment files into the final file.

        Nothing is done when the final file already exists with at least the
        expected size. Each segment file is deleted after it has been copied.

        :return: None
        """
        if os.path.exists(self.file_path) and os.path.getsize(self.file_path) >= self.file_size:
            return
        # 'wb' discards any stale partial output instead of appending to it.
        with open(self.file_path, 'wb') as f_count:
            for thread_index in range(1, self.thread_num + 1):
                sub_path_file = "{}_{}".format(self.file_path, thread_index)
                with open(sub_path_file, 'rb') as sub_write:
                    # Copy in bounded chunks so large segments are never held in memory.
                    for block in iter(lambda: sub_write.read(self.merge_chunk_size), b""):
                        f_count.write(block)
                # Remove the segment file once it has been merged and closed.
                os.remove(sub_path_file)
        return

    def create_thread(self, etag, cut_info):
        """
        Start one download thread per segment and wait for all of them.

        :param etag: ETag passed on to every segment request
        :param cut_info: segment table as returned by ``cutting``
        :return: None
        """
        for thread_index in range(1, self.thread_num + 1):
            thread = Thread(target=self.downloader,
                            args=(etag, thread_index, cut_info[thread_index][1], cut_info[thread_index][2]),
                            name='Thread-{}'.format(thread_index),
                            daemon=True)
            thread.start()
            self.thread_list.append(thread)
        for thread in self.thread_list:
            thread.join()
        return

    def check_thread_status(self):
        """
        Report stopped download threads once per second.

        This loop never returns; it prints a line for every thread in
        ``thread_list`` that is no longer alive, then sleeps for one second.
        """
        while True:
            for thread in self.thread_list:
                if not thread.is_alive():
                    print("{}:已停止".format(thread.name))
            time.sleep(1)

    def create_data(self):
        """Create the storage directory, including missing parents, if needed."""
        os.makedirs(self.data_folder, exist_ok=True)
        return

    def main(self):
        """Run the whole download: size lookup, split, threaded download, merge."""
        self.create_data()
        self.file_size, etag = self.get_file_size()
        # Same completeness test as write_file: a finished file is not fetched again.
        if os.path.exists(self.file_path) and os.path.getsize(self.file_path) >= self.file_size:
            return
        # Split the file evenly across the threads.
        cut_info, self.cut_size = self.cutting()
        # Progress bar labelled with the local file name.
        self.tqdm_obj = tqdm(total=self.file_size, unit_scale=True, desc=os.path.basename(self.file_path),
                             unit_divisor=1024,
                             unit="B")
        try:
            # Download all segments in parallel.
            self.create_thread(etag, cut_info)
        finally:
            self.tqdm_obj.close()
        # Merge the segment files.
        self.write_file()
        return
if __name__ == '__main__':
    # Store the download in a "Data" directory next to this script, so the
    # location does not depend on the current working directory.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    data_folder = os.path.join(script_dir, 'Data')
    download_url = "https://acvrpublicycchen.blob.core.windows.net/dialogpt/keys-full.tar"
    thread_num = 20
    downloader = DownloadFile(
        download_url=download_url,
        data_folder=data_folder,
        thread_num=thread_num,
    )
    downloader.main()