天天看點

pytorch 模型開發模板:常用的庫導入、資料讀取模板、生成 csv/txt 資料模板、資料類加載模板、模型搭建模板

常用的庫導入

import copy
import math
import os
import random
import re
import warnings
from collections import Counter

import jieba
import keras
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
import torch
import torch.nn as nn
from pylab import *
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from torch import nn, optim
from tqdm import tqdm

# import transformers as tfs

warnings.filterwarnings('ignore')
plt.rcParams["font.sans-serif"] = ['Simhei']   # allow CJK characters in plot labels
plt.rcParams["axes.unicode_minus"] = False     # render the minus sign correctly with CJK fonts

SEED = 1210

def seed_everything(seed):
    """Seed every RNG (python `random`, numpy, torch CPU/CUDA) for reproducibility.

    Also forces deterministic cuDNN kernels, which may slow training down.
    Note: PYTHONHASHSEED only affects subprocesses started after this call.
    """
    # BUGFIX: `random` and `os` were used here but never imported anywhere
    # in the file; they are now imported at the top of the file.
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # seed ALL GPUs, not just the current one
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

seed_everything(SEED)  # set (almost) every random seed so results are reproducible

# Run on GPU when available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
           

資料讀取模板

讀取csv
# Read a UTF-8 encoded CSV file.
data = pd.read_csv('2019-軟體工程#程式設計基礎(2-1)-字段說明.csv', encoding='utf-8')
# BUGFIX: `delimiter` is an alias for `sep` in pandas; passing BOTH
# (sep=' ', delimiter="\t") raises "Specified a sep and a delimiter;
# you can only specify one". Use a single tab separator instead.
data = pd.read_csv('nextlinks.txt', encoding='utf-8', sep='\t')
data.values

讀取txt
# Read the RankSVM model file; keep the tokens of its final line,
# dropping the first and the last token.
with open('ranksvm/gnonormalizationmodel', 'r') as fh:
    all_lines = list(fh)
model = all_lines[-1].split(" ")[1:-1]

資料篩選
one_student=data.loc[data["user_id"]==Student_number,:]

           

生成 csv/txt 資料模板

# Generate a RankSVM-format training file: for each graph, a "# query i"
# header line, then one line per node in the format
#   "<label> qid:<i> 1:<f1> 2:<f2> ... n:<fn>"   (no trailing space).
# Requires `attrvec_list` (per-graph lists of node feature vectors) and
# `G_sum_bq` (per-graph node labels) to be defined by the caller.
with open("ranksvm/gnonormalizationtrain.dat", 'a+') as file:
    for idx, attrvec in enumerate(attrvec_list[0:34]):  # first 34 graphs
        file.write("# query " + str(idx))  # idx doubles as the RankSVM query id
        file.write('\n')
        for iddx, feature in enumerate(attrvec):  # each node's feature vector
            label = G_sum_bq[idx][iddx]  # label of node iddx in graph idx
            # join produces "1:v1 2:v2 ..." with no trailing space, so the
            # original manual last-element special case is unnecessary
            feats = " ".join(str(k + 1) + ":" + str(feature[k]) for k in range(len(feature)))
            file.write(str(label) + " " + "qid:" + str(idx) + " " + feats)
            file.write('\n')
# BUGFIX: removed the redundant file.close() after the with-block;
# the context manager already closed the file.
生成csv
def visualize_model(model):
    """Run `model` over the "query" dataloader and collect predictions.

    Relies on globals: `dataloaders_query`, `query_namejpg`, `device`.

    Returns a list of [jpg filename, predicted label] rows; the predicted
    class index is shifted by +201 to map back to the original label range.
    """
    rows = []
    model.eval()  # inference mode: freeze dropout/batch-norm behaviour
    # BUGFIX: removed the unused `fig = plt.figure()` which opened an
    # empty figure window and was never drawn on.
    with torch.no_grad():
        for i, inputs in tqdm(enumerate(dataloaders_query["query"])):
            inputs = inputs[0].to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)  # argmax over the class dimension
            rows.append([query_namejpg[i], preds.item() + 201])
    return rows
 
# Run inference and write the submission CSV (columns: filename, label).
daliebiao = visualize_model(cnn_model)
columns = ["filename", "label"]
df = pd.DataFrame(data=daliebiao, columns=columns)
# BUGFIX: index=False keeps the stray unnamed row-index column out of the
# submission file.
df.to_csv('submit_sample.csv', index=False)
           

資料類加載模板

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
from torchvision import transforms
from sklearn.metrics import f1_score
import torch.optim as optim
from torch.utils.tensorboard.writer import SummaryWriter
from torch.utils.data import random_split
from sklearn.model_selection import train_test_split
 
class mydataset(Dataset):
    """Dataset read from a CSV file.

    Features are columns 4..-2 and the label is the last column, limited
    to the first `nrows` rows. Generalized from the original hard-coded
    "600519.csv" / 20-row version: the defaults reproduce the old
    behaviour exactly, so existing callers are unaffected.
    """

    def __init__(self, csv_path="600519.csv", nrows=20, encoding='gbk'):
        # nrows=None loads every row; otherwise keep the first `nrows`.
        data = pd.read_csv(csv_path, encoding=encoding)
        values = data.values if nrows is None else data.values[0:nrows]
        self._x = torch.tensor(np.array(values[:, 4:-1]).astype(float))  # feature matrix
        self._y = torch.tensor(np.array(values[:, -1]).astype(float))    # label vector
        self._len = len(values)  # number of usable samples

    def __getitem__(self, item):
        return self._x[item], self._y[item]

    def __len__(self):
        # Length of the dataset (used by DataLoader / random_split).
        return self._len
        
data=mydataset()
 
# Split into train / test sets (80% / 20%).
torch.manual_seed(0)
train_data,test_data=random_split(data,[round(0.8*data._len),round(0.2*data._len)])# some torch versions lack the generator=torch.Generator().manual_seed(0) argument
#                     random shuffled split          lengths rounded to nearest int
 
# Training loader
train_loader =DataLoader(train_data, batch_size = 2, shuffle = True, num_workers = 0 , drop_last=False)
 
# Test loader
test_loader =DataLoader(test_data, batch_size = 2, shuffle = True, num_workers = 0 , drop_last=False)
# drop_last: whether the final incomplete batch is kept or discarded.
# num_workers: number of subprocesses used for data loading.
 
# Inspect training batches
for step,(train_x,train_y) in enumerate(train_loader):
    print(step,':',(train_x.shape,train_y.shape),(train_x,train_y))
print("------------------------------------------------------------------------------------------------------")
# Inspect test batches
for step,(test_x,test_y) in enumerate(test_loader):
    print(step,':',(test_x.shape,test_y.shape),(test_x,test_y))
           

模型搭建模板

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


class LSTM(nn.Module):  # nn.Module subclass (capital M required)
    """LSTM -> multi-head self-attention -> LSTM -> two linear heads.

    Expects input of shape (batch, 600, 12): the final linear layer is
    hard-wired to a sequence length of 600. Returns a (batch,) tensor.
    """

    def __init__(self):
        super().__init__()
        input_size = 12    # length of each per-timestep input vector
        hidden_size = 128  # LSTM hidden size; must be divisible by num_heads
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=1)  # batch_first=False
        # --------------------------------------------------------------------------
        # Q/K/V projections feeding the multi-head attention layer.
        self.multihead_Linear_k = nn.Linear(hidden_size, hidden_size)
        self.multihead_Linear_q = nn.Linear(hidden_size, hidden_size)
        self.multihead_Linear_v = nn.Linear(hidden_size, hidden_size)
        self.multihead_attn = nn.MultiheadAttention(embed_dim=hidden_size, num_heads=8)
        # --------------------------------------------------------------------------
        self.lstm_2 = nn.LSTM(hidden_size, hidden_size, num_layers=1)
        # --------------------------------------------------------------------------
        self.linear_1 = nn.Linear(hidden_size, 1)  # per-timestep score
        self.Sigmoid = nn.Sigmoid()
        self.linear_2 = nn.Linear(600, 1)  # collapses the 600 timesteps to one value

    def forward(self, x, batch_size):
        """Forward pass; `batch_size` is unused but kept for caller compatibility.

        x: (batch, seq=600, feat=12) tensor.
        """
        # BUGFIX: the original torch.Tensor(x.numpy()) numpy round-trip fails
        # for CUDA or grad-tracking inputs; a direct float32 cast is equivalent
        # for DataLoader batches and works in all cases.
        x = x.float().to(device)
        # The (batch_first=False) LSTM wants (seq_len, batch, feat).
        x = x.transpose(0, 1)

        lstm_out, h_n = self.lstm(x, None)  # lstm_out: (seq, batch, hidden)

        # Attention inputs are (seq, batch, embed) as well.
        query = self.multihead_Linear_q(lstm_out)
        key = self.multihead_Linear_k(lstm_out)
        value = self.multihead_Linear_v(lstm_out)
        attn_output, attn_output_weights = self.multihead_attn(query, key, value)
        # attn_output: (seq, batch, hidden)

        # Second LSTM is primed with the first LSTM's final (h, c) state.
        lstm_out_2, h_n_2 = self.lstm_2(attn_output, h_n)
        lstm_out_2 = lstm_out_2.transpose(0, 1)  # -> (batch, seq, hidden)

        # Two linear layers with a sigmoid in between.
        prediction = self.linear_1(lstm_out_2)  # (batch, seq, 1)
        prediction = prediction.squeeze(2)      # (batch, seq)
        prediction = self.Sigmoid(prediction)
        prediction = self.linear_2(prediction)  # (batch, 1) -- requires seq == 600
        prediction = prediction.squeeze(1)      # (batch,)
        return prediction




# Evaluation helper for the x_test / y_test data.
def eval_test(model):  # returns the average loss over the test data
    """Return the mean per-batch loss of `model` over `test_loader`.

    Relies on globals: `test_loader`, `batch_size`, `loss_function`,
    `optimizer`, `device`.
    """
    test_epoch_loss = []
    with torch.no_grad():
        # NOTE(review): zero_grad() inside torch.no_grad() evaluation is a
        # no-op for this loop; gradient clearing belongs in the training loop.
        optimizer.zero_grad()
        for step, (test_x, test_y) in enumerate(test_loader):
            y_pre = model(test_x, batch_size).to(device)
            test_y = test_y.to(device)

            test_loss = loss_function(y_pre, test_y)
            test_epoch_loss.append(test_loss.item())

    return np.mean(test_epoch_loss)


epochs = 200
batch_size = 16
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=False)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=0, drop_last=False)

model = LSTM().to(device)
loss_function = nn.MSELoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.00002)  # optimizer instance
print(model)

sum_train_epoch_loss = []  # mean training loss per epoch
sum_test_epoch_loss = []   # mean test loss per epoch
best_test_loss = 10000
for epoch in range(epochs):
    epoch_loss = []
    for step, (train_x, train_y) in enumerate(train_loader):
        y_pred = model(train_x, batch_size).to(device)
        train_y = train_y.to(device)
        # Forward pass produced predictions; compute loss against targets.
        single_loss = loss_function(y_pred, train_y)
        epoch_loss.append(single_loss.item())
        # BUGFIX: clear gradients before backward(); the original never called
        # zero_grad() in the loop, so gradients accumulated across every step.
        optimizer.zero_grad()
        single_loss.backward()  # backpropagate
        optimizer.step()        # apply the parameter update
    train_epoch_loss = np.mean(epoch_loss)
    test_epoch_loss = eval_test(model)  # mean loss over the test data
    if test_epoch_loss < best_test_loss:
        best_test_loss = test_epoch_loss
        print("best_test_loss", best_test_loss)
        # BUGFIX: snapshot the weights with deepcopy; the original assigned a
        # reference, so "best_model" always tracked the live, still-training
        # model and the saved checkpoint was just the final epoch.
        best_model = copy.deepcopy(model)
    sum_train_epoch_loss.append(train_epoch_loss)
    sum_test_epoch_loss.append(test_epoch_loss)
    print("epoch:" + str(epoch) + "  train_epoch_loss: " + str(train_epoch_loss) + "  test_epoch_loss: " + str(test_epoch_loss))

torch.save(best_model, 'best_model.pth')

# Plot the per-epoch train / test loss curves.
print(sum_train_epoch_loss)
print(sum_test_epoch_loss)
fig = plt.figure(facecolor='white', figsize=(10, 7))
plt.xlabel('第幾個epoch')
plt.ylabel('loss值')
plt.xlim(xmax=len(sum_train_epoch_loss), xmin=0)
plt.ylim(ymax=max(sum_train_epoch_loss), ymin=0)

x1 = [i for i in range(0, len(sum_train_epoch_loss), 1)]  # epoch indices for the train curve
y1 = sum_train_epoch_loss  # per-epoch mean training loss

x2 = [i for i in range(0, len(sum_test_epoch_loss), 1)]  # epoch indices for the test curve
y2 = sum_test_epoch_loss

colors1 = '#00CED4'  # marker color for the train series
colors2 = '#DC143C'  # marker color for the test series
area = np.pi * 4 ** 1  # marker area
# Scatter both loss series on the same axes.
plt.scatter(x1, y1, s=area, c=colors1, alpha=0.4, label='train_loss')
plt.scatter(x2, y2, s=area, c=colors2, alpha=0.4, label='val_loss')
# plt.plot([0,9.5],[9.5,0],linewidth = '0.5',color='#000000')
plt.legend()
# plt.savefig(r'C:\Users\jichao\Desktop\大論文\12345svm.png', dpi=300)
plt.show()




           

繼續閱讀