天天看點

隐馬爾科夫模型HMM之Baum-Welch算法Python代碼實作1. 初始化一些參數2. 定義前向算法獲得α_{ij}3. 定義後向算法獲得β_{ij}4. 根據《統計學習方法》公式10.24計算γ_{t}(i)5. 根據《統計學習方法》公式10.26計算ξ_{t}(i, j)6. 根據《統計學習方法》算法【10.4】定義模型訓練過程7. 整體代碼8. 執行個體

☕️ 本文系列文章彙總:

(1)HMM開篇:基本概念和幾個要素

(2)HMM計算問題:前後向算法

         代碼實作 

(3)HMM學習問題:Baum-Welch算法

(4)  HMM預測問題:維特比算法

本篇算法原理分析及公式推導請參考: HMM學習問題:Baum-Welch算法

原了解析及公式推導已在系列部落格中介紹,本篇重點用python實作一下Baum-Welch算法,走起~

目錄

1. 初始化一些參數

2. 定義前向算法獲得α_{ij}

3. 定義後向算法獲得β_{ij}

4. 根據《統計學習方法》公式10.24計算γ_{t}(i)

5. 根據《統計學習方法》公式10.26計算ξ_{t}(i, j)

6. 根據《統計學習方法》算法【10.4】定義模型訓練過程

7. 整體代碼

8. 執行個體

1. 初始化一些參數

def __init__(self, N, M, V):
        self.A = np.random.dirichlet(np.ones(N), size=N)  # 狀态轉移機率矩陣
        self.B = np.random.dirichlet(np.ones(M), size=N)  # 觀測機率矩陣
        self.pi = np.array(np.random.dirichlet(np.ones(N), size=1))[0]  # 初始狀态機率矩陣
        self.V = V # 所有可能的觀測
        self.N = N # 所有可能的狀态長度
        self.M = M # 所有可能的觀測長度
           

這裡用到的`np.random.dirichlet(args, size)`是随機生成一個次元為args,size行的數組,并保證每一行之和為1

2. 定義前向算法獲得α_{ij}

def forward(self):
        """
        前向算法,Baum welch算法需要用到
        :param O: 已知的觀測序列
        :return: alpha_{i}
        """
        row, col = len(self.O), self.A.shape[0]
        alpha_t_plus_1 = np.zeros((row, col))
        obj_index = self.V.index(self.O[0])
        # 初值α 公式10.15
        alpha_t_plus_1[0][:] = self.pi * self.B[:].T[obj_index]
        for t, o in enumerate(self.O[1:]):
            t += 1
            # 遞推 公式10.16
            obj_index = self.V.index(o)
            alpha_ji = alpha_t_plus_1[t - 1][:].T @ self.A
            alpha_t_plus_1[t][:] = alpha_ji * self.B[:].T[obj_index]

        self.alpha = alpha_t_plus_1
           

3. 定義後向算法獲得β_{ij}

def backward(self):
        """
        後向算法,Baum welch算法需要用到
        :param O: 已知的觀測序列
        :return: beta_{i}
        """
        row, col = len(self.O), self.A.shape[0]
        betaT = np.zeros((row + 1, col))
        # 初值β 公式10.19
        betaT[0][:] = [1] * self.A.shape[0]
        for t, o in enumerate(self.O[::-1][1:]):
            t += 1
            # 反向遞推 公式10.20
            obj_index = self.V.index(self.O[t - 1])
            beta_t = self.A * self.B[:].T[obj_index] @ betaT[t - 1][:].T
            betaT[t][:] = beta_t
        # 由于我們這裡要的是beta矩陣,不做probs的計算,是以不需要這一行,即不計算公式【10.27】
        # betaT[-1][:] = [self.pi[i] * self.B[i][self.V.index(self.O[0])] * betaT[-2][i] for i in range(self.A.shape[0])]
        # 注意這裡計算後向算法時,betaT是倒着存放的,是以我們需要按照beta1,beta2,...,betaT的順序取
        self.beta = betaT[:-1][::-1]
           

上述前向和後向算法的具體實作在上一篇部落格已經給出,這裡不再解釋

4. 根據《統計學習方法》公式10.24計算γ_{t}(i)

def gamma(self, t, i):
        """
        根據課本公式【10.24】計算γ
        :param t: 目前時間點
        :param i: 目前狀态節點
        :return: γ值
        """
        numerator = self.alpha[t][i] * self.beta[t][i]
        denominator = 0.

        for j in range(self.N):
            denominator += (self.alpha[t][j] * self.beta[t][j])

        return numerator / denominator
           

5. 根據《統計學習方法》公式10.26計算ξ_{t}(i, j)

def ksi(self, t, i, j):
        """
        根據公式【10.26】計算 ξ
        :param t: 目前時間點
        :param i: 目前狀态節點
        :param j: 同i
        :return:
        """
        obj_index = self.V.index(self.O[t + 1])
        numerator = self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]
        denominator = 0.

        for i in range(self.N):
            for j in range(self.N):
                denominator += self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]

        return numerator / denominator
           

6. 根據《統計學習方法》算法【10.4】定義模型訓練過程

def train(self, O, n):
        """
        根據算法【10.4】訓練模型
        :param O: 已知觀測序列
        :param n: 最大疊代步長
        :return: 模型參數λ=(π,A,B)
        """
        self.O = O
        self.T = len(O)
        maxIter = 0

        while maxIter < n:
            tempA = np.zeros((self.N, self.N))
            tempB = np.zeros((self.N, self.M))
            tempPi = np.array([0.] * self.N)

            # 根據前向算法和後向算法得到α和β
            self.forward()
            self.backward()

            maxIter += 1
            # a_{ij},公式【10.39】
            for i in range(self.N):
                for j in range(self.N):
                    numerator = 0.
                    denominator = 0.
                    for t in range(self.T - 1):
                        numerator += self.ksi(t, i, j)
                        denominator += self.gamma(t, i)
                    tempA[i][j] = numerator / denominator

            # b_{i}{j},公式【10.40】
            for j in range(self.N):
                for k in range(self.M):
                    numerator = 0.
                    denominator = 0.
                    for t in range(self.T):
                        if self.O[t] == self.V[k]:
                            numerator += self.gamma(t, j)
                        denominator += self.gamma(t, j)
                    tempB[j][k] = numerator / denominator

            # π_{i},公式【10.41】
            for i in range(self.N):
                tempPi[i] = self.gamma(0, i)
            # 更新
            self.A = tempA
            self.B = tempB
            self.pi = tempPi

        return AttrDict(
            pi=self.pi,
            A=self.A,
            B=self.B
        )
           

7. 整體代碼

import random
import numpy as np

random.seed(1)  # 好像不起租用


class AttrDict(dict):
    # 一個小trick,将結果傳回成一個字典格式
    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        self.__dict__ = self


class Baum_Welch:

    def __init__(self, N, M, V):
        self.A = np.random.dirichlet(np.ones(N), size=N)  # 狀态轉移機率矩陣
        self.B = np.random.dirichlet(np.ones(M), size=N)  # 觀測機率矩陣
        self.pi = np.array(np.random.dirichlet(np.ones(N), size=1))[0]  # 初始狀态機率矩陣
        self.V = V # 所有可能的觀測
        self.N = N # 所有可能的狀态長度
        self.M = M # 所有可能的觀測長度

    def forward(self):
        """
        前向算法,Baum welch算法需要用到
        :param O: 已知的觀測序列
        :return: alpha_{i}
        """
        row, col = len(self.O), self.A.shape[0]
        alpha_t_plus_1 = np.zeros((row, col))
        obj_index = self.V.index(self.O[0])
        # 初值α 公式10.15
        alpha_t_plus_1[0][:] = self.pi * self.B[:].T[obj_index]
        for t, o in enumerate(self.O[1:]):
            t += 1
            # 遞推 公式10.16
            obj_index = self.V.index(o)
            alpha_ji = alpha_t_plus_1[t - 1][:].T @ self.A
            alpha_t_plus_1[t][:] = alpha_ji * self.B[:].T[obj_index]

        self.alpha = alpha_t_plus_1

    def backward(self):
        """
        後向算法,Baum welch算法需要用到
        :param O: 已知的觀測序列
        :return: beta_{i}
        """
        row, col = len(self.O), self.A.shape[0]
        betaT = np.zeros((row + 1, col))
        # 初值β 公式10.19
        betaT[0][:] = [1] * self.A.shape[0]
        for t, o in enumerate(self.O[::-1][1:]):
            t += 1
            # 反向遞推 公式10.20
            obj_index = self.V.index(self.O[t - 1])
            beta_t = self.A * self.B[:].T[obj_index] @ betaT[t - 1][:].T
            betaT[t][:] = beta_t
        # betaT[-1][:] = [self.pi[i] * self.B[i][self.V.index(self.O[0])] * betaT[-2][i] for i in range(self.A.shape[0])]
        self.beta = betaT[:-1][::-1]

    def gamma(self, t, i):
        """
        根據課本公式【10.24】計算γ
        :param t: 目前時間點
        :param i: 目前狀态節點
        :return: γ值
        """
        numerator = self.alpha[t][i] * self.beta[t][i]
        denominator = 0.

        for j in range(self.N):
            denominator += (self.alpha[t][j] * self.beta[t][j])

        return numerator / denominator

    def ksi(self, t, i, j):
        """
        根據公式【10.26】計算 ξ
        :param t: 目前時間點
        :param i: 目前狀态節點
        :param j: 同i
        :return:
        """
        obj_index = self.V.index(self.O[t + 1])
        numerator = self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]
        denominator = 0.

        for i in range(self.N):
            for j in range(self.N):
                denominator += self.alpha[t][i] * self.A[i][j] * self.B[j][obj_index] * self.beta[t + 1][j]

        return numerator / denominator

    def train(self, O, n):
        """
        根據算法【10.4】訓練模型
        :param O: 已知觀測序列
        :param n: 最大疊代步長
        :return: 模型參數λ=(π,A,B)
        """
        self.O = O
        self.T = len(O)
        maxIter = 0

        while maxIter < n:
            tempA = np.zeros((self.N, self.N))
            tempB = np.zeros((self.N, self.M))
            tempPi = np.array([0.] * self.N)

            # 根據前向算法和後向算法得到α和β
            self.forward()
            self.backward()

            maxIter += 1
            # a_{ij},公式【10.39】
            for i in range(self.N):
                for j in range(self.N):
                    numerator = 0.
                    denominator = 0.
                    for t in range(self.T - 1):
                        numerator += self.ksi(t, i, j)
                        denominator += self.gamma(t, i)
                    tempA[i][j] = numerator / denominator

            # b_{i}{j},公式【10.40】
            for j in range(self.N):
                for k in range(self.M):
                    numerator = 0.
                    denominator = 0.
                    for t in range(self.T):
                        if self.O[t] == self.V[k]:
                            numerator += self.gamma(t, j)
                        denominator += self.gamma(t, j)
                    tempB[j][k] = numerator / denominator

            # π_{i},公式【10.41】
            for i in range(self.N):
                tempPi[i] = self.gamma(0, i)
            # 更新
            self.A = tempA
            self.B = tempB
            self.pi = tempPi

        return AttrDict(
            pi=self.pi,
            A=self.A,
            B=self.B
        )
           

8. 執行個體

if __name__ == '__main__':
    bm = Baum_Welch(N=3, M=2, V=['紅', '白'])
    O = ['紅', '白', '紅']
    res = bm.train(O, 3)
    print(res.pi)
    print(res.A)
    print(res.B)
           

我們将n設定為3輪,可以得到如下結果:

π: [0.38663305 0.61098003 0.00238692]
A: [[0.00183726 0.03990598 0.95825676]
 [0.03327835 0.93088611 0.03583554]
 [0.00324882 0.9861656  0.01058559]]
B: [[0.98736893 0.01263107]
 [0.72787272 0.27212728]
 [0.03464711 0.96535289]]
           

可以看出,每個參數的每一行之和約等于1,這是正确的。要注意,由于這裡使用random随機初始化,是以每次初始化的結果都不一樣。參數的初始化很影響最後的計算結果,這是個十分玄學的過程。因為原理及公式推導我已經弄明白了,是以我在編寫代碼的時候,基本不會再去糾結于公式内涵,基本是無腦按照公式來寫的,這樣會降低錯誤機率。

代碼已經放到GitHub上了,我将會持續更新其它算法的實作。