NumPyML 源码解析(六)

2024-02-17 10:06:05 浏览数 (1)

numpy-mlnumpy_mlteststest_glm.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 numpy 库并重命名为 np
import numpy as np

# 导入 statsmodels 库中的 api 模块并重命名为 sm
import statsmodels.api as sm
# 从 numpy_ml.linear_models 模块中导入 GeneralizedLinearModel 类
from numpy_ml.linear_models import GeneralizedLinearModel
# 从 numpy_ml.linear_models.glm 模块中导入 _GLM_LINKS 变量
from numpy_ml.linear_models.glm import _GLM_LINKS
# 从 numpy_ml.utils.testing 模块中导入 random_tensor 函数
from numpy_ml.utils.testing import random_tensor

# 定义一个测试函数 test_glm,参数 N 默认值为 20
def test_glm(N=20):
    # 设置随机种子为 12345
    np.random.seed(12345)
    # 如果 N 为 None,则将 N 设置为正无穷
    N = np.inf if N is None else N

    # 初始化变量 i 为 1
    i = 1
    # 循环执行直到 i 大于 N
    while i < N   1:
        # 生成一个随机整数作为样本数量,范围在 10 到 100 之间
        n_samples = np.random.randint(10, 100)

        # 生成一个随机整数作为特征数量,确保特征数量远小于样本数量,避免完全分离或多个解决方案
        n_feats = np.random.randint(1, 1   n_samples // 2)
        target_dim = 1

        # 随机选择是否拟合截距
        fit_intercept = np.random.choice([True, False])
        # 随机选择链接函数
        _link = np.random.choice(list(_GLM_LINKS.keys()))

        # 创建不同链接函数对应的家族
        families = {
            "identity": sm.families.Gaussian(),
            "logit": sm.families.Binomial(),
            "log": sm.families.Poisson(),
        }

        # 打印当前链接函数和是否拟合截距
        print(f"Link: {_link}")
        print(f"Fit intercept: {fit_intercept}")

        # 生成随机数据作为特征矩阵 X
        X = random_tensor((n_samples, n_feats), standardize=True)
        # 根据链接函数生成随机标签 y
        if _link == "logit":
            y = np.random.choice([0.0, 1.0], size=(n_samples, target_dim))
        elif _link == "log":
            y = np.random.choice(np.arange(0, 100), size=(n_samples, target_dim))
        elif _link == "identity":
            y = random_tensor((n_samples, target_dim), standardize=True)
        else:
            raise ValueError(f"Unknown link function {_link}")

        # 在整个数据集上拟合标准模型
        fam = families[_link]
        Xdesign = np.c_[np.ones(X.shape[0]), X] if fit_intercept else X

        glm_gold = sm.GLM(y, Xdesign, family=fam)
        glm_gold = glm_gold.fit()

        # 使用自定义的广义线性模型拟合数据
        glm_mine = GeneralizedLinearModel(link=_link, fit_intercept=fit_intercept)
        glm_mine.fit(X, y)

        # 检查模型系数是否匹配
        beta = glm_mine.beta.T.ravel()
        np.testing.assert_almost_equal(beta, glm_gold.params, decimal=6)
        print("t1. Overall model coefficients match")

        # 检查模型预测是否匹配
        np.testing.assert_almost_equal(
            glm_mine.predict(X), glm_gold.predict(Xdesign), decimal=5
        )
        print("t2. Overall model predictions match")

        # 打印测试通过信息
        print("tPASSEDn")
        i  = 1

numpy-mlnumpy_mlteststest_linear_regression.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 numpy 库并重命名为 np
import numpy as np

# 从 sklearn 线性模型中导入 LinearRegression 类并重命名为 LinearRegressionGold
from sklearn.linear_model import LinearRegression as LinearRegressionGold

# 从 numpy_ml 线性模型中导入 LinearRegression 类
from numpy_ml.linear_models import LinearRegression

# 从 numpy_ml 工具包中导入 random_tensor 函数
from numpy_ml.utils.testing import random_tensor

# 定义测试线性回归函数,参数 N 默认值为 10
def test_linear_regression(N=10):
    # 设置随机种子为 12345
    np.random.seed(12345)
    # 如果 N 为 None,则将 N 设置为正无穷
    N = np.inf if N is None else N

    # 初始化变量 i 为 1
    i = 1

numpy-mlnumpy_mlteststest_naive_bayes.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 numpy 库,并使用别名 np
import numpy as np
# 从 sklearn 库中导入 datasets 模块
from sklearn import datasets
# 从 sklearn 库中导入 model_selection 模块中的 train_test_split 函数

from sklearn.model_selection import train_test_split

# 从 sklearn 库中导入 naive_bayes 模块
from sklearn import naive_bayes

# 从 numpy_ml.linear_models 模块中导入 GaussianNBClassifier 类
from numpy_ml.linear_models import GaussianNBClassifier
# 从 numpy_ml.utils.testing 模块中导入 random_tensor 函数

from numpy_ml.utils.testing import random_tensor

# 定义测试函数 test_GaussianNB,参数 N 默认值为 10
def test_GaussianNB(N=10):
    # 设置随机种子为 12345
    np.random.seed(12345)
    # 如果 N 为 None,则将 N 设置为正无穷
    N = np.inf if N is None else N

    # 初始化变量 i 为 1
    i = 1
    # 获取浮点数的最小精度
    eps = np.finfo(float).eps

numpy-mlnumpy_mlteststest_ngram.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入临时文件模块
import tempfile

# 导入 nltk 和 numpy 模块
import nltk
import numpy as np

# 从上级目录中导入 tokenize_words 函数
from ..preprocessing.nlp import tokenize_words
# 从上级目录中导入 AdditiveNGram 和 MLENGram 类
from ..ngram import AdditiveNGram, MLENGram
# 从上级目录中导入 random_paragraph 函数
from ..utils.testing import random_paragraph

# 定义 MLEGold 类
class MLEGold:
    def __init__(
        self, N, K=1, unk=True, filter_stopwords=True, filter_punctuation=True
    ):
        # 初始化类的属性
        self.N = N
        self.K = K
        self.unk = unk
        self.filter_stopwords = filter_stopwords
        self.filter_punctuation = filter_punctuation

        # 设置超参数字典
        self.hyperparameters = {
            "N": N,
            "K": K,
            "unk": unk,
            "filter_stopwords": filter_stopwords,
            "filter_punctuation": filter_punctuation,
        }

    # 计算给定 N-gram 的对数概率
    def log_prob(self, words, N):
        assert N in self.counts, "You do not have counts for {}-grams".format(N)

        if N > len(words):
            err = "Not enough words for a gram-size of {}: {}".format(N, len(words))
            raise ValueError(err)

        total_prob = 0
        for ngram in nltk.ngrams(words, N):
            total_prob  = self._log_ngram_prob(ngram)
        return total_prob

    # 计算给定 N-gram 的对数概率
    def _log_ngram_prob(self, ngram):
        N = len(ngram)
        return self._models[N].logscore(ngram[-1], ngram[:-1])

# 定义 AdditiveGold 类
class AdditiveGold:
    def __init__(
        self, N, K=1, unk=True, filter_stopwords=True, filter_punctuation=True
    ):
        # 初始化类的属性
        self.N = N
        self.K = K
        self.unk = unk
        self.filter_stopwords = filter_stopwords
        self.filter_punctuation = filter_punctuation

        # 设置超参数字典
        self.hyperparameters = {
            "N": N,
            "K": K,
            "unk": unk,
            "filter_stopwords": filter_stopwords,
            "filter_punctuation": filter_punctuation,
        }
    # 计算给定单词列表的对数概率和
    def log_prob(self, words, N):
        # 检查是否存在 N-grams 的计数
        assert N in self.counts, "You do not have counts for {}-grams".format(N)

        # 如果单词数量不足以形成 N-grams,则引发异常
        if N > len(words):
            err = "Not enough words for a gram-size of {}: {}".format(N, len(words))
            raise ValueError(err)

        # 初始化总概率
        total_prob = 0
        # 遍历生成给定单词列表的 N-grams,并计算其对数概率
        for ngram in nltk.ngrams(words, N):
            total_prob  = self._log_ngram_prob(ngram)
        # 返回总概率
        return total_prob

    # 计算给定 N-gram 的对数概率
    def _log_ngram_prob(self, ngram):
        # 获取 N-gram 的长度
        N = len(ngram)
        # 调用模型对象的 logscore 方法计算 N-gram 的对数概率
        return self._models[N].logscore(ngram[-1], ngram[:-1])
# 测试最大似然估计模型
def test_mle():
    # 生成一个随机整数 N,范围在 [2, 5) 之间
    N = np.random.randint(2, 5)
    # 创建一个最大似然估计的金标准对象
    gold = MLEGold(N, unk=True, filter_stopwords=False, filter_punctuation=False)
    # 创建一个最大似然估计的自己实现的对象
    mine = MLENGram(N, unk=True, filter_stopwords=False, filter_punctuation=False)

    # 使用临时文件进行训练
    with tempfile.NamedTemporaryFile() as temp:
        # 将随机生成的一个包含 1000 个单词的段落写入临时文件
        temp.write(bytes(" ".join(random_paragraph(1000)), encoding="utf-8-sig"))
        # 使用金标准对象进行训练
        gold.train(temp.name, encoding="utf-8-sig")
        # 使用自己实现的对象进行训练
        mine.train(temp.name, encoding="utf-8-sig")

    # 遍历自己实现的对象中 N 阶计数的键
    for k in mine.counts[N].keys():
        # 如果键的第一个和第二个元素相等,并且在 ("<bol>", "<eol>") 中,则跳过
        if k[0] == k[1] and k[0] in ("<bol>", "<eol>"):
            continue

        # 错误信息字符串模板
        err_str = "{}, mine: {}, gold: {}"
        # 断言自己实现的对象中的计数与金标准对象中的计数相等
        assert mine.counts[N][k] == gold.counts[N][k], err_str.format(
            k, mine.counts[N][k], gold.counts[N][k]
        )

        # 计算自己实现的对象中 k 的对数概率
        M = mine.log_prob(k, N)
        # 计算金标准对象中 k 的对数概率,并转换为以自然对数为底
        G = gold.log_prob(k, N) / np.log2(np.e)
        # 使用 np.testing.assert_allclose 检查 M 和 G 是否接近
        np.testing.assert_allclose(M, G)
        # 打印 "PASSED"
        print("PASSED")


# 测试加法平滑模型
def test_additive():
    # 生成一个随机浮点数 K
    K = np.random.rand()
    # 生成一个随机整数 N,范围在 [2, 5) 之间
    N = np.random.randint(2, 5)
    # 创建一个加法平滑的金标准对象
    gold = AdditiveGold(
        N, K, unk=True, filter_stopwords=False, filter_punctuation=False
    )
    # 创建一个加法平滑的自己实现的对象
    mine = AdditiveNGram(
        N, K, unk=True, filter_stopwords=False, filter_punctuation=False
    )

    # 使用临时文件进行训练
    with tempfile.NamedTemporaryFile() as temp:
        # 将随机生成的一个包含 1000 个单词的段落写入临时文件
        temp.write(bytes(" ".join(random_paragraph(1000)), encoding="utf-8-sig"))
        # 使用金标准对象进行训练
        gold.train(temp.name, encoding="utf-8-sig")
        # 使用自己实现的对象进行训练
        mine.train(temp.name, encoding="utf-8-sig")

    # 遍历自己实现的对象中 N 阶计数的键
    for k in mine.counts[N].keys():
        # 如果键的第一个和第二个元素相等,并且在 ("<bol>", "<eol>") 中,则跳过
        if k[0] == k[1] and k[0] in ("<bol>", "<eol>"):
            continue

        # 错误信息字符串模板
        err_str = "{}, mine: {}, gold: {}"
        # 断言自己实现的对象中的计数与金标准对象中的计数相等
        assert mine.counts[N][k] == gold.counts[N][k], err_str.format(
            k, mine.counts[N][k], gold.counts[N][k]
        )

        # 计算自己实现的对象中 k 的对数概率
        M = mine.log_prob(k, N)
        # 计算金标准对象中 k 的对数概率,并转换为以自然对数为底
        G = gold.log_prob(k, N) / np.log2(np.e)
        # 使用 np.testing.assert_allclose 检查 M 和 G 是否接近
        np.testing.assert_allclose(M, G)
        # 打印 "PASSED"
        print("PASSED")

numpy-mlnumpy_mlteststest_nn.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入时间模块
import time
# 从 copy 模块中导入 deepcopy 函数
from copy import deepcopy

# 导入 numpy 模块,并将其命名为 np
import numpy as np
# 从 numpy.testing 模块中导入 assert_almost_equal 函数
from numpy.testing import assert_almost_equal

# 导入 sklearn.metrics 模块中的 log_loss 和 mean_squared_error 函数
from sklearn.metrics import log_loss, mean_squared_error

# 导入 scipy.special 模块中的 expit 函数,用于测试 sigmoid 函数
from scipy.special import expit

# 导入 torch 模块
import torch
# 从 torch.nn 模块中导入 nn 和 F
import torch.nn as nn
import torch.nn.functional as F

# 从 numpy_ml.neural_nets.utils 模块中导入一系列函数
from numpy_ml.neural_nets.utils import (
    calc_pad_dims_2D,
    conv2D_naive,
    conv2D,
    pad2D,
    pad1D,
)
# 从 numpy_ml.utils.testing 模块中导入一系列函数
from numpy_ml.utils.testing import (
    random_one_hot_matrix,
    random_stochastic_matrix,
    random_tensor,
)

# 从当前目录下的 nn_torch_models 模块中导入一系列类和函数
from .nn_torch_models import (
    TFNCELoss,
    WGAN_GP_tf,
    torch_xe_grad,
    torch_mse_grad,
    TorchVAELoss,
    TorchFCLayer,
    TorchRNNCell,
    TorchLSTMCell,
    TorchAddLayer,
    TorchWGANGPLoss,
    TorchConv1DLayer,
    TorchConv2DLayer,
    TorchPool2DLayer,
    TorchWavenetModule,
    TorchMultiplyLayer,
    TorchDeconv2DLayer,
    TorchLayerNormLayer,
    TorchBatchNormLayer,
    TorchEmbeddingLayer,
    TorchLinearActivation,
    TorchSDPAttentionLayer,
    TorchBidirectionalLSTM,
    torch_gradient_generator,
    TorchSkipConnectionConv,
    TorchSkipConnectionIdentity,
    TorchMultiHeadedAttentionModule,
)

#######################################################################
#                           Debug Formatter                           #
#######################################################################

# 定义一个函数,用于格式化错误信息
def err_fmt(params, golds, ix, warn_str=""):
    # 获取当前参数和标签
    mine, label = params[ix]
    # 构建错误信息字符串
    err_msg = "-" * 25   " DEBUG "   "-" * 25   "n"
    # 获取前一个参数和标签
    prev_mine, prev_label = params[max(ix - 1, 0)]
    # 添加前一个参数和标签的信息到错误信息字符串中
    err_msg  = "Mine (prev) [{}]:n{}nnTheirs (prev) [{}]:n{}".format(
        prev_label, prev_mine, prev_label, golds[prev_label]
    )
    # 添加当前参数和标签的信息到错误信息字符串中
    err_msg  = "nnMine [{}]:n{}nnTheirs [{}]:n{}".format(
        label, mine, label, golds[label]
    )
    # 添加警告信息到错误信息字符串中
    err_msg  = warn_str
    err_msg  = "n"   "-" * 23   " END DEBUG "   "-" * 23
    # 返回错误信息字符串
    return err_msg

#######################################################################
#                         Loss Functions                              #
#######################################################################

# 测试均方误差损失函数
def test_squared_error(N=15):
    # 从 numpy_ml.neural_nets.losses 模块导入 SquaredError 类
    from numpy_ml.neural_nets.losses import SquaredError

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 SquaredError 实例
    mine = SquaredError()
    # 创建参考的均方误差损失函数
    gold = (
        lambda y, y_pred: mean_squared_error(y, y_pred)
        * y_pred.shape[0]
        * y_pred.shape[1]
        * 0.5
    )

    # 确保当两个数组相等时得到 0
    n_dims = np.random.randint(2, 100)
    n_examples = np.random.randint(1, 1000)
    y = y_pred = random_tensor((n_examples, n_dims))
    assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred))
    print("PASSED")

    i = 1
    while i < N:
        n_dims = np.random.randint(2, 100)
        n_examples = np.random.randint(1, 1000)
        y = random_tensor((n_examples, n_dims))
        y_pred = random_tensor((n_examples, n_dims))
        assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred), decimal=5)
        print("PASSED")
        i  = 1

# 测试交叉熵损失函数
def test_cross_entropy(N=15):
    # 从 numpy_ml.neural_nets.losses 模块导入 CrossEntropy 类
    from numpy_ml.neural_nets.losses import CrossEntropy

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 CrossEntropy 实例
    mine = CrossEntropy()
    # 创建参考的对数损失函数
    gold = log_loss

    # 确保当两个数组相等时得到 0
    n_classes = np.random.randint(2, 100)
    n_examples = np.random.randint(1, 1000)
    y = y_pred = random_one_hot_matrix(n_examples, n_classes)
    assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred))
    print("PASSED")

    # 在随机输入上进行测试
    i = 1
    while i < N:
        n_classes = np.random.randint(2, 100)
        n_examples = np.random.randint(1, 1000)
        y = random_one_hot_matrix(n_examples, n_classes)
        y_pred = random_stochastic_matrix(n_examples, n_classes)

        assert_almost_equal(mine.loss(y, y_pred), gold(y, y_pred, normalize=False))
        print("PASSED")
        i  = 1

# 测试变分自编码器损失函数
def test_VAE_loss(N=15):
    # 从numpy_ml.neural_nets.losses模块中导入VAELoss类
    from numpy_ml.neural_nets.losses import VAELoss

    # 设置随机种子为12345
    np.random.seed(12345)

    # 如果N为None,则将N设置为无穷大,否则保持不变
    N = np.inf if N is None else N
    # 计算浮点数的最小值
    eps = np.finfo(float).eps

    # 初始化循环变量i为1
    i = 1
    # 当i小于N时执行循环
    while i < N:
        # 生成1到10之间的随机整数作为样本数
        n_ex = np.random.randint(1, 10)
        # 生成2到10之间的随机整数作为特征维度
        t_dim = np.random.randint(2, 10)
        # 生成服从标准化的随机张量作为t_mean
        t_mean = random_tensor([n_ex, t_dim], standardize=True)
        # 生成服从标准化的随机张量,取对数绝对值后加上eps作为t_log_var
        t_log_var = np.log(np.abs(random_tensor([n_ex, t_dim], standardize=True)   eps))
        # 生成2到40之间的随机整数作为图像列数和行数
        im_cols, im_rows = np.random.randint(2, 40), np.random.randint(2, 40)
        # 生成n_ex行im_rows*im_cols列的随机矩阵作为X和X_recon
        X = np.random.rand(n_ex, im_rows * im_cols)
        X_recon = np.random.rand(n_ex, im_rows * im_cols)

        # 创建VAELoss对象
        mine = VAELoss()
        # 计算VAE损失
        mine_loss = mine(X, X_recon, t_mean, t_log_var)
        # 计算损失函数关于输入的梯度
        dX_recon, dLogVar, dMean = mine.grad(X, X_recon, t_mean, t_log_var)
        # 从TorchVAELoss对象中提取梯度
        golds = TorchVAELoss().extract_grads(X, X_recon, t_mean, t_log_var)

        # 将损失和梯度存储在params列表中
        params = [
            (mine_loss, "loss"),
            (dX_recon, "dX_recon"),
            (dLogVar, "dt_log_var"),
            (dMean, "dt_mean"),
        ]
        # 打印当前试验的信息
        print("nTrial {}".format(i))
        # 遍历params列表,进行梯度检验
        for ix, (mine, label) in enumerate(params):
            # 使用np.testing.assert_allclose函数检查梯度是否接近期望值
            np.testing.assert_allclose(
                mine,
                golds[label],
                err_msg=err_fmt(params, golds, ix),
                rtol=0.1,
                atol=1e-2,
            )
            # 打印通过梯度检验的信息
            print("tPASSED {}".format(label))
        # 更新循环变量i
        i  = 1
def test_WGAN_GP_loss(N=5):
    # 导入 WGAN_GPLoss 类
    from numpy_ml.neural_nets.losses import WGAN_GPLoss

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 初始化循环计数器 i
    i = 1
    # 循环执行直到 i 达到 N
    while i < N:
        # 生成 lambda_ 值
        lambda_ = np.random.randint(0, 10)
        # 生成样本数 n_ex
        n_ex = np.random.randint(1, 10)
        # 生成特征数 n_feats
        n_feats = np.random.randint(2, 10)
        # 生成真实样本 Y_real
        Y_real = random_tensor([n_ex], standardize=True)
        # 生成虚假样本 Y_fake
        Y_fake = random_tensor([n_ex], standardize=True)
        # 生成梯度插值 gradInterp
        gradInterp = random_tensor([n_ex, n_feats], standardize=True)

        # 创建 WGAN_GPLoss 实例
        mine = WGAN_GPLoss(lambda_=lambda_)
        # 计算 C_loss
        C_loss = mine(Y_fake, "C", Y_real, gradInterp)
        # 计算 G_loss
        G_loss = mine(Y_fake, "G")

        # 计算 C_loss 的梯度
        C_dY_fake, dY_real, dGradInterp = mine.grad(Y_fake, "C", Y_real, gradInterp)
        # 计算 G_loss 的梯度
        G_dY_fake = mine.grad(Y_fake, "G")

        # 提取 TorchWGANGPLoss 类的梯度
        golds = TorchWGANGPLoss(lambda_).extract_grads(Y_real, Y_fake, gradInterp)
        # 如果梯度中存在 NaN 值,则跳过当前循环
        if np.isnan(golds["C_dGradInterp"]).any():
            continue

        # 设置参数列表
        params = [
            (Y_real, "Y_real"),
            (Y_fake, "Y_fake"),
            (gradInterp, "gradInterp"),
            (C_loss, "C_loss"),
            (G_loss, "G_loss"),
            (-dY_real, "C_dY_real"),
            (-C_dY_fake, "C_dY_fake"),
            (dGradInterp, "C_dGradInterp"),
            (G_dY_fake, "G_dY_fake"),
        ]

        # 打印当前试验的信息
        print("nTrial {}".format(i))
        # 遍历参数列表,进行断言比较
        for ix, (mine, label) in enumerate(params):
            np.testing.assert_allclose(
                mine,
                golds[label],
                err_msg=err_fmt(params, golds, ix),
                rtol=0.1,
                atol=1e-2,
            )
            print("tPASSED {}".format(label))
        # 更新循环计数器 i
        i  = 1


def test_NCELoss(N=1):
    # 导入 NCELoss 类和 DiscreteSampler 类
    from numpy_ml.neural_nets.losses import NCELoss
    from numpy_ml.utils.data_structures import DiscreteSampler

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 初始化循环计数器 i
    i = 1
#######################################################################
#                       Loss Function Gradients                       #
# 导入所需的模块和函数
def test_squared_error_grad(N=15):
    # 导入SquaredError损失函数和Tanh激活函数
    from numpy_ml.neural_nets.losses import SquaredError
    from numpy_ml.neural_nets.activations import Tanh

    # 设置随机种子
    np.random.seed(12345)

    # 如果N为None,则将N设置为无穷大
    N = np.inf if N is None else N

    # 创建SquaredError对象和torch_mse_grad对象
    mine = SquaredError()
    gold = torch_mse_grad
    act = Tanh()

    # 初始化循环计数器i
    i = 1
    # 当i小于N时执行循环
    while i < N:
        # 随机生成维度和样本数
        n_dims = np.random.randint(2, 100)
        n_examples = np.random.randint(1, 1000)
        y = random_tensor((n_examples, n_dims))

        # 生成随机输入
        z = random_tensor((n_examples, n_dims))
        y_pred = act.fn(z)

        # 断言SquaredError的梯度计算结果与torch_mse_grad的结果相近
        assert_almost_equal(
            mine.grad(y, y_pred, z, act), 0.5 * gold(y, z, torch.tanh), decimal=4
        )
        print("PASSED")
        i  = 1


# 定义测试交叉熵梯度的函数
def test_cross_entropy_grad(N=15):
    # 导入CrossEntropy损失函数和Softmax层
    from numpy_ml.neural_nets.losses import CrossEntropy
    from numpy_ml.neural_nets.layers import Softmax

    # 设置随机种子
    np.random.seed(12345)

    # 如果N为None,则将N设置为无穷大
    N = np.inf if N is None else N

    # 创建CrossEntropy对象和torch_xe_grad对象
    mine = CrossEntropy()
    gold = torch_xe_grad
    sm = Softmax()

    # 初始化循环计数器i
    i = 1
    # 当i小于N时执行循环
    while i < N:
        # 随机生成类别数和样本数
        n_classes = np.random.randint(2, 100)
        n_examples = np.random.randint(1, 1000)

        y = random_one_hot_matrix(n_examples, n_classes)

        # cross_entropy_gradient返回相对于z的梯度(而不是softmax(z))
        z = random_tensor((n_examples, n_classes))
        y_pred = sm.forward(z)

        # 断言CrossEntropy的梯度计算结果与torch_xe_grad的结果相近
        assert_almost_equal(mine.grad(y, y_pred), gold(y, z), decimal=5)
        print("PASSED")
        i  = 1


#######################################################################
#                          Activations                                #
#######################################################################


# 定义测试Sigmoid激活函数的函数
def test_sigmoid_activation(N=15):
    # 导入Sigmoid激活函数
    from numpy_ml.neural_nets.activations import Sigmoid

    # 设置随机种子
    np.random.seed(12345)

    # 如果N为None,则将N设置为无穷大
    N = np.inf if N is None else N

    # 创建Sigmoid对象和expit函数
    mine = Sigmoid()
    gold = expit

    # 初始化循环计数器i
    i = 0
    # 当 i 小于 N 时执行循环
    while i < N:
        # 生成一个随机整数,表示张量的维度数量
        n_dims = np.random.randint(1, 100)
        # 生成一个随机张量,形状为 (1, n_dims)
        z = random_tensor((1, n_dims))
        # 断言自定义函数 mine.fn(z) 的输出与标准函数 gold(z) 的输出几乎相等
        assert_almost_equal(mine.fn(z), gold(z))
        # 打印 "PASSED" 表示测试通过
        print("PASSED")
        # i 自增
        i  = 1
# 测试 ELU 激活函数的功能
def test_elu_activation(N=15):
    # 导入 ELU 激活函数
    from numpy_ml.neural_nets.activations import ELU

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 初始化计数器 i
    i = 0
    # 循环 N 次
    while i < N:
        # 生成随机维度
        n_dims = np.random.randint(1, 10)
        # 生成随机张量
        z = random_tensor((1, n_dims))

        # 生成随机 alpha 值
        alpha = np.random.uniform(0, 10)

        # 创建 ELU 激活函数对象
        mine = ELU(alpha)
        # 创建 PyTorch 中的 ELU 函数
        gold = lambda z, a: F.elu(torch.from_numpy(z), alpha).numpy()

        # 断言 ELU 函数的输出与 PyTorch 中的 ELU 函数的输出几乎相等
        assert_almost_equal(mine.fn(z), gold(z, alpha))
        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


# 测试 Softmax 激活函数的功能
def test_softmax_activation(N=15):
    # 导入 Softmax 层
    from numpy_ml.neural_nets.layers import Softmax

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 Softmax 层对象
    mine = Softmax()
    # 创建 PyTorch 中的 Softmax 函数
    gold = lambda z: F.softmax(torch.FloatTensor(z), dim=1).numpy()

    # 初始化计数器 i
    i = 0
    # 循环 N 次
    while i < N:
        # 生成随机维度
        n_dims = np.random.randint(1, 100)
        # 生成随机概率矩阵
        z = random_stochastic_matrix(1, n_dims)
        # 断言 Softmax 函数的输出与 PyTorch 中的 Softmax 函数的输出几乎相等
        assert_almost_equal(mine.forward(z), gold(z))
        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


# 测试 ReLU 激活函数的功能
def test_relu_activation(N=15):
    # 导入 ReLU 激活函数
    from numpy_ml.neural_nets.activations import ReLU

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 ReLU 激活函数对象
    mine = ReLU()
    # 创建 PyTorch 中的 ReLU 函数
    gold = lambda z: F.relu(torch.FloatTensor(z)).numpy()

    # 初始化计数器 i
    i = 0
    # 循环 N 次
    while i < N:
        # 生成随机维度
        n_dims = np.random.randint(1, 100)
        # 生成随机概率矩阵
        z = random_stochastic_matrix(1, n_dims)
        # 断言 ReLU 函数的输出与 PyTorch 中的 ReLU 函数的输出几乎相等
        assert_almost_equal(mine.fn(z), gold(z))
        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


# 测试 SoftPlus 激活函数的功能
def test_softplus_activation(N=15):
    # 导入 SoftPlus 激活函数
    from numpy_ml.neural_nets.activations import SoftPlus

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 SoftPlus 激活函数对象
    mine = SoftPlus()
    # 创建 PyTorch 中的 SoftPlus 函数
    gold = lambda z: F.softplus(torch.FloatTensor(z)).numpy()

    # 初始化计数器 i
    i = 0
    # 循环 N 次
    while i < N:
        # 生成随机维度
        n_dims = np.random.randint(1, 100)
        # 生成随机概率矩阵
        z = random_stochastic_matrix(1, n_dims)
        # 断言 SoftPlus 函数的输出与 PyTorch 中的 SoftPlus 函数的输出几乎相等
        assert_almost_equal(mine.fn(z), gold(z))
        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


#######################################################################
#                      Activation Gradients                           #
# 导入所需的库和模块
def test_sigmoid_grad(N=15):
    # 从 numpy_ml.neural_nets.activations 模块中导入 Sigmoid 类
    from numpy_ml.neural_nets.activations import Sigmoid

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 Sigmoid 实例 mine 和 torch 中的 sigmoid 梯度函数实例 gold
    mine = Sigmoid()
    gold = torch_gradient_generator(torch.sigmoid)

    # 初始化计数器 i
    i = 0
    # 循环执行 N 次
    while i < N:
        # 生成随机的样本数和维度
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        # 生成随机的张量 z
        z = random_tensor((n_ex, n_dims))
        # 断言 Sigmoid 实例的梯度和 torch 中的 sigmoid 梯度函数的结果几乎相等
        assert_almost_equal(mine.grad(z), gold(z))
        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


# 类似上面的注释,以下函数 test_elu_grad, test_tanh_grad, test_relu_grad 的注释内容相同,只是激活函数不同
# 测试 Softmax 层的梯度计算
def test_softmax_grad(N=15):
    # 导入 Softmax 层和部分函数
    from numpy_ml.neural_nets.layers import Softmax
    from functools import partial

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N
    # 创建 Softmax 函数的偏函数
    p_soft = partial(F.softmax, dim=1)
    # 生成 Torch 梯度函数
    gold = torch_gradient_generator(p_soft)

    # 初始化计数器 i
    i = 0
    # 循环直到达到 N 次
    while i < N:
        # 创建 Softmax 层实例
        mine = Softmax()
        # 随机生成样本数和维度
        n_ex = np.random.randint(1, 3)
        n_dims = np.random.randint(1, 50)
        # 生成随机张量
        z = random_tensor((n_ex, n_dims), standardize=True)
        # 前向传播
        out = mine.forward(z)

        # 断言梯度计算结果准确性
        assert_almost_equal(
            gold(z),
            mine.backward(np.ones_like(out)),
            err_msg="Theirs:n{}nnMine:n{}n".format(
                gold(z), mine.backward(np.ones_like(out))
            ),
            decimal=3,
        )
        # 打印测试通过信息
        print("PASSED")
        i  = 1


# 测试 Softplus 层的梯度计算
def test_softplus_grad(N=15):
    # 导入 Softplus 层
    from numpy_ml.neural_nets.activations import SoftPlus

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 创建 Softplus 层实例
    mine = SoftPlus()
    # 生成 Torch 梯度函数
    gold = torch_gradient_generator(F.softplus)

    # 初始化计数器 i
    i = 0
    # 循环直到达到 N 次
    while i < N:
        # 随机生成样本数和维度
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        # 生成随机张量
        z = random_tensor((n_ex, n_dims), standardize=True)
        # 断言梯度计算结果准确性
        assert_almost_equal(mine.grad(z), gold(z))
        # 打印测试通过信息
        print("PASSED")
        i  = 1


#######################################################################
#                          Layers                                     #
#######################################################################


# 测试全连接层
def test_FullyConnected(N=15):
    # 导入全连接层和激活函数
    from numpy_ml.neural_nets.layers import FullyConnected
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 定义激活函数列表
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]

    # 初始化计数器 i
    i = 1
    # 当 i 小于 N   1 时执行循环
    while i < N   1:
        # 生成随机整数,作为外部神经元、内部神经元和输出神经元的数量
        n_ex = np.random.randint(1, 100)
        n_in = np.random.randint(1, 100)
        n_out = np.random.randint(1, 100)
        # 生成随机张量 X,形状为 (n_ex, n_in),并进行标准化处理
        X = random_tensor((n_ex, n_in), standardize=True)

        # 随机选择一个激活函数
        act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]

        # 初始化全连接层 L1,设置输出神经元数量和激活函数
        L1 = FullyConnected(n_out=n_out, act_fn=act_fn)

        # 前向传播
        y_pred = L1.forward(X)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdX = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchFCLayer(n_in, n_out, torch_fn, L1.parameters)
        golds = gold_mod.extract_grads(X)

        # 定义参数列表,包括输入 X、预测值 y、权重 W、偏置 b、损失对预测值的梯度 dLdy、权重梯度 dLdW、偏置梯度 dLdB、输入梯度 dLdX
        params = [
            (L1.X[0], "X"),
            (y_pred, "y"),
            (L1.parameters["W"].T, "W"),
            (L1.parameters["b"], "b"),
            (dLdy, "dLdy"),
            (L1.gradients["W"].T, "dLdW"),
            (L1.gradients["b"], "dLdB"),
            (dLdX, "dLdX"),
        ]

        # 打印当前试验的信息和激活函数名称
        print("nTrial {}nact_fn={}".format(i, act_fn_name))
        # 遍历参数列表,逐个比较计算得到的梯度和标准梯度是否接近
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
            )
            print("tPASSED {}".format(label))
        # 更新试验次数
        i  = 1
# 测试 Embedding 层的功能,包括前向传播和反向传播
def test_Embedding(N=15):
    # 从 numpy_ml.neural_nets.layers 导入 Embedding 模块
    from numpy_ml.neural_nets.layers import Embedding

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 初始化计数器 i
    i = 1
    # 当 i 小于 N   1 时循环
    while i < N   1:
        # 随机生成词汇表大小
        vocab_size = np.random.randint(1, 2000)
        # 随机生成示例数
        n_ex = np.random.randint(1, 100)
        # 随机生成输入维度
        n_in = np.random.randint(1, 100)
        # 随机生成嵌入维度
        emb_dim = np.random.randint(1, 100)

        # 随机生成输入数据 X
        X = np.random.randint(0, vocab_size, (n_ex, n_in))

        # 初始化 Embedding 层
        L1 = Embedding(n_out=emb_dim, vocab_size=vocab_size)

        # 前向传播
        y_pred = L1.forward(X)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchEmbeddingLayer(vocab_size, emb_dim, L1.parameters)
        golds = gold_mod.extract_grads(X)

        # 定义参数列表
        params = [
            (L1.X[0], "X"),
            (y_pred, "y"),
            (L1.parameters["W"], "W"),
            (dLdy, "dLdy"),
            (L1.gradients["W"], "dLdW"),
        ]

        # 打印测试结果
        print("nTrial {}".format(i))
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
            )
            print("tPASSED {}".format(label))
        i  = 1


# 测试 BatchNorm1D 层的功能
def test_BatchNorm1D(N=15):
    # 从 numpy_ml.neural_nets.layers 导入 BatchNorm1D 模块
    from numpy_ml.neural_nets.layers import BatchNorm1D

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 重新设置随机种子
    np.random.seed(12345)

    # 初始化计数器 i
    i = 1
    # 当 i 小于 N 1 时执行循环
    while i < N   1:
        # 生成一个随机整数,范围在[2, 1000)
        n_ex = np.random.randint(2, 1000)
        # 生成一个随机整数,范围在[1, 1000)
        n_in = np.random.randint(1, 1000)
        # 生成一个随机的张量,形状为(n_ex, n_in),并进行标准化处理
        X = random_tensor((n_ex, n_in), standardize=True)

        # 初始化 BatchNorm1D 层
        L1 = BatchNorm1D()

        # 前向传播
        y_pred = L1.forward(X)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdX = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchBatchNormLayer(
            n_in, L1.parameters, "1D", epsilon=L1.epsilon, momentum=L1.momentum
        )
        golds = gold_mod.extract_grads(X)

        # 定义参数列表
        params = [
            (L1.X[0], "X"),
            (y_pred, "y"),
            (L1.parameters["scaler"].T, "scaler"),
            (L1.parameters["intercept"], "intercept"),
            (L1.parameters["running_mean"], "running_mean"),
            #  (L1.parameters["running_var"], "running_var"),
            (L1.gradients["scaler"], "dLdScaler"),
            (L1.gradients["intercept"], "dLdIntercept"),
            (dLdX, "dLdX"),
        ]

        # 打印当前试验的信息
        print("Trial {}".format(i))
        # 遍历参数列表,逐个进行梯度检验
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=1
            )
            print("tPASSED {}".format(label))
        # 更新试验次数
        i  = 1
# 定义一个测试函数,用于测试 LayerNorm1D 层
def test_LayerNorm1D(N=15):
    # 从 numpy_ml.neural_nets.layers 模块导入 LayerNorm1D 类
    from numpy_ml.neural_nets.layers import LayerNorm1D

    # 如果 N 为 None,则将其设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器 i
    i = 1
    # 当 i 小于 N   1 时循环执行以下代码块
    while i < N   1:
        # 生成随机的样本数和输入特征数
        n_ex = np.random.randint(2, 1000)
        n_in = np.random.randint(1, 1000)
        # 生成随机的输入数据 X
        X = random_tensor((n_ex, n_in), standardize=True)

        # 初始化 LayerNorm1D 层
        L1 = LayerNorm1D()

        # 前向传播
        y_pred = L1.forward(X)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdX = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchLayerNormLayer(n_in, L1.parameters, "1D", epsilon=L1.epsilon)
        golds = gold_mod.extract_grads(X)

        # 定义参数列表
        params = [
            (L1.X[0], "X"),
            (y_pred, "y"),
            (L1.parameters["scaler"].T, "scaler"),
            (L1.parameters["intercept"], "intercept"),
            (L1.gradients["scaler"], "dLdScaler"),
            (L1.gradients["intercept"], "dLdIntercept"),
            (dLdX, "dLdX"),
        ]

        # 打印当前试验的编号
        print("Trial {}".format(i))
        # 遍历参数列表,比较计算得到的梯度和标准梯度是否接近
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
            )
            print("tPASSED {}".format(label))
        # 更新循环计数器
        i  = 1


# 定义一个测试函数,用于测试 LayerNorm2D 层
def test_LayerNorm2D(N=15):
    # 从 numpy_ml.neural_nets.layers 模块导入 LayerNorm2D 类
    from numpy_ml.neural_nets.layers import LayerNorm2D

    # 如果 N 为 None,则将其设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器 i
    i = 1
    # 当 i 小于 N   1 时循环执行以下代码块
    while i < N   1:
        # 生成一个随机整数,范围在 [2, 10)
        n_ex = np.random.randint(2, 10)
        # 生成一个随机整数,范围在 [1, 10)
        in_rows = np.random.randint(1, 10)
        # 生成一个随机整数,范围在 [1, 10)
        in_cols = np.random.randint(1, 10)
        # 生成一个随机整数,范围在 [1, 3)
        n_in = np.random.randint(1, 3)

        # 初始化 LayerNorm2D 层
        X = random_tensor((n_ex, in_rows, in_cols, n_in), standardize=True)
        L1 = LayerNorm2D()

        # 前向传播
        y_pred = L1.forward(X)

        # 标准损失函数
        dLdy = np.ones_like(X)
        dLdX = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchLayerNormLayer(
            [n_in, in_rows, in_cols], L1.parameters, mode="2D", epsilon=L1.epsilon
        )
        golds = gold_mod.extract_grads(X, Y_true=None)

        # 定义参数列表
        params = [
            (L1.X[0], "X"),
            (L1.hyperparameters["epsilon"], "epsilon"),
            (L1.parameters["scaler"], "scaler"),
            (L1.parameters["intercept"], "intercept"),
            (y_pred, "y"),
            (L1.gradients["scaler"], "dLdScaler"),
            (L1.gradients["intercept"], "dLdIntercept"),
            (dLdX, "dLdX"),
        ]

        # 打印当前试验的信息
        print("Trial {}".format(i))
        # 遍历参数列表,逐个进行断言比较
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
            )

            # 打印通过断言的参数信息
            print("tPASSED {}".format(label))

        # 更新循环变量 i
        i  = 1
# 定义一个测试 MultiplyLayer 的函数,可以指定测试次数 N,默认为 15
def test_MultiplyLayer(N=15):
    # 导入所需的模块和类
    from numpy_ml.neural_nets.layers import Multiply
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 定义激活函数列表
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]

    # 初始化计数器 i
    i = 1
    # 循环进行 N 次测试
    while i < N   1:
        # 初始化输入数据列表 Xs
        Xs = []
        # 生成随机数,作为样本数和输入维度
        n_ex = np.random.randint(1, 100)
        n_in = np.random.randint(1, 100)
        n_entries = np.random.randint(2, 5)
        # 生成 n_entries 个随机张量,加入 Xs 列表
        for _ in range(n_entries):
            Xs.append(random_tensor((n_ex, n_in), standardize=True))

        # 随机选择一个激活函数
        act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]

        # 初始化 Multiply 层
        L1 = Multiply(act_fn)

        # 前向传播
        y_pred = L1.forward(Xs)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdXs = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchMultiplyLayer(torch_fn)
        golds = gold_mod.extract_grads(Xs)

        # 构建参数列表
        params = [(Xs, "Xs"), (y_pred, "Y")]
        params.extend(
            [(dldxi, "dLdX{}".format(i   1)) for i, dldxi in enumerate(dLdXs)]
        )

        # 打印测试结果
        print("nTrial {}".format(i))
        print("n_ex={}, n_in={}".format(n_ex, n_in))
        print("n_entries={}, act_fn={}".format(n_entries, str(act_fn)))
        for ix, (mine, label) in enumerate(params):
            # 断言近似相等
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=1
            )
            print("tPASSED {}".format(label))
        i  = 1


# 定义一个测试 AddLayer 的函数,可以指定测试次数 N,默认为 15
def test_AddLayer(N=15):
    # 导入所需的模块和类
    from numpy_ml.neural_nets.layers import Add
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)
    # 定义不同激活函数的元组列表
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]

    # 初始化循环计数器
    i = 1
    # 循环执行 N 次
    while i < N   1:
        # 初始化输入数据列表
        Xs = []
        # 生成随机的样本数和输入维度
        n_ex = np.random.randint(1, 100)
        n_in = np.random.randint(1, 100)
        # 生成随机的输入数据条目数
        n_entries = np.random.randint(2, 5)
        # 生成随机的输入数据并添加到 Xs 列表中
        for _ in range(n_entries):
            Xs.append(random_tensor((n_ex, n_in), standardize=True))

        # 随机选择激活函数
        act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]

        # 初始化 Add 层
        L1 = Add(act_fn)

        # 前向传播
        y_pred = L1.forward(Xs)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdXs = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchAddLayer(torch_fn)
        golds = gold_mod.extract_grads(Xs)

        # 构建参数列表
        params = [(Xs, "Xs"), (y_pred, "Y")]
        params.extend(
            [(dldxi, "dLdX{}".format(i   1)) for i, dldxi in enumerate(dLdXs)]
        )

        # 打印当前试验信息
        print("nTrial {}".format(i))
        print("n_ex={}, n_in={}".format(n_ex, n_in))
        print("n_entries={}, act_fn={}".format(n_entries, str(act_fn)))
        # 遍历参数列表,进行梯度检验
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=1
            )
            print("tPASSED {}".format(label))
        # 更新循环计数器
        i  = 1
# 定义测试 BatchNorm2D 的函数,参数 N 为测试次数,默认为 15
def test_BatchNorm2D(N=15):
    # 导入 BatchNorm2D 模块
    from numpy_ml.neural_nets.layers import BatchNorm2D

    # 如果 N 为 None,则将其设为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器 i
    i = 1
    # 循环执行 N 次测试
    while i < N   1:
        # 生成随机的样本数、输入行数、输入列数和输入通道数
        n_ex = np.random.randint(2, 10)
        in_rows = np.random.randint(1, 10)
        in_cols = np.random.randint(1, 10)
        n_in = np.random.randint(1, 3)

        # 初始化 BatchNorm2D 层
        X = random_tensor((n_ex, in_rows, in_cols, n_in), standardize=True)
        L1 = BatchNorm2D()

        # 前向传播
        y_pred = L1.forward(X)

        # 标准损失函数
        dLdy = np.ones_like(X)
        dLdX = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchBatchNormLayer(
            n_in, L1.parameters, mode="2D", epsilon=L1.epsilon, momentum=L1.momentum
        )
        golds = gold_mod.extract_grads(X, Y_true=None)

        # 定义参数列表
        params = [
            (L1.X[0], "X"),
            (L1.hyperparameters["momentum"], "momentum"),
            (L1.hyperparameters["epsilon"], "epsilon"),
            (L1.parameters["scaler"].T, "scaler"),
            (L1.parameters["intercept"], "intercept"),
            (L1.parameters["running_mean"], "running_mean"),
            #  (L1.parameters["running_var"], "running_var"),
            (y_pred, "y"),
            (L1.gradients["scaler"], "dLdScaler"),
            (L1.gradients["intercept"], "dLdIntercept"),
            (dLdX, "dLdX"),
        ]

        # 打印当前测试的序号
        print("Trial {}".format(i))
        # 遍历参数列表,逐个进行断言比较
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=3
            )

            print("tPASSED {}".format(label))

        # 更新循环计数器
        i  = 1


# 定义测试 RNNCell 的函数,参数 N 为测试次数,默认为 15
def test_RNNCell(N=15):
    # 导入 RNNCell 模块
    from numpy_ml.neural_nets.layers import RNNCell

    # 如果 N 为 None,则将其设为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器 i
    i = 1
    # 循环执行 N 次
    while i < N   1:
        # 生成随机数,表示外部输入、内部状态、输出、时间步数
        n_ex = np.random.randint(1, 10)
        n_in = np.random.randint(1, 10)
        n_out = np.random.randint(1, 10)
        n_t = np.random.randint(1, 10)
        # 生成随机张量 X
        X = random_tensor((n_ex, n_in, n_t), standardize=True)

        # 初始化 RNN 层
        L1 = RNNCell(n_out=n_out)

        # 前向传播
        y_preds = []
        for t in range(n_t):
            y_pred = L1.forward(X[:, :, t])
            y_preds  = [y_pred]

        # 反向传播
        dLdX = []
        dLdAt = np.ones_like(y_preds[t])
        for t in reversed(range(n_t)):
            dLdXt = L1.backward(dLdAt)
            dLdX.insert(0, dLdXt)
        dLdX = np.dstack(dLdX)

        # 获取标准梯度
        gold_mod = TorchRNNCell(n_in, n_out, L1.parameters)
        golds = gold_mod.extract_grads(X)

        # 定义参数列表
        params = [
            (X, "X"),
            (np.array(y_preds), "y"),
            (L1.parameters["ba"].T, "ba"),
            (L1.parameters["bx"].T, "bx"),
            (L1.parameters["Wax"].T, "Wax"),
            (L1.parameters["Waa"].T, "Waa"),
            (L1.gradients["ba"].T, "dLdBa"),
            (L1.gradients["bx"].T, "dLdBx"),
            (L1.gradients["Wax"].T, "dLdWax"),
            (L1.gradients["Waa"].T, "dLdWaa"),
            (dLdX, "dLdX"),
        ]

        # 打印当前试验次数
        print("Trial {}".format(i))
        # 遍历参数列表,进行梯度检验
        for ix, (mine, label) in enumerate(params):
            np.testing.assert_allclose(
                mine,
                golds[label],
                err_msg=err_fmt(params, golds, ix),
                atol=1e-3,
                rtol=1e-3,
            )
            print("tPASSED {}".format(label))
        i  = 1
# 定义一个测试函数,用于测试 Conv2D 层的功能
def test_Conv2D(N=15):
    # 从相应的模块中导入需要的类
    from numpy_ml.neural_nets.layers import Conv2D
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine

    # 如果 N 为 None,则将其设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 定义激活函数列表
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]

    # 初始化计数器 i
    i = 1

# 定义一个测试函数,用于测试 DPAttention 层的功能
def test_DPAttention(N=15):
    # 从相应的模块中导入 DotProductAttention 类
    from numpy_ml.neural_nets.layers import DotProductAttention

    # 如果 N 为 None,则将其设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器 i
    i = 1
    # 当 i 小于 N 1 时循环
    while i < N   1:
        # 生成随机数
        n_ex = np.random.randint(1, 10)
        d_k = np.random.randint(1, 100)
        d_v = np.random.randint(1, 100)

        # 生成随机张量 Q, K, V
        Q = random_tensor((n_ex, d_k), standardize=True)
        K = random_tensor((n_ex, d_k), standardize=True)
        V = random_tensor((n_ex, d_v), standardize=True)

        # 初始化 DotProductAttention 层
        mine = DotProductAttention(scale=True, dropout_p=0)

        # 前向传播
        y_pred = mine.forward(Q, K, V)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdQ, dLdK, dLdV = mine.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchSDPAttentionLayer()
        golds = gold_mod.extract_grads(Q, K, V)

        # 定义参数列表
        params = [
            (mine.X[0][0], "Q"),
            (mine.X[0][1], "K"),
            (mine.X[0][2], "V"),
            (y_pred, "Y"),
            (dLdV, "dLdV"),
            (dLdK, "dLdK"),
            (dLdQ, "dLdQ"),
        ]

        # 打印测试结果
        print("nTrial {}".format(i))
        print("n_ex={} d_k={} d_v={}".format(n_ex, d_k, d_v))
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=4
            )
            print("tPASSED {}".format(label))
        i  = 1

# 定义一个测试函数,用于测试 Conv1D 层的功能
def test_Conv1D(N=15):
    # 从相应的模块中导入 Conv1D 类
    from numpy_ml.neural_nets.layers import Conv1D
    # 从指定路径导入 Tanh、ReLU、Sigmoid 和 Affine 激活函数以及 Affine 层
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
    
    # 如果 N 为 None,则将 N 设置为正无穷
    N = np.inf if N is None else N
    
    # 设置随机种子为 12345
    np.random.seed(12345)
    
    # 定义激活函数列表,每个元素包含自定义的激活函数对象、PyTorch 中对应的激活函数对象和激活函数名称
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]
    
    # 初始化计数器 i 为 1
    i = 1
    # 循环执行 N 次
    while i < N   1:
        # 生成随机整数,表示例外数量
        n_ex = np.random.randint(1, 10)
        # 生成随机整数,表示输入序列长度
        l_in = np.random.randint(1, 10)
        # 生成随机整数,表示输入输出通道数
        n_in, n_out = np.random.randint(1, 3), np.random.randint(1, 3)
        # 生成随机整数,表示卷积核宽度
        f_width = min(l_in, np.random.randint(1, 5))
        # 生成随机整数,表示填充和步长
        p, s = np.random.randint(0, 5), np.random.randint(1, 3)
        # 生成随机整数,表示膨胀率
        d = np.random.randint(0, 5)

        # 计算卷积核的参数数量
        fc = f_width * (d   1) - d
        # 计算输出序列长度
        l_out = int(1   (l_in   2 * p - fc) / s)

        # 如果输出序列长度小于等于0,则跳过本次循环
        if l_out <= 0:
            continue

        # 生成随机张量作为输入数据
        X = random_tensor((n_ex, l_in, n_in), standardize=True)

        # 随机选择激活函数
        act_fn, torch_fn, act_fn_name = acts[np.random.randint(0, len(acts))]

        # 初始化一维卷积层
        L1 = Conv1D(
            out_ch=n_out,
            kernel_width=f_width,
            act_fn=act_fn,
            pad=p,
            stride=s,
            dilation=d,
        )

        # 前向传播
        y_pred = L1.forward(X)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdX = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchConv1DLayer(
            n_in, n_out, torch_fn, L1.parameters, L1.hyperparameters
        )
        golds = gold_mod.extract_grads(X)

        # 定义参数列表
        params = [
            (L1.X[0], "X"),
            (y_pred, "y"),
            (L1.parameters["W"], "W"),
            (L1.parameters["b"], "b"),
            (L1.gradients["W"], "dLdW"),
            (L1.gradients["b"], "dLdB"),
            (dLdX, "dLdX"),
        ]

        # 打印当前试验信息
        print("nTrial {}".format(i))
        print("pad={}, stride={}, f_width={}, n_ex={}".format(p, s, f_width, n_ex))
        print("l_in={}, n_in={}".format(l_in, n_in))
        print("l_out={}, n_out={}".format(l_out, n_out))
        print("dilation={}".format(d))
        # 遍历参数列表,检查梯度是否正确
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=4
            )
            print("tPASSED {}".format(label))
        i  = 1
# 定义用于测试 Deconv2D 层的函数,N 默认为 15
def test_Deconv2D(N=15):
    # 导入必要的模块和类
    from numpy_ml.neural_nets.layers import Deconv2D
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 定义激活函数列表,每个元素包含激活函数对象、对应的 Torch 模块、激活函数名称
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]

    # 初始化计数器
    i = 1

# 定义用于测试 Pool2D 层的函数,N 默认为 15
def test_Pool2D(N=15):
    # 导入必要的模块和类
    from numpy_ml.neural_nets.layers import Pool2D

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器
    i = 1
    # 循环执行直到 i 大于 N
    while i < N   1:
        # 生成随机整数,表示样本数
        n_ex = np.random.randint(1, 10)
        # 生成随机整数,表示输入数据的行数
        in_rows = np.random.randint(1, 10)
        # 生成随机整数,表示输入数据的列数
        in_cols = np.random.randint(1, 10)
        # 生成随机整数,表示输入数据的通道数
        n_in = np.random.randint(1, 3)
        # 生成随机的过滤器形状
        f_shape = (
            min(in_rows, np.random.randint(1, 5)),
            min(in_cols, np.random.randint(1, 5)),
        )
        # 生成随机的填充值和步长
        p, s = np.random.randint(0, max(1, min(f_shape) // 2)), np.random.randint(1, 3)
        # 设置池化层的模式为"average"
        mode = "average"
        # 计算输出数据的行数和列数
        out_rows = int(1   (in_rows   2 * p - f_shape[0]) / s)
        out_cols = int(1   (in_cols   2 * p - f_shape[1]) / s)

        # 生成随机输入数据
        X = random_tensor((n_ex, in_rows, in_cols, n_in), standardize=True)
        print("nmode: {}".format(mode))
        print("pad={}, stride={}, f_shape={}, n_ex={}".format(p, s, f_shape, n_ex))
        print("in_rows={}, in_cols={}, n_in={}".format(in_rows, in_cols, n_in))
        print("out_rows={}, out_cols={}, n_out={}".format(out_rows, out_cols, n_in))

        # 初始化 Pool2D 层
        L1 = Pool2D(kernel_shape=f_shape, pad=p, stride=s, mode=mode)

        # 前向传播
        y_pred = L1.forward(X)

        # 反向传播
        dLdy = np.ones_like(y_pred)
        dLdX = L1.backward(dLdy)

        # 获取标准梯度
        gold_mod = TorchPool2DLayer(n_in, L1.hyperparameters)
        golds = gold_mod.extract_grads(X)

        # 检查梯度是否正确
        params = [(L1.X[0], "X"), (y_pred, "y"), (dLdX, "dLdX")]
        for ix, (mine, label) in enumerate(params):
            assert_almost_equal(
                mine, golds[label], err_msg=err_fmt(params, golds, ix), decimal=4
            )
            print("tPASSED {}".format(label))
        i  = 1
# 定义一个测试 LSTMCell 的函数,N 默认为 15
def test_LSTMCell(N=15):
    # 导入 LSTMCell 模块
    from numpy_ml.neural_nets.layers import LSTMCell

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子为 12345
    np.random.seed(12345)

    # 初始化变量 i 为 1
    i = 1

# 手动计算 vanilla RNN 参数的梯度
def grad_check_RNN(model, loss_func, param_name, n_t, X, epsilon=1e-7):
    """
    Manual gradient calc for vanilla RNN parameters
    """
    # 如果参数名为 "Ba" 或 "Bx",则将参数名转换为小写
    if param_name in ["Ba", "Bx"]:
        param_name = param_name.lower()
    # 如果参数名为 "X" 或 "y",则返回 None
    elif param_name in ["X", "y"]:
        return None

    # 复制原始参数,并初始化梯度为与原始参数相同形状的零矩阵
    param_orig = model.parameters[param_name]
    model.flush_gradients()
    grads = np.zeros_like(param_orig)

    # 遍历参数的每个元素
    for flat_ix, val in enumerate(param_orig.flat):
        param = deepcopy(param_orig)
        md_ix = np.unravel_index(flat_ix, param.shape)

        # 正向计算
        y_preds_plus = []
        param[md_ix] = val   epsilon
        model.parameters[param_name] = param
        for t in range(n_t):
            y_pred_plus = model.forward(X[:, :, t])
            y_preds_plus  = [y_pred_plus]
        loss_plus = loss_func(y_preds_plus)
        model.flush_gradients()

        # 反向计算
        y_preds_minus = []
        param[md_ix] = val - epsilon
        model.parameters[param_name] = param
        for t in range(n_t):
            y_pred_minus = model.forward(X[:, :, t])
            y_preds_minus  = [y_pred_minus]
        loss_minus = loss_func(y_preds_minus)
        model.flush_gradients()

        # 计算梯度
        grad = (loss_plus - loss_minus) / (2 * epsilon)
        grads[md_ix] = grad
    return grads.T

# 定义一个测试 MultiHeadedAttentionModule 的函数,N 默认为 15
def test_MultiHeadedAttentionModule(N=15):
    # 导入 MultiHeadedAttentionModule 模块
    from numpy_ml.neural_nets.modules import MultiHeadedAttentionModule

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N
    # 设置随机种子为 12345
    np.random.seed(12345)

    # 初始化变量 i 为 1
    i = 1

# 定义一个测试 SkipConnectionIdentityModule 的函数,N 默认为 15
def test_SkipConnectionIdentityModule(N=15):
    # 导入 SkipConnectionIdentityModule 模块
    from numpy_ml.neural_nets.modules import SkipConnectionIdentityModule
    # 从指定路径导入 Tanh、ReLU、Sigmoid 和 Affine 激活函数以及 Affine 层
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine
    
    # 如果 N 为 None,则将 N 设置为正无穷
    N = np.inf if N is None else N
    
    # 设置随机种子为 12345
    np.random.seed(12345)
    
    # 定义激活函数列表,每个元素包含自定义的激活函数对象、PyTorch 中对应的激活函数对象和激活函数名称
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]
    
    # 初始化计数器 i 为 1
    i = 1
# 测试 SkipConnectionConvModule 模块
def test_SkipConnectionConvModule(N=15):
    # 导入需要的模块和激活函数
    from numpy_ml.neural_nets.modules import SkipConnectionConvModule
    from numpy_ml.neural_nets.activations import Tanh, ReLU, Sigmoid, Affine

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 定义激活函数列表
    acts = [
        (Tanh(), nn.Tanh(), "Tanh"),
        (Sigmoid(), nn.Sigmoid(), "Sigmoid"),
        (ReLU(), nn.ReLU(), "ReLU"),
        (Affine(), TorchLinearActivation(), "Affine"),
    ]

    # 初始化计数器
    i = 1

# 测试 BidirectionalLSTM 模块
def test_BidirectionalLSTM(N=15):
    # 导入需要的模块
    from numpy_ml.neural_nets.modules import BidirectionalLSTM

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器
    i = 1

# 测试 WaveNetModule 模块
def test_WaveNetModule(N=10):
    # 导入需要的模块
    from numpy_ml.neural_nets.modules import WavenetResidualModule

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器
    i = 1

#######################################################################
#                                Utils                                #
#######################################################################

# 测试 pad1D 函数
def test_pad1D(N=15):
    # 导入需要的模块
    from numpy_ml.neural_nets.layers import Conv1D
    from .nn_torch_models import TorchCausalConv1d, torchify

    # 设置随机种子
    np.random.seed(12345)

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 初始化计数器
    i = 1

# 测试 conv 函数
def test_conv(N=15):
    # 设置随机种子
    np.random.seed(12345)
    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N
    # 初始化计数器
    i = 0
    # 当 i 小于 N 时,执行以下循环
    while i < N:
        # 生成一个介于 2 到 15 之间的随机整数,作为例子数量
        n_ex = np.random.randint(2, 15)
        # 生成一个介于 2 到 15 之间的随机整数,作为输入数据的行数
        in_rows = np.random.randint(2, 15)
        # 生成一个介于 2 到 15 之间的随机整数,作为输入数据的列数
        in_cols = np.random.randint(2, 15)
        # 生成一个介于 2 到 15 之间的随机整数,作为输入数据的通道数
        in_ch = np.random.randint(2, 15)
        # 生成一个介于 2 到 15 之间的随机整数,作为输出数据的通道数
        out_ch = np.random.randint(2, 15)
        # 生成一个随机的形状元组,元组中的元素为输入数据的行数和列数
        f_shape = (
            min(in_rows, np.random.randint(2, 10)),
            min(in_cols, np.random.randint(2, 10)),
        )
        # 生成一个介于 1 到 3 之间的随机整数,作为卷积步长
        s = np.random.randint(1, 3)
        # 生成一个介于 0 到 5 之间的随机整数,作为填充大小
        p = np.random.randint(0, 5)

        # 生成一个随机的输入数据张量
        X = np.random.rand(n_ex, in_rows, in_cols, in_ch)
        # 对输入数据进行二维填充
        X_pad, p = pad2D(X, p)
        # 生成一个随机的权重张量
        W = np.random.randn(f_shape[0], f_shape[1], in_ch, out_ch)

        # 使用朴素的方法进行二维卷积操作,得到期望的输出
        gold = conv2D_naive(X, W, s, p)
        # 使用优化的方法进行二维卷积操作,得到实际的输出
        mine = conv2D(X, W, s, p)

        # 检查实际输出和期望输出是否几乎相等
        np.testing.assert_almost_equal(mine, gold)
        # 打印“PASSED”表示测试通过
        print("PASSED")
        # 更新循环变量 i
        i  = 1
# 模型部分

# 定义训练 Variational Autoencoder (VAE) 模型的函数
def fit_VAE():
    # 导入所需的库和模块
    # 用于测试
    import tensorflow.keras.datasets.mnist as mnist
    from numpy_ml.neural_nets.models.vae import BernoulliVAE

    # 设置随机种子
    np.random.seed(12345)

    # 加载 MNIST 数据集
    (X_train, y_train), (X_test, y_test) = mnist.load_data()

    # 将像素强度缩放到 [0, 1] 范围内
    X_train = np.expand_dims(X_train.astype("float32") / 255.0, 3)
    X_test = np.expand_dims(X_test.astype("float32") / 255.0, 3)

    # 只使用前 128 * 1 个样本作为一个 batch
    X_train = X_train[: 128 * 1]

    # 创建 BernoulliVAE 实例
    BV = BernoulliVAE()
    # 训练 VAE 模型
    BV.fit(X_train, n_epochs=1, verbose=False)


# 定义测试 Wasserstein GAN with Gradient Penalty (WGAN-GP) 模型的函数
def test_WGAN_GP(N=1):
    # 导入 WGAN-GP 模型
    from numpy_ml.neural_nets.models.wgan_gp import WGAN_GP

    # 设置随机种子
    np.random.seed(12345)

    # 生成一个随机种子
    ss = np.random.randint(0, 1000)
    np.random.seed(ss)

    # 如果 N 为 None,则设置为无穷大
    N = np.inf if N is None else N

    # 初始化 i 为 1
    i = 1

numpy-mlnumpy_mlteststest_nn_activations.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入时间模块
import time
# 导入 numpy 模块并重命名为 np
import numpy as np

# 从 numpy.testing 模块中导入 assert_almost_equal 函数
from numpy.testing import assert_almost_equal
# 从 scipy.special 模块中导入 expit 函数
from scipy.special import expit

# 导入 torch 模块
import torch
# 从 torch.nn.functional 模块中导入 F
import torch.nn.functional as F

# 从 numpy_ml.utils.testing 模块中导入 random_stochastic_matrix 和 random_tensor 函数
from numpy_ml.utils.testing import random_stochastic_matrix, random_tensor

# 定义一个函数,用于生成 torch 梯度
def torch_gradient_generator(fn, **kwargs):
    # 定义内部函数 get_grad,用于计算梯度
    def get_grad(z):
        # 将 numpy 数组 z 转换为 torch 变量,并设置 requires_grad 为 True
        z1 = torch.autograd.Variable(torch.from_numpy(z), requires_grad=True)
        # 调用传入的函数 fn 计算 z1 的值,并对结果求和
        z2 = fn(z1, **kwargs).sum()
        # 对 z2 进行反向传播
        z2.backward()
        # 获取 z1 的梯度,并转换为 numpy 数组返回
        grad = z1.grad.numpy()
        return grad

    return get_grad


#######################################################################
#                           Debug Formatter                           #
#######################################################################

# 定义一个函数,用于格式化错误信息
def err_fmt(params, golds, ix, warn_str=""):
    mine, label = params[ix]
    err_msg = "-" * 25   " DEBUG "   "-" * 25   "n"
    prev_mine, prev_label = params[max(ix - 1, 0)]
    err_msg  = "Mine (prev) [{}]:n{}nnTheirs (prev) [{}]:n{}".format(
        prev_label, prev_mine, prev_label, golds[prev_label]
    )
    err_msg  = "nnMine [{}]:n{}nnTheirs [{}]:n{}".format(
        label, mine, label, golds[label]
    )
    err_msg  = warn_str
    err_msg  = "n"   "-" * 23   " END DEBUG "   "-" * 23
    return err_msg


#######################################################################
#                            Test Suite                               #
#######################################################################
#
#
#  def test_activations(N=50):
#      print("Testing Sigmoid activation")
#      time.sleep(1)
#      test_sigmoid_activation(N)
#      test_sigmoid_grad(N)
#
#      #  print("Testing Softmax activation")
#      #  time.sleep(1)
#      #  test_softmax_activation(N)
#      #  test_softmax_grad(N)
#
#      print("Testing Tanh activation")
#      time.sleep(1)
#      test_tanh_grad(N)
#
#      print("Testing ReLU activation")
#      time.sleep(1)
#      test_relu_activation(N)
#      test_relu_grad(N)
#
#      print("Testing ELU activation")
#      time.sleep(1)
#      test_elu_activation(N)
#      test_elu_grad(N)
#
#      print("Testing SELU activation")
#      time.sleep(1)
#      test_selu_activation(N)
#      test_selu_grad(N)
#
#      print("Testing LeakyRelu activation")
#      time.sleep(1)
#      test_leakyrelu_activation(N)
#      test_leakyrelu_grad(N)
#
#      print("Testing SoftPlus activation")
#      time.sleep(1)
#      test_softplus_activation(N)
#      test_softplus_grad(N)
#

#######################################################################
#                          Activations                                #
#######################################################################


# 测试 Sigmoid 激活函数
def test_sigmoid_activation(N=50):
    # 导入 Sigmoid 激活函数
    from numpy_ml.neural_nets.activations import Sigmoid

    # 如果 N 为 None,则设为无穷大
    N = np.inf if N is None else N

    # 创建 Sigmoid 激活函数对象
    mine = Sigmoid()
    # 创建 gold 函数对象,用于比较
    gold = expit

    i = 0
    while i < N:
        # 生成随机维度
        n_dims = np.random.randint(1, 100)
        # 生成随机张量
        z = random_tensor((1, n_dims))
        # 断言 Sigmoid 函数计算结果与 gold 函数计算结果几乎相等
        assert_almost_equal(mine.fn(z), gold(z))
        print("PASSED")
        i  = 1


# 测试 SoftPlus 激活函数
def test_softplus_activation(N=50):
    # 导入 SoftPlus 激活函数
    from numpy_ml.neural_nets.activations import SoftPlus

    # 如果 N 为 None,则设为无穷大
    N = np.inf if N is None else N

    # 创建 SoftPlus 激活函数对象
    mine = SoftPlus()
    # 创建 gold 函数对象,用于比较
    gold = lambda z: F.softplus(torch.FloatTensor(z)).numpy()

    i = 0
    while i < N:
        # 生成随机维度
        n_dims = np.random.randint(1, 100)
        # 生成随机随机矩阵
        z = random_stochastic_matrix(1, n_dims)
        # 断言 SoftPlus 函数计算结果与 gold 函数计算结果几乎相等
        assert_almost_equal(mine.fn(z), gold(z))
        print("PASSED")
        i  = 1


# 测试 ELU 激活函数
def test_elu_activation(N=50):
    # 导入 ELU 激活函数
    from numpy_ml.neural_nets.activations import ELU

    # 如果 N 为 None,则设为无穷大
    N = np.inf if N is None else N

    i = 0
    while i < N:
        # 生成随机维度
        n_dims = np.random.randint(1, 10)
        # 生成随机张量
        z = random_tensor((1, n_dims))

        # 生成随机 alpha 值
        alpha = np.random.uniform(0, 10)

        # 创建 ELU 激活函数对象
        mine = ELU(alpha)
        # 创建 gold 函数对象,用于比较
        gold = lambda z, a: F.elu(torch.from_numpy(z), alpha).numpy()

        # 断言 ELU 函数计算结果与 gold 函数计算结果几乎相等
        assert_almost_equal(mine.fn(z), gold(z, alpha))
        print("PASSED")
        i  = 1
# 测试 ReLU 激活函数的功能
def test_relu_activation(N=50):
    # 导入 ReLU 激活函数
    from numpy_ml.neural_nets.activations import ReLU

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建自定义的 ReLU 激活函数对象
    mine = ReLU()
    # 创建 PyTorch 中的 ReLU 激活函数对象
    gold = lambda z: F.relu(torch.FloatTensor(z)).numpy()

    # 初始化计数器 i
    i = 0
    # 循环执行 N 次
    while i < N:
        # 随机生成维度在 1 到 100 之间的随机数
        n_dims = np.random.randint(1, 100)
        # 生成一个随机的矩阵 z
        z = random_stochastic_matrix(1, n_dims)
        # 断言自定义的 ReLU 激活函数和 PyTorch 的 ReLU 激活函数的输出几乎相等
        assert_almost_equal(mine.fn(z), gold(z))
        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


# 测试 SELU 激活函数的功能
def test_selu_activation(N=50):
    # 导入 SELU 激活函数
    from numpy_ml.neural_nets.activations import SELU

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建自定义的 SELU 激活函数对象
    mine = SELU()
    # 创建 PyTorch 中的 SELU 激活函数对象
    gold = lambda z: F.selu(torch.FloatTensor(z)).numpy()

    # 初始化计数器 i
    i = 0
    # 循环执行 N 次
    while i < N:
        # 随机生成维度在 1 到 100 之间的随机数
        n_dims = np.random.randint(1, 100)
        # 生成一个随机的矩阵 z
        z = random_stochastic_matrix(1, n_dims)
        # 断言自定义的 SELU 激活函数和 PyTorch 的 SELU 激活函数的输出几乎相等
        assert_almost_equal(mine.fn(z), gold(z))
        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


# 测试 LeakyReLU 激活函数的功能
def test_leakyrelu_activation(N=50):
    # 导入 LeakyReLU 激活函数
    from numpy_ml.neural_nets.activations import LeakyReLU

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 初始化计数器 i
    i = 0
    # 循环执行 N 次
    while i < N:
        # 随机生成维度在 1 到 100 之间的随机数
        n_dims = np.random.randint(1, 100)
        # 生成一个随机的矩阵 z
        z = random_stochastic_matrix(1, n_dims)
        # 随机生成一个在 0 到 10 之间的 alpha 值
        alpha = np.random.uniform(0, 10)

        # 创建自定义的 LeakyReLU 激活函数对象
        mine = LeakyReLU(alpha=alpha)
        # 创建 PyTorch 中的 LeakyReLU 激活函数对象
        gold = lambda z: F.leaky_relu(torch.FloatTensor(z), alpha).numpy()
        # 断言自定义的 LeakyReLU 激活函数和 PyTorch 的 LeakyReLU 激活函数的输出几乎相等
        assert_almost_equal(mine.fn(z), gold(z))

        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


# 测试 GELU 激活函数的功能
def test_gelu_activation(N=50):
    # 导入 GELU 激活函数
    from numpy_ml.neural_nets.activations import GELU

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 初始化计数器 i
    i = 0
    # 循环执行 N 次
    while i < N:
        # 随机生成维度在 1 到 100 之间的随机数
        n_dims = np.random.randint(1, 100)
        # 生成一个随机的矩阵 z
        z = random_stochastic_matrix(1, n_dims)
        # 随机选择是否使用近似计算
        approx = np.random.choice([True, False])

        # 创建自定义的 GELU 激活函数对象
        mine = GELU(approximate=False)
        mine_approx = GELU(approximate=True)
        # 创建 PyTorch 中的 GELU 激活函数对象
        gold = lambda z: F.gelu(torch.FloatTensor(z)).numpy()
        # 断言自定义的 GELU 激活函数和 PyTorch 的 GELU 激活函数的输出在相对误差范围内接近
        np.testing.assert_allclose(mine.fn(z), gold(z), rtol=1e-3)
        # 断言自定义的 GELU 激活函数和近似计算的 GELU 激活函数的输出几乎相等
        assert_almost_equal(mine.fn(z), mine_approx.fn(z))

        # 打印 "PASSED"
        print("PASSED")
        # 更新计数器
        i  = 1


#######################################################################
#                      Activation Gradients                           #
#######################################################################

# 测试 Sigmoid 激活函数的梯度
def test_sigmoid_grad(N=50):
    # 导入 Sigmoid 激活函数
    from numpy_ml.neural_nets.activations import Sigmoid

    # 如果 N 为 None,则设为无穷大
    N = np.inf if N is None else N

    # 创建 Sigmoid 激活函数对象
    mine = Sigmoid()
    # 创建 PyTorch 中 Sigmoid 激活函数的梯度函数
    gold = torch_gradient_generator(torch.sigmoid)

    i = 0
    while i < N:
        # 生成随机的样本数和维度
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        z = random_tensor((n_ex, n_dims))
        # 断言 mine 的梯度与 gold 的梯度几乎相等
        assert_almost_equal(mine.grad(z), gold(z))
        print("PASSED")
        i  = 1

# 测试 ELU 激活函数的梯度
def test_elu_grad(N=50):
    # 导入 ELU 激活函数
    from numpy_ml.neural_nets.activations import ELU

    # 如果 N 为 None,则设为无穷大
    N = np.inf if N is None else N

    i = 0
    while i < N:
        # 生成随机的样本数、维度和 alpha 值
        n_ex = np.random.randint(1, 10)
        n_dims = np.random.randint(1, 10)
        alpha = np.random.uniform(0, 10)
        z = random_tensor((n_ex, n_dims))

        # 创建 ELU 激活函数对象和 PyTorch 中 ELU 激活函数的梯度函数
        mine = ELU(alpha)
        gold = torch_gradient_generator(F.elu, alpha=alpha)
        # 断言 mine 的梯度与 gold 的梯度几乎相等
        assert_almost_equal(mine.grad(z), gold(z), decimal=6)
        print("PASSED")
        i  = 1

# 测试 Tanh 激活函数的梯度
def test_tanh_grad(N=50):
    # 导入 Tanh 激活函数
    from numpy_ml.neural_nets.activations import Tanh

    # 如果 N 为 None,则设为无穷大
    N = np.inf if N is None else N

    # 创建 Tanh 激活函数对象
    mine = Tanh()
    # 创建 PyTorch 中 Tanh 激活函数的梯度函数
    gold = torch_gradient_generator(torch.tanh)

    i = 0
    while i < N:
        # 生成随机的样本数和维度
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        z = random_tensor((n_ex, n_dims))
        # 断言 mine 的梯度与 gold 的梯度几乎相等
        assert_almost_equal(mine.grad(z), gold(z))
        print("PASSED")
        i  = 1

# 测试 ReLU 激活函数的梯度
def test_relu_grad(N=50):
    # 导入 ReLU 激活函数
    from numpy_ml.neural_nets.activations import ReLU

    # 如果 N 为 None,则设为无穷大
    N = np.inf if N is None else N

    # 创建 ReLU 激活函数对象
    mine = ReLU()
    # 创建 PyTorch 中 ReLU 激活函数的梯度函数
    gold = torch_gradient_generator(F.relu)

    i = 0
    while i < N:
        # 生成随机的样本数和维度
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        z = random_tensor((n_ex, n_dims))
        # 断言 mine 的梯度与 gold 的梯度几乎相等
        assert_almost_equal(mine.grad(z), gold(z))
        print("PASSED")
        i  = 1

# 测试 GELU 激活函数的梯度
def test_gelu_grad(N=50):
    # 从 numpy_ml.neural_nets.activations 模块中导入 GELU 激活函数
    from numpy_ml.neural_nets.activations import GELU
    
    # 如果 N 为 None,则将 N 设置为正无穷
    N = np.inf if N is None else N
    
    # 创建一个不使用近似的 GELU 激活函数对象
    mine = GELU(approximate=False)
    # 创建一个使用近似的 GELU 激活函数对象
    mine_approx = GELU(approximate=True)
    # 创建一个使用 PyTorch 的 GELU 梯度生成器对象
    gold = torch_gradient_generator(F.gelu)
    
    # 初始化计数器 i
    i = 0
    # 当 i 小于 N 时执行循环
    while i < N:
        # 生成随机的样本数和维度数
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        # 生成随机的张量 z
        z = random_tensor((n_ex, n_dims))
        # 断言 mine 激活函数的梯度与 gold 激活函数的梯度在小数点后三位上几乎相等
        assert_almost_equal(mine.grad(z), gold(z), decimal=3)
        # 断言 mine 激活函数的梯度与 mine_approx 激活函数的梯度在小数点后三位上几乎相等
        assert_almost_equal(mine.grad(z), mine_approx.grad(z))
        # 打印 "PASSED"
        print("PASSED")
        # 计数器 i 自增
        i  = 1
# 测试 SELU 激活函数的梯度计算
def test_selu_grad(N=50):
    # 导入 SELU 激活函数
    from numpy_ml.neural_nets.activations import SELU

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 SELU 激活函数对象
    mine = SELU()
    # 创建 PyTorch 中 SELU 激活函数的梯度计算函数
    gold = torch_gradient_generator(F.selu)

    # 初始化计数器
    i = 0
    # 循环进行 N 次测试
    while i < N:
        # 随机生成样本数和维度
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        # 生成随机张量
        z = random_tensor((n_ex, n_dims))
        # 断言 SELU 激活函数的梯度与 PyTorch 中的梯度计算函数结果相近
        assert_almost_equal(mine.grad(z), gold(z), decimal=6)
        # 打印测试通过信息
        print("PASSED")
        i  = 1


# 测试 LeakyReLU 激活函数的梯度计算
def test_leakyrelu_grad(N=50):
    # 导入 LeakyReLU 激活函数
    from numpy_ml.neural_nets.activations import LeakyReLU

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 初始化计数器
    i = 0
    # 循环进行 N 次测试
    while i < N:
        # 随机生成样本数、维度和 alpha 参数
        n_ex = np.random.randint(1, 10)
        n_dims = np.random.randint(1, 10)
        alpha = np.random.uniform(0, 10)
        # 生成随机张量
        z = random_tensor((n_ex, n_dims))

        # 创建 LeakyReLU 激活函数对象
        mine = LeakyReLU(alpha)
        # 创建 PyTorch 中 LeakyReLU 激活函数的梯度计算函数
        gold = torch_gradient_generator(F.leaky_relu, negative_slope=alpha)
        # 断言 LeakyReLU 激活函数的梯度与 PyTorch 中的梯度计算函数结果相近
        assert_almost_equal(mine.grad(z), gold(z), decimal=6)
        # 打印测试通过信息
        print("PASSED")
        i  = 1


# 测试 SoftPlus 激活函数的梯度计算
def test_softplus_grad(N=50):
    # 导入 SoftPlus 激活函数
    from numpy_ml.neural_nets.activations import SoftPlus

    # 如果 N 为 None,则将 N 设置为无穷大
    N = np.inf if N is None else N

    # 创建 SoftPlus 激活函数对象
    mine = SoftPlus()
    # 创建 PyTorch 中 SoftPlus 激活函数的梯度计算函数
    gold = torch_gradient_generator(F.softplus)

    # 初始化计数器
    i = 0
    # 循环进行 N 次测试
    while i < N:
        # 随机生成样本数、维度,并标准化生成的随机张量
        n_ex = np.random.randint(1, 100)
        n_dims = np.random.randint(1, 100)
        z = random_tensor((n_ex, n_dims), standardize=True)
        # 断言 SoftPlus 激活函数的梯度与 PyTorch 中的梯度计算函数结果相近
        assert_almost_equal(mine.grad(z), gold(z))
        # 打印测试通过信息
        print("PASSED")
        i  = 1


# 如果作为主程序运行,则执行激活函数测试
if __name__ == "__main__":
    test_activations(N=50)

numpy-mlnumpy_mlteststest_nonparametric.py

代码语言:javascript复制
# 禁用 flake8 的警告
# 导入 numpy 库并重命名为 np
import numpy as np

# 从 sklearn 库中导入 KNeighborsRegressor 和 KNeighborsClassifier 类
# 从 sklearn.gaussian_process 库中导入 GaussianProcessRegressor 类
from sklearn.neighbors import KNeighborsRegressor, KNeighborsClassifier
from sklearn.gaussian_process import GaussianProcessRegressor

# 从 numpy_ml.nonparametric.knn 模块中导入 KNN 类
# 从 numpy_ml.nonparametric.gp 模块中导入 GPRegression 类
# 从 numpy_ml.utils.distance_metrics 模块中导入 euclidean 函数
from numpy_ml.nonparametric.knn import KNN
from numpy_ml.nonparametric.gp import GPRegression
from numpy_ml.utils.distance_metrics import euclidean

# 定义测试 KNN 回归的函数,参数 N 默认值为 15
def test_knn_regression(N=15):
    # 设置随机种子为 12345
    np.random.seed(12345)

    # 初始化循环计数器 i
    i = 0
    # 循环执行 N 次
    while i < N:
        # 生成随机数 N 和 M
        N = np.random.randint(2, 100)
        M = np.random.randint(2, 100)
        # 生成随机数 k,ls,weights
        k = np.random.randint(1, N)
        ls = np.min([np.random.randint(1, 10), N - 1])
        weights = np.random.choice(["uniform", "distance"])

        # 生成随机数据 X,X_test,y
        X = np.random.rand(N, M)
        X_test = np.random.rand(N, M)
        y = np.random.rand(N)

        # 创建 KNN 模型对象 knn
        knn = KNN(
            k=k, leaf_size=ls, metric=euclidean, classifier=False, weights=weights
        )
        # 使用 X,y 训练 KNN 模型
        knn.fit(X, y)
        # 对 X_test 进行预测
        preds = knn.predict(X_test)

        # 创建 sklearn 中的 KNeighborsRegressor 模型对象 gold
        gold = KNeighborsRegressor(
            p=2,
            leaf_size=ls,
            n_neighbors=k,
            weights=weights,
            metric="minkowski",
            algorithm="ball_tree",
        )
        # 使用 X,y 训练 gold 模型
        gold.fit(X, y)
        # 对 X_test 进行预测
        gold_preds = gold.predict(X_test)

        # 检查预测结果是否几乎相等
        for mine, theirs in zip(preds, gold_preds):
            np.testing.assert_almost_equal(mine, theirs)
        # 打印测试通过信息
        print("PASSED")
        # 更新循环计数器
        i  = 1

# 定义测试 KNN 分类的函数,参数 N 默认值为 15
def test_knn_clf(N=15):
    # 设置随机种子为 12345
    np.random.seed(12345)

    # 初始化循环计数器 i
    i = 0
    # 当 i 小于 N 时执行循环
    while i < N:
        # 生成一个介于 2 到 100 之间的随机整数作为 N
        N = np.random.randint(2, 100)
        # 生成一个介于 2 到 100 之间的随机整数作为 M
        M = np.random.randint(2, 100)
        # 生成一个介于 1 到 N 之间的随机整数作为 k
        k = np.random.randint(1, N)
        # 生成一个介于 2 到 10 之间的随机整数作为 n_classes
        n_classes = np.random.randint(2, 10)
        # 生成一个介于 1 到 10 之间的随机整数和 N-1 中的最小值作为 ls
        ls = np.min([np.random.randint(1, 10), N - 1])
        # 设置权重为 "uniform"

        # 生成一个 N 行 M 列的随机数组作为 X
        X = np.random.rand(N, M)
        # 生成一个 N 行 M 列的随机数组作为 X_test
        X_test = np.random.rand(N, M)
        # 生成一个长度为 N 的随机整数数组作为 y
        y = np.random.randint(0, n_classes, size=N)

        # 创建 KNN 对象,设置参数 k, leaf_size, metric, classifier 和 weights
        knn = KNN(k=k, leaf_size=ls, metric=euclidean, classifier=True, weights=weights)
        # 使用 X 和 y 训练 KNN 模型
        knn.fit(X, y)
        # 对 X_test 进行预测
        preds = knn.predict(X_test)

        # 创建 KNeighborsClassifier 对象,设置参数 p, metric, leaf_size, n_neighbors, weights 和 algorithm
        gold = KNeighborsClassifier(
            p=2,
            metric="minkowski",
            leaf_size=ls,
            n_neighbors=k,
            weights=weights,
            algorithm="ball_tree",
        )
        # 使用 X 和 y 训练 KNeighborsClassifier 模型
        gold.fit(X, y)
        # 对 X_test 进行预测
        gold_preds = gold.predict(X_test)

        # 对 preds 和 gold_preds 进行逐一比较,检查是否几乎相等
        for mine, theirs in zip(preds, gold_preds):
            np.testing.assert_almost_equal(mine, theirs)
        # 打印 "PASSED" 表示测试通过
        print("PASSED")
        # i 自增 1
        i  = 1
# 定义一个测试高斯过程回归的函数,参数 N 默认为 15
def test_gp_regression(N=15):
    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器 i
    i = 0
    # 循环执行 N 次
    while i < N:
        # 生成随机的 alpha
        alpha = np.random.rand()
        # 生成随机的 N
        N = np.random.randint(2, 100)
        # 生成随机的 M
        M = np.random.randint(2, 100)
        # 生成随机的 K
        K = np.random.randint(1, N)
        # 生成随机的 J
        J = np.random.randint(1, 3)

        # 生成随机的 N 行 M 列的数据矩阵 X
        X = np.random.rand(N, M)
        # 生成随机的 N 行 J 列的数据矩阵 y
        y = np.random.rand(N, J)
        # 生成随机的 K 行 M 列的数据矩阵 X_test
        X_test = np.random.rand(K, M)

        # 创建一个高斯过程回归对象 gp,使用 RBF 核函数和指定的 alpha
        gp = GPRegression(kernel="RBFKernel(sigma=1)", alpha=alpha)
        # 创建一个高斯过程回归对象 gold,使用默认参数
        gold = GaussianProcessRegressor(
            kernel=None, alpha=alpha, optimizer=None, normalize_y=False
        )

        # 使用 X, y 训练 gp 模型
        gp.fit(X, y)
        # 使用 X, y 训练 gold 模型
        gold.fit(X, y)

        # 对 X_test 进行预测,返回预测值 preds 和方差
        preds, _ = gp.predict(X_test)
        # 使用 gold 模型对 X_test 进行预测,返回预测值 gold_preds
        gold_preds = gold.predict(X_test)
        # 检查预测值 preds 和 gold_preds 是否几乎相等
        np.testing.assert_almost_equal(preds, gold_preds)

        # 计算 gp 模型的边缘对数似然
        mll = gp.marginal_log_likelihood()
        # 计算 gold 模型的对数边缘似然
        gold_mll = gold.log_marginal_likelihood()
        # 检查 mll 和 gold_mll 是否几乎相等
        np.testing.assert_almost_equal(mll, gold_mll)

        # 打印 "PASSED",表示测试通过
        print("PASSED")
        # 更新循环计数器 i
        i  = 1

numpy-mlnumpy_mlteststest_preprocessing.py

代码语言:javascript复制
# 禁用 flake8 检查
from collections import Counter

# 导入 huffman 模块
import huffman
# 导入 numpy 模块
import numpy as np

# 从 scipy.fftpack 模块导入 dct 函数
from scipy.fftpack import dct

# 从 sklearn.preprocessing 模块导入 StandardScaler 类
from sklearn.preprocessing import StandardScaler
# 从 sklearn.feature_extraction.text 模块导入 TfidfVectorizer 类
from sklearn.feature_extraction.text import TfidfVectorizer

# 尝试导入 librosa.core.time_frequency 模块中的 fft_frequencies 函数,如果导入失败则导入 librosa 模块中的 fft_frequencies 函数
try:
    from librosa.core.time_frequency import fft_frequencies
except ImportError:
    # 对于 librosa 版本 >= 0.8.0
    from librosa import fft_frequencies
# 从 librosa.feature 模块导入 mfcc 函数
from librosa.feature import mfcc as lr_mfcc
# 从 librosa.util 模块导入 frame 函数
from librosa.util import frame
# 从 librosa.filters 模块导入 mel 函数
from librosa.filters import mel

# 导入 numpy_ml 模块中的相关实现
from numpy_ml.preprocessing.general import Standardizer
from numpy_ml.preprocessing.nlp import HuffmanEncoder, TFIDFEncoder
from numpy_ml.preprocessing.dsp import (
    DCT,
    DFT,
    mfcc,
    to_frames,
    mel_filterbank,
    dft_bins,
)
from numpy_ml.utils.testing import random_paragraph


# 测试 Huffman 编码
def test_huffman(N=15):
    np.random.seed(12345)

    i = 0
    while i < N:
        # 生成随机单词数量
        n_words = np.random.randint(1, 100)
        # 生成随机段落
        para = random_paragraph(n_words)
        # 创建 Huffman 编码器对象
        HT = HuffmanEncoder()
        # 对段落进行编码
        HT.fit(para)
        # 获取编码后的字典
        my_dict = HT._item2code
        # 使用 gold-standard 的 huffman 模块生成编码字典
        their_dict = huffman.codebook(Counter(para).items())

        # 检查两个字典是否一致
        for k, v in their_dict.items():
            fstr = "their_dict['{}'] = {}, but my_dict['{}'] = {}"
            assert k in my_dict, "key `{}` not in my_dict".format(k)
            assert my_dict[k] == v, fstr.format(k, v, k, my_dict[k])
        print("PASSED")
        i  = 1


# 测试标准化器
def test_standardizer(N=15):
    np.random.seed(12345)

    i = 0
    while i < N:
        # 随机生成是否计算均值和标准差的标志
        mean = bool(np.random.randint(2))
        std = bool(np.random.randint(2))
        # 随机生成矩阵的行数和列数
        N = np.random.randint(2, 100)
        M = np.random.randint(2, 100)
        # 生成随机矩阵
        X = np.random.rand(N, M)

        # 创建标准化器对象
        S = Standardizer(with_mean=mean, with_std=std)
        # 对数据进行拟合
        S.fit(X)
        # 进行标准化处理
        mine = S.transform(X)

        # 使用 sklearn 中的 StandardScaler 进行标准化
        theirs = StandardScaler(with_mean=mean, with_std=std)
        gold = theirs.fit_transform(X)

        # 检查两种标准化结果是否接近
        np.testing.assert_almost_equal(mine, gold)
        print("PASSED")
        i  = 1
# 测试 TF-IDF 编码器的功能,生成 N 个测试用例
def test_tfidf(N=15):
    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器 i
    i = 0
    # 循环生成 N 个测试用例
    while i < N:
        # 初始化文档列表
        docs = []
        # 随机生成文档数量
        n_docs = np.random.randint(1, 10)
        # 生成每个文档的内容
        for d in range(n_docs):
            # 随机生成每个文档的行数
            n_lines = np.random.randint(1, 1000)
            # 生成每行的随机段落
            lines = [random_paragraph(np.random.randint(1, 10)) for _ in range(n_lines)]
            # 将每行段落连接成文档
            docs.append("n".join([" ".join(l) for l in lines]))

        # 随机选择是否平滑 IDF
        smooth = bool(np.random.randint(2))

        # 初始化 TF-IDF 编码器对象
        tfidf = TFIDFEncoder(
            lowercase=True,
            min_count=0,
            smooth_idf=smooth,
            max_tokens=None,
            input_type="strings",
            filter_stopwords=False,
        )
        # 初始化 sklearn 中的 TfidfVectorizer 对象作为对照
        gold = TfidfVectorizer(
            input="content",
            norm=None,
            use_idf=True,
            lowercase=True,
            smooth_idf=smooth,
            sublinear_tf=False,
        )

        # 对 TF-IDF 编码器进行拟合
        tfidf.fit(docs)
        # 获取 TF-IDF 编码结果
        mine = tfidf.transform(ignore_special_chars=True)
        # 获取 sklearn 中 TfidfVectorizer 的结果并转换为数组
        theirs = gold.fit_transform(docs).toarray()

        # 断言 TF-IDF 编码结果与对照结果的近似相等
        np.testing.assert_almost_equal(mine, theirs)
        # 打印测试通过信息
        print("PASSED")
        # 计数器加一
        i  = 1


# 测试 DCT 变换的功能,生成 N 个测试用例
def test_dct(N=15):
    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器 i
    i = 0
    # 循环生成 N 个测试用例
    while i < N:
        # 随机生成信号长度 N
        N = np.random.randint(2, 100)
        # 随机生成信号
        signal = np.random.rand(N)
        # 随机选择是否正交
        ortho = bool(np.random.randint(2))
        # 计算自定义的 DCT 变换
        mine = DCT(signal, orthonormal=ortho)
        # 计算 scipy 中的 DCT 变换作为对照
        theirs = dct(signal, norm="ortho" if ortho else None)

        # 断言自定义 DCT 变换结果与对照结果的近似相等
        np.testing.assert_almost_equal(mine, theirs)
        # 打印测试通过信息
        print("PASSED")
        # 计数器加一
        i  = 1


# 测试 DFT 变换的功能,生成 N 个测试用例
def test_dft(N=15):
    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器 i
    i = 0
    # 循环生成 N 个测试用例
    while i < N:
        # 随机生成信号长度 N
        N = np.random.randint(2, 100)
        # 随机生成信号
        signal = np.random.rand(N)
        # 计算自定义的 DFT 变换
        mine = DFT(signal)
        # 计算 numpy 中的快速傅里叶变换作为对照
        theirs = np.fft.rfft(signal)

        # 断言自定义 DFT 变换结果的实部与对照结果的实部的近似相等
        np.testing.assert_almost_equal(mine.real, theirs.real)
        # 打印测试通过信息
        print("PASSED")
        # 计数器加一
        i  = 1


# 测试 MFCC 的功能,生成 N 个测试用例
def test_mfcc(N=1):
    """Broken"""
    # 设置随机种子
    np.random.seed(12345)

    # 初始化计数器 i
    i = 0
    # 当 i 小于 N 时执行循环
    while i < N:
        # 生成一个介于500到1000之间的随机整数,赋值给 N
        N = np.random.randint(500, 1000)
        # 生成一个介于50到100之间的随机整数,赋值给 fs
        fs = np.random.randint(50, 100)
        # 设置 MFCC 参数
        n_mfcc = 12
        window_len = 100
        stride_len = 50
        n_filters = 20
        window_dur = window_len / fs
        stride_dur = stride_len / fs
        # 生成一个长度为 N 的随机信号
        signal = np.random.rand(N)

        # 计算自己实现的 MFCC 特征
        mine = mfcc(
            signal,
            fs=fs,
            window="hann",
            window_duration=window_dur,
            stride_duration=stride_dur,
            lifter_coef=0,
            alpha=0,
            n_mfccs=n_mfcc,
            normalize=False,
            center=True,
            n_filters=n_filters,
            replace_intercept=False,
        )

        # 使用库函数计算 MFCC 特征
        theirs = lr_mfcc(
            signal,
            sr=fs,
            n_mels=n_filters,
            n_mfcc=n_mfcc,
            n_fft=window_len,
            hop_length=stride_len,
            htk=True,
        ).T

        # 检查两种方法计算的 MFCC 特征是否接近
        np.testing.assert_almost_equal(mine, theirs, decimal=4)
        # 打印“PASSED”表示通过测试
        print("PASSED")
        # i 自增
        i  = 1
# 测试帧处理函数
def test_framing(N=15):
    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器
    i = 0
    # 循环执行 N 次
    while i < N:
        # 生成随机信号长度
        N = np.random.randint(500, 100000)
        # 生成随机窗口长度
        window_len = np.random.randint(10, 100)
        # 生成随机步长
        stride_len = np.random.randint(1, 50)
        # 生成随机信号
        signal = np.random.rand(N)

        # 调用自定义的帧处理函数 to_frames
        mine = to_frames(signal, window_len, stride_len, writeable=False)
        # 调用库函数 frame 进行帧处理
        theirs = frame(signal, frame_length=window_len, hop_length=stride_len).T

        # 断言两者长度相等
        assert len(mine) == len(theirs), "len(mine) = {}, len(theirs) = {}".format(
            len(mine), len(theirs)
        )
        # 使用 np.testing.assert_almost_equal 检查两者是否几乎相等
        np.testing.assert_almost_equal(mine, theirs)
        # 打印测试通过信息
        print("PASSED")
        # 更新循环计数器
        i  = 1


# 测试离散傅立叶变换频率分辨率函数
def test_dft_bins(N=15):
    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器
    i = 0
    # 循环执行 N 次
    while i < N:
        # 生成随机信号长度
        N = np.random.randint(500, 100000)
        # 生成随机采样频率
        fs = np.random.randint(50, 1000)

        # 调用自定义的频率分辨率函数 dft_bins
        mine = dft_bins(N, fs=fs, positive_only=True)
        # 调用库函数 fft_frequencies 计算频率分辨率
        theirs = fft_frequencies(fs, N)
        # 使用 np.testing.assert_almost_equal 检查两者是否几乎相等
        np.testing.assert_almost_equal(mine, theirs)
        # 打印测试通过信息
        print("PASSED")
        # 更新循环计数器
        i  = 1


# 测试梅尔滤波器组函数
def test_mel_filterbank(N=15):
    # 设置随机种子
    np.random.seed(12345)

    # 初始化循环计数器
    i = 0
    # 循环执行 N 次
    while i < N:
        # 生成随机采样频率
        fs = np.random.randint(50, 10000)
        # 生成随机滤波器数量
        n_filters = np.random.randint(2, 20)
        # 生成随机窗口长度
        window_len = np.random.randint(10, 100)
        # 生成随机是否归一化参数
        norm = np.random.randint(2)

        # 调用自定义的梅尔滤波器组函数 mel_filterbank
        mine = mel_filterbank(
            window_len, n_filters, fs, min_freq=0, max_freq=None, normalize=bool(norm)
        )

        # 调用库函数 mel 计算梅尔滤波器组
        theirs = mel(
            fs,
            n_fft=window_len,
            n_mels=n_filters,
            htk=True,
            norm="slaney" if norm == 1 else None,
        )

        # 使用 np.testing.assert_almost_equal 检查两者是否几乎相等
        np.testing.assert_almost_equal(mine, theirs)
        # 打印测试通过信息
        print("PASSED")
        # 更新循环计数器
        i  = 1

numpy-mlnumpy_mlteststest_trees.py

代码语言:javascript复制
# 禁用 flake8 检查
import numpy as np

# 导入随机森林分类器和回归器
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
# 导入决策树分类器和回归器
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
# 导入准确率评估指标和均方误差评估指标
from sklearn.metrics import accuracy_score, mean_squared_error
# 导入生成回归数据和分类数据的函数
from sklearn.datasets import make_regression, make_blobs
# 导入数据集划分函数
from sklearn.model_selection import train_test_split

# 导入梯度提升决策树类
from numpy_ml.trees.gbdt import GradientBoostedDecisionTree
# 导入决策树类、节点类和叶子节点类
from numpy_ml.trees.dt import DecisionTree, Node, Leaf
# 导入随机森林类
from numpy_ml.trees.rf import RandomForest
# 导入随机生成张量的测试函数
from numpy_ml.utils.testing import random_tensor

# 克隆决策树
def clone_tree(dtree):
    # 获取决策树的左子节点、右子节点、特征、阈值和值
    children_left = dtree.tree_.children_left
    children_right = dtree.tree_.children_right
    feature = dtree.tree_.feature
    threshold = dtree.tree_.threshold
    values = dtree.tree_.value

    # 递归构建决策树
    def grow(node_id):
        l, r = children_left[node_id], children_right[node_id]
        if l == r:
            return Leaf(values[node_id].argmax())
        n = Node(None, None, (feature[node_id], threshold[node_id]))
        n.left = grow(l)
        n.right = grow(r)
        return n

    node_id = 0
    root = Node(None, None, (feature[node_id], threshold[node_id]))
    root.left = grow(children_left[node_id])
    root.right = grow(children_right[node_id])
    return root

# 比较两棵树的结构和值
def compare_trees(mine, gold):
    # 克隆金标准决策树
    clone = clone_tree(gold)
    mine = mine.root

    # 递归测试两棵树的结构和值是否相等
    def test(mine, clone):
        if isinstance(clone, Node) and isinstance(mine, Node):
            assert mine.feature == clone.feature, "Node {} not equal".format(depth)
            np.testing.assert_allclose(mine.threshold, clone.threshold)
            test(mine.left, clone.left, depth   1)
            test(mine.right, clone.right, depth   1)
        elif isinstance(clone, Leaf) and isinstance(mine, Leaf):
            np.testing.assert_allclose(mine.value, clone.value)
            return
        else:
            raise ValueError("Nodes at depth {} are not equal".format(depth))

    depth = 0
    ok = True
    # 当 ok 为真时进入循环
    while ok:
        # 如果 clone 和 mine 都是 Node 类型的实例
        if isinstance(clone, Node) and isinstance(mine, Node):
            # 断言 mine 的特征与 clone 的特征相等
            assert mine.feature == clone.feature
            # 使用 np.testing.assert_allclose 检查 mine 的阈值与 clone 的阈值是否相等
            np.testing.assert_allclose(mine.threshold, clone.threshold)
            # 递归调用 test 函数,比较 mine 的左子节点和 clone 的左子节点
            test(mine.left, clone.left, depth   1)
            # 递归调用 test 函数,比较 mine 的右子节点和 clone 的右子节点
            test(mine.right, clone.right, depth   1)
        # 如果 clone 和 mine 都是 Leaf 类型的实例
        elif isinstance(clone, Leaf) and isinstance(mine, Leaf):
            # 使用 np.testing.assert_allclose 检查 mine 的值与 clone 的值是否相等
            np.testing.assert_allclose(mine.value, clone.value)
            # 返回,结束当前递归
            return
        # 如果 clone 和 mine 不是相同类型的节点
        else:
            # 抛出 ValueError 异常,提示深度为 depth 的节点不相等
            raise ValueError("Nodes at depth {} are not equal".format(depth))
# 测试决策树模型,N默认为1
def test_DecisionTree(N=1):
    # 初始化i为1
    i = 1
    # 设置随机种子为12345
    np.random.seed(12345)

# 测试随机森林模型,N默认为1
def test_RandomForest(N=1):
    # 设置随机种子为12345
    np.random.seed(12345)
    # 初始化i为1
    i = 1

# 测试梯度提升决策树模型,N默认为1
def test_gbdt(N=1):
    # 设置随机种子为12345
    np.random.seed(12345)
    # 初始化i为1
    i = 1

numpy-mlnumpy_mlteststest_utils.py

代码语言:javascript复制
# 禁用 flake8 的警告
import numpy as np

# 导入 scipy 库
import scipy
# 导入 networkx 库并重命名为 nx
import networkx as nx

# 从 sklearn.neighbors 模块中导入 BallTree 类并重命名为 sk_BallTree
from sklearn.neighbors import BallTree as sk_BallTree
# 从 sklearn.metrics.pairwise 模块中导入 rbf_kernel 函数并重命名为 sk_rbf
from sklearn.metrics.pairwise import rbf_kernel as sk_rbf
# 从 sklearn.metrics.pairwise 模块中导入 linear_kernel 函数并重命名为 sk_linear
from sklearn.metrics.pairwise import linear_kernel as sk_linear
# 从 sklearn.metrics.pairwise 模块中导入 polynomial_kernel 函数并重命名为 sk_poly
from sklearn.metrics.pairwise import polynomial_kernel as sk_poly

# 从 numpy_ml.utils.distance_metrics 模块中导入多个距离度量函数
from numpy_ml.utils.distance_metrics import (
    hamming,
    euclidean,
    chebyshev,
    manhattan,
    minkowski,
)
# 从 numpy_ml.utils.kernels 模块中导入 LinearKernel、PolynomialKernel、RBFKernel 类
from numpy_ml.utils.kernels import LinearKernel, PolynomialKernel, RBFKernel
# 从 numpy_ml.utils.data_structures 模块中导入 BallTree 类
from numpy_ml.utils.data_structures import BallTree
# 从 numpy_ml.utils.graphs 模块中导入多个图相关的类和函数
from numpy_ml.utils.graphs import (
    Edge,
    DiGraph,
    UndirectedGraph,
    random_DAG,
    random_unweighted_graph,
)

#######################################################################
#                               Kernels                               #
#######################################################################

# 定义测试线性核函数的函数
def test_linear_kernel(N=1):
    # 设置随机种子
    np.random.seed(12345)
    i = 0
    while i < N:
        # 生成随机数 N、M、C
        N = np.random.randint(1, 100)
        M = np.random.randint(1, 100)
        C = np.random.randint(1, 1000)

        # 生成随机矩阵 X 和 Y
        X = np.random.rand(N, C)
        Y = np.random.rand(M, C)

        # 计算自定义线性核函数的结果
        mine = LinearKernel()(X, Y)
        # 计算 sklearn 中线性核函数的结果
        gold = sk_linear(X, Y)

        # 使用 np.testing.assert_almost_equal 函数比较两个结果的近似程度
        np.testing.assert_almost_equal(mine, gold)
        # 打印测试通过信息
        print("PASSED")
        i  = 1

# 定义测试多项式核函数的函数
def test_polynomial_kernel(N=1):
    np.random.seed(12345)
    i = 0
    while i < N:
        N = np.random.randint(1, 100)
        M = np.random.randint(1, 100)
        C = np.random.randint(1, 1000)
        gamma = np.random.rand()
        d = np.random.randint(1, 5)
        c0 = np.random.rand()

        X = np.random.rand(N, C)
        Y = np.random.rand(M, C)

        mine = PolynomialKernel(gamma=gamma, d=d, c0=c0)(X, Y)
        gold = sk_poly(X, Y, gamma=gamma, degree=d, coef0=c0)

        np.testing.assert_almost_equal(mine, gold)
        print("PASSED")
        i  = 1

# 定义测试径向基核函数的函数
def test_radial_basis_kernel(N=1):
    np.random.seed(12345)
    i = 0
    # 当 i 小于 N 时执行循环
    while i < N:
        # 生成随机整数 N,范围在 [1, 100)
        N = np.random.randint(1, 100)
        # 生成随机整数 M,范围在 [1, 100)
        M = np.random.randint(1, 100)
        # 生成随机整数 C,范围在 [1, 1000)
        C = np.random.randint(1, 1000)
        # 生成随机浮点数 gamma,范围在 [0, 1)
        gamma = np.random.rand()

        # 生成 N 行 C 列的随机数组 X
        X = np.random.rand(N, C)
        # 生成 M 行 C 列的随机数组 Y
        Y = np.random.rand(M, C)

        # sklearn (gamma) <-> mine (sigma) 转换:
        # gamma = 1 / (2 * sigma^2)
        # sigma = np.sqrt(1 / 2 * gamma)

        # 使用 RBFKernel 类计算 RBF 核函数值,传入参数 sigma
        mine = RBFKernel(sigma=np.sqrt(1 / (2 * gamma)))(X, Y)
        # 使用 sk_rbf 函数计算 RBF 核函数值,传入参数 gamma
        gold = sk_rbf(X, Y, gamma=gamma)

        # 使用 np.testing.assert_almost_equal 函数检查 mine 和 gold 是否近似相等
        np.testing.assert_almost_equal(mine, gold)
        # 打印 "PASSED" 表示测试通过
        print("PASSED")
        # i 自增 1
        i  = 1
# 距离度量函数的测试

# 测试欧几里德距离函数
def test_euclidean(N=1):
    # 设置随机种子
    np.random.seed(12345)
    i = 0
    while i < N:
        # 生成随机长度的向量
        N = np.random.randint(1, 100)
        x = np.random.rand(N)
        y = np.random.rand(N)
        # 计算自定义的欧几里德距离
        mine = euclidean(x, y)
        # 使用 SciPy 库计算欧几里德距离
        theirs = scipy.spatial.distance.euclidean(x, y)
        # 断言两者的结果几乎相等
        np.testing.assert_almost_equal(mine, theirs)
        print("PASSED")
        i  = 1

# 测试汉明距离函数
def test_hamming(N=1):
    np.random.seed(12345)
    i = 0
    while i < N:
        N = np.random.randint(1, 100)
        # 生成随机整数向量
        x = (np.random.rand(N) * 100).round().astype(int)
        y = (np.random.rand(N) * 100).round().astype(int)
        # 计算自定义的汉明距离
        mine = hamming(x, y)
        # 使用 SciPy 库计算汉明距离
        theirs = scipy.spatial.distance.hamming(x, y)
        # 断言两者的结果几乎相等
        np.testing.assert_almost_equal(mine, theirs)
        print("PASSED")
        i  = 1

# 测试闵可夫斯基距离函数
def test_minkowski(N=1):
    np.random.seed(12345)
    i = 0
    while i < N:
        N = np.random.randint(1, 100)
        p = 1   np.random.rand() * 10
        x = np.random.rand(N)
        y = np.random.rand(N)
        # 计算自定义的闵可夫斯基距离
        mine = minkowski(x, y, p)
        # 使用 SciPy 库计算闵可夫斯基距离
        theirs = scipy.spatial.distance.minkowski(x, y, p)
        # 断言两者的结果几乎相等
        np.testing.assert_almost_equal(mine, theirs)
        print("PASSED")
        i  = 1

# 测试切比雪夫距离函数
def test_chebyshev(N=1):
    np.random.seed(12345)
    i = 0
    while i < N:
        N = np.random.randint(1, 100)
        x = np.random.rand(N)
        y = np.random.rand(N)
        # 计算自定义的切比雪夫距离
        mine = chebyshev(x, y)
        # 使用 SciPy 库计算切比雪夫距离
        theirs = scipy.spatial.distance.chebyshev(x, y)
        # 断言两者的结果几乎相等
        np.testing.assert_almost_equal(mine, theirs)
        print("PASSED")
        i  = 1

# 测试曼哈顿距离函数
def test_manhattan(N=1):
    np.random.seed(12345)
    i = 0
    # 当 i 小于 N 时执行循环
    while i < N:
        # 生成一个随机整数 N,范围在 [1, 100)
        N = np.random.randint(1, 100)
        # 生成一个包含 N 个随机浮点数的数组 x
        x = np.random.rand(N)
        # 生成一个包含 N 个随机浮点数的数组 y
        y = np.random.rand(N)
        # 调用 manhattan 函数计算曼哈顿距离
        mine = manhattan(x, y)
        # 调用 scipy 库中的 cityblock 函数计算曼哈顿距离
        theirs = scipy.spatial.distance.cityblock(x, y)
        # 使用 np.testing.assert_almost_equal 函数比较两个距离的近似相等性
        np.testing.assert_almost_equal(mine, theirs)
        # 打印 "PASSED" 表示测试通过
        print("PASSED")
        # i 自增
        i  = 1
# 定义一个函数用于测试 BallTree 数据结构
def test_ball_tree(N=1):
    # 设置随机种子
    np.random.seed(12345)
    # 初始化计数器 i
    i = 0
    # 循环 N 次
    while i < N:
        # 生成随机数 N 和 M
        N = np.random.randint(2, 100)
        M = np.random.randint(2, 100)
        # 生成随机数 k
        k = np.random.randint(1, N)
        # 生成随机数 ls
        ls = np.min([np.random.randint(1, 10), N - 1])

        # 生成随机数据矩阵 X
        X = np.random.rand(N, M)
        # 创建 BallTree 对象 BT
        BT = BallTree(leaf_size=ls, metric=euclidean)
        # 对数据矩阵 X 进行拟合
        BT.fit(X)

        # 生成随机向量 x
        x = np.random.rand(M)
        # 获取最近的 k 个邻居
        mine = BT.nearest_neighbors(k, x)
        # 断言返回的邻居数量等于 k
        assert len(mine) == k

        # 提取邻居的键和距离
        mine_neighb = np.array([n.key for n in mine])
        mine_dist = np.array([n.distance for n in mine])

        # 对距离进行排序
        sort_ix = np.argsort(mine_dist)
        mine_dist = mine_dist[sort_ix]
        mine_neighb = mine_neighb[sort_ix]

        # 创建 scikit-learn 的 BallTree 对象 sk
        sk = sk_BallTree(X, leaf_size=ls)
        # 使用 sk 查询最近的 k 个邻居
        theirs_dist, ind = sk.query(x.reshape(1, -1), k=k)
        sort_ix = np.argsort(theirs_dist.flatten())

        theirs_dist = theirs_dist.flatten()[sort_ix]
        theirs_neighb = X[ind.flatten()[sort_ix]]

        # 断言我的结果与 scikit-learn 的结果一致
        for j in range(len(theirs_dist)):
            np.testing.assert_almost_equal(mine_neighb[j], theirs_neighb[j])
            np.testing.assert_almost_equal(mine_dist[j], theirs_dist[j])

        # 打印测试通过信息
        print("PASSED")
        # 更新计数器 i
        i  = 1


# 将 networkx 图转换为自定义图表示
def from_networkx(G_nx):
    V = list(G_nx.nodes)
    edges = list(G_nx.edges)
    # 检查是否是带权重的图
    is_weighted = "weight" in G_nx[edges[0][0]][edges[0][1]]

    E = []
    for e in edges:
        if is_weighted:
            # 如果是带权重的边,则添加带权重的边
            E.append(Edge(e[0], e[1], G_nx[e[0]][e[1]]["weight"]))
        else:
            # 如果不带权重,则添加不带权重的边
            E.append(Edge(e[0], e[1]))
    # 如果输入的图 G_nx 是有向图,则返回一个有向图对象 DiGraph(V, E)
    # 否则返回一个无向图对象 UndirectedGraph(V, E)
    return DiGraph(V, E) if nx.is_directed(G_nx) else UndirectedGraph(V, E)
# 将自定义的图形表示转换为 networkx 图形
def to_networkx(G):
    # 如果图是有向的,则创建有向图,否则创建无向图
    G_nx = nx.DiGraph() if G.is_directed else nx.Graph()
    # 获取图中所有顶点
    V = list(G._V2I.keys())
    # 将所有顶点添加到 networkx 图中
    G_nx.add_nodes_from(V)

    for v in V:
        # 获取顶点在图中的索引
        fr_i = G._V2I[v]
        # 获取与该顶点相连的边
        edges = G._G[fr_i]

        for edge in edges:
            # 将边添加到 networkx 图中,包括权重信息
            G_nx.add_edge(edge.fr, edge.to, weight=edge._w)
    return G_nx


def test_all_paths(N=1):
    np.random.seed(12345)
    i = 0
    while i < N:
        # 生成随机概率值
        p = np.random.rand()
        # 生成随机值来确定图是否有向
        directed = np.random.rand() < 0.5
        # 创建一个随机无权重图
        G = random_unweighted_graph(n_vertices=5, edge_prob=p, directed=directed)

        nodes = G._I2V.keys()
        G_nx = to_networkx(G)

        # 对于每个图,测试所有起点和终点顶点对的 all_paths 方法
        # 注意图不一定是连通的,所以许多路径可能为空
        for s_i in nodes:
            for e_i in nodes:
                if s_i == e_i:
                    continue

                # 获取自定义图中的所有路径
                paths = G.all_paths(s_i, e_i)
                # 获取 networkx 图中的所有简单路径
                paths_nx = nx.all_simple_paths(G_nx, source=s_i, target=e_i, cutoff=10)

                # 对路径进行排序
                paths = sorted(paths)
                paths_nx = sorted(list(paths_nx))

                # 断言两个路径列表是否相等
                for p1, p2 in zip(paths, paths_nx):
                    np.testing.assert_array_equal(p1, p2)

                print("PASSED")
                i  = 1


def test_random_DAG(N=1):
    np.random.seed(12345)
    i = 0
    while i < N:
        # 生成指定范围内的随机概率值
        p = np.random.uniform(0.25, 1)
        # 生成指定范围内的随机顶点数量
        n_v = np.random.randint(5, 50)

        # 创建一个随机有向无环图
        G = random_DAG(n_v, p)
        G_nx = to_networkx(G)

        # 断言 networkx 图是否是有向无环图
        assert nx.is_directed_acyclic_graph(G_nx)
        print("PASSED")
        i  = 1


def test_topological_ordering(N=1):
    np.random.seed(12345)
    i = 0
    # 当 i 小于 N 时执行循环
    while i < N:
        # 生成一个介于 0.25 和 1 之间的随机数
        p = np.random.uniform(0.25, 1)
        # 生成一个介于 5 和 10 之间的随机整数
        n_v = np.random.randint(5, 10)

        # 生成一个随机有向无环图
        G = random_DAG(n_v, p)
        # 将生成的图转换为 NetworkX 图
        G_nx = to_networkx(G)

        # 如果转换后的图是有向无环图
        if nx.is_directed_acyclic_graph(G_nx):
            # 获取图的拓扑排序
            topo_order = G.topological_ordering()

            # 测试拓扑排序
            seen_it = set()
            for n_i in topo_order:
                seen_it.add(n_i)
                # 断言:对于每个节点 n_i,其邻居节点 c_i 不在已经遍历过的节点集合中
                assert any([c_i in seen_it for c_i in G.get_neighbors(n_i)]) == False

            # 打印 "PASSED" 表示测试通过
            print("PASSED")
            # 增加 i 的值,继续下一次循环
            i  = 1
# 测试图是否为无环图
def test_is_acyclic(N=1):
    # 设置随机种子
    np.random.seed(12345)
    # 初始化计数器
    i = 0
    # 循环N次
    while i < N:
        # 生成随机概率值
        p = np.random.rand()
        # 生成随机布尔值,表示是否为有向图
        directed = np.random.rand() < 0.5
        # 生成一个随机无权图
        G = random_unweighted_graph(n_vertices=10, edge_prob=p, directed=True)
        # 将生成的图转换为 NetworkX 图
        G_nx = to_networkx(G)

        # 断言当前图是否为无环图,与 NetworkX 中的判断结果进行比较
        assert G.is_acyclic() == nx.is_directed_acyclic_graph(G_nx)
        # 打印测试通过信息
        print("PASSED")
        # 更新计数器
        i  = 1

numpy-mlnumpy_mltests__init__.py

代码语言:javascript复制
# 用于各种 numpy-ml 模块的单元测试

numpy-mlnumpy_mltreesdt.py

代码语言:javascript复制
# 导入 NumPy 库
import numpy as np

# 定义节点类
class Node:
    def __init__(self, left, right, rule):
        # 初始化节点的左子节点
        self.left = left
        # 初始化节点的右子节点
        self.right = right
        # 获取节点的特征
        self.feature = rule[0]
        # 获取节点的阈值
        self.threshold = rule[1]

# 定义叶子节点类
class Leaf:
    def __init__(self, value):
        """
        `value` is an array of class probabilities if classifier is True, else
        the mean of the region
        """
        # 初始化叶子节点的值,如果是分类器,则为类别概率数组,否则为区域的均值
        self.value = value

# 定义决策树类
class DecisionTree:
    def __init__(
        self,
        classifier=True,
        max_depth=None,
        n_feats=None,
        criterion="entropy",
        seed=None,
        """
        用于回归和分类问题的决策树模型。

        参数
        ----------
        classifier : bool
            是否将目标值视为分类(classifier = True)或连续(classifier = False)。默认为True。
        max_depth: int or None
            停止生长树的深度。如果为None,则生长树直到所有叶子节点都是纯的。默认为None。
        n_feats : int
            指定在每次分裂时要采样的特征数量。如果为None,则在每次分裂时使用所有特征。默认为None。
        criterion : {'mse', 'entropy', 'gini'}
            计算分裂时要使用的错误标准。当`classifier`为False时,有效输入为{'mse'}。当`classifier`为True时,有效输入为{'entropy', 'gini'}。默认为'entropy'。
        seed : int or None
            随机数生成器的种子。默认为None。
        """
        如果有种子值,则设置随机数种子
        if seed:
            np.random.seed(seed)

        初始化深度为0
        初始化根节点为None

        设置特征数量为n_feats
        设置错误标准为criterion
        设置分类器为classifier
        设置最大深度为max_depth,如果max_depth为None,则设置为无穷大

        如果不是分类问题且错误标准为["gini", "entropy"]之一,则引发值错误
        if not classifier and criterion in ["gini", "entropy"]:
            raise ValueError(
                "{} is a valid criterion only when classifier = True.".format(criterion)
            )
        如果是分类问题且错误标准为"mse",则引发值错误
        if classifier and criterion == "mse":
            raise ValueError("`mse` is a valid criterion only when classifier = False.")
    # 适应二叉决策树到数据集
    def fit(self, X, Y):
        """
        Fit a binary decision tree to a dataset.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The training data of `N` examples, each with `M` features
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            An array of integer class labels for each example in `X` if
            self.classifier = True, otherwise the set of target values for
            each example in `X`.
        """
        # 如果是分类器,则确定类别数量,否则为 None
        self.n_classes = max(Y)   1 if self.classifier else None
        # 确定特征数量,如果未指定则为 X 的特征数量,否则取最小值
        self.n_feats = X.shape[1] if not self.n_feats else min(self.n_feats, X.shape[1])
        # 生成决策树的根节点
        self.root = self._grow(X, Y)

    # 使用训练好的决策树对 X 中的示例进行分类或预测
    def predict(self, X):
        """
        Use the trained decision tree to classify or predict the examples in `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The training data of `N` examples, each with `M` features

        Returns
        -------
        preds : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The integer class labels predicted for each example in `X` if
            self.classifier = True, otherwise the predicted target values.
        """
        # 对 X 中的每个示例进行预测,返回预测的类别或目标值
        return np.array([self._traverse(x, self.root) for x in X])
    def predict_class_probs(self, X):
        """
        使用训练好的决策树来返回`X`中每个示例的类别概率。

        参数
        ----------
        X : :py:class:`ndarray <numpy.ndarray>`,形状为`(N, M)`
            `N`个示例的训练数据,每个示例有`M`个特征

        返回
        -------
        preds : :py:class:`ndarray <numpy.ndarray>`,形状为`(N, n_classes)`
            针对`X`中每个示例预测的类别概率。
        """
        # 检查分类器是否已定义
        assert self.classifier, "`predict_class_probs` undefined for classifier = False"
        # 对`X`中的每个示例调用`_traverse`方法,返回类别概率数组
        return np.array([self._traverse(x, self.root, prob=True) for x in X])

    def _grow(self, X, Y, cur_depth=0):
        # 如果所有标签都相同,则返回一个叶节点
        if len(set(Y)) == 1:
            if self.classifier:
                prob = np.zeros(self.n_classes)
                prob[Y[0]] = 1.0
            return Leaf(prob) if self.classifier else Leaf(Y[0])

        # 如果达到最大深度,则返回一个叶节点
        if cur_depth >= self.max_depth:
            v = np.mean(Y, axis=0)
            if self.classifier:
                v = np.bincount(Y, minlength=self.n_classes) / len(Y)
            return Leaf(v)

        cur_depth  = 1
        self.depth = max(self.depth, cur_depth)

        N, M = X.shape
        feat_idxs = np.random.choice(M, self.n_feats, replace=False)

        # 根据`criterion`贪婪地选择最佳分割
        feat, thresh = self._segment(X, Y, feat_idxs)
        l = np.argwhere(X[:, feat] <= thresh).flatten()
        r = np.argwhere(X[:, feat] > thresh).flatten()

        # 递归生长左右子树
        left = self._grow(X[l, :], Y[l], cur_depth)
        right = self._grow(X[r, :], Y[r], cur_depth)
        return Node(left, right, (feat, thresh))
    def _segment(self, X, Y, feat_idxs):
        """
        Find the optimal split rule (feature index and split threshold) for the
        data according to `self.criterion`.
        """
        # 初始化最佳增益为负无穷
        best_gain = -np.inf
        split_idx, split_thresh = None, None
        # 遍历特征索引
        for i in feat_idxs:
            # 获取特征值
            vals = X[:, i]
            # 获取唯一值
            levels = np.unique(vals)
            # 计算阈值
            thresholds = (levels[:-1]   levels[1:]) / 2 if len(levels) > 1 else levels
            # 计算增益
            gains = np.array([self._impurity_gain(Y, t, vals) for t in thresholds])

            # 更新最佳增益
            if gains.max() > best_gain:
                split_idx = i
                best_gain = gains.max()
                split_thresh = thresholds[gains.argmax()]

        return split_idx, split_thresh

    def _impurity_gain(self, Y, split_thresh, feat_values):
        """
        Compute the impurity gain associated with a given split.

        IG(split) = loss(parent) - weighted_avg[loss(left_child), loss(right_child)]
        """
        # 根据不同的准则选择损失函数
        if self.criterion == "entropy":
            loss = entropy
        elif self.criterion == "gini":
            loss = gini
        elif self.criterion == "mse":
            loss = mse

        # 计算父节点的损失
        parent_loss = loss(Y)

        # 生成分裂
        left = np.argwhere(feat_values <= split_thresh).flatten()
        right = np.argwhere(feat_values > split_thresh).flatten()

        if len(left) == 0 or len(right) == 0:
            return 0

        # 计算子节点损失的加权平均
        n = len(Y)
        n_l, n_r = len(left), len(right)
        e_l, e_r = loss(Y[left]), loss(Y[right])
        child_loss = (n_l / n) * e_l   (n_r / n) * e_r

        # 计算增益是分裂前后损失的差异
        ig = parent_loss - child_loss
        return ig
    # 递归遍历决策树节点,根据输入数据 X 和节点信息 node 进行预测
    def _traverse(self, X, node, prob=False):
        # 如果节点是叶子节点
        if isinstance(node, Leaf):
            # 如果是分类器模型
            if self.classifier:
                # 返回节点的值(概率值或类别值)
                return node.value if prob else node.value.argmax()
            # 如果是回归模型,直接返回节点的值
            return node.value
        # 如果输入数据 X 在节点的特征上的取值小于等于节点的阈值
        if X[node.feature] <= node.threshold:
            # 递归遍历左子树节点
            return self._traverse(X, node.left, prob)
        # 如果输入数据 X 在节点的特征上的取值大于节点的阈值
        return self._traverse(X, node.right, prob)
# 计算决策树(即均值)预测的均方误差
def mse(y):
    # 返回 y 与 y 的均值之差的平方的均值
    return np.mean((y - np.mean(y)) ** 2)


# 计算标签序列的熵
def entropy(y):
    # 统计标签序列中每个标签出现的次数
    hist = np.bincount(y)
    # 计算每个标签出现的概率
    ps = hist / np.sum(hist)
    # 计算熵值
    return -np.sum([p * np.log2(p) for p in ps if p > 0])


# 计算标签序列的基尼不纯度(局部熵)
def gini(y):
    # 统计标签序列中每个标签出现的次数
    hist = np.bincount(y)
    # 计算总样本数
    N = np.sum(hist)
    # 计算基尼不纯度
    return 1 - sum([(i / N) ** 2 for i in hist])

0 人点赞