在进行数据处理前,先将LJSpeech数据集下载至本地,在FastSpeech2论文中使用强制对齐工具MFA从文本和音频中提取对齐信息,代码解析时使用的是作者提供的已经提取好的对齐信息文件,感兴趣的读者也可以自行下载、安装MFA提取对齐信息。根据仓库作者提供的链接下载的每一个*.TextGrid文件与一个音频对应,其中记录了word_level和phone_level两个级别的文本、对应持续时间(单位为秒)等信息,具体格式如下图所示,主要区别就是phone_level比word_level更精细,颗粒度更小。
1.TextGrid文件(MFA对齐文件)详解
2.prepare_align.py
该文件就是相当于一个接口,针对不同的数据集调用对应的文件函数进行数据准备,主要就是调用数据集对应的prepare_align函数处理数据
import argparse
import yaml
from preprocessor import ljspeech, aishell3, libritts
def main(config):
    """Dispatch data preparation: invoke the prepare_align routine that
    matches the dataset named in the preprocessing config."""
    dataset = config["dataset"]
    if "LJSpeech" in dataset:
        ljspeech.prepare_align(config)
    if "AISHELL3" in dataset:
        aishell3.prepare_align(config)
    if "LibriTTS" in dataset:
        libritts.prepare_align(config)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # Positional argument: path to the dataset's preprocess.yaml config.
    parser.add_argument("config", type=str, help="path to preprocess.yaml")
    args = parser.parse_args()
    # Use a context manager so the config file handle is closed promptly;
    # the original passed an anonymous open() into yaml.load and leaked it.
    with open(args.config, "r", encoding="utf-8") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    main(config)
3.LibriTTS.py
虽然该文件只定义了prepare_align函数,但是该函数也只是简单地将LibriTTS数据集中的音频数据和文本数据进行了处理并保存,并没有提取对齐信息。
import os
import librosa
import numpy as np
from scipy.io import wavfile
from tqdm import tqdm
from text import _clean_text
def prepare_align(config):
    """Prepare LibriTTS data for MFA forced alignment.

    For every ``*.wav`` under ``corpus_path`` this resamples the audio,
    peak-normalizes it to int16 range, and writes a ``.wav`` plus a ``.lab``
    file (the cleaned transcript) into ``raw_path/<speaker>/``.  No alignment
    is extracted here; MFA consumes the wav/lab pairs later.
    """
    in_dir = config["path"]["corpus_path"]  # e.g. .../LibriTTS/train-clean-360
    out_dir = config["path"]["raw_path"]  # e.g. ./raw_data/LibriTTS
    sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]  # e.g. 22050
    max_wav_value = config["preprocessing"]["audio"]["max_wav_value"]  # e.g. 32768.0
    cleaners = config["preprocessing"]["text"]["text_cleaners"]  # e.g. english_cleaners

    # Corpus layout:
    # in_dir/<speaker>/<chapter>/<base>.{wav,normalized.txt,original.txt}
    for speaker in tqdm(os.listdir(in_dir)):
        for chapter in os.listdir(os.path.join(in_dir, speaker)):
            for file_name in os.listdir(os.path.join(in_dir, speaker, chapter)):
                # Only process audio files; skip the two .txt variants.
                if not file_name.endswith(".wav"):
                    continue
                base_name = file_name[:-4]
                text_path = os.path.join(
                    in_dir, speaker, chapter, "{}.normalized.txt".format(base_name)
                )
                wav_path = os.path.join(
                    in_dir, speaker, chapter, "{}.wav".format(base_name)
                )
                # .normalized.txt holds a single sentence,
                # e.g. "Tom, the Piper's Son".
                with open(text_path, encoding="utf-8") as f:
                    text = f.readline().strip("\n")
                # Text normalization: case folding, abbreviation/number
                # expansion, whitespace cleanup, etc.
                text = _clean_text(text, cleaners)

                # Create out_dir/<speaker>; no error if it already exists.
                os.makedirs(os.path.join(out_dir, speaker), exist_ok=True)
                # Pass the target rate as a keyword: librosa >= 0.10 no
                # longer accepts the sample rate positionally.
                wav, _ = librosa.load(wav_path, sr=sampling_rate)
                # Peak-normalize to [-1, 1], then scale to int16 range:
                # wav = wav / max(|wav|) * max_wav_value
                wav = wav / np.abs(wav).max() * max_wav_value
                wavfile.write(
                    os.path.join(out_dir, speaker, "{}.wav".format(base_name)),
                    sampling_rate,
                    # int16 covers -32768..32767, matching max_wav_value.
                    wav.astype(np.int16),
                )
                # Store the cleaned transcript next to the wav as
                # <base_name>.lab — the pairing MFA expects.
                with open(
                    os.path.join(out_dir, speaker, "{}.lab".format(base_name)),
                    "w",
                    encoding="utf-8",
                ) as f1:
                    f1.write(text)
4.preprocessor.py
该文件中才是从下载的TextGrid文件中提取每条音频对应的duration、pitch和energy信息;其中的config是通过config/LibriTTS/preprocess.yaml文件加载而来。
import os
import random
import json
import tgt
import librosa
import numpy as np
import pyworld as pw
from scipy.interpolate import interp1d
from tqdm import tqdm
import audio as Audio
# 定义处理所有数据的处理类
class Preprocessor:
def __init__(self, config):
    """Read preprocessing settings from the parsed preprocess.yaml dict."""
    self.config = config
    # Input: wav/lab pairs produced by prepare_align, e.g. "./raw_data/LibriTTS".
    self.in_dir = config["path"]["raw_path"]
    # Output root for extracted features, e.g. "./preprocessed_data/LibriTTS".
    self.out_dir = config["path"]["preprocessed_path"]
    self.val_size = config["preprocessing"]["val_size"]  # e.g. 512
    self.sampling_rate = config["preprocessing"]["audio"]["sampling_rate"]  # e.g. 22050
    self.hop_length = config["preprocessing"]["stft"]["hop_length"]  # e.g. 256

    # pitch/energy may only be extracted per phoneme or per mel frame;
    # anything else aborts with AssertionError.
    for feature_name in ("pitch", "energy"):
        assert config["preprocessing"][feature_name]["feature"] in [
            "phoneme_level",
            "frame_level",
        ]

    # Whether to average frame-level pitch/energy over each phoneme.
    self.pitch_phoneme_averaging = (
        config["preprocessing"]["pitch"]["feature"] == "phoneme_level"
    )
    self.energy_phoneme_averaging = (
        config["preprocessing"]["energy"]["feature"] == "phoneme_level"
    )
    # Whether to z-normalize pitch/energy with corpus statistics.
    self.pitch_normalization = config["preprocessing"]["pitch"]["normalization"]
    self.energy_normalization = config["preprocessing"]["energy"]["normalization"]

    # STFT/mel front end used for spectrogram and energy computation.
    self.STFT = Audio.stft.TacotronSTFT(
        config["preprocessing"]["stft"]["filter_length"],  # e.g. 1024
        config["preprocessing"]["stft"]["hop_length"],  # e.g. 256
        config["preprocessing"]["stft"]["win_length"],  # e.g. 1024
        config["preprocessing"]["mel"]["n_mel_channels"],  # e.g. 80
        config["preprocessing"]["audio"]["sampling_rate"],  # e.g. 22050
        config["preprocessing"]["mel"]["mel_fmin"],  # e.g. 0
        config["preprocessing"]["mel"]["mel_fmax"],  # e.g. 8000
    )
def build_from_path(self):
    """Extract mel/pitch/energy/duration for every utterance that has a
    TextGrid alignment, then write the corpus metadata.

    Side effects: per-utterance .npy feature files, speakers.json,
    stats.json, train.txt and val.txt under ``out_dir``.
    Returns the shuffled list of metadata lines
    ("basename|speaker|{phones}|raw_text").
    """
    # One output folder per feature kind; ok if they already exist.
    for subdir in ("mel", "pitch", "energy", "duration"):
        os.makedirs(os.path.join(self.out_dir, subdir), exist_ok=True)

    print("Processing Data ...")
    out = list()
    n_frames = 0
    # Running mean/std accumulators for later normalization.
    # NOTE(review): StandardScaler is not imported in this excerpt — the
    # upstream file has "from sklearn.preprocessing import StandardScaler".
    pitch_scaler = StandardScaler()
    energy_scaler = StandardScaler()

    # Compute pitch, energy, duration, and mel-spectrogram per utterance.
    speakers = {}
    # Every directory under in_dir is one speaker id.
    for i, speaker in enumerate(tqdm(os.listdir(self.in_dir))):
        speakers[speaker] = i
        # in_dir/<speaker> holds <base>.lab / <base>.wav pairs.
        for wav_name in os.listdir(os.path.join(self.in_dir, speaker)):
            if ".wav" not in wav_name:
                continue
            basename = wav_name.split(".")[0]
            # Alignment file: out_dir/TextGrid/<speaker>/<basename>.TextGrid
            tg_path = os.path.join(
                self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
            )
            if os.path.exists(tg_path):
                print(tg_path)
                # Extract mel/pitch/energy/duration for this one utterance.
                ret = self.process_utterance(speaker, basename)
                if ret is None:
                    continue
                info, pitch, energy, n = ret  # n = number of mel frames
                out.append(info)  # "|"-separated metadata string
                # Feed the scalers column vectors to update mean/std.
                if len(pitch) > 0:
                    pitch_scaler.partial_fit(pitch.reshape((-1, 1)))
                if len(energy) > 0:
                    energy_scaler.partial_fit(energy.reshape((-1, 1)))
                n_frames += n

    print("Computing statistic quantities ...")
    # Perform normalization if necessary.
    if self.pitch_normalization:
        pitch_mean = pitch_scaler.mean_[0]
        pitch_std = pitch_scaler.scale_[0]
    else:
        # mean 0 / std 1 makes the normalization below a no-op.
        pitch_mean, pitch_std = 0, 1
    if self.energy_normalization:
        energy_mean = energy_scaler.mean_[0]
        energy_std = energy_scaler.scale_[0]
    else:
        energy_mean, energy_std = 0, 1

    # Rewrite the saved pitch/energy files normalized in place and get
    # the global min/max over the normalized values.
    pitch_min, pitch_max = self.normalize(
        os.path.join(self.out_dir, "pitch"), pitch_mean, pitch_std
    )
    energy_min, energy_max = self.normalize(
        os.path.join(self.out_dir, "energy"), energy_mean, energy_std
    )

    # Save the speaker map and feature statistics as JSON.
    with open(os.path.join(self.out_dir, "speakers.json"), "w") as f:
        f.write(json.dumps(speakers))
    with open(os.path.join(self.out_dir, "stats.json"), "w") as f:
        stats = {
            "pitch": [
                float(pitch_min),
                float(pitch_max),
                float(pitch_mean),
                float(pitch_std),
            ],
            "energy": [
                float(energy_min),
                float(energy_max),
                float(energy_mean),
                float(energy_std),
            ],
        }
        f.write(json.dumps(stats))

    # Total audio duration implied by the mel frame count.
    print(
        "Total time: {} hours".format(
            n_frames * self.hop_length / self.sampling_rate / 3600
        )
    )

    random.shuffle(out)
    out = [r for r in out if r is not None]

    # Split the metadata into training / validation sets (val_size lines).
    with open(os.path.join(self.out_dir, "train.txt"), "w", encoding="utf-8") as f:
        for m in out[self.val_size :]:
            f.write(m + "\n")
    with open(os.path.join(self.out_dir, "val.txt"), "w", encoding="utf-8") as f:
        for m in out[: self.val_size]:
            f.write(m + "\n")

    return out
def process_utterance(self, speaker, basename):
    """Compute mel spectrogram, pitch, energy and duration for one utterance.

    Returns (metadata_line, pitch_wo_outliers, energy_wo_outliers,
    n_mel_frames), or None when the alignment is empty or pitch extraction
    found (almost) no voiced frames.  Side effect: saves the
    duration/pitch/energy/mel arrays as .npy files under ``out_dir``.
    """
    wav_path = os.path.join(self.in_dir, speaker, "{}.wav".format(basename))
    # The .lab file holds the raw transcript for this utterance.
    text_path = os.path.join(self.in_dir, speaker, "{}.lab".format(basename))
    tg_path = os.path.join(
        self.out_dir, "TextGrid", speaker, "{}.TextGrid".format(basename)
    )

    # Get alignments from the MFA TextGrid: the phone list, each phone's
    # duration in mel frames, and the speech start/end time in seconds.
    textgrid = tgt.io.read_textgrid(tg_path)
    phone, duration, start, end = self.get_alignment(
        textgrid.get_tier_by_name("phones")
    )
    # Store the phone sequence as a single "{p1 p2 ... pn}" string.
    text = "{" + " ".join(phone) + "}"
    if start >= end:
        return None

    # Read the wav and trim it to the aligned speech segment.
    wav, _ = librosa.load(wav_path)
    wav = wav[
        int(self.sampling_rate * start) : int(self.sampling_rate * end)
    ].astype(np.float32)

    # Read the raw transcript text.
    with open(text_path, "r") as f:
        raw_text = f.readline().strip("\n")

    # Fundamental frequency (F0): WORLD's DIO coarse estimate refined by
    # StoneMask, one value per hop (i.e. per mel frame).
    pitch, t = pw.dio(
        wav.astype(np.float64),
        self.sampling_rate,
        frame_period=self.hop_length / self.sampling_rate * 1000,
    )
    pitch = pw.stonemask(wav.astype(np.float64), pitch, t, self.sampling_rate)
    pitch = pitch[: sum(duration)]  # align to the total mel frame count
    if np.sum(pitch != 0) <= 1:
        # (Nearly) fully unvoiced — interpolation impossible, skip it.
        return None

    # Mel-scale spectrogram and per-frame energy, truncated to alignment.
    mel_spectrogram, energy = Audio.tools.get_mel_from_wav(wav, self.STFT)
    mel_spectrogram = mel_spectrogram[:, : sum(duration)]
    energy = energy[: sum(duration)]

    if self.pitch_phoneme_averaging:
        # Fill unvoiced (zero) frames by linear interpolation over the
        # voiced ones; out-of-range edges are clamped to the first/last
        # voiced value via fill_value + bounds_error=False.
        nonzero_ids = np.where(pitch != 0)[0]
        interp_fn = interp1d(
            nonzero_ids,
            pitch[nonzero_ids],
            fill_value=(pitch[nonzero_ids[0]], pitch[nonzero_ids[-1]]),
            bounds_error=False,
        )
        pitch = interp_fn(np.arange(0, len(pitch)))
        # Phoneme-level average: collapse each phone's frames to one value.
        pos = 0
        for i, d in enumerate(duration):
            if d > 0:
                pitch[i] = np.mean(pitch[pos : pos + d])
            else:
                pitch[i] = 0
            pos += d
        pitch = pitch[: len(duration)]

    if self.energy_phoneme_averaging:
        # Phoneme-level average, same scheme as for pitch.
        pos = 0
        for i, d in enumerate(duration):
            if d > 0:
                energy[i] = np.mean(energy[pos : pos + d])
            else:
                energy[i] = 0
            pos += d
        energy = energy[: len(duration)]

    # Persist the per-utterance features as .npy files under out_dir.
    np.save(
        os.path.join(
            self.out_dir, "duration", "{}-duration-{}.npy".format(speaker, basename)
        ),
        duration,
    )
    np.save(
        os.path.join(
            self.out_dir, "pitch", "{}-pitch-{}.npy".format(speaker, basename)
        ),
        pitch,
    )
    np.save(
        os.path.join(
            self.out_dir, "energy", "{}-energy-{}.npy".format(speaker, basename)
        ),
        energy,
    )
    np.save(
        os.path.join(
            self.out_dir, "mel", "{}-mel-{}.npy".format(speaker, basename)
        ),
        mel_spectrogram.T,
    )

    return (
        "|".join([basename, speaker, text, raw_text]),  # metadata string
        self.remove_outlier(pitch),  # pitch with outliers removed
        self.remove_outlier(energy),  # energy with outliers removed
        mel_spectrogram.shape[1],  # number of mel frames
    )
# 提取对齐信息
def get_alignment(self, tier):
sil_phones = ["sil", "sp", "spn"]
# tier中存储的主要内容就是音频的持续时间,以及文中中每个音素对应的持续时间信息
phones = [] # 音素
durations = [] # 持续时间
start_time = 0 # 开始时间
end_time = 0 # 结束时间
end_idx = 0
# t的类型是Interval(0.0, 0.04, "P"),第一个开始时间,第二个是结束时间,第三个即为该段对应的文本,这里是音素
for t in tier._objects:
s, e, p = t.start_time, t.end_time, t.text
# Trim leading silences
# 对于句子开头的sil phones
if phones == []:
if p in sil_phones:
continue
else:
start_time = s
if p not in sil_phones:
# For ordinary phones
phones.append(p)
end_time = e
end_idx = len(phones) # 记录已记录的音素的个数
else:
# 对于句子中的sil phones
# For silent phones
phones.append(p)
# np.round()返回浮点数的四舍五入值
# e = end_time
# 记录持续时间,将时间单位秒转换为mel帧数
durations.append(
int(
np.round(e * self.sampling_rate / self.hop_length)
- np.round(s * self.sampling_rate / self.hop_length)
)
)
# Trim tailing silences
phones = phones[:end_idx]
durations = durations[:end_idx]
return phones, durations, start_time, end_time
def remove_outlier(self, values):
    """Drop outliers from *values* using the 1.5*IQR box-plot rule."""
    values = np.array(values)
    # First and third quartiles of the data.
    q1, q3 = np.percentile(values, 25), np.percentile(values, 75)
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr
    # Keep only values strictly inside the whiskers.
    keep = np.logical_and(values > lower, values < upper)
    return values[keep]
def normalize(self, in_dir, mean, std):
    """Z-normalize every .npy file in *in_dir* in place.

    Each file is rewritten as (values - mean) / std.  Returns the global
    (min, max) over all normalized values.
    """
    max_value = np.finfo(np.float64).min
    min_value = np.finfo(np.float64).max
    for entry in os.listdir(in_dir):
        path = os.path.join(in_dir, entry)
        values = (np.load(path) - mean) / std
        np.save(path, values)  # overwrite with the normalized data
        max_value = max(max_value, max(values))
        min_value = min(min_value, min(values))
    return min_value, max_value