NumPyML 源码解析(四)

2024-02-17 10:04:15 浏览数 (1)

numpy-mlnumpy_mlneural_netsutils__init__.py

代码语言:javascript复制
"""
神经网络特定的常见辅助函数。

``neural_nets.utils` 模块包含神经网络特定的辅助函数,主要用于处理 CNNs。
"""

# 从当前目录下的 utils 模块中导入所有内容
from .utils import *

Wrappers

The wrappers.py module implements wrappers for the layers in layers.py. It includes

  • Dropout (Srivastava, et al., 2014)

numpy-mlnumpy_mlneural_netswrapperswrappers.py

代码语言:javascript复制
"""
A collection of objects thats can wrap / otherwise modify arbitrary neural
network layers.
"""

# 导入必要的库
from abc import ABC, abstractmethod
import numpy as np

# 定义一个抽象基类 WrapperBase
class WrapperBase(ABC):
    def __init__(self, wrapped_layer):
        """An abstract base class for all Wrapper instances"""
        # 初始化函数,接受一个 wrapped_layer 参数
        self._base_layer = wrapped_layer
        # 如果 wrapped_layer 中有 _base_layer 属性,则将其赋值给 self._base_layer
        if hasattr(wrapped_layer, "_base_layer"):
            self._base_layer = wrapped_layer._base_layer
        super().__init__()

    @abstractmethod
    def _init_wrapper_params(self):
        # 抽象方法,用于初始化包装器参数
        raise NotImplementedError

    @abstractmethod
    def forward(self, z, **kwargs):
        """Overwritten by inherited class"""
        # 抽象方法,用于前向传播,由子类实现
        raise NotImplementedError

    @abstractmethod
    def backward(self, out, **kwargs):
        """Overwritten by inherited class"""
        # 抽象方法,用于反向传播,由子类实现
        raise NotImplementedError

    @property
    def trainable(self):
        """Whether the base layer is frozen"""
        # 返回基础层是否可训练
        return self._base_layer.trainable

    @property
    def parameters(self):
        """A dictionary of the base layer parameters"""
        # 返回基础层的参数字典
        return self._base_layer.parameters

    @property
    def hyperparameters(self):
        """A dictionary of the base layer's hyperparameters"""
        # 返回基础层的超参数字典
        hp = self._base_layer.hyperparameters
        hpw = self._wrapper_hyperparameters
        if "wrappers" in hp:
            hp["wrappers"].append(hpw)
        else:
            hp["wrappers"] = [hpw]
        return hp

    @property
    def derived_variables(self):
        """
        A dictionary of the intermediate values computed during layer
        training.
        """
        # 返回在层训练期间计算的中间值的字典
        dv = self._base_layer.derived_variables.copy()
        if "wrappers" in dv:
            dv["wrappers"].append(self._wrapper_derived_variables)
        else:
            dv["wrappers"] = [self._wrapper_derived_variables]
        return dv

    @property
    def gradients(self):
        """A dictionary of the current layer parameter gradients."""
        # 返回当前层参数梯度的字典
        return self._base_layer.gradients

    @property
    def act_fn(self):
        """The activation function for the base layer."""
        # 返回基础层的激活函数
        return self._base_layer.act_fn

    @property
    def X(self):
        """The collection of layer inputs."""
        # 返回层输入的集合
        return self._base_layer.X

    def _init_params(self):
        # 初始化参数
        hp = self._wrapper_hyperparameters
        # 如果基础层的超参数中包含"wrappers",则将当前超参数追加到列表中
        if "wrappers" in self._base_layer.hyperparameters:
            self._base_layer.hyperparameters["wrappers"].append(hp)
        else:
            self._base_layer.hyperparameters["wrappers"] = [hp]

    def freeze(self):
        """
        Freeze the base layer's parameters at their current values so they can
        no longer be updated.
        """
        # 冻结基础层的参数,使其无法再更新
        self._base_layer.freeze()

    def unfreeze(self):
        """Unfreeze the base layer's parameters so they can be updated."""
        # 解冻基础层的参数,使其可以更新
        self._base_layer.freeze()

    def flush_gradients(self):
        """Erase all the wrapper and base layer's derived variables and gradients."""
        # 清除所有包装器和基础层的派生变量和梯度
        assert self.trainable, "Layer is frozen"
        self._base_layer.flush_gradients()

        for k, v in self._wrapper_derived_variables.items():
            self._wrapper_derived_variables[k] = []

    def update(self, lr):
        """
        Update the base layer's parameters using the accrued gradients and
        layer optimizer. Flush all gradients once the update is complete.
        """
        # 使用累积的梯度和层优化器更新基础层的参数。更新完成后清除所有梯度
        assert self.trainable, "Layer is frozen"
        self._base_layer.update(lr)
        self.flush_gradients()

    def _set_wrapper_params(self, pdict):
        # 设置包装器参数
        for k, v in pdict.items():
            if k in self._wrapper_hyperparameters:
                self._wrapper_hyperparameters[k] = v
        return self
    def set_params(self, summary_dict):
        """
        从一个值字典中设置基础层参数。

        Parameters
        ----------
        summary_dict : dict
            一个包含层参数和超参数的字典。如果在 `summary_dict` 中没有包含所需的参数或超参数,
            这个方法将使用当前层的 :meth:`summary` 方法中的值。

        Returns
        -------
        layer : :doc:`Layer <numpy_ml.neural_nets.layers>` object
            新初始化的层。
        """
        return self._base_layer.set_params(summary_dict)

    def summary(self):
        """返回一个包含层参数、超参数和 ID 的字典。"""
        return {
            "layer": self.hyperparameters["layer"],
            "layer_wrappers": [i["wrapper"] for i in self.hyperparameters["wrappers"]],
            "parameters": self.parameters,
            "hyperparameters": self.hyperparameters,
        }
class Dropout(WrapperBase):
    def __init__(self, wrapped_layer, p):
        """
        A dropout regularization wrapper.

        Notes
        -----
        During training, a dropout layer zeroes each element of the layer input
        with probability `p` and scales the activation by `1 / (1 - p)` (to reflect
        the fact that on average only `(1 - p) * N` units are active on any
        training pass). At test time, does not adjust elements of the input at
        all (ie., simply computes the identity function).

        Parameters
        ----------
        wrapped_layer : :doc:`Layer <numpy_ml.neural_nets.layers>` instance
            The layer to apply dropout to.
        p : float in [0, 1)
            The dropout propbability during training
        """
        # 调用父类的构造函数
        super().__init__(wrapped_layer)
        # 初始化 dropout 概率
        self.p = p
        # 初始化包装器参数
        self._init_wrapper_params()
        # 初始化参数
        self._init_params()

    def _init_wrapper_params(self):
        # 初始化包装器的派生变量,dropout_mask 用于存储 dropout 掩码
        self._wrapper_derived_variables = {"dropout_mask": []}
        # 初始化包装器的超参数,包括包装器类型和 dropout 概率
        self._wrapper_hyperparameters = {"wrapper": "Dropout", "p": self.p}
    # 计算带有 dropout 的单个 minibatch 的层输出
    def forward(self, X, retain_derived=True):
        """
        Compute the layer output with dropout for a single minibatch.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in)`
            Layer input, representing the `n_in`-dimensional features for a
            minibatch of `n_ex` examples.
        retain_derived : bool
            Whether to retain the variables calculated during the forward pass
            for use later during backprop. If False, this suggests the layer
            will not be expected to backprop through wrt. this input. Default
            is True.

        Returns
        -------
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out)`
            Layer output for each of the `n_ex` examples.
        """
        # 初始化缩放因子为1.0,掩码为全为True的数组
        scaler, mask = 1.0, np.ones(X.shape).astype(bool)
        # 如果该层可训练
        if self.trainable:
            # 计算缩放因子
            scaler = 1.0 / (1.0 - self.p)
            # 生成与输入形状相同的随机掩码
            mask = np.random.rand(*X.shape) >= self.p
            # 对输入应用掩码
            X = mask * X

        # 如果需要保留派生变量
        if retain_derived:
            # 将 dropout 掩码添加到派生变量字典中
            self._wrapper_derived_variables["dropout_mask"].append(mask)

        # 返回经过缩放的输入经过基础层前向传播的结果
        return scaler * self._base_layer.forward(X, retain_derived)
    # 反向传播,从基础层的输出到输入
    def backward(self, dLdy, retain_grads=True):
        """
        Backprop from the base layer's outputs to inputs.

        Parameters
        ----------
        dLdy : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_out)` or list of arrays
            The gradient(s) of the loss wrt. the layer output(s).
        retain_grads : bool
            Whether to include the intermediate parameter gradients computed
            during the backward pass in the final parameter update. Default is
            True.

        Returns
        -------
        dLdX : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, n_in)` or list of arrays
            The gradient of the loss wrt. the layer input(s) `X`.
        """  # noqa: E501
        # 检查层是否可训练,如果不可训练则抛出异常
        assert self.trainable, "Layer is frozen"
        # 将梯度乘以 1/(1-p),其中 p 是 dropout 概率
        dLdy *= 1.0 / (1.0 - self.p)
        # 调用基础层的反向传播方法,传入梯度和是否保留中间参数梯度
        return self._base_layer.backward(dLdy, retain_grads)
# 初始化层包装器并返回一个包装后的层对象
def init_wrappers(layer, wrappers_list):
    """
    Initialize the layer wrappers in `wrapper_list` and return a wrapped
    `layer` object.

    Parameters
    ----------
    layer : :doc:`Layer <numpy_ml.neural_nets.layers>` instance
        The base layer object to apply the wrappers to.
    wrappers : list of dicts
        A list of parameter dictionaries for a the wrapper objects. The
        wrappers are initialized and applied to the the layer sequentially.

    Returns
    -------
    wrapped_layer : :class:`WrapperBase` instance
        The wrapped layer object
    """
    # 遍历包装器列表
    for wr in wrappers_list:
        # 如果包装器是 "Dropout"
        if wr["wrapper"] == "Dropout":
            # 创建一个 Dropout 包装器对象并应用到层上
            layer = Dropout(layer, 1)._set_wrapper_params(wr)
        else:
            # 如果包装器不是 "Dropout",则抛出未实现的错误
            raise NotImplementedError("{}".format(wr["wrapper"]))
    # 返回包装后的层对象
    return layer

numpy-mlnumpy_mlneural_netswrappers__init__.py

代码语言:javascript复制
# 从当前目录下的wrappers模块中导入所有内容
from .wrappers import *

numpy-mlnumpy_mlneural_nets__init__.py

代码语言:javascript复制
# 导入神经网络构建的基本模块
from . import utils
# 导入损失函数模块
from . import losses
# 导入激活函数模块
from . import activations
# 导入调度器模块
from . import schedulers
# 导入优化器模块
from . import optimizers
# 导入包装器模块
from . import wrappers
# 导入层模块
from . import layers
# 导入初始化器模块
from . import initializers
# 导入模块模块
from . import modules
# 导入模型模块
from . import models

numpy-mlnumpy_mlngramngram.py

代码语言:javascript复制
# 导入文本包装、抽象基类和计数器模块
import textwrap
from abc import ABC, abstractmethod
from collections import Counter

# 导入 NumPy 库
import numpy as np

# 从自定义的线性回归模块中导入 LinearRegression 类
from numpy_ml.linear_models import LinearRegression
# 从自定义的自然语言处理预处理模块中导入 tokenize_words、ngrams 和 strip_punctuation 函数
from numpy_ml.preprocessing.nlp import tokenize_words, ngrams, strip_punctuation

# 定义一个抽象基类 NGramBase
class NGramBase(ABC):
    def __init__(self, N, unk=True, filter_stopwords=True, filter_punctuation=True):
        """
        A simple word-level N-gram language model.

        Notes
        -----
        This is not optimized code and will be slow for large corpora. To see
        how industry-scale NGram models are handled, see the SRLIM-format:

            http://www.speech.sri.com/projects/srilm/
        """
        # 初始化 N-gram 模型的参数
        self.N = N
        self.unk = unk
        self.filter_stopwords = filter_stopwords
        self.filter_punctuation = filter_punctuation

        # 存储超参数信息
        self.hyperparameters = {
            "N": N,
            "unk": unk,
            "filter_stopwords": filter_stopwords,
            "filter_punctuation": filter_punctuation,
        }

        # 调用父类的构造函数
        super().__init__()
    # 训练语言模型,统计语料库中文本的 n-gram 计数
    def train(self, corpus_fp, vocab=None, encoding=None):
        """
        Compile the n-gram counts for the text(s) in `corpus_fp`.

        Notes
        -----
        After running `train`, the ``self.counts`` attribute will store
        dictionaries of the `N`, `N-1`, ..., 1-gram counts.

        Parameters
        ----------
        corpus_fp : str
            The path to a newline-separated text corpus file.
        vocab : :class:`~numpy_ml.preprocessing.nlp.Vocabulary` instance or None
            If not None, only the words in `vocab` will be used to construct
            the language model; all out-of-vocabulary words will either be
            mappend to ``<unk>`` (if ``self.unk = True``) or removed (if
            ``self.unk = False``). Default is None.
        encoding : str or None
            Specifies the text encoding for corpus. Common entries are 'utf-8',
            'utf-8-sig', 'utf-16'. Default is None.
        """
        # 调用内部方法 _train 来实际执行训练过程
        return self._train(corpus_fp, vocab=vocab, encoding=encoding)
    # 定义一个用于训练 N-gram 模型的方法,接受语料文件路径、词汇表和编码方式作为参数
    def _train(self, corpus_fp, vocab=None, encoding=None):
        # 获取超参数
        H = self.hyperparameters
        # 初始化存储 N-gram 的字典
        grams = {N: [] for N in range(1, self.N   1)}
        # 初始化计数器字典
        counts = {N: Counter() for N in range(1, self.N   1)}
        # 获取是否过滤停用词和标点符号的设置
        filter_stop, filter_punc = H["filter_stopwords"], H["filter_punctuation"]

        # 初始化词数计数器
        _n_words = 0
        # 初始化词汇集合
        tokens = {"<unk>"}
        # 初始化句子起始和结束标记
        bol, eol = ["<bol>"], ["<eol>"]

        # 打开语料文件进行读取
        with open(corpus_fp, "r", encoding=encoding) as text:
            # 逐行处理文本
            for line in text:
                # 如果需要过滤标点符号,则去除标点符号
                line = strip_punctuation(line) if filter_punc else line
                # 对文本进行分词处理,根据设置过滤停用词
                words = tokenize_words(line, filter_stopwords=filter_stop)

                # 如果提供了词汇表,则根据词汇表过滤词汇
                if vocab is not None:
                    words = vocab.filter(words, H["unk"])

                # 如果分词结果为空,则继续处理下一行
                if len(words) == 0:
                    continue

                # 更新词数计数器
                _n_words  = len(words)
                # 更新词汇集合
                tokens.update(words)

                # 计算 n, n-1, ... 1-gram
                for N in range(1, self.N   1):
                    # 对词汇进行填充,添加起始和结束标记
                    words_padded = bol * max(1, N - 1)   words   eol * max(1, N - 1)
                    # 将 n-gram 添加到对应的 grams 中
                    grams[N].extend(ngrams(words_padded, N))

        # 统计每个 N-gram 的出现次数
        for N in counts.keys():
            counts[N].update(grams[N])

        # 统计总词数
        n_words = {N: np.sum(list(counts[N].values())) for N in range(1, self.N   1)}
        n_words[1] = _n_words

        # 统计词汇量
        n_tokens = {N: len(counts[N]) for N in range(2, self.N   1)}
        n_tokens[1] = len(vocab) if vocab is not None else len(tokens)

        # 更新模型的计数器、总词数和词汇量
        self.counts = counts
        self.n_words = n_words
        self.n_tokens = n_tokens
    # 定义一个方法,用于返回在 N-gram 语言模型下提议的下一个单词的分布
    def completions(self, words, N):
        """
        Return the distribution over proposed next words under the `N`-gram
        language model.

        Parameters
        ----------
        words : list or tuple of strings
            The initial sequence of words
        N : int
            The gram-size of the language model to use to generate completions

        Returns
        -------
        probs : list of (word, log_prob) tuples
            The list of possible next words and their log probabilities under
            the `N`-gram language model (unsorted)
        """
        # 确保 N 不超过 words 的长度加一
        N = min(N, len(words)   1)
        # 检查 self.counts 中是否有 N-grams 的计数
        assert N in self.counts, "You do not have counts for {}-grams".format(N)
        # 检查 words 中是否至少有 N-1 个单词
        assert len(words) >= N - 1, "`words` must have at least {} words".format(N - 1)

        # 初始化一个空列表用于存储下一个单词的概率
        probs = []
        # 获取基础元组,包含最后 N-1 个单词的小写形式
        base = tuple(w.lower() for w in words[-N   1 :])
        # 遍历 N-grams 的计数字典中的键
        for k in self.counts[N].keys():
            # 如果当前键的前 N-1 个元素与基础元组相同
            if k[:-1] == base:
                # 计算当前键的概率,并添加到概率列表中
                c_prob = self._log_ngram_prob(base   k[-1:])
                probs.append((k[-1], c_prob))
        # 返回概率列表
        return probs
    def generate(self, N, seed_words=["<bol>"], n_sentences=5):
        """
        使用 N-gram 语言模型生成句子。

        Parameters
        ----------
        N : int
            要生成的模型的 gram 大小
        seed_words : list of strs
            用于初始化句子生成的种子词列表。默认为 ["<bol>"]。
        sentences : int
            从 N-gram 模型中生成的句子数量。默认为 50。

        Returns
        -------
        sentences : str
            从 N-gram 模型中抽样生成的句子,用空格连接,句子之间用换行符分隔。
        """
        counter = 0
        sentences = []  # 存储生成的句子
        words = seed_words.copy()  # 复制种子词列表
        while counter < n_sentences:
            # 获取下一个词和对应的概率
            nextw, probs = zip(*self.completions(words, N))
            # 如果进行了平滑处理,则重新归一化概率
            probs = np.exp(probs) / np.exp(probs).sum()
            # 根据概率选择下一个词
            next_word = np.random.choice(nextw, p=probs)

            # 如果到达句子结尾,保存句子并开始新句子
            if next_word == "<eol>":
                # 将词列表连接成句子
                S = " ".join([w for w in words if w != "<bol>"])
                # 将句子格式化为指定宽度,以便显示
                S = textwrap.fill(S, 90, initial_indent="", subsequent_indent="   ")
                print(S)  # 打印句子
                words.append(next_word)
                sentences.append(words)  # 将句子添加到结果列表
                words = seed_words.copy()  # 重置词列表为种子词列表
                counter  = 1
                continue

            words.append(next_word)
        return sentences  # 返回生成的句子列表
    # 计算给定单词序列的模型困惑度
    def perplexity(self, words, N):
        r"""
        Calculate the model perplexity on a sequence of words.

        Notes
        -----
        Perplexity, `PP`, is defined as

        .. math::

            PP(W)  =  left( frac{1}{p(W)} right)^{1 / n}

        or simply

        .. math::

            PP(W)  &=  exp(-log p(W) / n) \
                   &=  exp(H(W))

        where :math:`W = [w_1, ldots, w_k]` is a sequence of words, `H(w)` is
        the cross-entropy of `W` under the current model, and `n` is the number
        of `N`-grams in `W`.

        Minimizing perplexity is equivalent to maximizing the probability of
        `words` under the `N`-gram model. It may also be interpreted as the
        average branching factor when predicting the next word under the
        language model.

        Parameters
        ----------
        N : int
            The gram-size of the model to calculate perplexity with.
        words : list or tuple of strings
            The sequence of words to compute perplexity on.

        Returns
        -------
        perplexity : float
            The model perlexity for the words in `words`.
        """
        # 返回给定单词序列的交叉熵的指数值,即模型困惑度
        return np.exp(self.cross_entropy(words, N))
    # 计算模型在一系列单词上的交叉熵,与样本中单词的经验分布进行比较
    def cross_entropy(self, words, N):
        r"""
        Calculate the model cross-entropy on a sequence of words against the
        empirical distribution of words in a sample.

        Notes
        -----
        Model cross-entropy, `H`, is defined as

        .. math::

            H(W) = -frac{log p(W)}{n}

        where :math:`W = [w_1, ldots, w_k]` is a sequence of words, and `n` is
        the number of `N`-grams in `W`.

        The model cross-entropy is proportional (not equal, since we use base
        `e`) to the average number of bits necessary to encode `W` under the
        model distribution.

        Parameters
        ----------
        N : int
            The gram-size of the model to calculate cross-entropy on.
        words : list or tuple of strings
            The sequence of words to compute cross-entropy on.

        Returns
        -------
        H : float
            The model cross-entropy for the words in `words`.
        """
        # 计算 n-gram 的数量
        n_ngrams = len(ngrams(words, N))
        # 返回交叉熵结果
        return -(1 / n_ngrams) * self.log_prob(words, N)

    # 计算序列单词在 N-gram 模型下的对数概率
    def _log_prob(self, words, N):
        """
        Calculate the log probability of a sequence of words under the
        `N`-gram model
        """
        # 检查是否有 N-gram 的计数
        assert N in self.counts, "You do not have counts for {}-grams".format(N)

        # 如果单词数量不足以形成 N-gram,则引发异常
        if N > len(words):
            err = "Not enough words for a gram-size of {}: {}".format(N, len(words))
            raise ValueError(err)

        # 初始化总概率
        total_prob = 0
        # 遍历所有 N-gram,计算对数概率并累加
        for ngram in ngrams(words, N):
            total_prob  = self._log_ngram_prob(ngram)
        return total_prob
    # 返回在未平滑的 N 元语言模型下,可以跟随序列 `words` 的唯一单词标记的数量
    def _n_completions(self, words, N):
        # 检查是否存在 N 元组的计数
        assert N in self.counts, "You do not have counts for {}-grams".format(N)
        # 检查是否需要大于 N-2 个单词才能使用 N 元组
        assert len(words) <= N - 1, "Need > {} words to use {}-grams".format(N - 2, N)

        # 如果输入的单词是列表,则转换为元组
        if isinstance(words, list):
            words = tuple(words)

        # 获取基础单词序列
        base = words[-N   1 :]
        # 返回在基础单词序列之后出现的唯一单词标记的数量
        return len([k[-1] for k in self.counts[N].keys() if k[:-1] == base])

    # 返回出现次数为 `C` 的唯一 `N` 元组标记的数量
    def _num_grams_with_count(self, C, N):
        # 确保出现次数大于 0
        assert C > 0
        # 检查是否存在 N 元组的计数
        assert N in self.counts, "You do not have counts for {}-grams".format(N)
        # 为将来的调用缓存计数值
        if not hasattr(self, "_NC"):
            self._NC = {N: {} for N in range(1, self.N   1)}
        # 如果计数值不在缓存中,则计算并存储
        if C not in self._NC[N]:
            self._NC[N][C] = len([k for k, v in self.counts[N].items() if v == C])
        return self._NC[N][C]

    @abstractmethod
    # 计算在未平滑、最大似然的 `N` 元语言模型下,单词序列的对数概率
    def log_prob(self, words, N):
        raise NotImplementedError

    @abstractmethod
    # 返回 `ngram` 的未平滑对数概率
    def _log_ngram_prob(self, ngram):
        raise NotImplementedError
class MLENGram(NGramBase):
    # MLENGram 类继承自 NGramBase 类,表示一个简单的未平滑的 N-gram 语言模型

    def __init__(self, N, unk=True, filter_stopwords=True, filter_punctuation=True):
        """
        A simple, unsmoothed N-gram model.

        Parameters
        ----------
        N : int
            The maximum length (in words) of the context-window to use in the
            langauge model. Model will compute all n-grams from 1, ..., N.
        unk : bool
            Whether to include the ``<unk>`` (unknown) token in the LM. Default
            is True.
        filter_stopwords : bool
            Whether to remove stopwords before training. Default is True.
        filter_punctuation : bool
            Whether to remove punctuation before training. Default is True.
        """
        # 初始化函数,设置模型的参数

        super().__init__(N, unk, filter_stopwords, filter_punctuation)

        # 设置模型的超参数
        self.hyperparameters["id"] = "MLENGram"

    def log_prob(self, words, N):
        """
        Compute the log probability of a sequence of words under the
        unsmoothed, maximum-likelihood `N`-gram language model.

        Parameters
        ----------
        words : list of strings
            A sequence of words
        N : int
            The gram-size of the language model to use when calculating the log
            probabilities of the sequence

        Returns
        -------
        total_prob : float
            The total log-probability of the sequence `words` under the
            `N`-gram language model
        """
        # 计算给定序列在未平滑的最大似然 N-gram 语言模型下的对数概率
        return self._log_prob(words, N)

    def _log_ngram_prob(self, ngram):
        """Return the unsmoothed log probability of the ngram"""
        # 返回 ngram 的未平滑对数概率
        N = len(ngram)
        num = self.counts[N][ngram]
        den = self.counts[N - 1][ngram[:-1]] if N > 1 else self.n_words[1]
        return np.log(num) - np.log(den) if (den > 0 and num > 0) else -np.inf


class AdditiveNGram(NGramBase):
    def __init__(
        self, N, K=1, unk=True, filter_stopwords=True, filter_punctuation=True,
    ):
        """
        An N-Gram model with smoothed probabilities calculated via additive /
        Lidstone smoothing.

        Notes
        -----
        The resulting estimates correspond to the expected value of the
        posterior, `p(ngram_prob | counts)`, when using a symmetric Dirichlet
        prior on counts with parameter `K`.

        Parameters
        ----------
        N : int
            The maximum length (in words) of the context-window to use in the
            langauge model. Model will compute all n-grams from 1, ..., N
        K : float
            The pseudocount to add to each observation. Larger values allocate
            more probability toward unseen events. When `K` = 1, the model is
            known as Laplace smoothing.  When `K` = 0.5, the model is known as
            expected likelihood estimation (ELE) or the Jeffreys-Perks law.
            Default is 1.
        unk : bool
            Whether to include the ``<unk>`` (unknown) token in the LM. Default
            is True.
        filter_stopwords : bool
            Whether to remove stopwords before training. Default is True.
        filter_punctuation : bool
            Whether to remove punctuation before training. Default is True.
        """
        # 调用父类的初始化方法,传入参数 N, unk, filter_stopwords, filter_punctuation
        super().__init__(N, unk, filter_stopwords, filter_punctuation)

        # 设置模型的超参数 id 为 "AdditiveNGram"
        self.hyperparameters["id"] = "AdditiveNGram"
        # 设置模型的超参数 K 为传入的参数 K
        self.hyperparameters["K"] = K
    def log_prob(self, words, N):
        r"""
        Compute the smoothed log probability of a sequence of words under the
        `N`-gram language model with additive smoothing.

        Notes
        -----
        For a bigram, additive smoothing amounts to:

        .. math::

            P(w_i mid w_{i-1}) = frac{A   K}{B   KV}

        where

        .. math::

            A  &=  text{Count}(w_{i-1}, w_i) \
            B  &=  sum_j text{Count}(w_{i-1}, w_j) \
            V  &= |{ w_j  :  text{Count}(w_{i-1}, w_j) > 0 }|

        This is equivalent to pretending we've seen every possible `N`-gram
        sequence at least `K` times.

        Additive smoothing can be problematic, as it:
            - Treats each predicted word in the same way
            - Can assign too much probability mass to unseen `N`-grams

        Parameters
        ----------
        words : list of strings
            A sequence of words.
        N : int
            The gram-size of the language model to use when calculating the log
            probabilities of the sequence.

        Returns
        -------
        total_prob : float
            The total log-probability of the sequence `words` under the
            `N`-gram language model.
        """
        # 调用内部方法计算给定序列的平滑对数概率
        return self._log_prob(words, N)

    def _log_ngram_prob(self, ngram):
        """Return the smoothed log probability of the ngram"""
        # 获取 ngram 的长度
        N = len(ngram)
        # 获取超参数 K
        K = self.hyperparameters["K"]
        # 获取各种计数和词汇量
        counts, n_words, n_tokens = self.counts, self.n_words[1], self.n_tokens[1]

        # 获取 ngram 的上下文
        ctx = ngram[:-1]
        # 计算分子
        num = counts[N][ngram]   K
        # 计算上下文的计数
        ctx_count = counts[N - 1][ctx] if N > 1 else n_words
        # 计算分母
        den = ctx_count   K * n_tokens
        # 返回平滑后的对数概率,如果分母为零则返回负无穷
        return np.log(num / den) if den != 0 else -np.inf
# 定义一个 GoodTuringNGram 类,继承自 NGramBase 类
class GoodTuringNGram(NGramBase):
    # 初始化方法,接受多个参数
    def __init__(
        self, N, conf=1.96, unk=True, filter_stopwords=True, filter_punctuation=True,
    ):
        """
        An N-Gram model with smoothed probabilities calculated with the simple
        Good-Turing estimator from Gale (2001).

        Parameters
        ----------
        N : int
            The maximum length (in words) of the context-window to use in the
            langauge model. Model will compute all n-grams from 1, ..., N.
        conf: float
            The multiplier of the standard deviation of the empirical smoothed
            count (the default, 1.96, corresponds to a 95% confidence
            interval). Controls how many datapoints are smoothed using the
            log-linear model.
        unk : bool
            Whether to include the ``<unk>`` (unknown) token in the LM. Default
            is True.
        filter_stopwords : bool
            Whether to remove stopwords before training. Default is True.
        filter_punctuation : bool
            Whether to remove punctuation before training. Default is True.
        """
        # 调用父类的初始化方法,传入参数 N, unk, filter_stopwords, filter_punctuation
        super().__init__(N, unk, filter_stopwords, filter_punctuation)

        # 设置超参数字典中的 id 键值对为 "GoodTuringNGram"
        self.hyperparameters["id"] = "GoodTuringNGram"
        # 设置超参数字典中的 conf 键值对为传入的 conf 参数值
        self.hyperparameters["conf"] = conf
    # 训练语言模型,统计文本语料库中的 n-gram 计数。完成后,self.counts 属性将存储 N、N-1、...、1-gram 计数的字典。
    def train(self, corpus_fp, vocab=None, encoding=None):
        """
        编译 `corpus_fp` 中文本的 n-gram 计数。完成后,`self.counts` 属性将存储 `N`、`N-1`、...、1-gram 计数的字典。

        Parameters
        ----------
        corpus_fp : str
            新行分隔的文本语料库文件的路径
        vocab : :class:`~numpy_ml.preprocessing.nlp.Vocabulary` 实例或 None。
            如果不是 None,则只使用 `vocab` 中的单词来构建语言模型;所有超出词汇表的单词将被映射到 `<unk>`(如果 `self.unk = True`)或删除(如果 `self.unk = False`)。默认为 None。
        encoding : str  or None
            指定语料库的文本编码。常见的条目有 'utf-8'、'utf-8-sig'、'utf-16'。默认为 None。
        """
        # 调用 _train 方法,训练语言模型
        self._train(corpus_fp, vocab=vocab, encoding=encoding)
        # 计算平滑后的计数
        self._calc_smoothed_counts()
    # 计算在具有 Good-Turing 平滑的 N-gram 语言模型下,给定单词序列的平滑对数概率
    def log_prob(self, words, N):
        r"""
        Compute the smoothed log probability of a sequence of words under the
        `N`-gram language model with Good-Turing smoothing.

        Notes
        -----
        For a bigram, Good-Turing smoothing amounts to:

        .. math::

            P(w_i mid w_{i-1}) = frac{C^*}{text{Count}(w_{i-1})}

        where :math:`C^*` is the Good-Turing smoothed estimate of the bigram
        count:

        .. math::

            C^* = frac{(c   1) text{NumCounts}(c   1, 2)}{text{NumCounts}(c, 2)}

        where

        .. math::

            c  &=  text{Count}(w_{i-1}, w_i) \
            text{NumCounts}(r, k)  &=
                |{ ktext{-gram} : text{Count}(ktext{-gram}) = r }|

        In words, the probability of an `N`-gram that occurs `r` times in the
        corpus is estimated by dividing up the probability mass occupied by
        N-grams that occur `r 1` times.

        For large values of `r`, NumCounts becomes unreliable. In this case, we
        compute a smoothed version of NumCounts using a power law function:

        .. math::

            log text{NumCounts}(r) = b   a log r

        Under the Good-Turing estimator, the total probability assigned to
        unseen `N`-grams is equal to the relative occurrence of `N`-grams that
        appear only once.

        Parameters
        ----------
        words : list of strings
            A sequence of words.
        N : int
            The gram-size of the language model to use when calculating the log
            probabilities of the sequence.

        Returns
        -------
        total_prob : float
            The total log-probability of the sequence `words` under the
            `N`-gram language model.
        """
        # 调用内部方法 _log_prob 来计算给定单词序列在 N-gram 语言模型下的总对数概率
        return self._log_prob(words, N)
    # 计算 ngram 的平滑对数概率并返回
    def _log_ngram_prob(self, ngram):
        """Return the smoothed log probability of the ngram"""
        N = len(ngram)
        sc, T = self._smooth_counts[N], self._smooth_totals[N]
        n_tokens, n_seen = self.n_tokens[N], len(self.counts[N])

        # 计算未出现在词汇表中的 ngram 的概率(即 p0 的一部分)
        n_unseen = max((n_tokens ** N) - n_seen, 1)
        prob = np.log(self._p0[N] / n_unseen)

        # 如果 ngram 在计数中存在,则重新计算概率
        if ngram in self.counts[N]:
            C = self.counts[N][ngram]
            prob = np.log(1 - self._p0[N])   np.log(sc[C]) - np.log(T)
        return prob

    # 拟合计数模型
    def _fit_count_models(self):
        """
        Perform the averaging transform proposed by Church and Gale (1991):
        estimate the expected count-of-counts by the *density* of
        count-of-count values.
        """
        self._count_models = {}
        NC = self._num_grams_with_count
        for N in range(1, self.N   1):
            X, Y = [], []
            sorted_counts = sorted(set(self.counts[N].values()))  # r

            # 计算平均转换后的值
            for ix, j in enumerate(sorted_counts):
                i = 0 if ix == 0 else sorted_counts[ix - 1]
                k = 2 * j - i if ix == len(sorted_counts) - 1 else sorted_counts[ix   1]
                y = 2 * NC(j, N) / (k - i)
                X.append(j)
                Y.append(y)

            # 拟合对数线性模型:log(counts) ~ log(average_transform(counts))
            self._count_models[N] = LinearRegression(fit_intercept=True)
            self._count_models[N].fit(np.log(X), np.log(Y))
            b, a = self._count_models[N].beta

            # 如果斜率大于 -1,则输出警告
            if a > -1:
                fstr = "[Warning] Log-log averaging transform has slope > -1 for N={}"
                print(fstr.format(N))

N-Gram Sequence Models

The ngram.py module implements n-gram models with different smoothing techniques:

  • Maximum likelihood (no smoothing)
  • Additive smoothing (incl. Laplace smoothing, expected likelihood estimation, etc.)
  • Simple Good-Turing smoothing (Gale, 1995)

Plots

numpy-mlnumpy_mlngram__init__.py

代码语言:javascript复制
# 从当前目录中导入 ngram 模块
from .ngram import *

numpy-mlnumpy_mlnonparametricgp.py

代码语言:javascript复制
# 导入警告模块
import warnings
# 导入 numpy 库并重命名为 np
import numpy as np
# 从 numpy.linalg 模块中导入 slogdet 和 inv 函数
from numpy.linalg import slogdet, inv

# 尝试导入 scipy.stats 模块,如果失败则将 _SCIPY 设为 False
try:
    _SCIPY = True
    # 从 scipy.stats 模块中导入 norm 函数
    from scipy.stats import norm
except:
    _SCIPY = False
    # 如果导入失败,则发出警告
    warnings.warn(
        "Could not import scipy.stats. Confidence scores "
        "for GPRegression are restricted to 95% bounds"
    )

# 从自定义模块中导入 KernelInitializer 类
from ..utils.kernels import KernelInitializer

# 定义 GPRegression 类
class GPRegression:
    def __init__(self, kernel="RBFKernel", alpha=1e-10):
        """
        A Gaussian Process (GP) regression model.

        .. math::

            y mid X, f  &sim  mathcal{N}( [f(x_1), ldots, f(x_n)], \alpha I ) \\
            f mid X     &sim  \text{GP}(0, K)

        for data :math:`D = {(x_1, y_1), ldots, (x_n, y_n) }` and a covariance matrix :math:`K_{ij}
        = \text{kernel}(x_i, x_j)` for all :math:`i, j in {1, ldots, n }`.

        Parameters
        ----------
        kernel : str
            The kernel to use in fitting the GP prior. Default is 'RBFKernel'.
        alpha : float
            An isotropic noise term for the diagonal in the GP covariance, `K`.
            Larger values correspond to the expectation of greater noise in the
            observed data points. Default is 1e-10.
        """
        # 使用 KernelInitializer 类初始化 kernel 属性
        self.kernel = KernelInitializer(kernel)()
        # 初始化 parameters 和 hyperparameters 字典
        self.parameters = {"GP_mean": None, "GP_cov": None, "X": None}
        self.hyperparameters = {"kernel": str(self.kernel), "alpha": alpha}
    # 定义一个方法用于拟合高斯过程到训练数据
    def fit(self, X, y):
        """
        Fit the GP prior to the training data.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A training dataset of `N` examples, each with dimensionality `M`.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, O)`
            A collection of real-valued training targets for the
            examples in `X`, each with dimension `O`.
        """
        # 初始化均值向量为零向量
        mu = np.zeros(X.shape[0])
        # 计算训练数据集的协方差矩阵
        K = self.kernel(X, X)

        # 将训练数据、训练目标、协方差矩阵和均值向量保存到参数字典中
        self.parameters["X"] = X
        self.parameters["y"] = y
        self.parameters["GP_cov"] = K
        self.parameters["GP_mean"] = mu
    # 从高斯过程的先验或后验预测分布中抽样函数

    def sample(self, X, n_samples=1, dist="posterior_predictive"):
        """
        Sample functions from the GP prior or posterior predictive
        distribution.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The collection of datapoints to generate predictions on. Only used if
            `dist` = 'posterior_predictive'.
        n_samples: int
            The number of samples to generate. Default is 1.
        dist : {"posterior_predictive", "prior"}
            The distribution to draw samples from. Default is
            "posterior_predictive".

        Returns
        -------
        samples : :py:class:`ndarray <numpy.ndarray>` of shape `(n_samples, O, N)`
            The generated samples for the points in `X`.
        """
        
        # 从 numpy 中导入多元正态分布函数
        mvnorm = np.random.multivariate_normal

        # 如果选择从先验分布中抽样
        if dist == "prior":
            # 初始化均值为零向量
            mu = np.zeros((X.shape[0], 1))
            # 计算协方差矩阵
            cov = self.kernel(X, X)
        # 如果选择从后验预测分布中抽样
        elif dist == "posterior_predictive":
            # 调用 predict 方法获取均值、标准差和协方差矩阵
            mu, _, cov = self.predict(X, return_cov=True)
        else:
            # 如果选择的分布不是先验或后验预测,则引发 ValueError 异常
            raise ValueError("Unrecognized dist: '{}'".format(dist))

        # 如果均值是一维数组,则将其转换为二维数组
        if mu.ndim == 1:
            mu = mu[:, np.newaxis]

        # 生成 n_samples 个样本,每个样本的均值和协方差由 mu 和 cov 决定
        samples = np.array([mvnorm(_mu, cov, size=n_samples) for _mu in mu.T])
        # 调整数组维度,使得第一个维度是样本数量
        return samples.swapaxes(0, 1)

numpy-mlnumpy_mlnonparametrickernel_regression.py

代码语言:javascript复制
from ..utils.kernels import KernelInitializer
# 导入 KernelInitializer 模块

class KernelRegression:
    def __init__(self, kernel=None):
        """
        A Nadaraya-Watson kernel regression model.

        Notes
        -----
        The Nadaraya-Watson regression model is

        .. math::

            f(x) = sum_i w_i(x) y_i

        where the sample weighting functions, :math:`w_i`, are simply

        .. math::

            w_i(x) = \frac{k(x, x_i)}{sum_j k(x, x_j)}

        with `k` being the kernel function.

        Observe that `k`-nearest neighbors
        (:class:`~numpy_ml.nonparametric.KNN`) regression is a special case of
        kernel regression where the `k` closest observations have a weight
        `1/k`, and all others have weight 0.

        Parameters
        ----------
        kernel : str, :doc:`Kernel <numpy_ml.utils.kernels>` object, or dict
            The kernel to use. If None, default to
            :class:`~numpy_ml.utils.kernels.LinearKernel`. Default is None.
        """
        self.parameters = {"X": None, "y": None}
        # 初始化参数字典,包含键值对 "X": None, "y": None
        self.hyperparameters = {"kernel": str(kernel)}
        # 初始化超参数字典,包含键值对 "kernel": str(kernel)
        self.kernel = KernelInitializer(kernel)()
        # 使用 KernelInitializer 初始化 kernel 对象

    def fit(self, X, y):
        """
        Fit the regression model to the data and targets in `X` and `y`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            An array of N examples to generate predictions on
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, ...)`
            Predicted targets for the `N` rows in `X`
        """
        self.parameters = {"X": X, "y": y}
        # 更新参数字典,将 "X" 和 "y" 更新为传入的 X 和 y
    # 为模型对象定义一个预测方法,用于生成与输入数据 `X` 相关的目标预测值
    def predict(self, X):
        """
        Generate predictions for the targets associated with the rows in `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N', M')`
            An array of `N'` examples to generate predictions on

        Returns
        -------
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N', ...)`
            Predicted targets for the `N'` rows in `X`
        """
        # 获取模型对象中的核函数和参数
        K = self.kernel
        P = self.parameters
        # 计算输入数据 `X` 与参数中的训练数据之间的相似度
        sim = K(P["X"], X)
        # 返回预测值,计算方法为相似度乘以参数中的目标值,然后按列求和并除以相似度的列和
        return (sim * P["y"][:, None]).sum(axis=0) / sim.sum(axis=0)

numpy-mlnumpy_mlnonparametricknn.py

代码语言:javascript复制
# 一个用于分类和回归的 k-最近邻(KNN)模型
from collections import Counter
import numpy as np
from ..utils.data_structures import BallTree

class KNN:
    def __init__(
        self, k=5, leaf_size=40, classifier=True, metric=None, weights="uniform",
    ):
        """
        一个基于球树的 `k`-最近邻(kNN)模型,用于高效计算。

        参数
        ----------
        k : int
            预测时要使用的邻居数量。默认为 5。
        leaf_size : int
            球树中每个叶子节点上的最大数据点数量。默认为 40。
        classifier : bool
            是否将 Y 中的值视为类标签(classifier = True)或实值目标(classifier = False)。默认为 True。
        metric : :doc:`距离度量 <numpy_ml.utils.distance_metrics>` 或 None
            用于计算最近邻的距离度量。如果为 None,则默认使用 :func:`~numpy_ml.utils.distance_metrics.euclidean` 距离度量。默认为 None。
        weights : {'uniform', 'distance'}
            如何对每个邻居的预测进行加权。'uniform' 为每个邻居分配均匀权重,而 'distance' 分配的权重与查询点的距离的倒数成比例。默认为 'uniform'。
        """
        # 使用 leaf_size 和 metric 创建一个球树对象
        self._ball_tree = BallTree(leaf_size=leaf_size, metric=metric)
        # 存储模型的超参数
        self.hyperparameters = {
            "id": "KNN",
            "k": k,
            "leaf_size": leaf_size,
            "classifier": classifier,
            "metric": str(metric),
            "weights": weights,
        }
    # 将模型拟合到数据和目标值`X`和`y`中
    def fit(self, X, y):
        r"""
        Fit the model to the data and targets in `X` and `y`

        Parameters
        ----------
        X : numpy array of shape `(N, M)`
            An array of `N` examples to generate predictions on.
        y : numpy array of shape `(N, *)`
            Targets for the `N` rows in `X`.
        """
        # 检查输入数据`X`的维度是否为2
        if X.ndim != 2:
            raise Exception("X must be two-dimensional")
        # 使用BallTree模型拟合数据`X`和目标值`y`
        self._ball_tree.fit(X, y)
    # 为给定输入数据集 X 生成预测结果
    def predict(self, X):
        r"""
        Generate predictions for the targets associated with the rows in `X`.

        Parameters
        ----------
        X : numpy array of shape `(N', M')`
            An array of `N'` examples to generate predictions on.

        Returns
        -------
        y : numpy array of shape `(N', *)`
            Predicted targets for the `N'` rows in `X`.
        """
        # 初始化一个空列表用于存储预测结果
        predictions = []
        # 获取超参数
        H = self.hyperparameters
        # 遍历输入数据集 X 中的每一行
        for x in X:
            # 初始化预测值为 None
            pred = None
            # 获取最近的 k 个邻居
            nearest = self._ball_tree.nearest_neighbors(H["k"], x)
            # 获取邻居的目标值
            targets = [n.val for n in nearest]

            # 如果是分类器
            if H["classifier"]:
                # 如果权重为 "uniform"
                if H["weights"] == "uniform":
                    # 对于与 sklearn / scipy.stats.mode 一致性,返回在出现平局时最小的类别 ID
                    counts = Counter(targets).most_common()
                    pred, _ = sorted(counts, key=lambda x: (-x[1], x[0]))[0]
                # 如果权重为 "distance"
                elif H["weights"] == "distance":
                    best_score = -np.inf
                    for label in set(targets):
                        scores = [1 / n.distance for n in nearest if n.val == label]
                        pred = label if np.sum(scores) > best_score else pred
            # 如果不是分类器
            else:
                # 如果权重为 "uniform"
                if H["weights"] == "uniform":
                    pred = np.mean(targets)
                # 如果权重为 "distance"
                elif H["weights"] == "distance":
                    weights = [1 / n.distance for n in nearest]
                    pred = np.average(targets, weights=weights)
            # 将预测结果添加到列表中
            predictions.append(pred)
        # 将预测结果转换为 numpy 数组并返回
        return np.array(predictions)

Nonparametric Models

The nonparametric module implements several popular nonparameteric regression and classification models.

  • kernel_regression.py implements Nadaraya-Watson kernel regression (Nadaraya, 1964; Watson, 1964)
  • knn.py implements k-nearest neighbors regression and classification models using a ball-tree
  • gp.py implements Gaussian process regression / simple kriging (Krige, 1951; Matheron, 1963; Williams & Rasmussen, 1996)

Plots

k-Nearest Neighbors

Nadaraya-Watson Kernel Regression

Gaussian Process Regression

numpy-mlnumpy_mlnonparametric__init__.py

代码语言:javascript复制
"""
Popular nonparameteric regression and classification models.

The nonparametric module contains an assortment of nonparametric models that
don't fit elsewhere in the package. For other nonparametric models, see the
``numpy_ml.trees`` module.
"""
"""

# 导入高斯过程模型
from .gp import *
# 导入K最近邻模型
from .knn import *
# 导入核回归模型
from .kernel_regression import *

numpy-mlnumpy_mlplotsbandit_plots.py

代码语言:javascript复制
# 导入必要的库
from collections import namedtuple

import numpy as np

# 导入多臂赌博机相关的类和函数
from numpy_ml.bandits import (
    MultinomialBandit,
    BernoulliBandit,
    ShortestPathBandit,
    ContextualLinearBandit,
)
from numpy_ml.bandits.trainer import BanditTrainer
from numpy_ml.bandits.policies import (
    EpsilonGreedy,
    UCB1,
    ThompsonSamplingBetaBinomial,
    LinUCB,
)
from numpy_ml.utils.graphs import random_DAG, DiGraph, Edge


# 生成一个随机的多项式多臂赌博机环境
def random_multinomial_mab(n_arms=10, n_choices_per_arm=5, reward_range=[0, 1]):
    """Generate a random multinomial multi-armed bandit environemt"""
    payoffs = []
    payoff_probs = []
    lo, hi = reward_range
    for a in range(n_arms):
        p = np.random.uniform(size=n_choices_per_arm)
        p = p / p.sum()
        r = np.random.uniform(low=lo, high=hi, size=n_choices_per_arm)

        payoffs.append(list(r))
        payoff_probs.append(list(p))

    return MultinomialBandit(payoffs, payoff_probs)


# 生成一个随机的伯努利多臂赌博机环境
def random_bernoulli_mab(n_arms=10):
    """Generate a random Bernoulli multi-armed bandit environemt"""
    p = np.random.uniform(size=n_arms)
    payoff_probs = p / p.sum()
    return BernoulliBandit(payoff_probs)


# 在一个随机的多项式多臂赌博机环境上评估 epsilon-greedy 策略
def plot_epsilon_greedy_multinomial_payoff():
    """
    Evaluate an epsilon-greedy policy on a random multinomial bandit
    problem
    """
    np.random.seed(12345)
    N = np.random.randint(2, 30)  # n arms
    K = np.random.randint(2, 10)  # n payoffs / arm
    ep_length = 1

    rrange = [0, 1]
    n_duplicates = 5
    n_episodes = 5000

    mab = random_multinomial_mab(N, K, rrange)
    policy = EpsilonGreedy(epsilon=0.05, ev_prior=rrange[1] / 2)
    policy = BanditTrainer().train(policy, mab, ep_length, n_episodes, n_duplicates)


# 在一个多项式多臂赌博机环境上评估 UCB1 策略
def plot_ucb1_multinomial_payoff():
    """Evaluate the UCB1 policy on a multinomial bandit environment"""
    np.random.seed(12345)
    N = np.random.randint(2, 30)  # n arms
    K = np.random.randint(2, 10)  # n payoffs / arm
    # 设置每个回合的长度为1
    ep_length = 1

    # 设置探索参数C为1,奖励范围为[0, 1],重复次数为5,总回合数为5000
    C = 1
    rrange = [0, 1]
    n_duplicates = 5
    n_episodes = 5000

    # 使用随机多项式分布生成多臂赌博机
    mab = random_multinomial_mab(N, K, rrange)
    # 使用UCB1算法初始化策略,设置探索参数C和先验期望值为奖励范围的上限的一半
    policy = UCB1(C=C, ev_prior=rrange[1] / 2)
    # 使用BanditTrainer训练策略,传入策略、多臂赌博机、回合长度、总回合数和重复次数
    policy = BanditTrainer().train(policy, mab, ep_length, n_episodes, n_duplicates)
def plot_thompson_sampling_beta_binomial_payoff():
    """
    Evaluate the ThompsonSamplingBetaBinomial policy on a random Bernoulli
    multi-armed bandit.
    """
    # 设置随机种子
    np.random.seed(12345)
    # 随机生成臂的数量
    N = np.random.randint(2, 30)  # n arms
    ep_length = 1

    n_duplicates = 5
    n_episodes = 5000

    # 创建随机伯努利多臂老虎机
    mab = random_bernoulli_mab(N)
    # 创建 ThompsonSamplingBetaBinomial 策略
    policy = ThompsonSamplingBetaBinomial(alpha=1, beta=1)
    # 训练策略
    policy = BanditTrainer().train(policy, mab, ep_length, n_episodes, n_duplicates)


def plot_lin_ucb():
    """Plot the linUCB policy on a contextual linear bandit problem"""
    # 设置随机种子
    np.random.seed(12345)
    ep_length = 1
    # 随机生成 K 和 D
    K = np.random.randint(2, 25)
    D = np.random.randint(2, 10)

    n_duplicates = 5
    n_episodes = 5000

    # 创建上下文线性老虎机
    cmab = ContextualLinearBandit(K, D, 1)
    # 创建 LinUCB 策略
    policy = LinUCB(alpha=1)
    # 训练策略
    policy = BanditTrainer().train(policy, cmab, ep_length, n_episodes, n_duplicates)


def plot_ucb1_gaussian_shortest_path():
    """
    Plot the UCB1 policy on a graph shortest path problem each edge weight
    drawn from an independent univariate Gaussian
    """
    # 设置随机种子
    np.random.seed(12345)

    ep_length = 1
    n_duplicates = 5
    n_episodes = 5000
    p = np.random.rand()
    n_vertices = np.random.randint(5, 15)

    Gaussian = namedtuple("Gaussian", ["mean", "variance", "EV", "sample"])

    # create randomly-weighted edges
    print("Building graph")
    E = []
    G = random_DAG(n_vertices, p)
    V = G.vertices
    for e in G.edges:
        mean, var = np.random.uniform(0, 1), np.random.uniform(0, 1)
        w = lambda: np.random.normal(mean, var)  # noqa: E731
        rv = Gaussian(mean, var, mean, w)
        E.append(Edge(e.fr, e.to, rv))

    G = DiGraph(V, E)
    while not G.path_exists(V[0], V[-1]):
        print("Skipping")
        idx = np.random.randint(0, len(V))
        V[idx], V[-1] = V[-1], V[idx]

    # 创建最短路径老虎机
    mab = ShortestPathBandit(G, V[0], V[-1])
    # 创建 UCB1 策略
    policy = UCB1(C=1, ev_prior=0.5)
    # 使用BanditTrainer类的train方法对策略进行训练,传入策略、多臂赌博机、每个episode的长度、总共的episode数以及重复次数作为参数
    policy = BanditTrainer().train(policy, mab, ep_length, n_episodes, n_duplicates)
# 定义一个函数用于比较不同策略在相同赌博机问题上的表现
def plot_comparison():
    # 设置随机种子
    np.random.seed(1234)
    # 设置每个回合的长度
    ep_length = 1
    # 设置赌博机的数量
    K = 10

    # 设置重复次数
    n_duplicates = 5
    # 设置回合数
    n_episodes = 5000

    # 创建一个随机伯努利赌博机
    cmab = random_bernoulli_mab(n_arms=K)
    # 创建三种不同的策略
    policy1 = EpsilonGreedy(epsilon=0.05, ev_prior=0.5)
    policy2 = UCB1(C=1, ev_prior=0.5)
    policy3 = ThompsonSamplingBetaBinomial(alpha=1, beta=1)
    policies = [policy1, policy2, policy3]

    # 使用BanditTrainer类的compare方法比较不同策略的表现
    BanditTrainer().compare(
        policies, cmab, ep_length, n_episodes, n_duplicates,
    )

numpy-mlnumpy_mlplotsgmm_plots.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 numpy 库,并重命名为 np
import numpy as np
# 从 sklearn.datasets.samples_generator 模块中导入 make_blobs 函数
from sklearn.datasets.samples_generator import make_blobs
# 从 scipy.stats 模块中导入 multivariate_normal 函数
from scipy.stats import multivariate_normal
# 导入 matplotlib.pyplot 库,并重命名为 plt
import matplotlib.pyplot as plt
# 导入 seaborn 库,并重命名为 sns
import seaborn as sns

# 设置 seaborn 库的样式为白色
sns.set_style("white")
# 设置 seaborn 库的上下文为 paper,字体比例为 1
sns.set_context("paper", font_scale=1)

# 从 numpy_ml.gmm 模块中导入 GMM 类
from numpy_ml.gmm import GMM
# 从 matplotlib.colors 模块中导入 ListedColormap 类

# 定义函数 plot_countour,用于绘制等高线图
def plot_countour(X, x, y, z, ax, xlim, ylim):
    # 定义函数 fixed_aspect_ratio,用于设置 matplotlib 图的固定纵横比
    def fixed_aspect_ratio(ratio, ax):
        """
        Set a fixed aspect ratio on matplotlib plots
        regardless of axis units
        """
        # 获取当前图的 x 和 y 轴范围
        xvals, yvals = ax.get_xlim(), ax.get_ylim()

        # 计算 x 和 y 轴范围
        xrange = xvals[1] - xvals[0]
        yrange = yvals[1] - yvals[0]
        # 设置图的纵横比
        ax.set_aspect(ratio * (xrange / yrange), adjustable="box")

    # 在随机分布的数据点上绘制等高线
    ax.contour(x, y, z, 6, linewidths=0.5, colors="k")

    # 设置 x 和 y 轴的范围
    ax.set_xlim(*xlim)
    ax.set_ylim(*ylim)
    # 调用函数设置固定纵横比
    fixed_aspect_ratio(1, ax)
    return ax

# 定义函数 plot_clusters,用于绘制聚类结果
def plot_clusters(model, X, ax):
    # 获取聚类数目
    C = model.C

    # 计算 x 和 y 轴的范围
    xmin = min(X[:, 0]) - 0.1 * (max(X[:, 0]) - min(X[:, 0]))
    xmax = max(X[:, 0])   0.1 * (max(X[:, 0]) - min(X[:, 0]))
    ymin = min(X[:, 1]) - 0.1 * (max(X[:, 1]) - min(X[:, 1]))
    ymax = max(X[:, 1])   0.1 * (max(X[:, 1]) - min(X[:, 1]))

    # 遍历每个聚类
    for c in range(C):
        # 创建多元正态分布对象
        rv = multivariate_normal(model.mu[c], model.sigma[c], allow_singular=True)

        # 在 x 和 y 轴上生成均匀间隔的点
        x = np.linspace(xmin, xmax, 500)
        y = np.linspace(ymin, ymax, 500)

        # 生成网格点
        X1, Y1 = np.meshgrid(x, y)
        xy = np.column_stack([X1.flat, Y1.flat])

        # 计算网格点处的密度值
        Z = rv.pdf(xy).reshape(X1.shape)
        # 调用函数绘制等高线图
        ax = plot_countour(X, X1, Y1, Z, ax=ax, xlim=(xmin, xmax), ylim=(ymin, ymax))
        # 在图上标记聚类中心
        ax.plot(model.mu[c, 0], model.mu[c, 1], "ro")

    # 绘制数据点
    cm = ListedColormap(sns.color_palette().as_hex())
    # 获取每个数据点的聚类标签
    labels = model.Q.argmax(1)
    # 将标签列表转换为集合,去除重复的标签
    uniq = set(labels)
    # 遍历每个唯一的标签
    for i in uniq:
        # 根据标签值筛选出对应的数据点,并在散点图上绘制
        ax.scatter(X[labels == i, 0], X[labels == i, 1], c=cm.colors[i - 1], s=30)
    # 返回绘制好的散点图对象
    return ax
# 定义一个绘图函数
def plot():
    # 创建一个包含 4x4 子图的图形对象
    fig, axes = plt.subplots(4, 4)
    # 设置图形大小为 10x10 英寸
    fig.set_size_inches(10, 10)
    # 遍历所有子图
    for i, ax in enumerate(axes.flatten()):
        # 设置样本数量为 150,特征数量为 2,随机生成类别数量
        n_ex = 150
        n_in = 2
        n_classes = np.random.randint(2, 4)
        # 生成聚类数据
        X, y = make_blobs(
            n_samples=n_ex, centers=n_classes, n_features=n_in, random_state=i
        )
        # 数据中心化
        X -= X.mean(axis=0)

        # 进行 10 次运行,选择最佳拟合
        best_elbo = -np.inf
        for k in range(10):
            # 创建 GMM 模型对象
            _G = GMM(C=n_classes, seed=k * 3)
            # 拟合数据
            ret = _G.fit(X, max_iter=100, verbose=False)
            # 如果拟合失败,则重新拟合
            while ret != 0:
                print("Components collapsed; Refitting")
                ret = _G.fit(X, max_iter=100, verbose=False)

            # 选择最佳 ELBO 值的模型
            if _G.best_elbo > best_elbo:
                best_elbo = _G.best_elbo
                G = _G

        # 绘制聚类结果
        ax = plot_clusters(G, X, ax)
        # 设置 x 轴刻度标签为空
        ax.xaxis.set_ticklabels([])
        # 设置 y 轴刻度标签为空
        ax.yaxis.set_ticklabels([])
        # 设置子图标题,显示类别数量和最终 ELBO 值
        ax.set_title("# Classes: {}; Final VLB: {:.2f}".format(n_classes, G.best_elbo))

    # 调整子图布局
    plt.tight_layout()
    # 保存图形为图片文件
    plt.savefig("img/plot.png", dpi=300)
    # 关闭所有图形对象
    plt.close("all")

numpy-mlnumpy_mlplotshmm_plots.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 numpy 库,并重命名为 np
import numpy as np
# 从 matplotlib 库中导入 pyplot 模块,并重命名为 plt
from matplotlib import pyplot as plt
# 从 seaborn 库中导入 sns 模块
import seaborn as sns

# 设置 seaborn 库的样式为白色
sns.set_style("white")
# 设置 seaborn 库的上下文为 notebook,字体比例为 0.8
sns.set_context("notebook", font_scale=0.8)

# 从 hmmlearn.hmm 模块中导入 MultinomialHMM 类,并重命名为 MHMM
from hmmlearn.hmm import MultinomialHMM as MHMM
# 从 numpy_ml.hmm 模块中导入 MultinomialHMM 类
from numpy_ml.hmm import MultinomialHMM

# 定义生成训练数据的函数
def generate_training_data(params, n_steps=500, n_examples=15):
    # 创建 MultinomialHMM 对象
    hmm = MultinomialHMM(A=params["A"], B=params["B"], pi=params["pi"])

    # 生成新的序列
    observations = []
    for i in range(n_examples):
        # 生成潜在状态和观测值序列
        latent, obs = hmm.generate(
            n_steps, params["latent_states"], params["obs_types"]
        )
        # 断言潜在状态和观测值序列的长度与指定长度相同
        assert len(latent) == len(obs) == n_steps
        observations.append(obs)

    # 将观测值序列转换为 numpy 数组
    observations = np.array(observations)
    return observations

# 定义默认的 HMM 模型参数
def default_hmm():
    obs_types = [0, 1, 2, 3]
    latent_states = ["H", "C"]

    # 计算派生变量
    V = len(obs_types)
    N = len(latent_states)

    # 定义一个非常简单的 HMM 模型,包含 T=3 个观测值
    O = np.array([1, 3, 1]).reshape(1, -1)
    A = np.array([[0.9, 0.1], [0.5, 0.5]])
    B = np.array([[0.2, 0.7, 0.09, 0.01], [0.1, 0.0, 0.8, 0.1]])
    pi = np.array([0.75, 0.25])

    return {
        "latent_states": latent_states,
        "obs_types": obs_types,
        "V": V,
        "N": N,
        "O": O,
        "A": A,
        "B": B,
        "pi": pi,
    }

# 绘制矩阵图
def plot_matrices(params, best, best_theirs):
    cmap = "copper"
    ll_mine, best = best
    ll_theirs, best_theirs = best_theirs

    # 创建包含 3x3 子图的图形对象
    fig, axes = plt.subplots(3, 3)
    axes = {
        "A": [axes[0, 0], axes[0, 1], axes[0, 2]],
        "B": [axes[1, 0], axes[1, 1], axes[1, 2]],
        "pi": [axes[2, 0], axes[2, 1], axes[2, 2]],
    }
    # 遍历包含参数名称和对应标题的元组列表
    for k, tt in [("A", "Transition"), ("B", "Emission"), ("pi", "Prior")]:
        # 获取真实值、估计值和第三方库估计值的轴对象
        true_ax, est_ax, est_theirs_ax = axes[k]
        true, est, est_theirs = params[k], best[k], best_theirs[k]

        # 如果参数是 "pi",则将其形状调整为列向量
        if k == "pi":
            true = true.reshape(-1, 1)
            est = est.reshape(-1, 1)
            est_theirs = est_theirs.reshape(-1, 1)

        # 绘制真实值的热力图
        true_ax = sns.heatmap(
            true,
            vmin=0.0,
            vmax=1.0,
            fmt=".2f",
            cmap=cmap,
            cbar=False,
            annot=True,
            ax=true_ax,
            xticklabels=[],
            yticklabels=[],
            linewidths=0.25,
        )

        # 绘制估计值的热力图
        est_ax = sns.heatmap(
            est,
            vmin=0.0,
            vmax=1.0,
            fmt=".2f",
            ax=est_ax,
            cmap=cmap,
            annot=True,
            cbar=False,
            xticklabels=[],
            yticklabels=[],
            linewidths=0.25,
        )

        # 绘制第三方库估计值的热力图
        est_theirs_ax = sns.heatmap(
            est_theirs,
            vmin=0.0,
            vmax=1.0,
            fmt=".2f",
            cmap=cmap,
            annot=True,
            cbar=False,
            xticklabels=[],
            yticklabels=[],
            linewidths=0.25,
            ax=est_theirs_ax,
        )

        # 设置真实值轴的标题
        true_ax.set_title("{} (True)".format(tt))
        # 设置估计值轴的标题
        est_ax.set_title("{} (Mine)".format(tt))
        # 设置第三方库估计值轴的标题
        est_theirs_ax.set_title("{} (hmmlearn)".format(tt))
    # 设置整体图的标题,包括自己的对数似然和第三方库的对数似然
    fig.suptitle("LL (mine): {:.2f}, LL (hmmlearn): {:.2f}".format(ll_mine, ll_theirs))
    # 调整布局
    plt.tight_layout(rect=[0, 0.03, 1, 0.95])
    # 保存图像
    plt.savefig("img/plot.png", dpi=300)
    # 关闭图像
    plt.close()
# 定义一个测试隐马尔可夫模型的函数
def test_HMM():
    # 设置随机种子,保证结果的可重复性
    np.random.seed(12345)
    # 设置打印选项,精度为5位小数,禁止科学计数法
    np.set_printoptions(precision=5, suppress=True)

    # 获取默认的隐马尔可夫模型参数
    P = default_hmm()
    ls, obs = P["latent_states"], P["obs_types"]

    # 生成一个新的序列
    O = generate_training_data(P, n_steps=30, n_examples=25)

    # 设置容差值
    tol = 1e-5
    n_runs = 5
    best, best_theirs = (-np.inf, []), (-np.inf, [])
    # 进行多次运行以找到最佳结果
    for _ in range(n_runs):
        # 初始化一个多项式隐马尔可夫模型
        hmm = MultinomialHMM()
        # 使用生成的序列拟合模型参数
        A_, B_, pi_ = hmm.fit(O, ls, obs, tol=tol, verbose=True)

        # 初始化一个他们自己实现的多项式隐马尔可夫模型
        theirs = MHMM(
            tol=tol,
            verbose=True,
            n_iter=int(1e9),
            transmat_prior=1,
            startprob_prior=1,
            algorithm="viterbi",
            n_components=len(ls),
        )

        # 将序列展平并拟合他们的模型
        O_flat = O.reshape(1, -1).flatten().reshape(-1, 1)
        theirs = theirs.fit(O_flat, lengths=[O.shape[1]] * O.shape[0])

        # 根据拟合的模型计算对数似然
        hmm2 = MultinomialHMM(A=A_, B=B_, pi=pi_)
        like = np.sum([hmm2.log_likelihood(obs) for obs in O])
        like_theirs = theirs.score(O_flat, lengths=[O.shape[1]] * O.shape[0])

        # 更新最佳结果
        if like > best[0]:
            best = (like, {"A": A_, "B": B_, "pi": pi_})

        if like_theirs > best_theirs[0]:
            best_theirs = (
                like_theirs,
                {
                    "A": theirs.transmat_,
                    "B": theirs.emissionprob_,
                    "pi": theirs.startprob_,
                },
            )
    # 打印最终的对数似然值
    print("Final log likelihood of sequence: {:.5f}".format(best[0]))
    print("Final log likelihood of sequence (theirs): {:.5f}".format(best_theirs[0]))
    # 绘制结果矩阵
    plot_matrices(P, best, best_theirs)

numpy-mlnumpy_mlplotslda_plots.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 numpy 库,并使用别名 np
import numpy as np
# 导入 matplotlib.pyplot 库,并使用别名 plt
import matplotlib.pyplot as plt
# 导入 seaborn 库,并使用别名 sns
import seaborn as sns

# 设置 seaborn 图表样式为白色背景
sns.set_style("white")
# 设置 seaborn 上下文为 paper,字体比例为 1
sns.set_context("paper", font_scale=1)

# 设置随机种子为 12345
np.random.seed(12345)

# 从 numpy_ml.lda 模块中导入 LDA 类
from numpy_ml.lda import LDA

# 生成语料库的函数
def generate_corpus():
    # 生成一些虚假数据
    D = 300
    T = 10
    V = 30
    N = np.random.randint(150, 200, size=D)

    # 创建三种不同类型文档的文档-主题分布
    alpha1 = np.array((20, 15, 10, 1, 1, 1, 1, 1, 1, 1))
    alpha2 = np.array((1, 1, 1, 10, 15, 20, 1, 1, 1, 1))
    alpha3 = np.array((1, 1, 1, 1, 1, 1, 10, 12, 15, 18))

    # 任意选择每个主题有 3 个非常常见的诊断词
    # 这些词几乎不与其他主题共享
    beta_probs = (
        np.ones((V, T))   np.array([np.arange(V) % T == t for t in range(T)]).T * 19
    )
    beta_gen = np.array(list(map(lambda x: np.random.dirichlet(x), beta_probs.T))).T

    corpus = []
    theta = np.empty((D, T))

    # 从 LDA 模型生成每个文档
    for d in range(D):

        # 为文档绘制主题分布
        if d < (D / 3):
            theta[d, :] = np.random.dirichlet(alpha1, 1)[0]
        elif d < 2 * (D / 3):
            theta[d, :] = np.random.dirichlet(alpha2, 1)[0]
        else:
            theta[d, :] = np.random.dirichlet(alpha3, 1)[0]

        doc = np.array([])
        for n in range(N[d]):
            # 根据文档的主题分布绘制一个主题
            z_n = np.random.choice(np.arange(T), p=theta[d, :])

            # 根据主题-词分布绘制一个词
            w_n = np.random.choice(np.arange(V), p=beta_gen[:, z_n])
            doc = np.append(doc, w_n)

        corpus.append(doc)
    return corpus, T

# 绘制未平滑的 LDA 模型
def plot_unsmoothed():
    # 生成语料库
    corpus, T = generate_corpus()
    # 创建 LDA 对象
    L = LDA(T)
    # 训练 LDA 模型
    L.train(corpus, verbose=False)
    # 创建一个包含两个子图的图形对象和子图对象
    fig, axes = plt.subplots(1, 2)
    # 在第一个子图上绘制热力图,不显示 x 和 y 轴标签,将图形对象传递给 ax1
    ax1 = sns.heatmap(L.beta, xticklabels=[], yticklabels=[], ax=axes[0])
    # 设置第一个子图的 x 轴标签
    ax1.set_xlabel("Topics")
    # 设置第一个子图的 y 轴标签
    ax1.set_ylabel("Words")
    # 设置第一个子图的标题
    ax1.set_title("Recovered topic-word distribution")
    
    # 在第二个子图上绘制热力图,不显示 x 和 y 轴标签,将图形对象传递给 ax2
    ax2 = sns.heatmap(L.gamma, xticklabels=[], yticklabels=[], ax=axes[1])
    # 设置第二个子图的 x 轴标签
    ax2.set_xlabel("Topics")
    # 设置第二个子图的 y 轴标签
    ax2.set_ylabel("Documents")
    # 设置第二个子图的标题
    ax2.set_title("Recovered document-topic distribution")
    
    # 将图形保存为图片文件,设置分辨率为 300 dpi
    plt.savefig("img/plot_unsmoothed.png", dpi=300)
    # 关闭所有图形对象
    plt.close("all")

numpy-mlnumpy_mlplotslm_plots.py

代码语言:javascript复制
# 禁用 flake8 的警告
# 导入 numpy 库,并使用 np 别名
import numpy as np

# 从 sklearn.model_selection 模块中导入 train_test_split 函数
# 从 sklearn.datasets.samples_generator 模块中导入 make_blobs 函数
# 从 sklearn.linear_model 模块中导入 LogisticRegression 类,并使用 LogisticRegression_sk 别名
# 从 sklearn.datasets 模块中导入 make_regression 函数
# 从 sklearn.metrics 模块中导入 zero_one_loss 和 r2_score 函数
import from sklearn.model_selection import train_test_split
import from sklearn.datasets.samples_generator import make_blobs
import from sklearn.linear_model import LogisticRegression as LogisticRegression_sk
import from sklearn.datasets import make_regression
import from sklearn.metrics import zero_one_loss, r2_score

# 导入 matplotlib.pyplot 模块,并使用 plt 别名
import matplotlib.pyplot as plt

# 导入 seaborn 模块,并使用 sns 别名
import seaborn as sns

# 设置 seaborn 图表样式为白色
sns.set_style("white")
# 设置 seaborn 图表上下文为 paper,字体缩放比例为 0.5
sns.set_context("paper", font_scale=0.5)

# 从 numpy_ml.linear_models 模块中导入 RidgeRegression、LinearRegression、BayesianLinearRegressionKnownVariance、BayesianLinearRegressionUnknownVariance、LogisticRegression 类
from numpy_ml.linear_models import (
    RidgeRegression,
    LinearRegression,
    BayesianLinearRegressionKnownVariance,
    BayesianLinearRegressionUnknownVariance,
    LogisticRegression,
)

#######################################################################
#                           Data Generators                           #
#######################################################################

# 定义函数 random_binary_tensor,生成指定形状和稀疏度的随机二进制张量
def random_binary_tensor(shape, sparsity=0.5):
    # 生成随机数组,大于等于 (1 - sparsity) 的元素设为 1,其余设为 0
    X = (np.random.rand(*shape) >= (1 - sparsity)).astype(float)
    return X

# 定义函数 random_regression_problem,生成随机回归问题数据集
def random_regression_problem(n_ex, n_in, n_out, intercept=0, std=1, seed=0):
    # 生成回归问题数据集,包括输入 X、输出 y 和真实系数 coef
    X, y, coef = make_regression(
        n_samples=n_ex,
        n_features=n_in,
        n_targets=n_out,
        bias=intercept,
        noise=std,
        coef=True,
        random_state=seed,
    )
    # 将数据集划分为训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=seed
    )
    return X_train, y_train, X_test, y_test, coef

# 定义函数 random_classification_problem,生成随机分类问题数据集
def random_classification_problem(n_ex, n_classes, n_in, seed=0):
    # 生成分类问题数据集,包括输入 X 和标签 y
    X, y = make_blobs(
        n_samples=n_ex, centers=n_classes, n_features=n_in, random_state=seed
    )
    # 将数据集划分为训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=seed
    )
    return X_train, y_train, X_test, y_test

#######################################################################
#                                Plots                                #
#######################################################################

# 绘制 logistic 回归的图形
def plot_logistic():
    # 设置随机种子
    np.random.seed(12345)

    # 创建一个包含 4x4 子图的图形
    fig, axes = plt.subplots(4, 4)
    # 遍历每个子图
    for i, ax in enumerate(axes.flatten()):
        # 设置输入特征数和样本数
        n_in = 1
        n_ex = 150
        # 生成随机分类问题的数据集
        X_train, y_train, X_test, y_test = random_classification_problem(
            n_ex, n_classes=2, n_in=n_in, seed=i
        )
        # 创建 LogisticRegression 对象
        LR = LogisticRegression(penalty="l2", gamma=0.2, fit_intercept=True)
        # 使用训练数据拟合模型
        LR.fit(X_train, y_train, lr=0.1, tol=1e-7, max_iter=1e7)
        # 预测测试数据
        y_pred = (LR.predict(X_test) >= 0.5) * 1.0
        # 计算损失
        loss = zero_one_loss(y_test, y_pred) * 100.0

        # 创建 LogisticRegression_sk 对象
        LR_sk = LogisticRegression_sk(
            penalty="l2", tol=0.0001, C=0.8, fit_intercept=True, random_state=i
        )
        # 使用训练数据拟合模型
        LR_sk.fit(X_train, y_train)
        # 预测测试数据
        y_pred_sk = (LR_sk.predict(X_test) >= 0.5) * 1.0
        # 计算损失
        loss_sk = zero_one_loss(y_test, y_pred_sk) * 100.0

        # 设置 x 轴的范围
        xmin = min(X_test) - 0.1 * (max(X_test) - min(X_test))
        xmax = max(X_test)   0.1 * (max(X_test) - min(X_test))
        # 生成用于绘制的数据点
        X_plot = np.linspace(xmin, xmax, 100)
        y_plot = LR.predict(X_plot)
        y_plot_sk = LR_sk.predict_proba(X_plot.reshape(-1, 1))[:, 1]

        # 绘制散点图和曲线
        ax.scatter(X_test[y_pred == 0], y_test[y_pred == 0], alpha=0.5)
        ax.scatter(X_test[y_pred == 1], y_test[y_pred == 1], alpha=0.5)
        ax.plot(X_plot, y_plot, label="mine", alpha=0.75)
        ax.plot(X_plot, y_plot_sk, label="sklearn", alpha=0.75)
        ax.legend()
        ax.set_title("Loss mine: {:.2f} Loss sklearn: {:.2f}".format(loss, loss_sk))

        # 设置 x 轴和 y 轴的刻度标签为空
        ax.xaxis.set_ticklabels([])
        ax.yaxis.set_ticklabels([])

    # 调整子图布局
    plt.tight_layout()
    # 保存图形为图片文件
    plt.savefig("plot_logistic.png", dpi=300)
    # 关闭所有图形
    plt.close("all")


# 绘制贝叶斯分类器的图形
def plot_bayes():
    # 设置随机种子
    np.random.seed(12345)
    # 设置输入特征数、输出特征数、样本数、标准差和截距
    n_in = 1
    n_out = 1
    n_ex = 20
    std = 15
    intercept = 10
    # 生成随机的回归问题数据集,包括训练集和测试集,以及系数
    X_train, y_train, X_test, y_test, coefs = random_regression_problem(
        n_ex, n_in, n_out, intercept=intercept, std=std, seed=0
    )

    # 添加一些异常值
    x1, x2 = X_train[0]   0.5, X_train[6] - 0.3
    y1 = np.dot(x1, coefs)   intercept   25
    y2 = np.dot(x2, coefs)   intercept - 31
    X_train = np.vstack([X_train, np.array([x1, x2])])
    y_train = np.hstack([y_train, [y1[0], y2[0]])

    # 使用线性回归模型拟合数据
    LR = LinearRegression(fit_intercept=True)
    LR.fit(X_train, y_train)
    y_pred = LR.predict(X_test)
    loss = np.mean((y_test - y_pred) ** 2)

    # 使用岭回归模型拟合数据
    ridge = RidgeRegression(alpha=1, fit_intercept=True)
    ridge.fit(X_train, y_train)
    y_pred = ridge.predict(X_test)
    loss_ridge = np.mean((y_test - y_pred) ** 2)

    # 使用已知方差的贝叶斯线性回归模型拟合数据
    LR_var = BayesianLinearRegressionKnownVariance(
        mu=np.c_[intercept, coefs][0], sigma=np.sqrt(std), V=None, fit_intercept=True,
    )
    LR_var.fit(X_train, y_train)
    y_pred_var = LR_var.predict(X_test)
    loss_var = np.mean((y_test - y_pred_var) ** 2)

    # 使用未知方差的贝叶斯线性回归模型拟合数据
    LR_novar = BayesianLinearRegressionUnknownVariance(
        alpha=1, beta=2, mu=np.c_[intercept, coefs][0], V=None, fit_intercept=True
    )
    LR_novar.fit(X_train, y_train)
    y_pred_novar = LR_novar.predict(X_test)
    loss_novar = np.mean((y_test - y_pred_novar) ** 2)

    # 计算绘图所需的数据范围
    xmin = min(X_test) - 0.1 * (max(X_test) - min(X_test))
    xmax = max(X_test)   0.1 * (max(X_test) - min(X_test))
    X_plot = np.linspace(xmin, xmax, 100)
    y_plot = LR.predict(X_plot)
    y_plot_ridge = ridge.predict(X_plot)
    y_plot_var = LR_var.predict(X_plot)
    y_plot_novar = LR_novar.predict(X_plot)

    # 计算真实值
    y_true = [np.dot(x, coefs)   intercept for x in X_plot]
    # 创建包含4个子图的图形
    fig, axes = plt.subplots(1, 4)

    # 将子图展平
    axes = axes.flatten()
    # 在第一个子图中绘制散点图和拟合曲线
    axes[0].scatter(X_test, y_test)
    axes[0].plot(X_plot, y_plot, label="MLE")
    axes[0].plot(X_plot, y_true, label="True fn")
    axes[0].set_title("Linear RegressionnMLE Test MSE: {:.2f}".format(loss))
    axes[0].legend()
    # 在第一个子图中绘制 X_plot 和 y_plot 之间的区域,用 error 来填充

    # 在第二个子图中绘制测试数据的散点图
    axes[1].scatter(X_test, y_test)
    # 在第二个子图中绘制 Ridge 回归的预测结果
    axes[1].plot(X_plot, y_plot_ridge, label="MLE")
    # 在第二个子图中绘制真实函数的图像
    axes[1].plot(X_plot, y_true, label="True fn")
    # 设置第二个子图的标题
    axes[1].set_title(
        "Ridge Regression (alpha=1)nMLE Test MSE: {:.2f}".format(loss_ridge)
    )
    # 添加图例
    axes[1].legend()

    # 在第三个子图中绘制 MAP 的预测结果
    axes[2].plot(X_plot, y_plot_var, label="MAP")
    # 获取后验分布的均值和协方差
    mu, cov = LR_var.posterior["b"].mean, LR_var.posterior["b"].cov
    # 对后验分布进行采样,并绘制采样结果
    for k in range(200):
        b_samp = np.random.multivariate_normal(mu, cov)
        y_samp = [np.dot(x, b_samp[1])   b_samp[0] for x in X_plot]
        axes[2].plot(X_plot, y_samp, alpha=0.05)
    # 在第三个子图中绘制测试数据的散点图
    axes[2].scatter(X_test, y_test)
    # 在第三个子图中绘制真实函数的图像
    axes[2].plot(X_plot, y_true, label="True fn")
    # 添加图例
    axes[2].legend()
    # 设置第三个子图的标题
    axes[2].set_title(
        "Bayesian Regression (known variance)nMAP Test MSE: {:.2f}".format(loss_var)
    )

    # 在第四个子图中绘制 MAP 的预测结果
    axes[3].plot(X_plot, y_plot_novar, label="MAP")
    # 获取后验分布的均值和协方差
    mu = LR_novar.posterior["b | sigma**2"].mean
    cov = LR_novar.posterior["b | sigma**2"].cov
    # 对后验分布进行采样,并绘制采样结果
    for k in range(200):
        b_samp = np.random.multivariate_normal(mu, cov)
        y_samp = [np.dot(x, b_samp[1])   b_samp[0] for x in X_plot]
        axes[3].plot(X_plot, y_samp, alpha=0.05)
    # 在第四个子图中绘制测试数据的散点图
    axes[3].scatter(X_test, y_test)
    # 在第四个子图中绘制真实函数的图像
    axes[3].plot(X_plot, y_true, label="True fn")
    # 添加图例
    axes[3].legend()
    # 设置第四个子图的标题
    axes[3].set_title(
        "Bayesian Regression (unknown variance)nMAP Test MSE: {:.2f}".format(
            loss_novar
        )
    )

    # 设置所有子图的 x 轴和 y 轴刻度标签为空
    for ax in axes:
        ax.xaxis.set_ticklabels([])
        ax.yaxis.set_ticklabels([])

    # 设置图形的尺寸
    fig.set_size_inches(7.5, 1.875)
    # 保存图形为文件
    plt.savefig("plot_bayes.png", dpi=300)
    # 关闭所有图形
    plt.close("all")
# 定义一个函数用于绘制回归图
def plot_regression():
    # 设置随机种子,以便结果可重现
    np.random.seed(12345)

    # 创建一个包含4行4列子图的图形对象
    fig, axes = plt.subplots(4, 4)
    # 调整子图之间的间距
    plt.tight_layout()
    # 将图形保存为名为"plot_regression.png"的PNG文件,设置分辨率为300dpi
    plt.savefig("plot_regression.png", dpi=300)
    # 关闭所有图形对象,释放资源
    plt.close("all")

numpy-mlnumpy_mlplotsngram_plots.py

代码语言:javascript复制
# 禁用 flake8 的警告
# 导入 numpy 库,并使用 np 别名
import numpy as np

# 导入 matplotlib.pyplot 和 seaborn 库
import matplotlib.pyplot as plt
import seaborn as sns

# 设置 seaborn 库的样式为白色,上下文为 notebook,字体比例为 1
# 更多信息可参考:https://seaborn.pydata.org/generated/seaborn.set_context.html
# 更多信息可参考:https://seaborn.pydata.org/generated/seaborn.set_style.html
sns.set_style("white")
sns.set_context("notebook", font_scale=1)

# 从 numpy_ml.ngram 模块中导入 MLENGram、AdditiveNGram、GoodTuringNGram 类
from numpy_ml.ngram import MLENGram, AdditiveNGram, GoodTuringNGram

# 定义函数 plot_count_models,用于绘制不同模型的计数情况
def plot_count_models(GT, N):
    # 获取 GoodTuringNGram 对象中的 _num_grams_with_count 属性
    NC = GT._num_grams_with_count
    # 获取 GoodTuringNGram 对象中的 _count_models[N] 属性
    mod = GT._count_models[N]
    # 获取 GoodTuringNGram 对象中的 counts[N] 属性中的最大值
    max_n = max(GT.counts[N].values())
    # 创建一个列表,包含每个计数的计数
    emp = [NC(n   1, N) for n in range(max_n)]
    # 创建一个列表,包含模型预测的计数
    prd = [np.exp(mod.predict(np.array([n   1]))) for n in range(max_n   10)]
    # 绘制散点图,展示实际值和模型预测值
    plt.scatter(range(max_n), emp, c="r", label="actual")
    plt.plot(range(max_n   10), prd, "-", label="model")
    plt.ylim([-1, 100])
    plt.xlabel("Count ($r$)")
    plt.ylabel("Count-of-counts ($N_r$)")
    plt.legend()
    plt.savefig("test.png")
    plt.close()

# 定义函数 compare_probs,用于比较不同模型的概率
def compare_probs(fp, N):
    # 创建 MLENGram 对象 MLE
    MLE = MLENGram(N, unk=False, filter_punctuation=False, filter_stopwords=False)
    # 使用给定文件路径 fp 进行训练,编码方式为 utf-8-sig
    MLE.train(fp, encoding="utf-8-sig")

    # 初始化各种概率列表
    add_y, mle_y, gtt_y = [], [], []
    addu_y, mleu_y, gttu_y = [], [], []
    seen = ("<bol>", "the")
    unseen = ("<bol>", "asdf")

    # 创建 GoodTuringNGram 对象 GTT
    GTT = GoodTuringNGram(
        N, conf=1.96, unk=False, filter_stopwords=False, filter_punctuation=False
    )
    # 使用给定文件路径 fp 进行训练,编码方式为 utf-8-sig
    GTT.train(fp, encoding="utf-8-sig")

    # 计算已见序列 seen 的概率和未见序列 unseen 的概率
    gtt_prob = GTT.log_prob(seen, N)
    gtt_prob_u = GTT.log_prob(unseen, N)

    # 在 0 到 10 之间生成 20 个数,用于循环
    for K in np.linspace(0, 10, 20):
        # 创建 AdditiveNGram 对象 ADD
        ADD = AdditiveNGram(
            N, K, unk=False, filter_punctuation=False, filter_stopwords=False
        )
        # 使用给定文件路径 fp 进行训练,编码方式为 utf-8-sig
        ADD.train(fp, encoding="utf-8-sig")

        # 计算已见序列 seen 的概率和 MLENGram 对象 MLE 的概率
        add_prob = ADD.log_prob(seen, N)
        mle_prob = MLE.log_prob(seen, N)

        # 将计算结果添加到对应的列表中
        add_y.append(add_prob)
        mle_y.append(mle_prob)
        gtt_y.append(gtt_prob)

        # 计算未见序列 unseen 的概率
        mle_prob_u = MLE.log_prob(unseen, N)
        add_prob_u = ADD.log_prob(unseen, N)

        # 将计算结果添加到对应的列表中
        addu_y.append(add_prob_u)
        mleu_y.append(mle_prob_u)
        gttu_y.append(gtt_prob_u)
    # 绘制折线图,x轴为0到10的20个点,y轴为add_y,添加图例"Additive (seen ngram)"
    plt.plot(np.linspace(0, 10, 20), add_y, label="Additive (seen ngram)")
    # 绘制折线图,x轴为0到10的20个点,y轴为addu_y,添加图例"Additive (unseen ngram)"
    plt.plot(np.linspace(0, 10, 20), addu_y, label="Additive (unseen ngram)")
    # 注释掉的代码,不会被执行,这里是绘制Good-Turing (seen ngram)的折线图
    # plt.plot(np.linspace(0, 10, 20), gtt_y, label="Good-Turing (seen ngram)")
    # 注释掉的代码,不会被执行,这里是绘制Good-Turing (unseen ngram)的折线图
    # plt.plot(np.linspace(0, 10, 20), gttu_y, label="Good-Turing (unseen ngram)")
    # 绘制折线图,x轴为0到10的20个点,y轴为mle_y,线条为虚线,添加图例"MLE (seen ngram)"
    plt.plot(np.linspace(0, 10, 20), mle_y, "--", label="MLE (seen ngram)")
    # 设置x轴标签为"K"
    plt.xlabel("K")
    # 设置y轴标签为"log P(sequence)"
    plt.ylabel("log P(sequence)")
    # 添加图例
    plt.legend()
    # 保存图像为"img/add_smooth.png"
    plt.savefig("img/add_smooth.png")
    # 关闭所有图形窗口
    plt.close("all")
# 绘制经验频率与简单 Good Turing 平滑值的散点图,按排名顺序。依赖于 pylab 和 matplotlib。
def plot_gt_freqs(fp):
    # 创建一个最大似然估计的 1-gram 模型,不过滤标点符号和停用词
    MLE = MLENGram(1, filter_punctuation=False, filter_stopwords=False)
    # 从文件中训练模型
    MLE.train(fp, encoding="utf-8-sig")
    # 获取模型中的词频统计
    counts = dict(MLE.counts[1])

    # 创建一个 Good Turing 平滑的 1-gram 模型,不过滤停用词和标点符号
    GT = GoodTuringNGram(1, filter_stopwords=False, filter_punctuation=False)
    # 从文件中训练模型
    GT.train(fp, encoding="utf-8-sig")

    # 创建一个拉普拉斯平滑的 1-gram 模型,不过滤停用词和标点符号
    ADD = AdditiveNGram(1, 1, filter_punctuation=False, filter_stopwords=False)
    # 从文件中训练模型
    ADD.train(fp, encoding="utf-8-sig")

    # 计算总词频
    tot = float(sum(counts.values()))
    # 计算每个词的频率
    freqs = dict([(token, cnt / tot) for token, cnt in counts.items()])
    # 计算每个词的简单 Good Turing 平滑概率
    sgt_probs = dict([(tok, np.exp(GT.log_prob(tok, 1))) for tok in counts.keys()])
    # 计算每个词的拉普拉斯平滑概率
    as_probs = dict([(tok, np.exp(ADD.log_prob(tok, 1))) for tok in counts.keys()])

    # 创建 X, Y 坐标,用于绘制 MLE 的散点图
    X, Y = np.arange(len(freqs)), sorted(freqs.values(), reverse=True)
    plt.loglog(X, Y, "k ", alpha=0.25, label="MLE")

    # 创建 X, Y 坐标,用于绘制简单 Good Turing 的散点图
    X, Y = np.arange(len(sgt_probs)), sorted(sgt_probs.values(), reverse=True)
    plt.loglog(X, Y, "r ", alpha=0.25, label="simple Good-Turing")

    # 创建 X, Y 坐标,用于绘制拉普拉斯平滑的散点图
    X, Y = np.arange(len(as_probs)), sorted(as_probs.values(), reverse=True)
    plt.loglog(X, Y, "b ", alpha=0.25, label="Laplace smoothing")

    # 设置 X 轴标签
    plt.xlabel("Rank")
    # 设置 Y 轴标签
    plt.ylabel("Probability")
    # 添加图例
    plt.legend()
    # 调整布局
    plt.tight_layout()
    # 保存图像
    plt.savefig("img/rank_probs.png")
    # 关闭所有图形
    plt.close("all")

numpy-mlnumpy_mlplotsnn_activations_plots.py

代码语言:javascript复制
# 禁用 flake8 的警告
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 设置 seaborn 库的样式为白色背景
sns.set_style("white")
# 设置 seaborn 库的上下文为 notebook,字体缩放比例为 0.7
sns.set_context("notebook", font_scale=0.7)

# 从自定义模块中导入激活函数类
from numpy_ml.neural_nets.activations import (
    Affine,
    ReLU,
    LeakyReLU,
    Tanh,
    Sigmoid,
    ELU,
    Exponential,
    SELU,
    HardSigmoid,
    SoftPlus,
)


# 定义绘制激活函数图像的函数
def plot_activations():
    # 创建包含 2 行 5 列子图的图像
    fig, axes = plt.subplots(2, 5, sharex=True, sharey=True)
    # 定义激活函数列表
    fns = [
        Affine(),
        Tanh(),
        Sigmoid(),
        ReLU(),
        LeakyReLU(),
        ELU(),
        Exponential(),
        SELU(),
        HardSigmoid(),
        SoftPlus(),
    ]

    # 遍历子图和激活函数列表
    for ax, fn in zip(axes.flatten(), fns):
        # 生成输入数据
        X = np.linspace(-3, 3, 100).astype(float).reshape(100, 1)
        # 绘制激活函数图像及其一阶导数和二阶导数
        ax.plot(X, fn(X), label=r"$y$", alpha=1.0)
        ax.plot(X, fn.grad(X), label=r"$frac{dy}{dx}$", alpha=1.0)
        ax.plot(X, fn.grad2(X), label=r"$frac{d^2 y}{dx^2}$", alpha=1.0)
        # 绘制虚线表示 x 轴和 y 轴
        ax.hlines(0, -3, 3, lw=1, linestyles="dashed", color="k")
        ax.vlines(0, -1.2, 1.2, lw=1, linestyles="dashed", color="k")
        # 设置 y 轴范围和 x 轴范围
        ax.set_ylim(-1.1, 1.1)
        ax.set_xlim(-3, 3)
        # 设置 x 轴和 y 轴的刻度
        ax.set_xticks([])
        ax.set_yticks([-1, 0, 1])
        ax.xaxis.set_visible(False)
        # ax.yaxis.set_visible(False)
        # 设置子图标题为激活函数名称
        ax.set_title("{}".format(fn))
        # 显示图例
        ax.legend(frameon=False)
        # 移除图像的左边和底部边框
        sns.despine(left=True, bottom=True)

    # 设置图像大小
    fig.set_size_inches(10, 5)
    # 调整子图布局
    plt.tight_layout()
    # 保存图像为文件
    plt.savefig("img/plot.png", dpi=300)
    # 关闭所有图像
    plt.close("all")


# 当作为脚本直接运行时,调用绘制激活函数图像的函数
if __name__ == "__main__":
    plot_activations()

numpy-mlnumpy_mlplotsnn_schedulers_plots.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入所需的库
import time
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# 设置 seaborn 库的样式和上下文
# 参考链接:https://seaborn.pydata.org/generated/seaborn.set_context.html
# 参考链接:https://seaborn.pydata.org/generated/seaborn.set_style.html
sns.set_style("white")
sns.set_context("notebook", font_scale=0.7)

# 从自定义模块中导入不同的学习率调度器
from numpy_ml.neural_nets.schedulers import (
    ConstantScheduler,
    ExponentialScheduler,
    NoamScheduler,
    KingScheduler,
)

# 定义一个自定义的损失函数 king_loss_fn
def king_loss_fn(x):
    if x <= 250:
        return -0.25 * x   82.50372665317208
    elif 250 < x <= 600:
        return 20.00372665317208
    elif 600 < x <= 700:
        return -0.2 * x   140.00372665317207
    else:
        return 0.003726653172066108

# 定义一个绘制不同学习率调度器效果的函数 plot_schedulers
def plot_schedulers():
    # 创建一个包含4个子图的画布
    fig, axes = plt.subplots(2, 2)

    # 遍历不同的学习率调度器,并绘制对应的学习率曲线
    for ax, schs, title in zip(
        axes.flatten(), schedulers, ["Constant", "Exponential", "Noam", "King"]
    ):
        t0 = time.time()
        print("Running {} scheduler".format(title))
        X = np.arange(1, 1000)
        loss = np.array([king_loss_fn(x) for x in X])

        # 将损失值缩放以适应与学习率相同的轴
        scale = 0.01 / loss[0]
        loss *= scale

        if title == "King":
            ax.plot(X, loss, ls=":", label="Loss")

        for sc, lg in schs:
            Y = np.array([sc(x, ll) for x, ll in zip(X, loss)])
            ax.plot(X, Y, label=lg, alpha=0.6)

        ax.legend(fontsize=5)
        ax.set_xlabel("Steps")
        ax.set_ylabel("Learning rate")
        ax.set_title("{} scheduler".format(title))
        print(
            "Finished plotting {} runs of {} in {:.2f}s".format(
                len(schs), title, time.time() - t0
            )
        )

    # 调整子图布局并保存绘图结果
    plt.tight_layout()
    plt.savefig("plot.png", dpi=300)
    plt.close("all")

# 如果作为脚本运行,则调用 plot_schedulers 函数
if __name__ == "__main__":
    plot_schedulers()

numpy-mlnumpy_mlplotsnonparametric_plots.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 numpy 库并重命名为 np
import numpy as np

# 导入 matplotlib.pyplot 库并重命名为 plt
import matplotlib.pyplot as plt
# 导入 seaborn 库并重命名为 sns

import seaborn as sns

# 设置 seaborn 图表样式为白色
sns.set_style("white")
# 设置 seaborn 上下文为 paper,字体缩放比例为 0.5
sns.set_context("paper", font_scale=0.5)

# 从 numpy_ml.nonparametric 模块导入 GPRegression、KNN、KernelRegression 类
# 从 numpy_ml.linear_models.lm 模块导入 LinearRegression 类
from numpy_ml.nonparametric import GPRegression, KNN, KernelRegression
from numpy_ml.linear_models.lm import LinearRegression

# 从 sklearn.model_selection 模块导入 train_test_split 函数

from sklearn.model_selection import train_test_split

# 定义函数 random_regression_problem,生成随机回归问题数据集
def random_regression_problem(n_ex, n_in, n_out, d=3, intercept=0, std=1, seed=0):
    # 生成随机系数
    coef = np.random.uniform(0, 50, size=d)
    coef[-1] = intercept

    y = []
    # 生成随机输入数据 X
    X = np.random.uniform(-100, 100, size=(n_ex, n_in))
    for x in X:
        # 计算输出值 y,并加入随机噪声
        val = np.polyval(coef, x)   np.random.normal(0, std)
        y.append(val)
    y = np.array(y)

    # 将数据集划分为训练集和测试集
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=seed
    )
    return X_train, y_train, X_test, y_test, coef

# 定义函数 plot_regression,绘制回归图表
def plot_regression():
    np.random.seed(12345)
    # 创建 4x4 的子图
    fig, axes = plt.subplots(4, 4)
    # 调整子图布局
    plt.tight_layout()
    # 保存图表为 kr_plots.png 文件,分辨率为 300 dpi
    plt.savefig("img/kr_plots.png", dpi=300)
    # 关闭所有图表
    plt.close("all")

# 定义函数 plot_knn,绘制 KNN 图表
def plot_knn():
    np.random.seed(12345)
    # 创建 4x4 的子图
    fig, axes = plt.subplots(4, 4)
    # 调整子图布局
    plt.tight_layout()
    # 保存图表为 knn_plots.png 文件,分辨率为 300 dpi
    plt.savefig("img/knn_plots.png", dpi=300)
    # 关闭所有图表
    plt.close("all")

# 定义函数 plot_gp,绘制高斯过程图表
def plot_gp():
    np.random.seed(12345)
    # 设置 seaborn 上下文为 paper,字体缩放比例为 0.65
    sns.set_context("paper", font_scale=0.65)

    # 生成测试数据 X_test
    X_test = np.linspace(-10, 10, 100)
    X_train = np.array([-3, 0, 7, 1, -9])
    y_train = np.sin(X_train)

    # 创建 2x2 的子图
    fig, axes = plt.subplots(2, 2)
    # 定义 alpha 值列表
    alphas = [0, 1e-10, 1e-5, 1]
    # 遍历 axes 中展平后的子图和 alphas 中的值,同时获取索引 ix 和元组 (ax, alpha)
    for ix, (ax, alpha) in enumerate(zip(axes.flatten(), alphas)):
        # 创建一个高斯过程回归对象 G,使用 RBF 核函数和给定的 alpha 值
        G = GPRegression(kernel="RBFKernel", alpha=alpha)
        # 使用训练数据 X_train 和 y_train 拟合 G 模型
        G.fit(X_train, y_train)
        # 对测试数据 X_test 进行预测,得到预测值 y_pred 和置信区间 conf
        y_pred, conf = G.predict(X_test)

        # 在当前子图 ax 上绘制训练数据点
        ax.plot(X_train, y_train, "rx", label="observed")
        # 在当前子图 ax 上绘制真实函数 np.sin(X_test) 的图像
        ax.plot(X_test, np.sin(X_test), label="true fn")
        # 在当前子图 ax 上绘制预测值 y_pred 的图像,并使用虚线表示
        ax.plot(X_test, y_pred, "--", label="MAP (alpha={})".format(alpha))
        # 在当前子图 ax 上填充置信区间
        ax.fill_between(X_test, y_pred   conf, y_pred - conf, alpha=0.1)
        # 设置 x 轴和 y 轴的刻度为空
        ax.set_xticks([])
        ax.set_yticks([])
        # 移除子图 ax 的上、右边框
        sns.despine()

        # 在子图 ax 上添加图例
        ax.legend()

    # 调整子图布局
    plt.tight_layout()
    # 将当前图保存为图片 gp_alpha.png,分辨率为 300 dpi
    plt.savefig("img/gp_alpha.png", dpi=300)
    # 关闭所有图形窗口
    plt.close("all")
# 定义一个函数用于绘制高斯过程的分布
def plot_gp_dist():
    # 设置随机种子
    np.random.seed(12345)
    # 设置 seaborn 图形的上下文和字体比例
    sns.set_context("paper", font_scale=0.95)

    # 生成测试数据
    X_test = np.linspace(-10, 10, 100)
    # 训练数据
    X_train = np.array([-3, 0, 7, 1, -9])
    y_train = np.sin(X_train)

    # 创建包含3个子图的画布
    fig, axes = plt.subplots(1, 3)
    # 创建高斯过程回归对象
    G = GPRegression(kernel="RBFKernel", alpha=0)
    # 使用训练数据拟合高斯过程
    G.fit(X_train, y_train)

    # 从先验分布中生成3个样本
    y_pred_prior = G.sample(X_test, 3, "prior")
    # 从后验预测分布中生成3个样本
    y_pred_posterior = G.sample(X_test, 3, "posterior_predictive")

    # 绘制先验样本
    for prior_sample in y_pred_prior:
        axes[0].plot(X_test, prior_sample.ravel(), lw=1)
    axes[0].set_title("Prior samples")
    axes[0].set_xticks([])
    axes[0].set_yticks([])

    # 绘制后验样本
    for post_sample in y_pred_posterior:
        axes[1].plot(X_test, post_sample.ravel(), lw=1)
    axes[1].plot(X_train, y_train, "ko", ms=1.2)
    axes[1].set_title("Posterior samples")
    axes[1].set_xticks([])
    axes[1].set_yticks([])

    # 预测测试数据的均值和置信区间
    y_pred, conf = G.predict(X_test)

    # 绘制真实函数、MAP 估计和观测数据
    axes[2].plot(X_test, np.sin(X_test), lw=1, label="true function")
    axes[2].plot(X_test, y_pred, lw=1, label="MAP estimate")
    axes[2].fill_between(X_test, y_pred   conf, y_pred - conf, alpha=0.1)
    axes[2].plot(X_train, y_train, "ko", ms=1.2, label="observed")
    axes[2].legend(fontsize="x-small")
    axes[2].set_title("Posterior mean")
    axes[2].set_xticks([])
    axes[2].set_yticks([])

    # 设置画布大小
    fig.set_size_inches(6, 2)
    # 调整子图布局
    plt.tight_layout()
    # 保存图片
    plt.savefig("img/gp_dist.png", dpi=300)
    # 关闭所有图形
    plt.close("all")

numpy-mlnumpy_mlplotsrl_plots.py

代码语言:javascript复制
# 禁用 flake8 检查
# 导入 gym 库
import gym

# 导入 Trainer 类
from numpy_ml.rl_models.trainer import Trainer
# 导入不同的智能体类
from numpy_ml.rl_models.agents import (
    CrossEntropyAgent,
    MonteCarloAgent,
    TemporalDifferenceAgent,
    DynaAgent,
)

# 测试交叉熵智能体
def test_cross_entropy_agent():
    seed = 12345
    max_steps = 300
    n_episodes = 50
    retain_prcnt = 0.2
    n_samples_per_episode = 500
    # 创建 LunarLander-v2 环境
    env = gym.make("LunarLander-v2")

    # 创建交叉熵智能体
    agent = CrossEntropyAgent(env, n_samples_per_episode, retain_prcnt)
    trainer = Trainer(agent, env)
    # 训练智能体
    trainer.train(
        n_episodes, max_steps, seed=seed, plot=True, verbose=True, render_every=None
    )

# 测试蒙特卡洛智能体
def test_monte_carlo_agent():
    seed = 12345
    max_steps = 300
    n_episodes = 10000

    epsilon = 0.05
    off_policy = True
    smooth_factor = 0.001
    temporal_discount = 0.95
    # 创建 Copy-v0 环境
    env = gym.make("Copy-v0")

    # 创建蒙特卡洛智能体
    agent = MonteCarloAgent(env, off_policy, temporal_discount, epsilon)
    trainer = Trainer(agent, env)
    # 训练智能体
    trainer.train(
        n_episodes,
        max_steps,
        seed=seed,
        plot=True,
        verbose=True,
        render_every=None,
        smooth_factor=smooth_factor,
    )

# 测试时序差分智能体
def test_temporal_difference_agent():
    seed = 12345
    max_steps = 200
    n_episodes = 5000

    lr = 0.4
    n_tilings = 10
    epsilon = 0.10
    off_policy = True
    grid_dims = [100, 100]
    smooth_factor = 0.005
    temporal_discount = 0.999
    # 创建 LunarLander-v2 环境
    env = gym.make("LunarLander-v2")
    obs_max = 1
    obs_min = -1

    # 创建时序差分智能体
    agent = TemporalDifferenceAgent(
        env,
        lr=lr,
        obs_max=obs_max,
        obs_min=obs_min,
        epsilon=epsilon,
        n_tilings=n_tilings,
        grid_dims=grid_dims,
        off_policy=off_policy,
        temporal_discount=temporal_discount,
    )

    trainer = Trainer(agent, env)
    # 训练智能体
    trainer.train(
        n_episodes,
        max_steps,
        seed=seed,
        plot=True,
        verbose=True,
        render_every=None,
        smooth_factor=smooth_factor,
    )

# 测试 Dyna 智能体
def test_dyna_agent():
    seed = 12345
    # 设置最大步数
    max_steps = 200
    # 设置训练的总回合数
    n_episodes = 150

    # 学习率
    lr = 0.4
    # 是否使用 Q  算法
    q_plus = False
    # 瓦片编码的数量
    n_tilings = 10
    # ε-贪心策略中的 ε 值
    epsilon = 0.10
    # 网格维度
    grid_dims = [10, 10]
    # 平滑因子
    smooth_factor = 0.01
    # 时间折扣因子
    temporal_discount = 0.99
    # 探索权重
    explore_weight = 0.05
    # 模拟动作的数量
    n_simulated_actions = 25

    # 观测值的最大值和最小值
    obs_max, obs_min = 1, -1
    # 创建环境
    env = gym.make("Taxi-v2")

    # 创建 DynaAgent 对象
    agent = DynaAgent(
        env,
        lr=lr,
        q_plus=q_plus,
        obs_max=obs_max,
        obs_min=obs_min,
        epsilon=epsilon,
        n_tilings=n_tilings,
        grid_dims=grid_dims,
        explore_weight=explore_weight,
        temporal_discount=temporal_discount,
        n_simulated_actions=n_simulated_actions,
    )

    # 创建 Trainer 对象
    trainer = Trainer(agent, env)
    # 开始训练
    trainer.train(
        n_episodes,
        max_steps,
        seed=seed,
        plot=True,
        verbose=True,
        render_every=None,
        smooth_factor=smooth_factor,
    )

numpy-mlnumpy_mlplotstrees_plots.py

代码语言:javascript复制
# 禁用 flake8 的警告
# 导入 NumPy 库,并使用 np 别名
import numpy as np

# 从 sklearn 库中导入准确率评估、均方误差评估、生成聚类数据、生成回归数据、划分训练集和测试集的函数
from sklearn.metrics import accuracy_score, mean_squared_error
from sklearn.datasets import make_blobs, make_regression
from sklearn.model_selection import train_test_split

# 导入 matplotlib.pyplot 库,并使用 plt 别名
import matplotlib.pyplot as plt

# 导入 seaborn 库,并设置图表样式为白色,字体比例为 0.9
# 参考链接:https://seaborn.pydata.org/generated/seaborn.set_context.html
# 参考链接:https://seaborn.pydata.org/generated/seaborn.set_style.html
import seaborn as sns
sns.set_style("white")
sns.set_context("paper", font_scale=0.9)

# 从 numpy_ml.trees 模块中导入梯度提升决策树、决策树、随机森林类
from numpy_ml.trees import GradientBoostedDecisionTree, DecisionTree, RandomForest

# 定义一个绘图函数
def plot():
    # 创建一个 4x4 的子图,并设置图表大小为 10x10
    fig, axes = plt.subplots(4, 4)
    fig.set_size_inches(10, 10)
    # 保存图表为 plot.png,分辨率为 300 dpi
    plt.savefig("plot.png", dpi=300)
    # 关闭所有图表
    plt.close("all")

numpy-mlnumpy_mlpreprocessingdsp.py

代码语言:javascript复制
# 导入必要的库
import numpy as np
from numpy.lib.stride_tricks import as_strided

# 导入自定义的窗口初始化器
from ..utils.windows import WindowInitializer

#######################################################################
#                          Signal Resampling                          #
#######################################################################

# 批量对每个图像(或类似网格状的2D信号)进行重采样到指定维度
def batch_resample(X, new_dim, mode="bilinear"):
    """
    Resample each image (or similar grid-based 2D signal) in a batch to
    `new_dim` using the specified resampling strategy.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, in_rows, in_cols, in_channels)`
        An input image volume
    new_dim : 2-tuple of `(out_rows, out_cols)`
        The dimension to resample each image to
    mode : {'bilinear', 'neighbor'}
        The resampling strategy to employ. Default is 'bilinear'.

    Returns
    -------
    resampled : :py:class:`ndarray <numpy.ndarray>` of shape `(n_ex, out_rows, out_cols, in_channels)`
        The resampled image volume.
    """
    # 根据不同的重采样策略选择插值函数
    if mode == "bilinear":
        interpolate = bilinear_interpolate
    elif mode == "neighbor":
        interpolate = nn_interpolate_2D
    else:
        raise NotImplementedError("Unrecognized resampling mode: {}".format(mode))

    out_rows, out_cols = new_dim
    n_ex, in_rows, in_cols, n_in = X.shape

    # 计算重采样的坐标
    x = np.tile(np.linspace(0, in_cols - 2, out_cols), out_rows)
    y = np.repeat(np.linspace(0, in_rows - 2, out_rows), out_cols)

    # 对每个图像进行重采样
    resampled = []
    for i in range(n_ex):
        r = interpolate(X[i, ...], x, y)
        r = r.reshape(out_rows, out_cols, n_in)
        resampled.append(r)
    return np.dstack(resampled)


# 使用最近邻插值策略估计在`X`中坐标(x, y)处的像素值
def nn_interpolate_2D(X, x, y):
    """
    Estimates of the pixel values at the coordinates (x, y) in `X` using a
    nearest neighbor interpolation strategy.

    Notes
    -----
    # 假设当前的 `X` 中的条目反映了一个二维整数网格上等间距采样的样本

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(in_rows, in_cols, in_channels)`
        一个沿着 `in_rows` 行 `in_cols` 列的网格采样的输入图像。
    x : list of length `k`
        我们希望生成样本的 x 坐标列表
    y : list of length `k`
        我们希望生成样本的 y 坐标列表

    Returns
    -------
    samples : :py:class:`ndarray <numpy.ndarray>` of shape `(k, in_channels)`
        每个 (x,y) 坐标的样本,通过最近邻插值计算得到
    """
    # 将 x 和 y 坐标四舍五入到最近的整数
    nx, ny = np.around(x), np.around(y)
    # 将四舍五入后的坐标限制在合适的范围内,并转换为整数类型
    nx = np.clip(nx, 0, X.shape[1] - 1).astype(int)
    ny = np.clip(ny, 0, X.shape[0] - 1).astype(int)
    # 返回根据最近邻插值计算得到的样本
    return X[ny, nx, :]
def nn_interpolate_1D(X, t):
    """
    Estimates of the signal values at `X[t]` using a nearest neighbor
    interpolation strategy.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(in_length, in_channels)`
        An input image sampled along an integer `in_length`
    t : list of length `k`
        A list of coordinates for the samples we wish to generate

    Returns
    -------
    samples : :py:class:`ndarray <numpy.ndarray>` of shape `(k, in_channels)`
        The samples for each (x,y) coordinate computed via nearest neighbor
        interpolation
    """
    # 对 t 进行四舍五入,并限制在合适的范围内,转换为整数
    nt = np.clip(np.around(t), 0, X.shape[0] - 1).astype(int)
    return X[nt, :]


def bilinear_interpolate(X, x, y):
    """
    Estimates of the pixel values at the coordinates (x, y) in `X` via bilinear
    interpolation.

    Notes
    -----
    Assumes the current entries in X reflect equally-spaced
    samples from a 2D integer grid.

    Modified from https://bit.ly/2NMb1Dr

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(in_rows, in_cols, in_channels)`
        An input image sampled along a grid of `in_rows` by `in_cols`.
    x : list of length `k`
        A list of x-coordinates for the samples we wish to generate
    y : list of length `k`
        A list of y-coordinates for the samples we wish to generate

    Returns
    -------
    samples : list of length `(k, in_channels)`
        The samples for each (x,y) coordinate computed via bilinear
        interpolation
    """
    # 向下取整得到 x0 和 y0 的整数部分
    x0 = np.floor(x).astype(int)
    y0 = np.floor(y).astype(int)
    # 计算 x1 和 y1
    x1 = x0   1
    y1 = y0   1

    # 限制 x0, y0, x1, y1 在合适的范围内
    x0 = np.clip(x0, 0, X.shape[1] - 1)
    y0 = np.clip(y0, 0, X.shape[0] - 1)
    x1 = np.clip(x1, 0, X.shape[1] - 1)
    y1 = np.clip(y1, 0, X.shape[0] - 1)

    # 计算插值所需的四个像素值
    Ia = X[y0, x0, :].T
    Ib = X[y1, x0, :].T
    Ic = X[y0, x1, :].T
    Id = X[y1, x1, :].T

    # 计算插值权重
    wa = (x1 - x) * (y1 - y)
    wb = (x1 - x) * (y - y0)
    wc = (x - x0) * (y1 - y)
    # 计算第四个顶点的权重
    wd = (x - x0) * (y - y0)
    
    # 返回根据权重计算得到的插值结果
    return (Ia * wa).T   (Ib * wb).T   (Ic * wc).T   (Id * wd).T
# 定义一个函数,实现一维离散余弦变换(DCT-II),默认为正交的
def DCT(frame, orthonormal=True):
    """
    A naive :math:`O(N^2)` implementation of the 1D discrete cosine transform-II
    (DCT-II).

    Notes
    -----
    For a signal :math:`mathbf{x} = [x_1, ldots, x_N]` consisting of `N`
    samples, the `k` th DCT coefficient, :math:`c_k`, is

    .. math::

        c_k = 2 sum_{n=0}^{N-1} x_n cos(pi k (2 n   1) / (2 N))

    where `k` ranges from :math:`0, ldots, N-1`.

    The DCT is highly similar to the DFT -- whereas in a DFT the basis
    functions are sinusoids, in a DCT they are restricted solely to cosines. A
    signal's DCT representation tends to have more of its energy concentrated
    in a smaller number of coefficients when compared to the DFT, and is thus
    commonly used for signal compression. [1]

    .. [1] Smoother signals can be accurately approximated using fewer DFT / DCT
       coefficients, resulting in a higher compression ratio. The DCT naturally
       yields a continuous extension at the signal boundaries due its use of
       even basis functions (cosine). This in turn produces a smoother
       extension in comparison to DFT or DCT approximations, resulting in a
       higher compression.

    Parameters
    ----------
    frame : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        A signal frame consisting of N samples
    orthonormal : bool
        Scale to ensure the coefficient vector is orthonormal. Default is True.

    Returns
    -------
    dct : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        The discrete cosine transform of the samples in `frame`.
    """
    # 获取信号帧的长度
    N = len(frame)
    # 创建一个与输入信号帧相同形状的全零数组
    out = np.zeros_like(frame)
    # 遍历范围为 N 的循环
    for k in range(N):
        # 遍历帧中的每个元素,返回索引和元素值
        for (n, xn) in enumerate(frame):
            # 计算离散余弦变换的公式
            out[k]  = xn * np.cos(np.pi * k * (2 * n   1) / (2 * N))
        # 根据 k 的值计算缩放因子
        scale = np.sqrt(1 / (4 * N)) if k == 0 else np.sqrt(1 / (2 * N))
        # 根据是否正交归一化来调整输出值
        out[k] *= 2 * scale if orthonormal else 2
    # 返回变换后的结果
    return out
def __DCT2(frame):
    """Currently broken"""
    # 计算窗口长度
    N = len(frame)  # window length

    # 创建一个长度为N的一维数组k
    k = np.arange(N, dtype=float)
    # 创建一个N*N的二维数组F,用于计算DCT
    F = k.reshape(1, -1) * k.reshape(-1, 1)
    # 创建一个N*N的二维数组K,用于计算DCT
    K = np.divide(F, k, out=np.zeros_like(F), where=F != 0)

    # 计算DCT的余弦部分
    FC = np.cos(F * np.pi / N   K * np.pi / 2 * N)
    # 返回DCT结果
    return 2 * (FC @ frame)


def DFT(frame, positive_only=True):
    """
    A naive :math:`O(N^2)` implementation of the 1D discrete Fourier transform (DFT).

    Notes
    -----
    The Fourier transform decomposes a signal into a linear combination of
    sinusoids (ie., basis elements in the space of continuous periodic
    functions).  For a sequence :math:`mathbf{x} = [x_1, ldots, x_N]` of N
    evenly spaced samples, the `k` th DFT coefficient is given by:

    .. math::

        c_k = sum_{n=0}^{N-1} x_n exp(-2 pi i k n / N)

    where `i` is the imaginary unit, `k` is an index ranging from `0, ..., N-1`,
    and :math:`X_k` is the complex coefficient representing the phase
    (imaginary part) and amplitude (real part) of the `k` th sinusoid in the
    DFT spectrum. The frequency of the `k` th sinusoid is :math:`(k 2 pi / N)`
    radians per sample.

    When applied to a real-valued input, the negative frequency terms are the
    complex conjugates of the positive-frequency terms and the overall spectrum
    is symmetric (excluding the first index, which contains the zero-frequency
    / intercept term).

    Parameters
    ----------
    frame : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        A signal frame consisting of N samples
    positive_only : bool
        Whether to only return the coefficients for the positive frequency
        terms. Default is True.

    Returns
    -------
    spectrum : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)` or `(N // 2   1,)` if `real_only`
        The coefficients of the frequency spectrum for `frame`, including
        imaginary components.
    """
    # 计算窗口长度
    N = len(frame)  # window length
    # 创建一个 N x N 的矩阵,矩阵元素为基向量 i 和时间步长 j 的系数
    F = np.arange(N).reshape(1, -1) * np.arange(N).reshape(-1, 1)
    # 计算每个元素的值为 exp(-j * 2 * pi * i / N)
    F = np.exp(F * (-1j * 2 * np.pi / N))

    # vdot 只能操作向量(而不是 ndarrays),因此需要显式循环遍历 F 中的每个基向量
    # 计算每个基向量与给定帧 frame 的内积,得到频谱
    spectrum = np.array([np.vdot(f, frame) for f in F])
    # 如果 positive_only 为 True,则返回频谱的前一半(包括中间值),否则返回整个频谱
    return spectrum[: (N // 2)   1] if positive_only else spectrum
# 计算具有 N 个系数的 DFT 的频率 bin 中心
def dft_bins(N, fs=44000, positive_only=True):
    # 如果只返回正频率项的频率 bin
    if positive_only:
        freq_bins = np.linspace(0, fs / 2, 1   N // 2, endpoint=True)
    else:
        # 计算负频率项的频率 bin
        l, r = (1   (N - 1) / 2, (1 - N) / 2) if N % 2 else (N / 2, -N / 2)
        freq_bins = np.r_[np.arange(l), np.arange(r, 0)] * fs / N
    return freq_bins


# 计算每个帧的幅度谱(即 DFT 谱的绝对值)
def magnitude_spectrum(frames):
    # 对于 frames 中的每个帧,计算其幅度谱
    return np.vstack([np.abs(DFT(frame, positive_only=True)) for frame in frames])


# 计算信号的功率谱,假设每个帧仅包含实值
def power_spectrum(frames, scale=False):
    # 对于以帧表示的信号,计算功率谱
    # 功率谱简单地是幅度谱的平方,可能会按 FFT bins 的数量进行缩放。它衡量信号的能量在频域上的分布
    # 定义函数参数和返回值的说明
    Parameters
    ----------
    frames : :py:class:`ndarray <numpy.ndarray>` of shape `(M, N)`
        A sequence of `M` frames each consisting of `N` samples
    scale : bool
        Whether the scale by the number of DFT bins. Default is False.

    Returns
    -------
    power_spec : :py:class:`ndarray <numpy.ndarray>` of shape `(M, N // 2   1)`
        The power spectrum for each frame in `frames`. Only includes the
        coefficients for the positive spectrum frequencies.
    """
    # 根据是否需要缩放计算缩放因子
    scaler = frames.shape[1] // 2   1 if scale else 1
    # 返回每个帧的功率谱,只包括正频谱频率的系数
    return (1 / scaler) * magnitude_spectrum(frames) ** 2
# 预处理工具函数,用于将一维信号 x 转换为帧宽度为 frame_width、步长为 stride 的重叠窗口
def to_frames(x, frame_width, stride, writeable=False):
    """
    Convert a 1D signal x into overlapping windows of width `frame_width` using
    a hop length of `stride`.

    Notes
    -----
    如果 ``(len(x) - frame_width) % stride != 0``,则 x 中的一些样本将被丢弃。具体来说::

        n_dropped_frames = len(x) - frame_width - stride * (n_frames - 1)

    其中::

        n_frames = (len(x) - frame_width) // stride   1

    该方法使用低级别的步长操作来避免创建 `x` 的额外副本。缺点是如果 ``writeable`=True``,
    修改 `frame` 输出可能导致意外行为:

        >>> out = to_frames(np.arange(6), 5, 1)
        >>> out
        array([[0, 1, 2, 3, 4],
               [1, 2, 3, 4, 5]])
        >>> out[0, 1] = 99
        >>> out
        array([[ 0, 99,  2,  3,  4],
               [99,  2,  3,  4,  5]])

    Parameters
    ----------
    x : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        A 1D signal consisting of N samples
    frame_width : int
        The width of a single frame window in samples
    stride : int
        The hop size / number of samples advanced between consecutive frames
    writeable : bool
        If set to False, the returned array will be readonly. Otherwise it will
        be writable if `x` was. It is advisable to set this to False whenever
        possible to avoid unexpected behavior (see NB 2 above). Default is False.

    Returns
    -------
    frame: :py:class:`ndarray <numpy.ndarray>` of shape `(n_frames, frame_width)`
        The collection of overlapping frames stacked into a matrix
    """
    assert x.ndim == 1
    assert stride >= 1
    assert len(x) >= frame_width
    # 获取数组 x 中每个元素的大小(以比特为单位)
    byte = x.itemsize
    # 计算可以生成的帧数,根据数组 x 的长度、帧宽度和步长计算
    n_frames = (len(x) - frame_width) // stride   1
    # 使用 as_strided 函数创建一个新的数组视图
    return as_strided(
        x,
        shape=(n_frames, frame_width),  # 新数组的形状为 (帧数, 帧宽度)
        strides=(byte * stride, byte),  # 新数组的步长
        writeable=writeable,  # 指定新数组是否可写
    )
# 自相关一个一维信号 `x` 与自身。

# 1维自相关的第 `k` 项是

# .. math::

#     a_k = sum_n x_{n   k} x_n

# 注意 这是一个朴素的 :math:`O(N^2)` 实现。对于一个更快的 :math:`O(N log N)` 方法,可以参考 [1]。

# 参考资料
# ----------
# .. [1] https://en.wikipedia.org/wiki/Autocorrelation#Efficient%computation

# 参数
# ----------
# x : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
#     由 N 个样本组成的1维信号

# 返回
# -------
# auto : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
#     `x` 与自身的自相关
def autocorrelate1D(x):
    N = len(x)
    auto = np.zeros(N)
    for k in range(N):
        for n in range(N - k):
            auto[k]  = x[n   k] * x[n]
    return auto


#######################################################################
#                               Filters                               #
#######################################################################


# 预加重,增加高频带的幅度   减少低频带的幅度。

# 预加重滤波是(曾经是?)语音处理中常见的变换,其中高频率在信号消除歧义时更有用。

# .. math::

#     \text{preemphasis}( x_t ) = x_t - \alpha x_{t-1}

# 参数
# ----------
# x : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
#     由 `N` 个样本组成的1维信号
# alpha : float in [0, 1)
#     预加重系数。值为0表示无滤波

# 返回
# -------
# out : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
#     滤波后的信号
def preemphasis(x, alpha):
    return np.concatenate([x[:1], x[1:] - alpha * x[:-1])


# 倒谱提升
    # 在梅尔频率域中应用简单的正弦滤波器
    
    # 备注
    # 倒谱提升有助于平滑频谱包络并减弱较高MFCC系数的幅度,同时保持其他系数不变。滤波器函数为:
    # lifter( x_n ) = x_n * (1   (D * sin(pi * n / D) / 2))
    
    # 参数
    # mfccs: 形状为(G, C)的Mel倒谱系数矩阵。行对应帧,列对应倒谱系数
    # D: 在[0,  ∞]范围内的整数。滤波器系数。0表示无滤波,较大的值表示更大程度的平滑
    
    # 返回
    # out: 形状为(G, C)的倒谱提升后的MFCC系数
    def sinusoidal_filter(mfccs, D):
        # 如果D为0,则返回原始MFCC系数
        if D == 0:
            return mfccs
        # 生成n,范围为MFCC系数的列数
        n = np.arange(mfccs.shape[1])
        # 返回倒谱提升后的MFCC系数
        return mfccs * (1   (D / 2) * np.sin(np.pi * n / D))
# 计算信号 `x` 的 Mel 频谱图
def mel_spectrogram(
    x,
    window_duration=0.025,  # 每个帧/窗口的持续时间(秒)。默认为0.025。
    stride_duration=0.01,  # 连续窗口之间的跳跃持续时间(秒)。默认为0.01。
    mean_normalize=True,  # 是否从最终滤波器值中减去系数均值以提高信噪比。默认为True。
    window="hamming",  # 在FFT之前应用的窗函数。默认为'hamming'。
    n_filters=20,  # 包含在滤波器组中的 Mel 滤波器数量。默认为20。
    center=True,  # 信号的第 `k` 帧是否应该在索引 `x[k * stride_len]` 处*开始*(center = False)或在 `x[k * stride_len]` 处*居中*(center = True)。默认为False。
    alpha=0.95,  # 预加重滤波器的系数。值为0表示无滤波。默认为0.95。
    fs=44000,  # 信号的采样率/频率。默认为44000。
):
    """
    将 Mel 滤波器组应用于信号 `x` 的功率谱。

    Notes
    -----
    Mel 频谱图是对帧化和窗口化信号的功率谱在 Mel 滤波器组提供的基础上的投影。

    Parameters
    ----------
    x : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        由 N 个样本组成的 1D 信号
    window_duration : float
        每个帧/窗口的持续时间(秒)。默认为0.025。
    stride_duration : float
        连续窗口之间的跳跃持续时间(秒)。默认为0.01。
    mean_normalize : bool
        是否从最终滤波器值中减去系数均值以提高信噪比。默认为True。
    window : {'hamming', 'hann', 'blackman_harris'}
        在FFT之前应用于信号的窗函数。默认为'hamming'。
    n_filters : int
        包含在滤波器组中的 Mel 滤波器数量。默认为20。
    center : bool
        信号的第 `k` 帧是否应该在索引 `x[k * stride_len]` 处*开始*(center = False)或在 `x[k * stride_len]` 处*居中*(center = True)。默认为False。
    alpha : float in [0, 1)
        预加重滤波器的系数。值为0表示无滤波。默认为0.95。
    fs : int
        信号的采样率/频率。默认为44000。

    Returns
    -------
    filter_energies : :py:class:`ndarray <numpy.ndarray>` of shape `(G, n_filters)`
        Mel 滤波器组中每个滤波器的(可能是均值归一化的)功率(即 Mel 频谱图)。行对应帧,列对应滤波器
    """
    # 每帧信号的总能量
    energy_per_frame : :py:class:`ndarray <numpy.ndarray>` of shape `(G,)`
        The total energy in each frame of the signal
    """
    # 机器精度
    eps = np.finfo(float).eps
    # 初始化窗口函数
    window_fn = WindowInitializer()(window)

    # 计算步长
    stride = round(stride_duration * fs)
    # 计算帧宽度
    frame_width = round(window_duration * fs)
    N = frame_width

    # 对原始信号应用预加重滤波器
    x = preemphasis(x, alpha)

    # 将信号转换为重叠帧并应用窗口函数
    x = np.pad(x, N // 2, "reflect") if center else x
    frames = to_frames(x, frame_width, stride, fs)

    # 生成窗口矩阵
    window = np.tile(window_fn(frame_width), (frames.shape[0], 1))
    frames = frames * window

    # 计算功率谱
    power_spec = power_spectrum(frames)
    energy_per_frame = np.sum(power_spec, axis=1)
    energy_per_frame[energy_per_frame == 0] = eps

    # 计算 Mel 滤波器组中每个滤波器的功率
    fbank = mel_filterbank(N, n_filters=n_filters, fs=fs)
    filter_energies = power_spec @ fbank.T
    filter_energies -= np.mean(filter_energies, axis=0) if mean_normalize else 0
    filter_energies[filter_energies == 0] = eps
    return filter_energies, energy_per_frame
# 计算信号的 Mel 频率倒谱系数(MFCC)

def mfcc(
    x,
    fs=44000,
    n_mfccs=13,
    alpha=0.95,
    center=True,
    n_filters=20,
    window="hann",
    normalize=True,
    lifter_coef=22,
    stride_duration=0.01,
    window_duration=0.025,
    replace_intercept=True,
):
    """
    计算信号的 Mel 频率倒谱系数(MFCC)。

    注意
    -----
    计算 MFCC 特征分为以下几个阶段:

        1. 将信号转换为重叠帧并应用窗口函数
        2. 计算每帧的功率谱
        3. 将 Mel 滤波器组应用于功率谱以获得 Mel 滤波器组功率
        4. 对每帧的 Mel 滤波器组功率取对数
        5. 对对数滤波器组能量进行离散余弦变换(DCT),并仅保留前 k 个系数以进一步降低维度

    MFCC 是在 HMM-GMM 自动语音识别(ASR)系统的背景下开发的,可用于提供某种程度上与说话者/音高无关的音素表示。

    参数
    ----------
    x : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        由 N 个样本组成的 1D 信号
    fs : int
        信号的采样率/频率。默认值为 44000。
    n_mfccs : int
        要返回的倒谱系数的数量(包括截距系数)。默认值为 13。
    alpha : float in [0, 1)
        预加重系数。值为 0 表示无滤波。默认值为 0.95。
    center : bool
        信号的第 k 帧是否应该 *从* 索引 ``x[k * stride_len]`` 开始(center = False),
        还是 *居中* 在 ``x[k * stride_len]`` 处(center = True)。默认值为 True。
    # 设置用于 Mel 滤波器组的滤波器数量,默认为 20
    n_filters : int
        The number of filters to include in the Mel filterbank. Default is 20.
    # 是否对 MFCC 值进行均值归一化,默认为 True
    normalize : bool
        Whether to mean-normalize the MFCC values. Default is True.
    # 倒谱滤波器系数,0 表示不进行滤波,较大的值表示更多的平滑处理,默认为 22
    lifter_coef : int in :math:[0,   infty]`
        The cepstral filter coefficient. 0 corresponds to no filtering, larger
        values correspond to greater amounts of smoothing. Default is 22.
    # 应用于信号在进行 DFT 之前的窗函数,默认为 'hann'
    window : {'hamming', 'hann', 'blackman_harris'}
        The windowing function to apply to the signal before taking the DFT.
        Default is 'hann'.
    # 连续窗口之间的跳跃持续时间(以秒为单位),默认为 0.01
    stride_duration : float
        The duration of the hop between consecutive windows (in seconds).
        Default is 0.01.
    # 每个帧/窗口的持续时间(以秒为单位),默认为 0.025
    window_duration : float
        The duration of each frame / window (in seconds). Default is 0.025.
    # 是否用总帧能量的对数替换第一个 MFCC 系数(截距项),默认为 True
    replace_intercept : bool
        Replace the first MFCC coefficient (the intercept term) with the
        log of the total frame energy instead. Default is True.

    Returns
    -------
    # Mel 频率倒谱系数的矩阵,行对应帧,列对应倒谱系数
    mfccs : :py:class:`ndarray <numpy.ndarray>` of shape `(G, C)`
        Matrix of Mel-frequency cepstral coefficients. Rows correspond to
        frames, columns to cepstral coefficients
    """
    # 将(帧化   窗函数处理后的)`x` 的功率谱映射到 Mel 频率刻度上
    filter_energies, frame_energies = mel_spectrogram(
        x=x,
        fs=fs,
        alpha=alpha,
        center=center,
        window=window,
        n_filters=n_filters,
        mean_normalize=False,
        window_duration=window_duration,
        stride_duration=stride_duration,
    )

    # 计算滤波器能量的对数值
    log_energies = 10 * np.log10(filter_energies)

    # 对对数 Mel 系数执行 DCT 以进一步降低数据维度
    # 早期的 DCT 系数将捕获大部分数据,允许我们丢弃 > n_mfccs 的系数
    mfccs = np.array([DCT(frame) for frame in log_energies])[:, :n_mfccs]

    # 对 MFCC 应用倒谱提升
    mfccs = cepstral_lifter(mfccs, D=lifter_coef)
    # 如果需要对 MFCC 进行归一化,则减去每列的均值
    mfccs -= np.mean(mfccs, axis=0) if normalize else 0

    # 如果需要替换截距项
    if replace_intercept:
        # 第0个 MFCC 系数不提供关于频谱的信息;
        # 用帧能量的对数替换它,得到更有信息量的特征
        mfccs[:, 0] = np.log(frame_energies)
    # 返回处理后的 MFCC 特征
    return mfccs
# 将 mel 频率表示的信号转换为 Hz 频率表示
def mel2hz(mel, formula="htk"):
    # 检查输入的 formula 参数是否合法
    fstr = "formula must be either 'htk' or 'slaney' but got '{}'"
    assert formula in ["htk", "slaney"], fstr.format(formula)
    
    # 根据 formula 参数选择不同的转换公式
    if formula == "htk":
        return 700 * (10 ** (mel / 2595) - 1)
    
    # 如果 formula 参数不是 'htk',则抛出未实现的错误
    raise NotImplementedError("slaney")


# 将 Hz 频率表示的信号转换为 mel 频率表示
def hz2mel(hz, formula="htk"):
    # 检查输入的 formula 参数是否合法
    fstr = "formula must be either 'htk' or 'slaney' but got '{}'"
    assert formula in ["htk", "slaney"], fstr.format(formula)

    # 根据 formula 参数选择不同的转换公式
    if formula == "htk":
        return 2595 * np.log10(1   hz / 700)
    
    # 如果 formula 参数不是 'htk',则抛出未实现的错误
    raise NotImplementedError("slaney")


# 生成 mel 滤波器组
def mel_filterbank(
    N, n_filters=20, fs=44000, min_freq=0, max_freq=None, normalize=True
):
    # 计算 Mel 滤波器组并返回相应的转换矩阵

    # Mel 比例是一种感知比例,旨在模拟人耳的工作方式。在 Mel 比例上,被听众认为在感知/心理距离上相等的音高在 Mel 比例上具有相等的距离。实际上,这对应于在低频率具有更高分辨率,在高频率(> 500 Hz)具有较低分辨率的比例。

    # Mel 滤波器组中的每个滤波器都是三角形的,在其中心具有响应为 1,在两侧具有线性衰减,直到达到下一个相邻滤波器的中心频率。

    # 此实现基于(出色的)LibROSA软件包中的代码。

    # 参考资料
    # McFee 等人(2015年)。"librosa: Python中的音频和音乐信号分析",*第14届Python科学会议论文集*
    # https://librosa.github.io

    # 参数
    # N : int
    #     DFT 布尔数
    # n_filters : int
    #     要包含在滤波器组中的 Mel 滤波器数量。默认为 20。
    # min_freq : int
    #     最小滤波器频率(以 Hz 为单位)。默认为 0。
    # max_freq : int
    #     最大滤波器频率(以 Hz 为单位)。默认为 0。
    # fs : int
    #     信号的采样率/频率。默认为 44000。
    # normalize : bool
    #     如果为 True,则按其在 Mel 空间中的面积缩放 Mel 滤波器权重。默认为 True。

    # 返回
    # fbank : :py:class:`ndarray <numpy.ndarray>` of shape `(n_filters, N // 2   1)`
    #     Mel 滤波器组转换矩阵。行对应滤波器,列对应 DFT 布尔数。
    """
    # 如果 max_freq 为 None,则将其设置为 fs 的一半
    max_freq = fs / 2 if max_freq is None else max_freq
    # 将最小频率和最大频率转换为 Mel 单位
    min_mel, max_mel = hz2mel(min_freq), hz2mel(max_freq)

    # 创建一个形状为 (n_filters, N // 2   1) 的全零数组
    fbank = np.zeros((n_filters, N // 2   1))

    # 在 Mel 比例上均匀分布的值,转换回 Hz 单位
    # 根据最小和最大梅尔频率以及滤波器数量计算梅尔频率的范围
    mel_bins = mel2hz(np.linspace(min_mel, max_mel, n_filters   2))

    # 计算DFT频率区间的中心
    hz_bins = dft_bins(N, fs)

    # 计算相邻梅尔频率之间的间距
    mel_spacing = np.diff(mel_bins)

    # 计算梅尔频率和DFT频率之间的差值
    ramps = mel_bins.reshape(-1, 1) - hz_bins.reshape(1, -1)
    for i in range(n_filters):
        # 计算跨越频率区间左右两侧的滤波器值...
        left = -ramps[i] / mel_spacing[i]
        right = ramps[i   2] / mel_spacing[i   1]

        # ...并在它们穿过x轴时将它们设为零
        fbank[i] = np.maximum(0, np.minimum(left, right))

    # 如果需要进行归一化
    if normalize:
        # 计算能量归一化系数
        energy_norm = 2.0 / (mel_bins[2 : n_filters   2] - mel_bins[:n_filters])
        # 对滤波器组进行能量归一化
        fbank *= energy_norm[:, np.newaxis]

    # 返回滤波器组
    return fbank

numpy-mlnumpy_mlpreprocessinggeneral.py

代码语言:javascript复制
import json
import hashlib
import warnings

import numpy as np

try:
    from scipy.sparse import csr_matrix

    _SCIPY = True
except ImportError:
    # 如果导入失败,则发出警告并设置_SCIPY为False
    warnings.warn("Scipy not installed. FeatureHasher can only create dense matrices")
    _SCIPY = False


def minibatch(X, batchsize=256, shuffle=True):
    """
    Compute the minibatch indices for a training dataset.

    Parameters
    ----------
    X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, *)`
        The dataset to divide into minibatches. Assumes the first dimension
        represents the number of training examples.
    batchsize : int
        The desired size of each minibatch. Note, however, that if ``X.shape[0] %
        batchsize > 0`` then the final batch will contain fewer than batchsize
        entries. Default is 256.
    shuffle : bool
        Whether to shuffle the entries in the dataset before dividing into
        minibatches. Default is True.

    Returns
    -------
    mb_generator : generator
        A generator which yields the indices into `X` for each batch.
    n_batches: int
        The number of batches.
    """
    # 获取数据集的样本数量
    N = X.shape[0]
    # 创建包含样本索引的数组
    ix = np.arange(N)
    # 计算总共的批次数量
    n_batches = int(np.ceil(N / batchsize))

    # 如果需要打乱数据集
    if shuffle:
        np.random.shuffle(ix)

    # 定义生成器函数,用于生成每个批次的索引
    def mb_generator():
        for i in range(n_batches):
            yield ix[i * batchsize : (i   1) * batchsize]

    return mb_generator(), n_batches


class OneHotEncoder:
    def __init__(self):
        """
        Convert between category labels and their one-hot vector
        representations.

        Parameters
        ----------
        categories : list of length `C`
            List of the unique category labels for the items to encode.
        """
        # 初始化OneHotEncoder类的属性
        self._is_fit = False
        self.hyperparameters = {}
        self.parameters = {"categories": None}

    def __call__(self, labels):
        # 调用transform方法
        return self.transform(labels)
    def fit(self, categories):
        """
        Create mappings between columns and category labels.

        Parameters
        ----------
        categories : list of length `C`
            List of the unique category labels for the items to encode.
        """
        # 将传入的类别标签列表存储在参数字典中
        self.parameters["categories"] = categories
        # 创建类别标签到索引的映射字典
        self.cat2idx = {c: i for i, c in enumerate(categories)}
        # 创建索引到类别标签的映射字典
        self.idx2cat = {i: c for i, c in enumerate(categories)}
        # 标记已经进行了fit操作
        self._is_fit = True

    def transform(self, labels, categories=None):
        """
        Convert a list of labels into a one-hot encoding.

        Parameters
        ----------
        labels : list of length `N`
            A list of category labels.
        categories : list of length `C`
            List of the unique category labels for the items to encode. Default
            is None.

        Returns
        -------
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            The one-hot encoded labels. Each row corresponds to an example,
            with a single 1 in the column corresponding to the respective
            label.
        """
        # 如果还没有进行fit操作,则根据传入的标签列表进行fit操作
        if not self._is_fit:
            categories = set(labels) if categories is None else categories
            self.fit(categories)

        # 检查是否有未识别的标签
        unknown = list(set(labels) - set(self.cat2idx.keys()))
        assert len(unknown) == 0, "Unrecognized label(s): {}".format(unknown)

        # 获取标签列表的长度和类别标签的数量
        N, C = len(labels), len(self.cat2idx)
        # 将标签列表转换为对应的索引列表
        cols = np.array([self.cat2idx[c] for c in labels])

        # 创建一个全零矩阵,用于存储one-hot编码后的标签
        Y = np.zeros((N, C))
        # 在每行中对应标签的位置设置为1
        Y[np.arange(N), cols] = 1
        return Y
    # 将一个独热编码转换回对应的标签

    def inverse_transform(self, Y):
        """
        Convert a one-hot encoding back into the corresponding labels

        Parameters
        ----------
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            One-hot encoded labels. Each row corresponds to an example, with a
            single 1 in the column associated with the label for that example

        Returns
        -------
        labels : list of length `N`
            The list of category labels corresponding to the nonzero columns in
            `Y`
        """
        # 获取类别数量
        C = len(self.cat2idx)
        # 检查 Y 的维度是否为 2
        assert Y.ndim == 2, "Y must be 2D, but has shape {}".format(Y.shape)
        # 检查 Y 的列数是否与类别数量相等
        assert Y.shape[1] == C, "Y must have {} columns, got {}".format(C, Y.shape[1])
        # 返回 Y 中非零元素对应的类别标签
        return [self.idx2cat[ix] for ix in Y.nonzero()[1]]
class Standardizer:
    def __init__(self, with_mean=True, with_std=True):
        """
        Feature-wise standardization for vector inputs.

        Notes
        -----
        Due to the sensitivity of empirical mean and standard deviation
        calculations to extreme values, `Standardizer` cannot guarantee
        balanced feature scales in the presence of outliers. In particular,
        note that because outliers for each feature can have different
        magnitudes, the spread of the transformed data on each feature can be
        very different.

        Similar to sklearn, `Standardizer` uses a biased estimator for the
        standard deviation: ``numpy.std(x, ddof=0)``.

        Parameters
        ----------
        with_mean : bool
            Whether to scale samples to have 0 mean during transformation.
            Default is True.
        with_std : bool
            Whether to scale samples to have unit variance during
            transformation. Default is True.
        """
        # 初始化 Standardizer 类,设置是否计算均值和标准差
        self.with_mean = with_mean
        self.with_std = with_std
        self._is_fit = False

    @property
    def hyperparameters(self):
        # 返回超参数字典,包括是否计算均值和标准差
        H = {"with_mean": self.with_mean, "with_std": self.with_std}
        return H

    @property
    def parameters(self):
        # 返回参数字典,包括均值和标准差(如果已计算)
        params = {
            "mean": self._mean if hasattr(self, "mean") else None,
            "std": self._std if hasattr(self, "std") else None,
        }
        return params

    def __call__(self, X):
        # 调用 transform 方法对输入数据进行标准化
        return self.transform(X)
    def fit(self, X):
        """
        Store the feature-wise mean and standard deviation across the samples
        in `X` for future scaling.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            An array of N samples, each with dimensionality `C`
        """
        # 如果输入不是 numpy 数组,则将其转换为 numpy 数组
        if not isinstance(X, np.ndarray):
            X = np.array(X)

        # 检查样本数量是否至少为 2
        if X.shape[0] < 2:
            raise ValueError("`X` must contain at least 2 samples")

        # 初始化标准差为 1,均值为 0
        std = np.ones(X.shape[1])
        mean = np.zeros(X.shape[1])

        # 如果需要计算均值,则计算样本的均值
        if self.with_mean:
            mean = np.mean(X, axis=0)

        # 如果需要计算标准差,则计算样本的标准差
        if self.with_std:
            std = np.std(X, axis=0, ddof=0)

        # 存储计算得到的均值和标准差
        self._mean = mean
        self._std = std
        self._is_fit = True

    def transform(self, X):
        """
        Standardize features by removing the mean and scaling to unit variance.

        For a sample `x`, the standardized score is calculated as:

        .. math::

            z = (x - u) / s

        where `u` is the mean of the training samples or zero if `with_mean` is
        False, and `s` is the standard deviation of the training samples or 1
        if `with_std` is False.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            An array of N samples, each with dimensionality `C`.

        Returns
        -------
        Z : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            The feature-wise standardized version of `X`.
        """
        # 如果没有进行拟合,则抛出异常
        if not self._is_fit:
            raise Exception("Must call `fit` before using the `transform` method")
        # 返回标准化后的结果
        return (X - self._mean) / self._std
    # 将标准化后的特征集合转换回原始特征空间
    def inverse_transform(self, Z):
        """
        Convert a collection of standardized features back into the original
        feature space.

        For a standardized sample `z`, the unstandardized score is calculated as:

        .. math::

            x = z s   u

        where `u` is the mean of the training samples or zero if `with_mean` is
        False, and `s` is the standard deviation of the training samples or 1
        if `with_std` is False.

        Parameters
        ----------
        Z : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            An array of `N` standardized samples, each with dimensionality `C`.

        Returns
        -------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            The unstandardized samples from `Z`.
        """
        # 检查是否已经拟合了`Standardizer`,如果没有则抛出异常
        assert self._is_fit, "Must fit `Standardizer` before calling inverse_transform"
        # 获取参数字典
        P = self.parameters
        # 获取平均值和标准差
        mean, std = P["mean"], P["std"]
        # 返回原始特征空间中的未标准化样本
        return Z * std   mean
# 定义一个特征哈希器类
class FeatureHasher:
    # 初始化方法,设置特征维度和稀疏性
    def __init__(self, n_dim=256, sparse=True):
        """
        Convert a collection of features to a fixed-dimensional matrix using
        the hashing trick.

        Notes
        -----
        Uses the md5 hash.

        Parameters
        ----------
        n_dim : int
            The dimensionality of each example in the output feature matrix.
            Small numbers of features are likely to cause hash collisions, but
            large numbers will cause larger overall parameter dimensions for
            any (linear) learning agent. Default is 256.
        sparse : bool
            Whether the resulting feature matrix should be a sparse
            :py:class:`csr_matrix <scipy.sparse.csr_matrix>` or dense
            :py:class:`ndarray <numpy.ndarray>`. Default is True.
        """
        # 设置特征维度
        self.n_dim = n_dim
        # 使用 md5 哈希算法
        self.hash = hashlib.md5
        # 设置是否使用稀疏矩阵
        self.sparse = sparse and _SCIPY
    # 将一组多特征示例编码为一个 `n_dim` 维特征矩阵,通过特征哈希

    # 特征哈希通过将哈希函数应用于示例的特征,并使用哈希值作为结果特征矩阵中的列索引来工作。
    # 每个哈希特征列上的条目对应于该示例和特征的值。例如,给定以下两个输入示例:

    # 定义示例
    >>> examples = [
        {"furry": 1, "quadruped": 1, "domesticated": 1},
        {"nocturnal": 1, "quadruped": 1},
    ]

    # 定义特征矩阵
    >>> feature_mat = zeros(2, 128)
    >>> ex1_cols = [H("furry"), H("quadruped"), H("domesticated")]
    >>> ex2_cols = [H("nocturnal"), H("quadruped")]
    >>> feat_mat[0, ex1_cols] = 1
    >>> feat_mat[1, ex2_cols] = 1

    # 为了更好地处理哈希冲突,通常将特征值乘以相应特征名称的摘要的符号。

    # 参数:
    # examples : dict or list of dicts
    #     一组 `N` 个示例,每个示例表示为一个字典,其中键对应于特征名称,值对应于特征值。

    # 返回:
    # table : :py:class:`ndarray <numpy.ndarray>` or :py:class:`csr_matrix <scipy.sparse.csr_matrix>` of shape `(N, n_dim)`
    #     编码后的特征矩阵
    def encode(self, examples):
        # 如果示例是字典,则转换为列表
        if isinstance(examples, dict):
            examples = [examples]

        # 根据稀疏性选择编码方式
        sparse = self.sparse
        return self._encode_sparse(examples) if sparse else self._encode_dense(examples)
    # 将稠密特征编码为稀疏矩阵
    def _encode_dense(self, examples):
        # 获取样本数量
        N = len(examples)
        # 创建一个全零矩阵,用于存储稠密特征
        table = np.zeros(N, self.n_dim)  # dense

        # 遍历每个样本
        for row, feat_dict in enumerate(examples):
            # 遍历每个特征及其值
            for f_id, val in feat_dict.items():
                # 如果特征ID是字符串,则转换为UTF-8编码
                if isinstance(f_id, str):
                    f_id = f_id.encode("utf-8")

                # 使用json模块将特征ID转换为与缓冲区API兼容的唯一字符串(哈希算法所需)
                if isinstance(f_id, (tuple, dict, list)):
                    f_id = json.dumps(f_id, sort_keys=True).encode("utf-8")

                # 计算特征ID的哈希值,并取模得到列索引
                h = int(self.hash(f_id).hexdigest(), base=16)
                col = h % self.n_dim
                # 更新稠密特征矩阵的值
                table[row, col]  = np.sign(h) * val

        # 返回稠密特征矩阵
        return table

    # 将稀疏特征编码为稀疏矩阵
    def _encode_sparse(self, examples):
        # 获取样本数量
        N = len(examples)
        # 初始化索引和数据列表
        idxs, data = [], []

        # 遍历每个样本
        for row, feat_dict in enumerate(examples):
            # 遍历每个特征及其值
            for f_id, val in feat_dict.items():
                # 如果特征ID是字符串,则转换为UTF-8编码
                if isinstance(f_id, str):
                    f_id = f_id.encode("utf-8")

                # 使用json模块将特征ID转换为与缓冲区API兼容的唯一字符串(哈希算法所需)
                if isinstance(f_id, (tuple, dict, list)):
                    f_id = json.dumps(f_id, sort_keys=True).encode("utf-8")

                # 计算特征ID的哈希值,并取模得到列索引
                h = int(self.hash(f_id).hexdigest(), base=16)
                col = h % self.n_dim
                # 将行索引和列索引添加到索引列表,将值添加到数据列表
                idxs.append((row, col))
                data.append(np.sign(h) * val)

        # 使用稀疏矩阵的构造函数创建稀疏矩阵
        table = csr_matrix((data, zip(*idxs)), shape=(N, self.n_dim))
        # 返回稀疏特征矩阵
        return table

0 人点赞