
【PyTorch】torch.nn.Transformer Explained and Applied: nn.TransformerEncoderLayer, nn.TransformerEncoder, PositionalEncoding, TransformerModel

nn.TransformerEncoderLayer

        This class is a building block of the Transformer encoder: it represents a single encoder layer, and the encoder itself is just this TransformerEncoderLayer stacked several times.

Args:

d_model: the number of expected features in the input (required).

nhead: the number of heads in the multiheadattention models (required).

dim_feedforward: the dimension of the feedforward network model (default=2048).

dropout: the dropout value (default=0.1).

activation: the activation function of intermediate layer, relu or gelu (default=relu).

Examples::

encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8)

src = torch.rand(10, 32, 512)

out = encoder_layer(src)

        Note that the Transformer only accepts input of shape seq_length x batch x dim (sequence-first, not batch-first).
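
        As a quick sanity check, here is a self-contained version of the example above (a minimal sketch that only adds the imports and a shape print); the layer returns a tensor of the same shape as its input:

import torch
import torch.nn as nn

encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8)
src = torch.rand(10, 32, 512)   # (seq_len=10, batch=32, d_model=512)
out = encoder_layer(src)
print(out.shape)                # torch.Size([10, 32, 512]) -- same shape as the input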

nn.TransformerEncoder

        This is the encoder part of the Transformer: initialize it with the encoder layer described above as an argument, and you get a TransformerEncoder.

Args:

encoder_layer: an instance of the TransformerEncoderLayer() class (required).

num_layers: the number of sub-encoder-layers in the encoder (required).

norm: the layer normalization component (optional).

Examples::

encoder_layer = nn.TransformerEncoderLayer(d_model=512, nhead=8)

transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=6)

src = torch.rand(10, 32, 512)

out = transformer_encoder(src)
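
        If you want the optional norm argument listed above, you can pass a final LayerNorm that is applied after the stacked layers (a small sketch continuing the example above; the 512 must match d_model):

final_norm = nn.LayerNorm(512)
transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=6, norm=final_norm)
out = transformer_encoder(src)   # output is still (10, 32, 512)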

PositionalEncoding

        I won't go over the math in detail here (to be honest I never fully worked it out myself); the point is that it produces position information, which is simply added to the embedding.
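
        For reference, the code below implements the standard sinusoidal encoding from "Attention Is All You Need", where pos is the token position and i indexes the feature dimension:

PE_{(pos,\,2i)} = \sin\!\left(\frac{pos}{10000^{2i/d_{model}}}\right), \qquad PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{model}}}\right)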

import math

import torch
import torch.nn as nn

class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
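
        A quick usage sketch (assuming the imports above): pe has shape (max_len, 1, d_model), so it broadcasts over the batch dimension of a (seq_len, batch, d_model) input.

pos_encoder = PositionalEncoding(d_model=512, dropout=0.1)
x = torch.rand(10, 32, 512)      # (seq_len, batch, d_model)
x = pos_encoder(x)               # adds pe[:10] to every batch element, then applies dropout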
           

TransformerModel

        This section follows the code in the PyTorch tutorial.

class First_TransformerModel(nn.Module):

    def __init__(self, ninp=300, nhead=4, nhid=128, nlayers=6, dropout=0.5):
        super(First_TransformerModel, self).__init__()
        from torch.nn import TransformerEncoder, TransformerEncoderLayer
        self.model_type = 'Transformer'
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        encoder_layers = TransformerEncoderLayer(ninp, nhead, nhid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.ninp = ninp
        

    def _generate_square_subsequent_mask(self, src, lenths):
        '''
        padding_mask
        src:max_lenth,num,300
        lenths:[lenth1,lenth2...]
        '''

        # mask: num_of_sens x max_lenth; start with everything masked (True),
        # then mark the first `lenth` positions of each sentence as False (kept)
        mask = torch.ones(src.size(1), src.size(0)) == 1
        for i in range(len(lenths)):
            lenth = lenths[i]
            for j in range(lenth):
                mask[i][j] = False

        return mask

    def forward(self, src, mask):
        '''
        src:num_of_all_sens,max_lenth,300
        '''
        self.src_mask = mask

        src = src * math.sqrt(self.ninp)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_key_padding_mask=self.src_mask)
        output = output[0,:,:]  # take the first position along the sequence dimension as the sentence representation: (N, ninp)
        return output

class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
           

        Here we only need to apply the following operations to the input src (seq_length x batch x ninp): first multiply it by the square root of ninp, pass it through the positional encoder, and then through the encoder.

    src = src * math.sqrt(self.ninp)
    src = self.pos_encoder(src)
    output = self.transformer_encoder(src, src_key_padding_mask=self.src_mask)
           

        One more thing worth mentioning here: the mask.

        What is a mask?

        There are mainly two kinds of masks: src_mask and src_key_padding_mask. Here we focus on src_key_padding_mask.

        The nn.Transformer documentation states that src_key_padding_mask must have size N x S, i.e. batch x seq_length. With this mask, the padded positions are ignored, so the attention mechanism no longer includes them in its computation.

        Note that src_key_padding_mask is a boolean tensor: positions that should be ignored are True, and positions whose original values should be kept are False.
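
        As a small illustration (with hypothetical lengths; a vectorized alternative to the loop in _generate_square_subsequent_mask below), such a key-padding mask can be built from the real sequence lengths like this:

lengths = [3, 5, 2]   # real lengths of a batch of 3 sequences, padded to length 5
max_len = max(lengths)
src_key_padding_mask = torch.arange(max_len).unsqueeze(0) >= torch.tensor(lengths).unsqueeze(1)
# shape (N, S) = (3, 5); True marks padded positions to ignore, False marks real tokens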

        Below is the two-level Transformer code I defined.

First level

class First_TransformerModel(nn.Module):

    def __init__(self, ninp=300, nhead=4, nhid=128, nlayers=6, dropout=0.5):
        super(First_TransformerModel, self).__init__()
        from torch.nn import TransformerEncoder, TransformerEncoderLayer
        self.model_type = 'Transformer'
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        encoder_layers = TransformerEncoderLayer(ninp, nhead, nhid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        # self.encoder = nn.Embedding(ntoken, ninp)
        self.ninp = ninp
        # self.decoder = nn.Linear(ninp, ntoken)

    def _generate_square_subsequent_mask(self, src, lenths):
        '''
        padding_mask
        src:max_lenth,num,300
        lenths:[lenth1,lenth2...]
        '''

        # mask num_of_sens x max_lenth
        mask = torch.ones(src.size(1), src.size(0)) == 1
        for i in range(len(lenths)):
            lenth = lenths[i]
            for j in range(lenth):
                mask[i][j] = False

        # mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        #mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def forward(self, src, mask):
        '''
        src:num_of_all_sens,max_lenth,300
        '''
        self.src_mask = mask

        src = src * math.sqrt(self.ninp)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_key_padding_mask=self.src_mask)
        output = output[0,:,:]  # take the first position along the sequence dimension as the sentence representation: (N, ninp)
        #output = self.decoder(output)
        return output

class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)
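
        A usage sketch of this first level on its own (hypothetical shapes: 5 sentences of at most 12 tokens with 300-dimensional embeddings; run on CPU, so no .cuda() here):

sents = torch.rand(12, 5, 300)                    # (max_length, num_sentences, 300)
lengths = [12, 7, 9, 3, 12]                       # real token count of each sentence
first = First_TransformerModel()
mask = first._generate_square_subsequent_mask(sents, lengths)   # (5, 12) bool, True = padding
sent_repr = first(src=sents, mask=mask)           # (5, 300): one vector per sentence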
           

Second level

#second level

class Second_TransformerModel(nn.Module):

    def __init__(self, ninp=300, nhead=4, nhid=128, nlayers=6, dropout=0.5):
        super(Second_TransformerModel, self).__init__()
        from torch.nn import TransformerEncoder, TransformerEncoderLayer
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(ninp, dropout)
        encoder_layers = TransformerEncoderLayer(ninp, nhead, nhid, dropout)
        self.transformer_encoder = TransformerEncoder(encoder_layers, nlayers)
        self.ninp = ninp
    def _generate_square_subsequent_mask(self, src, lenths):
        '''
        padding_mask
        src: num_of_sentence x batch (number of documents) x 300
        lenths:[lenth1,lenth2...]
        '''

        # mask num_of_sens x max_lenth
        mask = torch.ones(src.size(1), src.size(0)) == 1
        for i in range(len(lenths)):
            lenth = lenths[i]
            for j in range(lenth):
                mask[i][j] = False

        return mask
    def forward(self, src, mask):
        '''
        
        src: max_sentence_num x batch (number of documents) x 300
        
        '''
        self.src_mask = mask

        src = src * math.sqrt(self.ninp)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, src_key_padding_mask=self.src_mask)
        #output = self.decoder(output)
        return output
           

Final code

import numpy as np
import torch.nn.functional as F   # in addition to math, torch, and torch.nn imported above

class segmentmodel(nn.Module):
    def __init__(self, ninp=300, nhead=4, nhid=128, nlayers=6, dropout=0.5):
        super(segmentmodel, self).__init__()
        self.first_layer = First_TransformerModel(ninp,nhead,nhid,nlayers,dropout)
        self.second_layer = Second_TransformerModel(ninp,nhead,nhid,nlayers,dropout)
        self.linear = nn.Linear(ninp,2)  
    
    def pad(self, s, max_length):
        s_length = s.size()[0]
        v = s.unsqueeze(0).unsqueeze(0)  # (1, 1, s_length, 300); no need to re-wrap an existing tensor with torch.tensor()
        padded = F.pad(v, (0, 0, 0, max_length - s_length))  # (1, 1, max_length, 300)
        shape = padded.size()
        return padded.view(shape[2], 1, shape[3])  # (max_length, 1, 300)


    def pad_document(self, d, max_document_length):
        d_length = d.size()[0]
        v = d.unsqueeze(0).unsqueeze(0)
        padded = F.pad(v, (0, 0,0, max_document_length - d_length ))  # (1, 1, max_length, 300)
        shape = padded.size()
        return padded.view(shape[2], 1, shape[3])  # (max_length, 1, 300)
    
    def forward(self, batch):
        batch_size = len(batch)

        sentences_per_doc = []
        all_batch_sentences = []
        for document in batch:
            all_batch_sentences.extend(document)
            sentences_per_doc.append(len(document))

        lengths = [s.size()[0] for s in all_batch_sentences]

        max_length = max(lengths)
        #logger.debug('Num sentences: %s, max sentence length: %s', 
                    # sum(sentences_per_doc), max_length)

        padded_sentences = [self.pad(s, max_length) for s in all_batch_sentences]
        big_tensor = torch.cat(padded_sentences, 1)  # (max_length, batch size, 300)
        
        mask = self.first_layer._generate_square_subsequent_mask(big_tensor,
                                                                 lengths).cuda()
        
        firstlayer_out = self.first_layer(src = big_tensor,mask = mask)
        # firstlayer_out: num_sentences x 300
        
        
        # padded_output: batch x 300
        # split the sentences back out into their respective documents
        encoded_documents =[]
        index = 0
        for sentences_count in sentences_per_doc:
            end_index = index + sentences_count
            encoded_documents.append(firstlayer_out[index : end_index, :])
            index = end_index
            
            
        # document padding
        doc_sizes = [doc.size()[0] for doc in encoded_documents]
        max_doc_size = np.max(doc_sizes)
        padded_docs = [self.pad_document(d, max_doc_size) for d in encoded_documents]
        docs_tensor = torch.cat(padded_docs, 1)
        #docs_tensor max_doc_size x batch x 300
        
        mask = self.second_layer._generate_square_subsequent_mask(docs_tensor,doc_sizes).cuda()
        second_layer_out = self.second_layer(src = docs_tensor,mask = mask)
        # drop the last sentence of each document
        doc_outputs = []
        
        for i, doc_len in enumerate(doc_sizes):
            doc_outputs.append(second_layer_out[0:doc_len - 1, i, :])  # -1 to remove the last prediction
        sentence_outputs = torch.cat(doc_outputs, 0)
        # sentence_outputs: num_sentences x 300
        
        
        out = self.linear(sentence_outputs)
        return out
           

It is worth noting that the sentence information extracted by the first level is represented by a single vector from that level's output: from the seq_length x N x 300 output, the first position along the seq_length dimension is taken as the sentence representation, giving an N x 300 tensor.
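
        Finally, a minimal usage sketch under assumed inputs: a batch is a list of documents, and each document is a list of sentence embedding tensors of shape (sentence_length, 300). The .cuda() calls inside forward assume a GPU, so the model and inputs must live on the GPU as well.

model = segmentmodel().cuda()
doc1 = [torch.rand(7, 300).cuda(), torch.rand(12, 300).cuda(), torch.rand(5, 300).cuda()]
doc2 = [torch.rand(9, 300).cuda(), torch.rand(4, 300).cuda()]
out = model([doc1, doc2])   # shape (3, 2): one 2-class logit per non-final sentence, (3-1)+(2-1)=3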

————————————————

Source: https://blog.csdn.net/qq_43645301/article/details/109279616
