
FastSpeech2 Code Reading Notes: Data Processing (2)

Author: 语音刺客

Before processing the data, download the LJSpeech dataset locally. In the FastSpeech2 paper, the forced-alignment tool MFA (Montreal Forced Aligner) is used to extract alignment information from the text and audio; this walkthrough uses the pre-extracted alignment files provided by the repo author, but interested readers can also download and install MFA and extract the alignments themselves. Each *.TextGrid file downloaded via the repo author's link corresponds to one audio file and records the text and the corresponding durations (in seconds) at two levels, word_level and phone_level, in the format shown in the figure below; the main difference is that phone_level is finer-grained than word_level.

1. TextGrid files (MFA alignment files) explained

(Figure: an example TextGrid file, with word-level and phone-level tiers)
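To get a feel for the format, here is a minimal sketch that reads the phone-level tier with the tgt library; the file path is a placeholder, and the same calls (tgt.io.read_textgrid, get_tier_by_name, tier._objects) appear in preprocessor.py below:

import tgt

# placeholder path to one of the downloaded alignment files
textgrid = tgt.io.read_textgrid("TextGrid/LJSpeech/LJ001-0001.TextGrid")
tier = textgrid.get_tier_by_name("phones")
for interval in tier._objects:
    # each interval carries a start time, an end time (in seconds) and a phone label
    print(interval.start_time, interval.end_time, interval.text)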

2. prepare_align.py

This file is essentially an entry-point interface: based on the dataset named in the config, it calls that dataset's prepare_align function to prepare the data.

import argparse

import yaml

from preprocessor import ljspeech, aishell3, libritts


def main(config):
    if "LJSpeech" in config["dataset"]:
        ljspeech.prepare_align(config)
    if "AISHELL3" in config["dataset"]:
        aishell3.prepare_align(config)
    if "LibriTTS" in config["dataset"]:
        libritts.prepare_align(config)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # load the dataset-specific yaml file so its parameters are available below
    parser.add_argument("config", type=str, help="path to preprocess.yaml")
    args = parser.parse_args()

    config = yaml.load(open(args.config, "r"), Loader=yaml.FullLoader)
    main(config)
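To run it against a given dataset (assuming the ming024/FastSpeech2 repository layout), pass the matching config file, e.g. `python3 prepare_align.py config/LibriTTS/preprocess.yaml`.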

3. libritts.py

Although this file only defines a prepare_align function, that function merely reads, cleans, and re-saves the audio and text of the LibriTTS dataset; it does not extract any alignment information.

import os
import librosa
import numpy as np
from scipy.io import wavfile
from tqdm import tqdm
from text import _clean_text


def prepare_align(config):
    in_dir = config["path"]["corpus_path"]  # "/home/ming/Data/LibriTTS/train-clean-360"
    out_dir = config["path"]["raw_path"]  # "./raw_data/LibriTTS"
    sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]  # 22050
    max_wav_value = config["preprocessing"]["audio"]["max_wav_value"]  # 32768
    cleaners = config["preprocessing"]["text"]["text_cleaners"]  # english_cleaners
    # os.listdir() returns all file and directory names under in_dir;
    # each directory name here is a speaker ID
    for speaker in tqdm(os.listdir(in_dir)):
        # os.path.join() joins in_dir with speaker;
        # each speaker directory contains chapter subdirectories: in_dir/speaker/chapter
        for chapter in os.listdir(os.path.join(in_dir, speaker)):
            # files under in_dir/speaker/chapter come in three kinds, e.g.:
            #   100_121669_000001_000000.normalized.txt
            #   100_121669_000001_000000.original.txt
            #   100_121669_000001_000000.wav
            for file_name in os.listdir(os.path.join(in_dir, speaker, chapter)):
                # skip anything that is not a wav file
                if file_name[-4:] != ".wav":
                    continue
                # take the wav file's name without the extension
                base_name = file_name[:-4]
                # the .normalized.txt file holds one English sentence, e.g. "Tom, the Piper's Son"
                text_path = os.path.join(
                    in_dir, speaker, chapter, "{}.normalized.txt".format(base_name)
                )
                wav_path = os.path.join(
                    in_dir, speaker, chapter, "{}.wav".format(base_name)
                )
                # read the transcript, e.g. text = "Tom, the Piper's Son"
                with open(text_path) as f:
                    text = f.readline().strip("\n")

                # text cleaning: bad characters, letter case, abbreviation expansion,
                # whitespace normalization, number verbalization
                text = _clean_text(text, cleaners)
                # create out_dir/speaker; exist_ok=True suppresses the error if it already exists
                os.makedirs(os.path.join(out_dir, speaker), exist_ok=True)
                # librosa is an audio signal processing library;
                # load() reads the audio from file as a numpy.ndarray, resampling to
                # sampling_rate (channel layout, sample rate and resampling type are configurable)
                wav, _ = librosa.load(wav_path, sr=sampling_rate)
                # wav = wav / max(|wav|) * 32768:
                # peak-normalize to [-1, 1], then scale to the full int16 range
                wav = wav / max(abs(wav)) * max_wav_value
                # write the numpy wav to out_dir/speaker/{base_name}.wav;
                # int16 matches the scaled value range (-32768..32767)
                wavfile.write(
                    os.path.join(out_dir, speaker, "{}.wav".format(base_name)),
                    sampling_rate,
                    wav.astype(np.int16),
                )
                # write the cleaned text read from {base_name}.normalized.txt
                # into out_dir/speaker/{base_name}.lab
                with open(
                    os.path.join(out_dir, speaker, "{}.lab".format(base_name)),
                    "w",
                ) as f1:
                    f1.write(text)
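As a rough illustration of what the english_cleaners pipeline does (lowercasing, abbreviation expansion, number verbalization, whitespace collapsing), the snippet below runs a made-up transcript through the same _clean_text call used above; the input and the expected output are my own example, not from the repo:

from text import _clean_text

# a hypothetical transcript run through the cleaner used in prepare_align
print(_clean_text("Mrs. Smith bought 2 apples.", ["english_cleaners"]))
# expected (roughly): "misess smith bought two apples."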

4. preprocessor.py

This file is where each utterance's duration, pitch, and energy are actually extracted with the help of the downloaded TextGrid files; the config it receives is loaded from config/LibriTTS/preprocess.yaml.
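One recurring detail: the TextGrid stores times in seconds, while durations must be counted in mel frames. The conversion, mirrored from get_alignment further down, is frames = round(e * sr / hop) - round(s * sr / hop). A minimal worked sketch (seconds_to_frames is my own helper name; 22050 and 256 are the config values):

import numpy as np

def seconds_to_frames(s, e, sampling_rate=22050, hop_length=256):
    # one mel frame covers hop_length / sampling_rate ≈ 11.6 ms of audio
    return int(
        np.round(e * sampling_rate / hop_length)
        - np.round(s * sampling_rate / hop_length)
    )

print(seconds_to_frames(0.0, 0.04))  # Interval(0.0, 0.04, "P") -> 3 frames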

import os
import random
import json

import tgt
import librosa
import numpy as np
import pyworld as pw
from scipy.interpolate import interp1d
# StandardScaler is used below but was missing from the original listing
from sklearn.preprocessing import StandardScaler

from tqdm import tqdm

import audio as Audio

# preprocessing class that handles the whole dataset
class Preprocessor:
    def __init__(self, config):
        self.config = config
        # where the raw data lives: "./raw_data/LibriTTS"
        self.in_dir = config["path"]["raw_path"]
        # where the processed data goes: "./preprocessed_data/LibriTTS"
        self.out_dir = config["path"]["preprocessed_path"]
        self.val_size = config["preprocessing"]["val_size"]  # 512
        self.sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]  # 22050
        self.hop_length = config["preprocessing"]["stft"]["hop_length"]  # 256

        # assert raises AssertionError when its condition is False:
        # pitch.feature must be phoneme_level or frame_level
        assert config["preprocessing"]["pitch"]["feature"] in [
            "phoneme_level",
            "frame_level",
        ]
        # likewise, energy.feature must be phoneme_level or frame_level
        assert config["preprocessing"]["energy"]["feature"] in [
            "phoneme_level",
            "frame_level",
        ]
        # whether to average pitch over each phoneme
        self.pitch_phoneme_averaging = (
            config["preprocessing"]["pitch"]["feature"] == "phoneme_level"
        )
        # whether to average energy over each phoneme
        self.energy_phoneme_averaging = (
            config["preprocessing"]["energy"]["feature"] == "phoneme_level"
        )
        # whether to normalize pitch/energy (True here)
        self.pitch_normalization = config["preprocessing"]["pitch"]["normalization"]
        self.energy_normalization = config["preprocessing"]["energy"]["normalization"]

        # initialize the STFT module
        self.STFT = Audio.stft.TacotronSTFT(
            config["preprocessing"]["stft"]["filter_length"],  # 1024
            config["preprocessing"]["stft"]["hop_length"],  # 256
            config["preprocessing"]["stft"]["win_length"],  # 1024
            config["preprocessing"]["mel"]["n_mel_channels"],  # 80
            config["preprocessing"]["audio"]["sampling_rate"],  # 22050
            config["preprocessing"]["mel"]["mel_fmin"],  # 0
            config["preprocessing"]["mel"]["mel_fmax"],  # 8000
        )
     
    # extract all the required features
    def build_from_path(self):
        # out_dir: "./preprocessed_data/LibriTTS"
        # os.makedirs() with exist_ok=True does not raise if the directory exists
        os.makedirs((os.path.join(self.out_dir, "mel")), exist_ok=True)
        os.makedirs((os.path.join(self.out_dir, "pitch")), exist_ok=True)
        os.makedirs((os.path.join(self.out_dir, "energy")), exist_ok=True)
        os.makedirs((os.path.join(self.out_dir, "duration")), exist_ok=True)

        print("Processing Data ...")
        out = list()
        n_frames = 0
        pitch_scaler = StandardScaler()
        energy_scaler = StandardScaler()

        # Compute pitch, energy, duration, and mel-spectrogram
        speakers = {}
        # every entry under in_dir ("./raw_data/LibriTTS") is a speaker ID;
        # tqdm adds a progress bar
        for i, speaker in enumerate(tqdm(os.listdir(self.in_dir))):
            speakers[speaker] = i
            # in_dir/speaker holds two kinds of files: {base_name}.lab and {base_name}.wav
            for wav_name in os.listdir(os.path.join(self.in_dir, speaker)):
                if ".wav" not in wav_name:
                    continue
                # build the alignment-file path from the audio file's basename:
                # tg_path = out_dir/TextGrid/speaker/{base_name}.TextGrid,
                # the TextGrid file of one utterance
                basename = wav_name.split(".")[0]
                tg_path = os.path.join(
                    self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
                )
                if os.path.exists(tg_path):
                    # extract mel, pitch, energy and duration for one utterance;
                    # process_utterance() returns
                    #     "|".join([basename, speaker, text (phone string), raw_text]),
                    #     self.remove_outlier(pitch),
                    #     self.remove_outlier(energy),
                    #     mel_spectrogram.shape[1]
                    ret = self.process_utterance(speaker, basename)
                    if ret is None:
                        continue
                    else:
                        info, pitch, energy, n = ret  # n is the number of mel frames
                    out.append(info)  # info is the text-side data, a "|"-separated string

                    # accumulate running mean/std statistics over all utterances
                    if len(pitch) > 0:
                        # reshape(-1, 1) turns the sequence into a single column
                        pitch_scaler.partial_fit(pitch.reshape((-1, 1)))
                    if len(energy) > 0:
                        energy_scaler.partial_fit(energy.reshape((-1, 1)))

                    n_frames += n
         print("Computing statistic quantities ...")
         # Perform normalization if necessary
         if self.pitch_normalization:
             pitch_mean = pitch_scaler.mean_[0]
             pitch_std = pitch_scaler.scale_[0]
         else:
             # A numerical trick to avoid normalization...
             pitch_mean = 0
             pitch_std = 1
         if self.energy_normalization:
             energy_mean = energy_scaler.mean_[0]
             energy_std = energy_scaler.scale_[0]
         else:
             energy_mean = 0
             energy_std = 1
         # ./preprocessed_data/LibriTTS/pitch
         # normalize() 进行归一化并且将归一化数据保存,返回最大值和最小值
         pitch_min, pitch_max = self.normalize(
             os.path.join(self.out_dir, "pitch"), pitch_mean, pitch_std
         )
         energy_min, energy_max = self.normalize(
             os.path.join(self.out_dir, "energy"), energy_mean, energy_std
         )
 
         # Save files
         # json.dump() 将一个python数据结构转为json格式
         with open(os.path.join(self.out_dir, "speakers.json"), "w") as f:
             f.write(json.dumps(speakers))
 
         with open(os.path.join(self.out_dir, "stats.json"), "w") as f:
             stats = {
                 "pitch": [
                     float(pitch_min),
                     float(pitch_max),
                     float(pitch_mean),
                     float(pitch_std),
                 ],
                 "energy": [
                     float(energy_min),
                     float(energy_max),
                     float(energy_mean),
                     float(energy_std),
                 ],
             }
             f.write(json.dumps(stats))
 
         print(
             "Total time: {} hours".format(
                 n_frames * self.hop_length / self.sampling_rate / 3600
             )
         )
 
         random.shuffle(out)
         out = [r for r in out if r is not None]
 
         # Write metadata 划分训练集文本数据和验证集文本数据
         # val_size = 512
         with open(os.path.join(self.out_dir, "train.txt"), "w", encoding="utf-8") as f:
             for m in out[self.val_size :]:
                 f.write(m + "\n")
         with open(os.path.join(self.out_dir, "val.txt"), "w", encoding="utf-8") as f:
             for m in out[: self.val_size]:
                 f.write(m + "\n")
 
         return out
    # extract mel, pitch, energy and duration for one utterance, given its file paths
    def process_utterance(self, speaker, basename):
        # ./raw_data/LibriTTS/speaker/{basename}.wav
        # ./raw_data/LibriTTS/speaker/{basename}.lab (the transcript of the audio)
        wav_path = os.path.join(self.in_dir, speaker, "{}.wav".format(basename))
        text_path = os.path.join(self.in_dir, speaker, "{}.lab".format(basename))
        # tg_path = ./preprocessed_data/LibriTTS/TextGrid/speaker/{basename}.TextGrid
        tg_path = os.path.join(
            self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
        )

        # Get alignments
        # read the TextGrid annotation file
        textgrid = tgt.io.read_textgrid(tg_path)
        # phone is the list of phones the text maps to;
        # duration holds each phone's length in mel frames;
        # start and end are the trimmed start/end times of the audio in seconds
        phone, duration, start, end = self.get_alignment(
            textgrid.get_tier_by_name("phones")
        )
        # join the phones into one string for easy storage: "{phone1 phone2 ... phoneN}"
        text = "{" + " ".join(phone) + "}"
        if start >= end:
            return None

        # Read and trim wav files
        # keep only the audio segment covered by the phone sequence
        wav, _ = librosa.load(wav_path)  # librosa's default sr (22050) matches the config
        wav = wav[
            int(self.sampling_rate * start) : int(self.sampling_rate * end)
        ].astype(np.float32)

        # Read raw text
        # the transcript corresponding to the phone sequence
        with open(text_path, "r") as f:
            raw_text = f.readline().strip("\n")
 
        # Compute fundamental frequency (F0) with pyworld:
        # dio gives a coarse F0 track, stonemask refines it
        pitch, t = pw.dio(
            wav.astype(np.float64),
            self.sampling_rate,
            frame_period=self.hop_length / self.sampling_rate * 1000,
        )
        pitch = pw.stonemask(wav.astype(np.float64), pitch, t, self.sampling_rate)

        pitch = pitch[: sum(duration)]  # align with the total number of mel frames
        if np.sum(pitch != 0) <= 1:
            return None

        # Compute mel-scale spectrogram and energy for the phone sequence
        mel_spectrogram, energy = Audio.tools.get_mel_from_wav(wav, self.STFT)
        mel_spectrogram = mel_spectrogram[:, : sum(duration)]
        energy = energy[: sum(duration)]
 
        if self.pitch_phoneme_averaging:
            # phoneme_level:
            # perform linear interpolation, i.e. replace the zero (unvoiced)
            # entries of the pitch sequence with plausible values
            nonzero_ids = np.where(pitch != 0)[0]  # indices of the nonzero pitch values
            # with bounds_error=False, out-of-range inputs take fill_value
            interp_fn = interp1d(
                nonzero_ids,
                pitch[nonzero_ids],
                fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
                bounds_error=False,
            )
            # after interpolation, the zero entries of pitch have been filled in
            pitch = interp_fn(np.arange(0, len(pitch)))

            # Phoneme-level average
            pos = 0
            for i, d in enumerate(duration):
                if d > 0:
                    pitch[i] = np.mean(pitch[pos : pos + d])
                else:
                    pitch[i] = 0
                pos += d
            pitch = pitch[: len(duration)]

        if self.energy_phoneme_averaging:
            # Phoneme-level average
            pos = 0
            for i, d in enumerate(duration):
                if d > 0:
                    energy[i] = np.mean(energy[pos : pos + d])
                else:
                    energy[i] = 0
                pos += d
            energy = energy[: len(duration)]
 
        # Save files under ./preprocessed_data/LibriTTS/
        dur_filename = "{}-duration-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "duration", dur_filename), duration)  # save durations

        pitch_filename = "{}-pitch-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "pitch", pitch_filename), pitch)  # save pitch

        energy_filename = "{}-energy-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "energy", energy_filename), energy)  # save energy

        mel_filename = "{}-mel-{}.npy".format(speaker, basename)
        np.save(os.path.join(self.out_dir, "mel", mel_filename), mel_spectrogram.T)

        return (
            "|".join([basename, speaker, text, raw_text]),  # the text-side data, one string
            self.remove_outlier(pitch),  # pitch sequence with outliers removed
            self.remove_outlier(energy),  # energy sequence with outliers removed
            mel_spectrogram.shape[1],  # number of mel frames
        )
         
    # extract the alignment information
    def get_alignment(self, tier):
        sil_phones = ["sil", "sp", "spn"]
        # the tier stores the time span of the audio and of each phone in the text
        phones = []  # phones
        durations = []  # durations
        start_time = 0  # start time
        end_time = 0  # end time
        end_idx = 0
        # each t is an Interval(0.0, 0.04, "P"): start time, end time,
        # and the text of that segment (here, a phone)
        for t in tier._objects:
            s, e, p = t.start_time, t.end_time, t.text

            # Trim leading silences
            # silent phones at the start of the sentence are skipped
            if phones == []:
                if p in sil_phones:
                    continue
                else:
                    start_time = s

            if p not in sil_phones:
                # For ordinary phones
                phones.append(p)
                end_time = e
                end_idx = len(phones)  # index just past the last non-silent phone
            else:
                # For silent phones inside the sentence
                phones.append(p)

            # record the duration, converting seconds into mel frames;
            # np.round() rounds to the nearest integer
            durations.append(
                int(
                    np.round(e * self.sampling_rate / self.hop_length)
                    - np.round(s * self.sampling_rate / self.hop_length)
                )
            )

        # Trim tailing silences
        phones = phones[:end_idx]
        durations = durations[:end_idx]

        return phones, durations, start_time, end_time
     
    # remove outliers, following the boxplot (IQR) rule
    def remove_outlier(self, values):
        values = np.array(values)
        # compute the quartiles
        p25 = np.percentile(values, 25)
        p75 = np.percentile(values, 75)
        lower = p25 - 1.5 * (p75 - p25)
        upper = p75 + 1.5 * (p75 - p25)

        normal_indices = np.logical_and(values > lower, values < upper)

        return values[normal_indices]
 
    # standardize the files under ./preprocessed_data/LibriTTS/pitch or energy
    def normalize(self, in_dir, mean, std):
        max_value = np.finfo(np.float64).min
        min_value = np.finfo(np.float64).max
        for filename in os.listdir(in_dir):
            filename = os.path.join(in_dir, filename)
            # normalize and overwrite the file in place
            values = (np.load(filename) - mean) / std
            np.save(filename, values)

            max_value = max(max_value, max(values))
            min_value = min(min_value, min(values))

        return min_value, max_value
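For completeness, here is a minimal driver in the style of the repo's preprocess.py, a sketch assuming config/LibriTTS/preprocess.yaml exists and Preprocessor lives in preprocessor/preprocessor.py; the repo's actual entry point may differ slightly:

import yaml

from preprocessor.preprocessor import Preprocessor

if __name__ == "__main__":
    # load the dataset config, then run the whole pipeline described above
    config = yaml.load(
        open("config/LibriTTS/preprocess.yaml", "r"), Loader=yaml.FullLoader
    )
    preprocessor = Preprocessor(config)
    preprocessor.build_from_path()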