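Multi-Emotion Sentiment Analysis of Chinese Weibo Posts with a BiLSTM
Data
My data comes from a GitHub project, ChineseNlpCorpus, which collects quite a few Chinese datasets and corpora for natural language processing.
Download: Baidu Netdisk
Overview: more than 360,000 Sina Weibo posts with emotion labels, covering 4 emotions: about 200,000 posts labeled joy, and about 50,000 each labeled anger, disgust, and low mood
Source: Sina Weibo
Original dataset: Weibo sentiment analysis dataset, collected from the web; the specific author and origin are unknown
Preprocessing
Splitting into training and test sets
For each emotion class, take the first 10,000 rows of the table as the test set and use the remaining rows as the training set.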
import pandas as pd
import openpyxl
from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE

TEST_PER_CLASS = 10000  # the first 10,000 rows of each class go to the test set


def save_xlsx(path, labels, reviews):
    # Write a header row followed by one (label, review) pair per row
    wb = openpyxl.Workbook()
    sheet = wb.active
    sheet.append(["label", "review"])
    for lab, line in zip(labels, reviews):
        sheet.append([lab, line])
    wb.save(path)


def split_dataset():
    data = pd.read_csv('simplifyweibo_4_moods.csv')
    label = data.get('label')
    review = data.get('review')
    train_review, train_label = [], []
    test_review, test_label = [], []
    taken = {0: 0, 1: 0, 2: 0, 3: 0}  # test rows taken so far, per class
    for i in range(len(review)):
        lab = int(label[i])
        # Strip control characters that openpyxl refuses to write
        line = ILLEGAL_CHARACTERS_RE.sub(r'', str(review[i]))
        if lab not in taken:
            continue
        if taken[lab] < TEST_PER_CLASS:
            taken[lab] += 1
            test_label.append(lab)
            test_review.append(line)
        else:
            train_label.append(lab)
            train_review.append(line)
    # The dataset class later reads both files from the data/ directory
    save_xlsx("train.xlsx", train_label, train_review)
    save_xlsx("test.xlsx", test_label, test_review)


if __name__ == '__main__':
    split_dataset()
This produces the corresponding files.
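Taking the first 10,000 rows of each class makes the split depend on the row order of the CSV file. As an alternative (not the approach used in this post), a shuffled per-class split with a fixed seed might look like the sketch below. The names `stratified_split`, `per_class` and `seed` are made up for this illustration, and the data is a toy example.

```python
import random
from collections import defaultdict


def stratified_split(labels, reviews, per_class, seed=0):
    """Hold out `per_class` randomly chosen rows of every label as the test set."""
    by_label = defaultdict(list)
    for idx, lab in enumerate(labels):
        by_label[lab].append(idx)

    rng = random.Random(seed)  # fixed seed so the split is reproducible
    test_idx = set()
    for lab, indices in by_label.items():
        rng.shuffle(indices)
        test_idx.update(indices[:per_class])

    train = [(labels[i], reviews[i]) for i in range(len(labels)) if i not in test_idx]
    test = [(labels[i], reviews[i]) for i in range(len(labels)) if i in test_idx]
    return train, test


# Toy data: 3 rows for each of the 4 emotion labels
labels = [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3]
reviews = ["post %d" % i for i in range(len(labels))]
train, test = stratified_split(labels, reviews, per_class=1)
print(len(train), len(test))
```

Each class still contributes the same number of test rows, so the test set stays balanced while no longer depending on file order.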
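Generating the vocabulary
Punctuation marks, special symbols, English letters, digits and the like are of no use for our experiment, so we need to filter them out.
Removing stop words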
import re
import jieba

# Characters to strip before segmentation. Each one is passed through re.escape
# so that regex metacharacters such as $, ^ and \ are matched literally.
FILTERS = ['!', '"', '#', '$', '%', '&', '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>',
           '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~', '\t', '\n', '\x97', '\x96', '”',
           '“']
FILTER_RE = re.compile("|".join(re.escape(ch) for ch in FILTERS))

_STOPWORDS = None  # loaded lazily, only once


def tokenlize(sentence):
    """
    Segment a text into words
    :param sentence: str
    :return: [str,str,str]
    """
    sentence = FILTER_RE.sub("", sentence)
    sentence = jieba.cut(sentence, cut_all=False)
    sentence = ' '.join(sentence)
    result = [i for i in sentence.split(" ") if len(i) > 0]
    result = movestopwords(result)
    return result


def stopwordslist(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f]


# Remove stop words from a tokenized sentence
def movestopwords(sentence):
    global _STOPWORDS
    if _STOPWORDS is None:
        _STOPWORDS = set(stopwordslist('data/stopwords.txt'))  # path to the stop-word list
    outstr = []
    for word in sentence:
        if word not in _STOPWORDS and word not in ('\t', '\n'):
            outstr.append(word)
    return outstr
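The filter list above only removes punctuation, while the text also mentions English letters and digits. One way to drop those as well is a whitelist: keep only characters in the basic CJK Unified Ideographs block (U+4E00 to U+9FFF) before segmentation. This is a sketch under that assumption; `keep_chinese` is a hypothetical helper and not part of the original code.

```python
import re

# Anything that is NOT a basic CJK ideograph (U+4E00..U+9FFF)
NON_CHINESE_RE = re.compile(r'[^\u4e00-\u9fff]')


def keep_chinese(text):
    """Drop letters, digits, punctuation, emoji and whitespace, keeping only Chinese characters."""
    return NON_CHINESE_RE.sub('', text)


print(keep_chinese("今天天气good123!!很好~"))
```

A whitelist like this is stricter than the punctuation list: it also discards user names, URLs and emoticons written in Latin characters, which may or may not be desirable for Weibo text.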
Designing the vocabulary class
"""
Text serialization
"""
class Vocab:
    UNK_TAG = "<UNK>"  # unknown token
    PAD_TAG = "<PAD>"  # padding token
    PAD = 0
    UNK = 1

    def __init__(self):
        self.dict = {  # maps each word to its integer id
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}  # word frequencies

    def fit(self, sentence):
        """
        Take one sentence and update the word frequencies
        :param sentence:[str,str,str]
        :return:None
        """
        for word in sentence:
            # once every sentence has been fitted, self.count holds the frequency of every word
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=1, max_count=None, max_features=None):
        """
        Build the dictionary according to the given limits
        :param min_count: minimum word frequency
        :param max_count: maximum word frequency
        :param max_features: maximum number of words
        :return:
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}
        if max_features is not None:
            # [(k,v),(k,v)....] --->{k:v,k:v}; sorted() only accepts key as a keyword argument
            self.count = dict(sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features])
        for word in self.count:
            self.dict[word] = len(self.dict)  # each new word gets the next free id
        # invert the dict: id -> word
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """
        Convert a sentence into a sequence of ids
        :param sentence:[str,str,str]
        :param max_len: target length; None keeps the sentence as it is
        :return: [int,int,int]
        """
        if max_len is not None:
            if len(sentence) > max_len:
                sentence = sentence[:max_len]  # truncate
            else:
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # pad with PAD
        return [self.dict.get(i, self.UNK) for i in sentence]

    def inverse_transform(self, indices):
        """
        Convert a sequence of ids back into words
        :param indices: [int,int,int]
        :return: [str,str,str]
        """
        return [self.inverse_dict.get(i, self.UNK_TAG) for i in indices]

    def __len__(self):
        return len(self.dict)


# # Debugging code below
# if __name__ == '__main__':
#     sentences = [["今天", "天气", "很", "好"],
#                  ["今天", "去", "吃", "什么"]]
#     ws = Vocab()
#     for sentence in sentences:
#         # count word frequencies
#         ws.fit(sentence)
#     # build the dictionary
#     ws.build_vocab(min_count=1)
#     print(ws.dict)
#     # convert a sentence into a sequence of ids
#     ret = ws.transform(["好", "好", "好", "好", "好", "好", "好", "热", "呀"], max_len=13)
#     print(ret)
#     # convert the sequence of ids back into a sentence
#     ret = ws.inverse_transform(ret)
#     print(ret)
#     pass
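To make the behaviour of `transform` concrete: every sentence is cut or right-padded to `max_len`, and words missing from the dictionary map to the UNK id. The standalone sketch below reproduces that logic with a hand-written toy dictionary; the words, ids and the `encode` helper are made up for illustration.

```python
PAD, UNK = 0, 1
toy_dict = {"<PAD>": PAD, "<UNK>": UNK, "今天": 2, "天气": 3, "很": 4, "好": 5}


def encode(sentence, max_len):
    """Truncate or right-pad a token list to max_len, then map tokens to ids."""
    if len(sentence) > max_len:
        sentence = sentence[:max_len]
    else:
        sentence = sentence + ["<PAD>"] * (max_len - len(sentence))
    return [toy_dict.get(word, UNK) for word in sentence]


print(encode(["今天", "天气", "很", "热"], max_len=6))  # [2, 3, 4, 1, 0, 0]
print(encode(["好"] * 5, max_len=3))                   # [5, 5, 5]
```

In the first call, "热" is not in the toy dictionary and becomes 1 (UNK), and two PAD ids fill the sequence up to length 6.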
dataset
# -*-coding:utf-8-*-
import pickle
import re
import jieba
from torch.utils.data import Dataset, DataLoader
import pandas as pd


class ImdbDataset(Dataset):
    def __init__(self, train=True):
        if train:
            url = 'data/train.xlsx'
        else:
            url = "data/test.xlsx"
        data = pd.read_excel(url)
        self.sentence_list = data.get('review')
        self.label_list = data.get('label')

    def __getitem__(self, idx):
        line_text = self.sentence_list[idx]
        # read the review from the spreadsheet and segment it into words
        review = tokenlize(str(line_text))
        # read the label that belongs to the review
        label = int(self.label_list[idx])
        return review, label

    def __len__(self):
        return len(self.sentence_list)


# Characters to strip before segmentation. Each one is passed through re.escape
# so that regex metacharacters such as $, ^ and \ are matched literally.
FILTERS = ['!', '"', '#', '$', '%', '&', '(', ')', '*', '+', ',', '-', '.', '/', ':', ';', '<', '=', '>',
           '?', '@', '[', '\\', ']', '^', '_', '`', '{', '|', '}', '~', '\t', '\n', '\x97', '\x96', '”',
           '“']
FILTER_RE = re.compile("|".join(re.escape(ch) for ch in FILTERS))

_STOPWORDS = None  # loaded lazily, only once


def tokenlize(sentence):
    """
    Segment a text into words
    :param sentence: str
    :return: [str,str,str]
    """
    sentence = FILTER_RE.sub("", sentence)
    sentence = jieba.cut(sentence, cut_all=False)
    sentence = ' '.join(sentence)
    result = [i for i in sentence.split(" ") if len(i) > 0]
    result = movestopwords(result)
    return result


def stopwordslist(filepath):
    with open(filepath, 'r', encoding='utf-8') as f:
        return [line.strip() for line in f]


# Remove stop words from a tokenized sentence
def movestopwords(sentence):
    global _STOPWORDS
    if _STOPWORDS is None:
        _STOPWORDS = set(stopwordslist('data/stopwords.txt'))  # path to the stop-word list
    outstr = []
    for word in sentence:
        if word not in _STOPWORDS and word not in ('\t', '\n'):
            outstr.append(word)
    return outstr


# Debugging code below
def collate_fn(batch):
    """
    Process one batch of data
    :param batch: [result of one getitem, result of getitem, result of getitem]
    :return: tuple
    """
    reviews, labels = zip(*batch)
    return reviews, labels


if __name__ == "__main__":
    # Vocab must be importable here so that pickle can resolve the class
    from 情感分析.imdb_sentiment.vocab import Vocab
    imdb_dataset = ImdbDataset(True)
    my_dataloader = DataLoader(imdb_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
    with open("./models/vocab.pkl", "rb") as f:
        vocab_model = pickle.load(f)
    for review, label in my_dataloader:
        print(review[0])
        result = vocab_model.transform(review[0], 30)
        print(result)
        break
Building the vocabulary
# -*-coding:utf-8-*-
import os
import pickle
from tqdm import tqdm
from 情感分析.weibo_many_emotion import dataset
# from 情感分析.imdb_sentiment.vocab import Vocab
from torch.utils.data import DataLoader


class Vocab:
    UNK_TAG = "<UNK>"  # unknown token
    PAD_TAG = "<PAD>"  # padding token
    PAD = 0
    UNK = 1

    def __init__(self):
        self.dict = {  # maps each word to its integer id
            self.UNK_TAG: self.UNK,
            self.PAD_TAG: self.PAD
        }
        self.count = {}  # word frequencies

    def fit(self, sentence):
        """
        Take one sentence and update the word frequencies
        :param sentence:[str,str,str]
        :return:None
        """
        for word in sentence:
            # once every sentence has been fitted, self.count holds the frequency of every word
            self.count[word] = self.count.get(word, 0) + 1

    def build_vocab(self, min_count=1, max_count=None, max_features=None):
        """
        Build the dictionary according to the given limits
        :param min_count: minimum word frequency
        :param max_count: maximum word frequency
        :param max_features: maximum number of words
        :return:
        """
        if min_count is not None:
            self.count = {word: count for word, count in self.count.items() if count >= min_count}
        if max_count is not None:
            self.count = {word: count for word, count in self.count.items() if count <= max_count}
        if max_features is not None:
            # [(k,v),(k,v)....] --->{k:v,k:v}; sorted() only accepts key as a keyword argument
            self.count = dict(sorted(self.count.items(), key=lambda x: x[-1], reverse=True)[:max_features])
        for word in self.count:
            self.dict[word] = len(self.dict)  # each new word gets the next free id
        # invert the dict: id -> word
        self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))

    def transform(self, sentence, max_len=None):
        """
        Convert a sentence into a sequence of ids
        :param sentence:[str,str,str]
        :param max_len: target length; None keeps the sentence as it is
        :return: [int,int,int]
        """
        if max_len is not None:
            if len(sentence) > max_len:
                sentence = sentence[:max_len]  # truncate
            else:
                sentence = sentence + [self.PAD_TAG] * (max_len - len(sentence))  # pad with PAD
        return [self.dict.get(i, self.UNK) for i in sentence]

    def inverse_transform(self, indices):
        """
        Convert a sequence of ids back into words
        :param indices: [int,int,int]
        :return: [str,str,str]
        """
        return [self.inverse_dict.get(i, self.UNK_TAG) for i in indices]

    def __len__(self):
        return len(self.dict)


def collate_fn(batch):
    """
    Process one batch of data
    :param batch: [result of one getitem, result of getitem, result of getitem]
    :return: tuple
    """
    reviews, labels = zip(*batch)
    return reviews, labels


def get_dataloader(train=True):
    imdb_dataset = dataset.ImdbDataset(train)
    my_dataloader = DataLoader(imdb_dataset, batch_size=200, shuffle=True, collate_fn=collate_fn)
    return my_dataloader


if __name__ == '__main__':
    ws = Vocab()
    dl_train = get_dataloader(True)
    dl_test = get_dataloader(False)
    # Count word frequencies over both the training set and the test set
    for reviews, label in tqdm(dl_train, total=len(dl_train)):
        for sentence in reviews:
            ws.fit(sentence)
    for reviews, label in tqdm(dl_test, total=len(dl_test)):
        for sentence in reviews:
            ws.fit(sentence)
    ws.build_vocab()
    print(len(ws))
    os.makedirs("./models", exist_ok=True)  # make sure the output directory exists
    with open("./models/vocab.pkl", "wb") as f:
        pickle.dump(ws, f)
Model training
# -*-coding:utf-8-*-
import pickle
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.utils.data import DataLoader
from tqdm import tqdm
from 情感分析.weibo_many_emotion import dataset
from 情感分析.中文情感分类.vocab import Vocab  # must be importable so that pickle can resolve the class

train_batch_size = 512
test_batch_size = 128
with open("./models/vocab.pkl", "rb") as f:
    voc_model = pickle.load(f)
sequence_max_len = 100


def collate_fn(batch):
    """
    Process one batch of data
    :param batch: [result of one getitem, result of getitem, result of getitem]
    :return: tuple
    """
    reviews, labels = zip(*batch)
    reviews = torch.LongTensor([voc_model.transform(i, max_len=sequence_max_len) for i in reviews])
    labels = torch.LongTensor(labels)
    return reviews, labels


def get_dataloader(train=True):
    imdb_dataset = dataset.ImdbDataset(train)
    batch_size = train_batch_size if train else test_batch_size
    return DataLoader(imdb_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)


class ImdbModel(nn.Module):
    def __init__(self):
        super(ImdbModel, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=len(voc_model), embedding_dim=200, padding_idx=voc_model.PAD)
        self.lstm = nn.LSTM(input_size=200, hidden_size=64, num_layers=2, batch_first=True, bidirectional=True,
                            dropout=0.1)
        self.fc1 = nn.Linear(64 * 2, 64)
        self.fc2 = nn.Linear(64, 4)  # 4 emotion classes

    def forward(self, input):
        """
        :param input:[batch_size,max_len]
        :return: log-probabilities, [batch_size,4]
        """
        input_embeded = self.embedding(input)  # input embeded :[batch_size,max_len,200]
        output, (h_n, c_n) = self.lstm(input_embeded)  # h_n :[4,batch_size,hidden_size]
        # out :[batch_size,hidden_size*2]
        # concatenate the final backward state (h_n[-1]) and the final forward state (h_n[-2]) of the top layer
        out = torch.cat([h_n[-1, :, :], h_n[-2, :, :]], dim=-1)
        # fully connected layer
        out_fc1 = self.fc1(out)
        # ReLU
        out_fc1_relu = F.relu(out_fc1)
        # fully connected layer
        out_fc2 = self.fc2(out_fc1_relu)  # out :[batch_size,4]
        return F.log_softmax(out_fc2, dim=-1)


def device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')


def train(imdb_model, epoch):
    """
    :param imdb_model: the model to train
    :param epoch: number of epochs
    :return:
    """
    train_dataloader = get_dataloader(train=True)
    optimizer = Adam(imdb_model.parameters())
    imdb_model.train()
    for i in range(epoch):
        bar = tqdm(train_dataloader, total=len(train_dataloader))
        for idx, (data, target) in enumerate(bar):
            optimizer.zero_grad()
            data = data.to(device())
            target = target.to(device())
            output = imdb_model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            bar.set_description("epoch:{} idx:{} loss:{:.6f}".format(i, idx, loss.item()))
        # save the whole model after every epoch
        torch.save(imdb_model, 'lstm_model.pkl')


def test(imdb_model):
    """
    Evaluate the model
    :param imdb_model:
    :return:
    """
    test_loss = 0
    correct = 0
    imdb_model.eval()
    test_dataloader = get_dataloader(train=False)
    with torch.no_grad():
        for data, target in tqdm(test_dataloader):
            data = data.to(device())
            target = target.to(device())
            output = imdb_model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.data.max(1, keepdim=True)[1]  # index of the maximum, [batch_size,1]
            correct += pred.eq(target.data.view_as(pred)).sum().item()
    test_loss /= len(test_dataloader.dataset)
    print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
        test_loss, correct, len(test_dataloader.dataset),
        100. * correct / len(test_dataloader.dataset)))


def xlftest():
    # Note: on PyTorch 2.6 and later, torch.load defaults to weights_only=True,
    # so loading a fully pickled model additionally needs weights_only=False.
    model = torch.load('lstm_model.pkl', map_location=device())
    model.to(device())
    model.eval()  # disable dropout for inference
    from 情感分析.weibo_many_emotion.dataset import tokenlize
    # Sample inputs: a happy remark, an exasperated complaint, a birthday wish, an angry insult
    lines = ['哈哈哈开心', '真是无语,你们怎么搞的', '小姐姐,祝你生日快乐', '你他妈的有病']
    emotions = {0: "joy", 1: "anger", 2: "disgust", 3: "low mood"}
    with torch.no_grad():
        for line in lines:
            print(line)
            review = tokenlize(line)
            result = voc_model.transform(review, sequence_max_len)
            data = torch.LongTensor(result).reshape(1, sequence_max_len).to(device())
            output = model(data)
            print(output.data)
            pred = output.data.max(1, keepdim=True)[1]  # index of the maximum, [batch_size,1]
            print(pred.item())
            print(emotions[pred.item()])


if __name__ == '__main__':
    # imdb_model = ImdbModel().to(device())
    # train(imdb_model,20)
    # test(imdb_model)
    xlftest()
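The concatenation in `forward` relies on how PyTorch orders `h_n` for a stacked bidirectional LSTM: the last two entries are the final forward and backward states of the top layer. A small check with random input can confirm this against the per-step `output` tensor. The sizes below are arbitrary and chosen only for illustration.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
hidden = 8
lstm = nn.LSTM(input_size=5, hidden_size=hidden, num_layers=2,
               batch_first=True, bidirectional=True)
x = torch.randn(3, 7, 5)  # [batch_size, seq_len, input_size]
output, (h_n, c_n) = lstm(x)

print(h_n.shape)  # [num_layers * 2, batch_size, hidden]

# The forward direction of the top layer finishes at the last time step
fwd_matches = torch.allclose(h_n[-2], output[:, -1, :hidden])
# The backward direction of the top layer finishes at the first time step
bwd_matches = torch.allclose(h_n[-1], output[:, 0, hidden:])
print(fwd_matches, bwd_matches)
```

Both comparisons should hold, which is why `h_n[-2]` and `h_n[-1]` together give one summary vector of size `hidden * 2` per sentence.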
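Test results:
Accuracy on the four-class task is only about 40%. Part of the reason is that the dataset is noisy and inconsistently labeled; the hyperparameters also still need tuning.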
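An overall accuracy of about 40% does not show which emotions are being confused. Since the test set holds the same number of posts per class, per-class accuracy is a natural next diagnostic. A minimal sketch follows, using made-up toy predictions and a hypothetical helper `per_class_accuracy`; it is not output from the model above.

```python
from collections import Counter

EMOTIONS = {0: "joy", 1: "anger", 2: "disgust", 3: "low mood"}


def per_class_accuracy(targets, preds):
    """Return {label: fraction of that label's samples predicted correctly}."""
    total = Counter(targets)
    hit = Counter(t for t, p in zip(targets, preds) if t == p)
    return {lab: hit[lab] / total[lab] for lab in sorted(total)}


# Toy example: a model that predicts "joy" for almost everything
targets = [0, 0, 1, 1, 2, 2, 3, 3]
preds = [0, 0, 0, 1, 0, 0, 0, 3]
for lab, acc in per_class_accuracy(targets, preds).items():
    print(EMOTIONS[lab], acc)
```

To use it with the evaluation loop, one could collect `target.tolist()` and `pred.view(-1).tolist()` from each batch into two lists and pass them to the helper.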