文章目录
- 提升boosting
- 算法步骤
- numpy复现
提升boosting
“装袋”(bagging)和“提升”(boost)是构建组合模型的两种最主要的方法,所谓的组合模型是由多个基本模型构成的模型,组合模型的预测效果往往比任意一个基本模型的效果都要好。
- 装袋:每个基本模型由从总体样本中随机抽样得到的不同数据集进行训练得到,通过重抽样得到不同训练数据集的过程称为装袋。
- 提升:每个基本模型训练时的数据集采用不同权重,针对上一个基本模型分类错误的样本增加权重,使得新的模型重点关注误分类样本
- 提升boosting方法是一种常用的统计学习方法;
- 强可学习:在概率近似正确(PAC)学习框架中,一个概念如果存在多项式时间的学习算法能够学习它,并且正确率很高,则称为强可学习;弱可学习:学习的正确率仅比随机猜测略好,则称为弱可学习。提升方法就是将很多个弱分类器组合构成一个强分类器。
- 提升方法需要回答两个问题:1)每一轮如何改变训练数据的权值或概率分布——AdaBoost提高前一轮弱分类器错误分类样本的权值,而降低那些被正确分类样本的权值;2)如何将弱分类器组合成一个强分类器——AdaBoost采取加权多数表决的方法。
算法步骤
1)给每个训练样本 $x_1, x_2, \dots, x_N$ 分配权重,初始权重 $w_{1i}$ 均为 $1/N$。
2)针对带有权值的样本进行训练,得到模型 $G_m$(初始模型为 $G_1$)。
3)计算模型 $G_m$ 的误分率 $e_m=\sum_{i=1}^N w_{mi}\,I(y_i\neq G_m(x_i))$。
4)计算模型系数 $\alpha_m=\frac{1}{2}\log\frac{1-e_m}{e_m}$。
5)根据误分率 $e_m$ 和当前权重向量 $D_m$ 更新权重向量 $D_{m+1}=(w_{m+1,1},\dots,w_{m+1,i},\dots,w_{m+1,N})$,其中 $w_{m+1,i}=\frac{w_{m,i}}{Z_m}\exp(-\alpha_m y_i G_m(x_i))$,规范化因子 $Z_m=\sum_{i=1}^N w_{mi}\exp(-\alpha_m y_i G_m(x_i))$。
6)计算组合模型 $f(x)=\sum_{m=1}^M \alpha_m G_m(x)$ 的误分率,得到最终分类器:$G(x)=\mathrm{sign}(f(x))=\mathrm{sign}\left(\sum_{m=1}^M \alpha_m G_m(x)\right)$。
7)当组合模型的误分率低于阈值或迭代次数达到上限时,停止迭代;否则,回到步骤2)。
numpy复现
代码语言:python
# -*- coding:utf-8 -*-
# /usr/bin/python
import numpy as np
class AdaBoost():
    """AdaBoost binary classifier built from one-feature threshold stumps.

    Each boosting round fits a decision stump (a single-feature threshold
    with a polarity) on the weighted training samples, computes its
    coefficient alpha_m = 0.5*log((1-e)/e), and re-weights the samples so
    the next stump focuses on the previously misclassified ones.
    Labels are expected to be +1 / -1.
    """

    def __init__(self, nEstimators=50, learningRate=0.1):
        """
        :param nEstimators: maximum number of boosting rounds (weak learners)
        :param learningRate: step size used to scan candidate thresholds
        """
        self.nEstimators = nEstimators
        self.learningRate = learningRate

    def initArgs(self, datasets, labels):
        """Initialize the per-fit state.

        :param datasets: feature matrix of shape (M, N)
        :param labels: label vector of +1/-1 values, length M
        """
        self.X = datasets
        self.Y = labels
        self.M, self.N = self.X.shape
        # collected weak classifiers: (feature_axis, threshold, polarity)
        self.clfSets = []
        # sample weights start uniform: w_{1,i} = 1/M
        self.weights = np.full(self.M, 1.0 / self.M)
        # coefficient alpha_m of each weak classifier
        self.alpha = []

    def _G(self, features, labels, weights):
        """Find the best threshold stump on a single feature column.

        Scans thresholds v = min + k*learningRate; for each candidate it
        tries both polarities ('positive': predict +1 above v,
        'nagetive': predict +1 below v) and keeps the stump with the lowest
        weighted misclassification error.

        :param features: one feature column, length m
        :param labels: +1/-1 labels, length m
        :param weights: current sample weights, length m
        :return: (best threshold, polarity, weighted error, predictions)
        """
        m = len(features)
        error = float('inf')
        bestV = 0
        features_min = min(features)
        features_max = max(features)
        # BUGFIX: '+' operators were missing in the original source
        n_step = (features_max - features_min + self.learningRate) // self.learningRate
        direct, compare_array = None, None
        for i in range(1, int(n_step)):
            v = features_min + self.learningRate * i
            # skip thresholds that coincide with an actual feature value
            if v not in features:
                compare_array_positive = np.array([1 if features[k] > v else -1 for k in range(m)])
                weight_error_positive = sum(weights[k] for k in range(m)
                                            if compare_array_positive[k] != labels[k])
                compare_array_nagetive = np.array([-1 if features[k] > v else 1 for k in range(m)])
                weight_error_nagetive = sum(weights[k] for k in range(m)
                                            if compare_array_nagetive[k] != labels[k])
                if weight_error_positive < weight_error_nagetive:
                    weight_error, _compare_array, _direct = (
                        weight_error_positive, compare_array_positive, 'positive')
                else:
                    weight_error, _compare_array, _direct = (
                        weight_error_nagetive, compare_array_nagetive, 'nagetive')
                if weight_error < error:
                    error = weight_error
                    compare_array = _compare_array
                    # BUGFIX: polarity must be recorded together with the best
                    # threshold; the original overwrote it on every candidate
                    direct = _direct
                    bestV = v
        return bestV, direct, error, compare_array

    def _alpha(self, error):
        """Weak-classifier coefficient alpha_m = 0.5 * log((1-e_m)/e_m).

        :param error: weighted misclassification error e_m
        :return: coefficient alpha_m
        """
        # clamp so a perfect stump (error == 0) yields a large finite
        # coefficient instead of a division-by-zero / inf
        error = max(error, 1e-10)
        return 0.5 * np.log((1 - error) / error)

    def _Z(self, weights, a, clf):
        """Normalization factor Z_m so the updated weights sum to 1.

        :param weights: current sample weights
        :param a: coefficient alpha_m
        :param clf: stump predictions G_m(x_i)
        """
        return sum(weights[i] * np.exp(-1 * a * self.Y[i] * clf[i]) for i in range(self.M))

    def _w(self, a, clf, Z):
        """Weight update: w_{m+1,i} = w_{m,i}/Z_m * exp(-alpha_m*y_i*G_m(x_i)).

        :param a: coefficient alpha_m
        :param clf: stump predictions G_m(x_i)
        :param Z: normalization factor
        """
        self.weights = self.weights * np.exp(-1 * a * self.Y * clf) / Z

    # G(x) linear combination placeholder (unused)
    def _f(self, alpha, clf_sets):
        pass

    def G(self, x, v, direct):
        """Evaluate one stump at x: threshold v with the given polarity.

        :param x: scalar feature value
        :param v: threshold
        :param direct: 'positive' (+1 above v) or 'nagetive' (+1 below v)
        :return: +1 or -1
        """
        if direct == 'positive':
            return 1 if x > v else -1
        else:
            return -1 if x > v else 1

    def fit(self, X, y):
        """Train: each round pick the best stump over all features, compute
        its coefficient, record it, and re-weight the samples.

        :param X: feature matrix of shape (M, N)
        :param y: +1/-1 label vector of length M
        """
        self.initArgs(X, y)
        for epoch in range(self.nEstimators):
            best_clf_error, bestV, clf_result = float('inf'), None, None
            final_direct, axis = None, 0
            for j in range(self.N):
                features = self.X[:, j]
                # best threshold, polarity, error, predictions on column j
                v, direct, error, compare_array = self._G(features, self.Y, self.weights)
                if error < best_clf_error:
                    best_clf_error = error
                    bestV = v
                    final_direct = direct
                    clf_result = compare_array
                    axis = j
                # a perfect split cannot be improved — stop scanning features
                if best_clf_error == 0:
                    break
            # coefficient alpha_m of this round's stump
            a = self._alpha(best_clf_error)
            self.alpha.append(a)
            # record the weak classifier
            self.clfSets.append((axis, bestV, final_direct))
            # normalization factor, then weight update
            Z = self._Z(self.weights, a, clf_result)
            self._w(a, clf_result, Z)

    def predict(self, feature):
        """Predict the class of one sample: sign(sum_m alpha_m * G_m(x)).

        :param feature: feature vector of one sample
        :return: +1 or -1
        """
        result = 0.0
        for i in range(len(self.clfSets)):
            axis, clf_v, direct = self.clfSets[i]
            # BUGFIX: accumulate — the original overwrote result each round,
            # so only the last weak classifier was ever used
            result += self.alpha[i] * self.G(feature[axis], clf_v, direct)
        # sign
        return 1 if result > 0 else -1

    def score(self, X_test, y_test):
        """Accuracy on a test set.

        :param X_test: test feature matrix
        :param y_test: test labels (+1/-1)
        :return: fraction of correct predictions
        """
        right_count = 0
        for i in range(len(X_test)):
            # BUGFIX: increment — the original assigned 1, capping the count
            if self.predict(X_test[i]) == y_test[i]:
                right_count += 1
        return right_count / len(X_test)
if __name__ == "__main__":
    # Toy 1-D dataset: feature values 0..9 with +1/-1 labels.
    X = np.arange(10).reshape(10, 1)
    y = np.array([1, 1, 1, -1, -1, -1, 1, 1, 1, -1])
    clf = AdaBoost(nEstimators=10, learningRate=0.9)
    clf.fit(X, y)
    testX = np.array([[0], [9], [2]])
    testY = np.array([1, -1, 1])
    # accuracy on the three held-out samples
    print(clf.score(testX, testY))