numpy-mlnumpy_mlteststest_glm.py
代码语言:javascript
复制# 禁用 flake8 检查
# 导入 numpy 库并重命名为 np
import numpy as np
# 导入 statsmodels 库中的 api 模块并重命名为 sm
import statsmodels.api as sm
# 从 numpy_ml.linear_models 模块中导入 GeneralizedLinearModel 类
from numpy_ml.linear_models import GeneralizedLinearModel
# 从 numpy_ml.linear_models.glm 模块中导入 _GLM_LINKS 变量
from numpy_ml.linear_models.glm import _GLM_LINKS
# 从 numpy_ml.utils.testing 模块中导入 random_tensor 函数
from numpy_ml.utils.testing import random_tensor
# 定义一个测试函数 test_glm,参数 N 默认值为 20
def test_glm(N=20):
# 设置随机种子为 12345
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为正无穷
N = np.inf if N is None else N
# 初始化变量 i 为 1
i = 1
# 循环执行直到 i 大于 N
while i < N 1:
# 生成一个随机整数作为样本数量,范围在 10 到 100 之间
n_samples = np.random.randint(10, 100)
# 生成一个随机整数作为特征数量,确保特征数量远小于样本数量,避免完全分离或多个解决方案
n_feats = np.random.randint(1, 1 n_samples // 2)
target_dim = 1
# 随机选择是否拟合截距
fit_intercept = np.random.choice([True, False])
# 随机选择链接函数
_link = np.random.choice(list(_GLM_LINKS.keys()))
# 创建不同链接函数对应的家族
families = {
"identity": sm.families.Gaussian(),
"logit": sm.families.Binomial(),
"log": sm.families.Poisson(),
}
# 打印当前链接函数和是否拟合截距
print(f"Link: {_link}")
print(f"Fit intercept: {fit_intercept}")
# 生成随机数据作为特征矩阵 X
X = random_tensor((n_samples, n_feats), standardize=True)
# 根据链接函数生成随机标签 y
if _link == "logit":
y = np.random.choice([0.0, 1.0], size=(n_samples, target_dim))
elif _link == "log":
y = np.random.choice(np.arange(0, 100), size=(n_samples, target_dim))
elif _link == "identity":
y = random_tensor((n_samples, target_dim), standardize=True)
else:
raise ValueError(f"Unknown link function {_link}")
# 在整个数据集上拟合标准模型
fam = families[_link]
Xdesign = np.c_[np.ones(X.shape[0]), X] if fit_intercept else X
glm_gold = sm.GLM(y, Xdesign, family=fam)
glm_gold = glm_gold.fit()
# 使用自定义的广义线性模型拟合数据
glm_mine = GeneralizedLinearModel(link=_link, fit_intercept=fit_intercept)
glm_mine.fit(X, y)
# 检查模型系数是否匹配
beta = glm_mine.beta.T.ravel()
np.testing.assert_almost_equal(beta, glm_gold.params, decimal=6)
print("t1. Overall model coefficients match")
# 检查模型预测是否匹配
np.testing.assert_almost_equal(
glm_mine.predict(X), glm_gold.predict(Xdesign), decimal=5
)
print("t2. Overall model predictions match")
# 打印测试通过信息
print("tPASSEDn")
i = 1
numpy-mlnumpy_mlteststest_linear_regression.py
代码语言:javascript
复制# 禁用 flake8 检查
# 导入 numpy 库并重命名为 np
import numpy as np
# 从 sklearn 线性模型中导入 LinearRegression 类并重命名为 LinearRegressionGold
from sklearn.linear_model import LinearRegression as LinearRegressionGold
# 从 numpy_ml 线性模型中导入 LinearRegression 类
from numpy_ml.linear_models import LinearRegression
# 从 numpy_ml 工具包中导入 random_tensor 函数
from numpy_ml.utils.testing import random_tensor
# 定义测试线性回归函数,参数 N 默认值为 10
def test_linear_regression(N=10):
# 设置随机种子为 12345
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为正无穷
N = np.inf if N is None else N
# 初始化变量 i 为 1
i = 1
numpy-mlnumpy_mlteststest_naive_bayes.py
代码语言:javascript
复制# 禁用 flake8 检查
# 导入 numpy 库,并使用别名 np
import numpy as np
# 从 sklearn 库中导入 datasets 模块
from sklearn import datasets
# 从 sklearn 库中导入 model_selection 模块中的 train_test_split 函数
from sklearn.model_selection import train_test_split
# 从 sklearn 库中导入 naive_bayes 模块
from sklearn import naive_bayes
# 从 numpy_ml.linear_models 模块中导入 GaussianNBClassifier 类
from numpy_ml.linear_models import GaussianNBClassifier
# 从 numpy_ml.utils.testing 模块中导入 random_tensor 函数
from numpy_ml.utils.testing import random_tensor
# 定义测试函数 test_GaussianNB,参数 N 默认值为 10
def test_GaussianNB(N=10):
# 设置随机种子为 12345
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为正无穷
N = np.inf if N is None else N
# 初始化变量 i 为 1
i = 1
# 获取浮点数的最小精度
eps = np.finfo(float).eps
numpy-mlnumpy_mlteststest_ngram.py
代码语言:javascript
复制# 禁用 flake8 检查
# 导入临时文件模块
import tempfile
# 导入 nltk 和 numpy 模块
import nltk
import numpy as np
# 从上级目录中导入 tokenize_words 函数
from ..preprocessing.nlp import tokenize_words
# 从上级目录中导入 AdditiveNGram 和 MLENGram 类
from ..ngram import AdditiveNGram, MLENGram
# 从上级目录中导入 random_paragraph 函数
from ..utils.testing import random_paragraph
# 定义 MLEGold 类
class MLEGold:
def __init__(
self, N, K=1, unk=True, filter_stopwords=True, filter_punctuation=True
):
# 初始化类的属性
self.N = N
self.K = K
self.unk = unk
self.filter_stopwords = filter_stopwords
self.filter_punctuation = filter_punctuation
# 设置超参数字典
self.hyperparameters = {
"N": N,
"K": K,
"unk": unk,
"filter_stopwords": filter_stopwords,
"filter_punctuation": filter_punctuation,
}
# 计算给定 N-gram 的对数概率
def log_prob(self, words, N):
assert N in self.counts, "You do not have counts for {}-grams".format(N)
if N > len(words):
err = "Not enough words for a gram-size of {}: {}".format(N, len(words))
raise ValueError(err)
total_prob = 0
for ngram in nltk.ngrams(words, N):
total_prob = self._log_ngram_prob(ngram)
return total_prob
# 计算给定 N-gram 的对数概率
def _log_ngram_prob(self, ngram):
N = len(ngram)
return self._models[N].logscore(ngram[-1], ngram[:-1])
# 定义 AdditiveGold 类
class AdditiveGold:
def __init__(
self, N, K=1, unk=True, filter_stopwords=True, filter_punctuation=True
):
# 初始化类的属性
self.N = N
self.K = K
self.unk = unk
self.filter_stopwords = filter_stopwords
self.filter_punctuation = filter_punctuation
# 设置超参数字典
self.hyperparameters = {
"N": N,
"K": K,
"unk": unk,
"filter_stopwords": filter_stopwords,
"filter_punctuation": filter_punctuation,
}
# 计算给定单词列表的对数概率和
def log_prob(self, words, N):
# 检查是否存在 N-grams 的计数
assert N in self.counts, "You do not have counts for {}-grams".format(N)
# 如果单词数量不足以形成 N-grams,则引发异常
if N > len(words):
err = "Not enough words for a gram-size of {}: {}".format(N, len(words))
raise ValueError(err)
# 初始化总概率
total_prob = 0
# 遍历生成给定单词列表的 N-grams,并计算其对数概率
for ngram in nltk.ngrams(words, N):
total_prob = self._log_ngram_prob(ngram)
# 返回总概率
return total_prob
# 计算给定 N-gram 的对数概率
def _log_ngram_prob(self, ngram):
# 获取 N-gram 的长度
N = len(ngram)
# 调用模型对象的 logscore 方法计算 N-gram 的对数概率
return self._models[N].logscore(ngram[-1], ngram[:-1])
# 测试最大似然估计模型
def test_mle():
# 生成一个随机整数 N,范围在 [2, 5) 之间
N = np.random.randint(2, 5)
# 创建一个最大似然估计的金标准对象
gold = MLEGold(N, unk=True, filter_stopwords=False, filter_punctuation=False)
# 创建一个最大似然估计的自己实现的对象
mine = MLENGram(N, unk=True, filter_stopwords=False, filter_punctuation=False)
# 使用临时文件进行训练
with tempfile.NamedTemporaryFile() as temp:
# 将随机生成的一个包含 1000 个单词的段落写入临时文件
temp.write(bytes(" ".join(random_paragraph(1000)), encoding="utf-8-sig"))
# 使用金标准对象进行训练
gold.train(temp.name, encoding="utf-8-sig")
# 使用自己实现的对象进行训练
mine.train(temp.name, encoding="utf-8-sig")
# 遍历自己实现的对象中 N 阶计数的键
for k in mine.counts[N].keys():
# 如果键的第一个和第二个元素相等,并且在 ("<bol>", "<eol>") 中,则跳过
if k[0] == k[1] and k[0] in ("<bol>", "<eol>"):
continue
# 错误信息字符串模板
err_str = "{}, mine: {}, gold: {}"
# 断言自己实现的对象中的计数与金标准对象中的计数相等
assert mine.counts[N][k] == gold.counts[N][k], err_str.format(
k, mine.counts[N][k], gold.counts[N][k]
)
# 计算自己实现的对象中 k 的对数概率
M = mine.log_prob(k, N)
# 计算金标准对象中 k 的对数概率,并转换为以自然对数为底
G = gold.log_prob(k, N) / np.log2(np.e)
# 使用 np.testing.assert_allclose 检查 M 和 G 是否接近
np.testing.assert_allclose(M, G)
# 打印 "PASSED"
print("PASSED")
# 测试加法平滑模型
def test_additive():
# 生成一个随机浮点数 K
K = np.random.rand()
# 生成一个随机整数 N,范围在 [2, 5) 之间
N = np.random.randint(2, 5)
# 创建一个加法平滑的金标准对象
gold = AdditiveGold(
N, K, unk=True, filter_stopwords=False, filter_punctuation=False
)
# 创建一个加法平滑的自己实现的对象
mine = AdditiveNGram(
N, K, unk=True, filter_stopwords=False, filter_punctuation=False
)
# 使用临时文件进行训练
with tempfile.NamedTemporaryFile() as temp:
# 将随机生成的一个包含 1000 个单词的段落写入临时文件
temp.write(bytes(" ".join(random_paragraph(1000)), encoding="utf-8-sig"))
# 使用金标准对象进行训练
gold.train(temp.name, encoding="utf-8-sig")
# 使用自己实现的对象进行训练
mine.train(temp.name, encoding="utf-8-sig")
# 遍历自己实现的对象中 N 阶计数的键
for k in mine.counts[N].keys():
# 如果键的第一个和第二个元素相等,并且在 ("<bol>", "<eol>") 中,则跳过
if k[0] == k[1] and k[0] in ("<bol>", "<eol>"):
continue
# 错误信息字符串模板
err_str = "{}, mine: {}, gold: {}"
# 断言自己实现的对象中的计数与金标准对象中的计数相等
assert mine.counts[N][k] == gold.counts[N][k], err_str.format(
k, mine.counts[N][k], gold.counts[N][k]
)
# 计算自己实现的对象中 k 的对数概率
M = mine.log_prob(k, N)
# 计算金标准对象中 k 的对数概率,并转换为以自然对数为底
G = gold.log_prob(k, N) / np.log2(np.e)
# 使用 np.testing.assert_allclose 检查 M 和 G 是否接近
np.testing.assert_allclose(M, G)
# 打印 "PASSED"
print("PASSED")
numpy-mlnumpy_mlteststest_nn.py
代码语言:javascript
复制# 禁用 flake8 检查
# 导入时间模块
import time
# 从 copy 模块中导入 deepcopy 函数
from copy import deepcopy
# 导入 numpy 模块,并将其命名为 np
import numpy as np
# 从 numpy.testing 模块中导入 assert_almost_equal 函数
from numpy.testing import assert_almost_equal
# 导入 sklearn.metrics 模块中的 log_loss 和 mean_squared_error 函数
from sklearn.metrics import log_loss, mean_squared_error
# 导入 scipy.special 模块中的 expit 函数,用于测试 sigmoid 函数
from scipy.special import expit
# 导入 torch 模块
import torch
# 从 torch.nn 模块中导入 nn 和 F
import torch.nn as nn
import torch.nn.functional as F
# 从 numpy_ml.neural_nets.utils 模块中导入一系列函数
from numpy_ml.neural_nets.utils import (
calc_pad_dims_2D,
conv2D_naive,
conv2D,
pad2D,
pad1D,
)
# 从 numpy_ml.utils.testing 模块中导入一系列函数
from numpy_ml.utils.testing import (
random_one_hot_matrix,
random_stochastic_matrix,
random_tensor,
)
# 从当前目录下的 nn_torch_models 模块中导入一系列类和函数
from .nn_torch_models import (
TFNCELoss,
WGAN_GP_tf,
torch_xe_grad,
torch_mse_grad,
TorchVAELoss,
TorchFCLayer,
TorchRNNCell,
TorchLSTMCell,
TorchAddLayer,
TorchWGANGPLoss,
TorchConv1DLayer,
TorchConv2DLayer,
TorchPool2DLayer,
TorchWavenetModule,
TorchMultiplyLayer,
TorchDeconv2DLayer,
TorchLayerNormLayer,
TorchBatchNormLayer,
TorchEmbeddingLayer,
TorchLinearActivation,
TorchSDPAttentionLayer,
TorchBidirectionalLSTM,
torch_gradient_generator,
TorchSkipConnectionConv,
TorchSkipConnectionIdentity,
TorchMultiHeadedAttentionModule,
)
#######################################################################
# Debug Formatter #
#######################################################################
# 定义一个函数,用于格式化错误信息
def err_fmt(params, golds, ix, warn_str=""):
# 获取当前参数和标签
mine, label = params[ix]
# 构建错误信息字符串
err_msg = "-" * 25 " DEBUG " "-" * 25 "n"
# 获取前一个参数和标签
prev_mine, prev_label = params[max(ix - 1, 0)]
# 添加前一个参数和标签的信息到错误信息字符串中
err_msg = "Mine (prev) [{}]:n{}nnTheirs (prev) [{}]:n{}".format(
prev_label, prev_mine, prev_label, golds[prev_label]
)
# 添加当前参数和标签的信息到错误信息字符串中
err_msg = "nnMine [{}]:n{}nnTheirs [{}]:n{}".format(
label, mine, label, golds[label]
)
# 添加警告信息到错误信息字符串中
err_msg = warn_str
err_msg = "n" "-" * 23 " END DEBUG " "-" * 23
# 返回错误信息字符串
return err_msg
#######################################################################
# Loss Functions #
#######################################################################
# 测试均方误差损失函数
def test_squared_error(N=15):
# 从 numpy_ml.neural_nets.losses 模块导入 SquaredError 类
from numpy_ml.neural_nets.losses import SquaredError
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 SquaredError 实例
mine = SquaredError()
# 创建参考的均方误差损失函数
gold = (
lambda y, y_pred: mean_squared_error(y, y_pred)
* y_pred.shape[0]
* y_pred.shape[1]
* 0.5
)
# 确保当两个数组相等时得到 0
n_dims = np.random.randint(2, 100)
n_examples = np.random.randint(1, 1000)
y = y_pred = random_tensor((n_examples, n_dims))
assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred))
print("PASSED")
i = 1
while i < N:
n_dims = np.random.randint(2, 100)
n_examples = np.random.randint(1, 1000)
y = random_tensor((n_examples, n_dims))
y_pred = random_tensor((n_examples, n_dims))
assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred), decimal=5)
print("PASSED")
i = 1
# 测试交叉熵损失函数
def test_cross_entropy(N=15):
# 从 numpy_ml.neural_nets.losses 模块导入 CrossEntropy 类
from numpy_ml.neural_nets.losses import CrossEntropy
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 CrossEntropy 实例
mine = CrossEntropy()
# 创建参考的对数损失函数
gold = log_loss
# 确保当两个数组相等时得到 0
n_classes = np.random.randint(2, 100)
n_examples = np.random.randint(1, 1000)
y = y_pred = random_one_hot_matrix(n_examples, n_classes)
assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred))
print("PASSED")
# 在随机输入上进行测试
i = 1
while i < N:
n_classes = np.random.randint(2, 100)
n_examples = np.random.randint(1, 1000)
y = random_one_hot_matrix(n_examples, n_classes)
y_pred = random_stochastic_matrix(n_examples, n_classes)
assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred, normalize=False))
print("PASSED")
i = 1
# 测试变分自编码器损失函数
def test_VAE_loss(N=15):
# 从numpy_ml.neural_nets.losses模块中导入VAELoss类
from numpy_ml.neural_nets.losses import VAELoss
# 设置随机种子为12345
np.random.seed(12345)
# 如果N为None,则将N设置为无穷大,否则保持不变
N = np.inf if N is None else N
# 计算浮点数的最小值
eps = np.finfo(float).eps
# 初始化循环变量i为1
i = 1
# 当i小于N时执行循环
while i < N:
# 生成1到10之间的随机整数作为样本数
n_ex = np.random.randint(1, 10)
# 生成2到10之间的随机整数作为特征维度
t_dim = np.random.randint(2, 10)
# 生成服从标准化的随机张量作为t_mean
t_mean = random_tensor([n_ex, t_dim], standardize=True)
# 生成服从标准化的随机张量,取对数绝对值后加上eps作为t_log_var
t_log_var = np.log(np.abs(random_tensor([n_ex, t_dim], standardize=True) eps))
# 生成2到40之间的随机整数作为图像列数和行数
im_cols, im_rows = np.random.randint(2, 40), np.random.randint(2, 40)
# 生成n_ex行im_rows*im_cols列的随机矩阵作为X和X_recon
X = np.random.rand(n_ex, im_rows * im_cols)
X_recon = np.random.rand(n_ex, im_rows * im_cols)
# 创建VAELoss对象
mine = VAELoss()
# 计算VAE损失
mine_loss = mine(X, X_recon, t_mean, t_log_var)
# 计算损失函数关于输入的梯度
dX_recon, dLogVar, dMean = mine.grad(X, X_recon, t_mean, t_log_var)
# 从TorchVAELoss对象中提取梯度
golds = TorchVAELoss().extract_grads(X, X_recon, t_mean, t_log_var)
# 将损失和梯度存储在params列表中
params = [
(mine_loss, "loss"),
(dX_recon, "dX_recon"),
(dLogVar, "dt_log_var"),
(dMean, "dt_mean"),
]
# 打印当前试验的信息
print("nTrial {}".format(i))
# 遍历params列表,进行梯度检验
for ix, (mine, label) in enumerate(params):
# 使用np.testing.assert_allclose函数检查梯度是否接近期望值
np.testing.assert_allclose(
mine,
golds[label],
err_msg=err_fmt(params, golds, ix),
rtol=0.1,
atol=1e-2,
)
# 打印通过梯度检验的信息
print("tPASSED {}".format(label))
# 更新循环变量i
i = 1
def test_WGAN_GP_loss(N=5):
# 导入 WGAN_GPLoss 类
from numpy_ml.neural_nets.losses import WGAN_GPLoss
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 初始化循环计数器 i
i = 1
# 循环执行直到 i 达到 N
while i < N:
# 生成 lambda_ 值
lambda_ = np.random.randint(0, 10)
# 生成样本数 n_ex
n_ex = np.random.randint(1, 10)
# 生成特征数 n_feats
n_feats = np.random.randint(2, 10)
# 生成真实样本 Y_real
Y_real = random_tensor([n_ex], standardize=True)
# 生成虚假样本 Y_fake
Y_fake = random_tensor([n_ex], standardize=True)
# 生成梯度插值 gradInterp
gradInterp = random_tensor([n_ex, n_feats], standardize=True)
# 创建 WGAN_GPLoss 实例
mine = WGAN_GPLoss(lambda_=lambda_)
# 计算 C_loss
C_loss = mine(Y_fake, "C", Y_real, gradInterp)
# 计算 G_loss
G_loss = mine(Y_fake, "G")
# 计算 C_loss 的梯度
C_dY_fake, dY_real, dGradInterp = mine.grad(Y_fake, "C", Y_real, gradInterp)
# 计算 G_loss 的梯度
G_dY_fake = mine.grad(Y_fake, "G")
# 提取 TorchWGANGPLoss 类的梯度
golds = TorchWGANGPLoss(lambda_).extract_grads(Y_real, Y_fake, gradInterp)
# 如果梯度中存在 NaN 值,则跳过当前循环
if np.isnan(golds["C_dGradInterp"]).any():
continue
# 设置参数列表
params = [
(Y_real, "Y_real"),
(Y_fake, "Y_fake"),
(gradInterp, "gradInterp"),
(C_loss, "C_loss"),
(G_loss, "G_loss"),
(-dY_real, "C_dY_real"),
(-C_dY_fake, "C_dY_fake"),
(dGradInterp, "C_dGradInterp"),
(G_dY_fake, "G_dY_fake"),
]
# 打印当前试验的信息
print("nTrial {}".format(i))
# 遍历参数列表,进行断言比较
for ix, (mine, label) in enumerate(params):
np.testing.assert_allclose(
mine,
golds[label],
err_msg=err_fmt(params, golds, ix),
rtol=0.1,
atol=1e-2,
)
print("tPASSED {}".format(label))
# 更新循环计数器 i
i = 1
def test_NCELoss(N=1):
# 导入 NCELoss 类和 DiscreteSampler 类
from numpy_ml.neural_nets.losses import NCELoss
from numpy_ml.utils.data_structures import DiscreteSampler
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 初始化循环计数器 i
i = 1
#######################################################################
# Loss Function Gradients #
# 导入所需的模块和函数
def test_squared_error_grad(N=15):
# 导入SquaredError损失函数和Tanh激活函数
from numpy_ml.neural_nets.losses import SquaredError
from numpy_ml.neural_nets.activations import Tanh
# 设置随机种子
np.random.seed(12345)
# 如果N为None,则将N设置为无穷大
N = np.inf if N is None else N
# 创建SquaredError对象和torch_mse_grad对象
mine = SquaredError()
gold = torch_mse_grad
act = Tanh()
# 初始化循环计数器i
i = 1
# 当i小于N时执行循环
while i < N:
# 随机生成维度和样本数
n_dims = np.random.randint(2, 100)
n_examples = np.random.randint(1, 1000)
y = random_tensor((n_examples, n_dims))
# 生成随机输入
z = random_tensor((n_examples, n_dims))
y_pred = act.fn(z)
# 断言SquaredError的梯度计算结果与torch_mse_grad的结果相近
assert_almost_equal(
mine.grad(y, y_pred, z, act), 0.5 * gold(y, z, torch.tanh), decimal=4
)
print("PASSED")
i = 1
# 定义测试交叉熵梯度的函数
def test_cross_entropy_grad(N=15):
# 导入CrossEntropy损失函数和Softmax层
from numpy_ml.neural_nets.losses import CrossEntropy
from numpy_ml.neural_nets.layers import Softmax
# 设置随机种子
np.random.seed(12345)
# 如果N为None,则将N设置为无穷大
N = np.inf if N is None else N
# 创建CrossEntropy对象和torch_xe_grad对象
mine = CrossEntropy()
gold = torch_xe_grad
sm = Softmax()
# 初始化循环计数器i
i = 1
# 当i小于N时执行循环
while i < N:
# 随机生成类别数和样本数
n_classes = np.random.randint(2, 100)
n_examples = np.random.randint(1, 1000)
y = random_one_hot_matrix(n_examples, n_classes)
# cross_entropy_gradient返回相对于z的梯度(而不是softmax(z))
z = random_tensor((n_examples, n_classes))
y_pred = sm.forward(z)
# 断言CrossEntropy的梯度计算结果与torch_xe_grad的结果相近
assert_almost_equal(mine.grad(y, y_pred), gold(y, z), decimal=5)
print("PASSED")
i = 1
#######################################################################
# Activations #
#######################################################################
# 定义测试Sigmoid激活函数的函数
def test_sigmoid_activation(N=15):
# 导入Sigmoid激活函数
from numpy_ml.neural_nets.activations import Sigmoid
# 设置随机种子
np.random.seed(12345)
# 如果N为None,则将N设置为无穷大
N = np.inf if N is None else N
# 创建Sigmoid对象和expit函数
mine = Sigmoid()
gold = expit
# 初始化循环计数器i
i = 0
# 当 i 小于 N 时执行循环
while i < N:
# 生成一个随机整数,表示张量的维度数量
n_dims = np.random.randint(1, 100)
# 生成一个随机张量,形状为 (1, n_dims)
z = random_tensor((1, n_dims))
# 断言自定义函数 mine.fn(z) 的输出与标准函数 gold(z) 的输出几乎相等
assert_almost_equal(mine.fn(z), gold(z))
# 打印 "PASSED" 表示测试通过
print("PASSED")
# i 自增
i = 1
# 测试 ELU 激活函数的功能
def test_elu_activation(N=15):
# 导入 ELU 激活函数
from numpy_ml.neural_nets.activations import ELU
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 初始化计数器 i
i = 0
# 循环 N 次
while i < N:
# 生成随机维度
n_dims = np.random.randint(1, 10)
# 生成随机张量
z = random_tensor((1, n_dims))
# 生成随机 alpha 值
alpha = np.random.uniform(0, 10)
# 创建 ELU 激活函数对象
mine = ELU(alpha)
# 创建 PyTorch 中的 ELU 函数
gold = lambda z, a: F.elu(torch.from_numpy(z), alpha).numpy()
# 断言 ELU 函数的输出与 PyTorch 中的 ELU 函数的输出几乎相等
assert_almost_equal(mine.fn(z), gold(z, alpha))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
# 测试 Softmax 激活函数的功能
def test_softmax_activation(N=15):
# 导入 Softmax 层
from numpy_ml.neural_nets.layers import Softmax
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 Softmax 层对象
mine = Softmax()
# 创建 PyTorch 中的 Softmax 函数
gold = lambda z: F.softmax(torch.FloatTensor(z), dim=1).numpy()
# 初始化计数器 i
i = 0
# 循环 N 次
while i < N:
# 生成随机维度
n_dims = np.random.randint(1, 100)
# 生成随机概率矩阵
z = random_stochastic_matrix(1, n_dims)
# 断言 Softmax 函数的输出与 PyTorch 中的 Softmax 函数的输出几乎相等
assert_almost_equal(mine.forward(z), gold(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
# 测试 ReLU 激活函数的功能
def test_relu_activation(N=15):
# 导入 ReLU 激活函数
from numpy_ml.neural_nets.activations import ReLU
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 ReLU 激活函数对象
mine = ReLU()
# 创建 PyTorch 中的 ReLU 函数
gold = lambda z: F.relu(torch.FloatTensor(z)).numpy()
# 初始化计数器 i
i = 0
# 循环 N 次
while i < N:
# 生成随机维度
n_dims = np.random.randint(1, 100)
# 生成随机概率矩阵
z = random_stochastic_matrix(1, n_dims)
# 断言 ReLU 函数的输出与 PyTorch 中的 ReLU 函数的输出几乎相等
assert_almost_equal(mine.fn(z), gold(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
# 测试 SoftPlus 激活函数的功能
def test_softplus_activation(N=15):
# 导入 SoftPlus 激活函数
from numpy_ml.neural_nets.activations import SoftPlus
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 SoftPlus 激活函数对象
mine = SoftPlus()
# 创建 PyTorch 中的 SoftPlus 函数
gold = lambda z: F.softplus(torch.FloatTensor(z)).numpy()
# 初始化计数器 i
i = 0
# 循环 N 次
while i < N:
# 生成随机维度
n_dims = np.random.randint(1, 100)
# 生成随机概率矩阵
z = random_stochastic_matrix(1, n_dims)
# 断言 SoftPlus 函数的输出与 PyTorch 中的 SoftPlus 函数的输出几乎相等
assert_almost_equal(mine.fn(z), gold(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
#######################################################################
# Activation Gradients #
# 导入所需的库和模块
def test_sigmoid_grad(N=15):
# 从 numpy_ml.neural_nets.activations 模块中导入 Sigmoid 类
from numpy_ml.neural_nets.activations import Sigmoid
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 Sigmoid 实例 mine 和 torch 中的 sigmoid 梯度函数实例 gold
mine = Sigmoid()
gold = torch_gradient_generator(torch.sigmoid)
# 初始化计数器 i
i = 0
# 循环执行 N 次
while i < N:
# 生成随机的样本数和维度
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
# 生成随机的张量 z
z = random_tensor((n_ex, n_dims))
# 断言 Sigmoid 实例的梯度和 torch 中的 sigmoid 梯度函数的结果几乎相等
assert_almost_equal(mine.grad(z), gold(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
# 类似上面的注释,以下函数 test_elu_grad, test_tanh_grad, test_relu_grad 的注释内容相同,只是激活函数不同
# 测试 Softmax 层的梯度计算
def test_softmax_grad(N=15):
# 导入 Softmax 层和部分函数
from numpy_ml.neural_nets.layers import Softmax
from functools import partial
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 创建 Softmax 函数的偏函数
p_soft = partial(F.softmax, dim=1)
# 生成 Torch 梯度函数
gold = torch_gradient_generator(p_soft)
# 初始化计数器 i
i = 0
# 循环直到达到 N 次
while i < N:
# 创建 Softmax 层实例
mine = Softmax()
# 随机生成样本数和维度
n_ex = np.random.randint(1, 3)
n_dims = np.random.randint(1, 50)
# 生成随机张量
z = random_tensor((n_ex, n_dims), standardize=True)
# 前向传播
out = mine.forward(z)
# 断言梯度计算结果准确性
assert_almost_equal(
gold(z),
mine.backward(np.ones_like(out)),
err_msg="Theirs:n{}nnMine:n{}n".format(
gold(z), mine.backward(np.ones_like(out))
),
decimal=3,
)
# 打印测试通过信息
print("PASSED")
i = 1
# 测试 Softplus 层的梯度计算
def test_softplus_grad(N=15):
# 导入 Softplus 层
from numpy_ml.neural_nets.activations import SoftPlus
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 创建 Softplus 层实例
mine = SoftPlus()
# 生成 Torch 梯度函数
gold = torch_gradient_generator(F.softplus)
# 初始化计数器 i
i = 0
# 循环直到达到 N 次
while i < N:
# 随机生成样本数和维度
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
# 生成随机张量
z = random_tensor((n_ex, n_dims), standardize=True)
# 断言梯度计算结果准确性
assert_almost_equal(mine.grad(z), gold(z))
# 打印测试通过信息
print("PASSED")
i = 1
#######################################################################
# Layers #
#######################################################################
# 测试全连接层
def test_FullyConnected(N=15):
# 导入全连接层和激活函数
from numpy_ml.neural_nets.layers import FullyConnected
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 定义激活函数列表
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化计数器 i
i = 1
# 当 i 小于 N 1 时执行循环
while i < N 1:
# 生成随机整数,作为外部神经元、内部神经元和输出神经元的数量
n_ex = np.random.randint(1, 100)
n_in = np.random.randint(1, 100)
n_out = np.random.randint(1, 100)
# 生成随机张量 X,形状为 (n_ex, n_in),并进行标准化处理
X = random_tensor((n_ex, n_in), standardize=True)
# 随机选择一个激活函数
act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]
# 初始化全连接层 L1,设置输出神经元数量和激活函数
L1 = FullyConnected(n_out=n_out, act_fn=act_fn)
# 前向传播
y_pred = L1.forward(X)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdX = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchFCLayer(n_in, n_out, torch_fn, L1.parameters)
golds = gold_mod.extract_grads(X)
# 定义参数列表,包括输入 X、预测值 y、权重 W、偏置 b、损失对预测值的梯度 dLdy、权重梯度 dLdW、偏置梯度 dLdB、输入梯度 dLdX
params = [
(L1.X[0], "X"),
(y_pred, "y"),
(L1.parameters["W"].T, "W"),
(L1.parameters["b"], "b"),
(dLdy, "dLdy"),
(L1.gradients["W"].T, "dLdW"),
(L1.gradients["b"], "dLdB"),
(dLdX, "dLdX"),
]
# 打印当前试验的信息和激活函数名称
print("nTrial {}nact_fn={}".format(i, act_fn_name))
# 遍历参数列表,逐个比较计算得到的梯度和标准梯度是否接近
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
)
print("tPASSED {}".format(label))
# 更新试验次数
i = 1
# 测试 Embedding 层的功能,包括前向传播和反向传播
def test_Embedding(N=15):
# 从 numpy_ml.neural_nets.layers 导入 Embedding 模块
from numpy_ml.neural_nets.layers import Embedding
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 初始化计数器 i
i = 1
# 当 i 小于 N 1 时循环
while i < N 1:
# 随机生成词汇表大小
vocab_size = np.random.randint(1, 2000)
# 随机生成示例数
n_ex = np.random.randint(1, 100)
# 随机生成输入维度
n_in = np.random.randint(1, 100)
# 随机生成嵌入维度
emb_dim = np.random.randint(1, 100)
# 随机生成输入数据 X
X = np.random.randint(0, vocab_size, (n_ex, n_in))
# 初始化 Embedding 层
L1 = Embedding(n_out=emb_dim, vocab_size=vocab_size)
# 前向传播
y_pred = L1.forward(X)
# 反向传播
dLdy = np.ones_like(y_pred)
L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchEmbeddingLayer(vocab_size, emb_dim, L1.parameters)
golds = gold_mod.extract_grads(X)
# 定义参数列表
params = [
(L1.X[0], "X"),
(y_pred, "y"),
(L1.parameters["W"], "W"),
(dLdy, "dLdy"),
(L1.gradients["W"], "dLdW"),
]
# 打印测试结果
print("nTrial {}".format(i))
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
)
print("tPASSED {}".format(label))
i = 1
# 测试 BatchNorm1D 层的功能
def test_BatchNorm1D(N=15):
# 从 numpy_ml.neural_nets.layers 导入 BatchNorm1D 模块
from numpy_ml.neural_nets.layers import BatchNorm1D
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 重新设置随机种子
np.random.seed(12345)
# 初始化计数器 i
i = 1
# 当 i 小于 N 1 时执行循环
while i < N 1:
# 生成一个随机整数,范围在[2, 1000)
n_ex = np.random.randint(2, 1000)
# 生成一个随机整数,范围在[1, 1000)
n_in = np.random.randint(1, 1000)
# 生成一个随机的张量,形状为(n_ex, n_in),并进行标准化处理
X = random_tensor((n_ex, n_in), standardize=True)
# 初始化 BatchNorm1D 层
L1 = BatchNorm1D()
# 前向传播
y_pred = L1.forward(X)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdX = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchBatchNormLayer(
n_in, L1.parameters, "1D", epsilon=L1.epsilon, momentum=L1.momentum
)
golds = gold_mod.extract_grads(X)
# 定义参数列表
params = [
(L1.X[0], "X"),
(y_pred, "y"),
(L1.parameters["scaler"].T, "scaler"),
(L1.parameters["intercept"], "intercept"),
(L1.parameters["running_mean"], "running_mean"),
# (L1.parameters["running_var"], "running_var"),
(L1.gradients["scaler"], "dLdScaler"),
(L1.gradients["intercept"], "dLdIntercept"),
(dLdX, "dLdX"),
]
# 打印当前试验的信息
print("Trial {}".format(i))
# 遍历参数列表,逐个进行梯度检验
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=1
)
print("tPASSED {}".format(label))
# 更新试验次数
i = 1
# 定义一个测试函数,用于测试 LayerNorm1D 层
def test_LayerNorm1D(N=15):
# 从 numpy_ml.neural_nets.layers 模块导入 LayerNorm1D 类
from numpy_ml.neural_nets.layers import LayerNorm1D
# 如果 N 为 None,则将其设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器 i
i = 1
# 当 i 小于 N 1 时循环执行以下代码块
while i < N 1:
# 生成随机的样本数和输入特征数
n_ex = np.random.randint(2, 1000)
n_in = np.random.randint(1, 1000)
# 生成随机的输入数据 X
X = random_tensor((n_ex, n_in), standardize=True)
# 初始化 LayerNorm1D 层
L1 = LayerNorm1D()
# 前向传播
y_pred = L1.forward(X)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdX = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchLayerNormLayer(n_in, L1.parameters, "1D", epsilon=L1.epsilon)
golds = gold_mod.extract_grads(X)
# 定义参数列表
params = [
(L1.X[0], "X"),
(y_pred, "y"),
(L1.parameters["scaler"].T, "scaler"),
(L1.parameters["intercept"], "intercept"),
(L1.gradients["scaler"], "dLdScaler"),
(L1.gradients["intercept"], "dLdIntercept"),
(dLdX, "dLdX"),
]
# 打印当前试验的编号
print("Trial {}".format(i))
# 遍历参数列表,比较计算得到的梯度和标准梯度是否接近
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
)
print("tPASSED {}".format(label))
# 更新循环计数器
i = 1
# 定义一个测试函数,用于测试 LayerNorm2D 层
def test_LayerNorm2D(N=15):
# 从 numpy_ml.neural_nets.layers 模块导入 LayerNorm2D 类
from numpy_ml.neural_nets.layers import LayerNorm2D
# 如果 N 为 None,则将其设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器 i
i = 1
# 当 i 小于 N 1 时循环执行以下代码块
while i < N 1:
# 生成一个随机整数,范围在 [2, 10)
n_ex = np.random.randint(2, 10)
# 生成一个随机整数,范围在 [1, 10)
in_rows = np.random.randint(1, 10)
# 生成一个随机整数,范围在 [1, 10)
in_cols = np.random.randint(1, 10)
# 生成一个随机整数,范围在 [1, 3)
n_in = np.random.randint(1, 3)
# 初始化 LayerNorm2D 层
X = random_tensor((n_ex, in_rows, in_cols, n_in), standardize=True)
L1 = LayerNorm2D()
# 前向传播
y_pred = L1.forward(X)
# 标准损失函数
dLdy = np.ones_like(X)
dLdX = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchLayerNormLayer(
[n_in, in_rows, in_cols], L1.parameters, mode="2D", epsilon=L1.epsilon
)
golds = gold_mod.extract_grads(X, Y_true=None)
# 定义参数列表
params = [
(L1.X[0], "X"),
(L1.hyperparameters["epsilon"], "epsilon"),
(L1.parameters["scaler"], "scaler"),
(L1.parameters["intercept"], "intercept"),
(y_pred, "y"),
(L1.gradients["scaler"], "dLdScaler"),
(L1.gradients["intercept"], "dLdIntercept"),
(dLdX, "dLdX"),
]
# 打印当前试验的信息
print("Trial {}".format(i))
# 遍历参数列表,逐个进行断言比较
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
)
# 打印通过断言的参数信息
print("tPASSED {}".format(label))
# 更新循环变量 i
i = 1
# 定义一个测试 MultiplyLayer 的函数,可以指定测试次数 N,默认为 15
def test_MultiplyLayer(N=15):
# 导入所需的模块和类
from numpy_ml.neural_nets.layers import Multiply
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 定义激活函数列表
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化计数器 i
i = 1
# 循环进行 N 次测试
while i < N 1:
# 初始化输入数据列表 Xs
Xs = []
# 生成随机数,作为样本数和输入维度
n_ex = np.random.randint(1, 100)
n_in = np.random.randint(1, 100)
n_entries = np.random.randint(2, 5)
# 生成 n_entries 个随机张量,加入 Xs 列表
for _ in range(n_entries):
Xs.append(random_tensor((n_ex, n_in), standardize=True))
# 随机选择一个激活函数
act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]
# 初始化 Multiply 层
L1 = Multiply(act_fn)
# 前向传播
y_pred = L1.forward(Xs)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdXs = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchMultiplyLayer(torch_fn)
golds = gold_mod.extract_grads(Xs)
# 构建参数列表
params = [(Xs, "Xs"), (y_pred, "Y")]
params.extend(
[(dldxi, "dLdX{}".format(i 1)) for i, dldxi in enumerate(dLdXs)]
)
# 打印测试结果
print("nTrial {}".format(i))
print("n_ex={}, n_in={}".format(n_ex, n_in))
print("n_entries={}, act_fn={}".format(n_entries, str(act_fn)))
for ix, (mine, label) in enumerate(params):
# 断言近似相等
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=1
)
print("tPASSED {}".format(label))
i = 1
# 定义一个测试 AddLayer 的函数,可以指定测试次数 N,默认为 15
def test_AddLayer(N=15):
# 导入所需的模块和类
from numpy_ml.neural_nets.layers import Add
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 定义不同激活函数的元组列表
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化循环计数器
i = 1
# 循环执行 N 次
while i < N 1:
# 初始化输入数据列表
Xs = []
# 生成随机的样本数和输入维度
n_ex = np.random.randint(1, 100)
n_in = np.random.randint(1, 100)
# 生成随机的输入数据条目数
n_entries = np.random.randint(2, 5)
# 生成随机的输入数据并添加到 Xs 列表中
for _ in range(n_entries):
Xs.append(random_tensor((n_ex, n_in), standardize=True))
# 随机选择激活函数
act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]
# 初始化 Add 层
L1 = Add(act_fn)
# 前向传播
y_pred = L1.forward(Xs)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdXs = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchAddLayer(torch_fn)
golds = gold_mod.extract_grads(Xs)
# 构建参数列表
params = [(Xs, "Xs"), (y_pred, "Y")]
params.extend(
[(dldxi, "dLdX{}".format(i 1)) for i, dldxi in enumerate(dLdXs)]
)
# 打印当前试验信息
print("nTrial {}".format(i))
print("n_ex={}, n_in={}".format(n_ex, n_in))
print("n_entries={}, act_fn={}".format(n_entries, str(act_fn)))
# 遍历参数列表,进行梯度检验
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=1
)
print("tPASSED {}".format(label))
# 更新循环计数器
i = 1
# 定义测试 BatchNorm2D 的函数,参数 N 为测试次数,默认为 15
def test_BatchNorm2D(N=15):
# 导入 BatchNorm2D 模块
from numpy_ml.neural_nets.layers import BatchNorm2D
# 如果 N 为 None,则将其设为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器 i
i = 1
# 循环执行 N 次测试
while i < N 1:
# 生成随机的样本数、输入行数、输入列数和输入通道数
n_ex = np.random.randint(2, 10)
in_rows = np.random.randint(1, 10)
in_cols = np.random.randint(1, 10)
n_in = np.random.randint(1, 3)
# 初始化 BatchNorm2D 层
X = random_tensor((n_ex, in_rows, in_cols, n_in), standardize=True)
L1 = BatchNorm2D()
# 前向传播
y_pred = L1.forward(X)
# 标准损失函数
dLdy = np.ones_like(X)
dLdX = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchBatchNormLayer(
n_in, L1.parameters, mode="2D", epsilon=L1.epsilon, momentum=L1.momentum
)
golds = gold_mod.extract_grads(X, Y_true=None)
# 定义参数列表
params = [
(L1.X[0], "X"),
(L1.hyperparameters["momentum"], "momentum"),
(L1.hyperparameters["epsilon"], "epsilon"),
(L1.parameters["scaler"].T, "scaler"),
(L1.parameters["intercept"], "intercept"),
(L1.parameters["running_mean"], "running_mean"),
# (L1.parameters["running_var"], "running_var"),
(y_pred, "y"),
(L1.gradients["scaler"], "dLdScaler"),
(L1.gradients["intercept"], "dLdIntercept"),
(dLdX, "dLdX"),
]
# 打印当前测试的序号
print("Trial {}".format(i))
# 遍历参数列表,逐个进行断言比较
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
)
print("tPASSED {}".format(label))
# 更新循环计数器
i = 1
# 定义测试 RNNCell 的函数,参数 N 为测试次数,默认为 15
def test_RNNCell(N=15):
# 导入 RNNCell 模块
from numpy_ml.neural_nets.layers import RNNCell
# 如果 N 为 None,则将其设为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器 i
i = 1
# 循环执行 N 次
while i < N 1:
# 生成随机数,表示外部输入、内部状态、输出、时间步数
n_ex = np.random.randint(1, 10)
n_in = np.random.randint(1, 10)
n_out = np.random.randint(1, 10)
n_t = np.random.randint(1, 10)
# 生成随机张量 X
X = random_tensor((n_ex, n_in, n_t), standardize=True)
# 初始化 RNN 层
L1 = RNNCell(n_out=n_out)
# 前向传播
y_preds = []
for t in range(n_t):
y_pred = L1.forward(X[:, :, t])
y_preds = [y_pred]
# 反向传播
dLdX = []
dLdAt = np.ones_like(y_preds[t])
for t in reversed(range(n_t)):
dLdXt = L1.backward(dLdAt)
dLdX.insert(0, dLdXt)
dLdX = np.dstack(dLdX)
# 获取标准梯度
gold_mod = TorchRNNCell(n_in, n_out, L1.parameters)
golds = gold_mod.extract_grads(X)
# 定义参数列表
params = [
(X, "X"),
(np.array(y_preds), "y"),
(L1.parameters["ba"].T, "ba"),
(L1.parameters["bx"].T, "bx"),
(L1.parameters["Wax"].T, "Wax"),
(L1.parameters["Waa"].T, "Waa"),
(L1.gradients["ba"].T, "dLdBa"),
(L1.gradients["bx"].T, "dLdBx"),
(L1.gradients["Wax"].T, "dLdWax"),
(L1.gradients["Waa"].T, "dLdWaa"),
(dLdX, "dLdX"),
]
# 打印当前试验次数
print("Trial {}".format(i))
# 遍历参数列表,进行梯度检验
for ix, (mine, label) in enumerate(params):
np.testing.assert_allclose(
mine,
golds[label],
err_msg=err_fmt(params, golds, ix),
atol=1e-3,
rtol=1e-3,
)
print("tPASSED {}".format(label))
i = 1
# 定义一个测试函数,用于测试 Conv2D 层的功能
def test_Conv2D(N=15):
# 从相应的模块中导入需要的类
from numpy_ml.neural_nets.layers import Conv2D
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 如果 N 为 None,则将其设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 定义激活函数列表
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化计数器 i
i = 1
# 定义一个测试函数,用于测试 DPAttention 层的功能
def test_DPAttention(N=15):
# 从相应的模块中导入 DotProductAttention 类
from numpy_ml.neural_nets.layers import DotProductAttention
# 如果 N 为 None,则将其设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化计数器 i
i = 1
# 当 i 小于 N 1 时循环
while i < N 1:
# 生成随机数
n_ex = np.random.randint(1, 10)
d_k = np.random.randint(1, 100)
d_v = np.random.randint(1, 100)
# 生成随机张量 Q, K, V
Q = random_tensor((n_ex, d_k), standardize=True)
K = random_tensor((n_ex, d_k), standardize=True)
V = random_tensor((n_ex, d_v), standardize=True)
# 初始化 DotProductAttention 层
mine = DotProductAttention(scale=True, dropout_p=0)
# 前向传播
y_pred = mine.forward(Q, K, V)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdQ, dLdK, dLdV = mine.backward(dLdy)
# 获取标准梯度
gold_mod = TorchSDPAttentionLayer()
golds = gold_mod.extract_grads(Q, K, V)
# 定义参数列表
params = [
(mine.X[0][0], "Q"),
(mine.X[0][1], "K"),
(mine.X[0][2], "V"),
(y_pred, "Y"),
(dLdV, "dLdV"),
(dLdK, "dLdK"),
(dLdQ, "dLdQ"),
]
# 打印测试结果
print("nTrial {}".format(i))
print("n_ex={} d_k={} d_v={}".format(n_ex, d_k, d_v))
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=4
)
print("tPASSED {}".format(label))
i = 1
# 定义一个测试函数,用于测试 Conv1D 层的功能
def test_Conv1D(N=15):
# 从相应的模块中导入 Conv1D 类
from numpy_ml.neural_nets.layers import Conv1D
# 从指定路径导入 Tanh、ReLU、Sigmoid 和 Affine 激活函数以及 Affine 层
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 如果 N 为 None,则将 N 设置为正无穷
N = np.inf if N is None else N
# 设置随机种子为 12345
np.random.seed(12345)
# 定义激活函数列表,每个元素包含自定义的激活函数对象、PyTorch 中对应的激活函数对象和激活函数名称
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化计数器 i 为 1
i = 1
# 循环执行 N 次
while i < N 1:
# 生成随机整数,表示例外数量
n_ex = np.random.randint(1, 10)
# 生成随机整数,表示输入序列长度
l_in = np.random.randint(1, 10)
# 生成随机整数,表示输入输出通道数
n_in, n_out = np.random.randint(1, 3), np.random.randint(1, 3)
# 生成随机整数,表示卷积核宽度
f_width = min(l_in, np.random.randint(1, 5))
# 生成随机整数,表示填充和步长
p, s = np.random.randint(0, 5), np.random.randint(1, 3)
# 生成随机整数,表示膨胀率
d = np.random.randint(0, 5)
# 计算卷积核的参数数量
fc = f_width * (d 1) - d
# 计算输出序列长度
l_out = int(1 (l_in 2 * p - fc) / s)
# 如果输出序列长度小于等于0,则跳过本次循环
if l_out <= 0:
continue
# 生成随机张量作为输入数据
X = random_tensor((n_ex, l_in, n_in), standardize=True)
# 随机选择激活函数
act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]
# 初始化一维卷积层
L1 = Conv1D(
out_ch=n_out,
kernel_width=f_width,
act_fn=act_fn,
pad=p,
stride=s,
dilation=d,
)
# 前向传播
y_pred = L1.forward(X)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdX = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchConv1DLayer(
n_in, n_out, torch_fn, L1.parameters, L1.hyperparameters
)
golds = gold_mod.extract_grads(X)
# 定义参数列表
params = [
(L1.X[0], "X"),
(y_pred, "y"),
(L1.parameters["W"], "W"),
(L1.parameters["b"], "b"),
(L1.gradients["W"], "dLdW"),
(L1.gradients["b"], "dLdB"),
(dLdX, "dLdX"),
]
# 打印当前试验信息
print("nTrial {}".format(i))
print("pad={}, stride={}, f_width={}, n_ex={}".format(p, s, f_width, n_ex))
print("l_in={}, n_in={}".format(l_in, n_in))
print("l_out={}, n_out={}".format(l_out, n_out))
print("dilation={}".format(d))
# 遍历参数列表,检查梯度是否正确
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=4
)
print("tPASSED {}".format(label))
i = 1
# 定义用于测试 Deconv2D 层的函数,N 默认为 15
def test_Deconv2D(N=15):
# 导入必要的模块和类
from numpy_ml.neural_nets.layers import Deconv2D
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 定义激活函数列表,每个元素包含激活函数对象、对应的 Torch 模块、激活函数名称
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化计数器
i = 1
# 定义用于测试 Pool2D 层的函数,N 默认为 15
def test_Pool2D(N=15):
# 导入必要的模块和类
from numpy_ml.neural_nets.layers import Pool2D
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化计数器
i = 1
# 循环执行直到 i 大于 N
while i < N 1:
# 生成随机整数,表示样本数
n_ex = np.random.randint(1, 10)
# 生成随机整数,表示输入数据的行数
in_rows = np.random.randint(1, 10)
# 生成随机整数,表示输入数据的列数
in_cols = np.random.randint(1, 10)
# 生成随机整数,表示输入数据的通道数
n_in = np.random.randint(1, 3)
# 生成随机的过滤器形状
f_shape = (
min(in_rows, np.random.randint(1, 5)),
min(in_cols, np.random.randint(1, 5)),
)
# 生成随机的填充值和步长
p, s = np.random.randint(0, max(1, min(f_shape) // 2)), np.random.randint(1, 3)
# 设置池化层的模式为"average"
mode = "average"
# 计算输出数据的行数和列数
out_rows = int(1 (in_rows 2 * p - f_shape[0]) / s)
out_cols = int(1 (in_cols 2 * p - f_shape[1]) / s)
# 生成随机输入数据
X = random_tensor((n_ex, in_rows, in_cols, n_in), standardize=True)
print("nmode: {}".format(mode))
print("pad={}, stride={}, f_shape={}, n_ex={}".format(p, s, f_shape, n_ex))
print("in_rows={}, in_cols={}, n_in={}".format(in_rows, in_cols, n_in))
print("out_rows={}, out_cols={}, n_out={}".format(out_rows, out_cols, n_in))
# 初始化 Pool2D 层
L1 = Pool2D(kernel_shape=f_shape, pad=p, stride=s, mode=mode)
# 前向传播
y_pred = L1.forward(X)
# 反向传播
dLdy = np.ones_like(y_pred)
dLdX = L1.backward(dLdy)
# 获取标准梯度
gold_mod = TorchPool2DLayer(n_in, L1.hyperparameters)
golds = gold_mod.extract_grads(X)
# 检查梯度是否正确
params = [(L1.X[0], "X"), (y_pred, "y"), (dLdX, "dLdX")]
for ix, (mine, label) in enumerate(params):
assert_almost_equal(
mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=4
)
print("tPASSED {}".format(label))
i = 1
# 定义一个测试 LSTMCell 的函数,N 默认为 15
def test_LSTMCell(N=15):
# 导入 LSTMCell 模块
from numpy_ml.neural_nets.layers import LSTMCell
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 设置随机种子为 12345
np.random.seed(12345)
# 初始化变量 i 为 1
i = 1
# 手动计算 vanilla RNN 参数的梯度
def grad_check_RNN(model, loss_func, param_name, n_t, X, epsilon=1e-7):
"""
Manual gradient calc for vanilla RNN parameters
"""
# 如果参数名为 "Ba" 或 "Bx",则将参数名转换为小写
if param_name in ["Ba", "Bx"]:
param_name = param_name.lower()
# 如果参数名为 "X" 或 "y",则返回 None
elif param_name in ["X", "y"]:
return None
# 复制原始参数,并初始化梯度为与原始参数相同形状的零矩阵
param_orig = model.parameters[param_name]
model.flush_gradients()
grads = np.zeros_like(param_orig)
# 遍历参数的每个元素
for flat_ix, val in enumerate(param_orig.flat):
param = deepcopy(param_orig)
md_ix = np.unravel_index(flat_ix, param.shape)
# 正向计算
y_preds_plus = []
param[md_ix] = val epsilon
model.parameters[param_name] = param
for t in range(n_t):
y_pred_plus = model.forward(X[:, :, t])
y_preds_plus = [y_pred_plus]
loss_plus = loss_func(y_preds_plus)
model.flush_gradients()
# 反向计算
y_preds_minus = []
param[md_ix] = val - epsilon
model.parameters[param_name] = param
for t in range(n_t):
y_pred_minus = model.forward(X[:, :, t])
y_preds_minus = [y_pred_minus]
loss_minus = loss_func(y_preds_minus)
model.flush_gradients()
# 计算梯度
grad = (loss_plus - loss_minus) / (2 * epsilon)
grads[md_ix] = grad
return grads.T
# 定义一个测试 MultiHeadedAttentionModule 的函数,N 默认为 15
def test_MultiHeadedAttentionModule(N=15):
# 导入 MultiHeadedAttentionModule 模块
from numpy_ml.neural_nets.modules import MultiHeadedAttentionModule
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 设置随机种子为 12345
np.random.seed(12345)
# 初始化变量 i 为 1
i = 1
# 定义一个测试 SkipConnectionIdentityModule 的函数,N 默认为 15
def test_SkipConnectionIdentityModule(N=15):
# 导入 SkipConnectionIdentityModule 模块
from numpy_ml.neural_nets.modules import SkipConnectionIdentityModule
# 从指定路径导入 Tanh、ReLU、Sigmoid 和 Affine 激活函数以及 Affine 层
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 如果 N 为 None,则将 N 设置为正无穷
N = np.inf if N is None else N
# 设置随机种子为 12345
np.random.seed(12345)
# 定义激活函数列表,每个元素包含自定义的激活函数对象、PyTorch 中对应的激活函数对象和激活函数名称
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化计数器 i 为 1
i = 1
# 测试 SkipConnectionConvModule 模块
def test_SkipConnectionConvModule(N=15):
# 导入需要的模块和激活函数
from numpy_ml.neural_nets.modules import SkipConnectionConvModule
from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 定义激活函数列表
acts = [
(Tanh(), nn.Tanh(), "Tanh"),
(Sigmoid(), nn.Sigmoid(), "Sigmoid"),
(ReLU(), nn.ReLU(), "ReLU"),
(Affine(), TorchLinearActivation(), "Affine"),
]
# 初始化计数器
i = 1
# 测试 BidirectionalLSTM 模块
def test_BidirectionalLSTM(N=15):
# 导入需要的模块
from numpy_ml.neural_nets.modules import BidirectionalLSTM
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化计数器
i = 1
# 测试 WaveNetModule 模块
def test_WaveNetModule(N=10):
# 导入需要的模块
from numpy_ml.neural_nets.modules import WavenetResidualModule
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 设置随机种子
np.random.seed(12345)
# 初始化计数器
i = 1
#######################################################################
# Utils #
#######################################################################
# 测试 pad1D 函数
def test_pad1D(N=15):
# 导入需要的模块
from numpy_ml.neural_nets.layers import Conv1D
from .nn_torch_models import TorchCausalConv1d, torchify
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 初始化计数器
i = 1
# 测试 conv 函数
def test_conv(N=15):
# 设置随机种子
np.random.seed(12345)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 初始化计数器
i = 0
# 当 i 小于 N 时,执行以下循环
while i < N:
# 生成一个介于 2 到 15 之间的随机整数,作为例子数量
n_ex = np.random.randint(2, 15)
# 生成一个介于 2 到 15 之间的随机整数,作为输入数据的行数
in_rows = np.random.randint(2, 15)
# 生成一个介于 2 到 15 之间的随机整数,作为输入数据的列数
in_cols = np.random.randint(2, 15)
# 生成一个介于 2 到 15 之间的随机整数,作为输入数据的通道数
in_ch = np.random.randint(2, 15)
# 生成一个介于 2 到 15 之间的随机整数,作为输出数据的通道数
out_ch = np.random.randint(2, 15)
# 生成一个随机的形状元组,元组中的元素为输入数据的行数和列数
f_shape = (
min(in_rows, np.random.randint(2, 10)),
min(in_cols, np.random.randint(2, 10)),
)
# 生成一个介于 1 到 3 之间的随机整数,作为卷积步长
s = np.random.randint(1, 3)
# 生成一个介于 0 到 5 之间的随机整数,作为填充大小
p = np.random.randint(0, 5)
# 生成一个随机的输入数据张量
X = np.random.rand(n_ex, in_rows, in_cols, in_ch)
# 对输入数据进行二维填充
X_pad, p = pad2D(X, p)
# 生成一个随机的权重张量
W = np.random.randn(f_shape[0], f_shape[1], in_ch, out_ch)
# 使用朴素的方法进行二维卷积操作,得到期望的输出
gold = conv2D_naive(X, W, s, p)
# 使用优化的方法进行二维卷积操作,得到实际的输出
mine = conv2D(X, W, s, p)
# 检查实际输出和期望输出是否几乎相等
np.testing.assert_almost_equal(mine, gold)
# 打印“PASSED”表示测试通过
print("PASSED")
# 更新循环变量 i
i = 1
# 模型部分
# 定义训练 Variational Autoencoder (VAE) 模型的函数
def fit_VAE():
# 导入所需的库和模块
# 用于测试
import tensorflow.keras.datasets.mnist as mnist
from numpy_ml.neural_nets.models.vae import BernoulliVAE
# 设置随机种子
np.random.seed(12345)
# 加载 MNIST 数据集
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# 将像素强度缩放到 [0, 1] 范围内
X_train = np.expand_dims(X_train.astype("float32") / 255.0, 3)
X_test = np.expand_dims(X_test.astype("float32") / 255.0, 3)
# 只使用前 128 * 1 个样本作为一个 batch
X_train = X_train[: 128 * 1]
# 创建 BernoulliVAE 实例
BV = BernoulliVAE()
# 训练 VAE 模型
BV.fit(X_train, n_epochs=1, verbose=False)
# 定义测试 Wasserstein GAN with Gradient Penalty (WGAN-GP) 模型的函数
def test_WGAN_GP(N=1):
# 导入 WGAN-GP 模型
from numpy_ml.neural_nets.models.wgan_gp import WGAN_GP
# 设置随机种子
np.random.seed(12345)
# 生成一个随机种子
ss = np.random.randint(0, 1000)
np.random.seed(ss)
# 如果 N 为 None,则设置为无穷大
N = np.inf if N is None else N
# 初始化 i 为 1
i = 1
numpy-mlnumpy_mlteststest_nn_activations.py
代码语言:javascript
复制# 禁用 flake8 检查
# 导入时间模块
import time
# 导入 numpy 模块并重命名为 np
import numpy as np
# 从 numpy.testing 模块中导入 assert_almost_equal 函数
from numpy.testing import assert_almost_equal
# 从 scipy.special 模块中导入 expit 函数
from scipy.special import expit
# 导入 torch 模块
import torch
# 从 torch.nn.functional 模块中导入 F
import torch.nn.functional as F
# 从 numpy_ml.utils.testing 模块中导入 random_stochastic_matrix 和 random_tensor 函数
from numpy_ml.utils.testing import random_stochastic_matrix, random_tensor
# 定义一个函数,用于生成 torch 梯度
def torch_gradient_generator(fn, **kwargs):
# 定义内部函数 get_grad,用于计算梯度
def get_grad(z):
# 将 numpy 数组 z 转换为 torch 变量,并设置 requires_grad 为 True
z1 = torch.autograd.Variable(torch.from_numpy(z), requires_grad=True)
# 调用传入的函数 fn 计算 z1 的值,并对结果求和
z2 = fn(z1, **kwargs).sum()
# 对 z2 进行反向传播
z2.backward()
# 获取 z1 的梯度,并转换为 numpy 数组返回
grad = z1.grad.numpy()
return grad
return get_grad
#######################################################################
# Debug Formatter #
#######################################################################
# 定义一个函数,用于格式化错误信息
def err_fmt(params, golds, ix, warn_str=""):
mine, label = params[ix]
err_msg = "-" * 25 " DEBUG " "-" * 25 "n"
prev_mine, prev_label = params[max(ix - 1, 0)]
err_msg = "Mine (prev) [{}]:n{}nnTheirs (prev) [{}]:n{}".format(
prev_label, prev_mine, prev_label, golds[prev_label]
)
err_msg = "nnMine [{}]:n{}nnTheirs [{}]:n{}".format(
label, mine, label, golds[label]
)
err_msg = warn_str
err_msg = "n" "-" * 23 " END DEBUG " "-" * 23
return err_msg
#######################################################################
# Test Suite #
#######################################################################
#
#
# def test_activations(N=50):
# print("Testing Sigmoid activation")
# time.sleep(1)
# test_sigmoid_activation(N)
# test_sigmoid_grad(N)
#
# # print("Testing Softmax activation")
# # time.sleep(1)
# # test_softmax_activation(N)
# # test_softmax_grad(N)
#
# print("Testing Tanh activation")
# time.sleep(1)
# test_tanh_grad(N)
#
# print("Testing ReLU activation")
# time.sleep(1)
# test_relu_activation(N)
# test_relu_grad(N)
#
# print("Testing ELU activation")
# time.sleep(1)
# test_elu_activation(N)
# test_elu_grad(N)
#
# print("Testing SELU activation")
# time.sleep(1)
# test_selu_activation(N)
# test_selu_grad(N)
#
# print("Testing LeakyRelu activation")
# time.sleep(1)
# test_leakyrelu_activation(N)
# test_leakyrelu_grad(N)
#
# print("Testing SoftPlus activation")
# time.sleep(1)
# test_softplus_activation(N)
# test_softplus_grad(N)
#
#######################################################################
# Activations #
#######################################################################
# 测试 Sigmoid 激活函数
def test_sigmoid_activation(N=50):
# 导入 Sigmoid 激活函数
from numpy_ml.neural_nets.activations import Sigmoid
# 如果 N 为 None,则设为无穷大
N = np.inf if N is None else N
# 创建 Sigmoid 激活函数对象
mine = Sigmoid()
# 创建 gold 函数对象,用于比较
gold = expit
i = 0
while i < N:
# 生成随机维度
n_dims = np.random.randint(1, 100)
# 生成随机张量
z = random_tensor((1, n_dims))
# 断言 Sigmoid 函数计算结果与 gold 函数计算结果几乎相等
assert_almost_equal(mine.fn(z), gold(z))
print("PASSED")
i = 1
# 测试 SoftPlus 激活函数
def test_softplus_activation(N=50):
# 导入 SoftPlus 激活函数
from numpy_ml.neural_nets.activations import SoftPlus
# 如果 N 为 None,则设为无穷大
N = np.inf if N is None else N
# 创建 SoftPlus 激活函数对象
mine = SoftPlus()
# 创建 gold 函数对象,用于比较
gold = lambda z: F.softplus(torch.FloatTensor(z)).numpy()
i = 0
while i < N:
# 生成随机维度
n_dims = np.random.randint(1, 100)
# 生成随机随机矩阵
z = random_stochastic_matrix(1, n_dims)
# 断言 SoftPlus 函数计算结果与 gold 函数计算结果几乎相等
assert_almost_equal(mine.fn(z), gold(z))
print("PASSED")
i = 1
# 测试 ELU 激活函数
def test_elu_activation(N=50):
# 导入 ELU 激活函数
from numpy_ml.neural_nets.activations import ELU
# 如果 N 为 None,则设为无穷大
N = np.inf if N is None else N
i = 0
while i < N:
# 生成随机维度
n_dims = np.random.randint(1, 10)
# 生成随机张量
z = random_tensor((1, n_dims))
# 生成随机 alpha 值
alpha = np.random.uniform(0, 10)
# 创建 ELU 激活函数对象
mine = ELU(alpha)
# 创建 gold 函数对象,用于比较
gold = lambda z, a: F.elu(torch.from_numpy(z), alpha).numpy()
# 断言 ELU 函数计算结果与 gold 函数计算结果几乎相等
assert_almost_equal(mine.fn(z), gold(z, alpha))
print("PASSED")
i = 1
# 测试 ReLU 激活函数的功能
def test_relu_activation(N=50):
# 导入 ReLU 激活函数
from numpy_ml.neural_nets.activations import ReLU
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建自定义的 ReLU 激活函数对象
mine = ReLU()
# 创建 PyTorch 中的 ReLU 激活函数对象
gold = lambda z: F.relu(torch.FloatTensor(z)).numpy()
# 初始化计数器 i
i = 0
# 循环执行 N 次
while i < N:
# 随机生成维度在 1 到 100 之间的随机数
n_dims = np.random.randint(1, 100)
# 生成一个随机的矩阵 z
z = random_stochastic_matrix(1, n_dims)
# 断言自定义的 ReLU 激活函数和 PyTorch 的 ReLU 激活函数的输出几乎相等
assert_almost_equal(mine.fn(z), gold(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
# 测试 SELU 激活函数的功能
def test_selu_activation(N=50):
# 导入 SELU 激活函数
from numpy_ml.neural_nets.activations import SELU
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建自定义的 SELU 激活函数对象
mine = SELU()
# 创建 PyTorch 中的 SELU 激活函数对象
gold = lambda z: F.selu(torch.FloatTensor(z)).numpy()
# 初始化计数器 i
i = 0
# 循环执行 N 次
while i < N:
# 随机生成维度在 1 到 100 之间的随机数
n_dims = np.random.randint(1, 100)
# 生成一个随机的矩阵 z
z = random_stochastic_matrix(1, n_dims)
# 断言自定义的 SELU 激活函数和 PyTorch 的 SELU 激活函数的输出几乎相等
assert_almost_equal(mine.fn(z), gold(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
# 测试 LeakyReLU 激活函数的功能
def test_leakyrelu_activation(N=50):
# 导入 LeakyReLU 激活函数
from numpy_ml.neural_nets.activations import LeakyReLU
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 初始化计数器 i
i = 0
# 循环执行 N 次
while i < N:
# 随机生成维度在 1 到 100 之间的随机数
n_dims = np.random.randint(1, 100)
# 生成一个随机的矩阵 z
z = random_stochastic_matrix(1, n_dims)
# 随机生成一个在 0 到 10 之间的 alpha 值
alpha = np.random.uniform(0, 10)
# 创建自定义的 LeakyReLU 激活函数对象
mine = LeakyReLU(alpha=alpha)
# 创建 PyTorch 中的 LeakyReLU 激活函数对象
gold = lambda z: F.leaky_relu(torch.FloatTensor(z), alpha).numpy()
# 断言自定义的 LeakyReLU 激活函数和 PyTorch 的 LeakyReLU 激活函数的输出几乎相等
assert_almost_equal(mine.fn(z), gold(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
# 测试 GELU 激活函数的功能
def test_gelu_activation(N=50):
# 导入 GELU 激活函数
from numpy_ml.neural_nets.activations import GELU
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 初始化计数器 i
i = 0
# 循环执行 N 次
while i < N:
# 随机生成维度在 1 到 100 之间的随机数
n_dims = np.random.randint(1, 100)
# 生成一个随机的矩阵 z
z = random_stochastic_matrix(1, n_dims)
# 随机选择是否使用近似计算
approx = np.random.choice([True, False])
# 创建自定义的 GELU 激活函数对象
mine = GELU(approximate=False)
mine_approx = GELU(approximate=True)
# 创建 PyTorch 中的 GELU 激活函数对象
gold = lambda z: F.gelu(torch.FloatTensor(z)).numpy()
# 断言自定义的 GELU 激活函数和 PyTorch 的 GELU 激活函数的输出在相对误差范围内接近
np.testing.assert_allclose(mine.fn(z), gold(z), rtol=1e-3)
# 断言自定义的 GELU 激活函数和近似计算的 GELU 激活函数的输出几乎相等
assert_almost_equal(mine.fn(z), mine_approx.fn(z))
# 打印 "PASSED"
print("PASSED")
# 更新计数器
i = 1
#######################################################################
# Activation Gradients #
#######################################################################
# 测试 Sigmoid 激活函数的梯度
def test_sigmoid_grad(N=50):
# 导入 Sigmoid 激活函数
from numpy_ml.neural_nets.activations import Sigmoid
# 如果 N 为 None,则设为无穷大
N = np.inf if N is None else N
# 创建 Sigmoid 激活函数对象
mine = Sigmoid()
# 创建 PyTorch 中 Sigmoid 激活函数的梯度函数
gold = torch_gradient_generator(torch.sigmoid)
i = 0
while i < N:
# 生成随机的样本数和维度
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
z = random_tensor((n_ex, n_dims))
# 断言 mine 的梯度与 gold 的梯度几乎相等
assert_almost_equal(mine.grad(z), gold(z))
print("PASSED")
i = 1
# 测试 ELU 激活函数的梯度
def test_elu_grad(N=50):
# 导入 ELU 激活函数
from numpy_ml.neural_nets.activations import ELU
# 如果 N 为 None,则设为无穷大
N = np.inf if N is None else N
i = 0
while i < N:
# 生成随机的样本数、维度和 alpha 值
n_ex = np.random.randint(1, 10)
n_dims = np.random.randint(1, 10)
alpha = np.random.uniform(0, 10)
z = random_tensor((n_ex, n_dims))
# 创建 ELU 激活函数对象和 PyTorch 中 ELU 激活函数的梯度函数
mine = ELU(alpha)
gold = torch_gradient_generator(F.elu, alpha=alpha)
# 断言 mine 的梯度与 gold 的梯度几乎相等
assert_almost_equal(mine.grad(z), gold(z), decimal=6)
print("PASSED")
i = 1
# 测试 Tanh 激活函数的梯度
def test_tanh_grad(N=50):
# 导入 Tanh 激活函数
from numpy_ml.neural_nets.activations import Tanh
# 如果 N 为 None,则设为无穷大
N = np.inf if N is None else N
# 创建 Tanh 激活函数对象
mine = Tanh()
# 创建 PyTorch 中 Tanh 激活函数的梯度函数
gold = torch_gradient_generator(torch.tanh)
i = 0
while i < N:
# 生成随机的样本数和维度
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
z = random_tensor((n_ex, n_dims))
# 断言 mine 的梯度与 gold 的梯度几乎相等
assert_almost_equal(mine.grad(z), gold(z))
print("PASSED")
i = 1
# 测试 ReLU 激活函数的梯度
def test_relu_grad(N=50):
# 导入 ReLU 激活函数
from numpy_ml.neural_nets.activations import ReLU
# 如果 N 为 None,则设为无穷大
N = np.inf if N is None else N
# 创建 ReLU 激活函数对象
mine = ReLU()
# 创建 PyTorch 中 ReLU 激活函数的梯度函数
gold = torch_gradient_generator(F.relu)
i = 0
while i < N:
# 生成随机的样本数和维度
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
z = random_tensor((n_ex, n_dims))
# 断言 mine 的梯度与 gold 的梯度几乎相等
assert_almost_equal(mine.grad(z), gold(z))
print("PASSED")
i = 1
# 测试 GELU 激活函数的梯度
def test_gelu_grad(N=50):
# 从 numpy_ml.neural_nets.activations 模块中导入 GELU 激活函数
from numpy_ml.neural_nets.activations import GELU
# 如果 N 为 None,则将 N 设置为正无穷
N = np.inf if N is None else N
# 创建一个不使用近似的 GELU 激活函数对象
mine = GELU(approximate=False)
# 创建一个使用近似的 GELU 激活函数对象
mine_approx = GELU(approximate=True)
# 创建一个使用 PyTorch 的 GELU 梯度生成器对象
gold = torch_gradient_generator(F.gelu)
# 初始化计数器 i
i = 0
# 当 i 小于 N 时执行循环
while i < N:
# 生成随机的样本数和维度数
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
# 生成随机的张量 z
z = random_tensor((n_ex, n_dims))
# 断言 mine 激活函数的梯度与 gold 激活函数的梯度在小数点后三位上几乎相等
assert_almost_equal(mine.grad(z), gold(z), decimal=3)
# 断言 mine 激活函数的梯度与 mine_approx 激活函数的梯度在小数点后三位上几乎相等
assert_almost_equal(mine.grad(z), mine_approx.grad(z))
# 打印 "PASSED"
print("PASSED")
# 计数器 i 自增
i = 1
# 测试 SELU 激活函数的梯度计算
def test_selu_grad(N=50):
# 导入 SELU 激活函数
from numpy_ml.neural_nets.activations import SELU
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 SELU 激活函数对象
mine = SELU()
# 创建 PyTorch 中 SELU 激活函数的梯度计算函数
gold = torch_gradient_generator(F.selu)
# 初始化计数器
i = 0
# 循环进行 N 次测试
while i < N:
# 随机生成样本数和维度
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
# 生成随机张量
z = random_tensor((n_ex, n_dims))
# 断言 SELU 激活函数的梯度与 PyTorch 中的梯度计算函数结果相近
assert_almost_equal(mine.grad(z), gold(z), decimal=6)
# 打印测试通过信息
print("PASSED")
i = 1
# 测试 LeakyReLU 激活函数的梯度计算
def test_leakyrelu_grad(N=50):
# 导入 LeakyReLU 激活函数
from numpy_ml.neural_nets.activations import LeakyReLU
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 初始化计数器
i = 0
# 循环进行 N 次测试
while i < N:
# 随机生成样本数、维度和 alpha 参数
n_ex = np.random.randint(1, 10)
n_dims = np.random.randint(1, 10)
alpha = np.random.uniform(0, 10)
# 生成随机张量
z = random_tensor((n_ex, n_dims))
# 创建 LeakyReLU 激活函数对象
mine = LeakyReLU(alpha)
# 创建 PyTorch 中 LeakyReLU 激活函数的梯度计算函数
gold = torch_gradient_generator(F.leaky_relu, negative_slope=alpha)
# 断言 LeakyReLU 激活函数的梯度与 PyTorch 中的梯度计算函数结果相近
assert_almost_equal(mine.grad(z), gold(z), decimal=6)
# 打印测试通过信息
print("PASSED")
i = 1
# 测试 SoftPlus 激活函数的梯度计算
def test_softplus_grad(N=50):
# 导入 SoftPlus 激活函数
from numpy_ml.neural_nets.activations import SoftPlus
# 如果 N 为 None,则将 N 设置为无穷大
N = np.inf if N is None else N
# 创建 SoftPlus 激活函数对象
mine = SoftPlus()
# 创建 PyTorch 中 SoftPlus 激活函数的梯度计算函数
gold = torch_gradient_generator(F.softplus)
# 初始化计数器
i = 0
# 循环进行 N 次测试
while i < N:
# 随机生成样本数、维度,并标准化生成的随机张量
n_ex = np.random.randint(1, 100)
n_dims = np.random.randint(1, 100)
z = random_tensor((n_ex, n_dims), standardize=True)
# 断言 SoftPlus 激活函数的梯度与 PyTorch 中的梯度计算函数结果相近
assert_almost_equal(mine.grad(z), gold(z))
# 打印测试通过信息
print("PASSED")
i = 1
# 如果作为主程序运行,则执行激活函数测试
if __name__ == "__main__":
test_activations(N=50)
numpy-mlnumpy_mlteststest_nonparametric.py
代码语言:javascript
复制# 禁用 flake8 的警告
# 导入 numpy 库并重命名为 np
import numpy as np
# 从 sklearn 库中导入 KNeighborsRegressor 和 KNeighborsClassifier 类
# 从 sklearn.gaussian_process 库中导入 GaussianProcessRegressor 类
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier
from sklearn.gaussian_process import GaussianProcessRegressor
# 从 numpy_ml.nonparametric.knn 模块中导入 KNN 类
# 从 numpy_ml.nonparametric.gp 模块中导入 GPRegression 类
# 从 numpy_ml.utils.distance_metrics 模块中导入 euclidean 函数
from numpy_ml.nonparametric.knn import KNN
from numpy_ml.nonparametric.gp import GPRegression
from numpy_ml.utils.distance_metrics import euclidean
# 定义测试 KNN 回归的函数,参数 N 默认值为 15
def test_knn_regression(N=15):
# 设置随机种子为 12345
np.random.seed(12345)
# 初始化循环计数器 i
i = 0
# 循环执行 N 次
while i < N:
# 生成随机数 N 和 M
N = np.random.randint(2, 100)
M = np.random.randint(2, 100)
# 生成随机数 k,ls,weights
k = np.random.randint(1, N)
ls = np.min([np.random.randint(1, 10), N - 1])
weights = np.random.choice(["uniform", "distance"])
# 生成随机数据 X,X_test,y
X = np.random.rand(N, M)
X_test = np.random.rand(N, M)
y = np.random.rand(N)
# 创建 KNN 模型对象 knn
knn = KNN(
k=k, leaf_size=ls, metric=euclidean, classifier=False, weights=weights
)
# 使用 X,y 训练 KNN 模型
knn.fit(X, y)
# 对 X_test 进行预测
preds = knn.predict(X_test)
# 创建 sklearn 中的 KNeighborsRegressor 模型对象 gold
gold = KNeighborsRegressor(
p=2,
leaf_size=ls,
n_neighbors=k,
weights=weights,
metric="minkowski",
algorithm="ball_tree",
)
# 使用 X,y 训练 gold 模型
gold.fit(X, y)
# 对 X_test 进行预测
gold_preds = gold.predict(X_test)
# 检查预测结果是否几乎相等
for mine, theirs in zip(preds, gold_preds):
np.testing.assert_almost_equal(mine, theirs)
# 打印测试通过信息
print("PASSED")
# 更新循环计数器
i = 1
# 定义测试 KNN 分类的函数,参数 N 默认值为 15
def test_knn_clf(N=15):
# 设置随机种子为 12345
np.random.seed(12345)
# 初始化循环计数器 i
i = 0
# 当 i 小于 N 时执行循环
while i < N:
# 生成一个介于 2 到 100 之间的随机整数作为 N
N = np.random.randint(2, 100)
# 生成一个介于 2 到 100 之间的随机整数作为 M
M = np.random.randint(2, 100)
# 生成一个介于 1 到 N 之间的随机整数作为 k
k = np.random.randint(1, N)
# 生成一个介于 2 到 10 之间的随机整数作为 n_classes
n_classes = np.random.randint(2, 10)
# 生成一个介于 1 到 10 之间的随机整数和 N-1 中的最小值作为 ls
ls = np.min([np.random.randint(1, 10), N - 1])
# 设置权重为 "uniform"
# 生成一个 N 行 M 列的随机数组作为 X
X = np.random.rand(N, M)
# 生成一个 N 行 M 列的随机数组作为 X_test
X_test = np.random.rand(N, M)
# 生成一个长度为 N 的随机整数数组作为 y
y = np.random.randint(0, n_classes, size=N)
# 创建 KNN 对象,设置参数 k, leaf_size, metric, classifier 和 weights
knn = KNN(k=k, leaf_size=ls, metric=euclidean, classifier=True, weights=weights)
# 使用 X 和 y 训练 KNN 模型
knn.fit(X, y)
# 对 X_test 进行预测
preds = knn.predict(X_test)
# 创建 KNeighborsClassifier 对象,设置参数 p, metric, leaf_size, n_neighbors, weights 和 algorithm
gold = KNeighborsClassifier(
p=2,
metric="minkowski",
leaf_size=ls,
n_neighbors=k,
weights=weights,
algorithm="ball_tree",
)
# 使用 X 和 y 训练 KNeighborsClassifier 模型
gold.fit(X, y)
# 对 X_test 进行预测
gold_preds = gold.predict(X_test)
# 对 preds 和 gold_preds 进行逐一比较,检查是否几乎相等
for mine, theirs in zip(preds, gold_preds):
np.testing.assert_almost_equal(mine, theirs)
# 打印 "PASSED" 表示测试通过
print("PASSED")
# i 自增 1
i = 1
# 定义一个测试高斯过程回归的函数,参数 N 默认为 15
def test_gp_regression(N=15):
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器 i
i = 0
# 循环执行 N 次
while i < N:
# 生成随机的 alpha
alpha = np.random.rand()
# 生成随机的 N
N = np.random.randint(2, 100)
# 生成随机的 M
M = np.random.randint(2, 100)
# 生成随机的 K
K = np.random.randint(1, N)
# 生成随机的 J
J = np.random.randint(1, 3)
# 生成随机的 N 行 M 列的数据矩阵 X
X = np.random.rand(N, M)
# 生成随机的 N 行 J 列的数据矩阵 y
y = np.random.rand(N, J)
# 生成随机的 K 行 M 列的数据矩阵 X_test
X_test = np.random.rand(K, M)
# 创建一个高斯过程回归对象 gp,使用 RBF 核函数和指定的 alpha
gp = GPRegression(kernel="RBFKernel(sigma=1)", alpha=alpha)
# 创建一个高斯过程回归对象 gold,使用默认参数
gold = GaussianProcessRegressor(
kernel=None, alpha=alpha, optimizer=None, normalize_y=False
)
# 使用 X, y 训练 gp 模型
gp.fit(X, y)
# 使用 X, y 训练 gold 模型
gold.fit(X, y)
# 对 X_test 进行预测,返回预测值 preds 和方差
preds, _ = gp.predict(X_test)
# 使用 gold 模型对 X_test 进行预测,返回预测值 gold_preds
gold_preds = gold.predict(X_test)
# 检查预测值 preds 和 gold_preds 是否几乎相等
np.testing.assert_almost_equal(preds, gold_preds)
# 计算 gp 模型的边缘对数似然
mll = gp.marginal_log_likelihood()
# 计算 gold 模型的对数边缘似然
gold_mll = gold.log_marginal_likelihood()
# 检查 mll 和 gold_mll 是否几乎相等
np.testing.assert_almost_equal(mll, gold_mll)
# 打印 "PASSED",表示测试通过
print("PASSED")
# 更新循环计数器 i
i = 1
numpy-mlnumpy_mlteststest_preprocessing.py
代码语言:javascript
复制# 禁用 flake8 检查
from collections import Counter
# 导入 huffman 模块
import huffman
# 导入 numpy 模块
import numpy as np
# 从 scipy.fftpack 模块导入 dct 函数
from scipy.fftpack import dct
# 从 sklearn.preprocessing 模块导入 StandardScaler 类
from sklearn.preprocessing import StandardScaler
# 从 sklearn.feature_extraction.text 模块导入 TfidfVectorizer 类
from sklearn.feature_extraction.text import TfidfVectorizer
# 尝试导入 librosa.core.time_frequency 模块中的 fft_frequencies 函数,如果导入失败则导入 librosa 模块中的 fft_frequencies 函数
try:
from librosa.core.time_frequency import fft_frequencies
except ImportError:
# 对于 librosa 版本 >= 0.8.0
from librosa import fft_frequencies
# 从 librosa.feature 模块导入 mfcc 函数
from librosa.feature import mfcc as lr_mfcc
# 从 librosa.util 模块导入 frame 函数
from librosa.util import frame
# 从 librosa.filters 模块导入 mel 函数
from librosa.filters import mel
# 导入 numpy_ml 模块中的相关实现
from numpy_ml.preprocessing.general import Standardizer
from numpy_ml.preprocessing.nlp import HuffmanEncoder, TFIDFEncoder
from numpy_ml.preprocessing.dsp import (
DCT,
DFT,
mfcc,
to_frames,
mel_filterbank,
dft_bins,
)
from numpy_ml.utils.testing import random_paragraph
# 测试 Huffman 编码
def test_huffman(N=15):
np.random.seed(12345)
i = 0
while i < N:
# 生成随机单词数量
n_words = np.random.randint(1, 100)
# 生成随机段落
para = random_paragraph(n_words)
# 创建 Huffman 编码器对象
HT = HuffmanEncoder()
# 对段落进行编码
HT.fit(para)
# 获取编码后的字典
my_dict = HT._item2code
# 使用 gold-standard 的 huffman 模块生成编码字典
their_dict = huffman.codebook(Counter(para).items())
# 检查两个字典是否一致
for k, v in their_dict.items():
fstr = "their_dict['{}'] = {}, but my_dict['{}'] = {}"
assert k in my_dict, "key `{}` not in my_dict".format(k)
assert my_dict[k] == v, fstr.format(k, v, k, my_dict[k])
print("PASSED")
i = 1
# 测试标准化器
def test_standardizer(N=15):
np.random.seed(12345)
i = 0
while i < N:
# 随机生成是否计算均值和标准差的标志
mean = bool(np.random.randint(2))
std = bool(np.random.randint(2))
# 随机生成矩阵的行数和列数
N = np.random.randint(2, 100)
M = np.random.randint(2, 100)
# 生成随机矩阵
X = np.random.rand(N, M)
# 创建标准化器对象
S = Standardizer(with_mean=mean, with_std=std)
# 对数据进行拟合
S.fit(X)
# 进行标准化处理
mine = S.transform(X)
# 使用 sklearn 中的 StandardScaler 进行标准化
theirs = StandardScaler(with_mean=mean, with_std=std)
gold = theirs.fit_transform(X)
# 检查两种标准化结果是否接近
np.testing.assert_almost_equal(mine, gold)
print("PASSED")
i = 1
# 测试 TF-IDF 编码器的功能,生成 N 个测试用例
def test_tfidf(N=15):
# 设置随机种子
np.random.seed(12345)
# 初始化计数器 i
i = 0
# 循环生成 N 个测试用例
while i < N:
# 初始化文档列表
docs = []
# 随机生成文档数量
n_docs = np.random.randint(1, 10)
# 生成每个文档的内容
for d in range(n_docs):
# 随机生成每个文档的行数
n_lines = np.random.randint(1, 1000)
# 生成每行的随机段落
lines = [random_paragraph(np.random.randint(1, 10)) for _ in range(n_lines)]
# 将每行段落连接成文档
docs.append("n".join([" ".join(l) for l in lines]))
# 随机选择是否平滑 IDF
smooth = bool(np.random.randint(2))
# 初始化 TF-IDF 编码器对象
tfidf = TFIDFEncoder(
lowercase=True,
min_count=0,
smooth_idf=smooth,
max_tokens=None,
input_type="strings",
filter_stopwords=False,
)
# 初始化 sklearn 中的 TfidfVectorizer 对象作为对照
gold = TfidfVectorizer(
input="content",
norm=None,
use_idf=True,
lowercase=True,
smooth_idf=smooth,
sublinear_tf=False,
)
# 对 TF-IDF 编码器进行拟合
tfidf.fit(docs)
# 获取 TF-IDF 编码结果
mine = tfidf.transform(ignore_special_chars=True)
# 获取 sklearn 中 TfidfVectorizer 的结果并转换为数组
theirs = gold.fit_transform(docs).toarray()
# 断言 TF-IDF 编码结果与对照结果的近似相等
np.testing.assert_almost_equal(mine, theirs)
# 打印测试通过信息
print("PASSED")
# 计数器加一
i = 1
# 测试 DCT 变换的功能,生成 N 个测试用例
def test_dct(N=15):
# 设置随机种子
np.random.seed(12345)
# 初始化计数器 i
i = 0
# 循环生成 N 个测试用例
while i < N:
# 随机生成信号长度 N
N = np.random.randint(2, 100)
# 随机生成信号
signal = np.random.rand(N)
# 随机选择是否正交
ortho = bool(np.random.randint(2))
# 计算自定义的 DCT 变换
mine = DCT(signal, orthonormal=ortho)
# 计算 scipy 中的 DCT 变换作为对照
theirs = dct(signal, norm="ortho" if ortho else None)
# 断言自定义 DCT 变换结果与对照结果的近似相等
np.testing.assert_almost_equal(mine, theirs)
# 打印测试通过信息
print("PASSED")
# 计数器加一
i = 1
# 测试 DFT 变换的功能,生成 N 个测试用例
def test_dft(N=15):
# 设置随机种子
np.random.seed(12345)
# 初始化计数器 i
i = 0
# 循环生成 N 个测试用例
while i < N:
# 随机生成信号长度 N
N = np.random.randint(2, 100)
# 随机生成信号
signal = np.random.rand(N)
# 计算自定义的 DFT 变换
mine = DFT(signal)
# 计算 numpy 中的快速傅里叶变换作为对照
theirs = np.fft.rfft(signal)
# 断言自定义 DFT 变换结果的实部与对照结果的实部的近似相等
np.testing.assert_almost_equal(mine.real, theirs.real)
# 打印测试通过信息
print("PASSED")
# 计数器加一
i = 1
# 测试 MFCC 的功能,生成 N 个测试用例
def test_mfcc(N=1):
"""Broken"""
# 设置随机种子
np.random.seed(12345)
# 初始化计数器 i
i = 0
# 当 i 小于 N 时执行循环
while i < N:
# 生成一个介于500到1000之间的随机整数,赋值给 N
N = np.random.randint(500, 1000)
# 生成一个介于50到100之间的随机整数,赋值给 fs
fs = np.random.randint(50, 100)
# 设置 MFCC 参数
n_mfcc = 12
window_len = 100
stride_len = 50
n_filters = 20
window_dur = window_len / fs
stride_dur = stride_len / fs
# 生成一个长度为 N 的随机信号
signal = np.random.rand(N)
# 计算自己实现的 MFCC 特征
mine = mfcc(
signal,
fs=fs,
window="hann",
window_duration=window_dur,
stride_duration=stride_dur,
lifter_coef=0,
alpha=0,
n_mfccs=n_mfcc,
normalize=False,
center=True,
n_filters=n_filters,
replace_intercept=False,
)
# 使用库函数计算 MFCC 特征
theirs = lr_mfcc(
signal,
sr=fs,
n_mels=n_filters,
n_mfcc=n_mfcc,
n_fft=window_len,
hop_length=stride_len,
htk=True,
).T
# 检查两种方法计算的 MFCC 特征是否接近
np.testing.assert_almost_equal(mine, theirs, decimal=4)
# 打印“PASSED”表示通过测试
print("PASSED")
# i 自增
i = 1
# 测试帧处理函数
def test_framing(N=15):
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器
i = 0
# 循环执行 N 次
while i < N:
# 生成随机信号长度
N = np.random.randint(500, 100000)
# 生成随机窗口长度
window_len = np.random.randint(10, 100)
# 生成随机步长
stride_len = np.random.randint(1, 50)
# 生成随机信号
signal = np.random.rand(N)
# 调用自定义的帧处理函数 to_frames
mine = to_frames(signal, window_len, stride_len, writeable=False)
# 调用库函数 frame 进行帧处理
theirs = frame(signal, frame_length=window_len, hop_length=stride_len).T
# 断言两者长度相等
assert len(mine) == len(theirs), "len(mine) = {}, len(theirs) = {}".format(
len(mine), len(theirs)
)
# 使用 np.testing.assert_almost_equal 检查两者是否几乎相等
np.testing.assert_almost_equal(mine, theirs)
# 打印测试通过信息
print("PASSED")
# 更新循环计数器
i = 1
# 测试离散傅立叶变换频率分辨率函数
def test_dft_bins(N=15):
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器
i = 0
# 循环执行 N 次
while i < N:
# 生成随机信号长度
N = np.random.randint(500, 100000)
# 生成随机采样频率
fs = np.random.randint(50, 1000)
# 调用自定义的频率分辨率函数 dft_bins
mine = dft_bins(N, fs=fs, positive_only=True)
# 调用库函数 fft_frequencies 计算频率分辨率
theirs = fft_frequencies(fs, N)
# 使用 np.testing.assert_almost_equal 检查两者是否几乎相等
np.testing.assert_almost_equal(mine, theirs)
# 打印测试通过信息
print("PASSED")
# 更新循环计数器
i = 1
# 测试梅尔滤波器组函数
def test_mel_filterbank(N=15):
# 设置随机种子
np.random.seed(12345)
# 初始化循环计数器
i = 0
# 循环执行 N 次
while i < N:
# 生成随机采样频率
fs = np.random.randint(50, 10000)
# 生成随机滤波器数量
n_filters = np.random.randint(2, 20)
# 生成随机窗口长度
window_len = np.random.randint(10, 100)
# 生成随机是否归一化参数
norm = np.random.randint(2)
# 调用自定义的梅尔滤波器组函数 mel_filterbank
mine = mel_filterbank(
window_len, n_filters, fs, min_freq=0, max_freq=None, normalize=bool(norm)
)
# 调用库函数 mel 计算梅尔滤波器组
theirs = mel(
fs,
n_fft=window_len,
n_mels=n_filters,
htk=True,
norm="slaney" if norm == 1 else None,
)
# 使用 np.testing.assert_almost_equal 检查两者是否几乎相等
np.testing.assert_almost_equal(mine, theirs)
# 打印测试通过信息
print("PASSED")
# 更新循环计数器
i = 1
numpy-mlnumpy_mlteststest_trees.py
代码语言:javascript
复制# 禁用 flake8 检查
import numpy as np
# 导入随机森林分类器和回归器
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
# 导入决策树分类器和回归器
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
# 导入准确率评估指标和均方误差评估指标
from sklearn.metrics import accuracy_score, mean_squared_error
# 导入生成回归数据和分类数据的函数
from sklearn.datasets import make_regression, make_blobs
# 导入数据集划分函数
from sklearn.model_selection import train_test_split
# 导入梯度提升决策树类
from numpy_ml.trees.gbdt import GradientBoostedDecisionTree
# 导入决策树类、节点类和叶子节点类
from numpy_ml.trees.dt import DecisionTree, Node, Leaf
# 导入随机森林类
from numpy_ml.trees.rf import RandomForest
# 导入随机生成张量的测试函数
from numpy_ml.utils.testing import random_tensor
# 克隆决策树
def clone_tree(dtree):
# 获取决策树的左子节点、右子节点、特征、阈值和值
children_left = dtree.tree_.children_left
children_right = dtree.tree_.children_right
feature = dtree.tree_.feature
threshold = dtree.tree_.threshold
values = dtree.tree_.value
# 递归构建决策树
def grow(node_id):
l, r = children_left[node_id], children_right[node_id]
if l == r:
return Leaf(values[node_id].argmax())
n = Node(None, None, (feature[node_id], threshold[node_id]))
n.left = grow(l)
n.right = grow(r)
return n
node_id = 0
root = Node(None, None, (feature[node_id], threshold[node_id]))
root.left = grow(children_left[node_id])
root.right = grow(children_right[node_id])
return root
# 比较两棵树的结构和值
def compare_trees(mine, gold):
# 克隆金标准决策树
clone = clone_tree(gold)
mine = mine.root
# 递归测试两棵树的结构和值是否相等
def test(mine, clone):
if isinstance(clone, Node) and isinstance(mine, Node):
assert mine.feature == clone.feature, "Node {} not equal".format(depth)
np.testing.assert_allclose(mine.threshold, clone.threshold)
test(mine.left, clone.left, depth 1)
test(mine.right, clone.right, depth 1)
elif isinstance(clone, Leaf) and isinstance(mine, Leaf):
np.testing.assert_allclose(mine.value, clone.value)
return
else:
raise ValueError("Nodes at depth {} are not equal".format(depth))
depth = 0
ok = True
# 当 ok 为真时进入循环
while ok:
# 如果 clone 和 mine 都是 Node 类型的实例
if isinstance(clone, Node) and isinstance(mine, Node):
# 断言 mine 的特征与 clone 的特征相等
assert mine.feature == clone.feature
# 使用 np.testing.assert_allclose 检查 mine 的阈值与 clone 的阈值是否相等
np.testing.assert_allclose(mine.threshold, clone.threshold)
# 递归调用 test 函数,比较 mine 的左子节点和 clone 的左子节点
test(mine.left, clone.left, depth 1)
# 递归调用 test 函数,比较 mine 的右子节点和 clone 的右子节点
test(mine.right, clone.right, depth 1)
# 如果 clone 和 mine 都是 Leaf 类型的实例
elif isinstance(clone, Leaf) and isinstance(mine, Leaf):
# 使用 np.testing.assert_allclose 检查 mine 的值与 clone 的值是否相等
np.testing.assert_allclose(mine.value, clone.value)
# 返回,结束当前递归
return
# 如果 clone 和 mine 不是相同类型的节点
else:
# 抛出 ValueError 异常,提示深度为 depth 的节点不相等
raise ValueError("Nodes at depth {} are not equal".format(depth))
# 测试决策树模型,N默认为1
def test_DecisionTree(N=1):
# 初始化i为1
i = 1
# 设置随机种子为12345
np.random.seed(12345)
# 测试随机森林模型,N默认为1
def test_RandomForest(N=1):
# 设置随机种子为12345
np.random.seed(12345)
# 初始化i为1
i = 1
# 测试梯度提升决策树模型,N默认为1
def test_gbdt(N=1):
# 设置随机种子为12345
np.random.seed(12345)
# 初始化i为1
i = 1
numpy-mlnumpy_mlteststest_utils.py
代码语言:javascript
复制# 禁用 flake8 的警告
import numpy as np
# 导入 scipy 库
import scipy
# 导入 networkx 库并重命名为 nx
import networkx as nx
# 从 sklearn.neighbors 模块中导入 BallTree 类并重命名为 sk_BallTree
from sklearn.neighbors import BallTree as sk_BallTree
# 从 sklearn.metrics.pairwise 模块中导入 rbf_kernel 函数并重命名为 sk_rbf
from sklearn.metrics.pairwise import rbf_kernel as sk_rbf
# 从 sklearn.metrics.pairwise 模块中导入 linear_kernel 函数并重命名为 sk_linear
from sklearn.metrics.pairwise import linear_kernel as sk_linear
# 从 sklearn.metrics.pairwise 模块中导入 polynomial_kernel 函数并重命名为 sk_poly
from sklearn.metrics.pairwise import polynomial_kernel as sk_poly
# 从 numpy_ml.utils.distance_metrics 模块中导入多个距离度量函数
from numpy_ml.utils.distance_metrics import (
hamming,
euclidean,
chebyshev,
manhattan,
minkowski,
)
# 从 numpy_ml.utils.kernels 模块中导入 LinearKernel、PolynomialKernel、RBFKernel 类
from numpy_ml.utils.kernels import LinearKernel, PolynomialKernel, RBFKernel
# 从 numpy_ml.utils.data_structures 模块中导入 BallTree 类
from numpy_ml.utils.data_structures import BallTree
# 从 numpy_ml.utils.graphs 模块中导入多个图相关的类和函数
from numpy_ml.utils.graphs import (
Edge,
DiGraph,
UndirectedGraph,
random_DAG,
random_unweighted_graph,
)
#######################################################################
# Kernels #
#######################################################################
# 定义测试线性核函数的函数
def test_linear_kernel(N=1):
# 设置随机种子
np.random.seed(12345)
i = 0
while i < N:
# 生成随机数 N、M、C
N = np.random.randint(1, 100)
M = np.random.randint(1, 100)
C = np.random.randint(1, 1000)
# 生成随机矩阵 X 和 Y
X = np.random.rand(N, C)
Y = np.random.rand(M, C)
# 计算自定义线性核函数的结果
mine = LinearKernel()(X, Y)
# 计算 sklearn 中线性核函数的结果
gold = sk_linear(X, Y)
# 使用 np.testing.assert_almost_equal 函数比较两个结果的近似程度
np.testing.assert_almost_equal(mine, gold)
# 打印测试通过信息
print("PASSED")
i = 1
# 定义测试多项式核函数的函数
def test_polynomial_kernel(N=1):
np.random.seed(12345)
i = 0
while i < N:
N = np.random.randint(1, 100)
M = np.random.randint(1, 100)
C = np.random.randint(1, 1000)
gamma = np.random.rand()
d = np.random.randint(1, 5)
c0 = np.random.rand()
X = np.random.rand(N, C)
Y = np.random.rand(M, C)
mine = PolynomialKernel(gamma=gamma, d=d, c0=c0)(X, Y)
gold = sk_poly(X, Y, gamma=gamma, degree=d, coef0=c0)
np.testing.assert_almost_equal(mine, gold)
print("PASSED")
i = 1
# 定义测试径向基核函数的函数
def test_radial_basis_kernel(N=1):
np.random.seed(12345)
i = 0
# 当 i 小于 N 时执行循环
while i < N:
# 生成随机整数 N,范围在 [1, 100)
N = np.random.randint(1, 100)
# 生成随机整数 M,范围在 [1, 100)
M = np.random.randint(1, 100)
# 生成随机整数 C,范围在 [1, 1000)
C = np.random.randint(1, 1000)
# 生成随机浮点数 gamma,范围在 [0, 1)
gamma = np.random.rand()
# 生成 N 行 C 列的随机数组 X
X = np.random.rand(N, C)
# 生成 M 行 C 列的随机数组 Y
Y = np.random.rand(M, C)
# sklearn (gamma) <-> mine (sigma) 转换:
# gamma = 1 / (2 * sigma^2)
# sigma = np.sqrt(1 / 2 * gamma)
# 使用 RBFKernel 类计算 RBF 核函数值,传入参数 sigma
mine = RBFKernel(sigma=np.sqrt(1 / (2 * gamma)))(X, Y)
# 使用 sk_rbf 函数计算 RBF 核函数值,传入参数 gamma
gold = sk_rbf(X, Y, gamma=gamma)
# 使用 np.testing.assert_almost_equal 函数检查 mine 和 gold 是否近似相等
np.testing.assert_almost_equal(mine, gold)
# 打印 "PASSED" 表示测试通过
print("PASSED")
# i 自增 1
i = 1
# 距离度量函数的测试
# 测试欧几里德距离函数
def test_euclidean(N=1):
# 设置随机种子
np.random.seed(12345)
i = 0
while i < N:
# 生成随机长度的向量
N = np.random.randint(1, 100)
x = np.random.rand(N)
y = np.random.rand(N)
# 计算自定义的欧几里德距离
mine = euclidean(x, y)
# 使用 SciPy 库计算欧几里德距离
theirs = scipy.spatial.distance.euclidean(x, y)
# 断言两者的结果几乎相等
np.testing.assert_almost_equal(mine, theirs)
print("PASSED")
i = 1
# 测试汉明距离函数
def test_hamming(N=1):
np.random.seed(12345)
i = 0
while i < N:
N = np.random.randint(1, 100)
# 生成随机整数向量
x = (np.random.rand(N) * 100).round().astype(int)
y = (np.random.rand(N) * 100).round().astype(int)
# 计算自定义的汉明距离
mine = hamming(x, y)
# 使用 SciPy 库计算汉明距离
theirs = scipy.spatial.distance.hamming(x, y)
# 断言两者的结果几乎相等
np.testing.assert_almost_equal(mine, theirs)
print("PASSED")
i = 1
# 测试闵可夫斯基距离函数
def test_minkowski(N=1):
np.random.seed(12345)
i = 0
while i < N:
N = np.random.randint(1, 100)
p = 1 np.random.rand() * 10
x = np.random.rand(N)
y = np.random.rand(N)
# 计算自定义的闵可夫斯基距离
mine = minkowski(x, y, p)
# 使用 SciPy 库计算闵可夫斯基距离
theirs = scipy.spatial.distance.minkowski(x, y, p)
# 断言两者的结果几乎相等
np.testing.assert_almost_equal(mine, theirs)
print("PASSED")
i = 1
# 测试切比雪夫距离函数
def test_chebyshev(N=1):
np.random.seed(12345)
i = 0
while i < N:
N = np.random.randint(1, 100)
x = np.random.rand(N)
y = np.random.rand(N)
# 计算自定义的切比雪夫距离
mine = chebyshev(x, y)
# 使用 SciPy 库计算切比雪夫距离
theirs = scipy.spatial.distance.chebyshev(x, y)
# 断言两者的结果几乎相等
np.testing.assert_almost_equal(mine, theirs)
print("PASSED")
i = 1
# 测试曼哈顿距离函数
def test_manhattan(N=1):
np.random.seed(12345)
i = 0
# 当 i 小于 N 时执行循环
while i < N:
# 生成一个随机整数 N,范围在 [1, 100)
N = np.random.randint(1, 100)
# 生成一个包含 N 个随机浮点数的数组 x
x = np.random.rand(N)
# 生成一个包含 N 个随机浮点数的数组 y
y = np.random.rand(N)
# 调用 manhattan 函数计算曼哈顿距离
mine = manhattan(x, y)
# 调用 scipy 库中的 cityblock 函数计算曼哈顿距离
theirs = scipy.spatial.distance.cityblock(x, y)
# 使用 np.testing.assert_almost_equal 函数比较两个距离的近似相等性
np.testing.assert_almost_equal(mine, theirs)
# 打印 "PASSED" 表示测试通过
print("PASSED")
# i 自增
i = 1
# 定义一个函数用于测试 BallTree 数据结构
def test_ball_tree(N=1):
# 设置随机种子
np.random.seed(12345)
# 初始化计数器 i
i = 0
# 循环 N 次
while i < N:
# 生成随机数 N 和 M
N = np.random.randint(2, 100)
M = np.random.randint(2, 100)
# 生成随机数 k
k = np.random.randint(1, N)
# 生成随机数 ls
ls = np.min([np.random.randint(1, 10), N - 1])
# 生成随机数据矩阵 X
X = np.random.rand(N, M)
# 创建 BallTree 对象 BT
BT = BallTree(leaf_size=ls, metric=euclidean)
# 对数据矩阵 X 进行拟合
BT.fit(X)
# 生成随机向量 x
x = np.random.rand(M)
# 获取最近的 k 个邻居
mine = BT.nearest_neighbors(k, x)
# 断言返回的邻居数量等于 k
assert len(mine) == k
# 提取邻居的键和距离
mine_neighb = np.array([n.key for n in mine])
mine_dist = np.array([n.distance for n in mine])
# 对距离进行排序
sort_ix = np.argsort(mine_dist)
mine_dist = mine_dist[sort_ix]
mine_neighb = mine_neighb[sort_ix]
# 创建 scikit-learn 的 BallTree 对象 sk
sk = sk_BallTree(X, leaf_size=ls)
# 使用 sk 查询最近的 k 个邻居
theirs_dist, ind = sk.query(x.reshape(1, -1), k=k)
sort_ix = np.argsort(theirs_dist.flatten())
theirs_dist = theirs_dist.flatten()[sort_ix]
theirs_neighb = X[ind.flatten()[sort_ix]]
# 断言我的结果与 scikit-learn 的结果一致
for j in range(len(theirs_dist)):
np.testing.assert_almost_equal(mine_neighb[j], theirs_neighb[j])
np.testing.assert_almost_equal(mine_dist[j], theirs_dist[j])
# 打印测试通过信息
print("PASSED")
# 更新计数器 i
i = 1
# 将 networkx 图转换为自定义图表示
def from_networkx(G_nx):
V = list(G_nx.nodes)
edges = list(G_nx.edges)
# 检查是否是带权重的图
is_weighted = "weight" in G_nx[edges[0][0]][edges[0][1]]
E = []
for e in edges:
if is_weighted:
# 如果是带权重的边,则添加带权重的边
E.append(Edge(e[0], e[1], G_nx[e[0]][e[1]]["weight"]))
else:
# 如果不带权重,则添加不带权重的边
E.append(Edge(e[0], e[1]))
# 如果输入的图 G_nx 是有向图,则返回一个有向图对象 DiGraph(V, E)
# 否则返回一个无向图对象 UndirectedGraph(V, E)
return DiGraph(V, E) if nx.is_directed(G_nx) else UndirectedGraph(V, E)
# 将自定义的图形表示转换为 networkx 图形
def to_networkx(G):
# 如果图是有向的,则创建有向图,否则创建无向图
G_nx = nx.DiGraph() if G.is_directed else nx.Graph()
# 获取图中所有顶点
V = list(G._V2I.keys())
# 将所有顶点添加到 networkx 图中
G_nx.add_nodes_from(V)
for v in V:
# 获取顶点在图中的索引
fr_i = G._V2I[v]
# 获取与该顶点相连的边
edges = G._G[fr_i]
for edge in edges:
# 将边添加到 networkx 图中,包括权重信息
G_nx.add_edge(edge.fr, edge.to, weight=edge._w)
return G_nx
def test_all_paths(N=1):
np.random.seed(12345)
i = 0
while i < N:
# 生成随机概率值
p = np.random.rand()
# 生成随机值来确定图是否有向
directed = np.random.rand() < 0.5
# 创建一个随机无权重图
G = random_unweighted_graph(n_vertices=5, edge_prob=p, directed=directed)
nodes = G._I2V.keys()
G_nx = to_networkx(G)
# 对于每个图,测试所有起点和终点顶点对的 all_paths 方法
# 注意图不一定是连通的,所以许多路径可能为空
for s_i in nodes:
for e_i in nodes:
if s_i == e_i:
continue
# 获取自定义图中的所有路径
paths = G.all_paths(s_i, e_i)
# 获取 networkx 图中的所有简单路径
paths_nx = nx.all_simple_paths(G_nx, source=s_i, target=e_i, cutoff=10)
# 对路径进行排序
paths = sorted(paths)
paths_nx = sorted(list(paths_nx))
# 断言两个路径列表是否相等
for p1, p2 in zip(paths, paths_nx):
np.testing.assert_array_equal(p1, p2)
print("PASSED")
i = 1
def test_random_DAG(N=1):
np.random.seed(12345)
i = 0
while i < N:
# 生成指定范围内的随机概率值
p = np.random.uniform(0.25, 1)
# 生成指定范围内的随机顶点数量
n_v = np.random.randint(5, 50)
# 创建一个随机有向无环图
G = random_DAG(n_v, p)
G_nx = to_networkx(G)
# 断言 networkx 图是否是有向无环图
assert nx.is_directed_acyclic_graph(G_nx)
print("PASSED")
i = 1
def test_topological_ordering(N=1):
np.random.seed(12345)
i = 0
# 当 i 小于 N 时执行循环
while i < N:
# 生成一个介于 0.25 和 1 之间的随机数
p = np.random.uniform(0.25, 1)
# 生成一个介于 5 和 10 之间的随机整数
n_v = np.random.randint(5, 10)
# 生成一个随机有向无环图
G = random_DAG(n_v, p)
# 将生成的图转换为 NetworkX 图
G_nx = to_networkx(G)
# 如果转换后的图是有向无环图
if nx.is_directed_acyclic_graph(G_nx):
# 获取图的拓扑排序
topo_order = G.topological_ordering()
# 测试拓扑排序
seen_it = set()
for n_i in topo_order:
seen_it.add(n_i)
# 断言:对于每个节点 n_i,其邻居节点 c_i 不在已经遍历过的节点集合中
assert any([c_i in seen_it for c_i in G.get_neighbors(n_i)]) == False
# 打印 "PASSED" 表示测试通过
print("PASSED")
# 增加 i 的值,继续下一次循环
i = 1
# 测试图是否为无环图
def test_is_acyclic(N=1):
# 设置随机种子
np.random.seed(12345)
# 初始化计数器
i = 0
# 循环N次
while i < N:
# 生成随机概率值
p = np.random.rand()
# 生成随机布尔值,表示是否为有向图
directed = np.random.rand() < 0.5
# 生成一个随机无权图
G = random_unweighted_graph(n_vertices=10, edge_prob=p, directed=True)
# 将生成的图转换为 NetworkX 图
G_nx = to_networkx(G)
# 断言当前图是否为无环图,与 NetworkX 中的判断结果进行比较
assert G.is_acyclic() == nx.is_directed_acyclic_graph(G_nx)
# 打印测试通过信息
print("PASSED")
# 更新计数器
i = 1
numpy-mlnumpy_mltests__init__.py
代码语言:javascript
复制# 用于各种 numpy-ml 模块的单元测试
numpy-mlnumpy_mltreesdt.py
代码语言:javascript
复制# 导入 NumPy 库
import numpy as np
# 定义节点类
class Node:
def __init__(self, left, right, rule):
# 初始化节点的左子节点
self.left = left
# 初始化节点的右子节点
self.right = right
# 获取节点的特征
self.feature = rule[0]
# 获取节点的阈值
self.threshold = rule[1]
# 定义叶子节点类
class Leaf:
def __init__(self, value):
"""
`value` is an array of class probabilities if classifier is True, else
the mean of the region
"""
# 初始化叶子节点的值,如果是分类器,则为类别概率数组,否则为区域的均值
self.value = value
# 定义决策树类
class DecisionTree:
def __init__(
self,
classifier=True,
max_depth=None,
n_feats=None,
criterion="entropy",
seed=None,
"""
用于回归和分类问题的决策树模型。
参数
----------
classifier : bool
是否将目标值视为分类(classifier = True)或连续(classifier = False)。默认为True。
max_depth: int or None
停止生长树的深度。如果为None,则生长树直到所有叶子节点都是纯的。默认为None。
n_feats : int
指定在每次分裂时要采样的特征数量。如果为None,则在每次分裂时使用所有特征。默认为None。
criterion : {'mse', 'entropy', 'gini'}
计算分裂时要使用的错误标准。当`classifier`为False时,有效输入为{'mse'}。当`classifier`为True时,有效输入为{'entropy', 'gini'}。默认为'entropy'。
seed : int or None
随机数生成器的种子。默认为None。
"""
如果有种子值,则设置随机数种子
if seed:
np.random.seed(seed)
初始化深度为0
初始化根节点为None
设置特征数量为n_feats
设置错误标准为criterion
设置分类器为classifier
设置最大深度为max_depth,如果max_depth为None,则设置为无穷大
如果不是分类问题且错误标准为["gini", "entropy"]之一,则引发值错误
if not classifier and criterion in ["gini", "entropy"]:
raise ValueError(
"{} is a valid criterion only when classifier = True.".format(criterion)
)
如果是分类问题且错误标准为"mse",则引发值错误
if classifier and criterion == "mse":
raise ValueError("`mse` is a valid criterion only when classifier = False.")
# 适应二叉决策树到数据集
def fit(self, X, Y):
"""
Fit a binary decision tree to a dataset.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
The training data of `N` examples, each with `M` features
Y : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
An array of integer class labels for each example in `X` if
self.classifier = True, otherwise the set of target values for
each example in `X`.
"""
# 如果是分类器,则确定类别数量,否则为 None
self.n_classes = max(Y) 1 if self.classifier else None
# 确定特征数量,如果未指定则为 X 的特征数量,否则取最小值
self.n_feats = X.shape[1] if not self.n_feats else min(self.n_feats, X.shape[1])
# 生成决策树的根节点
self.root = self._grow(X, Y)
# 使用训练好的决策树对 X 中的示例进行分类或预测
def predict(self, X):
"""
Use the trained decision tree to classify or predict the examples in `X`.
Parameters
----------
X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
The training data of `N` examples, each with `M` features
Returns
-------
preds : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
The integer class labels predicted for each example in `X` if
self.classifier = True, otherwise the predicted target values.
"""
# 对 X 中的每个示例进行预测,返回预测的类别或目标值
return np.array([self._traverse(x, self.root) for x in X])
def predict_class_probs(self, X):
"""
使用训练好的决策树来返回`X`中每个示例的类别概率。
参数
----------
X : :py:class:`ndarray <numpy.ndarray>`,形状为`(N, M)`
`N`个示例的训练数据,每个示例有`M`个特征
返回
-------
preds : :py:class:`ndarray <numpy.ndarray>`,形状为`(N, n_classes)`
针对`X`中每个示例预测的类别概率。
"""
# 检查分类器是否已定义
assert self.classifier, "`predict_class_probs` undefined for classifier = False"
# 对`X`中的每个示例调用`_traverse`方法,返回类别概率数组
return np.array([self._traverse(x, self.root, prob=True) for x in X])
def _grow(self, X, Y, cur_depth=0):
# 如果所有标签都相同,则返回一个叶节点
if len(set(Y)) == 1:
if self.classifier:
prob = np.zeros(self.n_classes)
prob[Y[0]] = 1.0
return Leaf(prob) if self.classifier else Leaf(Y[0])
# 如果达到最大深度,则返回一个叶节点
if cur_depth >= self.max_depth:
v = np.mean(Y, axis=0)
if self.classifier:
v = np.bincount(Y, minlength=self.n_classes) / len(Y)
return Leaf(v)
cur_depth = 1
self.depth = max(self.depth, cur_depth)
N, M = X.shape
feat_idxs = np.random.choice(M, self.n_feats, replace=False)
# 根据`criterion`贪婪地选择最佳分割
feat, thresh = self._segment(X, Y, feat_idxs)
l = np.argwhere(X[:, feat] <= thresh).flatten()
r = np.argwhere(X[:, feat] > thresh).flatten()
# 递归生长左右子树
left = self._grow(X[l, :], Y[l], cur_depth)
right = self._grow(X[r, :], Y[r], cur_depth)
return Node(left, right, (feat, thresh))
def _segment(self, X, Y, feat_idxs):
"""
Find the optimal split rule (feature index and split threshold) for the
data according to `self.criterion`.
"""
# 初始化最佳增益为负无穷
best_gain = -np.inf
split_idx, split_thresh = None, None
# 遍历特征索引
for i in feat_idxs:
# 获取特征值
vals = X[:, i]
# 获取唯一值
levels = np.unique(vals)
# 计算阈值
thresholds = (levels[:-1] levels[1:]) / 2 if len(levels) > 1 else levels
# 计算增益
gains = np.array([self._impurity_gain(Y, t, vals) for t in thresholds])
# 更新最佳增益
if gains.max() > best_gain:
split_idx = i
best_gain = gains.max()
split_thresh = thresholds[gains.argmax()]
return split_idx, split_thresh
def _impurity_gain(self, Y, split_thresh, feat_values):
"""
Compute the impurity gain associated with a given split.
IG(split) = loss(parent) - weighted_avg[loss(left_child), loss(right_child)]
"""
# 根据不同的准则选择损失函数
if self.criterion == "entropy":
loss = entropy
elif self.criterion == "gini":
loss = gini
elif self.criterion == "mse":
loss = mse
# 计算父节点的损失
parent_loss = loss(Y)
# 生成分裂
left = np.argwhere(feat_values <= split_thresh).flatten()
right = np.argwhere(feat_values > split_thresh).flatten()
if len(left) == 0 or len(right) == 0:
return 0
# 计算子节点损失的加权平均
n = len(Y)
n_l, n_r = len(left), len(right)
e_l, e_r = loss(Y[left]), loss(Y[right])
child_loss = (n_l / n) * e_l (n_r / n) * e_r
# 计算增益是分裂前后损失的差异
ig = parent_loss - child_loss
return ig
# 递归遍历决策树节点,根据输入数据 X 和节点信息 node 进行预测
def _traverse(self, X, node, prob=False):
# 如果节点是叶子节点
if isinstance(node, Leaf):
# 如果是分类器模型
if self.classifier:
# 返回节点的值(概率值或类别值)
return node.value if prob else node.value.argmax()
# 如果是回归模型,直接返回节点的值
return node.value
# 如果输入数据 X 在节点的特征上的取值小于等于节点的阈值
if X[node.feature] <= node.threshold:
# 递归遍历左子树节点
return self._traverse(X, node.left, prob)
# 如果输入数据 X 在节点的特征上的取值大于节点的阈值
return self._traverse(X, node.right, prob)
# 计算决策树(即均值)预测的均方误差
def mse(y):
# 返回 y 与 y 的均值之差的平方的均值
return np.mean((y - np.mean(y)) ** 2)
# 计算标签序列的熵
def entropy(y):
# 统计标签序列中每个标签出现的次数
hist = np.bincount(y)
# 计算每个标签出现的概率
ps = hist / np.sum(hist)
# 计算熵值
return -np.sum([p * np.log2(p) for p in ps if p > 0])
# 计算标签序列的基尼不纯度(局部熵)
def gini(y):
# 统计标签序列中每个标签出现的次数
hist = np.bincount(y)
# 计算总样本数
N = np.sum(hist)
# 计算基尼不纯度
return 1 - sum([(i / N) ** 2 for i in hist])