天天看點

基于稀疏自編碼器的高光譜異常檢測模型

這個稀疏自編碼器就是在自編碼器的基礎上,在損失函數中加上KL散度。

以下給出代碼,分為訓練部分、探測部分與資料集代碼三部分:

1、訓練部分

'''
訓練稀疏自動編碼器用于高光譜異常探測
'''
import torch
from torch import nn, optim
from torch.autograd import Variable
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision.utils import save_image
import os

import numpy as np
# 加載資料集
def get_data(batch_size=100):
    '''
    Load the hyperspectral scene and wrap it in a DataLoader.

    :param batch_size: mini-batch size for the DataLoader. The original read
        an undeclared module-level global; the default of 100 matches the
        value set in the training script, so ``get_data()`` behaves as before.
    :return: (train_loader, img) — a DataLoader over flattened pixel spectra,
        and the raw 3-D hyperspectral cube (height, width, bands).
    '''
    from scipy import io
    # Load the .mat file: 'data' is the (H, W, bands) cube, 'map' the ground truth.
    dataset_name = 'sandiego_plane.mat'
    load = io.loadmat(dataset_name)
    img = load['data']
    gt = load['map']
    height, width, spectrum = img.shape

    # Preprocess: flatten to (pixels, bands) so each pixel is one training sample.
    img_data = img.reshape((height * width, spectrum))
    gt_data = gt.reshape((height * width))
    img_data = img_data.astype(np.float32)
    # Scale to roughly [-1, 1] so the input distribution is symmetric, which
    # helps convergence and matches the decoder's Tanh output range.
    img_data = (img_data - (0.5 * img_data.max())) / (0.5 * img_data.max())

    # `datasets` is the project-local module (third code section of this post).
    from datasets import Dataset
    train_dataset = Dataset(img_data, gt_data, dataset_name=dataset_name)

    train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, drop_last=True)
    return train_loader, img

def to_img(x, channels=1, height=28, width=28):
    """Map Tanh output from [-1, 1] back to [0, 1] and reshape to image batches.

    :param x: tensor of shape (batch, channels*height*width)
    :param channels: image channel count (default 1, as the original hard-coded)
    :param height: image height (default 28, as the original hard-coded)
    :param width: image width (default 28, as the original hard-coded)
    :return: tensor of shape (batch, channels, height, width), values in [0, 1]
    """
    x = (x + 1.) * 0.5          # inverse of the [-1, 1] normalisation
    x = x.clamp(0, 1)           # guard against values outside Tanh's nominal range
    x = x.view(x.size(0), channels, height, width)
    return x
class autoencoder(nn.Module):
    """Fully-connected autoencoder for per-pixel spectra.

    The encoder compresses an ``in_dim``-band spectrum down to a
    ``hidden_size`` code through 128- and 64-unit ReLU layers; the decoder
    mirrors that path and ends in Tanh, so reconstructions lie in [-1, 1].
    """

    def __init__(self, in_dim, hidden_size,):
        super(autoencoder, self).__init__()
        # Build the layer stacks as lists first; Sequential indexes them the
        # same way, so state_dict keys are identical to the inline form.
        encoder_layers = [
            nn.Linear(in_dim, 128), nn.ReLU(True),
            nn.Linear(128, 64), nn.ReLU(True),
            nn.Linear(64, hidden_size), nn.ReLU(True),
        ]
        decoder_layers = [
            nn.Linear(hidden_size, 64), nn.ReLU(True),
            nn.Linear(64, 128), nn.ReLU(True),
            nn.Linear(128, in_dim), nn.Tanh(),
        ]
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Return (code, reconstruction) for a (batch, in_dim) input."""
        code = self.encoder(x)
        reconstruction = self.decoder(code)
        return code, reconstruction


def KL_devergence(p, q):
    """
    KL-divergence sparsity penalty between target activation p and observed q.

    :param p: tensor of size hidden_size holding the desired mean activation rho
    :param q: (batch, hidden_size) raw activations of the hidden layer
    :return: scalar KL(p || q_mean), summed over hidden units

    NOTE(review): softmax over dim=0 followed by a sum over dim=0 gives
    exactly 1 per unit, so q_mean is the constant 1/batch regardless of the
    activations — the penalty appears not to depend on the data. Kept as-is
    for parity with the original; confirm the intended rho-hat computation.
    """
    batch = q.shape[0]  # was an undeclared module-level global in the original
    q = torch.nn.functional.softmax(q, dim=0)  # normalise each unit across the batch
    q = torch.sum(q, dim=0) / batch            # mean activation of unit j over the batch
    s1 = torch.sum(p * torch.log(p / q))
    s2 = torch.sum((1 - p) * torch.log((1 - p) / (1 - q)))
    return s1 + s2
if __name__ == "__main__":
    # ---- Hyper-parameters ----
    batch_size = 100
    lr = 1e-4
    weight_decay = 1e-5
    epoches = 100
    expect_tho = 0.2  # target mean activation rho for the sparsity prior

    # ---- Data ----
    train_data, img = get_data()
    height, width, spectrum = img.shape

    # ---- Model ----
    hidden_size = 12  # bottleneck (code) dimension
    model = autoencoder(in_dim=spectrum, hidden_size=hidden_size)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)  # , weight_decay=weight_decay
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.3, patience=4, verbose=True,
                                               threshold=0.01, threshold_mode='rel', min_lr=0,
                                               eps=1e-15)

    # Keep model and data on the same device: the original moved only the
    # model to the GPU and would crash in forward() on CPU batches.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    # Expected mean activation per hidden unit, and the KL weight beta.
    tho_tensor = torch.FloatTensor([expect_tho for _ in range(hidden_size)]).to(device)
    _beta = 1  # weight of the KL sparsity penalty in the total loss

    # Loop invariants hoisted out of the training loop: the KLDivLoss module
    # and the log_softmax of the constant rho tensor never change per batch.
    # nn.KLDivLoss expects input=log-probabilities, target=probabilities; as
    # in the original, the constant rho distribution is the input and the
    # per-batch activations are the target.
    kl_criterion = nn.KLDivLoss(reduction='batchmean')
    log_p = torch.nn.functional.log_softmax(tho_tensor, dim=0)

    for epoch in range(epoches):
        loss_of_epoch = 0.0
        num_batches = 0
        for batch, _ in train_data:
            batch = batch.to(device)
            # forward
            encoder_out, output = model(batch)
            loss = criterion(output, batch)
            q = torch.nn.functional.softmax(encoder_out, dim=1)
            _kl = kl_criterion(log_p, q)
            loss += _beta * _kl  # add the sparsity constraint to the loss
            # backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            loss_of_epoch += loss.item()  # .item() detaches from the graph
            num_batches += 1
        # Average over the number of batches. The original divided by
        # batch_size, which is only the batch count by coincidence, and then
        # printed the *last batch's* loss instead of the epoch average.
        loss_of_epoch /= max(num_batches, 1)
        scheduler.step(loss_of_epoch)
        print("epoch: {}, loss is {}".format((epoch + 1), loss_of_epoch))
        for param_group in optimizer.param_groups:
            print(param_group['lr'])

    torch.save(model.state_dict(), './autoencoder_state.pth')
    # The original ended with a stray `model(img)` whose result was discarded
    # (operating on the last, shadowed batch); it has been removed.
           

2、探測部分

import torch
import numpy as np
from torch import nn
from train_SAE_pytorch import autoencoder,get_data #這裡引用的就是上一個代碼中的函數
from einops import reduce
def make_AUC(x_test, label):
    """Compute, plot, and return the ROC-AUC of an anomaly score map.

    :param x_test: anomaly score array (any shape; ravelled for scoring)
    :param label: ground-truth label array of the same total size
    :return: the AUC value. The original returned None; returning the score
        is backward compatible and lets callers record it.
    """
    import sklearn
    from sklearn import metrics
    # Min-max normalise the scores to [0, 1] before scoring.
    x_test = (x_test - np.min(x_test)) / (np.max(x_test) - np.min(x_test))
    fpr, tpr, _ = sklearn.metrics.roc_curve(label.ravel(), x_test.ravel())
    auc = metrics.roc_auc_score(label.ravel(), x_test.ravel(), average='macro')

    # ---- Plot the ROC curve ----
    import matplotlib.pyplot as plt
    plt.figure(figsize=(8, 7), dpi=80, facecolor='w')  # dpi: pixels per inch; facecolor: background colour
    plt.xlim((-0.01, 1.02))  # axis ranges padded slightly past [0, 1]
    plt.ylim((-0.01, 1.02))
    plt.xticks(np.arange(0, 1.1, 0.1))  # tick marks every 0.1
    plt.yticks(np.arange(0, 1.1, 0.1))
    plt.plot(fpr, tpr, 'r-', lw=2, label='AUC=%.4f' % auc)  # the ROC curve itself
    plt.legend(loc='lower right')
    plt.xlabel('False Positive Rate', fontsize=14)
    plt.ylabel('True Positive Rate', fontsize=14)
    # The `b=` keyword was deprecated and removed in Matplotlib 3.6;
    # `visible=` is the supported name. ls=':' draws a dotted grid.
    plt.grid(visible=True, ls=':')
    plt.title(u'DecisionTree ROC curve And  AUC', fontsize=18)
    plt.show()
    return auc
if __name__ == "__main__":
    import os
    os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

    # Load the hyperspectral scene and its ground-truth map.
    from scipy import io
    dataset_name = 'sandiego_plane.mat'
    load = io.loadmat(dataset_name)
    img = load['data']
    gt = load['map']
    height, width, spectrum = img.shape

    # Rebuild the model with the same architecture used in training.
    hidden_size = 12  # bottleneck dimension — must match the training script
    model = autoencoder(in_dim=spectrum, hidden_size=hidden_size)

    # Restore the trained weights (map to CPU so a GPU checkpoint loads anywhere).
    try:
        model_path = 'autoencoder_state.pth'
        model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    except FileNotFoundError as e:
        print(e)
    else:
        # The original printed a success message unconditionally, even when
        # loading failed; report success only when it actually succeeded.
        print('Loading of model successful.')

    # Preprocess exactly as in training: flatten to (pixels, bands), cast to
    # float32, then scale to roughly [-1, 1].
    img_data = img.reshape((height * width, spectrum))
    gt_data = gt.reshape((height * width))
    img_data = img_data.astype(np.float32)
    img_data = (img_data - (0.5 * img_data.max())) / (0.5 * img_data.max())

    # Reconstruct every pixel; no_grad skips building an autograd graph,
    # which is both faster and removes the need for detach().
    model.eval()
    with torch.no_grad():
        result = model(torch.tensor(img_data))[1]  # [1] = decoder reconstruction

    # Anomaly score = per-pixel mean squared reconstruction error.
    Anomaly_map = (result.numpy() - img_data) ** 2
    Anomaly_map = Anomaly_map.mean(axis=1)  # same as einops reduce('i vec -> i', 'mean')
    Anomaly_map = Anomaly_map.reshape((height, width))

    # Display the anomaly map and the ROC/AUC.
    import matplotlib.pyplot as plt
    plt.imshow(Anomaly_map, cmap='gray')
    plt.show()
    # The original line was missing its closing parenthesis (syntax error).
    make_AUC(Anomaly_map, gt)
           

3、用到的函數,命名為 datasets.py 以供調用

import numpy as np
import torch
import torch.utils
import torch.utils.data
import os
from tqdm import tqdm
from sklearn import preprocessing
try:
    # Python 3
    from urllib.request import urlretrieve
except ImportError:
    # Python 2
    from urllib import urlretrieve

class TqdmUpTo(tqdm):
    """Progress bar exposing `update_to(b, bsize, tsize)`, whose signature
    matches the `reporthook` protocol of `urllib.request.urlretrieve`."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """
        b  : int, optional
            Number of blocks transferred so far [default: 1].
        bsize  : int, optional
            Size of each block (in tqdm units) [default: 1].
        tsize  : int, optional
            Total size (in tqdm units). If [default: None] remains unchanged.
        """
        if tsize is not None:
            self.total = tsize
        transferred = b * bsize
        # tqdm.update takes a delta; self.n tracks what was already counted,
        # so this also sets self.n = b * bsize.
        self.update(transferred - self.n)
class Dataset(torch.utils.data.Dataset):
    """Generic class for a hyperspectral scene.

    Wraps flattened pixel spectra with binarised anomaly labels so a
    DataLoader can sample individual pixels.
    """

    def __init__(self, data, gt, dataset_name, **hyperparams):
        """
        Args:
            data: array of samples, one spectrum per row
            gt: array of labels, one per sample
            dataset_name: name of the source file (accepted for API parity;
                not stored)
            **hyperparams: extra options, accepted but unused here
        """
        super(Dataset, self).__init__()
        self.data = data
        # Binarise the ground truth: any non-zero class id becomes 1.
        self.label = (np.asarray(gt) != 0).astype(gt.dtype)

    @staticmethod
    def flip(*arrays):
        """Randomly flip all given arrays left-right and/or up-down together."""
        do_horizontal = np.random.random() > 0.5
        do_vertical = np.random.random() > 0.5
        if do_horizontal:
            arrays = [np.fliplr(arr) for arr in arrays]
        if do_vertical:
            arrays = [np.flipud(arr) for arr in arrays]
        return arrays

    def __len__(self):
        # Number of samples; index-based samplers draw from this range.
        return len(self.label)

    def __getitem__(self, i):
        # One sample's spectrum and its binary label, both as tensors.
        return torch.tensor(self.data[i]), torch.tensor(self.label[i])
           

如果此文對你有幫助的話,還請不要吝惜你的點贊~

繼續閱讀