算法原理
预测连续变量的值
假设我们现在要预测某个连续变量的大小,那么最常见的方法就是用平均值来。假设现在我们要预测班级中某个学生的成绩,而班级中除了我们要预测的学生以外其他学生的平均成绩为 60 分,那么我们用 60 分来预测这个学生肯定是比 0 或者 100 显得合理一点。下面来尝试证明一下:
- 定义损失函数,其中是对 y 的预测值,使用 MSE 来评估损失:
- 对求偏导,
- 令导数等于 0,最小化 MSE,得到,,得到
- 可以看到,如果要用一个常量来预测,那么用 y 的均值会是一个比较好的选择。
进一步
如果现在给班级里面的人加上一个限制,就是每个人前 10 次考试的平均分已知,那么如何将这个信息应用于我们的预测中呢?一个显然的思路就是把学生按照前 10 次考试的平均分分为两组,在每一组都应用我们之前的"平均值"模型。比如在前 10 次考试平均分 50 以下的分为 A 组,前 10 次考试平均分 50 以上的分为 B 组,假设 A 组这次考试的平均分为 48,B 组这次考试的成绩为 79。当前这个同学如果前 10 次的平均分<50,那么就预测这个同学这次考试的分数为 48,否则就预测为 79。这里似乎还有一个问题,我们的分割点应该选在哪里呢?50 是否合理? 所以我们可以尝试所有可能的 m 分割点,沿用之前的损失函数,对 A,B 两组分别计算 Loss 并相加得到。最小的所对应的就是我们寻找的最佳分割点。
更进一步
这次我们不仅知道了每个同学前 10 次考试的平均分,还知道了每位同学去年获得得奖学金金额,又该怎么做呢? 我们可以在上面分为 A,B 两组得基础上,再分 C,D 两组,最后形成 AC,AD,BC,BD 这样 4 个组。仍然是最小化之前的损失函数。 可以看到这个回归树实际上也就是机器学习中的决策树,不过决策树的分类技巧稍微复杂点(和信息增益相关)。
代码实现
针对波士顿房价预测数据集。
代码语言:javascript复制# coding=utf-8
from copy import copy
from random import randint, seed, random
from time import time
# 统计程序运行时间函数
# fn代表运行的函数
def run_time(fn):
def fun():
start = time()
fn()
ret = time() - start
if ret < 1e-6:
unit = "ns"
ret *= 1e9
elif ret < 1e-3:
unit = "us"
ret *= 1e6
elif ret < 1:
unit = "ms"
ret *= 1e3
else:
unit = "s"
print("Total run time is %.1f %sn" % (ret, unit))
return fun()
def load_data():
f = open("boston/housing.csv")
X = []
y = []
for line in f:
line = line[:-1].split()
xi = [float(s) for s in line[:-1]]
yi = line[-1]
if '.' in yi:
yi = float(yi)
else:
yi = int(yi)
X.append(xi)
y.append(yi)
f.close()
return X, y
# 划分训练集和测试集
def train_test_split(X, y, prob=0.7, random_state=None):
if random_state is not None:
seed(random_state)
X_train = []
X_test = []
y_train = []
y_test = []
for i in range(len(X)):
if random() < prob:
X_train.append(X[i])
y_train.append(y[i])
else:
X_test.append(X[i])
y_test.append(y[i])
seed()
return X_train, X_test, y_train, y_test
# 计算回归模型的拟合优度
def get_r2(reg, X, y):
y_hat = reg.predict(X)
m = len(y)
n = len(y_hat)
sse = sum((yi - yi_hat) ** 2 for yi, yi_hat in zip(y, y_hat))
y_avg = sum(y) / len(y)
sst = sum((yi - y_avg) ** 2 for yi in y)
r2 = 1 - sse / sst
print("Test r2 is %.3f!" % r2)
return r2
# 创建Node类
class Node(object):
# 初始化,存储预测值,左右节点,特征和分割点
def __init__(self, score=None):
self.score = score
self.left = None
self.right = None
self.feature = None
self.split = None
# 创建回归树类
class RegressionTree(object):
# 初始化,存储根节点和树的高度
def __init__(self):
self.root = Node()
self.height = 0
# 计算分割点,MSE, 根据自变量X、因变量y、X元素中被取出的行号idx,
# 列号feature以及分割点split,计算分割后的MSE。注意这里为了减少
# 计算量,用到了方差公式:D(X)=E{[X-E(X)]^2}=E(X^2)-[E(X)]^2
def get_split_mse(self, X, y, idx, feature, split):
split_sum = [0, 0]
split_cnt = [0, 0]
split_sqr_sum = [0, 0]
for i in idx:
xi, yi = X[i][feature], y[i]
if xi < split:
split_cnt[0] = 1
split_sum[0] = yi
split_sqr_sum[0] = yi ** 2
else:
split_cnt[1] = 1
split_sum[1] = yi
split_sqr_sum[1] = yi ** 2
split_avg = [split_sum[0] / split_cnt[0], split_sum[1] / split_cnt[1]]
split_mse = [split_sqr_sum[0] - split_sum[0] * split_avg[0],
split_sqr_sum[1] - split_sum[1] * split_avg[1]]
return sum(split_mse), split, split_avg
# 计算最佳分割点,遍历特征某一列的所有的不重复的点,找出MSE最小的点
# 作为最佳分割点。如果特征中没有不重复的元素则返回None。
def choose_split_point(self, X, y, idx, feature):
unique = set([X[i][feature] for i in idx])
if (len(unique) == 1):
return None
unique.remove(min(unique))
mse, split, split_avg = min((self.get_split_mse(X, y, idx, feature, split)
for split in unique), key=lambda x: x[0])
return mse, feature, split, split_avg
# 选择最佳特征,遍历所有特征,计算最佳分割点对应的MSE,找出MSE最小
# 的特征、对应的分割点,左右子节点对应的均值和行号。如果所有的特征都没有不重复元素则返回None
def choose_feature(self, X, y, idx):
m = len(X[0])
split_rets = [x for x in map(lambda x: self.choose_split_point(X, y, idx, x),
range(m)) if x is not None]
if (split_rets == []):
return None
_, feature, split, split_avg = min(split_rets, key=lambda x: x[0])
idx_split = [[], []]
while idx:
i = idx.pop()
xi = X[i][feature]
if xi < split:
idx_split[0].append(i)
else:
idx_split[1].append(i)
return feature, split, split_avg, idx_split
# 规则转文字
def expr2literal(self, expr):
feature, op, split = expr
op = ">=" if op == 1 else "<"
return ("Feature%d %s %.4f" % (feature, op, split))
# 获取规则,将回归树的所有规则都用文字表达出来,方便我们了解树的全貌。这里使用BFS。
def get_rules(self):
que = [[self.root, []]]
self.rules = []
while que:
nd, exprs = que.pop(0)
if not (nd.left or nd.right):
literals = list(map(self.expr2literal, exprs))
self.rules.append([literals, nd.score])
if nd.left:
rule_left = copy(exprs)
rule_left.append([nd.feature, -1, nd.split])
que.append([nd.left, rule_left])
if nd.right:
rule_right = copy(exprs)
rule_right.append([nd.feature, 1, nd.split])
que.append([nd.right, rule_right])
# 训练模型,仍然使用队列 广度优先搜索,训练模型的过程中需要注意:
# 1.控制树的最大深度max_depth
# 2.控制分裂时最少的样本量min_samples_split
# 3.叶子节点至少有两个不重复的y值
# 4.至少有一个特征是没有重复值的
def fit(self, X, y, max_depth=5, min_samples_split=2):
self.root = Node()
que = [[0, self.root, list(range(len(y)))]]
while que:
depth, nd, idx = que.pop(0)
if depth > max_depth:
depth -= 1
break
if len(idx) < min_samples_split or set(map(lambda i: y[i], idx)) == 1:
continue
feature_rets = self.choose_feature(X, y, idx)
if feature_rets is None:
continue
nd.feature, nd.split, split_avg, idx_split = feature_rets
nd.left = Node(split_avg[0])
nd.right = Node(split_avg[1])
que.append([depth 1, nd.left, idx_split[0]])
que.append([depth 1, nd.right, idx_split[1]])
self.height = depth
self.get_rules()
# 打印规则
def print_reuls(self):
for i, rule in enumerate(self.rules):
literals, score = rule
print("Rule %d: " % i, ' | '.join(
literals) ' => split_hat %.4f' % score)
# 预测一个样本
def _predict(self, row):
nd = self.root
while nd.left and nd.right:
if row[nd.feature] < nd.split:
nd = nd.left
else:
nd = nd.right
return nd.score
# 预测多个样本
def predict(self, X):
return [self._predict(Xi) for Xi in X]
# 效果评估
if __name__ == '__main__':
print('Testing the accuracy of RegressionTree...')
X, y = load_data()
X_train, X_test, y_train, y_test = train_test_split(
X, y, random_state=10)
reg = RegressionTree()
reg.fit(X=X_train, y=y_train, max_depth=4)
reg.print_reuls()
get_r2(reg, X_test, y_test)
拟合的效果还不错!