NumPyML 源码解析(三)

2024-02-17 10:03:16 浏览数 (1)

Losses

The losses.py module implements several common loss functions, including:

  • Squared error
  • Cross-entropy
  • Variational lower-bound for binary VAE (Kingma & Welling, 2014)
  • WGAN-GP loss for generator and critic (Gulrajani et al., 2017)
  • Noise contrastive estimation (NCE) loss (Gutmann & Hyvärinen, 2010; Minh & Teh, 2012)

numpy-mlnumpy_mlneural_netslosses__init__.py

代码语言:javascript复制
"""
Common neural network loss functions.

This module implements loss objects that can be used during neural network
training.
"""
# 导入 loss 模块中的所有内容
from .losses import *

Models

The models module implements popular full neural networks. It includes:

  • vae.py: A Bernoulli variational autoencoder (Kingma & Welling, 2014)
  • wgan_gp.py: A Wasserstein generative adversarial network with gradient penalty (Gulrajani et al., 2017; Goodfellow et al., 2014)
  • w2v.py: word2vec model with CBOW and skip-gram architectures and training via noise contrastive estimation (Mikolov et al., 2012)

numpy-mlnumpy_mlneural_netsmodelsvae.py

代码语言:javascript复制
# 从 time 模块中导入 time 函数
# 从 collections 模块中导入 OrderedDict 类
# 从 numpy 模块中导入 np 别名
# 从相对路径中导入 VAELoss 类
# 从相对路径中导入 minibatch 函数
# 从相对路径中导入 ReLU、Affine、Sigmoid 类
# 从相对路径中导入 Conv2D、Pool2D、Flatten、FullyConnected 类
from time import time
from collections import OrderedDict
import numpy as np
from ..losses import VAELoss
from ..utils import minibatch
from ..activations import ReLU, Affine, Sigmoid
from ..layers import Conv2D, Pool2D, Flatten, FullyConnected

# 定义 BernoulliVAE 类
class BernoulliVAE(object):
    # 初始化函数
    def __init__(
        self,
        T=5,
        latent_dim=256,
        enc_conv1_pad=0,
        enc_conv2_pad=0,
        enc_conv1_out_ch=32,
        enc_conv2_out_ch=64,
        enc_conv1_stride=1,
        enc_pool1_stride=2,
        enc_conv2_stride=1,
        enc_pool2_stride=1,
        enc_conv1_kernel_shape=(5, 5),
        enc_pool1_kernel_shape=(2, 2),
        enc_conv2_kernel_shape=(5, 5),
        enc_pool2_kernel_shape=(2, 2),
        optimizer="RMSProp(lr=0.0001)",
        init="glorot_uniform",
    # 初始化参数函数
    def _init_params(self):
        # 初始化参数字典
        self._dv = {}
        # 构建编码器
        self._build_encoder()
        # 构建解码器
        self._build_decoder()
    def _build_encoder(self):
        """
        构建 CNN 编码器

        Conv1 -> ReLU -> MaxPool1 -> Conv2 -> ReLU -> MaxPool2 ->
            Flatten -> FC1 -> ReLU -> FC2
        """
        # 初始化编码器为有序字典
        self.encoder = OrderedDict()
        # 添加第一层卷积层 Conv1
        self.encoder["Conv1"] = Conv2D(
            act_fn=ReLU(),
            init=self.init,
            pad=self.enc_conv1_pad,
            optimizer=self.optimizer,
            out_ch=self.enc_conv1_out_ch,
            stride=self.enc_conv1_stride,
            kernel_shape=self.enc_conv1_kernel_shape,
        )
        # 添加第一层池化层 Pool1
        self.encoder["Pool1"] = Pool2D(
            mode="max",
            optimizer=self.optimizer,
            stride=self.enc_pool1_stride,
            kernel_shape=self.enc_pool1_kernel_shape,
        )
        # 添加第二层卷积层 Conv2
        self.encoder["Conv2"] = Conv2D(
            act_fn=ReLU(),
            init=self.init,
            pad=self.enc_conv2_pad,
            optimizer=self.optimizer,
            out_ch=self.enc_conv2_out_ch,
            stride=self.enc_conv2_stride,
            kernel_shape=self.enc_conv2_kernel_shape,
        )
        # 添加第二层池化层 Pool2
        self.encoder["Pool2"] = Pool2D(
            mode="max",
            optimizer=self.optimizer,
            stride=self.enc_pool2_stride,
            kernel_shape=self.enc_pool2_kernel_shape,
        )
        # 添加展平层 Flatten
        self.encoder["Flatten3"] = Flatten(optimizer=self.optimizer)
        # 添加第一层全连接层 FC4
        self.encoder["FC4"] = FullyConnected(
            n_out=self.latent_dim, act_fn=ReLU(), optimizer=self.optimizer
        )
        # 添加第二层全连接层 FC5
        self.encoder["FC5"] = FullyConnected(
            n_out=self.T * 2,
            optimizer=self.optimizer,
            act_fn=Affine(slope=1, intercept=0),
            init=self.init,
        )
    # 构建 MLP 解码器
    def _build_decoder(self):
        """
        MLP decoder

        FC1 -> ReLU -> FC2 -> Sigmoid
        """
        # 初始化解码器为有序字典
        self.decoder = OrderedDict()
        # 添加全连接层 FC1 到解码器,使用 ReLU 激活函数
        self.decoder["FC1"] = FullyConnected(
            act_fn=ReLU(),
            init=self.init,
            n_out=self.latent_dim,
            optimizer=self.optimizer,
        )
        # 注意:`n_out` 取决于 X 的维度。我们现在使用占位符,并在 `forward` 方法中更新它
        # 添加全连接层 FC2 到解码器,使用 Sigmoid 激活函数
        self.decoder["FC2"] = FullyConnected(
            n_out=None, act_fn=Sigmoid(), optimizer=self.optimizer, init=self.init
        )

    # 返回模型的参数
    @property
    def parameters(self):
        return {
            "components": {
                # 返回编码器的参数
                "encoder": {k: v.parameters for k, v in self.encoder.items()},
                # 返回解码器的参数
                "decoder": {k: v.parameters for k, v in self.decoder.items()},
            }
        }

    @property
    # 返回模型的超参数字典
    def hyperparameters(self):
        return {
            "layer": "BernoulliVAE",  # 模型层类型
            "T": self.T,  # T 参数
            "init": self.init,  # 初始化方法
            "loss": str(self.loss),  # 损失函数
            "optimizer": self.optimizer,  # 优化器
            "latent_dim": self.latent_dim,  # 潜在空间维度
            "enc_conv1_pad": self.enc_conv1_pad,  # 编码器第一层卷积填充
            "enc_conv2_pad": self.enc_conv2_pad,  # 编码器第二层卷积填充
            "enc_conv1_in_ch": self.enc_conv1_in_ch,  # 编码器第一层卷积输入通道数
            "enc_conv1_stride": self.enc_conv1_stride,  # 编码器第一层卷积步长
            "enc_conv1_out_ch": self.enc_conv1_out_ch,  # 编码器第一层卷积输出通道数
            "enc_pool1_stride": self.enc_pool1_stride,  # 编码器第一层池化步长
            "enc_conv2_out_ch": self.enc_conv2_out_ch,  # 编码器第二层卷积输出通道数
            "enc_conv2_stride": self.enc_conv2_stride,  # 编码器第二层卷积步长
            "enc_pool2_stride": self.enc_pool2_stride,  # 编码器第二层池化步长
            "enc_conv2_kernel_shape": self.enc_conv2_kernel_shape,  # 编码器第二层卷积核形状
            "enc_pool2_kernel_shape": self.enc_pool2_kernel_shape,  # 编码器第二层池化核形状
            "enc_conv1_kernel_shape": self.enc_conv1_kernel_shape,  # 编码器第一层卷积核形状
            "enc_pool1_kernel_shape": self.enc_pool1_kernel_shape,  # 编码器第一层池化核形状
            "encoder_ids": list(self.encoder.keys()),  # 编码器 ID 列表
            "decoder_ids": list(self.decoder.keys()),  # 解码器 ID 列表
            "components": {
                "encoder": {k: v.hyperparameters for k, v in self.encoder.items()},  # 编码器超参数字典
                "decoder": {k: v.hyperparameters for k, v in self.decoder.items()},  # 解码器超参数字典
            },
        }

    @property
    # 计算派生变量,包括噪声、均值、对数方差等
    def derived_variables(self):
        # 初始化派生变量字典
        dv = {
            "noise": None,
            "t_mean": None,
            "t_log_var": None,
            "dDecoder_FC1_in": None,
            "dDecoder_t_mean": None,
            "dEncoder_FC5_out": None,
            "dDecoder_FC1_out": None,
            "dEncoder_FC4_out": None,
            "dEncoder_Pool2_out": None,
            "dEncoder_Conv2_out": None,
            "dEncoder_Pool1_out": None,
            "dEncoder_Conv1_out": None,
            "dDecoder_t_log_var": None,
            "dEncoder_Flatten3_out": None,
            # 初始化组件字典,包括编码器和解码器
            "components": {
                "encoder": {k: v.derived_variables for k, v in self.encoder.items()},
                "decoder": {k: v.derived_variables for k, v in self.decoder.items()},
            },
        }
        # 更新派生变量字典
        dv.update(self._dv)
        # 返回派生变量字典
        return dv

    # 获取梯度信息
    @property
    def gradients(self):
        # 返回梯度信息字典,包括编码器和解码器
        return {
            "components": {
                "encoder": {k: v.gradients for k, v in self.encoder.items()},
                "decoder": {k: v.gradients for k, v in self.decoder.items()},
            }
        }

    # 从分布中抽样
    def _sample(self, t_mean, t_log_var):
        """
        Returns a sample from the distribution

            q(t | x) = N(t_mean, diag(exp(t_log_var)))

        using the reparameterization trick.

        Parameters
        ----------
        t_mean : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, latent_dim)`
            Mean of the desired distribution.
        t_log_var : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, latent_dim)`
            Log variance vector of the desired distribution.

        Returns
        -------
        samples: :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, latent_dim)`
        """
        # 生成服从标准正态分布的噪声
        noise = np.random.normal(loc=0.0, scale=1.0, size=t_mean.shape)
        # 使用重参数化技巧从分布中抽样
        samples = noise * np.exp(t_log_var)   t_mean
        # 保存抽样的噪声用于反向传播
        self._dv["noise"] = noise
        # 返回抽样结果
        return samples
    # VAE 前向传播
    def forward(self, X_train):
        """VAE forward pass"""
        # 如果解码器的输出大小未知,则设置为 N
        if self.decoder["FC2"].n_out is None:
            fc2 = self.decoder["FC2"]
            self.decoder["FC2"] = fc2.set_params({"n_out": self.N})

        # 假设每个图像被表示为一个扁平化的行向量
        n_ex, in_rows, N, in_ch = X_train.shape

        # 对训练批次进行编码,以估计变分分布的均值和方差
        out = X_train
        for k, v in self.encoder.items():
            out = v.forward(out)

        # 从编码器输出中提取变分分布的均值和对数方差
        t_mean = out[:, : self.T]
        t_log_var = out[:, self.T :]

        # 使用重参数化技巧从 q(t | x) 中采样 t
        t = self._sample(t_mean, t_log_var)

        # 将采样的潜在值 t 通过解码器传递,生成平均重构
        X_recon = t
        for k, v in self.decoder.items():
            X_recon = v.forward(X_recon)

        self._dv["t_mean"] = t_mean
        self._dv["t_log_var"] = t_log_var
        return X_recon

    # 执行梯度更新
    def update(self, cur_loss=None):
        """Perform gradient updates"""
        # 对解码器进行反向梯度更新
        for k, v in reversed(list(self.decoder.items())):
            v.update(cur_loss)
        # 对编码器进行反向梯度更新
        for k, v in reversed(list(self.encoder.items())):
            v.update(cur_loss)
        # 清空梯度
        self.flush_gradients()

    # 在更新后重置参数梯度
    def flush_gradients(self):
        """Reset parameter gradients after update"""
        # 重置解码器参数梯度
        for k, v in self.decoder.items():
            v.flush_gradients()
        # 重置编码器参数梯度
        for k, v in self.encoder.items():
            v.flush_gradients()

numpy-mlnumpy_mlneural_netsmodelsw2v.py

代码语言:javascript复制
# 从 time 模块中导入 time 函数
from time import time

# 导入 numpy 库,并使用 np 别名
import numpy as np

# 从上一级目录中导入 layers 模块中的 Embedding 类
from ..layers import Embedding

# 从上一级目录中导入 losses 模块中的 NCELoss 类
from ..losses import NCELoss

# 从 preprocessing.nlp 模块中导入 Vocabulary 和 tokenize_words 函数
from ...preprocessing.nlp import Vocabulary, tokenize_words

# 从 utils.data_structures 模块中导入 DiscreteSampler 类
from ...utils.data_structures import DiscreteSampler

# 定义 Word2Vec 类
class Word2Vec(object):
    # 初始化方法
    def __init__(
        self,
        context_len=5,
        min_count=None,
        skip_gram=False,
        max_tokens=None,
        embedding_dim=300,
        filter_stopwords=True,
        noise_dist_power=0.75,
        init="glorot_uniform",
        num_negative_samples=64,
        optimizer="SGD(lr=0.1)",
    # 初始化参数方法
    def _init_params(self):
        # 初始化词典
        self._dv = {}
        # 构建噪声分布
        self._build_noise_distribution()

        # 初始化 Embedding 层
        self.embeddings = Embedding(
            init=self.init,
            vocab_size=self.vocab_size,
            n_out=self.embedding_dim,
            optimizer=self.optimizer,
            pool=None if self.skip_gram else "mean",
        )

        # 初始化 NCELoss 损失函数
        self.loss = NCELoss(
            init=self.init,
            optimizer=self.optimizer,
            n_classes=self.vocab_size,
            subtract_log_label_prob=False,
            noise_sampler=self._noise_sampler,
            num_negative_samples=self.num_negative_samples,
        )

    # 参数属性,返回模型参数
    @property
    def parameters(self):
        """Model parameters"""
        param = {"components": {"embeddings": {}, "loss": {}}}
        if hasattr(self, "embeddings"):
            param["components"] = {
                "embeddings": self.embeddings.parameters,
                "loss": self.loss.parameters,
            }
        return param

    @property
    # 返回模型的超参数,包括模型结构、初始化方式、优化器、最大标记数、上下文长度、嵌入维度、噪声分布幂、是否过滤停用词、负采样数、词汇表大小等信息
    def hyperparameters(self):
        """Model hyperparameters"""
        hp = {
            "layer": "Word2Vec",
            "init": self.init,
            "skip_gram": self.skip_gram,
            "optimizer": self.optimizer,
            "max_tokens": self.max_tokens,
            "context_len": self.context_len,
            "embedding_dim": self.embedding_dim,
            "noise_dist_power": self.noise_dist_power,
            "filter_stopwords": self.filter_stopwords,
            "num_negative_samples": self.num_negative_samples,
            "vocab_size": self.vocab_size if hasattr(self, "vocab_size") else None,
            "components": {"embeddings": {}, "loss": {}},
        }

        # 如果模型包含嵌入层和损失函数,则更新超参数信息
        if hasattr(self, "embeddings"):
            hp["components"] = {
                "embeddings": self.embeddings.hyperparameters,
                "loss": self.loss.hyperparameters,
            }
        return hp

    # 返回模型操作期间计算的变量,包括嵌入层和损失函数的派生变量
    @property
    def derived_variables(self):
        """Variables computed during model operation"""
        dv = {"components": {"embeddings": {}, "loss": {}}}
        dv.update(self._dv)

        # 如果模型包含嵌入层和损失函数,则更新派生变量信息
        if hasattr(self, "embeddings"):
            dv["components"] = {
                "embeddings": self.embeddings.derived_variables,
                "loss": self.loss.derived_variables,
            }
        return dv

    # 返回模型参数的梯度信息,包括嵌入层和损失函数的梯度
    @property
    def gradients(self):
        """Model parameter gradients"""
        grad = {"components": {"embeddings": {}, "loss": {}}}
        if hasattr(self, "embeddings"):
            grad["components"] = {
                "embeddings": self.embeddings.gradients,
                "loss": self.loss.gradients,
            }
        return grad
    def forward(self, X, targets, retain_derived=True):
        """
        Evaluate the network on a single minibatch.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in)`
            Layer input, representing a minibatch of `n_ex` examples, each
            consisting of `n_in` integer word indices
        targets : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex,)`
            Target word index for each example in the minibatch.
        retain_derived : bool
            Whether to retain the variables calculated during the forward pass
            for use later during backprop. If `False`, this suggests the layer
            will not be expected to backprop through wrt. this input. Default
            True.

        Returns
        -------
        loss : float
            The loss associated with the current minibatch
        y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex,)`
            The conditional probabilities of the words in `targets` given the
            corresponding example / context in `X`.
        """
        # 计算输入层的嵌入表示
        X_emb = self.embeddings.forward(X, retain_derived=True)
        # 计算损失和预测值
        loss, y_pred = self.loss.loss(X_emb, targets.flatten(), retain_derived=True)
        return loss, y_pred

    def backward(self):
        """
        Compute the gradient of the loss wrt the current network parameters.
        """
        # 计算损失相对于当前网络参数的梯度
        dX_emb = self.loss.grad(retain_grads=True, update_params=False)
        # 反向传播梯度到嵌入层
        self.embeddings.backward(dX_emb)

    def update(self, cur_loss=None):
        """Perform gradient updates"""
        # 更新损失
        self.loss.update(cur_loss)
        # 更新嵌入层
        self.embeddings.update(cur_loss)
        # 清空梯度
        self.flush_gradients()

    def flush_gradients(self):
        """Reset parameter gradients after update"""
        # 清空损失的梯度
        self.loss.flush_gradients()
        # 清空嵌入层的梯度
        self.embeddings.flush_gradients()
    def get_embedding(self, word_ids):
        """
        Retrieve the embeddings for a collection of word IDs.

        Parameters
        ----------
        word_ids : :py:class:`ndarray <numpy.ndarray>` of shape `(M,)`
            An array of word IDs to retrieve embeddings for.

        Returns
        -------
        embeddings : :py:class:`ndarray <numpy.ndarray>` of shape `(M, n_out)`
            The embedding vectors for each of the `M` word IDs.
        """
        # 如果输入的word_ids是列表,则转换为NumPy数组
        if isinstance(word_ids, list):
            word_ids = np.array(word_ids)
        # 调用embeddings对象的lookup方法来获取word_ids对应的嵌入向量
        return self.embeddings.lookup(word_ids)

    def _build_noise_distribution(self):
        """
        Construct the noise distribution for use during negative sampling.

        For a word ``w`` in the corpus, the noise distribution is::

            P_n(w) = Count(w) ** noise_dist_power / Z

        where ``Z`` is a normalizing constant, and `noise_dist_power` is a
        hyperparameter of the model. Mikolov et al. report best performance
        using a `noise_dist_power` of 0.75.
        """
        # 检查是否已经存在vocab属性,如果不存在则抛出异常
        if not hasattr(self, "vocab"):
            raise ValueError("Must call `fit` before constructing noise distribution")

        # 初始化一个全零数组来存储噪声分布的概率
        probs = np.zeros(len(self.vocab))
        # 获取噪声分布的幂指数
        power = self.hyperparameters["noise_dist_power"]

        # 遍历词汇表中的每个词,计算其出现次数的幂指数作为噪声分布的概率
        for ix, token in enumerate(self.vocab):
            count = token.count
            probs[ix] = count ** power

        # 对概率进行归一化处理
        probs /= np.sum(probs)
        # 使用DiscreteSampler类来构建噪声分布的采样器
        self._noise_sampler = DiscreteSampler(probs, log=False, with_replacement=False)
    # 训练一个 epoch 的数据集
    def _train_epoch(self, corpus_fps, encoding):
        # 初始化总损失
        total_loss = 0
        # 获取一个批次的数据生成器
        batch_generator = self.minibatcher(corpus_fps, encoding)
        # 遍历每个批次的数据
        for ix, (X, target) in enumerate(batch_generator):
            # 计算当前批次的损失
            loss = self._train_batch(X, target)
            # 累加总损失
            total_loss  = loss
            # 如果设置了 verbose,则输出当前批次的损失
            if self.verbose:
                # 计算平滑损失
                smooth_loss = 0.99 * smooth_loss   0.01 * loss if ix > 0 else loss
                fstr = "[Batch {}] Loss: {:.5f} | Smoothed Loss: {:.5f}"
                print(fstr.format(ix   1, loss, smooth_loss))
        # 返回平均损失
        return total_loss / (ix   1)

    # 训练一个批次的数据
    def _train_batch(self, X, target):
        # 前向传播计算损失
        loss, _ = self.forward(X, target)
        # 反向传播
        self.backward()
        # 更新参数
        self.update(loss)
        # 返回当前批次的损失
        return loss

    # 拟合模型
    def fit(
        self, corpus_fps, encoding="utf-8-sig", n_epochs=20, batchsize=128, verbose=True
        ):
        """
        Learn word2vec embeddings for the examples in `X_train`.

        Parameters
        ----------
        corpus_fps : str or list of strs
            The filepath / list of filepaths to the document(s) to be encoded.
            Each document is expected to be encoded as newline-separated
            string of text, with adjacent tokens separated by a whitespace
            character.
        encoding : str
            Specifies the text encoding for corpus. Common entries are either
            'utf-8' (no header byte), or 'utf-8-sig' (header byte).  Default
            value is 'utf-8-sig'.
        n_epochs : int
            The maximum number of training epochs to run. Default is 20.
        batchsize : int
            The desired number of examples in each training batch. Default is
            128.
        verbose : bool
            Print batch information during training. Default is True.
        """
        # 设置是否打印训练信息
        self.verbose = verbose
        # 设置最大训练轮数
        self.n_epochs = n_epochs
        # 设置每个训练批次的样本数量
        self.batchsize = batchsize

        # 初始化词汇表对象
        self.vocab = Vocabulary(
            lowercase=True,
            min_count=self.min_count,
            max_tokens=self.max_tokens,
            filter_stopwords=self.filter_stopwords,
        )
        # 根据语料库文件路径和编码方式构建词汇表
        self.vocab.fit(corpus_fps, encoding=encoding)
        # 获取词汇表大小
        self.vocab_size = len(self.vocab)

        # 在训练模型时忽略特殊字符
        for sp in self.special_chars:
            self.vocab.counts[sp] = 0

        # 初始化词嵌入参数
        self._init_params()

        # 初始化上一次损失值
        prev_loss = np.inf
        # 开始训练
        for i in range(n_epochs):
            # 初始化损失值和时间
            loss, estart = 0.0, time()
            # 训练一个轮次,计算损失值
            loss = self._train_epoch(corpus_fps, encoding)

            # 打印每轮训练的平均损失值和时间
            fstr = "[Epoch {}] Avg. loss: {:.3f}  Delta: {:.3f} ({:.2f}m/epoch)"
            print(fstr.format(i   1, loss, prev_loss - loss, (time() - estart) / 60.0))
            prev_loss = loss

numpy-mlnumpy_mlneural_netsmodelswgan_gp.py

代码语言:javascript复制
# 导入所需的模块和库
from time import time
from collections import OrderedDict
import numpy as np
# 导入自定义的模块
from ..utils import minibatch
from ..layers import FullyConnected
from ..losses import WGAN_GPLoss

# 定义 WGAN_GP 类,实现 Wasserstein 生成对抗网络(WGAN)结构和梯度惩罚(GP)
class WGAN_GP(object):
    """
    A Wasserstein generative adversarial network (WGAN) architecture with
    gradient penalty (GP).

    Notes
    -----
    In contrast to a regular WGAN, WGAN-GP uses gradient penalty on the
    generator rather than weight clipping to encourage the 1-Lipschitz
    constraint:

    .. math::

        | \text{Generator}(mathbf{x}_1) - \text{Generator}(mathbf{x}_2) |
            leq |mathbf{x}_1 - mathbf{x}_2 |     \forall mathbf{x}_1, mathbf{x}_2

    In other words, the generator must have input gradients with a norm of at
    most 1 under the :math:`mathbf{X}_{real}` and :math:`mathbf{X}_{fake}`
    data distributions.

    To enforce this constraint, WGAN-GP penalizes the model if the generator
    gradient norm moves away from a target norm of 1. See
    :class:`~numpy_ml.neural_nets.losses.WGAN_GPLoss` for more details.

    In contrast to a standard WGAN, WGAN-GP avoids using BatchNorm in the
    critic, as correlation between samples in a batch can impact the stability
    of the gradient penalty.

    WGAP-GP architecture:

    .. code-block:: text

        X_real ------------------------|
                                        >---> [Critic] --> Y_out
        Z --> [Generator] --> X_fake --|

    where ``[Generator]`` is

    .. code-block:: text

        FC1 -> ReLU -> FC2 -> ReLU -> FC3 -> ReLU -> FC4

    and ``[Critic]`` is

    .. code-block:: text

        FC1 -> ReLU -> FC2 -> ReLU -> FC3 -> ReLU -> FC4

    and

    .. math::

        Z sim mathcal{N}(0, 1)
    """

    # 初始化函数,设置网络参数和优化器
    def __init__(
        self,
        g_hidden=512,
        init="he_uniform",
        optimizer="RMSProp(lr=0.0001)",
        debug=False,
    ):
        """
        Wasserstein generative adversarial network with gradient penalty.

        Parameters
        ----------
        g_hidden : int
            The number of units in the critic and generator hidden layers.
            Default is 512.
        init : str
            The weight initialization strategy. Valid entries are
            {'glorot_normal', 'glorot_uniform', 'he_normal', 'he_uniform',
            'std_normal', 'trunc_normal'}. Default is "he_uniform".
        optimizer : str or :doc:`Optimizer <numpy_ml.neural_nets.optimizers>` object or None
            The optimization strategy to use when performing gradient updates.
            If None, use the :class:`~numpy_ml.neural_nets.optimizers.SGD`
            optimizer with default parameters. Default is "RMSProp(lr=0.0001)".
        debug : bool
            Whether to store additional intermediate output within
            ``self.derived_variables``. Default is False.
        """
        # 初始化函数,设置初始参数
        self.init = init
        self.debug = debug
        self.g_hidden = g_hidden
        self.optimizer = optimizer

        self.lambda_ = None
        self.n_steps = None
        self.batchsize = None

        self.is_initialized = False

    # 初始化参数函数
    def _init_params(self):
        # 初始化存储派生变量的字典
        self._dv = {}
        # 初始化存储梯度的字典
        self._gr = {}
        # 构建评论者网络
        self._build_critic()
        # 构建生成器网络
        self._build_generator()
        # 设置初始化标志为True
        self.is_initialized = True
    def _build_generator(self):
        """
        构建生成器网络结构:FC1 -> ReLU -> FC2 -> ReLU -> FC3 -> ReLU -> FC4
        """
        # 初始化生成器网络结构为有序字典
        self.generator = OrderedDict()
        # 添加全连接层 FC1 到生成器网络结构中
        self.generator["FC1"] = FullyConnected(
            self.g_hidden, act_fn="ReLU", optimizer=self.optimizer, init=self.init
        )
        # 添加全连接层 FC2 到生成器网络结构中
        self.generator["FC2"] = FullyConnected(
            self.g_hidden, act_fn="ReLU", optimizer=self.optimizer, init=self.init
        )
        # 添加全连接层 FC3 到生成器网络结构中
        self.generator["FC3"] = FullyConnected(
            self.g_hidden, act_fn="ReLU", optimizer=self.optimizer, init=self.init
        )
        # 添加全连接层 FC4 到生成器网络结构中
        self.generator["FC4"] = FullyConnected(
            self.n_feats,
            act_fn="Affine(slope=1, intercept=0)",
            optimizer=self.optimizer,
            init=self.init,
        )

    def _build_critic(self):
        """
        构建评论者网络结构:FC1 -> ReLU -> FC2 -> ReLU -> FC3 -> ReLU -> FC4
        """
        # 初始化评论者网络结构为有序字典
        self.critic = OrderedDict()
        # 添加全连接层 FC1 到评论者网络结构中
        self.critic["FC1"] = FullyConnected(
            self.g_hidden, act_fn="ReLU", optimizer=self.optimizer, init=self.init
        )
        # 添加全连接层 FC2 到评论者网络结构中
        self.critic["FC2"] = FullyConnected(
            self.g_hidden, act_fn="ReLU", optimizer=self.optimizer, init=self.init
        )
        # 添加全连接层 FC3 到评论者网络结构中
        self.critic["FC3"] = FullyConnected(
            self.g_hidden, act_fn="ReLU", optimizer=self.optimizer, init=self.init
        )
        # 添加全连接层 FC4 到评论者网络结构中
        self.critic["FC4"] = FullyConnected(
            1,
            act_fn="Affine(slope=1, intercept=0)",
            optimizer=self.optimizer,
            init=self.init,
        )

    @property
    # 返回超参数字典,包括初始化方法、lambda参数、生成器隐藏层大小、训练步数、优化器、批大小、每轮更新critic的次数等
    def hyperparameters(self):
        return {
            "init": self.init,
            "lambda_": self.lambda_,
            "g_hidden": self.g_hidden,
            "n_steps": self.n_steps,
            "optimizer": self.optimizer,
            "batchsize": self.batchsize,
            "c_updates_per_epoch": self.c_updates_per_epoch,
            "components": {
                # 获取critic组件的超参数字典
                "critic": {k: v.hyperparameters for k, v in self.critic.items()},
                # 获取generator组件的超参数字典
                "generator": {k: v.hyperparameters for k, v in self.generator.items()},
            },
        }

    # 返回参数字典,包括critic和generator组件的参数
    @property
    def parameters(self):
        return {
            "components": {
                # 获取critic组件的参数字典
                "critic": {k: v.parameters for k, v in self.critic.items()},
                # 获取generator组件的参数字典
                "generator": {k: v.parameters for k, v in self.generator.items()},
            }
        }

    # 返回派生变量字典,包括critic和generator组件的派生变量
    @property
    def derived_variables(self):
        C = self.critic.items()
        G = self.generator.items()
        dv = {
            "components": {
                # 获取critic组件的派生变量字典
                "critic": {k: v.derived_variables for k, v in C},
                # 获取generator组件的派生变量字典
                "generator": {k: v.derived_variables for k, v in G},
            }
        }
        # 更新派生变量字典
        dv.update(self._dv)
        return dv

    # 返回梯度字典,包括critic和generator组件的梯度
    @property
    def gradients(self):
        grads = {
            "dC_Y_fake": None,
            "dC_Y_real": None,
            "dG_Y_fake": None,
            "dC_gradInterp": None,
            "components": {
                # 获取critic组件的梯度字典
                "critic": {k: v.gradients for k, v in self.critic.items()},
                # 获取generator组件的梯度字典
                "generator": {k: v.gradients for k, v in self.generator.items()},
            },
        }
        # 更新梯度字典
        grads.update(self._gr)
        return grads
    # 执行生成器或评论者的前向传播

    def forward(self, X, module, retain_derived=True):
        """
        Perform the forward pass for either the generator or the critic.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(batchsize, *)`
            Input data
        module : {'C' or 'G'}
            Whether to perform the forward pass for the critic ('C') or for the
            generator ('G').
        retain_derived : bool
            Whether to retain the variables calculated during the forward pass
            for use later during backprop. If False, this suggests the layer
            will not be expected to backprop through wrt. this input. Default
            is True.

        Returns
        -------
        out : :py:class:`ndarray <numpy.ndarray>` of shape `(batchsize, *)`
            The output of the final layer of the module.
        Xs : dict
            A dictionary with layer ids as keys and values corresponding to the
            input to each intermediate layer during the forward pass. Useful
            during debugging.
        """
        # 根据模块类型选择要执行的模块
        if module == "G":
            mod = self.generator
        elif module == "C":
            mod = self.critic
        else:
            raise ValueError("Unrecognized module name: {}".format(module))

        # 初始化存储中间结果的字典
        Xs = {}
        # 初始化输出和是否保留派生变量的标志
        out, rd = X, retain_derived
        # 遍历模块中的每一层,执行前向传播
        for k, v in mod.items():
            # 将当前层的输入存储到字典中
            Xs[k] = out
            # 执行当前层的前向传播
            out = v.forward(out, retain_derived=rd)
        # 返回最终层的输出和中间结果字典
        return out, Xs
    def backward(self, grad, module, retain_grads=True):
        """
        Perform the backward pass for either the generator or the critic.

        Parameters
        ----------
        grad : :py:class:`ndarray <numpy.ndarray>` of shape `(batchsize, *)` or list of arrays
            Gradient of the loss with respect to module output(s).
        module : {'C' or 'G'}
            Whether to perform the backward pass for the critic ('C') or for the
            generator ('G').
        retain_grads : bool
            Whether to include the intermediate parameter gradients computed
            during the backward pass in the final parameter update. Default is True.

        Returns
        -------
        out : :py:class:`ndarray <numpy.ndarray>` of shape `(batchsize, *)`
            The gradient of the loss with respect to the module input.
        dXs : dict
            A dictionary with layer ids as keys and values corresponding to the
            input to each intermediate layer during the backward pass. Useful
            during debugging.
        """
        # 根据传入的 module 参数选择要执行反向传播的模块
        if module == "G":
            mod = self.generator
        elif module == "C":
            mod = self.critic
        else:
            raise ValueError("Unrecognized module name: {}".format(module))

        # 初始化存储每个中间层输入的字典
        dXs = {}
        # 初始化输出和是否保留梯度的标志
        out, rg = grad, retain_grads
        # 遍历模块中的层,执行反向传播
        for k, v in reversed(list(mod.items())):
            # 将当前输出保存到中间层输入字典中
            dXs[k] = out
            # 执行当前层的反向传播,更新输出
            out = v.backward(out, retain_grads=rg)
        # 返回最终输出和中间层输入字典
        return out, dXs
    def _dGradInterp(self, dLdGradInterp, dYi_outs):
        """
        Compute the gradient penalty's contribution to the critic loss and
        update the parameter gradients accordingly.

        Parameters
        ----------
        dLdGradInterp : :py:class:`ndarray <numpy.ndarray>` of shape `(batchsize, critic_in_dim)`
            Gradient of `Y_interp` with respect to `X_interp`.
        dYi_outs : dict
            The intermediate outputs generated during the backward pass when
            computing `dLdGradInterp`.
        """
        # 将梯度值初始化为dLdGradInterp
        dy = dLdGradInterp
        # 遍历critic字典中的每个键值对
        for k, v in self.critic.items():
            # 获取当前层的输入X
            X = v.X[-1]  # layer input during forward pass
            # 计算当前层的梯度、权重和偏置
            dy, dW, dB = v._bwd2(dy, X, dYi_outs[k][2])
            # 更新当前层的权重梯度
            self.critic[k].gradients["W"]  = dW
            # 更新当前层的偏置梯度
            self.critic[k].gradients["b"]  = dB

    def update_generator(self, X_shape):
        """
        Compute parameter gradients for the generator on a single minibatch.

        Parameters
        ----------
        X_shape : tuple of `(batchsize, n_feats)`
            Shape for the input batch.

        Returns
        -------
        G_loss : float
            The generator loss on the fake data (generated during the critic
            update)
        """
        # 重置生成器的梯度为0
        self.flush_gradients("G")
        # 获取生成器生成的假数据Y_fake
        Y_fake = self.derived_variables["Y_fake"]

        # 获取Y_fake的行数和列数
        n_ex, _ = Y_fake.shape
        # 计算生成器损失
        G_loss = -Y_fake.mean()
        # 计算生成器损失的梯度
        dG_loss = -np.ones_like(Y_fake) / n_ex
        # 反向传播计算生成器的梯度
        self.backward(dG_loss, "G")

        # 如果开启了调试模式,则保存生成器损失和梯度
        if self.debug:
            self._dv["G_loss"] = G_loss
            self._dv["dG_Y_fake"] = dG_loss

        return G_loss

    def flush_gradients(self, module):
        """Reset parameter gradients to 0 after an update."""
        # 根据模块名称选择要重置梯度的模块
        if module == "G":
            mod = self.generator
        elif module == "C":
            mod = self.critic
        else:
            raise ValueError("Unrecognized module name: {}".format(module))

        # 将选定模块的所有参数梯度重置为0
        for k, v in mod.items():
            v.flush_gradients()
    # 更新模型参数,根据传入的模块名称选择对应的模型
    def update(self, module, module_loss=None):
        # 如果模块名称为 "G",则选择生成器模型
        if module == "G":
            mod = self.generator
        # 如果模块名称为 "C",则选择评论者模型
        elif module == "C":
            mod = self.critic
        # 如果模块名称不是 "G" 或 "C",则抛出数值错误异常
        else:
            raise ValueError("Unrecognized module name: {}".format(module))

        # 遍历模型中的参数,逆序遍历参数列表
        for k, v in reversed(list(mod.items())):
            # 更新模型参数
            v.update(module_loss)
        # 清空梯度
        self.flush_gradients(module)

    # 拟合模型
    def fit(
        self,
        X_real,
        lambda_,
        n_steps=1000,
        batchsize=128,
        c_updates_per_epoch=5,
        verbose=True,

numpy-mlnumpy_mlneural_netsmodels__init__.py

代码语言:javascript复制
# 从当前目录中导入 vae 模块中的所有内容
from .vae import *
# 从当前目录中导入 wgan_gp 模块中的所有内容
from .wgan_gp import *
# 从当前目录中导入 w2v 模块中的所有内容
from .w2v import *

numpy-mlnumpy_mlneural_netsmodulesmodules.py

代码语言:javascript复制
# 从 abc 模块导入 ABC 和 abstractmethod 装饰器
from abc import ABC, abstractmethod

# 导入 re 模块用于正则表达式操作
import re
# 导入 numpy 模块并重命名为 np
import numpy as np

# 从当前包的 wrappers 模块中导入 Dropout 类
from ..wrappers import Dropout
# 从当前包的 utils 模块中导入 calc_pad_dims_2D 函数
from ..utils import calc_pad_dims_2D
# 从当前包的 activations 模块中导入 Tanh, Sigmoid, ReLU, LeakyReLU, Affine 类
from ..activations import Tanh, Sigmoid, ReLU, LeakyReLU, Affine
# 从当前包的 layers 模块中导入各种层类
from ..layers import (
    DotProductAttention,
    FullyConnected,
    BatchNorm2D,
    Conv1D,
    Conv2D,
    Multiply,
    LSTMCell,
    Add,
)

# 定义一个抽象基类 ModuleBase
class ModuleBase(ABC):
    # 初始化方法
    def __init__(self):
        # 初始化 X 属性为 None
        self.X = None
        # 初始化 trainable 属性为 True
        self.trainable = True

        # 调用父类的初始化方法
        super().__init__()

    # 抽象方法,用于初始化参数
    @abstractmethod
    def _init_params(self, **kwargs):
        raise NotImplementedError

    # 抽象方法,前向传播
    @abstractmethod
    def forward(self, z, **kwargs):
        raise NotImplementedError

    # 抽象方法,反向传播
    @abstractmethod
    def backward(self, out, **kwargs):
        raise NotImplementedError

    # components 属性,返回组件列表
    @property
    def components(self):
        comps = []
        # 遍历组件列表中的组件 ID
        for c in self.hyperparameters["component_ids"]:
            # 如果当前对象有该组件,则添加到 comps 列表中
            if hasattr(self, c):
                comps.append(getattr(self, c))
        return comps

    # 冻结方法,将当前层及其组件设为不可训练
    def freeze(self):
        self.trainable = False
        for c in self.components:
            c.freeze()

    # 解冻方法,将当前层及其组件设为可训练
    def unfreeze(self):
        self.trainable = True
        for c in self.components:
            c.unfreeze()

    # 更新方法,更新参数
    def update(self, cur_loss=None):
        assert self.trainable, "Layer is frozen"
        for c in self.components:
            c.update(cur_loss)
        self.flush_gradients()

    # 清空梯度方法
    def flush_gradients(self):
        assert self.trainable, "Layer is frozen"

        # 清空梯度相关变量
        self.X = []
        self._dv = {}
        for c in self.components:
            for k, v in c.derived_variables.items():
                c.derived_variables[k] = None

            for k, v in c.gradients.items():
                c.gradients[k] = np.zeros_like(v)
    # 设置模型的参数,根据传入的字典 summary_dict
    def set_params(self, summary_dict):
        # 获取组件的 ID 列表
        cids = self.hyperparameters["component_ids"]
        # 遍历 summary_dict 中的参数
        for k, v in summary_dict["parameters"].items():
            # 如果参数是 "components",则进一步处理
            if k == "components":
                # 遍历组件参数字典
                for c, cd in summary_dict["parameters"][k].items():
                    # 如果组件在组件 ID 列表中,则设置组件的参数
                    if c in cids:
                        getattr(self, c).set_params(cd)

            # 如果参数在模型的参数列表中,则更新模型的参数值
            elif k in self.parameters:
                self.parameters[k] = v

        # 遍历 summary_dict 中的超参数
        for k, v in summary_dict["hyperparameters"].items():
            # 如果参数是 "components",则进一步处理
            if k == "components":
                # 遍历组件超参数字典
                for c, cd in summary_dict["hyperparameters"][k].items():
                    # 如果组件在组件 ID 列表中,则设置组件的超参数
                    if c in cids:
                        getattr(self, c).set_params(cd)

            # 如果超参数在模型的超参数列表中,则更新模型的超参数值
            if k in self.hyperparameters:
                # 根据不同的超参数值类型进行处理
                if k == "act_fn" and v == "ReLU":
                    self.hyperparameters[k] = ReLU()
                elif v == "act_fn" and v == "Sigmoid":
                    self.hyperparameters[k] = Sigmoid()
                elif v == "act_fn" and v == "Tanh":
                    self.hyperparameters[k] = Tanh()
                elif v == "act_fn" and "Affine" in v:
                    # 解析 Affine 函数的参数值
                    r = r"Affine(slope=(.*), intercept=(.*))"
                    slope, intercept = re.match(r, v).groups()
                    self.hyperparameters[k] = Affine(float(slope), float(intercept))
                elif v == "act_fn" and "Leaky ReLU" in v:
                    # 解析 Leaky ReLU 函数的参数值
                    r = r"Leaky ReLU(alpha=(.*))"
                    alpha = re.match(r, v).groups()[0]
                    self.hyperparameters[k] = LeakyReLU(float(alpha))
                else:
                    # 其他情况直接更新超参数值
                    self.hyperparameters[k] = v

    # 返回模型的摘要信息,包括参数、层信息和超参数
    def summary(self):
        return {
            "parameters": self.parameters,
            "layer": self.hyperparameters["layer"],
            "hyperparameters": self.hyperparameters,
        }
class WavenetResidualModule(ModuleBase):
    # 定义一个 Wavenet 残差模块类,继承自 ModuleBase 类
    def __init__(
        self,
        ch_residual,
        ch_dilation,
        dilation,
        kernel_width,
        optimizer=None,
        init="glorot_uniform",
    # 初始化函数,接受残差通道数、扩张通道数、扩张率、卷积核宽度、优化器和初始化方式等参数
    def _init_params(self):
        # 初始化参数字典
        self._dv = {}

        # 创建扩张卷积层对象
        self.conv_dilation = Conv1D(
            stride=1,
            pad="causal",
            init=self.init,
            kernel_width=2,
            dilation=self.dilation,
            out_ch=self.ch_dilation,
            optimizer=self.optimizer,
            act_fn=Affine(slope=1, intercept=0),
        )

        # 创建 Tanh 激活函数对象
        self.tanh = Tanh()
        # 创建 Sigmoid 激活函数对象
        self.sigm = Sigmoid()
        # 创建 Multiply 门对象
        self.multiply_gate = Multiply(act_fn=Affine(slope=1, intercept=0))

        # 创建 1x1 卷积层对象
        self.conv_1x1 = Conv1D(
            stride=1,
            pad="same",
            dilation=0,
            init=self.init,
            kernel_width=1,
            out_ch=self.ch_residual,
            optimizer=self.optimizer,
            act_fn=Affine(slope=1, intercept=0),
        )

        # 创建残差相加层对象
        self.add_residual = Add(act_fn=Affine(slope=1, intercept=0))
        # 创建跳跃连接相加层对象
        self.add_skip = Add(act_fn=Affine(slope=1, intercept=0))

    @property
    def parameters(self):
        """A dictionary of the module parameters."""
        # 返回模块参数的字典
        return {
            "components": {
                "conv_1x1": self.conv_1x1.parameters,
                "add_skip": self.add_skip.parameters,
                "add_residual": self.add_residual.parameters,
                "conv_dilation": self.conv_dilation.parameters,
                "multiply_gate": self.multiply_gate.parameters,
            }
        }

    @property
    # 返回模块的参数
    # 返回模块的超参数字典
    def hyperparameters(self):
        """A dictionary of the module hyperparameters"""
        return {
            "layer": "WavenetResidualModule",
            "init": self.init,
            "dilation": self.dilation,
            "optimizer": self.optimizer,
            "ch_residual": self.ch_residual,
            "ch_dilation": self.ch_dilation,
            "kernel_width": self.kernel_width,
            "component_ids": [
                "conv_1x1",
                "add_skip",
                "add_residual",
                "conv_dilation",
                "multiply_gate",
            ],
            "components": {
                "conv_1x1": self.conv_1x1.hyperparameters,
                "add_skip": self.add_skip.hyperparameters,
                "add_residual": self.add_residual.hyperparameters,
                "conv_dilation": self.conv_dilation.hyperparameters,
                "multiply_gate": self.multiply_gate.hyperparameters,
            },
        }

    # 返回计算过程中前向/后向传播期间计算的中间值的字典
    @property
    def derived_variables(self):
        """A dictionary of intermediate values computed during the
        forward/backward passes."""
        dv = {
            "conv_1x1_out": None,
            "conv_dilation_out": None,
            "multiply_gate_out": None,
            "components": {
                "conv_1x1": self.conv_1x1.derived_variables,
                "add_skip": self.add_skip.derived_variables,
                "add_residual": self.add_residual.derived_variables,
                "conv_dilation": self.conv_dilation.derived_variables,
                "multiply_gate": self.multiply_gate.derived_variables,
            },
        }
        # 更新中间值字典
        dv.update(self._dv)
        return dv

    @property
    # 返回模块参数梯度的字典
    def gradients(self):
        # 返回包含各组件参数梯度的字典
        return {
            "components": {
                # 获取 conv_1x1 组件的参数梯度
                "conv_1x1": self.conv_1x1.gradients,
                # 获取 add_skip 组件的参数梯度
                "add_skip": self.add_skip.gradients,
                # 获取 add_residual 组件的参数梯度
                "add_residual": self.add_residual.gradients,
                # 获取 conv_dilation 组件的参数梯度
                "conv_dilation": self.conv_dilation.gradients,
                # 获取 multiply_gate 组件的参数梯度
                "multiply_gate": self.multiply_gate.gradients,
            }
        }
    def forward(self, X_main, X_skip=None):
        """
        Compute the module output on a single minibatch.

        Parameters
        ----------
        X_main : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
            The input volume consisting of `n_ex` examples, each with dimension
            (`in_rows`, `in_cols`, `in_ch`).
        X_skip : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`, or None
            The output of the preceding skip-connection if this is not the
            first module in the network.

        Returns
        -------
        Y_main : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, out_ch)`
            The output of the main pathway.
        Y_skip : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, out_ch)`
            The output of the skip-connection pathway.
        """
        # 设置输入数据和跳跃连接数据
        self.X_main, self.X_skip = X_main, X_skip
        # 计算卷积扩张层的输出
        conv_dilation_out = self.conv_dilation.forward(X_main)

        # 计算tanh门的输出
        tanh_gate = self.tanh.fn(conv_dilation_out)
        # 计算sigmoid门的输出
        sigm_gate = self.sigm.fn(conv_dilation_out)

        # 计算门控乘积的输出
        multiply_gate_out = self.multiply_gate.forward([tanh_gate, sigm_gate])
        # 计算1x1卷积层的输出
        conv_1x1_out = self.conv_1x1.forward(multiply_gate_out)

        # 如果这是第一个Wavenet块,则将“前一个”跳跃连接和卷积1x1输出的和初始化为0
        self.X_skip = np.zeros_like(conv_1x1_out) if X_skip is None else X_skip

        # 计算跳跃连接路径的输出
        Y_skip = self.add_skip.forward([X_skip, conv_1x1_out])
        # 计算主路径的输出
        Y_main = self.add_residual.forward([X_main, conv_1x1_out])

        # 保存各个中间结果,以便后续调试
        self._dv["tanh_out"] = tanh_gate
        self._dv["sigm_out"] = sigm_gate
        self._dv["conv_dilation_out"] = conv_dilation_out
        self._dv["multiply_gate_out"] = multiply_gate_out
        self._dv["conv_1x1_out"] = conv_1x1_out
        # 返回主路径和跳跃连接路径的输出
        return Y_main, Y_skip
    # 反向传播函数,计算梯度并返回
    def backward(self, dY_skip, dY_main=None):
        # 调用 add_skip 模块的反向传播函数,计算 skip path 的梯度和输出
        dX_skip, dConv_1x1_out = self.add_skip.backward(dY_skip)

        # 如果这是最后一个 wavenet block,dY_main 将为 None。如果不是,
        # 计算来自 dY_main 的误差贡献并添加到 skip path 的贡献中
        dX_main = np.zeros_like(self.X_main)
        if dY_main is not None:
            # 调用 add_residual 模块的反向传播函数,计算 main path 的梯度和输出
            dX_main, dConv_1x1_main = self.add_residual.backward(dY_main)
            dConv_1x1_out  = dConv_1x1_main

        # 调用 conv_1x1 模块的反向传播函数,计算梯度并返回
        dMultiply_out = self.conv_1x1.backward(dConv_1x1_out)
        # 调用 multiply_gate 模块的反向传播函数,计算梯度并返回
        dTanh_out, dSigm_out = self.multiply_gate.backward(dMultiply_out)

        # 获取派生变量中的 conv_dilation_out
        conv_dilation_out = self.derived_variables["conv_dilation_out"]
        # 计算 dTanh_in 和 dSigm_in,并乘以对应激活函数的梯度
        dTanh_in = dTanh_out * self.tanh.grad(conv_dilation_out)
        dSigm_in = dSigm_out * self.sigm.grad(conv_dilation_out)
        # 计算 dDilation_out
        dDilation_out = dTanh_in   dSigm_in

        # 调用 conv_dilation 模块的反向传播函数,计算梯度并返回
        conv_back = self.conv_dilation.backward(dDilation_out)
        dX_main  = conv_back

        # 存储各个梯度到派生变量中
        self._dv["dLdTanh"] = dTanh_out
        self._dv["dLdSigmoid"] = dSigm_out
        self._dv["dLdConv_1x1"] = dConv_1x1_out
        self._dv["dLdMultiply"] = dMultiply_out
        self._dv["dLdConv_dilation"] = dDilation_out
        # 返回 main path 和 skip path 的梯度
        return dX_main, dX_skip
class SkipConnectionIdentityModule(ModuleBase):
    # 定义一个继承自 ModuleBase 的 SkipConnectionIdentityModule 类
    def __init__(
        self,
        out_ch,
        kernel_shape1,
        kernel_shape2,
        stride1=1,
        stride2=1,
        act_fn=None,
        epsilon=1e-5,
        momentum=0.9,
        optimizer=None,
        init="glorot_uniform",
    # 初始化函数,接受多个参数,包括输出通道数、卷积核形状等
    def _init_params(self):
        # 初始化参数字典
        self._dv = {}

        # 创建第一个卷积层对象
        self.conv1 = Conv2D(
            pad="same",
            init=self.init,
            out_ch=self.out_ch,
            act_fn=self.act_fn,
            stride=self.stride1,
            optimizer=self.optimizer,
            kernel_shape=self.kernel_shape1,
        )
        # 无法初始化 `conv2`,需要 X 的维度;参见 `forward` 获取更多细节
        self.batchnorm1 = BatchNorm2D(epsilon=self.epsilon, momentum=self.momentum)
        self.batchnorm2 = BatchNorm2D(epsilon=self.epsilon, momentum=self.momentum)
        self.add3 = Add(self.act_fn)

    def _init_conv2(self):
        # 创建第二个卷积层对象
        self.conv2 = Conv2D(
            pad="same",
            init=self.init,
            out_ch=self.in_ch,
            stride=self.stride2,
            optimizer=self.optimizer,
            kernel_shape=self.kernel_shape2,
            act_fn=Affine(slope=1, intercept=0),
        )

    @property
    def parameters(self):
        """A dictionary of the module parameters."""
        # 返回模块参数的字典
        return {
            "components": {
                "add3": self.add3.parameters,
                "conv1": self.conv1.parameters,
                "conv2": self.conv2.parameters,
                "batchnorm1": self.batchnorm1.parameters,
                "batchnorm2": self.batchnorm2.parameters,
            }
        }

    @property
    # 返回模块的参数
    # 返回模块的超参数字典
    def hyperparameters(self):
        """A dictionary of the module hyperparameters."""
        return {
            "layer": "SkipConnectionIdentityModule",
            "init": self.init,
            "in_ch": self.in_ch,
            "out_ch": self.out_ch,
            "epsilon": self.epsilon,
            "stride1": self.stride1,
            "stride2": self.stride2,
            "momentum": self.momentum,
            "optimizer": self.optimizer,
            "act_fn": str(self.act_fn),
            "kernel_shape1": self.kernel_shape1,
            "kernel_shape2": self.kernel_shape2,
            "component_ids": ["conv1", "batchnorm1", "conv2", "batchnorm2", "add3"],
            "components": {
                "add3": self.add3.hyperparameters,
                "conv1": self.conv1.hyperparameters,
                "conv2": self.conv2.hyperparameters,
                "batchnorm1": self.batchnorm1.hyperparameters,
                "batchnorm2": self.batchnorm2.hyperparameters,
            },
        }

    # 返回模块计算过程中的中间值字典
    @property
    def derived_variables(self):
        """A dictionary of intermediate values computed during the
        forward/backward passes."""
        dv = {
            "conv1_out": None,
            "conv2_out": None,
            "batchnorm1_out": None,
            "batchnorm2_out": None,
            "components": {
                "add3": self.add3.derived_variables,
                "conv1": self.conv1.derived_variables,
                "conv2": self.conv2.derived_variables,
                "batchnorm1": self.batchnorm1.derived_variables,
                "batchnorm2": self.batchnorm2.derived_variables,
            },
        }
        # 更新中间值字典
        dv.update(self._dv)
        return dv

    @property
    # 返回累积模块参数梯度的字典
    def gradients(self):
        return {
            "components": {
                "add3": self.add3.gradients,
                "conv1": self.conv1.gradients,
                "conv2": self.conv2.gradients,
                "batchnorm1": self.batchnorm1.gradients,
                "batchnorm2": self.batchnorm2.gradients,
            }
        }

    # 计算给定输入体积 X 的模块输出
    def forward(self, X, retain_derived=True):
        """
        Compute the module output given input volume `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape (n_ex, in_rows, in_cols, in_ch)
            The input volume consisting of `n_ex` examples, each with dimension
            (`in_rows`, `in_cols`, `in_ch`).
        retain_derived : bool
            Whether to retain the variables calculated during the forward pass
            for use later during backprop. If False, this suggests the layer
            will not be expected to backprop through wrt. this input. Default
            is True.

        Returns
        -------
        Y : :py:class:`ndarray <numpy.ndarray>` of shape (n_ex, out_rows, out_cols, out_ch)
            The module output volume.
        """
        # 如果 self 没有属性 "conv2",则初始化 conv2
        if not hasattr(self, "conv2"):
            self.in_ch = X.shape[3]
            self._init_conv2()

        # 计算 conv1 的输出
        conv1_out = self.conv1.forward(X, retain_derived)
        # 计算 batchnorm1 的输出
        bn1_out = self.batchnorm1.forward(conv1_out, retain_derived)
        # 计算 conv2 的输出
        conv2_out = self.conv2.forward(bn1_out, retain_derived)
        # 计算 batchnorm2 的输出
        bn2_out = self.batchnorm2.forward(conv2_out, retain_derived)
        # 计算 add3 的输出
        Y = self.add3.forward([X, bn2_out], retain_derived)

        # 如果 retain_derived 为 True,则保存中间变量以备后用
        if retain_derived:
            self._dv["conv1_out"] = conv1_out
            self._dv["conv2_out"] = conv2_out
            self._dv["batchnorm1_out"] = bn1_out
            self._dv["batchnorm2_out"] = bn2_out
        # 返回模块输出
        return Y
    def backward(self, dLdY, retain_grads=True):
        """
        Compute the gradient of the loss with respect to the layer parameters.

        Parameters
        ----------
        dLdy : :py:class:`ndarray <numpy.ndarray>` of shape (`n_ex, out_rows, out_cols, out_ch`) or list of arrays
            The gradient(s) of the loss with respect to the module output(s).
        retain_grads : bool
            Whether to include the intermediate parameter gradients computed
            during the backward pass in the final parameter update. Default is
            True.

        Returns
        -------
        dX : :py:class:`ndarray <numpy.ndarray>` of shape (n_ex, in_rows, in_cols, in_ch)
            The gradient of the loss with respect to the module input volume.
        """
        # Compute the backward pass for the add3 layer and get the gradient of the loss with respect to the input and output of add3
        dX, dBn2_out = self.add3.backward(dLdY, retain_grads)
        # Compute the backward pass for the batchnorm2 layer using the gradient from add3 layer
        dConv2_out = self.batchnorm2.backward(dBn2_out, retain_grads)
        # Compute the backward pass for the conv2 layer using the gradient from batchnorm2 layer
        dBn1_out = self.conv2.backward(dConv2_out, retain_grads)
        # Compute the backward pass for the batchnorm1 layer using the gradient from conv2 layer
        dConv1_out = self.batchnorm1.backward(dBn1_out, retain_grads)
        # Compute the backward pass for the conv1 layer using the gradient from batchnorm1 layer and add it to the existing gradient
        dX  = self.conv1.backward(dConv1_out, retain_grads)

        # Store the gradients for each layer in the internal dictionary for reference
        self._dv["dLdAdd3_X"] = dX
        self._dv["dLdBn2"] = dBn2_out
        self._dv["dLdBn1"] = dBn1_out
        self._dv["dLdConv2"] = dConv2_out
        self._dv["dLdConv1"] = dConv1_out
        # Return the final gradient of the loss with respect to the module input volume
        return dX
class SkipConnectionConvModule(ModuleBase):
    # 定义一个继承自 ModuleBase 的 SkipConnectionConvModule 类
    def __init__(
        self,
        out_ch1,
        out_ch2,
        kernel_shape1,
        kernel_shape2,
        kernel_shape_skip,
        pad1=0,
        pad2=0,
        stride1=1,
        stride2=1,
        act_fn=None,
        epsilon=1e-5,
        momentum=0.9,
        stride_skip=1,
        optimizer=None,
        init="glorot_uniform",
    ):
        # 初始化函数,接受多个参数,包括卷积层的参数、激活函数、优化器等
    def _init_params(self, X=None):
        # 初始化参数函数,接受输入 X,但在此处未使用
        self._dv = {}
        # 初始化一个空字典 _dv
        self.conv1 = Conv2D(
            pad=self.pad1,
            init=self.init,
            act_fn=self.act_fn,
            out_ch=self.out_ch1,
            stride=self.stride1,
            optimizer=self.optimizer,
            kernel_shape=self.kernel_shape1,
        )
        # 初始化第一个卷积层,设置卷积参数、激活函数、输出通道数等
        self.conv2 = Conv2D(
            pad=self.pad2,
            init=self.init,
            out_ch=self.out_ch2,
            stride=self.stride2,
            optimizer=self.optimizer,
            kernel_shape=self.kernel_shape2,
            act_fn=Affine(slope=1, intercept=0),
        )
        # 初始化第二个卷积层,设置卷积参数、输出通道数、激活函数等
        # 无法初始化 `conv_skip`,需要 X 的维度;参见 `forward` 获取更多细节
        self.batchnorm1 = BatchNorm2D(epsilon=self.epsilon, momentum=self.momentum)
        # 初始化第一个批归一化层,设置 epsilon 和 momentum 参数
        self.batchnorm2 = BatchNorm2D(epsilon=self.epsilon, momentum=self.momentum)
        # 初始化第二个批归一化层,设置 epsilon 和 momentum 参数
        self.batchnorm_skip = BatchNorm2D(epsilon=self.epsilon, momentum=self.momentum)
        # 初始化跳跃连接的批归一化层,设置 epsilon 和 momentum 参数
        self.add3 = Add(self.act_fn)
        # 初始化一个加法层,使用给定的激活函数
    # 计算卷积层之间的填充大小
    def _calc_skip_padding(self, X):
        # 初始化填充列表
        pads = []
        # 遍历每个填充参数
        for p in [self.pad1, self.pad2]:
            # 如果填充参数是整数,则转换为四元组
            if isinstance(p, int):
                pads.append((p, p, p, p))
            # 如果填充参数是二元组且长度为2,则转换为四元组
            elif isinstance(p, tuple) and len(p) == 2:
                pads.append((p[0], p[0], p[1], p[1])
        # 更新填充参数
        self.pad1, self.pad2 = pads

        # 计算卷积1输出的维度
        s1 = self.stride1
        fr1, fc1 = self.kernel_shape1
        _, in_rows, in_cols, _ = X.shape
        pr11, pr12, pc11, pc12 = self.pad1

        out_rows1 = np.floor(1   (in_rows   pr11   pr12 - fr1) / s1).astype(int)
        out_cols1 = np.floor(1   (in_cols   pc11   pc12 - fc1) / s1).astype(int)

        # 计算卷积2输出的维度
        s2 = self.stride2
        fr2, fc2 = self.kernel_shape2
        pr21, pr22, pc21, pc22 = self.pad2

        out_rows2 = np.floor(1   (out_rows1   pr21   pr22 - fr2) / s2).astype(int)
        out_cols2 = np.floor(1   (out_cols1   pc21   pc22 - fc2) / s2).astype(int)

        # 最后,计算跳跃卷积的适当填充维度
        desired_dims = (out_rows2, out_cols2)
        self.pad_skip = calc_pad_dims_2D(
            X.shape,
            desired_dims,
            stride=self.stride_skip,
            kernel_shape=self.kernel_shape_skip,
        )

    # 初始化跳跃卷积层
    def _init_conv_skip(self, X):
        # 计算跳跃卷积的填充大小
        self._calc_skip_padding(X)
        # 创建跳跃卷积层对象
        self.conv_skip = Conv2D(
            init=self.init,
            pad=self.pad_skip,
            out_ch=self.out_ch2,
            stride=self.stride_skip,
            kernel_shape=self.kernel_shape_skip,
            act_fn=Affine(slope=1, intercept=0),
            optimizer=self.optimizer,
        )

    # 属性方法
    @property
    # 返回模块参数的字典
    def parameters(self):
        """A dictionary of the module parameters."""
        return {
            # 返回包含各组件参数的字典
            "components": {
                # 添加3的参数
                "add3": self.add3.parameters,
                # 卷积层1的参数
                "conv1": self.conv1.parameters,
                # 卷积层2的参数
                "conv2": self.conv2.parameters,
                # 如果存在跳跃连接的卷积层,返回其参数;否则返回None
                "conv_skip": self.conv_skip.parameters
                if hasattr(self, "conv_skip")
                else None,
                # 批归一化层1的参数
                "batchnorm1": self.batchnorm1.parameters,
                # 批归一化层2的参数
                "batchnorm2": self.batchnorm2.parameters,
                # 如果存在跳跃连接的批归一化层,返回其参数
                "batchnorm_skip": self.batchnorm_skip.parameters,
            }
        }

    @property
    # 返回模块超参数的字典
    def hyperparameters(self):
        """A dictionary of the module hyperparameters."""
        return {
            "layer": "SkipConnectionConvModule",
            "init": self.init,
            "pad1": self.pad1,
            "pad2": self.pad2,
            "in_ch": self.in_ch,
            "out_ch1": self.out_ch1,
            "out_ch2": self.out_ch2,
            "epsilon": self.epsilon,
            "stride1": self.stride1,
            "stride2": self.stride2,
            "momentum": self.momentum,
            "act_fn": str(self.act_fn),
            "stride_skip": self.stride_skip,
            "kernel_shape1": self.kernel_shape1,
            "kernel_shape2": self.kernel_shape2,
            "kernel_shape_skip": self.kernel_shape_skip,
            "pad_skip": self.pad_skip if hasattr(self, "pad_skip") else None,
            "component_ids": [
                "add3",
                "conv1",
                "conv2",
                "conv_skip",
                "batchnorm1",
                "batchnorm2",
                "batchnorm_skip",
            ],
            "components": {
                "add3": self.add3.hyperparameters,
                "conv1": self.conv1.hyperparameters,
                "conv2": self.conv2.hyperparameters,
                "conv_skip": self.conv_skip.hyperparameters
                if hasattr(self, "conv_skip")
                else None,
                "batchnorm1": self.batchnorm1.hyperparameters,
                "batchnorm2": self.batchnorm2.hyperparameters,
                "batchnorm_skip": self.batchnorm_skip.hyperparameters,
            },
        }

    @property
    # 计算前向/后向传播过程中计算的中间值的字典
    def derived_variables(self):
        dv = {
            "conv1_out": None,
            "conv2_out": None,
            "conv_skip_out": None,
            "batchnorm1_out": None,
            "batchnorm2_out": None,
            "batchnorm_skip_out": None,
            "components": {
                "add3": self.add3.derived_variables,  # 计算 add3 模块的派生变量
                "conv1": self.conv1.derived_variables,  # 计算 conv1 模块的派生变量
                "conv2": self.conv2.derived_variables,  # 计算 conv2 模块的派生变量
                "conv_skip": self.conv_skip.derived_variables  # 如果存在 conv_skip 模块,则计算其派生变量
                if hasattr(self, "conv_skip")  # 检查是否存在 conv_skip 模块
                else None,
                "batchnorm1": self.batchnorm1.derived_variables,  # 计算 batchnorm1 模块的派生变量
                "batchnorm2": self.batchnorm2.derived_variables,  # 计算 batchnorm2 模块的派生变量
                "batchnorm_skip": self.batchnorm_skip.derived_variables,  # 计算 batchnorm_skip 模块的派生变量
            },
        }
        # 更新派生变量字典
        dv.update(self._dv)
        return dv

    @property
    # 累积模块参数梯度的字典
    def gradients(self):
        return {
            "components": {
                "add3": self.add3.gradients,  # 获取 add3 模块的梯度
                "conv1": self.conv1.gradients,  # 获取 conv1 模块的梯度
                "conv2": self.conv2.gradients,  # 获取 conv2 模块的梯度
                "conv_skip": self.conv_skip.gradients  # 获取 conv_skip 模块的梯度
                if hasattr(self, "conv_skip")  # 检查是否存在 conv_skip 模块
                else None,
                "batchnorm1": self.batchnorm1.gradients,  # 获取 batchnorm1 模块的梯度
                "batchnorm2": self.batchnorm2.gradients,  # 获取 batchnorm2 模块的梯度
                "batchnorm_skip": self.batchnorm_skip.gradients,  # 获取 batchnorm_skip 模块的梯度
            }
        }
    def forward(self, X, retain_derived=True):
        """
        Compute the layer output given input volume `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
            The input volume consisting of `n_ex` examples, each with dimension
            (`in_rows`, `in_cols`, `in_ch`).
        retain_derived : bool
            Whether to retain the variables calculated during the forward pass
            for use later during backprop. If False, this suggests the layer
            will not be expected to backprop through wrt. this input. Default
            is True.

        Returns
        -------
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, out_ch)`
            The module output volume.
        """
        # 现在我们有了输入 X 的维度,可以初始化 `conv_skip` 层的正确填充
        if not hasattr(self, "conv_skip"):
            self._init_conv_skip(X)
            self.in_ch = X.shape[3]

        # 计算第一个卷积层的输出
        conv1_out = self.conv1.forward(X, retain_derived)
        # 计算第一个批归一化层的输出
        bn1_out = self.batchnorm1.forward(conv1_out, retain_derived)
        # 计算第二个卷积层的输出
        conv2_out = self.conv2.forward(bn1_out, retain_derived)
        # 计算第二个批归一化层的输出
        bn2_out = self.batchnorm2.forward(conv2_out, retain_derived)
        # 计算跳跃连接卷积层的输出
        conv_skip_out = self.conv_skip.forward(X, retain_derived)
        # 计算跳跃连接批归一化层的输出
        bn_skip_out = self.batchnorm_skip.forward(conv_skip_out, retain_derived)
        # 计算三个层的输出相加的结果
        Y = self.add3.forward([bn_skip_out, bn2_out], retain_derived)

        # 如果需要保留派生变量,则将它们保存在 _dv 字典中
        if retain_derived:
            self._dv["conv1_out"] = conv1_out
            self._dv["conv2_out"] = conv2_out
            self._dv["batchnorm1_out"] = bn1_out
            self._dv["batchnorm2_out"] = bn2_out
            self._dv["conv_skip_out"] = conv_skip_out
            self._dv["batchnorm_skip_out"] = bn_skip_out
        # 返回模块的输出
        return Y
    def backward(self, dLdY, retain_grads=True):
        """
        Compute the gradient of the loss with respect to the module parameters.

        Parameters
        ----------
        dLdy : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, out_ch)`
        or list of arrays
            The gradient(s) of the loss with respect to the module output(s).
        retain_grads : bool
            Whether to include the intermediate parameter gradients computed
            during the backward pass in the final parameter update. Default is
            True.

        Returns
        -------
        dX : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
            The gradient of the loss with respect to the module input volume.
        """
        # 计算 Add3 模块的反向传播,得到对应的梯度
        dBnskip_out, dBn2_out = self.add3.backward(dLdY)
        # 计算 BatchNorm 模块的反向传播,得到对应的梯度
        dConvskip_out = self.batchnorm_skip.backward(dBnskip_out)
        # 计算 Convolution 模块的反向传播,得到对应的梯度
        dX = self.conv_skip.backward(dConvskip_out)

        # 计算 BatchNorm2 模块的反向传播,得到对应的梯度
        dConv2_out = self.batchnorm2.backward(dBn2_out)
        # 计算 Convolution2 模块的反向传播,得到对应的梯度
        dBn1_out = self.conv2.backward(dConv2_out)
        # 计算 BatchNorm1 模块的反向传播,得到对应的梯度
        dConv1_out = self.batchnorm1.backward(dBn1_out)
        # 将 Convolution1 模块的反向传播梯度加到之前的梯度上
        dX  = self.conv1.backward(dConv1_out)

        # 如果需要保留中间参数梯度,则将它们保存在对应的变量中
        if retain_grads:
            self._dv["dLdAdd3_X"] = dX
            self._dv["dLdBn1"] = dBn1_out
            self._dv["dLdBn2"] = dBn2_out
            self._dv["dLdConv1"] = dConv1_out
            self._dv["dLdConv2"] = dConv2_out
            self._dv["dLdBnSkip"] = dBnskip_out
            self._dv["dLdConvSkip"] = dConvskip_out
        # 返回输入体积的梯度
        return dX
class BidirectionalLSTM(ModuleBase):
    # 定义一个双向长短期记忆(LSTM)层
    def __init__(
        self,
        n_out,
        act_fn=None,
        gate_fn=None,
        merge_mode="concat",
        init="glorot_uniform",
        optimizer=None,
    ):
        """
        A single bidirectional long short-term memory (LSTM) layer.

        Parameters
        ----------
        n_out : int
            The dimension of a single hidden state / output on a given timestep
        act_fn : :doc:`Activation <numpy_ml.neural_nets.activations>` object or None
            The activation function for computing ``A[t]``. If not specified,
            use :class:`~numpy_ml.neural_nets.activations.Tanh` by default.
        gate_fn : :doc:`Activation <numpy_ml.neural_nets.activations>` object or None
            The gate function for computing the update, forget, and output
            gates. If not specified, use
            :class:`~numpy_ml.neural_nets.activations.Sigmoid` by default.
        merge_mode : {"sum", "multiply", "concat", "average"}
            Mode by which outputs of the forward and backward LSTMs will be
            combined. Default is 'concat'.
        optimizer : str or :doc:`Optimizer <numpy_ml.neural_nets.optimizers>` object or None
            The optimization strategy to use when performing gradient updates
            within the `update` method.  If None, use the
            :class:`~numpy_ml.neural_nets.optimizers.SGD` optimizer with
            default parameters. Default is None.
        init : {'glorot_normal', 'glorot_uniform', 'he_normal', 'he_uniform'}
            The weight initialization strategy. Default is 'glorot_uniform'.
        """
        # 调用父类的构造函数
        super().__init__()

        # 初始化参数
        self.init = init
        self.n_in = None
        self.n_out = n_out
        self.optimizer = optimizer
        self.merge_mode = merge_mode
        # 如果未指定激活函数,则使用Tanh作为默认激活函数
        self.act_fn = Tanh() if act_fn is None else act_fn
        # 如果未指定门函数,则使用Sigmoid作为默认门函数
        self.gate_fn = Sigmoid() if gate_fn is None else gate_fn
        # 初始化参数
        self._init_params()
    # 初始化参数,创建前向和后向的LSTM单元
    def _init_params(self):
        self.cell_fwd = LSTMCell(
            init=self.init,
            n_out=self.n_out,
            act_fn=self.act_fn,
            gate_fn=self.gate_fn,
            optimizer=self.optimizer,
        )
        self.cell_bwd = LSTMCell(
            init=self.init,
            n_out=self.n_out,
            act_fn=self.act_fn,
            gate_fn=self.gate_fn,
            optimizer=self.optimizer,
        )

    # 前向传播函数,对输入的所有时间步进行前向传播
    def forward(self, X):
        """
        Run a forward pass across all timesteps in the input.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in, n_t)`
            Input consisting of `n_ex` examples each of dimensionality `n_in`
            and extending for `n_t` timesteps.

        Returns
        -------
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out, n_t)`
            The value of the hidden state for each of the `n_ex` examples
            across each of the `n_t` timesteps.
        """
        Y_fwd, Y_bwd, Y = [], [], []  # 初始化前向、后向和合并后的隐藏状态列表
        n_ex, self.n_in, n_t = X.shape  # 获取输入数据的形状信息

        # 前向LSTM
        for t in range(n_t):
            yt, ct = self.cell_fwd.forward(X[:, :, t])  # 对每个时间步进行前向传播
            Y_fwd.append(yt)  # 将隐藏状态添加到前向隐藏状态列表中

        # 后向LSTM
        for t in reversed(range(n_t)):
            yt, ct = self.cell_bwd.forward(X[:, :, t])  # 对每个时间步进行后向传播
            Y_bwd.insert(0, yt)  # 将隐藏状态添加到后向隐藏状态列表中

        # 合并前向和后向状态
        for t in range(n_t):
            if self.merge_mode == "concat":
                Y.append(np.concatenate([Y_fwd[t], Y_bwd[t]], axis=1))  # 按照指定方式合并隐藏状态
            elif self.merge_mode == "sum":
                Y.append(Y_fwd[t]   Y_bwd[t])  # 按照指定方式合并隐藏状态
            elif self.merge_mode == "average":
                Y.append((Y_fwd[t]   Y_bwd[t]) / 2)  # 按照指定方式合并隐藏状态
            elif self.merge_mode == "multiply":
                Y.append(Y_fwd[t] * Y_bwd[t])  # 按照指定方式合并隐藏状态

        self.Y_fwd, self.Y_bwd = Y_fwd, Y_bwd  # 保存前向和后向隐藏状态列表
        return np.dstack(Y)  # 返回合并后的隐藏状态
    # 在输入的所有时间步上运行反向传播

    def backward(self, dLdA):
        """
        Run a backward pass across all timesteps in the input.

        Parameters
        ----------
        dLdA : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out, n_t)`
            The gradient of the loss with respect to the layer output for each
            of the `n_ex` examples across all `n_t` timesteps.

        Returns
        -------
        dLdX : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in, n_t)`
            The value of the hidden state for each of the `n_ex` examples
            across each of the `n_t` timesteps.
        """
        # 检查层是否可训练
        assert self.trainable, "Layer is frozen"

        # 获取输入的形状信息
        n_ex, n_out, n_t = dLdA.shape
        dLdX_f, dLdX_b, dLdX = [], [], []

        # 前向 LSTM
        for t in reversed(range(n_t)):
            # 根据合并模式选择不同的反向传播方式
            if self.merge_mode == "concat":
                dLdXt_f = self.cell_fwd.backward(dLdA[:, : self.n_out, t])
            elif self.merge_mode == "sum":
                dLdXt_f = self.cell_fwd.backward(dLdA[:, :, t])
            elif self.merge_mode == "multiplty":
                dLdXt_f = self.cell_fwd.backward(dLdA[:, :, t] * self.Y_bwd[t])
            elif self.merge_mode == "average":
                dLdXt_f = self.cell_fwd.backward(dLdA[:, :, t] * 0.5)
            dLdX_f.insert(0, dLdXt_f)

        # 后向 LSTM
        for t in range(n_t):
            # 根据合并模式选择不同的反向传播方式
            if self.merge_mode == "concat":
                dLdXt_b = self.cell_bwd.backward(dLdA[:, self.n_out :, t])
            elif self.merge_mode == "sum":
                dLdXt_b = self.cell_bwd.backward(dLdA[:, :, t])
            elif self.merge_mode == "multiplty":
                dLdXt_b = self.cell_bwd.backward(dLdA[:, :, t] * self.Y_fwd[t])
            elif self.merge_mode == "average":
                dLdXt_b = self.cell_bwd.backward(dLdA[:, :, t] * 0.5)
            dLdX_b.append(dLdXt_b)

        # 将前向和后向 LSTM 的结果相加
        for t in range(n_t):
            dLdX.append(dLdX_f[t]   dLdX_b[t])

        # 沿着第三个维度堆叠结果
        return np.dstack(dLdX)
    @property
    def derived_variables(self):
        """返回在前向/后向传递过程中计算的中间值的字典。"""
        return {
            "components": {
                "cell_fwd": self.cell_fwd.derived_variables,
                "cell_bwd": self.cell_bwd.derived_variables,
            }
        }

    @property
    def gradients(self):
        """返回累积的模块参数梯度的字典。"""
        return {
            "components": {
                "cell_fwd": self.cell_fwd.gradients,
                "cell_bwd": self.cell_bwd.gradients,
            }
        }

    @property
    def parameters(self):
        """返回模块参数的字典。"""
        return {
            "components": {
                "cell_fwd": self.cell_fwd.parameters,
                "cell_bwd": self.cell_bwd.parameters,
            }
        }

    @property
    def hyperparameters(self):
        """返回模块超参数的字典。"""
        return {
            "layer": "BidirectionalLSTM",
            "init": self.init,
            "n_in": self.n_in,
            "n_out": self.n_out,
            "act_fn": str(self.act_fn),
            "optimizer": self.optimizer,
            "merge_mode": self.merge_mode,
            "component_ids": ["cell_fwd", "cell_bwd"],
            "components": {
                "cell_fwd": self.cell_fwd.hyperparameters,
                "cell_bwd": self.cell_bwd.hyperparameters,
            },
        }
class MultiHeadedAttentionModule(ModuleBase):
    # 多头注意力模块类,继承自 ModuleBase 类
    def _init_params(self):
        # 初始化参数字典
        self._dv = {}

        # 假设 keys、query、values 的维度相同
        assert self.kqv_dim % self.n_heads == 0
        # 计算每个头的潜在维度
        self.latent_dim = self.kqv_dim // self.n_heads

        # 创建点积注意力对象,设置缩放参数和丢弃率
        self.attention = DotProductAttention(scale=True, dropout_p=self.dropout_p)
        # 创建投影矩阵字典,包括 Q、K、V、O 四个投影矩阵
        self.projections = {
            k: Dropout(
                FullyConnected(
                    init=self.init,
                    n_out=self.kqv_dim,
                    optimizer=self.optimizer,
                    act_fn="Affine(slope=1, intercept=0)",
                ),
                self.dropout_p,
            )
            for k in ["Q", "K", "V", "O"]
        }

        # 标记初始化完成
        self.is_initialized = True
    # 实现多头注意力机制的前向传播过程
    def forward(self, Q, K, V):
        # 如果模型未初始化,则初始化参数
        if not self.is_initialized:
            # 获取查询向量的维度
            self.kqv_dim = Q.shape[-1]
            # 初始化参数
            self._init_params()

        # 将查询、键和值投影到 `latent_dim` 维度的子空间
        n_ex = Q.shape[0]
        for k, x in zip(["Q", "K", "V"], [Q, K, V]):
            # 对输入数据进行投影
            proj = self.projections[k].forward(x)
            # 重塑投影后的数据形状
            proj = proj.reshape(n_ex, -1, self.n_heads, self.latent_dim).swapaxes(1, 2)
            self._dv["{}_proj".format(k)] = proj

        # 获取派生变量
        dv = self.derived_variables
        Q_proj, K_proj, V_proj = dv["Q_proj"], dv["K_proj"], dv["V_proj"]

        # 对投影后的向量应用缩放点积注意力机制
        attn = self.attention
        attn_out = attn.forward(Q_proj, K_proj, V_proj)
        self._dv["attention_weights"] = attn.derived_variables["attention_weights"]

        # 使用 `reshape` 连接不同头的注意力输出,创建一个 `kqv_dim` 维向量
        attn_out = attn_out.swapaxes(1, 2).reshape(n_ex, self.kqv_dim)
        self._dv["attention_out"] = attn_out.reshape(n_ex, -1, self.kqv_dim)

        # 应用最终的输出投影
        Y = self.projections["O"].forward(attn_out)
        Y = Y.reshape(n_ex, -1, self.kqv_dim)
        # 返回最终输出结果
        return Y
    # 反向传播函数,计算损失对查询、键、值的梯度
    def backward(self, dLdy):
        # 获取样本数量
        n_ex = dLdy.shape[0]
        # 重塑梯度形状
        dLdy = dLdy.reshape(n_ex, self.kqv_dim)
        # 调用投影层的反向传播函数
        dLdX = self.projections["O"].backward(dLdy)
        # 重塑梯度形状
        dLdX = dLdX.reshape(n_ex, self.n_heads, -1, self.latent_dim)

        # 调用注意力机制的反向传播函数
        dLdQ_proj, dLdK_proj, dLdV_proj = self.attention.backward(dLdX)

        # 更新导数字典
        self._dv["dQ_proj"] = dLdQ_proj
        self._dv["dK_proj"] = dLdK_proj
        self._dv["dV_proj"] = dLdV_proj

        # 重塑梯度形状
        dLdQ_proj = dLdQ_proj.reshape(n_ex, self.kqv_dim)
        dLdK_proj = dLdK_proj.reshape(n_ex, self.kqv_dim)
        dLdV_proj = dLdV_proj.reshape(n_ex, self.kqv_dim)

        # 调用投影层的反向传播函数
        dLdQ = self.projections["Q"].backward(dLdQ_proj)
        dLdK = self.projections["K"].backward(dLdK_proj)
        dLdV = self.projections["V"].backward(dLdV_proj)
        # 返回查询、键、值的梯度
        return dLdQ, dLdK, dLdV

    # 派生变量属性,存储前向/反向传播过程中计算的中间值
    @property
    def derived_variables(self):
        """A dictionary of intermediate values computed during the
        forward/backward passes."""
        dv = {
            "Q_proj": None,
            "K_proj": None,
            "V_proj": None,
            "components": {
                "Q": self.projections["Q"].derived_variables,
                "K": self.projections["K"].derived_variables,
                "V": self.projections["V"].derived_variables,
                "O": self.projections["O"].derived_variables,
                "attention": self.attention.derived_variables,
            },
        }
        # 更新派生变量字典
        dv.update(self._dv)
        return dv

    # 梯度属性,存储累积的模块参数梯度
    @property
    def gradients(self):
        """A dictionary of the accumulated module parameter gradients."""
        return {
            "components": {
                "Q": self.projections["Q"].gradients,
                "K": self.projections["K"].gradients,
                "V": self.projections["V"].gradients,
                "O": self.projections["O"].gradients,
                "attention": self.attention.gradients,
            }
        }

    @property
    # 返回模块参数的字典
    def parameters(self):
        """A dictionary of the module parameters."""
        return {
            "components": {
                "Q": self.projections["Q"].parameters,
                "K": self.projections["K"].parameters,
                "V": self.projections["V"].parameters,
                "O": self.projections["O"].parameters,
                "attention": self.attention.parameters,
            }
        }

    # 返回模块超参数的字典
    @property
    def hyperparameters(self):
        """A dictionary of the module hyperparameters."""
        return {
            "layer": "MultiHeadedAttentionModule",
            "init": self.init,
            "kqv_dim": self.kqv_dim,
            "latent_dim": self.latent_dim,
            "n_heads": self.n_heads,
            "dropout_p": self.dropout_p,
            "component_ids": ["attention", "Q", "K", "V", "O"],
            "components": {
                "Q": self.projections["Q"].hyperparameters,
                "K": self.projections["K"].hyperparameters,
                "V": self.projections["V"].hyperparameters,
                "O": self.projections["O"].hyperparameters,
                "attention": self.attention.hyperparameters,
            },
        }

Modules

The modules.py module implements common multi-layer blocks that appear across many modern deep networks. It includes:

  • Bidirectional LSTMs (Schuster & Paliwal, 1997)
  • ResNet-style “identity” (i.e., same-convolution) residual blocks (He et al., 2015)
  • ResNet-style “convolutional” (i.e., parametric) residual blocks (He et al., 2015)
  • WaveNet-style residual block with dilated causal convolutions (van den Oord et al., 2016)
  • Transformer-style multi-headed dot-product attention (Vaswani et al., 2017)

numpy-mlnumpy_mlneural_netsmodules__init__.py

代码语言:javascript复制
# 从当前目录下的 modules 模块中导入所有内容
from .modules import *

numpy-mlnumpy_mlneural_netsoptimizersoptimizers.py

代码语言:javascript复制
# 从 copy 模块中导入 deepcopy 函数
# 从 abc 模块中导入 ABC 和 abstractmethod 装饰器
import numpy as np
# 从 numpy.linalg 模块中导入 norm 函数

# 定义一个抽象基类 OptimizerBase,继承自 ABC 类
class OptimizerBase(ABC):
    # 初始化方法,接受学习率 lr 和调度器 scheduler 作为参数
    def __init__(self, lr, scheduler=None):
        """
        An abstract base class for all Optimizer objects.

        This should never be used directly.
        """
        # 从 ..initializers 模块中导入 SchedulerInitializer 类
        from ..initializers import SchedulerInitializer

        # 初始化缓存字典
        self.cache = {}
        # 初始化当前步数为 0
        self.cur_step = 0
        # 初始化超参数字典
        self.hyperparameters = {}
        # 使用 SchedulerInitializer 类创建学习率调度器对象
        self.lr_scheduler = SchedulerInitializer(scheduler, lr=lr)()

    # 定义 __call__ 方法,接受参数 param, param_grad, param_name, cur_loss,默认为 None
    def __call__(self, param, param_grad, param_name, cur_loss=None):
        return self.update(param, param_grad, param_name, cur_loss)

    # 定义 step 方法,用于将优化器步数加一
    def step(self):
        """Increment the optimizer step counter by 1"""
        self.cur_step  = 1

    # 定义 reset_step 方法,用于将步数重置为 0
    def reset_step(self):
        """Reset the step counter to zero"""
        self.cur_step = 0

    # 定义 copy 方法,返回优化器对象的深拷贝
    def copy(self):
        """Return a copy of the optimizer object"""
        return deepcopy(self)

    # 定义 set_params 方法,从字典中设置优化器对象的参数
    def set_params(self, hparam_dict=None, cache_dict=None):
        """Set the parameters of the optimizer object from a dictionary"""
        # 从 ..initializers 模块中导入 SchedulerInitializer 类
        from ..initializers import SchedulerInitializer

        # 如果传入了超参数字典
        if hparam_dict is not None:
            # 遍历超参数字典
            for k, v in hparam_dict.items():
                # 如果键在超参数字典中
                if k in self.hyperparameters:
                    # 更新超参数字典的值
                    self.hyperparameters[k] = v
                    # 如果键是 "lr_scheduler"
                    if k == "lr_scheduler":
                        # 使用 SchedulerInitializer 类创建学习率调度器对象
                        self.lr_scheduler = SchedulerInitializer(v, lr=None)()

        # 如果传入了缓存字典
        if cache_dict is not None:
            # 遍历缓存字典
            for k, v in cache_dict.items():
                # 如果键在缓存字典中
                if k in self.cache:
                    # 更新缓存字典的值
                    self.cache[k] = v

    # 定义抽象方法 update,用于更新参数
    @abstractmethod
    def update(self, param, param_grad, param_name, cur_loss=None):
        raise NotImplementedError


# 定义 SGD 类,继承自 OptimizerBase 类
class SGD(OptimizerBase):
    # 初始化方法,接受学习率 lr、动量 momentum、梯度裁剪 clip_norm、学习率调度器 lr_scheduler 和其他关键字参数 kwargs
    def __init__(
        self, lr=0.01, momentum=0.0, clip_norm=None, lr_scheduler=None, **kwargs
    ):
        """
        A stochastic gradient descent optimizer.

        Notes
        -----
        For model parameters :math:`\theta`, averaged parameter gradients
        :math:`\nabla_{\theta} mathcal{L}`, and learning rate :math:`eta`,
        the SGD update at timestep `t` is

        .. math::

            \text{update}^{(t)}
                &=  \text{momentum} cdot \text{update}^{(t-1)}   eta^{(t)} \nabla_{\theta} mathcal{L}\\
            \theta^{(t 1)}
                &leftarrow  \theta^{(t)} - \text{update}^{(t)}

        Parameters
        ----------
        lr : float
            Learning rate for SGD. If scheduler is not None, this is used as
            the starting learning rate. Default is 0.01.
        momentum : float in range [0, 1]
            The fraction of the previous update to add to the current update.
            If 0, no momentum is applied. Default is 0.
        clip_norm : float
            If not None, all param gradients are scaled to have maximum l2 norm of
            `clip_norm` before computing update. Default is None.
        lr_scheduler : str, :doc:`Scheduler <numpy_ml.neural_nets.schedulers>` object, or None
            The learning rate scheduler. If None, use a constant learning
            rate equal to `lr`. Default is None.
        """
        # 调用父类的构造函数,初始化学习率和学习率调度器
        super().__init__(lr, lr_scheduler)

        # 设置超参数字典
        self.hyperparameters = {
            "id": "SGD",
            "lr": lr,
            "momentum": momentum,
            "clip_norm": clip_norm,
            "lr_scheduler": str(self.lr_scheduler),
        }

    # 返回优化器的字符串表示
    def __str__(self):
        # 获取超参数字典
        H = self.hyperparameters
        lr, mm, cn, sc = H["lr"], H["momentum"], H["clip_norm"], H["lr_scheduler"]
        # 返回优化器的字符串表示
        return "SGD(lr={}, momentum={}, clip_norm={}, lr_scheduler={})".format(
            lr, mm, cn, sc
        )
    # 定义一个方法,用于计算给定参数的 SGD 更新
    def update(self, param, param_grad, param_name, cur_loss=None):
        """
        Compute the SGD update for a given parameter

        Parameters
        ----------
        param : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The value of the parameter to be updated.
        param_grad : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The gradient of the loss function with respect to `param_name`.
        param_name : str
            The name of the parameter.
        cur_loss : float
            The training or validation loss for the current minibatch. Used for
            learning rate scheduling e.g., by
            :class:`~numpy_ml.neural_nets.schedulers.KingScheduler`.
            Default is None.

        Returns
        -------
        updated_params : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The value of `param` after applying the momentum update.
        """
        # 获取缓存和超参数
        C = self.cache
        H = self.hyperparameters
        momentum, clip_norm = H["momentum"], H["clip_norm"]
        # 根据当前步数和当前损失计算学习率
        lr = self.lr_scheduler(self.cur_step, cur_loss)

        # 如果参数名不在缓存中,则初始化为全零数组
        if param_name not in C:
            C[param_name] = np.zeros_like(param_grad)

        # 缩放梯度以避免梯度爆炸
        t = np.inf if clip_norm is None else clip_norm
        if norm(param_grad) > t:
            param_grad = param_grad * t / norm(param_grad)

        # 计算更新值,包括动量和学习率
        update = momentum * C[param_name]   lr * param_grad
        # 更新缓存中的参数值
        self.cache[param_name] = update
        # 返回更新后的参数值
        return param - update
# 自适应梯度方法
# 定义 AdaGrad 类,继承自 OptimizerBase 类
class AdaGrad(OptimizerBase):
    # 初始化 AdaGrad 优化器
    def __init__(self, lr=0.01, eps=1e-7, clip_norm=None, lr_scheduler=None, **kwargs):
        """
        An AdaGrad optimizer.

        Notes
        -----
        Weights that receive large gradients will have their effective learning
        rate reduced, while weights that receive small or infrequent updates
        will have their effective learning rate increased.

        Equations::

            cache[t] = cache[t-1]   grad[t] ** 2
            update[t] = lr * grad[t] / (np.sqrt(cache[t])   eps)
            param[t 1] = param[t] - update[t]

        Note that the ``**`` and `/` operations are elementwise

        "A downside of Adagrad ... is that the monotonic learning rate usually
        proves too aggressive and stops learning too early." [1]

        References
        ----------
        .. [1] Karpathy, A. "CS231n: Convolutional neural networks for visual
           recognition" https://cs231n.github.io/neural-networks-3/

        Parameters
        ----------
        lr : float
            Global learning rate
        eps : float
            Smoothing term to avoid divide-by-zero errors in the update calc.
            Default is 1e-7.
        clip_norm : float or None
            If not None, all param gradients are scaled to have maximum `L2` norm of
            `clip_norm` before computing update. Default is None.
        lr_scheduler : str or :doc:`Scheduler <numpy_ml.neural_nets.schedulers>` object or None
            The learning rate scheduler. If None, use a constant learning
            rate equal to `lr`. Default is None.
        """
        # 调用父类的初始化方法,传入全局学习率 lr 和学习率调度器 lr_scheduler
        super().__init__(lr, lr_scheduler)

        # 初始化缓存字典
        self.cache = {}
        # 初始化超参数字典
        self.hyperparameters = {
            "id": "AdaGrad",
            "lr": lr,
            "eps": eps,
            "clip_norm": clip_norm,
            "lr_scheduler": str(self.lr_scheduler),
        }
    # 定义对象的字符串表示形式,包括超参数 lr, eps, clip_norm, lr_scheduler
    def __str__(self):
        H = self.hyperparameters
        lr, eps, cn, sc = H["lr"], H["eps"], H["clip_norm"], H["lr_scheduler"]
        return "AdaGrad(lr={}, eps={}, clip_norm={}, lr_scheduler={})".format(
            lr, eps, cn, sc
        )

    # 更新给定参数的 AdaGrad 更新
    def update(self, param, param_grad, param_name, cur_loss=None):
        """
        Compute the AdaGrad update for a given parameter.

        Notes
        -----
        Adjusts the learning rate of each weight based on the magnitudes of its
        gradients (big gradient -> small lr, small gradient -> big lr).

        Parameters
        ----------
        param : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The value of the parameter to be updated
        param_grad : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The gradient of the loss function with respect to `param_name`
        param_name : str
            The name of the parameter
        cur_loss : float or None
            The training or validation loss for the current minibatch. Used for
            learning rate scheduling e.g., by
            :class:`~numpy_ml.neural_nets.schedulers.KingScheduler`.
            Default is None.

        Returns
        -------
        updated_params : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The value of `param` after applying the AdaGrad update
        """
        C = self.cache
        H = self.hyperparameters
        eps, clip_norm = H["eps"], H["clip_norm"]
        lr = self.lr_scheduler(self.cur_step, cur_loss)

        # 如果参数名不在缓存中,则初始化为零数组
        if param_name not in C:
            C[param_name] = np.zeros_like(param_grad)

        # 缩放梯度以避免梯度爆炸
        t = np.inf if clip_norm is None else clip_norm
        if norm(param_grad) > t:
            param_grad = param_grad * t / norm(param_grad)

        # 更新缓存中的值
        C[param_name]  = param_grad ** 2
        # 计算更新值
        update = lr * param_grad / (np.sqrt(C[param_name])   eps)
        self.cache = C
        return param - update
class RMSProp(OptimizerBase):
    # RMSProp 优化器类,继承自 OptimizerBase 基类
    def __init__(
        self, lr=0.001, decay=0.9, eps=1e-7, clip_norm=None, lr_scheduler=None, **kwargs
    ):
        """
        RMSProp optimizer.

        Notes
        -----
        RMSProp was proposed as a refinement of :class:`AdaGrad` to reduce its
        aggressive, monotonically decreasing learning rate.

        RMSProp uses a *decaying average* of the previous squared gradients
        (second moment) rather than just the immediately preceding squared
        gradient for its `previous_update` value.

        Equations::

            cache[t] = decay * cache[t-1]   (1 - decay) * grad[t] ** 2
            update[t] = lr * grad[t] / (np.sqrt(cache[t])   eps)
            param[t 1] = param[t] - update[t]

        Note that the ``**`` and ``/`` operations are elementwise.

        Parameters
        ----------
        lr : float
            Learning rate for update. Default is 0.001.
        decay : float in [0, 1]
            Rate of decay for the moving average. Typical values are [0.9,
            0.99, 0.999]. Default is 0.9.
        eps : float
            Constant term to avoid divide-by-zero errors during the update calc. Default is 1e-7.
        clip_norm : float or None
            If not None, all param gradients are scaled to have maximum l2 norm of
            `clip_norm` before computing update. Default is None.
        lr_scheduler : str or :doc:`Scheduler <numpy_ml.neural_nets.schedulers>` object or None
            The learning rate scheduler. If None, use a constant learning
            rate equal to `lr`. Default is None.
        """
        # 调用父类的初始化方法,传入学习率和学习率调度器
        super().__init__(lr, lr_scheduler)

        # 初始化缓存字典
        self.cache = {}
        # 初始化超参数字典
        self.hyperparameters = {
            "id": "RMSProp",
            "lr": lr,
            "eps": eps,
            "decay": decay,
            "clip_norm": clip_norm,
            "lr_scheduler": str(self.lr_scheduler),
        }
    # 定义对象的字符串表示形式,包括超参数和学习率调度器信息
    def __str__(self):
        # 获取超参数字典和学习率调度器
        H = self.hyperparameters
        sc = H["lr_scheduler"]
        lr, eps, dc, cn = H["lr"], H["eps"], H["decay"], H["clip_norm"]
        # 返回对象的字符串表示形式
        return "RMSProp(lr={}, eps={}, decay={}, clip_norm={}, lr_scheduler={})".format(
            lr, eps, dc, cn, sc
        )

    # 更新给定参数的 RMSProp 更新
    def update(self, param, param_grad, param_name, cur_loss=None):
        """
        Compute the RMSProp update for a given parameter.

        Parameters
        ----------
        param : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The value of the parameter to be updated
        param_grad : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The gradient of the loss function with respect to `param_name`
        param_name : str
            The name of the parameter
        cur_loss : float or None
            The training or validation loss for the current minibatch. Used for
            learning rate scheduling e.g., by
            :class:`~numpy_ml.neural_nets.schedulers.KingScheduler`.
            Default is None.

        Returns
        -------
        updated_params : :py:class:`ndarray <numpy.ndarray>` of shape (n, m)
            The value of `param` after applying the RMSProp update.
        """
        # 获取缓存和超参数字典
        C = self.cache
        H = self.hyperparameters
        eps, decay, clip_norm = H["eps"], H["decay"], H["clip_norm"]
        # 根据当前步数和损失计算学习率
        lr = self.lr_scheduler(self.cur_step, cur_loss)

        # 如果参数名不在缓存中,则初始化为零数组
        if param_name not in C:
            C[param_name] = np.zeros_like(param_grad)

        # 缩放梯度以避免梯度爆炸
        t = np.inf if clip_norm is None else clip_norm
        if norm(param_grad) > t:
            param_grad = param_grad * t / norm(param_grad)

        # 计算 RMSProp 更新
        C[param_name] = decay * C[param_name]   (1 - decay) * param_grad ** 2
        update = lr * param_grad / (np.sqrt(C[param_name])   eps)
        self.cache = C
        # 返回更新后的参数值
        return param - update
# 定义 Adam 优化器类,继承自 OptimizerBase 类
class Adam(OptimizerBase):
    # 初始化 Adam 优化器对象
    def __init__(
        self,
        lr=0.001,  # 学习率,默认为 0.001
        decay1=0.9,  # 第一矩估计的衰减率,默认为 0.9
        decay2=0.999,  # 第二矩估计的衰减率,默认为 0.999
        eps=1e-7,  # 避免除零错误的常数项,默认为 1e-7
        clip_norm=None,  # 梯度裁剪的最大 l2 范数,默认为 None
        lr_scheduler=None,  # 学习率调度器,默认为 None
        **kwargs
    ):
        """
        Adam (adaptive moment estimation) optimization algorithm.

        Notes
        -----
        Designed to combine the advantages of :class:`AdaGrad`, which works
        well with sparse gradients, and :class:`RMSProp`, which works well in
        online and non-stationary settings.

        Parameters
        ----------
        lr : float
            Learning rate for update. This parameter is ignored if using
            :class:`~numpy_ml.neural_nets.schedulers.NoamScheduler`.
            Default is 0.001.
        decay1 : float
            The rate of decay to use for in running estimate of the first
            moment (mean) of the gradient. Default is 0.9.
        decay2 : float
            The rate of decay to use for in running estimate of the second
            moment (variance) of the gradient. Default is 0.999.
        eps : float
            Constant term to avoid divide-by-zero errors during the update
            calc. Default is 1e-7.
        clip_norm : float
            If not None, all param gradients are scaled to have maximum l2 norm of
            `clip_norm` before computing update. Default is None.
        lr_scheduler : str, or :doc:`Scheduler <numpy_ml.neural_nets.schedulers>` object, or None
            The learning rate scheduler. If None, use a constant learning rate
            equal to `lr`. Default is None.
        """
        # 调用父类的初始化方法
        super().__init__(lr, lr_scheduler)

        # 初始化缓存字典
        self.cache = {}
        # 初始化超参数字典
        self.hyperparameters = {
            "id": "Adam",
            "lr": lr,
            "eps": eps,
            "decay1": decay1,
            "decay2": decay2,
            "clip_norm": clip_norm,
            "lr_scheduler": str(self.lr_scheduler),
        }
    # 定义类的字符串表示方法,返回 Adam 优化器的超参数信息
    def __str__(self):
        # 获取超参数字典
        H = self.hyperparameters
        # 从超参数字典中获取 lr, decay1, decay2 的值
        lr, d1, d2 = H["lr"], H["decay1"], H["decay2"]
        # 从超参数字典中获取 eps, clip_norm, lr_scheduler 的值
        eps, cn, sc = H["eps"], H["clip_norm"], H["lr_scheduler"]
        # 返回格式化后的字符串,包含 lr, decay1, decay2, eps, clip_norm, lr_scheduler 的值
        return "Adam(lr={}, decay1={}, decay2={}, eps={}, clip_norm={}, lr_scheduler={})".format(
            lr, d1, d2, eps, cn, sc
        )

Optimizers

The optimizers.py module implements common modifications to stochastic gradient descent. It includes:

  • SGD with momentum (Rummelhart, Hinton, & Williams, 1986)
  • AdaGrad (Duchi, Hazan, & Singer, 2011)
  • RMSProp (Tieleman & Hinton, 2012)
  • Adam (Kingma & Ba, 2015)

numpy-mlnumpy_mlneural_netsoptimizers__init__.py

代码语言:javascript复制
# 从当前目录下的 optimizers 模块中导入所有内容
from .optimizers import *

Neural network models

This module implements building-blocks for larger neural network models in the Keras-style. This module does not implement a general autograd system in order emphasize conceptual understanding over flexibility.

  1. Activations. Common activation nonlinearities. Includes:
    • Rectified linear units (ReLU) (Hahnloser et al., 2000)
    • Leaky rectified linear units (Maas, Hannun, & Ng, 2013)
    • Exponential linear units (ELU) (Clevert, Unterthiner, & Hochreiter, 2016)
    • Scaled exponential linear units (Klambauer, Unterthiner, & Mayr, 2017)
    • Softplus units
    • Hard sigmoid units
    • Exponential units
    • Hyperbolic tangent (tanh)
    • Logistic sigmoid
    • Affine
  2. Losses. Common loss functions. Includes:
    • Squared error
    • Categorical cross entropy
    • VAE Bernoulli loss (Kingma & Welling, 2014)
    • Wasserstein loss with gradient penalty (Gulrajani et al., 2017)
    • Noise contrastive estimation (NCE) loss (Gutmann & Hyvärinen; Minh & Teh, 2012)
  3. Wrappers. Layer wrappers. Includes:
    • Dropout (Srivastava, et al., 2014)
  4. Layers. Common layers / layer-wise operations that can be composed to create larger neural networks. Includes:
    • Fully-connected
    • Sparse evolutionary (Mocanu et al., 2018)
    • Dot-product attention (Luong, Pho, & Manning, 2015; Vaswani et al., 2017)
    • 1D and 2D convolution (with stride, padding, and dilation) (van den Oord et al., 2016; Yu & Kolton, 2016)
    • 2D “deconvolution” (with stride and padding) (Zeiler et al., 2010)
    • Restricted Boltzmann machines (with CD-n training) (Smolensky, 1996; Carreira-Perpiñán & Hinton, 2005)
    • Elementwise multiplication
    • Embedding
    • Summation
    • Flattening
    • Softmax
    • Max & average pooling
    • 1D and 2D batch normalization (Ioffe & Szegedy, 2015)
    • 1D and 2D layer normalization (Ba, Kiros, & Hinton, 2016)
    • Recurrent (Elman, 1990)
    • Long short-term memory (LSTM) (Hochreiter & Schmidhuber, 1997)
  5. Optimizers. Common modifications to stochastic gradient descent. Includes:
    • SGD with momentum (Rummelhart, Hinton, & Williams, 1986)
    • AdaGrad (Duchi, Hazan, & Singer, 2011)
    • RMSProp (Tieleman & Hinton, 2012)
    • Adam (Kingma & Ba, 2015)
  6. Learning Rate Schedulers. Common learning rate decay schedules.
    • Constant
    • Exponential decay
    • Noam/Transformer scheduler (Vaswani et al., 2017)
    • King/Dlib scheduler (King, 2018)
  7. Initializers. Common weight initialization strategies.
    • Glorot/Xavier uniform and normal (Glorot & Bengio, 2010)
    • He/Kaiming uniform and normal (He et al., 2015)
    • Standard normal
    • Truncated normal
  8. Modules. Common multi-layer blocks that appear across many deep networks. Includes:
    • Bidirectional LSTMs (Schuster & Paliwal, 1997)
    • ResNet-style “identity” (i.e., same-convolution) residual blocks (He et al., 2015)
    • ResNet-style “convolutional” (i.e., parametric) residual blocks (He et al., 2015)
    • WaveNet-style residual block with dilated causal convolutions (van den Oord et al., 2016)
    • Transformer-style multi-headed dot-product attention (Vaswani et al., 2017)
  9. Models. Well-known network architectures. Includes:
    • vae.py: Bernoulli variational autoencoder (Kingma & Welling, 2014)
    • wgan_gp.py: Wasserstein generative adversarial network with gradient penalty (Gulrajani et al., 2017; Goodfellow et al., 2014)
    • w2v.py: word2vec model with CBOW and skip-gram architectures and training via noise contrastive estimation (Mikolov et al., 2012)
  10. Utils. Common helper functions, primarily for dealing with CNNs. Includes:
    • im2col
    • col2im
    • conv1D
    • conv2D
    • dilate
    • deconv2D
    • minibatch
    • Various weight initialization utilities
    • Various padding and convolution arithmetic utilities

Learning Rate Schedulers

The schedulers module implements several common strategies for learning rate decay:

  • Constant
  • Exponential decay
  • Noam/Transformer decay (Vaswani et al., 2017)
  • Davis King/Dlib decay (King, 2018)

Plots

numpy-mlnumpy_mlneural_netsschedulersschedulers.py

代码语言:javascript复制
from copy import deepcopy
from abc import ABC, abstractmethod

import numpy as np

from math import erf

# 定义一个函数,计算从具有均值`mean`和方差`var`的一维高斯分布中随机抽取的值小于或等于`x`的概率
def gaussian_cdf(x, mean, var):
    eps = np.finfo(float).eps
    x_scaled = (x - mean) / np.sqrt(var   eps)
    return (1   erf(x_scaled / np.sqrt(2))) / 2

# 定义一个抽象基类,用于所有调度器对象的基类
class SchedulerBase(ABC):
    def __init__(self):
        """Abstract base class for all Scheduler objects."""
        self.hyperparameters = {}

    def __call__(self, step=None, cur_loss=None):
        return self.learning_rate(step=step, cur_loss=cur_loss)

    def copy(self):
        """Return a copy of the current object."""
        return deepcopy(self)

    def set_params(self, hparam_dict):
        """Set the scheduler hyperparameters from a dictionary."""
        if hparam_dict is not None:
            for k, v in hparam_dict.items():
                if k in self.hyperparameters:
                    self.hyperparameters[k] = v

    @abstractmethod
    def learning_rate(self, step=None):
        raise NotImplementedError

# 定义一个常数调度器类,继承自SchedulerBase
class ConstantScheduler(SchedulerBase):
    def __init__(self, lr=0.01, **kwargs):
        """
        Returns a fixed learning rate, regardless of the current step.

        Parameters
        ----------
        initial_lr : float
            The learning rate. Default is 0.01
        """
        super().__init__()
        self.lr = lr
        self.hyperparameters = {"id": "ConstantScheduler", "lr": self.lr}

    def __str__(self):
        return "ConstantScheduler(lr={})".format(self.lr)

    def learning_rate(self, **kwargs):
        """
        Return the current learning rate.

        Returns
        -------
        lr : float
            The learning rate
        """
        return self.lr

# 定义一个指数调度器类,继承自SchedulerBase
class ExponentialScheduler(SchedulerBase:
    # 初始化指数学习率调度器
    def __init__(
        self, initial_lr=0.01, stage_length=500, staircase=False, decay=0.1, **kwargs
    ):
        """
        An exponential learning rate scheduler.

        Notes
        -----
        The exponential scheduler decays the learning rate by `decay` every
        `stage_length` steps, starting from `initial_lr`::

            learning_rate = initial_lr * decay ** curr_stage

        where::

            curr_stage = step / stage_length          if staircase = False
            curr_stage = floor(step / stage_length)   if staircase = True

        Parameters
        ----------
        initial_lr : float
            The learning rate at the first step. Default is 0.01.
        stage_length : int
            The length of each stage, in steps. Default is 500.
        staircase : bool
            If True, only adjusts the learning rate at the stage transitions,
            producing a step-like decay schedule. If False, adjusts the
            learning rate after each step, creating a smooth decay schedule.
            Default is False.
        decay : float
            The amount to decay the learning rate at each new stage. Default is
            0.1.
        """
        # 调用父类的初始化方法
        super().__init__()
        # 设置学习率衰减值
        self.decay = decay
        # 设置是否阶梯式调整学习率
        self.staircase = staircase
        # 设置初始学习率
        self.initial_lr = initial_lr
        # 设置每个阶段的长度
        self.stage_length = stage_length
        # 设置超参数字典
        self.hyperparameters = {
            "id": "StepScheduler",
            "decay": self.decay,
            "staircase": self.staircase,
            "initial_lr": self.initial_lr,
            "stage_length": self.stage_length,
        }

    # 返回调度器的字符串表示
    def __str__(self):
        return "ExponentialScheduler(initial_lr={}, stage_length={}, staircase={}, decay={})".format(
            self.initial_lr, self.stage_length, self.staircase, self.decay
        )
    # 定义一个方法,根据步数返回当前的学习率
    def learning_rate(self, step, **kwargs):
        """
        Return the current learning rate as a function of `step`.

        Parameters
        ----------
        step : int
            The current step number.

        Returns
        -------
        lr : float
            The learning rate for the current step.
        """
        # 计算当前阶段,即步数除以阶段长度
        cur_stage = step / self.stage_length
        # 如果采用阶梯式学习率衰减,则取当前阶段的下限值
        if self.staircase:
            cur_stage = np.floor(cur_stage)
        # 返回当前步数对应的学习率,根据初始学习率和衰减率计算
        return self.initial_lr * self.decay ** cur_stage
class NoamScheduler(SchedulerBase):
    def __init__(self, model_dim=512, scale_factor=1, warmup_steps=4000, **kwargs):
        """
        The Noam learning rate scheduler, originally used in conjunction with
        the Adam optimizer in [1].

        Notes
        -----
        The Noam scheduler increases the learning rate linearly for the first
        `warmup_steps` steps, and decreases it thereafter proportionally to the
        inverse square root of the step number::

            lr = scale_factor * ( (model_dim ** (-0.5)) * adj_step )
            adj_step = min(step_num ** (-0.5), step_num * warmup_steps ** (-1.5))

        References
        ----------
        .. [1] Vaswani et al. (2017) "Attention is all you need". *31st
           Conference on Neural Information Processing Systems*,
           https://arxiv.org/pdf/1706.03762.pdf

        Parameters
        ----------
        model_dim : int
            The number of units in the layer output. Default is 512.
        scale_factor : float
            A fixed coefficient for rescaling the final learning rate. Default
            is 1.
        warmup_steps : int
            The number of steps in the warmup stage of training. Default is
            4000.
        """
        # 调用父类的构造函数
        super().__init__()
        # 初始化 NoamScheduler 的属性
        self.model_dim = model_dim
        self.scale_factor = scale_factor
        self.warmup_steps = warmup_steps
        self.hyperparameters = {
            "id": "NoamScheduler",
            "model_dim": self.model_dim,
            "scale_factor": self.scale_factor,
            "warmup_steps": self.warmup_steps,
        }

    def __str__(self):
        # 返回 NoamScheduler 对象的字符串表示形式
        return "NoamScheduler(model_dim={}, scale_factor={}, warmup_steps={})".format(
            self.model_dim, self.scale_factor, self.warmup_steps
        )
    # 定义学习率函数,根据当前步数和额外参数计算学习率
    def learning_rate(self, step, **kwargs):
        # 获取预热步数和模型维度
        warmup, d_model = self.warmup_steps, self.model_dim
        # 根据论文提出的公式计算新的学习率
        new_lr = d_model ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5))
        # 返回经过缩放因子调整后的新学习率
        return self.scale_factor * new_lr
# 定义 KingScheduler 类,继承自 SchedulerBase 类
class KingScheduler(SchedulerBase):
    # 初始化方法,设置学习率初始值、耐心值、衰减率等参数
    def __init__(self, initial_lr=0.01, patience=1000, decay=0.99, **kwargs):
        """
        The Davis King / DLib learning rate scheduler.

        Notes
        -----
        The KingScheduler computes the probability that the slope of the OLS
        fit to the loss history is negative. If the probability that it is
        negative is less than 51% over the last `patience` steps, the scheduler
        exponentially decreases the current learning rate by `decay`.

        References
        ----------
        .. [1] King, D. (2018). "Automatic learning rate scheduling that really
           works". http://blog.dlib.net/2018/02/automatic-learning-rate-scheduling-that.html

        Parameters
        ----------
        initial_lr : float
            The learning rate to begin at. Default is 0.01.
        patience : int
            Amount of time to maintain the current learning rate without a
            decrease in loss before adjustment. Default is 1000.
        decay : float
            The amount to decay the learning rate at each new stage. Default is
            0.99.
        """
        # 调用父类的初始化方法
        super().__init__()
        # 设置衰减率、耐心值、学习率初始值等属性
        self.decay = decay
        self.patience = patience
        self.initial_lr = initial_lr
        self.current_lr = initial_lr
        # 计算历史记录的最大长度
        self.max_history = np.ceil(1.1 * (patience   1)).astype(int)

        # 初始化损失历史记录和超参数字典
        self.loss_history = []
        self.hyperparameters = {
            "id": "KingScheduler",
            "decay": self.decay,
            "patience": self.patience,
            "initial_lr": self.initial_lr,
        }

    # 定义 __str__ 方法,返回 KingScheduler 对象的字符串表示
    def __str__(self):
        return "KingScheduler(initial_lr={}, patience={}, decay={})".format(
            self.initial_lr, self.patience, self.decay
        )
    # 返回最大的时间步数,其中`P(loss is decreasing) < 0.51`
    def _steps_without_decrease(self, robust=False, check_all=False):
        """
        Returns the maximum number of timesteps for which `P(loss is decreasing)
        < 0.51`.

        Parameters
        ----------
        robust : bool
            If `robust=True`, first filter out the largest 10% of the loss
            values to remove transient spikes in the loss due to, e.g., a few
            bad minibatches. Default is False.
        check_all : bool
            If False, returns the maximum number of timesteps for which P(loss
            is decreasing) < 0.51. If True, only checks whether the number of
            timesteps for which P(loss is decreasing) < 0.51 is equal to
            ``self.patience``. The former provides more information but is
            significantly more computationally expensive.  Default is False.

        Returns
        -------
        steps_without_decrease: int
            The maximum number of steps back in loss_history for which P(loss
            is decreasing) < 0.51.
        """
        # 将损失历史转换为 NumPy 数组
        lh = np.array(self.loss_history)

        # 如果 robust 为 True,则过滤掉损失值中最大的 10%,以消除由于一些坏的小批次导致的损失暂时性波动
        if robust:
            thresh = np.quantile(lh, 0.9)
            lh = np.array([i for i in lh if i <= thresh])

        # 获取损失历史的长度
        N = len(lh)
        steps_without_decrease = 0
        # 如果 check_all 为 True,则遍历损失历史,找到 P(loss is decreasing) < 0.51 的最大时间步数
        if check_all:
            for i in reversed(range(N - 2)):
                if self._p_decreasing(lh, i) < 0.51:
                    steps_without_decrease = N - i
        # 如果 check_all 为 False,则只检查 P(loss is decreasing) < 0.51 的时间步数是否等于 self.patience
        else:
            i = max(0, N - self.patience - 1)
            if self._p_decreasing(lh, i) < 0.51:
                steps_without_decrease = N - i
        return steps_without_decrease
    def _p_decreasing(self, loss_history, i):
        """
        Compute the probability that the slope of the OLS fit to the loss
        history is negative.

        Parameters
        ----------
        loss_history : numpy array of shape (N,)
            The sequence of loss values for the previous `N` minibatches.
        i : int
            Compute P(Slope < 0) beginning at index i in `history`.

        Returns
        ------
        p_decreasing : float
            The probability that the slope of the OLS fit to loss_history is
            less than or equal to 0.
        """
        # 从索引 i 开始截取 loss_history,得到截取后的 loss 数组
        loss = loss_history[i:]
        # 获取截取后的 loss 数组的长度
        N = len(loss)

        # 对 loss 数组执行最小二乘法(OLS),计算斜率均值
        X = np.c_[np.ones(N), np.arange(i, len(loss_history))]
        intercept, s_mean = np.linalg.inv(X.T @ X) @ X.T @ loss
        # 计算预测的 loss 值
        loss_pred = s_mean * X[:, 1]   intercept

        # 计算我们的 loss 预测的方差,并用此计算(无偏)斜率方差的估计值
        loss_var = 1 / (N - 2) * np.sum((loss - loss_pred) ** 2)
        s_var = (12 * loss_var) / (N ** 3 - N)

        # 计算从以 s_mean 和 s_var 为参数的高斯分布中随机抽取的样本小于或等于 0 的概率
        p_decreasing = gaussian_cdf(0, s_mean, s_var)
        return p_decreasing
    # 计算当前步数和损失值对应的更新后学习率

    def learning_rate(self, step, cur_loss):
        """
        Compute the updated learning rate for the current step and loss.

        Parameters
        ----------
        step : int
            The current step number. Unused.
            当前步数,未使用
        cur_loss : float
            The loss at the current step.
            当前步数的损失值

        Returns
        -------
        lr : float
            The learning rate for the current step.
            当前步数的学习率
        """
        # 如果当前损失值为空,则抛出数值错误
        if cur_loss is None:
            raise ValueError("cur_loss must be a float, but got {}".format(cur_loss))

        # 如果没有属性"max_history",则初始化为1.1倍的(patience   1)的向上取整值
        if not hasattr(self, "max_history"):
            self.max_history = np.ceil(1.1 * (self.patience   1)).astype(int)
        patience, max_history = self.patience, self.max_history

        # 将当前损失值添加到损失历史记录中
        self.loss_history.append(cur_loss)
        # 如果损失历史记录长度小于patience,则返回当前学习率
        if len(self.loss_history) < patience:
            return self.current_lr
        # 保留最近的max_history个损失值
        self.loss_history = self.loss_history[-max_history:]

        # 如果损失值连续patience步没有减小,则降低学习率
        if (
            self._steps_without_decrease() > patience
            and self._steps_without_decrease(robust=True) > patience
        ):
            self.current_lr *= self.decay

        return self.current_lr

numpy-mlnumpy_mlneural_netsschedulers__init__.py

代码语言:javascript复制
# 从当前目录下的schedulers模块中导入所有内容
from .schedulers import *

Utilities

The utils.py module implements common, neural network-specific helper functions, primarily for dealing with CNNs. It includes:

  • im2col
  • col2im
  • conv1D
  • conv2D
  • dilate
  • deconv2D
  • minibatch
  • Various weight initialization utilities
  • Various padding and convolution arithmetic utilities

numpy-mlnumpy_mlneural_netsutilsutils.py

代码语言:javascript复制
import numpy as np

#######################################################################
#                           Training Utils                            #
#######################################################################

# 定义一个函数用于生成训练数据的小批量索引
def minibatch(X, batchsize=256, shuffle=True):
    """
    Compute the minibatch indices for a training dataset.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, *)`
        The dataset to divide into minibatches. Assumes the first dimension
        represents the number of training examples.
    batchsize : int
        The desired size of each minibatch. Note, however, that if ``X.shape[0] %
        batchsize > 0`` then the final batch will contain fewer than batchsize
        entries. Default is 256.
    shuffle : bool
        Whether to shuffle the entries in the dataset before dividing into
        minibatches. Default is True.

    Returns
    -------
    mb_generator : generator
        A generator which yields the indices into X for each batch
    n_batches: int
        The number of batches
    """
    N = X.shape[0]
    ix = np.arange(N)
    n_batches = int(np.ceil(N / batchsize))

    if shuffle:
        np.random.shuffle(ix)

    def mb_generator():
        for i in range(n_batches):
            yield ix[i * batchsize : (i   1) * batchsize]

    return mb_generator(), n_batches


#######################################################################
#                            Padding Utils                            #
#######################################################################

# 计算二维卷积时的填充维度
def calc_pad_dims_2D(X_shape, out_dim, kernel_shape, stride, dilation=0):
    """
    Compute the padding necessary to ensure that convolving `X` with a 2D kernel
    of shape `kernel_shape` and stride `stride` produces outputs with dimension
    `out_dim`.

    Parameters
    ----------
    X_shape : tuple of `(n_ex, in_rows, in_cols, in_ch)`
        输入体积的维度。对 `in_rows` 和 `in_cols` 进行填充。
    out_dim : tuple of `(out_rows, out_cols)`
        应用卷积后输出示例的期望维度。
    kernel_shape : 2-tuple
        2D 卷积核的维度。
    stride : int
        卷积核的步幅。
    dilation : int
        卷积核元素之间插入的像素数。默认为 0。

    Returns
    -------
    padding_dims : 4-tuple
        `X` 的填充维度。组织形式为 (左,右,上,下)
    """
    if not isinstance(X_shape, tuple):
        raise ValueError("`X_shape` must be of type tuple")

    if not isinstance(out_dim, tuple):
        raise ValueError("`out_dim` must be of type tuple")

    if not isinstance(kernel_shape, tuple):
        raise ValueError("`kernel_shape` must be of type tuple")

    if not isinstance(stride, int):
        raise ValueError("`stride` must be of type int")

    d = dilation
    fr, fc = kernel_shape
    out_rows, out_cols = out_dim
    n_ex, in_rows, in_cols, in_ch = X_shape

    # 根据膨胀因子更新有效滤波器形状
    _fr, _fc = fr * (d   1) - d, fc * (d   1) - d

    pr = int((stride * (out_rows - 1)   _fr - in_rows) / 2)
    pc = int((stride * (out_cols - 1)   _fc - in_cols) / 2)

    out_rows1 = int(1   (in_rows   2 * pr - _fr) / stride)
    out_cols1 = int(1   (in_cols   2 * pc - _fc) / stride)

    # 向右/向下添加不对称填充像素
    pr1, pr2 = pr, pr
    if out_rows1 == out_rows - 1:
        pr1, pr2 = pr, pr   1
    elif out_rows1 != out_rows:
        raise AssertionError

    pc1, pc2 = pc, pc
    if out_cols1 == out_cols - 1:
        pc1, pc2 = pc, pc   1
    elif out_cols1 != out_cols:
        raise AssertionError
    # 检查是否有任何一个元素小于0,如果是则抛出数值错误异常
    if any(np.array([pr1, pr2, pc1, pc2]) < 0):
        raise ValueError(
            "Padding cannot be less than 0. Got: {}".format((pr1, pr2, pc1, pc2))
        )
    # 返回元组 (pr1, pr2, pc1, pc2)
    return (pr1, pr2, pc1, pc2)
def calc_pad_dims_1D(X_shape, l_out, kernel_width, stride, dilation=0, causal=False):
    """
    Compute the padding necessary to ensure that convolving `X` with a 1D kernel
    of shape `kernel_shape` and stride `stride` produces outputs with length
    `l_out`.

    Parameters
    ----------
    X_shape : tuple of `(n_ex, l_in, in_ch)`
        Dimensions of the input volume. Padding is applied on either side of
        `l_in`.
    l_out : int
        The desired length an output example after applying the convolution.
    kernel_width : int
        The width of the 1D convolution kernel.
    stride : int
        The stride for the convolution kernel.
    dilation : int
        Number of pixels inserted between kernel elements. Default is 0.
    causal : bool
        Whether to compute the padding dims for a regular or causal
        convolution. If causal, padding is added only to the left side of the
        sequence. Default is False.

    Returns
    -------
    padding_dims : 2-tuple
        Padding dims for X. Organized as (left, right)
    """
    if not isinstance(X_shape, tuple):
        raise ValueError("`X_shape` must be of type tuple")

    if not isinstance(l_out, int):
        raise ValueError("`l_out` must be of type int")

    if not isinstance(kernel_width, int):
        raise ValueError("`kernel_width` must be of type int")

    if not isinstance(stride, int):
        raise ValueError("`stride` must be of type int")

    d = dilation
    fw = kernel_width
    n_ex, l_in, in_ch = X_shape

    # update effective filter shape based on dilation factor
    _fw = fw * (d   1) - d
    total_pad = int((stride * (l_out - 1)   _fw - l_in))

    if not causal:
        pw = total_pad // 2
        l_out1 = int(1   (l_in   2 * pw - _fw) / stride)

        # add asymmetric padding pixels to right / bottom
        pw1, pw2 = pw, pw
        if l_out1 == l_out - 1:
            pw1, pw2 = pw, pw   1
        elif l_out1 != l_out:
            raise AssertionError
    # 如果是因果卷积,只在序列的左侧填充
    if causal:
        pw1, pw2 = total_pad, 0
        # 计算输出序列的长度
        l_out1 = int(1   (l_in   total_pad - _fw) / stride)
        # 断言输出序列的长度与给定的长度相同
        assert l_out1 == l_out

    # 如果填充值中有任何一个小于0,抛出数值错误异常
    if any(np.array([pw1, pw2]) < 0):
        raise ValueError("Padding cannot be less than 0. Got: {}".format((pw1, pw2)))
    # 返回填充值元组
    return (pw1, pw2)
def pad1D(X, pad, kernel_width=None, stride=None, dilation=0):
    """
    Zero-pad a 3D input volume `X` along the second dimension.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, l_in, in_ch)`
        Input volume. Padding is applied to `l_in`.
    pad : tuple, int, or {'same', 'causal'}
        The padding amount. If 'same', add padding to ensure that the output
        length of a 1D convolution with a kernel of `kernel_shape` and stride
        `stride` is the same as the input length.  If 'causal' compute padding
        such that the output both has the same length as the input AND
        ``output[t]`` does not depend on ``input[t   1:]``. If 2-tuple,
        specifies the number of padding columns to add on each side of the
        sequence.
    kernel_width : int
        The dimension of the 2D convolution kernel. Only relevant if p='same'
        or 'causal'. Default is None.
    stride : int
        The stride for the convolution kernel. Only relevant if p='same' or
        'causal'. Default is None.
    dilation : int
        The dilation of the convolution kernel. Only relevant if p='same' or
        'causal'. Default is None.

    Returns
    -------
    X_pad : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, padded_seq, in_channels)`
        The padded output volume
    p : 2-tuple
        The number of 0-padded columns added to the (left, right) of the sequences
        in `X`.
    """
    # 将 pad 参数赋值给 p
    p = pad
    # 如果 p 是整数,则转换为元组
    if isinstance(p, int):
        p = (p, p)

    # 如果 p 是元组
    if isinstance(p, tuple):
        # 对输入 X 进行零填充,沿第二维度填充
        X_pad = np.pad(
            X,
            pad_width=((0, 0), (p[0], p[1]), (0, 0)),
            mode="constant",
            constant_values=0,
        )

    # 计算 'same' 或 'causal' 卷积的正确填充维度
    # 如果填充方式为"same"或"causal",并且存在卷积核宽度和步长
    if p in ["same", "causal"] and kernel_width and stride:
        # 判断是否为因果卷积
        causal = p == "causal"
        # 计算填充维度
        p = calc_pad_dims_1D(
            X.shape, X.shape[1], kernel_width, stride, causal=causal, dilation=dilation
        )
        # 对输入数据进行一维填充
        X_pad, p = pad1D(X, p)

    # 返回填充后的数据和填充维度
    return X_pad, p
# 在二维输入体积 `X` 的第二和第三维度上进行零填充

def pad2D(X, pad, kernel_shape=None, stride=None, dilation=0):
    """
    Zero-pad a 4D input volume `X` along the second and third dimensions.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
        Input volume. Padding is applied to `in_rows` and `in_cols`.
    pad : tuple, int, or 'same'
        The padding amount. If 'same', add padding to ensure that the output of
        a 2D convolution with a kernel of `kernel_shape` and stride `stride`
        has the same dimensions as the input.  If 2-tuple, specifies the number
        of padding rows and colums to add *on both sides* of the rows/columns
        in `X`. If 4-tuple, specifies the number of rows/columns to add to the
        top, bottom, left, and right of the input volume.
    kernel_shape : 2-tuple
        The dimension of the 2D convolution kernel. Only relevant if p='same'.
        Default is None.
    stride : int
        The stride for the convolution kernel. Only relevant if p='same'.
        Default is None.
    dilation : int
        The dilation of the convolution kernel. Only relevant if p='same'.
        Default is 0.

    Returns
    -------
    X_pad : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, padded_in_rows, padded_in_cols, in_channels)`
        The padded output volume.
    p : 4-tuple
        The number of 0-padded rows added to the (top, bottom, left, right) of
        `X`.
    """
    p = pad
    # 如果 `p` 是整数,则转换为四元组
    if isinstance(p, int):
        p = (p, p, p, p)

    # 如果 `p` 是元组
    if isinstance(p, tuple):
        # 如果元组长度为2,则扩展为四元组
        if len(p) == 2:
            p = (p[0], p[0], p[1], p[1])

        # 对输入体积 `X` 进行零填充
        X_pad = np.pad(
            X,
            pad_width=((0, 0), (p[0], p[1]), (p[2], p[3]), (0, 0)),
            mode="constant",
            constant_values=0,
        )

    # 计算 'same' 卷积的正确填充维度
    # 如果填充方式为"same"且卷积核形状和步长都不为空
    if p == "same" and kernel_shape and stride is not None:
        # 计算二维卷积的填充维度
        p = calc_pad_dims_2D(
            X.shape, X.shape[1:3], kernel_shape, stride, dilation=dilation
        )
        # 对输入数据进行二维填充
        X_pad, p = pad2D(X, p)
    # 返回填充后的输入数据和填充维度
    return X_pad, p
def dilate(X, d):
    """
    Dilate the 4D volume `X` by `d`.

    Notes
    -----
    For a visual depiction of a dilated convolution, see [1].

    References
    ----------
    .. [1] Dumoulin & Visin (2016). "A guide to convolution arithmetic for deep
       learning." https://arxiv.org/pdf/1603.07285v1.pdf

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
        Input volume.
    d : int
        The number of 0-rows to insert between each adjacent row   column in `X`.

    Returns
    -------
    Xd : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, out_ch)`
        The dilated array where

        .. math::

            \text{out_rows}  &=  \text{in_rows}   d(\text{in_rows} - 1) \\
            \text{out_cols}  &=  \text{in_cols}   d (\text{in_cols} - 1)
    """
    # 获取输入体积的形状信息
    n_ex, in_rows, in_cols, n_in = X.shape
    # 生成行索引,重复插入0的行数
    r_ix = np.repeat(np.arange(1, in_rows), d)
    # 生成列索引,重复插入0的列数
    c_ix = np.repeat(np.arange(1, in_cols), d)
    # 在行方向插入0行
    Xd = np.insert(X, r_ix, 0, axis=1)
    # 在列方向插入0列
    Xd = np.insert(Xd, c_ix, 0, axis=2)
    # 返回扩张后的数组
    return Xd


#######################################################################
#                     Convolution Arithmetic                          #
#######################################################################


def calc_fan(weight_shape):
    """
    Compute the fan-in and fan-out for a weight matrix/volume.

    Parameters
    ----------
    weight_shape : tuple
        The dimensions of the weight matrix/volume. The final 2 entries must be
        `in_ch`, `out_ch`.

    Returns
    -------
    fan_in : int
        The number of input units in the weight tensor
    fan_out : int
        The number of output units in the weight tensor
    """
    # 如果权重矩阵维度为2
    if len(weight_shape) == 2:
        fan_in, fan_out = weight_shape
    # 如果权重形状的长度为3或4,则进行以下操作
    elif len(weight_shape) in [3, 4]:
        # 获取输入通道数和输出通道数
        in_ch, out_ch = weight_shape[-2:]
        # 计算卷积核大小
        kernel_size = np.prod(weight_shape[:-2])
        # 计算输入和输出的神经元数量
        fan_in, fan_out = in_ch * kernel_size, out_ch * kernel_size
    # 如果权重形状的长度不是3或4,则引发值错误异常
    else:
        raise ValueError("Unrecognized weight dimension: {}".format(weight_shape))
    # 返回输入神经元数量和输出神经元数量
    return fan_in, fan_out
# 计算给定卷积的输出体积的维度

def calc_conv_out_dims(X_shape, W_shape, stride=1, pad=0, dilation=0):
    """
    Compute the dimension of the output volume for the specified convolution.

    Parameters
    ----------
    X_shape : 3-tuple or 4-tuple
        The dimensions of the input volume to the convolution. If 3-tuple,
        entries are expected to be (`n_ex`, `in_length`, `in_ch`). If 4-tuple,
        entries are expected to be (`n_ex`, `in_rows`, `in_cols`, `in_ch`).
    weight_shape : 3-tuple or 4-tuple
        The dimensions of the weight volume for the convolution. If 3-tuple,
        entries are expected to be (`f_len`, `in_ch`, `out_ch`). If 4-tuple,
        entries are expected to be (`fr`, `fc`, `in_ch`, `out_ch`).
    pad : tuple, int, or {'same', 'causal'}
        The padding amount. If 'same', add padding to ensure that the output
        length of a 1D convolution with a kernel of `kernel_shape` and stride
        `stride` is the same as the input length.  If 'causal' compute padding
        such that the output both has the same length as the input AND
        ``output[t]`` does not depend on ``input[t   1:]``. If 2-tuple, specifies the
        number of padding columns to add on each side of the sequence. Default
        is 0.
    stride : int
        The stride for the convolution kernel. Default is 1.
    dilation : int
        The dilation of the convolution kernel. Default is 0.

    Returns
    -------
    out_dims : 3-tuple or 4-tuple
        The dimensions of the output volume. If 3-tuple, entries are (`n_ex`,
        `out_length`, `out_ch`). If 4-tuple, entries are (`n_ex`, `out_rows`,
        `out_cols`, `out_ch`).
    """
    
    # 创建一个与输入形状相同的零矩阵,用于计算卷积输出维度
    dummy = np.zeros(X_shape)
    
    # 将输入参数中的stride、pad、dilation分别赋值给s、p、d
    s, p, d = stride, pad, dilation
    # 如果输入数据的维度为3
    if len(X_shape) == 3:
        # 对输入数据进行一维填充
        _, p = pad1D(dummy, p)
        # 获取填充后的两个维度值
        pw1, pw2 = p
        # 获取权重矩阵的形状信息
        fw, in_ch, out_ch = W_shape
        # 获取输入数据的批量大小、长度和通道数
        n_ex, in_length, in_ch = X_shape

        # 调整有效滤波器大小以考虑膨胀
        _fw = fw * (d   1) - d
        # 计算输出长度
        out_length = (in_length   pw1   pw2 - _fw) // s   1
        # 定义输出维度
        out_dims = (n_ex, out_length, out_ch)

    # 如果输入数据的维度为4
    elif len(X_shape) == 4:
        # 对输入数据进行二维填充
        _, p = pad2D(dummy, p)
        # 获取填充后的四个维度值
        pr1, pr2, pc1, pc2 = p
        # 获取权重矩阵的形状信息
        fr, fc, in_ch, out_ch = W_shape
        # 获取输入数据的批量大小、行数、列数和通道数
        n_ex, in_rows, in_cols, in_ch = X_shape

        # 调整有效滤波器大小以考虑膨胀
        _fr, _fc = fr * (d   1) - d, fc * (d   1) - d
        # 计算输出行数和列数
        out_rows = (in_rows   pr1   pr2 - _fr) // s   1
        out_cols = (in_cols   pc1   pc2 - _fc) // s   1
        # 定义输出维度
        out_dims = (n_ex, out_rows, out_cols, out_ch)
    else:
        # 抛出异常,表示无法识别的输入维度数量
        raise ValueError("Unrecognized number of input dims: {}".format(len(X_shape)))
    # 返回输出维度
    return out_dims
# 定义一个函数,用于计算在 im2col 函数中列向量化之前的 X 矩阵的索引
def _im2col_indices(X_shape, fr, fc, p, s, d=0):
    """
    Helper function that computes indices into X in prep for columnization in
    :func:`im2col`.

    Code extended from Andrej Karpathy's `im2col.py`
    """
    pr1, pr2, pc1, pc2 = p
    n_ex, n_in, in_rows, in_cols = X_shape

    # 调整有效滤波器大小以考虑膨胀
    _fr, _fc = fr * (d   1) - d, fc * (d   1) - d

    out_rows = (in_rows   pr1   pr2 - _fr) // s   1
    out_cols = (in_cols   pc1   pc2 - _fc) // s   1

    if any([out_rows <= 0, out_cols <= 0]):
        raise ValueError(
            "Dimension mismatch during convolution: "
            "out_rows = {}, out_cols = {}".format(out_rows, out_cols)
        )

    # i1/j1 : row/col templates
    # i0/j0 : n. copies (len) and offsets (values) for row/col templates
    i0 = np.repeat(np.arange(fr), fc)
    i0 = np.tile(i0, n_in) * (d   1)
    i1 = s * np.repeat(np.arange(out_rows), out_cols)
    j0 = np.tile(np.arange(fc), fr * n_in) * (d   1)
    j1 = s * np.tile(np.arange(out_cols), out_rows)

    # i.shape = (fr * fc * n_in, out_height * out_width)
    # j.shape = (fr * fc * n_in, out_height * out_width)
    # k.shape = (fr * fc * n_in, 1)
    i = i0.reshape(-1, 1)   i1.reshape(1, -1)
    j = j0.reshape(-1, 1)   j1.reshape(1, -1)
    k = np.repeat(np.arange(n_in), fr * fc).reshape(-1, 1)
    return k, i, j
    # 输入体积(未填充)的 numpy 数组,形状为 `(n_ex, in_rows, in_cols, in_ch)`
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
        Input volume (not padded).
    # 包含 `(kernel_rows, kernel_cols, in_ch, out_ch)` 的 4 元组
    W_shape: 4-tuple containing `(kernel_rows, kernel_cols, in_ch, out_ch)`
        The dimensions of the weights/kernels in the present convolutional
        layer.
    # 填充量。如果为 'same',则添加填充以确保使用 `kernel_shape` 和 `stride` 进行 2D 卷积的输出体积与输入体积具有相同的维度。
    # 如果为 2 元组,则指定要在 X 的行和列的两侧添加的填充行数和列数。如果为 4 元组,则指定要添加到输入体积的顶部、底部、左侧和右侧的行数/列数。
    pad : tuple, int, or 'same'
        The padding amount. If 'same', add padding to ensure that the output of
        a 2D convolution with a kernel of `kernel_shape` and stride `stride`
        produces an output volume of the same dimensions as the input.  If
        2-tuple, specifies the number of padding rows and colums to add *on both
        sides* of the rows/columns in X. If 4-tuple, specifies the number of
        rows/columns to add to the top, bottom, left, and right of the input
        volume.
    # 每个卷积核的步幅
    stride : int
        The stride of each convolution kernel
    # 插入在卷积核元素之间的像素数。默认为 0。
    dilation : int
        Number of pixels inserted between kernel elements. Default is 0.

    Returns
    -------
    # 重塑后的输入体积,其中:

    # Q = kernel_rows * kernel_cols * n_in
    # Z = n_ex * out_rows * out_cols
    X_col : :py:class:`ndarray <numpy.ndarray>` of shape (Q, Z)
        The reshaped input volume where where:

        .. math::

            Q  &=  \text{kernel_rows} \times \text{kernel_cols} \times \text{n_in} \\
            Z  &=  \text{n_ex} \times \text{out_rows} \times \text{out_cols}
    """
    # 解包 W_shape
    fr, fc, n_in, n_out = W_shape
    # 解包步幅、填充、扩张
    s, p, d = stride, pad, dilation
    # 解包 X 的形状
    n_ex, in_rows, in_cols, n_in = X.shape

    # 对输入进行零填充
    X_pad, p = pad2D(X, p, W_shape[:2], stride=s, dilation=d)
    pr1, pr2, pc1, pc2 = p

    # 重新排列以使通道成为第一维
    X_pad = X_pad.transpose(0, 3, 1, 2)

    # 获取 im2col 的索引
    k, i, j = _im2col_indices((n_ex, n_in, in_rows, in_cols), fr, fc, p, s, d)

    X_col = X_pad[:, k, i, j]
    X_col = X_col.transpose(1, 2, 0).reshape(fr * fc * n_in, -1)
    return X_col, p
def col2im(X_col, X_shape, W_shape, pad, stride, dilation=0):
    """
    Take columns of a 2D matrix and rearrange them into the blocks/windows of
    a 4D image volume.

    Notes
    -----
    A NumPy reimagining of MATLAB's ``col2im`` 'sliding' function.

    Code extended from Andrej Karpathy's ``im2col.py``.

    Parameters
    ----------
    X_col : :py:class:`ndarray <numpy.ndarray>` of shape `(Q, Z)`
        The columnized version of `X` (assumed to include padding)
    X_shape : 4-tuple containing `(n_ex, in_rows, in_cols, in_ch)`
        The original dimensions of `X` (not including padding)
    W_shape: 4-tuple containing `(kernel_rows, kernel_cols, in_ch, out_ch)`
        The dimensions of the weights in the present convolutional layer
    pad : 4-tuple of `(left, right, up, down)`
        Number of zero-padding rows/cols to add to `X`
    stride : int
        The stride of each convolution kernel
    dilation : int
        Number of pixels inserted between kernel elements. Default is 0.

    Returns
    -------
    img : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
        The reshaped `X_col` input matrix
    """
    # 检查 pad 是否为 4 元组
    if not (isinstance(pad, tuple) and len(pad) == 4):
        raise TypeError("pad must be a 4-tuple, but got: {}".format(pad))

    # 获取 stride 和 dilation
    s, d = stride, dilation
    pr1, pr2, pc1, pc2 = pad
    fr, fc, n_in, n_out = W_shape
    n_ex, in_rows, in_cols, n_in = X_shape

    # 创建一个填充后的 X 矩阵
    X_pad = np.zeros((n_ex, n_in, in_rows   pr1   pr2, in_cols   pc1   pc2))
    # 获取 im2col 的索引
    k, i, j = _im2col_indices((n_ex, n_in, in_rows, in_cols), fr, fc, pad, s, d)

    # 重塑 X_col
    X_col_reshaped = X_col.reshape(n_in * fr * fc, -1, n_ex)
    X_col_reshaped = X_col_reshaped.transpose(2, 0, 1)

    # 在 X_pad 上进行加法操作
    np.add.at(X_pad, (slice(None), k, i, j), X_col_reshaped)

    # 更新 pr2 和 pc2
    pr2 = None if pr2 == 0 else -pr2
    pc2 = None if pc2 == 0 else -pc2
    # 返回填充后的 X 矩阵
    return X_pad[:, :, pr1:pr2, pc1:pc2]
#                             Convolution                             #
#######################################################################

# 定义一个二维卷积函数,用于计算输入 `X` 与一组卷积核 `W` 的卷积(实际上是互相关)
def conv2D(X, W, stride, pad, dilation=0):
    """
    A faster (but more memory intensive) implementation of the 2D "convolution"
    (technically, cross-correlation) of input `X` with a collection of kernels in
    `W`.

    Notes
    -----
    Relies on the :func:`im2col` function to perform the convolution as a single
    matrix multiplication.

    For a helpful diagram, see Pete Warden's 2015 blogpost [1].

    References
    ----------
    .. [1] Warden (2015). "Why GEMM is at the heart of deep learning,"
       https://petewarden.com/2015/04/20/why-gemm-is-at-the-heart-of-deep-learning/

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
        Input volume (unpadded).
    W: :py:class:`ndarray <numpy.ndarray>` of shape `(kernel_rows, kernel_cols, in_ch, out_ch)`
        A volume of convolution weights/kernels for a given layer.
    stride : int
        The stride of each convolution kernel.
    pad : tuple, int, or 'same'
        The padding amount. If 'same', add padding to ensure that the output of
        a 2D convolution with a kernel of `kernel_shape` and stride `stride`
        produces an output volume of the same dimensions as the input.  If
        2-tuple, specifies the number of padding rows and colums to add *on both
        sides* of the rows/columns in `X`. If 4-tuple, specifies the number of
        rows/columns to add to the top, bottom, left, and right of the input
        volume.
    dilation : int
        Number of pixels inserted between kernel elements. Default is 0.

    Returns
    -------
    Z : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, out_ch)`
        The covolution of `X` with `W`.
    """
    # 将步长和膨胀设为变量s和d
    s, d = stride, dilation
    # 调用pad2D函数计算二维卷积的填充情况,返回填充后的输入和输出尺寸
    _, p = pad2D(X, pad, W.shape[:2], s, dilation=dilation)
    # 将输入的参数解包为四个变量:pr1, pr2, pc1, pc2
    pr1, pr2, pc1, pc2 = p
    # 解包卷积核的形状信息:fr, fc, in_ch, out_ch
    fr, fc, in_ch, out_ch = W.shape
    # 解包输入数据的形状信息:n_ex, in_rows, in_cols, in_ch
    n_ex, in_rows, in_cols, in_ch = X.shape

    # 根据膨胀因子更新有效滤波器形状
    _fr, _fc = fr * (d   1) - d, fc * (d   1) - d

    # 计算卷积输出的维度
    out_rows = int((in_rows   pr1   pr2 - _fr) / s   1)
    out_cols = int((in_cols   pc1   pc2 - _fc) / s   1)

    # 将输入数据和卷积核转换为适当的二维矩阵,并计算它们的乘积
    X_col, _ = im2col(X, W.shape, p, s, d)
    W_col = W.transpose(3, 2, 0, 1).reshape(out_ch, -1)

    # 计算卷积结果并重塑为指定形状
    Z = (W_col @ X_col).reshape(out_ch, out_rows, out_cols, n_ex).transpose(3, 1, 2, 0)

    # 返回卷积结果
    return Z
# 定义一个一维卷积函数,用于对输入 X 和卷积核集合 W 进行卷积操作
def conv1D(X, W, stride, pad, dilation=0):
    """
    A faster (but more memory intensive) implementation of a 1D "convolution"
    (technically, cross-correlation) of input `X` with a collection of kernels in
    `W`.

    Notes
    -----
    Relies on the :func:`im2col` function to perform the convolution as a single
    matrix multiplication.

    For a helpful diagram, see Pete Warden's 2015 blogpost [1].

    References
    ----------
    .. [1] Warden (2015). "Why GEMM is at the heart of deep learning,"
       https://petewarden.com/2015/04/20/why-gemm-is-at-the-heart-of-deep-learning/

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, l_in, in_ch)`
        Input volume (unpadded)
    W: :py:class:`ndarray <numpy.ndarray>` of shape `(kernel_width, in_ch, out_ch)`
        A volume of convolution weights/kernels for a given layer
    stride : int
        The stride of each convolution kernel
    pad : tuple, int, or 'same'
        The padding amount. If 'same', add padding to ensure that the output of
        a 1D convolution with a kernel of `kernel_shape` and stride `stride`
        produces an output volume of the same dimensions as the input.  If
        2-tuple, specifies the number of padding colums to add *on both sides*
        of the columns in X.
    dilation : int
        Number of pixels inserted between kernel elements. Default is 0.

    Returns
    -------
    Z : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, l_out, out_ch)`
        The convolution of X with W.
    """
    # 计算需要添加的填充数量
    _, p = pad1D(X, pad, W.shape[0], stride, dilation=dilation)

    # 为 X 添加一个行维度,以便使用 im2col/col2im
    X2D = np.expand_dims(X, axis=1)
    W2D = np.expand_dims(W, axis=0)
    p2D = (0, 0, p[0], p[1])
    # 调用二维卷积函数 conv2D 进行卷积操作
    Z2D = conv2D(X2D, W2D, stride, p2D, dilation)

    # 去除行维度,返回结果
    return np.squeeze(Z2D, axis=1)


def deconv2D_naive(X, W, stride, pad, dilation=0):
    """
    # 对输入体积 `X` 进行“反卷积”(更准确地说是转置卷积),考虑步长、填充和膨胀
    # 注意
    # 与使用卷积矩阵的转置不同,这种方法使用直接卷积并进行零填充,概念上简单直观,但计算效率低下
    # 详细解释请参见 [1]
    # 参考资料
    # [1] Dumoulin & Visin (2016). "A guide to convolution arithmetic for deep learning." https://arxiv.org/pdf/1603.07285v1.pdf
    # 参数
    # X : 形状为 `(n_ex, in_rows, in_cols, in_ch)` 的 :py:class:`ndarray <numpy.ndarray>`
    #     输入体积(未填充)
    # W: 形状为 `(kernel_rows, kernel_cols, in_ch, out_ch)` 的 :py:class:`ndarray <numpy.ndarray>`
    #     给定层的卷积权重/卷积核体积
    # stride : int
    #     每个卷积核的步长
    # pad : tuple, int, 或 'same'
    #     填充量。如果为 'same',则添加填充以确保具有 `kernel_shape` 和步长 `stride` 的 2D 卷积的输出体积与输入体积具有相同的维度。
    #     如果为 2-元组,则指定要在 `X` 的行和列的 *两侧* 添加的填充行和列数。如果为 4-元组,则指定要添加到输入体积的顶部、底部、左侧和右侧的行/列数。
    # dilation : int
    #     插入在卷积核元素之间的像素数。默认为 0
    # 返回
    # Y : 形状为 `(n_ex, out_rows, out_cols, n_out)` 的 :py:class:`ndarray <numpy.ndarray>`
    #     使用步长 `s` 和膨胀 `d` 对(填充的)输入体积 `X` 与 `W` 进行反卷积后的结果
    """
    # 如果步长大于 1
    if stride > 1:
        # 对 X 进行膨胀操作,膨胀次数为 stride - 1
        X = dilate(X, stride - 1)
        # 将步长设为 1
        stride = 1

    # 对输入进行填充
    # X_pad 为填充后的输入,p 为填充的参数
    X_pad, p = pad2D(X, pad, W.shape[:2], stride=stride, dilation=dilation)
    # 获取输入张量 X_pad 的扩展维度、行数、列数和通道数
    n_ex, in_rows, in_cols, n_in = X_pad.shape
    # 获取权重矩阵 W 的滤波器行数、列数、输入通道数和输出通道数
    fr, fc, n_in, n_out = W.shape
    # 设置步长 s 和膨胀因子 d
    s, d = stride, dilation
    # 设置填充参数 pr1, pr2, pc1, pc2
    pr1, pr2, pc1, pc2 = p

    # 根据膨胀因子更新有效滤波器形状
    _fr, _fc = fr * (d   1) - d, fc * (d   1) - d

    # 计算反卷积输出维度
    out_rows = s * (in_rows - 1) - pr1 - pr2   _fr
    out_cols = s * (in_cols - 1) - pc1 - pc2   _fc
    out_dim = (out_rows, out_cols)

    # 添加额外填充以达到目标输出维度
    _p = calc_pad_dims_2D(X_pad.shape, out_dim, W.shape[:2], s, d)
    X_pad, pad = pad2D(X_pad, _p, W.shape[:2], stride=s, dilation=dilation)

    # 使用翻转的权重矩阵执行前向卷积(注意设置 pad 为 0,因为我们已经添加了填充)
    Z = conv2D(X_pad, np.rot90(W, 2), s, 0, d)

    # 如果 pr2 为 0,则将其设置为 None;否则将其设置为 -pr2
    pr2 = None if pr2 == 0 else -pr2
    # 如果 pc2 为 0,则将其设置为 None;否则将其设置为 -pc2
    pc2 = None if pc2 == 0 else -pc2
    # 返回 Z 的切片,根据 pr1, pr2, pc1, pc2 进行切片
    return Z[:, pr1:pr2, pc1:pc2, :]
# 定义一个使用朴素方法实现的二维“卷积”(实际上是交叉相关)的函数
def conv2D_naive(X, W, stride, pad, dilation=0):
    """
    A slow but more straightforward implementation of a 2D "convolution"
    (technically, cross-correlation) of input `X` with a collection of kernels `W`.

    Notes
    -----
    This implementation uses ``for`` loops and direct indexing to perform the
    convolution. As a result, it is slower than the vectorized :func:`conv2D`
    function that relies on the :func:`col2im` and :func:`im2col`
    transformations.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_ch)`
        Input volume.
    W: :py:class:`ndarray <numpy.ndarray>` of shape `(kernel_rows, kernel_cols, in_ch, out_ch)`
        The volume of convolution weights/kernels.
    stride : int
        The stride of each convolution kernel.
    pad : tuple, int, or 'same'
        The padding amount. If 'same', add padding to ensure that the output of
        a 2D convolution with a kernel of `kernel_shape` and stride `stride`
        produces an output volume of the same dimensions as the input.  If
        2-tuple, specifies the number of padding rows and colums to add *on both
        sides* of the rows/columns in `X`. If 4-tuple, specifies the number of
        rows/columns to add to the top, bottom, left, and right of the input
        volume.
    dilation : int
        Number of pixels inserted between kernel elements. Default is 0.

    Returns
    -------
    Z : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, out_ch)`
        The covolution of `X` with `W`.
    """
    # 将 stride 和 dilation 参数赋值给 s 和 d
    s, d = stride, dilation
    # 对输入 X 进行二维填充,得到填充后的输入 X_pad 和填充元组 p
    X_pad, p = pad2D(X, pad, W.shape[:2], stride=s, dilation=d)

    # 解包填充元组 p,得到填充的行数和列数
    pr1, pr2, pc1, pc2 = p
    # 获取卷积核的形状信息
    fr, fc, in_ch, out_ch = W.shape
    # 获取输入 X 的形状信息
    n_ex, in_rows, in_cols, in_ch = X.shape

    # 根据膨胀因子更新有效滤波器形状
    fr, fc = fr * (d   1) - d, fc * (d   1) - d

    # 计算输出的行数
    out_rows = int((in_rows   pr1   pr2 - fr) / s   1)
    # 计算输出列数
    out_cols = int((in_cols   pc1   pc2 - fc) / s   1)

    # 创建一个全零数组,用于存储卷积操作的结果
    Z = np.zeros((n_ex, out_rows, out_cols, out_ch))
    # 遍历每个样本
    for m in range(n_ex):
        # 遍历输出通道
        for c in range(out_ch):
            # 遍历输出行
            for i in range(out_rows):
                # 遍历输出列
                for j in range(out_cols):
                    # 计算窗口的起始和结束位置
                    i0, i1 = i * s, (i * s)   fr
                    j0, j1 = j * s, (j * s)   fc

                    # 从输入数据中提取窗口数据
                    window = X_pad[m, i0 : i1 : (d   1), j0 : j1 : (d   1), :]
                    # 执行卷积操作并将结果存储到输出数组中
                    Z[m, i, j, c] = np.sum(window * W[:, :, :, c])
    # 返回卷积操作的结果数组
    return Z
# 权重初始化函数:He uniform 初始化策略
def he_uniform(weight_shape):
    """
    Initializes network weights `W` with using the He uniform initialization
    strategy.

    Notes
    -----
    The He uniform initializations trategy initializes the weights in `W` using
    draws from Uniform(-b, b) where

    .. math::

        b = sqrt{\frac{6}{\text{fan_in}}}

    Developed for deep networks with ReLU nonlinearities.

    Parameters
    ----------
    weight_shape : tuple
        The dimensions of the weight matrix/volume.

    Returns
    -------
    W : :py:class:`ndarray <numpy.ndarray>` of shape `weight_shape`
        The initialized weights.
    """
    # 计算输入和输出的神经元数量
    fan_in, fan_out = calc_fan(weight_shape)
    # 计算 b 值
    b = np.sqrt(6 / fan_in)
    # 从 Uniform(-b, b) 中随机初始化权重
    return np.random.uniform(-b, b, size=weight_shape)


# 权重初始化函数:He normal 初始化策略
def he_normal(weight_shape):
    """
    Initialize network weights `W` using the He normal initialization strategy.

    Notes
    -----
    The He normal initialization strategy initializes the weights in `W` using
    draws from TruncatedNormal(0, b) where the variance `b` is

    .. math::

        b = \frac{2}{\text{fan_in}}

    He normal initialization was originally developed for deep networks with
    :class:`~numpy_ml.neural_nets.activations.ReLU` nonlinearities.

    Parameters
    ----------
    weight_shape : tuple
        The dimensions of the weight matrix/volume.

    Returns
    -------
    W : :py:class:`ndarray <numpy.ndarray>` of shape `weight_shape`
        The initialized weights.
    """
    # 计算输入和输出的神经元数量
    fan_in, fan_out = calc_fan(weight_shape)
    # 计算标准差
    std = np.sqrt(2 / fan_in)
    # 从 TruncatedNormal(0, b) 中随机初始化权重
    return truncated_normal(0, std, weight_shape)


# 权重初始化函数:Glorot uniform 初始化策略
def glorot_uniform(weight_shape, gain=1.0):
    """
    Initialize network weights `W` using the Glorot uniform initialization
    strategy.

    Notes
    -----
    The Glorot uniform initialization strategy initializes weights using draws
    from ``Uniform(-b, b)`` where:

    .. math::

        b = \text{gain} sqrt{\frac{6}{\text{fan_in}   \text{fan_out}}}

    The motivation for Glorot uniform initialization is to choose weights to
    ensure that the variance of the layer outputs are approximately equal to
    the variance of its inputs.

    This initialization strategy was primarily developed for deep networks with
    tanh and logistic sigmoid nonlinearities.

    Parameters
    ----------
    weight_shape : tuple
        The dimensions of the weight matrix/volume.

    Returns
    -------
    W : :py:class:`ndarray <numpy.ndarray>` of shape `weight_shape`
        The initialized weights.
    """
    # 计算权重矩阵/体积的维度
    fan_in, fan_out = calc_fan(weight_shape)
    # 计算 b 的值
    b = gain * np.sqrt(6 / (fan_in   fan_out))
    # 返回从 Uniform(-b, b) 中随机初始化的权重
    return np.random.uniform(-b, b, size=weight_shape)
# 使用 Glorot 正态初始化策略初始化网络权重 `W`
def glorot_normal(weight_shape, gain=1.0):
    # 计算权重矩阵/体的维度
    fan_in, fan_out = calc_fan(weight_shape)
    # 计算标准差
    std = gain * np.sqrt(2 / (fan_in   fan_out))
    # 返回从截断正态分布中抽取的值
    return truncated_normal(0, std, weight_shape)

# 通过拒绝抽样生成从截断正态分布中抽取的值
def truncated_normal(mean, std, out_shape):
    # 拒绝抽样方案从具有均值 `mean` 和标准差 `std` 的正态分布中抽取样本,并重新抽样任何值超过 `mean` 两个标准差的值
    pass
    # 从参数为 `mean` 和 `std` 的截断正态分布中抽取形状为 `out_shape` 的样本
    samples = np.random.normal(loc=mean, scale=std, size=out_shape)
    # 创建一个逻辑数组,标记样本是否超出均值加减两倍标准差的范围
    reject = np.logical_or(samples >= mean   2 * std, samples <= mean - 2 * std)
    # 当仍有样本超出范围时,重新抽取这些样本
    while any(reject.flatten()):
        # 重新从参数为 `mean` 和 `std` 的正态分布中抽取超出范围的样本数量的样本
        resamples = np.random.normal(loc=mean, scale=std, size=reject.sum())
        # 将重新抽取的样本替换原来超出范围的样本
        samples[reject] = resamples
        # 更新标记数组,检查是否仍有样本超出范围
        reject = np.logical_or(samples >= mean   2 * std, samples <= mean - 2 * std)
    # 返回处理后的样本数组
    return samples

0 人点赞