天天看點

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

最初的條件随機場論文發表于本世紀初,從那時起,機器學習社群開始将CRF應用于各個領域,從生物序列、計算機視覺到自然語言處理。

在過去幾年中,CRF模型與LSTM結合使用,獲得了新的結果。在NLP社群中,這被認為是序列标記的經驗法則:如果你想要更高的準确性,隻需将一個CRF堆疊在你的LSTM層之上。

在序列分類問題中,最終目标是在給定序列向量(X)的輸入的情況下找到序列标簽(y)的機率。這被表示為P(y|x)。

首先,讓我們定義符号:

  • 訓練集:輸入和目标序列對{(X i,y i)}
  • 第i個輸入向量序列:ξ=(x1,…,xℓ)
  • 第i個标簽序列:yi = [y1, …, yℓ]
  • ℓ是序列長度。

對于樣本(X,y),在正常分類問題中,我們通過乘以序列中第k個位置的每個項目的機率來計算P(y | X),其中1≤k≤ℓ:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

我們用歸一化指數來模組化P(yk | xk)這類似于在神經網絡中廣泛使用的softmax操作。以下是我們使用exp的一些直覺:

  1. 下溢:當我們用非常小的數字,我們得到一個較小的數量可能遭受下溢。
  2. 非負輸出:所有值都映射在0和+inf之間。
  3. 單調增加:這與argmax操作具有類似的效果。

U(x,y) 被稱為我們的emissions或一進制分數(unary scores)。它隻是在第k個時間步給出x向量的标簽y的分數。您可以将其視為LSTM的第k個輸出。理論上,x向量可以是你想要的任何向量。實際上,我們的x向量通常是周圍元素的連接配接,比如滑動視窗中的單詞嵌入。在我們的模型中,每一個一進制因子都由一個可學習的權重來權重。如果我們将它們視為LSTM輸出,這很容易了解。

Z(x) 通常稱為配分函數(partition function)。我們可以将其視為歸一化因子,因為我們想獲得機率:每個不同标簽的分數應該總和為1.您可以将其視為softmax函數的分母。

到目前為止,我們描述了一個正常分類模型,最後使用softmax激活以獲得機率。 現在我們将添加新的可學習權重來模拟标簽yk跟随yk + 1的可能性。 通過對此進行模組化,我們在連續标簽之間建立依賴關系!叫做線性鍊CRF!為了做到這一點,我們将我們先前的機率乘以P(yk + 1 | yk),我們可以使用指數屬性将其重寫為unary scores U(x,y)加上可學習的transition scores T(y,y):

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

T(y, y)是一個具有形狀(nb_tags, nb_tags)的矩陣,其中每個條目都是一個可學習的參數,表示從第i個标簽到第j個标簽的轉換。讓我們回顧一下所有的新變量

  • emissions或unary scores(U):表示給定輸入x k的yk可能性的分數。
  • transition scores(T):表示yk跟随yk + 1的可能性的分數
  • partition function(Z):歸一化因子,以便在結束時獲得機率。

唯一需要正确定義的是配分函數Z:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

事實證明,計算Z(X)并非易事,因為我們有太多的嵌套循環!它是每個時間步标簽集上所有可能組合的總和。更确切地說,我們對标簽集進行了ℓ!計算。這給了我們時間複雜度O(ℓ!| y |²)。

幸運的是,我們可以利用循環依賴關系并使用動态程式設計來有效地計算它!執行此操作的算法稱為前向算法或後向算法 - 取決于您在序列上疊代的順序。

Python編碼

讓我們通過建立一個名為CRF的類來啟動我們的代碼,該類繼承自pytorch的nn.Module,以便自動跟蹤我們的梯度。

import torchfrom torch import nnclass CRF(nn.Module): """ Linear-chain Conditional Random Field (CRF). Args: nb_labels (int): number of labels in your tagset, including special symbols. bos_tag_id (int): integer representing the beginning of sentence symbol in your tagset. eos_tag_id (int): integer representing the end of sentence symbol in your tagset. batch_first (bool): Whether the first dimension represents the batch dimension. """ def __init__( self, nb_labels, bos_tag_id, eos_tag_id, batch_first=True ): super().__init__() self.nb_labels = nb_labels self.BOS_TAG_ID = bos_tag_id self.EOS_TAG_ID = eos_tag_id self.batch_first = batch_first self.transitions = nn.Parameter(torch.empty(self.nb_labels, self.nb_labels)) self.init_weights() def init_weights(self): # initialize transitions from a random uniform distribution between -0.1 and 0.1 nn.init.uniform_(self.transitions, -0.1, 0.1) # enforce contraints (rows=from, columns=to) with a big negative number # so exp(-10000) will tend to zero # no transitions allowed to the beginning of sentence self.transitions.data[:, self.BOS_TAG_ID] = -10000.0 # no transition alloed from the end of sentence self.transitions.data[self.EOS_TAG_ID, :] = -10000.0
           
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

我們也可以為pad id添加一個特殊标記。如果我們這樣做,我們必須確定強制執行限制以防止transitions from padding和transitions to padding的轉換- 除非我們已經處于pad位置。我們可以看到我們的transitions scores T被表示為矩陣self.transitions,這是PyTorch将使用反向傳播學習的torch.parameter!

定義損失函數

在分類問題中,我們希望最小化訓練過程中的誤差。我們可以通過定義一個損失函數L來實作這一點,這個函數将我們的預測和真實标簽作為輸入,如果它們相等,則傳回零;如果它們不同,則傳回正數,這表明出現了誤差。

請注意,我們正在計算P(y | X),這是我們想要最大化的。為了将其建構為最小化問題,我們采用該機率的負對數。這也被稱為負對數似然損失(NLL-Loss)。我們得到:L = - log(P(y | X))。并應用log-properties,如log(a / b)= log(a) - log(b),我們得到:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

其中Z_log表示我們在計算配分函數期間擷取log。讓我們看看Python代碼:

def forward(self, emissions, tags, mask=None): """Compute the negative log-likelihood. See `log_likelihood` method.""" nll = -self.log_likelihood(emissions, tags, mask=mask) return nlldef log_likelihood(self, emissions, tags, mask=None): """Compute the probability of a sequence of tags given a sequence of emissions scores. Args: emissions (torch.Tensor): Sequence of emissions for each label. Shape of (batch_size, seq_len, nb_labels) if batch_first is True, (seq_len, batch_size, nb_labels) otherwise. tags (torch.LongTensor): Sequence of labels. Shape of (batch_size, seq_len) if batch_first is True, (seq_len, batch_size) otherwise. mask (torch.FloatTensor, optional): Tensor representing valid positions. If None, all positions are considered valid. Shape of (batch_size, seq_len) if batch_first is True, (seq_len, batch_size) otherwise. Returns: torch.Tensor: the log-likelihoods for each sequence in the batch. Shape of (batch_size,) """ # fix tensors order by setting batch as the first dimension if not self.batch_first: emissions = emissions.transpose(0, 1) tags = tags.transpose(0, 1) if mask is None: mask = torch.ones(emissions.shape[:2], dtype=torch.float) scores = self._compute_scores(emissions, tags, mask=mask) partition = self._compute_log_partition(emissions, mask=mask) return torch.sum(scores - partition)
           
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

我們的forward pass隻是NLL損失(不要将它與前向算法混淆),其中我們在正常log_likelihood方法前面插入減号。該log_likelihood通過首先計算所述scores和log partition的方法,然後再互相減去。此外,我們将mask矩陣傳遞給這些方法,以便它們可以忽略與pad符号相關的計算。為完整起見,mask矩陣看起來像:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

計算分子:scores

因為我們對exp應用了log,分子就是每個時間步的emission和transition scores的總和!

def _compute_scores(self, emissions, tags, mask): """Compute the scores for a given batch of emissions with their tags. Args: emissions (torch.Tensor): (batch_size, seq_len, nb_labels) tags (Torch.LongTensor): (batch_size, seq_len) mask (Torch.FloatTensor): (batch_size, seq_len) Returns: torch.Tensor: Scores for each batch. Shape of (batch_size,) """ batch_size, seq_length = tags.shape scores = torch.zeros(batch_size) # save first and last tags to be used later first_tags = tags[:, 0] last_valid_idx = mask.int().sum(1) - 1 last_tags = tags.gather(1, last_valid_idx.unsqueeze(1)).squeeze() # add the transition from BOS to the first tags for each batch t_scores = self.transitions[self.BOS_TAG_ID, first_tags] # add the [unary] emission scores for the first tags for each batch # for all batches, the first word, see the correspondent emissions # for the first tags (which is a list of ids): # emissions[:, 0, [tag_1, tag_2, ..., tag_nblabels]] e_scores = emissions[:, 0].gather(1, first_tags.unsqueeze(1)).squeeze() # the scores for a word is just the sum of both scores scores += e_scores + t_scores # now lets do this for each remaining word for i in range(1, seq_length): # we could: iterate over batches, check if we reached a mask symbol # and stop the iteration, but vecotrizing is faster due to gpu, # so instead we perform an element-wise multiplication is_valid = mask[:, i] previous_tags = tags[:, i - 1] current_tags = tags[:, i] # calculate emission and transition scores as we did before e_scores = emissions[:, i].gather(1, current_tags.unsqueeze(1)).squeeze() t_scores = self.transitions[previous_tags, current_tags] # apply the mask e_scores = e_scores * is_valid t_scores = t_scores * is_valid scores += e_scores + t_scores # add the transition from the end tag to the EOS tag for each batch scores += self.transitions[last_tags, self.EOS_TAG_ID] return scores
           
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

為了了解這段代碼,你必須認為batch中每個句子的所有操作都是相同的。我們首先通過調用tags[:, 0]擷取每個batch中第一個單詞的标記。類似地,我們對時間步長次元上的mask求和,以得到長度清單,對于上一個示例,長度清單是[5,3]。在實踐中,它傳回一個torch.LongTensor。。例如,讓我們看看第26行:

emissions[:, 0].gather(1, first_tags.unsqueeze(1)).squeeze()
           
  1. 首先,我們從第一個時間步中選擇所有批次:emissions[:, 0],它傳回一個具有形狀(batch_size, nb_tags)的張量。
  2. 然後我們隻想選擇LongTensor first_tags中具有形狀(batch_size,)的列(dim = 1)中的值。 由于emissions是一個2D矩陣,我們解壓縮first_tags的最後一個次元來獲得形狀(batch_size,1):first_tags.unsqueeze(1)
  3. 現在它們具有相同的形狀,我們可以使用gather函數來選擇first_tags指定次元内的值emissions:emissions[:, 0].gather(1, first_tags.unsqueeze(1))
  4. 最後,這會得到一個形狀為(batch_size, 1)的矩陣,是以我們把它squeeze得到一個1D LongTensor。

在整個代碼中使用這個簡單的過程在指定的次元中選擇一組标簽。

我想談談這段代碼的最後一件事是我們如何忽略與padding符号相關的分數。解決這個問題的想法很簡單:我們在兩個向量之間執行逐元素乘法,以将時間步已經處于pad位置的新分數歸零。

計算配分函數:前向算法

現在我們計算了我們的分數,讓我們關注我們的分母。為了有效地計算配分函數,我們使用前向算法。我将簡要描述它并展示我們如何在 log-space中計算它。

前向算法的僞代碼如下:

1)初始化,對于y'2的所有值:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

2)For k=2 to ℓ-1,對于y'k + 1(log-space)的所有值:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

3)最後:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

注意,在第二步中,我們對exp求和取對數。這可能是有問題的。如果給定标簽y 'k的數太大,那麼該指數将快速增長到一個非常大的數字!在最後做log之前,我們可能會發現一個溢出問題。幸運的是,有一個技巧可以讓這個操作變得穩定:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

左邊等于右邊的證明是這樣的:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

将c設定為max(z),我們就完成了。此外,PyTorch已經為我們提供了這種穩定的實作方式torch.logsumexp。現在讓我們使用PyTorch對上面的算法進行編碼:

def _compute_log_partition(self, emissions, mask): """Compute the partition function in log-space using the forward-algorithm. Args: emissions (torch.Tensor): (batch_size, seq_len, nb_labels) mask (Torch.FloatTensor): (batch_size, seq_len) Returns: torch.Tensor: the partition scores for each batch. Shape of (batch_size,) """ batch_size, seq_length, nb_labels = emissions.shape # in the first iteration, BOS will have all the scores alphas = self.transitions[self.BOS_TAG_ID, :].unsqueeze(0) + emissions[:, 0] for i in range(1, seq_length): alpha_t = [] for tag in range(nb_labels): # get the emission for the current tag e_scores = emissions[:, i, tag] # broadcast emission to all labels # since it will be the same for all previous tags # (bs, nb_labels) e_scores = e_scores.unsqueeze(1) # transitions from something to our tag t_scores = self.transitions[:, tag] # broadcast the transition scores to all batches # (bs, nb_labels) t_scores = t_scores.unsqueeze(0) # combine current scores with previous alphas # since alphas are in log space (see logsumexp below), # we add them instead of multiplying scores = e_scores + t_scores + alphas # add the new alphas for the current tag alpha_t.append(torch.logsumexp(scores, dim=1)) # create a torch matrix from alpha_t # (bs, nb_labels) new_alphas = torch.stack(alpha_t).t() # set alphas if the mask is valid, otherwise keep the current values is_valid = mask[:, i].unsqueeze(-1) alphas = is_valid * new_alphas + (1 - is_valid) * alphas # add the scores for the final transition last_transition = self.transitions[:, self.EOS_TAG_ID] end_scores = alphas + last_transition.unsqueeze(0) # return a *log* of sums of exps return torch.logsumexp(end_scores, dim=1)
           
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

上面的代碼與我們之前計算得分的方式非常相似。

這裡隻有一件事我們之前沒有看到過:

  • alphas = is_valid * new_alphas + (1 — is_valid) * alphas:在這一行中,如果我們沒有達到pad 位置,我們正在将新的α的目前值更改為α,如果我們到達pad 位置,則保持相同的值。要了解其工作原理,請在時間步長i = 1時檢視此示例:
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

我們更新了第2和第3個序列,但沒有更新第1個,因為在timetep i = 1時我們到達了pad位置。

作為一個很好的觀察,你可以看到,一旦我們采用logsumexp,我們已經在log-space了!是以,我們可以将alpha的值添加到我們的分數中。最後,我們在最後再執行一次logsumexp操作,以傳回到達句子末尾的最終值 - 是以我們仍然在log-space中。

找到最佳标簽序列

如果我們計算後向算法 - 它隻是向後周遊序列,我們可以找到在每個時間步k最大化P(y k | X)的标簽。有趣的是, 如果我們假設CRF是真正的分布,這将是最佳解。它可以這樣表達:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

其中α分數來自前向算法,β分數來自後向算法。為了找到标簽y * 的最佳序列,我們在每個時間步長取argmax:

pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

維特比算法(Viterbi algorithm)

但是,事實證明我們不需要計算後向算法以找到最可能的标簽序列。相反,我們可以簡單地跟蹤前向算法期間每個時間步的最大分數。完成後,我們可以跟蹤max操作的後向跟蹤(argmax),以便解碼最大化分數的序列。這正是下面的Python代碼所做的:

def decode(self, emissions, mask=None): """Find the most probable sequence of labels given the emissions using the Viterbi algorithm. Args: emissions (torch.Tensor): Sequence of emissions for each label. Shape (batch_size, seq_len, nb_labels) if batch_first is True, (seq_len, batch_size, nb_labels) otherwise. mask (torch.FloatTensor, optional): Tensor representing valid positions. If None, all positions are considered valid. Shape (batch_size, seq_len) if batch_first is True, (seq_len, batch_size) otherwise. Returns: torch.Tensor: the viterbi score for the for each batch. Shape of (batch_size,) list of lists: the best viterbi sequence of labels for each batch. """ if mask is None: mask = torch.ones(emissions.shape[:2], dtype=torch.float) scores, sequences = self._viterbi_decode(emissions, mask) return scores, sequencesdef _viterbi_decode(self, emissions, mask): """Compute the viterbi algorithm to find the most probable sequence of labels given a sequence of emissions. Args: emissions (torch.Tensor): (batch_size, seq_len, nb_labels) mask (Torch.FloatTensor): (batch_size, seq_len) Returns: torch.Tensor: the viterbi score for the for each batch. Shape of (batch_size,) list of lists of ints: the best viterbi sequence of labels for each batch """ batch_size, seq_length, nb_labels = emissions.shape # in the first iteration, BOS will have all the scores and then, the max alphas = self.transitions[self.BOS_TAG_ID, :].unsqueeze(0) + emissions[:, 0] backpointers = [] for i in range(1, seq_length): alpha_t = [] backpointers_t = [] for tag in range(nb_labels): # get the emission for the current tag and broadcast to all labels e_scores = emissions[:, i, tag] e_scores = e_scores.unsqueeze(1) # transitions from something to our tag and broadcast to all batches t_scores = self.transitions[:, tag] t_scores = t_scores.unsqueeze(0) # combine current scores with previous alphas scores = e_scores + t_scores + alphas # so far is exactly like the forward algorithm, # but now, instead of calculating the logsumexp, # we will find the highest score and the tag associated with it max_score, max_score_tag = torch.max(scores, dim=-1) # add the max score for the current tag alpha_t.append(max_score) # add the max_score_tag for our list of backpointers backpointers_t.append(max_score_tag) # create a torch matrix from alpha_t # (bs, nb_labels) new_alphas = torch.stack(alpha_t).t() # set alphas if the mask is valid, otherwise keep the current values is_valid = mask[:, i].unsqueeze(-1) alphas = is_valid * new_alphas + (1 - is_valid) * alphas # append the new backpointers backpointers.append(backpointers_t) # add the scores for the final transition last_transition = self.transitions[:, self.EOS_TAG_ID] end_scores = alphas + last_transition.unsqueeze(0) # get the final most probable score and the final most probable tag max_final_scores, max_final_tags = torch.max(end_scores, dim=1)
           
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

該算法稱為維特比算法。它與我們在log_partition函數中使用的前向算法幾乎相同,但不是對整個序列進行正常分數,而是具有最大分數和最大化這些分數的标記。換句話說,我們通過操作torch.max替換torch.logsumexp,它傳回max和argmax。

現在我們需要做的一切就是選擇這些最終的标記,并按照反向跟蹤來查找“argmax”标記的整個序列。上述代碼的延續部分如下:

# find the best sequence of labels for each sample in the batch best_sequences = [] emission_lengths = mask.int().sum(dim=1) for i in range(batch_size): # recover the original sentence length for the i-th sample in the batch sample_length = emission_lengths[i].item() # recover the max tag for the last timestep sample_final_tag = max_final_tags[i].item() # limit the backpointers until the last but one # since the last corresponds to the sample_final_tag sample_backpointers = backpointers[: sample_length - 1] # follow the backpointers to build the sequence of labels sample_path = self._find_best_path(i, sample_final_tag, sample_backpointers) # add this path to the list of best sequences best_sequences.append(sample_path) return max_final_scores, best_sequencesdef _find_best_path(self, sample_id, best_tag, backpointers): """Auxiliary function to find the best path sequence for a specific sample. Args: sample_id (int): sample index in the range [0, batch_size) best_tag (int): tag which maximizes the final score backpointers (list of lists of tensors): list of pointers with shape (seq_len_i-1, nb_labels, batch_size) where seq_len_i represents the length of the ith sample in the batch Returns: list of ints: a list of tag indexes representing the bast path """ # add the final best_tag to our best path best_path = [best_tag] # traverse the backpointers in backwards for backpointers_t in reversed(backpointers): # recover the best_tag at this timestep best_tag = backpointers_t[best_tag][sample_id].item() # append to the beginning of the list so we don't need to reverse it later best_path.insert(0, best_tag) return best_path
           
pytorch argmax_在PyTorch中實作線性鍊條件随機場(CRF)Python編碼定義損失函數計算分子:scores計算配分函數:前向算法找到最佳标簽序列維特比算法(Viterbi algorithm)結論

可以看到,對于每個樣本,我們都在周遊該樣本的backtrace,并且在每個timestep中插入标記,該标記将在best_path的開頭最大化分數。是以,在最後,我們有一個清單,其中第一個元素對應于第一個标記,最後一個元素對應于序列的最後一個有效标記(參見第14行)。

結論

如果您想在生産中使用CRF模型,我強烈建議您使用經過良好測試的高效實作,比如pytorch包。