天天看點

Python實作Adaboost

一 提升方法是什麼?

首先介紹兩個概念 強可學習,弱可學習

在機率近似正确學習的架構中 , 一個概念(一個類) ,如果存在一個多項式的學習算法 能夠學習它,并且正确率很高,那麼就稱這個概念是強可學習的;

一個概念,如果存在 一個多項式的學習算法能夠學習它,學習的正确率僅比随機猜測略好,那麼就稱這個概念是弱可學習的。

但是事實上已經有學者證明 一個概念是強可學習的充分必要條件是這個概念 是弱可學習的,即二者是等價的。

顯而易見,我們學習一個僅僅比随機猜測正确率高的模型是很容易的,但是學習一個正确率很高的模型卻是很困難的。如果我們已經學習出一個弱學習模型,根據我們之前的介紹,我們必然可以學習出一個強學習模型。

提升方法要解決的問題就是如何将一個弱學習的模型提升為一個強學習的模型。 它的具體做法是從弱學習算法出發,反複學習,得到一系列弱分類器,然後組合這些弱分類器,構成一個強分類器,這也就是我們的一句古話,三個臭皮匠頂個諸葛亮所表達的含義 。大多數的提升方法是根據改變訓練資料的機率分布(訓練資料的權值分布),針對不同的訓練資料分布調用弱學習算法學習一系列弱分類器。

下面介紹一個具體的提升方法

二 AdaBoost算法

在我們前面的介紹中,大家可能會有一些疑惑,改變訓練資料的機率分布是什麼東東? 還有多個弱分類器組合成一個強分類器, what? 怎麼組合?一頭霧水…

我現在這裡簡單的介紹一下,讓大家有那麼一點頭緒,之後再看下面的就會恍然大悟。

AdaBoost初始時,會給所有的訓練樣本一樣的權值,假如我們的訓練集中有N個訓練樣本,所有訓練樣本的權值就都是1/N , 之後每疊代完一輪,訓練好一個弱分類器後都會重新更新這些樣本的權值,提高那些被前一輪弱分類器錯誤分類樣本的權值,而降低那些被正确分類樣本的權值。這樣可以使下一輪弱分類器重點關注被上一輪弱分類器錯誤分類的樣本。

至于組合弱分類器,是給每一個弱分類器G(x)一個權值 a 之後讓所有弱分類器和它們的權值的乘積做一個加和運算,得到我們的強分類器。權值是根據弱分類器的誤內插補點來配置設定的,誤差越大的弱分類器,它的權值越小,這就使得在建構強分類器時,誤差大的弱分類器起的作用比較小,誤差小的弱分類器起的作用比較大。

簡單回答了以上兩個問題,我們正式來認識一下AdaBoost這個厲害的朋友吧 !

AdaBoost算法流程

輸入: 訓練集 T={(x1,y1),(x2,y2),…,(xN,yN)}

Python實作Adaboost
Python實作Adaboost

AdaBoost算法在初始時會給每一個樣本一個權值

1 初始權值為 1/N

Python實作Adaboost

2 M是設定的一個疊代輪次 每疊代一輪都會生成一個弱分類器, 我們需要疊代 第2步–第5步 M 次 若完成疊代 則繼續往下進行 6

基于目前訓練資料以及其權值分布Dm 進行訓練,得到分類器Gm(x) m=1,2,…,M

Python實作Adaboost

3 計算Gm(x) 在訓練資料集上的分類誤差率

Python實作Adaboost
Python實作Adaboost

Gm(X) 在權重的訓練資料集上的分類誤差率是被 Gm(X) 誤分類樣本的權值之和

4 計算Gm(x) 的系數

Python實作Adaboost
Python實作Adaboost

5 進行權值更新

Python實作Adaboost

其中 Zm 稱作規範化因子

Python實作Adaboost

更新訓練資料的權值分布的公式可以寫成這樣

Python實作Adaboost

兩式相比較可知 被誤分類樣本的權值被放大了

Python實作Adaboost

誤分類樣本在下一輪學習中起更大的作用

6 建構基本分類器的線性組合

Python實作Adaboost

得到最終分類器

Python實作Adaboost

下面給大家講解一個例子

我們根據這個例子看AdaBoost算法到底怎麼進行的

樣本序号 1 2 3 4 5 6 7 8 9 10
樣本點 X (1,5) (2,2) (3,1) (4,6) (6,8) (6,5) (7,9) (8,7) (9,8) (10,2)
類别Y 1 1 -1 -1 1 -1 1 1 -1 -1
Python實作Adaboost
Python實作Adaboost
Python實作Adaboost
Python實作Adaboost

三 Python實作Adaboost

代碼實作基于以上講解

class Gx:
    def __init__(self,feature,split,coef,left,right):
        #一個分布 這個分布是以哪個feature 作為劃分的 劃分點 split是 這個分布的系數是coef
         self.feature=feature
         self.split= split
         self.coef=coef
         self.left_class =left#小于切分點的類
         self.right_class = right #大于切分點的類

class Adaboost:
    def __init__(self,Train,Test,M):
        self.Train = Train
        self.Test = Test
        self.w=[1/(self.Train.shape[0])]* self.Train.shape[0] #賦初值,權重的初值是 1/ 訓練樣本的個數
        self.feature_num = self.Train.shape[1] - 1  # 求特征個數 因為訓練資料的最後一列是标簽 是以特征數等于次元數減一
        self.label = self.Train.shape[1] - 1  # 标簽所在列
        self.Gx = []
        self.M = M  # 疊代次數
        self.split=self.cal_split()

    def cal_split(self):
        # 因為資料的特征都是連續值 計算每一個特征的切分點
        split={}
        for i in range(self.feature_num):
            split[i]=[] #将某一個特征的所有切分點放到一個清單中
            d = self.Train[np.argsort(self.Train[:, i])]  # 以這個特征的大小來進行排序
            for j in range(d.shape[0]-1):
                sp=(d[j][i]+d[j+1][i])/2
                if sp not in split[i]:  #可能有的切分點是重複的 這一步去重
                    split[i].append(sp)
        return split

    def cal_class(self,data):
        #傳回資料中正類和負類的占比
        positive =0
        negative =0
        for i in range(data.shape[0]):
            if data[i][self.label]==1:
                positive+=1
            else:
                negative+=1
        return positive/data.shape[0] , negative/data.shape[0]

    def cal_error(self,feature,split,left,right):
       #計算按某個分布的誤差
        error=0
        for i in range(self.Train.shape[0]):
            if (self.Train[i][feature] <= split and self.Train[i][self.label]!=left) or (self.Train[i][feature] > split and self.Train[i][self.label]!=right):
               error+=self.w[i]
        return error

    def cal_Gx(self): #求誤差最小的分布
        min_error=sys.maxsize #最小誤差 李航書上的 em
        min_Gx=None #最小誤差對應的分布
        min_a=0
        for i in range(self.feature_num): #周遊每一個特征
            for j in self.split[i]: #周遊切分點
                left = self.Train[(self.Train[:, i] <= j),: ]
                right = self.Train[(self.Train[:, i] > j),: ]
                p1,n1=self.cal_class(left)
                p2,n2=self.cal_class(right)
                if p1>p2:
                    left_class=1
                    right_class=-1
                else:
                    left_class = -1
                    right_class = 1
                error=self.cal_error(i,j,left_class,right_class)
                a = 1/2*math.log((1-error)/error)
                if error < min_error:
                    min_error=error
                    min_Gx=Gx(i,j,a,left_class,right_class)
                    min_a = a
        self.Gx.append(min_Gx)
        return min_a,min_Gx

    def cal_Zm(self,a,Gx):
        #計算規範化因子Zm
        Zm=0
        for i in range(self.Train.shape[0]):
            y=self.Train[i][self.label]
            feature=Gx.feature
            split=Gx.split
            if self.Train[i][feature]<=split:
                g=Gx.left_class
            else:
                g=Gx.right_class
            Zm+= self.w[i]*math.exp(-a*y*g)
        return Zm


    def update_w(self,Zm,a,Gx):
        # 更新權值w
        for i in range(self.Train.shape[0]):
            y = self.Train[i][self.label]
            feature = Gx.feature
            split = Gx.split
            if self.Train[i][feature] <= split:
                g = Gx.left_class
            else:
                g = Gx.right_class
            self.w[i]=self.w[i] * math.exp(-a * y * g) / Zm

    def cal_fx(self,X):
        #計算 f(x)
        fx=0
        for gx in self.Gx:
            feature = gx.feature
            split = gx.split
            left=gx.left_class
            right=gx.right_class
            a=gx.coef
            if X[feature]<=split:
               g = left
            else:
                g=right
            fx +=a*g
        return fx

    def sign_fx(self,fx):
        # 計算 sign(f(x))
        if fx>0:
            return 1
        elif fx<0:
            return -1
        else:
            return 0

    def fit(self):
        for i in range(self.M):
            print(i)
            min_a, min_Gx=self.cal_Gx()
            Zm=self.cal_Zm(min_a,min_Gx)
            self.update_w(Zm,min_a,min_Gx)
            print(self.predict())

    def predict(self):
        right=0
        for i in range(self.Test.shape[0]):
            y=self.sign_fx(self.cal_fx(self.Test[i]))
            if y==self.Test[i][self.label]:
                right+=1
        return right / self.Test.shape[0]
           

調用代碼

資料基于sklearn的datasets中的breast_cancer資料集

breast_cancer=datasets.load_breast_cancer()
data1=breast_cancer.data
data2=breast_cancer.target
data2.resize([data2.shape[0],1])
for i in range(data2.shape[0]):
    if data2[i][0]==0:
        data2[i][0] =-1
data=np.concatenate((data1,data2), axis=1)
train_size = int(len(data) * 0.7)#劃分訓練集與測試集
train = np.array(data[:train_size])
# print(X_train.shape)
test=np.array(data[train_size:])
model=Adaboost(train,test,100)
model.fit()
print(model.predict())
           

繼續閱讀