天天看点

Python多线程拷贝指定文件到目标目录

需要源代码可私信

方案一: 直接套用脚本,需可以看懂一些脚本逻辑

这个函数使用 threading 模块来创建多个线程,并将每个线程分配到不同的目标目录中进行 txt 文件拷贝。每个线程都会执行 copy_txt_files 函数来拷贝指定目录中的 txt 文件。

import os
import shutil
import threading
import time

def copy_all_txt_files(source_dir, target_dirs, num_threads=10):
    """
    从源目录拷贝所有 txt 文件到多个不同的目标目录

    参数:
    source_dir:源目录路径
    target_dirs:目标目录路径组成的列表
    num_threads:线程数量,默认为 10

    """

    def copy_txt_files(source_dir, target_dir):
        """
        拷贝 txt 文件的函数,被多线程调用

        参数:
        source_dir:源目录路径
        target_dir:目标目录路径

        """
        txt_files = [f for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f)) and f.endswith(".txt")]
        for txt_file in txt_files:
            shutil.copy(os.path.join(source_dir, txt_file), os.path.join(target_dir, txt_file))

    # 创建多个线程并启动
    threads = []
    start_time = time.time()
    for target_dir in target_dirs:
        t = threading.Thread(target=copy_txt_files, args=(source_dir, target_dir))
        t.daemon = True
        t.start()
        threads.append(t)

    # 等待所有线程结束
    for t in threads:
        t.join()

    end_time = time.time()

    print(f"拷贝文件总共花费了 {(end_time-start_time):.2f} 秒")

# 示例用法
copy_all_txt_files("/path/folder", ["/path/folder1", "/path/folder2"], num_threads=6)           

方案二:直接调用封装脚本(写用例,执行脚本即可)

脚本实现封装后,只需要在ThreadProcessCopy.yaml文件中写用例即可,此后执行Threadprocess_Copy.py脚本即实现多线程拷贝文件夹下所有指定的文件如(.txt、.exe等结尾的文件)到指定目录

目录介绍:

Python多线程拷贝指定文件到目标目录

ThreadProcessCopy.yaml用例编写配置拷贝文件的源地址和目标地址

source_folder: 原始文件夹路径

dest_folders: 需存放目标文件夹,可多个同时填写

endswith: 特殊文件以什么结尾的 如 .txt .exe .dll 等等

num_threads:线程数 可自定义

ThreadingProcess:
  source_folder: \\127.0.0.1\d$\exportdir
  dest_folders:
    - \\127.0.0.2\d$\p
    - \\127.0.0.3\d$\p
  endswith: .txt
  num_threads: 5           

PublicConfig.py脚本: 配置读取信息,方便调用

import os
from Public_Utils.util_yaml import YamlReader

class YamlPath:
    def __init__(self):
        current = os.path.abspath(__file__)
        self.base_dir = os.path.dirname(os.path.dirname(current))
        self._config_path = self.base_dir + os.sep + "Public_Config\Public_yaml"

    def get_threadprocess_file(self):
        self._config_file = self._config_path + os.sep + "ThreadProcessCopy.yaml"
        return self._config_file

class ConfigYaml:
    def __init__(self):   #初始yaml读取配置文件
        self.threadprocess_config = YamlReader(YamlPath().get_threadprocess_file()).yaml_data()

    def get_threadprocess_yaml(self):
        return self.threadprocess_config['ThreadingProcess']

if __name__ == '__main__':
    pass           

Threadprocess_Copy.py执行脚本

import os
import shutil
import threading
import time

from Public_Config.PublicConfig import ConfigYaml

class ThreadProcess:

    def copy_all_sepcial_files(self,num_threads):
        """
        拷贝 txt 文件的函数,被多线程调用
        参数:
        source_dir:源目录路径
        target_dir:目标目录路径

        """
        source_dir = ConfigYaml().get_threadprocess_yaml()['source_folder']  # 需要复制的原始文件夹
        target_dirs = ConfigYaml().get_threadprocess_yaml()['dest_folders']  # 需要复制到的目标文件夹
        def copy_special_files(source_dir, target_dir):
            txt_files = [f for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f)) and f.endswith(ConfigYaml().get_threadprocess_yaml()['endswith'])]
            for txt_file in txt_files:
                shutil.copy(os.path.join(source_dir, txt_file), os.path.join(target_dir, txt_file))

        #创建多个线程并启动
        threads = []
        start_time = time.time()
        for target_dir in target_dirs:
            t = threading.Thread(target=copy_special_files,args=(source_dir,target_dir))
            t.daemon = True
            t.start()
            threads.append(t)

        #等待所有线程结束
        for t in threads:
            t.join()
        end_time = time.time()
        elapsed_time = end_time - start_time
        print(f"拷贝总共花费{elapsed_time:.2f}秒")

if __name__ == '__main__':
    ThreadProcess().copy_all_sepcial_files(num_threads = ConfigYaml().get_threadprocess_yaml()['num_threads'])
    pass           

util_yaml.py

import os
import yaml

class YamlReader:
    #初始化,判断文件是否存在
    def __init__(self,yaml_file):
        if os.path.exists(yaml_file):
            self.yaml_file = yaml_file
        else:
            raise FileNotFoundError("yaml文件不存在")
        self._data = None
        self._data_all = None

    def yaml_data(self):  #yaml文件读取 --单个文档读取
        #第一次调用data,读取yaml文档,如果不是,直接返回之前保存的数据
        if not self._data:
            with open(self.yaml_file,'rb') as f:
                self._data = yaml.safe_load(f)
        return self._data

    def yaml_data_all(self):  #多个文档的读取
        if not self._data_all:
            with open(self.yaml_file,'rb') as f:
                self._data_all = yaml.safe_load_all(f)
        return self._data_all