NumPyML 源码解析(一)

2024-02-17 10:01:42 浏览数 (1)

name: Bug/Performance Issue about: Use this template for reporting a bug or a performance issue. labels: bugfix

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 16.04):
  • Python version:
  • NumPy version:

Describe the current behavior

Describe the expected behavior

Code to reproduce the issue

Other info / logs

All Submissions
  • Is the code you are submitting your own work?
  • Have you followed the contributing guidelines?
  • Have you checked to ensure there aren’t other open Pull Requests for the same update/change?
New Model Submissions
  • Is the code you are submitting your own work?
  • Did you properly attribute the authors of any code you referenced?
  • Did you write unit tests for your new model?
  • Does your submission pass the unit tests?
  • Did you write documentation for your new model?
  • Have you formatted your code using the black deaults?
Changes to Existing Models
  • Have you added an explanation of what your changes do and why you’d like us to include them?
  • Have you written new tests for your changes, as applicable?
  • Have you successfully ran tests with your changes locally?

NumPy-ML Code of Conduct

Our Pledge

In the interest of fostering an open and welcoming environment, we as contributors and maintainers pledge to making participation in our project and our community a harassment-free experience for everyone, regardless of age, body size, disability, ethnicity, sex characteristics, gender identity and expression, level of experience, education, socio-economic status, nationality, personal appearance, race, religion, or sexual identity and orientation.

Our Standards

Examples of behavior that contributes to creating a positive environment include:

  • Using welcoming and inclusive language
  • Being respectful of differing viewpoints and experiences
  • Gracefully accepting constructive criticism
  • Focusing on what is best for the community
  • Showing empathy towards other community members

Examples of unacceptable behavior by participants include:

  • The use of sexualized language or imagery and unwelcome sexual attention or advances
  • Trolling, insulting/derogatory comments, and personal or political attacks
  • Public or private harassment
  • Publishing others’ private information, such as a physical or electronic address, without explicit permission
  • Other conduct which could reasonably be considered inappropriate in a professional setting

Our Responsibilities

Project maintainers are responsible for clarifying the standards of acceptable behavior and are expected to take appropriate and fair corrective action in response to any instances of unacceptable behavior.

Project maintainers have the right and responsibility to remove, edit, or reject comments, commits, code, wiki edits, issues, and other contributions that are not aligned to this Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors that they deem inappropriate, threatening, offensive, or harmful.

Scope

This Code of Conduct applies within all project spaces, and it also applies when an individual is representing the project or its community in public spaces. Examples of representing a project or community include using an official project e-mail address, posting via an official social media account, or acting as an appointed representative at an online or offline event. Representation of a project may be further defined and clarified by project maintainers.

Enforcement

Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by contacting the project team at ddbourgin@gmail.com. All complaints will be reviewed and investigated and will result in a response that is deemed necessary and appropriate to the circumstances. The project team is obligated to maintain confidentiality with regard to the reporter of an incident. Further details of specific enforcement policies may be posted separately.

Project maintainers who do not follow or enforce the Code of Conduct in good faith may face temporary or permanent repercussions as determined by other members of the project’s leadership.

Attribution

This Code of Conduct is adapted from the Contributor Covenant, version 1.4, available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html

For answers to common questions about this code of conduct, see https://www.contributor-covenant.org/faq

Contributing

Thank you for contributing to numpy-ml!

⚠️ ⚠️ All PRs should reflect earnest attempts at implementing a model yourself. ⚠️⚠️ It is fine to reference others’ code. It is not fine to blindly copy without attribution. When in doubt, please ask.

General guidelines
  1. Please include a clear list of what you’ve done
  2. For pull requests, please make sure all commits are atomic (i.e., one feature per commit)
  3. If you’re submitting a new model / feature / module, please include proper documentation and unit tests.
    • See the test.py file in one of the existing modules for examples of unit tests.
    • Documentation is loosely based on the NumPy docstring style. When in doubt, refer to existing examples
  4. Please format your code using the black defaults. You can use this online formatter.
Specific guidelines
I have a new model / model component to contribute

Awesome - create a pull request! When preparing your PR, please include a brief description of the model, the canonical reference(s) in the literature, and, most importantly unit tests against an existing implementation!

  • Refer to the test.py file in one of the existing modules for examples.
I have a major new enhancement / adjustment that will affect multiple models

Please post an issue with your proposal before you begin working on it. When outlining your proposal, please include as much detail about your intended changes as possible.

I found a bug

If there isn’t already an open issue, please start one! When creating your issue, include:

  1. A title and clear description
  2. As much relevant information as possible
  3. A code sample demonstrating the expected behavior that is not occurring
I fixed a bug

Thank you! Please open a new pull request with the patch. When doing so, ensure the PR description clearly describes the problem and solution. Include the relevant issue number if applicable.

numpy-mldocsconf.py

代码语言:javascript复制
# -*- coding: utf-8 -*-
#
# Configuration file for the Sphinx documentation builder.
#
# This file does only contain a selection of the most common options. For a
# full list see the documentation:
# http://www.sphinx-doc.org/en/master/config

# -- Path setup --------------------------------------------------------------

# 如果需要在其他目录中使用扩展(或要使用autodoc进行文档化的模块),在这里添加这些目录。
# 如果目录相对于文档根目录,请使用os.path.abspath使其绝对化,如下所示。
#
import os
import sys
import inspect

sys.path.insert(0, os.path.abspath(".."))


gh_url = "https://github.com/ddbourgin/numpy-ml"

# -- Project information -----------------------------------------------------

project = "numpy-ml"
copyright = "2022, David Bourgin"
author = "David Bourgin"

# The short X.Y version
version = "0.1"
# The full version, including alpha/beta/rc tags
release = "0.1.0"


# -- General configuration ---------------------------------------------------

# 如果文档需要最低的Sphinx版本,请在这里声明。
#
# needs_sphinx = '1.0'

# 在这里添加任何Sphinx扩展模块名称,作为字符串。它们可以是
# 与Sphinx一起提供的扩展(命名为'sphinx.ext.*')或您自定义的扩展。
extensions = [
    "sphinx.ext.autodoc",
    "sphinx.ext.doctest",
    "sphinx.ext.intersphinx",
    "sphinx.ext.todo",
    "sphinx.ext.coverage",
    "sphinx.ext.mathjax",
    "sphinx.ext.ifconfig",
    "sphinx.ext.githubpages",
    "sphinx.ext.napoleon",
    "sphinx.ext.linkcode"
    #  "numpydoc",
]

# 为了避免在read-the-docs构建过程中出现内存错误
autodoc_mock_imports = ["tensorflow", "torch", "gym"]

# 尝试在GitHub上链接到源代码
def linkcode_resolve(domain, info):
    if domain != "py":
        return None

    module = info.get("module", None)
    fullname = info.get("fullname", None)

    if not module or not fullname:
        return None
    # 获取指定模块的对象
    obj = sys.modules.get(module, None)
    # 如果对象不存在,则返回 None
    if obj is None:
        return None

    # 根据完整的模块名进行循环,逐级获取属性
    for part in fullname.split("."):
        obj = getattr(obj, part)
        # 如果属性是 property 类型,则获取其 fget 方法
        if isinstance(obj, property):
            obj = obj.fget

    # 尝试获取对象对应的源文件路径
    try:
        file = inspect.getsourcefile(obj)
        # 如果源文件路径为空,则返回 None
        if file is None:
            return None
    except:
        return None

    # 将文件路径转换为相对路径
    file = os.path.relpath(file, start=os.path.abspath(".."))
    # 获取对象对应源代码的起始行号和结束行号
    source, line_start = inspect.getsourcelines(obj)
    line_end = line_start   len(source) - 1
    # 构建包含文件名和行号范围的字符串
    filename = f"{file}#L{line_start}-L{line_end}"
    # 返回包含 GitHub URL 的完整链接
    return f"{gh_url}/blob/master/{filename}"
# 设置 Napoleon 插件的配置选项
napoleon_google_docstring = False
napoleon_numpy_docstring = True
napoleon_include_init_with_doc = False
napoleon_include_private_with_doc = False
napoleon_include_special_with_doc = False
napoleon_use_admonition_for_examples = False
napoleon_use_admonition_for_notes = False
napoleon_use_admonition_for_references = False
napoleon_use_ivar = True
napoleon_use_param = True
napoleon_use_rtype = False
napoleon_use_keyword = True

# 添加包含模板的路径,相对于当前目录
templates_path = ["_templates"]

# 源文件名的后缀
source_suffix = ".rst"

# 主目录文档
master_doc = "index"

# 自动生成内容的语言
language = None

# 忽略查找源文件时匹配的文件和目录的模式
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]

# 使用的 Pygments(语法高亮)样式
pygments_style = "friendly"

# 类文档的内容
autoclass_content = "both"

# HTML 输出选项
# 使用的主题
html_theme = "alabaster"

# 主题选项用于自定义主题的外观
# html_theme_options = {}
html_css_files = ["css/custom.css"]
# 添加包含自定义静态文件(如样式表)的路径,相对于当前目录
# 这些文件会在内置静态文件之后复制,因此名为"default.css"的文件将覆盖内置的"default.css"
html_static_path = ["_static"]

# 自定义侧边栏模板,必须是一个将文档名称映射到模板名称的字典
#
# 默认侧边栏(不匹配任何模式的文档)由主题自己定义。内置主题默认使用这些模板:['localtoc.html', 'relations.html', 'sourcelink.html', 'searchbox.html']
#
html_sidebars = {
    "**": [
        "about.html",
        "navigation.html",
        "relations.html",
        "searchbox.html",
        "donate.html",
    ]
}

html_theme_options = {
    "github_user": "ddbourgin",
    "github_repo": "numpy-ml",
    "description": "Machine learning, in NumPy",
    "github_button": True,
    "show_powered_by": False,
    "fixed_sidebar": True,
    "analytics_id": "UA-65839510-3",
    #  'logo': 'logo.png',
}

# -- 用于 HTML 帮助输出的选项 ---------------------------------------------

# HTML 帮助生成器的输出文件基本名称
htmlhelp_basename = "numpy-mldoc"

# -- 用于 LaTeX 输出的选项 --------------------------------------------

latex_elements = {
    # 纸张大小('letterpaper' 或 'a4paper')
    #
    # 'papersize': 'letterpaper',
    # 字体大小('10pt'、'11pt' 或 '12pt')
    #
    # 'pointsize': '10pt',
    # LaTeX 导言中的额外内容
    #
    # 'preamble': '',
    # LaTeX 图片(浮动)对齐
    #
    # 'figure_align': 'htbp',
}

# 将文档树分组为 LaTeX 文件。元组列表
# (源起始文件,目标名称,标题,
# 作者,文档类[howto、manual 或自定义类])。
latex_documents = [
    (master_doc, "numpy-ml.tex", "numpy-ml Documentation", "David Bourgin", "manual")
]

# -- 用于手册页输出的选项 ------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [(master_doc, "numpy-ml", "numpy-ml Documentation", [author], 1)]

# -- Options for Texinfo output ----------------------------------------------

# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
#  dir menu entry, description, category)
texinfo_documents = [
    (
        master_doc,
        "numpy-ml",
        "numpy-ml Documentation",
        author,
        "numpy-ml",
        "Machine learning, in NumPy.",
        "Miscellaneous",
    )
]

# -- Options for Epub output -------------------------------------------------

# Bibliographic Dublin Core info.
epub_title = project

# The unique identifier of the text. This can be a ISBN number
# or the project homepage.
#
# epub_identifier = ''

# A unique identification for the text.
#
# epub_uid = ''

# A list of files that should not be packed into the epub file.
epub_exclude_files = ["search.html"]

# Set the order in which members are documented in the autodoc output
autodoc_member_order = "bysource"

# -- Extension configuration -------------------------------------------------

# -- Options for intersphinx extension ---------------------------------------

# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
    "python": ("https://docs.python.org/", None),
    "numpy": ("http://docs.scipy.org/doc/numpy/", None),
}

# -- Options for todo extension ----------------------------------------------

# If true, `todo` and `todoList` produce output, else they produce nothing.
todo_include_todos = True

# -- Options for numpydocs extension -----------------------------------------
# https://numpydoc.readthedocs.io/en/latest/install.html

# Whether to produce plot:: directives for Examples sections that contain
# import matplotlib or from matplotlib import.
numpydoc_use_plots = True
# 是否自动显示类的所有成员在方法和属性部分,默认为True
numpydoc_show_class_members = True

# 是否自动显示类的所有继承成员在方法和属性部分,默认为True。如果为false,则不显示继承成员。默认为True。
numpydoc_show_inherited_class_members = True

# 是否为类方法和属性列表创建Sphinx目录。如果创建了目录,Sphinx期望每个条目有单独的页面。默认为False。
numpydoc_class_members_toctree = False

# 匹配引用的正则表达式,应该被修改以避免在文档中重复。默认为[w-] 。
numpydoc_citation_re = r"[w-] "

# 在版本0.8之前,参数定义显示为块引用,而不是在定义列表中。如果您的样式需要块引用,请将此配置选项切换为True。此选项将在版本0.10中删除。
numpydoc_use_blockquotes = False

# 是否以与参数部分相同的方式格式化类页面的属性部分。如果为False,则属性部分将以使用autosummary表的方法部分格式化。默认为True。
numpydoc_attributes_as_param_list = False

# 是否为docstring的Parameters、Other Parameters、Returns和Yields部分中的参数类型创建交叉引用。默认为False。
numpydoc_xref_param_type = False

# 完全限定路径的映射(或正确的ReST引用),用于指定参数类型时使用的别名/快捷方式。键不应包含任何空格。与intersphinx扩展一起使用,可以映射到任何文档中的链接。默认为空字典。此选项取决于numpydoc_xref_param_type选项为True。
numpydoc_xref_aliases = {}
# 定义一个集合,用于存储不需要交叉引用的单词。这些单词通常是在参数类型描述中使用的常见单词,可能会与相同名称的类混淆。例如:{'type','optional','default'}。默认为空集。
numpydoc_xref_ignore = set([])

# 是否在文档字符串后插入编辑链接。已弃用自版本编辑:您的HTML模板。
numpydoc_edit_link: bool

To build the documentation locally, install sphinx, cd into the docs, directory and run make html. Local files will be generated in the docs/_build/html directory.

numpy-mlnumpy_mlbanditsbandits.py

代码语言:javascript复制
# 导入必要的模块
from abc import ABC, abstractmethod
# 导入 numpy 库
import numpy as np
# 导入自定义的测试函数
from numpy_ml.utils.testing import random_one_hot_matrix, is_number

# 定义一个抽象类 Bandit,表示多臂赌博机环境
class Bandit(ABC):
    # 初始化函数,接受奖励和奖励概率列表以及上下文信息
    def __init__(self, rewards, reward_probs, context=None):
        # 断言奖励和奖励概率列表长度相同
        assert len(rewards) == len(reward_probs)
        # 初始化步数为 0
        self.step = 0
        # 记录赌博机臂数
        self.n_arms = len(rewards)

        super().__init__()

    # 返回对象的字符串表示形式
    def __repr__(self):
        """A string representation for the bandit"""
        # 获取超参数字典
        HP = self.hyperparameters
        # 格式化超参数字典为字符串
        params = ", ".join(["{}={}".format(k, v) for (k, v) in HP.items() if k != "id"])
        return "{}({})".format(HP["id"], params)

    # 返回超参数字典
    @property
    def hyperparameters(self):
        """A dictionary of the bandit hyperparameters"""
        return {}

    # 抽象方法,返回最优策略下的期望奖励
    @abstractmethod
    def oracle_payoff(self, context=None):
        """
        Return the expected reward for an optimal agent.

        Parameters
        ----------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)` or None
            The current context matrix for each of the bandit arms, if
            applicable. Default is None.

        Returns
        -------
        optimal_rwd : float
            The expected reward under an optimal policy.
        """
        pass
    # 从给定臂的收益分布中“拉取”(即采样)一个样本
    def pull(self, arm_id, context=None):
        """
        "Pull" (i.e., sample from) a given arm's payoff distribution.

        Parameters
        ----------
        arm_id : int
            The integer ID of the arm to sample from
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D,)` or None
            The context vector for the current timestep if this is a contextual
            bandit. Otherwise, this argument is unused and defaults to None.

        Returns
        -------
        reward : float
            The reward sampled from the given arm's payoff distribution
        """
        # 确保臂的ID小于臂的总数
        assert arm_id < self.n_arms

        # 增加步数计数器
        self.step  = 1
        # 调用内部方法_pull来实际执行拉取操作
        return self._pull(arm_id, context)

    # 重置赌博机的步数和动作计数器为零
    def reset(self):
        """Reset the bandit step and action counters to zero."""
        self.step = 0

    # 抽象方法,用于实际执行拉取操作,需要在子类中实现
    @abstractmethod
    def _pull(self, arm_id):
        pass
class MultinomialBandit(Bandit):
    # 多项式赌博机类,每个臂与不同的多项式支付分布相关联
    def __init__(self, payoffs, payoff_probs):
        """
        A multi-armed bandit where each arm is associated with a different
        multinomial payoff distribution.

        Parameters
        ----------
        payoffs : ragged list of length `K`
            The payoff values for each of the `n` bandits. ``payoffs[k][i]``
            holds the `i` th payoff value for arm `k`.
        payoff_probs : ragged list of length `K`
            A list of the probabilities associated with each of the payoff
            values in ``payoffs``. ``payoff_probs[k][i]`` holds the probability
            of payoff index `i` for arm `k`.
        """
        # 调用父类的构造函数
        super().__init__(payoffs, payoff_probs)

        # 检查每个臂的支付值和概率列表长度是否相等
        for r, rp in zip(payoffs, payoff_probs):
            assert len(r) == len(rp)
            np.testing.assert_almost_equal(sum(rp), 1.0)

        # 将支付值和概率列表转换为 NumPy 数组
        payoffs = np.array([np.array(x) for x in payoffs])
        payoff_probs = np.array([np.array(x) for x in payoff_probs])

        # 初始化实例变量
        self.payoffs = payoffs
        self.payoff_probs = payoff_probs
        self.arm_evs = np.array([sum(p * v) for p, v in zip(payoff_probs, payoffs)])
        self.best_ev = np.max(self.arm_evs)
        self.best_arm = np.argmax(self.arm_evs)

    @property
    def hyperparameters(self):
        """A dictionary of the bandit hyperparameters"""
        # 返回赌博机的超参数字典
        return {
            "id": "MultinomialBandit",
            "payoffs": self.payoffs,
            "payoff_probs": self.payoff_probs,
        }
    # 定义一个方法,用于返回最优策略下的预期奖励和最优臂
    def oracle_payoff(self, context=None):
        """
        Return the expected reward for an optimal agent.
    
        Parameters
        ----------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)` or None
            Unused. Default is None.
    
        Returns
        -------
        optimal_rwd : float
            The expected reward under an optimal policy.
        optimal_arm : float
            The arm ID with the largest expected reward.
        """
        # 返回最优奖励和最优臂
        return self.best_ev, self.best_arm
    
    # 定义一个私有方法,用于选择特定臂的奖励
    def _pull(self, arm_id, context):
        # 获取特定臂的奖励
        payoffs = self.payoffs[arm_id]
        # 获取特定臂的奖励概率
        probs = self.payoff_probs[arm_id]
        # 根据奖励概率随机选择奖励
        return np.random.choice(payoffs, p=probs)
class BernoulliBandit(Bandit):
    def __init__(self, payoff_probs):
        """
        A multi-armed bandit where each arm is associated with an independent
        Bernoulli payoff distribution.

        Parameters
        ----------
        payoff_probs : list of length `K`
            A list of the payoff probability for each arm. ``payoff_probs[k]``
            holds the probability of payoff for arm `k`.
        """
        # 初始化每个臂的奖励为1
        payoffs = [1] * len(payoff_probs)
        # 调用父类的初始化方法,传入奖励和概率
        super().__init__(payoffs, payoff_probs)

        # 检查每个臂的概率是否在0到1之间
        for p in payoff_probs:
            assert p >= 0 and p <= 1

        # 将奖励和概率转换为NumPy数组
        self.payoffs = np.array(payoffs)
        self.payoff_probs = np.array(payoff_probs)

        # 计算每个臂的期望值和最佳臂的期望值
        self.arm_evs = self.payoff_probs
        self.best_ev = np.max(self.arm_evs)
        self.best_arm = np.argmax(self.arm_evs)

    @property
    def hyperparameters(self):
        """A dictionary of the bandit hyperparameters"""
        # 返回包含超参数的字典
        return {
            "id": "BernoulliBandit",
            "payoff_probs": self.payoff_probs,
        }

    def oracle_payoff(self, context=None):
        """
        Return the expected reward for an optimal agent.

        Parameters
        ----------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)` or None
            Unused. Default is None.

        Returns
        -------
        optimal_rwd : float
            The expected reward under an optimal policy.
        optimal_arm : float
            The arm ID with the largest expected reward.
        """
        # 返回最佳臂的期望值和最佳臂的ID
        return self.best_ev, self.best_arm

    def _pull(self, arm_id, context):
        # 模拟拉动臂的动作,根据概率返回奖励
        return int(np.random.rand() <= self.payoff_probs[arm_id])


class GaussianBandit(Bandit):
    def __init__(self, payoff_dists, payoff_probs):
        """
        初始化函数,创建一个类似于BernoulliBandit的多臂赌博机,但每个臂的奖励值不是固定的1,而是从独立的高斯随机变量中抽样得到。

        Parameters
        ----------
        payoff_dists : 长度为`K`的2元组列表
            每个臂的奖励值分布的参数。具体来说,``payoffs[k]``是与臂`k`关联的奖励值高斯分布的均值和方差的元组。
        payoff_probs : 长度为`n`的列表
            每个奖励值在``payoffs``中的概率列表。``payoff_probs[k]``保存了臂`k`的奖励概率。
        """
        super().__init__(payoff_dists, payoff_probs)

        for (mean, var), rp in zip(payoff_dists, payoff_probs):
            assert var > 0
            assert np.testing.assert_almost_equal(sum(rp), 1.0)

        self.payoff_dists = payoff_dists
        self.payoff_probs = payoff_probs
        self.arm_evs = np.array([mu for (mu, var) in payoff_dists])
        self.best_ev = np.max(self.arm_evs)
        self.best_arm = np.argmax(self.arm_evs)

    @property
    def hyperparameters(self):
        """返回赌博机的超参数字典"""
        return {
            "id": "GaussianBandit",
            "payoff_dists": self.payoff_dists,
            "payoff_probs": self.payoff_probs,
        }

    def _pull(self, arm_id, context):
        mean, var = self.payoff_dists[arm_id]

        reward = 0
        if np.random.rand() < self.payoff_probs[arm_id]:
            reward = np.random.normal(mean, var)

        return reward
    # 定义一个方法,用于返回最佳策略下的预期奖励
    def oracle_payoff(self, context=None):
        """
        Return the expected reward for an optimal agent.

        Parameters
        ----------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)` or None
            Unused. Default is None.

        Returns
        -------
        optimal_rwd : float
            The expected reward under an optimal policy.
        optimal_arm : float
            The arm ID with the largest expected reward.
        """
        # 返回最佳策略下的预期奖励和最佳臂的ID
        return self.best_ev, self.best_arm
class ShortestPathBandit(Bandit):
    # 最短路径赌博机类,继承自Bandit类
    def __init__(self, G, start_vertex, end_vertex):
        """
        A weighted graph shortest path problem formulated as a multi-armed
        bandit.

        Notes
        -----
        Each arm corresponds to a valid path through the graph from start to
        end vertex. The agent's goal is to find the path that minimizes the
        expected sum of the weights on the edges it traverses.

        Parameters
        ----------
        G : :class:`Graph <numpy_ml.utils.graphs.Graph>` instance
            A weighted graph object. Weights can be fixed or probabilistic.
        start_vertex : int
            The index of the path's start vertex in the graph
        end_vertex : int
            The index of the path's end vertex in the graph
        """
        # 初始化函数,接受图G、起始顶点start_vertex和结束顶点end_vertex作为参数
        self.G = G
        self.end_vertex = end_vertex
        self.adj_dict = G.to_adj_dict()
        self.start_vertex = start_vertex
        self.paths = G.all_paths(start_vertex, end_vertex)

        self.arm_evs = self._calc_arm_evs()
        self.best_ev = np.max(self.arm_evs)
        self.best_arm = np.argmax(self.arm_evs)

        placeholder = [None] * len(self.paths)
        # 调用父类的初始化函数,传入占位符列表作为参数
        super().__init__(placeholder, placeholder)

    @property
    def hyperparameters(self):
        """A dictionary of the bandit hyperparameters"""
        # 返回赌博机的超参数字典
        return {
            "id": "ShortestPathBandit",
            "G": self.G,
            "end_vertex": self.end_vertex,
            "start_vertex": self.start_vertex,
        }
    # 返回最佳策略下的预期奖励和最佳臂的 ID
    def oracle_payoff(self, context=None):
        """
        Return the expected reward for an optimal agent.

        Parameters
        ----------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)` or None
            Unused. Default is None.

        Returns
        -------
        optimal_rwd : float
            The expected reward under an optimal policy.
        optimal_arm : float
            The arm ID with the largest expected reward.
        """
        # 返回最佳策略下的预期奖励和最佳臂的 ID
        return self.best_ev, self.best_arm

    # 计算每个臂的预期奖励
    def _calc_arm_evs(self):
        # 获取顶点映射函数
        I2V = self.G.get_vertex
        # 初始化预期奖励数组
        evs = np.zeros(len(self.paths))
        # 遍历所有路径
        for p_ix, path in enumerate(self.paths):
            # 遍历路径中的每个顶点
            for ix, v_i in enumerate(path[:-1]):
                # 获取当前边的权重
                e = [e for e in self.adj_dict[v_i] if e.to == I2V(path[ix   1])][0]
                # 累加边的权重到预期奖励中
                evs[p_ix] -= e.weight
        return evs

    # 拉动指定臂
    def _pull(self, arm_id, context):
        # 初始化奖励
        reward = 0
        # 获取顶点映射函数
        I2V = self.G.get_vertex
        # 获取指定臂对应的路径
        path = self.paths[arm_id]
        # 遍历路径中的每个顶点
        for ix, v_i in enumerate(path[:-1]):
            # 获取当前边的权重
            e = [e for e in self.adj_dict[v_i] if e.to == I2V(path[ix   1])][0]
            # 累加边的权重到奖励中
            reward -= e.weight
        return reward
class ContextualBernoulliBandit(Bandit):
    # 定义一个上下文版本的 BernoulliBandit 类,其中每个二进制上下文特征与独立的伯努利支付分布相关联
    def __init__(self, context_probs):
        """
        A contextual version of :class:`BernoulliBandit` where each binary
        context feature is associated with an independent Bernoulli payoff
        distribution.

        Parameters
        ----------
        context_probs : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)`
            A matrix of the payoff probabilities associated with each of the
            `D` context features, for each of the `K` arms. Index `(i, j)`
            contains the probability of payoff for arm `j` under context `i`.
        """
        # 获取上下文概率矩阵的形状
        D, K = context_probs.shape

        # 使用一个虚拟占位符变量来初始化 Bandit 超类
        placeholder = [None] * K
        super().__init__(placeholder, placeholder)

        # 设置上下文概率矩阵和臂的期望值
        self.context_probs = context_probs
        self.arm_evs = self.context_probs
        self.best_evs = self.arm_evs.max(axis=1)
        self.best_arms = self.arm_evs.argmax(axis=1)

    @property
    def hyperparameters(self):
        """A dictionary of the bandit hyperparameters"""
        # 返回一个包含 bandit 超参数的字典
        return {
            "id": "ContextualBernoulliBandit",
            "context_probs": self.context_probs,
        }

    def get_context(self):
        """
        Sample a random one-hot context vector. This vector will be the same
        for all arms.

        Returns
        -------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)`
            A random `D`-dimensional one-hot context vector repeated for each
            of the `K` bandit arms.
        """
        # 获取上下文概率矩阵的形状
        D, K = self.context_probs.shape
        # 创建一个全零矩阵作为上下文向量
        context = np.zeros((D, K))
        # 在随机选择的维度上设置为 1,生成一个随机的 one-hot 上下文向量
        context[np.random.choice(D), :] = 1
        return random_one_hot_matrix(1, D).ravel()
    # 计算在给定上下文下,最优策略的预期奖励和最优臂
    def oracle_payoff(self, context):
        """
        Return the expected reward for an optimal agent.

        Parameters
        ----------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)` or None
            The current context matrix for each of the bandit arms.

        Returns
        -------
        optimal_rwd : float
            The expected reward under an optimal policy.
        optimal_arm : float
            The arm ID with the largest expected reward.
        """
        # 获取上下文矩阵中第一列的最大值所在的索引
        context_id = context[:, 0].argmax()
        # 返回最优策略下的预期奖励和最优臂
        return self.best_evs[context_id], self.best_arms[context_id]

    # 执行拉动动作,返回奖励
    def _pull(self, arm_id, context):
        # 获取上下文概率矩阵的维度
        D, K = self.context_probs.shape
        # 计算选择的臂的概率
        arm_probs = context[:, arm_id] @ self.context_probs
        # 生成服从该臂概率分布的奖励
        arm_rwds = (np.random.rand(K) <= arm_probs).astype(int)
        # 返回选择臂的奖励
        return arm_rwds[arm_id]
class ContextualLinearBandit(Bandit):
    def __init__(self, K, D, payoff_variance=1):
        r"""
        A contextual linear multi-armed bandit.

        Notes
        -----
        In a contextual linear bandit the expected payoff of an arm :math:`a
        in mathcal{A}` at time `t` is a linear combination of its context
        vector :math:`mathbf{x}_{t,a}` with a coefficient vector
        :math:`theta_a`:

        .. math::

            mathbb{E}[r_{t, a} mid mathbf{x}_{t, a}] = mathbf{x}_{t,a}^top theta_a

        In this implementation, the arm coefficient vectors :math:`theta` are
        initialized independently from a uniform distribution on the interval
        [-1, 1], and the specific reward at timestep `t` is normally
        distributed:

        .. math::

            r_{t, a} mid mathbf{x}_{t, a} sim
                mathcal{N}(mathbf{x}_{t,a}^top theta_a, sigma_a^2)

        Parameters
        ----------
        K : int
            The number of bandit arms
        D : int
            The dimensionality of the context vectors
        payoff_variance : float or :py:class:`ndarray <numpy.ndarray>` of shape `(K,)`
            The variance of the random noise in the arm payoffs. If a float,
            the variance is assumed to be equal for each arm. Default is 1.
        """
        # 如果 payoff_variance 是一个数字,则将其扩展为长度为 K 的列表
        if is_number(payoff_variance):
            payoff_variance = [payoff_variance] * K

        # 确保 payoff_variance 的长度等于 K
        assert len(payoff_variance) == K
        # 确保 payoff_variance 中的值都大于 0
        assert all(v > 0 for v in payoff_variance)

        # 初始化 K、D 和 payoff_variance 属性
        self.K = K
        self.D = D
        self.payoff_variance = payoff_variance

        # 使用占位符变量初始化 Bandit 超类
        placeholder = [None] * K
        super().__init__(placeholder, placeholder)

        # 初始化 theta 矩阵
        self.thetas = np.random.uniform(-1, 1, size=(D, K))
        self.thetas /= np.linalg.norm(self.thetas, 2)

    @property
    def hyperparameters(self):
        """返回一个字典,包含赌博机的超参数"""
        return {
            "id": "ContextualLinearBandit",
            "K": self.K,
            "D": self.D,
            "payoff_variance": self.payoff_variance,
        }

    @property
    def parameters(self):
        """返回当前赌博机参数的字典"""
        return {"thetas": self.thetas}

    def get_context(self):
        """
        从多元标准正态分布中抽样每个臂的上下文向量。

        Returns
        -------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)`
            对于每个 `K` 赌博机臂,从标准正态分布中抽样的 `D` 维上下文向量。
        """
        return np.random.normal(size=(self.D, self.K))

    def oracle_payoff(self, context):
        """
        返回最佳策略下的预期奖励。

        Parameters
        ----------
        context : :py:class:`ndarray <numpy.ndarray>` of shape `(D, K)` or None
            每个赌博机臂的当前上下文矩阵,如果适用。默认为 None。

        Returns
        -------
        optimal_rwd : float
            最佳策略下的预期奖励。
        optimal_arm : float
            具有最大预期奖励的臂的 ID。
        """
        best_arm = np.argmax(self.arm_evs)
        return self.arm_evs[best_arm], best_arm

    def _pull(self, arm_id, context):
        K, thetas = self.K, self.thetas
        self._noise = np.random.normal(scale=self.payoff_variance, size=self.K)
        self.arm_evs = np.array([context[:, k] @ thetas[:, k] for k in range(K)])
        return (self.arm_evs   self._noise)[arm_id]

numpy-mlnumpy_mlbanditspolicies.py

代码语言:javascript复制
# 一个包含各种多臂赌博问题探索策略的模块
from abc import ABC, abstractmethod
from collections import defaultdict
import numpy as np
from ..utils.testing import is_number

# 定义一个多臂赌博策略的基类
class BanditPolicyBase(ABC):
    def __init__(self):
        """一个简单的多臂赌博策略基类"""
        # 初始化步数为0,估计值为空字典,未初始化标志为False
        self.step = 0
        self.ev_estimates = {}
        self.is_initialized = False
        super().__init__()

    def __repr__(self):
        """返回策略的字符串表示"""
        HP = self.hyperparameters
        params = ", ".join(["{}={}".format(k, v) for (k, v) in HP.items() if k != "id"])
        return "{}({})".format(HP["id"], params)

    @property
    def hyperparameters(self):
        """包含策略超参数的字典"""
        pass

    @property
    def parameters(self):
        """包含当前策略参数的字典"""
        pass

    def act(self, bandit, context=None):
        """
        选择一个臂并从其收益分布中采样。

        Parameters
        ----------
        bandit : :class:`Bandit <numpy_ml.bandits.bandits.Bandit>` 实例
            要操作的多臂赌博机
        context : :py:class:`ndarray <numpy.ndarray>` 形状为 `(D,)` 或 None
            如果与上下文多臂赌博机交互,则为当前时间步的上下文向量。否则,此参数未使用。默认为None。

        Returns
        -------
        rwd : float
            拉动 ``arm_id`` 后收到的奖励。
        arm_id : int
            生成 ``rwd`` 的被拉动的臂。
        """
        if not self.is_initialized:
            self._initialize_params(bandit)

        arm_id = self._select_arm(bandit, context)
        rwd = self._pull_arm(bandit, arm_id, context)
        self._update_params(arm_id, rwd, context)
        return rwd, arm_id
    # 重置策略参数和计数器到初始状态
    def reset(self):
        """Reset the policy parameters and counters to their initial states."""
        self.step = 0
        self._reset_params()
        self.is_initialized = False

    # 执行一个摇臂动作并返回收到的奖励
    def _pull_arm(self, bandit, arm_id, context):
        """Execute a bandit action and return the received reward."""
        self.step  = 1
        return bandit.pull(arm_id, context)

    # 根据当前上下文选择一个摇臂
    @abstractmethod
    def _select_arm(self, bandit, context):
        """Select an arm based on the current context"""
        pass

    # 在交互后更新策略参数
    @abstractmethod
    def _update_params(self, bandit, context):
        """Update the policy parameters after an interaction"""
        pass

    # 初始化依赖于摇臂环境信息的任何策略特定参数
    @abstractmethod
    def _initialize_params(self, bandit):
        """
        Initialize any policy-specific parameters that depend on information
        from the bandit environment.
        """
        pass

    # 重置任何模型特定参数,在公共的 `self.reset()` 方法中调用
    @abstractmethod
    def _reset_params(self):
        """
        Reset any model-specific parameters. This gets called within the
        public `self.reset()` method.
        """
        pass
class EpsilonGreedy(BanditPolicyBase):
    # 初始化函数,创建一个 epsilon-greedy 策略对象
    def __init__(self, epsilon=0.05, ev_prior=0.5):
        r"""
        An epsilon-greedy policy for multi-armed bandit problems.

        Notes
        -----
        Epsilon-greedy policies greedily select the arm with the highest
        expected payoff with probability :math:`1-epsilon`, and selects an arm
        uniformly at random with probability :math:`epsilon`:

        .. math::

            P(a) = left{
                 begin{array}{lr}
                   epsilon / N   (1 - epsilon) &text{if }
                        a = arg max_{a' in mathcal{A}}
                            mathbb{E}_{q_{hat{theta}}}[r mid a']\
                   epsilon / N &text{otherwise}
                 end{array}
               right.

        where :math:`N = |mathcal{A}|` is the number of arms,
        :math:`q_{hat{theta}}` is the estimate of the arm payoff
        distribution under current model parameters :math:`hat{theta}`, and
        :math:`mathbb{E}_{q_{hat{theta}}}[r mid a']` is the expected
        reward under :math:`q_{hat{theta}}` of receiving reward `r` after
        taking action :math:`a'`.

        Parameters
        ----------
        epsilon : float in [0, 1]
            The probability of taking a random action. Default is 0.05.
        ev_prior : float
            The starting expected payoff for each arm before any data has been
            observed. Default is 0.5.
        """
        # 调用父类的初始化函数
        super().__init__()
        # 设置 epsilon 参数
        self.epsilon = epsilon
        # 设置 ev_prior 参数
        self.ev_prior = ev_prior
        # 创建一个默认值为 0 的字典,用于记录每个臂被拉动的次数
        self.pull_counts = defaultdict(lambda: 0)

    @property
    # 返回当前策略的参数字典
    def parameters(self):
        """A dictionary containing the current policy parameters"""
        return {"ev_estimates": self.ev_estimates}

    @property
    # 返回包含策略超参数的字典
    def hyperparameters(self):
        """A dictionary containing the policy hyperparameters"""
        return {
            "id": "EpsilonGreedy",
            "epsilon": self.epsilon,
            "ev_prior": self.ev_prior,
        }

    # 初始化依赖于bandit环境信息的策略特定参数
    def _initialize_params(self, bandit):
        """
        Initialize any policy-specific parameters that depend on information
        from the bandit environment.
        """
        # 初始化每个臂的估计值为先验值
        self.ev_estimates = {i: self.ev_prior for i in range(bandit.n_arms)}
        # 标记已初始化
        self.is_initialized = True

    # 选择臂的方法
    def _select_arm(self, bandit, context=None):
        # 根据epsilon贪心策略选择臂
        if np.random.rand() < self.epsilon:
            arm_id = np.random.choice(bandit.n_arms)
        else:
            # 选择估计值最大的臂
            ests = self.ev_estimates
            (arm_id, _) = max(ests.items(), key=lambda x: x[1])
        return arm_id

    # 更新参数的方法
    def _update_params(self, arm_id, reward, context=None):
        # 获取估计值和拉动次数
        E, C = self.ev_estimates, self.pull_counts
        # 更新拉动次数
        C[arm_id]  = 1
        # 更新估计值
        E[arm_id]  = (reward - E[arm_id]) / (C[arm_id])

    # 重置参数的方法
    def _reset_params(self):
        """
        Reset any model-specific parameters. This gets called within the
        public `self.reset()` method.
        """
        # 重置估计值和拉动次数
        self.ev_estimates = {}
        self.pull_counts = defaultdict(lambda: 0)
class UCB1(BanditPolicyBase):
    # 初始化UCB1算法的参数,包括置信度参数C和初始期望值ev_prior
    def __init__(self, C=1, ev_prior=0.5):
        r"""
        A UCB1 policy for multi-armed bandit problems.

        Notes
        -----
        The UCB1 algorithm [*]_ guarantees the cumulative regret is bounded by log
        `t`, where `t` is the current timestep. To make this guarantee UCB1
        assumes all arm payoffs are between 0 and 1.

        Under UCB1, the upper confidence bound on the expected value for
        pulling arm `a` at timestep `t` is:

        .. math::

            text{UCB}(a, t) = text{EV}_t(a)   C sqrt{frac{2 log t}{N_t(a)}}

        where :math:`text{EV}_t(a)` is the average of the rewards recieved so
        far from pulling arm `a`, `C` is a free parameter controlling the
        "optimism" of the confidence upper bound for :math:`text{UCB}(a, t)`
        (for logarithmic regret bounds, `C` must equal 1), and :math:`N_t(a)`
        is the number of times arm `a` has been pulled during the previous `t -
        1` timesteps.

        References
        ----------
        .. [*] Auer, P., Cesa-Bianchi, N., & Fischer, P. (2002). Finite-time
           analysis of the multiarmed bandit problem. *Machine Learning,
           47(2)*.

        Parameters
        ----------
        C : float in (0,  infinity)
            A confidence/optimisim parameter affecting the degree of
            exploration, where larger values encourage greater exploration. The
            UCB1 algorithm assumes `C=1`. Default is 1.
        ev_prior : float
            The starting expected value for each arm before any data has been
            observed. Default is 0.5.
        """
        # 设置参数C和ev_prior
        self.C = C
        self.ev_prior = ev_prior
        # 调用父类的初始化方法
        super().__init__()

    @property
    def parameters(self):
        """A dictionary containing the current policy parameters"""
        # 返回包含当前策略参数的字典
        return {"ev_estimates": self.ev_estimates}

    @property
    # 返回包含策略超参数的字典
    def hyperparameters(self):
        """A dictionary containing the policy hyperparameters"""
        return {
            "C": self.C,
            "id": "UCB1",
            "ev_prior": self.ev_prior,
        }

    # 初始化依赖于赌博环境信息的任何特定于策略的参数
    def _initialize_params(self, bandit):
        """
        Initialize any policy-specific parameters that depend on information
        from the bandit environment.
        """
        self.ev_estimates = {i: self.ev_prior for i in range(bandit.n_arms)}
        self.is_initialized = True

    # 选择臂,计算每个臂的得分并返回得分最高的臂
    def _select_arm(self, bandit, context=None):
        # 添加 eps 以避免在每个臂的第一次拉动时出现除零错误
        eps = np.finfo(float).eps
        N, T = bandit.n_arms, self.step   1
        E, C = self.ev_estimates, self.pull_counts
        scores = [E[a]   self.C * np.sqrt(np.log(T) / (C[a]   eps)) for a in range(N)]
        return np.argmax(scores)

    # 更新参数,根据臂的奖励更新估计值和计数
    def _update_params(self, arm_id, reward, context=None):
        E, C = self.ev_estimates, self.pull_counts
        C[arm_id]  = 1
        E[arm_id]  = (reward - E[arm_id]) / (C[arm_id])

    # 重置模型特定的参数,在公共的 reset 方法中调用
    def _reset_params(self):
        """
        Reset any model-specific parameters. This gets called within the
        public :method:`reset` method.
        """
        self.ev_estimates = {}
        self.pull_counts = defaultdict(lambda: 0)
class ThompsonSamplingBetaBinomial(BanditPolicyBase):
    # 返回当前策略参数的字典
    @property
    def parameters(self):
        """A dictionary containing the current policy parameters"""
        return {
            "ev_estimates": self.ev_estimates,
            "alphas": self.alphas,
            "betas": self.betas,
        }

    # 返回策略超参数的字典
    @property
    def hyperparameters(self):
        """A dictionary containing the policy hyperparameters"""
        return {
            "id": "ThompsonSamplingBetaBinomial",
            "alpha": self.alpha,
            "beta": self.beta,
        }

    # 初始化参数
    def _initialize_params(self, bandit):
        bhp = bandit.hyperparameters
        fstr = "ThompsonSamplingBetaBinomial only defined for BernoulliBandit, got: {}"
        assert bhp["id"] == "BernoulliBandit", fstr.format(bhp["id"])

        # 初始化模型先验
        if is_number(self.alpha):
            self.alphas = [self.alpha] * bandit.n_arms
        if is_number(self.beta):
            self.betas = [self.beta] * bandit.n_arms
        assert len(self.alphas) == len(self.betas) == bandit.n_arms

        # 为每个手臂计算期望值估计
        self.ev_estimates = {i: self._map_estimate(i, 1) for i in range(bandit.n_arms)}
        self.is_initialized = True

    # 选择手臂
    def _select_arm(self, bandit, context):
        if not self.is_initialized:
            self._initialize_prior(bandit)

        # 从当前模型后验中抽取一个样本
        posterior_sample = np.random.beta(self.alphas, self.betas)

        # 基于这个样本贪婪地选择一个动作
        return np.argmax(posterior_sample)

    # 更新参数
    def _update_params(self, arm_id, rwd, context):
        """
        Compute the parameters of the Beta posterior, P(payoff prob | rwd),
        for arm `arm_id`.
        """
        # 更新 Beta 后验的参数,P(获胜概率 | 收益)
        self.alphas[arm_id]  = rwd
        self.betas[arm_id]  = 1 - rwd
        self.ev_estimates[arm_id] = self._map_estimate(arm_id, rwd)
    # 计算当前臂的概率支付概率的 MAP 估计值
    def _map_estimate(self, arm_id, rwd):
        """Compute the current MAP estimate for an arm's payoff probability"""
        # 获取当前臂的 alpha 和 beta 参数
        A, B = self.alphas, self.betas
        # 根据 alpha 和 beta 参数的取值情况计算 MAP 估计值
        if A[arm_id] > 1 and B[arm_id] > 1:
            map_payoff_prob = (A[arm_id] - 1) / (A[arm_id]   B[arm_id] - 2)
        elif A[arm_id] < 1 and B[arm_id] < 1:
            map_payoff_prob = rwd  # 0 or 1 equally likely, make a guess
        elif A[arm_id] <= 1 and B[arm_id] > 1:
            map_payoff_prob = 0
        elif A[arm_id] > 1 and B[arm_id] <= 1:
            map_payoff_prob = 1
        else:
            map_payoff_prob = 0.5
        return map_payoff_prob

    # 重置模型特定的参数,该方法在公共的 `self.reset()` 方法中被调用
    def _reset_params(self):
        """
        Reset any model-specific parameters. This gets called within the
        public `self.reset()` method.
        """
        # 重置 alpha 和 beta 参数列表
        self.alphas, self.betas = [], []
        # 重置期望值估计字典
        self.ev_estimates = {}
class LinUCB(BanditPolicyBase):
    def __init__(self, alpha=1):
        """
        A disjoint linear UCB policy [*]_ for contextual linear bandits.

        Notes
        -----
        LinUCB is only defined for :class:`ContextualLinearBandit <numpy_ml.bandits.ContextualLinearBandit>` environments.

        References
        ----------
        .. [*] Li, L., Chu, W., Langford, J., & Schapire, R. (2010). A
           contextual-bandit approach to personalized news article
           recommendation. In *Proceedings of the 19th International Conference
           on World Wide Web*, 661-670.

        Parameters
        ----------
        alpha : float
            A confidence/optimisim parameter affecting the amount of
            exploration. Default is 1.
        """  # noqa
        # 调用父类的构造函数
        super().__init__()

        # 初始化参数 alpha
        self.alpha = alpha
        # 初始化参数 A 和 b 为空列表
        self.A, self.b = [], []
        # 初始化标志位为 False
        self.is_initialized = False

    @property
    def parameters(self):
        """A dictionary containing the current policy parameters"""
        # 返回当前策略参数的字典
        return {"ev_estimates": self.ev_estimates, "A": self.A, "b": self.b}

    @property
    def hyperparameters(self):
        """A dictionary containing the policy hyperparameters"""
        # 返回策略的超参数字典
        return {
            "id": "LinUCB",
            "alpha": self.alpha,
        }

    def _initialize_params(self, bandit):
        """
        Initialize any policy-specific parameters that depend on information
        from the bandit environment.
        """
        # 获取环境的超参数
        bhp = bandit.hyperparameters
        # 检查环境是否为 ContextualLinearBandit 类型
        fstr = "LinUCB only defined for contextual linear bandits, got: {}"
        assert bhp["id"] == "ContextualLinearBandit", fstr.format(bhp["id"])

        # 初始化参数 A 和 b
        self.A, self.b = [], []
        # 根据环境的臂数初始化 A 和 b
        for _ in range(bandit.n_arms):
            self.A.append(np.eye(bandit.D))
            self.b.append(np.zeros(bandit.D))

        # 设置初始化标志位为 True
        self.is_initialized = True
    # 选择最优的臂,根据每个臂的概率计算
    def _select_arm(self, bandit, context):
        # 初始化概率列表
        probs = []
        # 遍历每个臂
        for a in range(bandit.n_arms):
            # 获取当前臂的上下文信息、参数 A 和 b
            C, A, b = context[:, a], self.A[a], self.b[a]
            # 计算参数 A 的逆矩阵
            A_inv = np.linalg.inv(A)
            # 计算参数 theta_hat
            theta_hat = A_inv @ b
            # 计算当前臂的概率 p
            p = theta_hat @ C   self.alpha * np.sqrt(C.T @ A_inv @ C)

            # 将当前臂的概率添加到概率列表中
            probs.append(p)
        # 返回概率最大的臂的索引
        return np.argmax(probs)

    # 更新参数 A 和 b
    def _update_params(self, arm_id, rwd, context):
        """Compute the parameters for A and b."""
        # 更新参数 A
        self.A[arm_id]  = context[:, arm_id] @ context[:, arm_id].T
        # 更新参数 b
        self.b[arm_id]  = rwd * context[:, arm_id]

    # 重置模型特定的参数
    def _reset_params(self):
        """
        Reset any model-specific parameters. This gets called within the
        public `self.reset()` method.
        """
        # 重置参数 A 和 b
        self.A, self.b = [], []
        # 重置预估值字典
        self.ev_estimates = {}

Bandits

The bandit.py module includes several simple multi-arm bandit environments.

The policies.py module implements a number of standard multi-arm bandit policies.

  1. Bandits
    • MAB: Bernoulli, Multinomial, and Gaussian payout distributions
    • Contextual MAB: Linear contextual bandits
  2. Policies
    • Epsilon-greedy
    • UCB1 (Auer, Cesa-Bianchi, & Fisher, 2002)
    • Conjugate Thompson sampler for Bernoulli bandits (Thompson, 1933; Chapelle & Li, 2010)
    • LinUCB (Li, Chu, Langford, & Schapire, 2010)

Plots

numpy-mlnumpy_mlbanditstrainer.py

代码语言:javascript复制
# 为执行和比较多臂赌博(MAB)策略而创建的训练器/运行器对象
import warnings
import os.path as op
from collections import defaultdict

import numpy as np

# 导入自定义的依赖警告类
from numpy_ml.utils.testing import DependencyWarning

# 尝试导入 matplotlib 库,如果导入失败则禁用绘图功能
try:
    import matplotlib.pyplot as plt

    _PLOTTING = True
except ImportError:
    fstr = "Cannot import matplotlib. Plotting functionality disabled."
    warnings.warn(fstr, DependencyWarning)
    _PLOTTING = False


# 返回包含 `trainer.py` 脚本的目录
def get_scriptdir():
    return op.dirname(op.realpath(__file__))


# 计算策略对期望臂收益的估计与真实期望收益之间的均方误差
def mse(bandit, policy):
    if not hasattr(policy, "ev_estimates") or len(policy.ev_estimates) == 0:
        return np.nan

    se = []
    evs = bandit.arm_evs
    ests = sorted(policy.ev_estimates.items(), key=lambda x: x[0])
    for ix, (est, ev) in enumerate(zip(ests, evs)):
        se.append((est[1] - ev) ** 2)
    return np.mean(se)


# 计算前一个值和当前值的简单加权平均
def smooth(prev, cur, weight):
    r"""
    Compute a simple weighted average of the previous and current value.

    Notes
    -----
    The smoothed value at timestep `t`, :math:`tilde{X}_t` is calculated as

    .. math::

        tilde{X}_t = epsilon tilde{X}_{t-1}   (1 - epsilon) X_t

    where :math:`X_t` is the value at timestep `t`, :math:`tilde{X}_{t-1}` is
    the value of the smoothed signal at timestep `t-1`, and :math:`epsilon` is
    the smoothing weight.

    Parameters
    ----------
    prev : float or :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        The value of the smoothed signal at the immediately preceding
        timestep.
    cur : float or :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
        The value of the signal at the current timestep
    # weight: 平滑权重,可以是浮点数或形状为`(N,)`的数组
    #         值越接近0,平滑效果越弱;值越接近1,平滑效果越强
    #         如果weight是一个数组,每个维度将被解释为一个单独的平滑权重,对应于`cur`中的相应维度

    # 返回值
    # smoothed: 平滑后的信号,可以是浮点数或形状为`(N,)`的数组
    #           返回平滑后的信号
    """
    返回加权平滑后的结果
    """
    return weight * prev   (1 - weight) * cur
class BanditTrainer:
    def __init__(self):
        """
        初始化 BanditTrainer 类的实例,用于多臂赌博机的训练、比较和评估。
        """
        self.logs = {}

    def compare(
        self,
        policies,
        bandit,
        n_trials,
        n_duplicates,
        plot=True,
        seed=None,
        smooth_weight=0.999,
        out_dir=None,
    """
    比较不同策略在给定赌博机上的性能,包括进行多次试验、绘制图表等。
    """

    def train(
        self,
        policy,
        bandit,
        n_trials,
        n_duplicates,
        plot=True,
        axes=None,
        verbose=True,
        print_every=100,
        smooth_weight=0.999,
        out_dir=None,
    """
    在给定赌博机上训练指定策略,包括进行多次试验、绘制图表等。
    """

    def _train_step(self, bandit, policy):
        """
        私有方法,执行单步训练,返回奖励、选择的臂、理想情况下的奖励和臂。
        """
        P, B = policy, bandit
        C = B.get_context() if hasattr(B, "get_context") else None
        rwd, arm = P.act(B, C)
        oracle_rwd, oracle_arm = B.oracle_payoff(C)
        return rwd, arm, oracle_rwd, oracle_arm
    # 初始化日志记录,用于存储训练过程中的数据
    def init_logs(self, policies):
        """
        Initialize the episode logs.

        Notes
        -----
        Training logs are represented as a nested set of dictionaries with the
        following structure:

            log[model_id][metric][trial_number][duplicate_number]

        For example, ``logs['model1']['regret'][3][1]`` holds the regret value
        accrued on the 3rd trial of the 2nd duplicate run for model1.

        Available fields are 'regret', 'cregret' (cumulative regret), 'reward',
        'mse' (mean-squared error between estimated arm EVs and the true EVs),
        'optimal_arm', 'selected_arm', and 'optimal_reward'.
        """
        # 如果输入的策略不是列表,则转换为列表
        if not isinstance(policies, list):
            policies = [policies]

        # 初始化日志记录字典,包含不同策略的不同指标数据
        self.logs = {
            str(p): {
                "mse": defaultdict(lambda: []),
                "regret": defaultdict(lambda: []),
                "reward": defaultdict(lambda: []),
                "cregret": defaultdict(lambda: []),
                "optimal_arm": defaultdict(lambda: []),
                "selected_arm": defaultdict(lambda: []),
                "optimal_reward": defaultdict(lambda: []),
            }
            for p in policies
        }

    # 打印运行摘要信息,包括估计值与真实值的均方误差和遗憾值
    def _print_run_summary(self, bandit, policy, regret):
        # 如果策略没有估计值或估计值为空,则返回空
        if not hasattr(policy, "ev_estimates") or len(policy.ev_estimates) == 0:
            return None

        # 获取臂的真实值和估计值
        evs, se = bandit.arm_evs, []
        fstr = "Arm {}: {:.4f} v. {:.4f}"
        ests = sorted(policy.ev_estimates.items(), key=lambda x: x[0])
        print("nnEstimated vs. Real EVn"   "-" * 21)
        # 打印每个臂的估计值和真实值
        for ix, (est, ev) in enumerate(zip(ests, evs)):
            print(fstr.format(ix   1, est[1], ev))
            se.append((est[1] - ev) ** 2)
        fstr = "nFinal MSE: {:.4f}nFinal Regret: {:.4f}nn"
        # 打印最终的均方误差和遗憾值
        print(fstr.format(np.mean(se), regret))
    # 绘制奖励图表,包括平滑处理、对比最优奖励、绘制曲线等
    def _plot_reward(self, optimal_rwd, policy, smooth_weight, axes=None, out_dir=None):
        # 获取指定策略的日志数据
        L = self.logs[str(policy)]
        # 计算平滑后的指标数据
        smds = self._smoothed_metrics(policy, optimal_rwd, smooth_weight)

        # 如果未提供绘图坐标轴,则创建新的图表和子图
        if axes is None:
            fig, [ax1, ax2] = plt.subplots(1, 2)
        else:
            # 如果提供了绘图坐标轴,则确保长度为2
            assert len(axes) == 2
            ax1, ax2 = axes

        # 生成实验次数的范围
        e_ids = range(1, len(L["reward"])   1)
        # 定义绘图参数列表
        plot_params = [[ax1, ax2], ["reward", "cregret"], ["b", "r"], [optimal_rwd, 0]]

        # 遍历绘图参数列表,绘制曲线
        for (ax, m, c, opt) in zip(*plot_params):
            avg, std = "sm_{}_avg sm_{}_std".format(m, m).split()
            ax.plot(e_ids, smds[avg], color=c)
            ax.axhline(opt, 0, 1, color=c, ls="--")
            ax.fill_between(
                e_ids,
                smds[avg]   smds[std],
                smds[avg] - smds[std],
                color=c,
                alpha=0.25,
            )
            ax.set_xlabel("Trial")
            # 设置 Y 轴标签
            m = "Cumulative Regret" if m == "cregret" else m
            ax.set_ylabel("Smoothed Avg. {}".format(m.title())

            # 如果未提供绘图坐标轴,则设置纵横比例
            if axes is None:
                ax.set_aspect(np.diff(ax.get_xlim()) / np.diff(ax.get_ylim()))

            # 如果提供了绘图坐标轴,则设置子图标题
            if axes is not None:
                ax.set_title(str(policy))

        # 如果未提供绘图坐标轴,则设置整体标题和布局
        if axes is None:
            fig.suptitle(str(policy))
            fig.tight_layout()

            # 如果指定输出目录,则保存图表
            if out_dir is not None:
                bid = policy.hyperparameters["id"]
                plt.savefig(op.join(out_dir, f"{bid}.png"), dpi=300)
            # 显示图表
            plt.show()
        # 返回绘图坐标轴
        return ax1, ax2
    # 计算平滑后的指标数据
    def _smoothed_metrics(self, policy, optimal_rwd, smooth_weight):
        # 获取指定策略的日志数据
        L = self.logs[str(policy)]

        # 预先分配平滑后的数据结构
        smds = {}
        for m in L.keys():
            # 如果是"selections"则跳过
            if m == "selections":
                continue

            # 初始化平均值的平滑数据
            smds["sm_{}_avg".format(m)] = np.zeros(len(L["reward"]))
            smds["sm_{}_avg".format(m)][0] = np.mean(L[m][1])

            # 初始化标准差的平滑数据
            smds["sm_{}_std".format(m)] = np.zeros(len(L["reward"]))
            smds["sm_{}_std".format(m)][0] = np.std(L[m][1])

        # 复制原始数据到平滑数据
        smoothed = {m: L[m][1] for m in L.keys()}
        # 从第二个元素开始遍历数据
        for e_id in range(2, len(L["reward"])   1):
            for m in L.keys():
                # 如果是"selections"则跳过
                if m == "selections":
                    continue
                # 获取前一个和当前的数据
                prev, cur = smoothed[m], L[m][e_id]
                # 对数据进行平滑处理
                smoothed[m] = [smooth(p, c, smooth_weight) for p, c in zip(prev, cur)]
                # 计算平均值和标准差并存储到平滑数据结构中
                smds["sm_{}_avg".format(m)][e_id - 1] = np.mean(smoothed[m])
                smds["sm_{}_std".format(m)][e_id - 1] = np.std(smoothed[m])
        # 返回平滑后的数据结构
        return smds

numpy-mlnumpy_mlbandits__init__.py

代码语言:javascript复制
# 从当前目录中导入 bandits 模块中的所有内容
from .bandits import *
# 从当前目录中导入 policies 模块
from . import policies
# 从当前目录中导入 trainer 模块
from . import trainer

numpy-mlnumpy_mlfactorizationfactors.py

代码语言:javascript复制
# 包含了用于近似矩阵分解的算法
from copy import deepcopy
# 导入 numpy 库并重命名为 np
import numpy as np

# 定义 VanillaALS 类
class VanillaALS:
    # 返回当前模型参数的字典
    @property
    def parameters(self):
        return {"W": self.W, "H": self.H}

    # 返回模型超参数的字典
    @property
    def hyperparameters(self):
        return {
            "id": "ALSFactor",
            "K": self.K,
            "tol": self.tol,
            "alpha": self.alpha,
            "max_iter": self.max_iter,
        }

    # 初始化因子矩阵,可以选择随机初始化或者传入已有的矩阵
    def _init_factor_matrices(self, X, W=None, H=None):
        N, M = X.shape
        scale = np.sqrt(X.mean() / self.K)
        # 如果未传入 W,则随机初始化
        self.W = np.random.rand(N, self.K) * scale if W is None else W
        # 如果未传入 H,则随机初始化
        self.H = np.random.rand(self.K, M) * scale if H is None else H

        assert self.W.shape == (N, self.K)
        assert self.H.shape == (self.K, M)

    # 计算正则化的 Frobenius 损失
    def _loss(self, X, Xhat):
        alpha, W, H = self.alpha, self.W, self.H
        # 定义计算平方 Frobenius 范数的函数
        sq_fnorm = lambda x: np.sum(x ** 2)  # noqa: E731
        return sq_fnorm(X - Xhat)   alpha * (sq_fnorm(W)   sq_fnorm(H))

    # 执行 ALS 更新
    def _update_factor(self, X, A):
        T1 = np.linalg.inv(A.T @ A   self.alpha * np.eye(self.K))
        return X @ A @ T1
    # 将数据矩阵分解为两个低秩因子,使用ALS算法
    def fit(self, X, W=None, H=None, n_initializations=10, verbose=False):
        """
        Factor a data matrix into two low rank factors via ALS.

        Parameters
        ----------
        X : numpy array of shape `(N, M)`
            The data matrix to factor.
        W : numpy array of shape `(N, K)` or None
            An initial value for the `W` factor matrix. If None, initialize `W`
            randomly. Default is None.
        H : numpy array of shape `(K, M)` or None
            An initial value for the `H` factor matrix. If None, initialize `H`
            randomly. Default is None.
        n_initializations : int
            Number of re-initializations of the algorithm to perform before
            taking the answer with the lowest reconstruction error. This value
            is ignored and set to 1 if both `W` and `H` are not None. Default
            is 10.
        verbose : bool
            Whether to print the loss at each iteration. Default is False.
        """
        # 如果W和H都不为None,则将n_initializations设置为1
        if W is not None and H is not None:
            n_initializations = 1

        # 初始化最佳损失值为正无穷
        best_loss = np.inf
        # 进行n_initializations次初始化
        for f in range(n_initializations):
            # 如果verbose为True,则打印初始化信息
            if verbose:
                print("nINITIALIZATION {}".format(f   1))

            # 调用_fit方法进行拟合,得到新的W、H和损失值
            new_W, new_H, loss = self._fit(X, W, H, verbose)

            # 如果当前损失值小于等于最佳损失值,则更新最佳损失值和最佳的W、H
            if loss <= best_loss:
                best_loss = loss
                best_W, best_H = deepcopy(new_W), deepcopy(new_H)

        # 将最佳的W、H赋值给模型的W、H
        self.W, self.H = best_W, best_H

        # 如果verbose为True,则打印最终损失值
        if verbose:
            print("nFINAL LOSS: {}".format(best_loss))
    # 在模型拟合过程中,更新因子矩阵 W 和 H
    def _fit(self, X, W, H, verbose):
        # 初始化因子矩阵 W 和 H
        self._init_factor_matrices(X, W, H)
        # 获取初始化后的因子矩阵 W 和 H
        W, H = self.W, self.H

        # 迭代更新因子矩阵,直到达到最大迭代次数
        for i in range(self.max_iter):
            # 更新因子矩阵 W
            W = self._update_factor(X, H.T)
            # 更新因子矩阵 H
            H = self._update_factor(X.T, W).T

            # 计算损失函数值
            loss = self._loss(X, W @ H)

            # 如果 verbose 为 True,则打印当前迭代的损失值
            if verbose:
                print("[Iter {}] Loss: {:.8f}".format(i   1, loss))

            # 如果损失值小于等于设定的阈值 tol,则提前结束迭代
            if loss <= self.tol:
                break

        # 返回更新后的因子矩阵 W 和 H,以及最终的损失值
        return W, H, loss
# 定义一个 NMF 类,用于执行非负矩阵分解
class NMF:
    # 返回当前模型参数的字典
    @property
    def parameters(self):
        """Return a dictionary of the current model parameters"""
        return {"W": self.W, "H": self.H}

    # 返回模型超参数的字典
    @property
    def hyperparameters(self):
        """Return a dictionary of the model hyperparameters"""
        return {
            "id": "NMF",
            "K": self.K,
            "tol": self.tol,
            "max_iter": self.max_iter,
        }

    # 初始化因子矩阵,使用普通 ALS 算法
    def _init_factor_matrices(self, X, W, H):
        """Initialize the factor matrices using vanilla ALS"""
        ALS = None
        N, M = X.shape

        # 如果 W 未定义,则使用 ALS 初始化
        if W is None:
            ALS = VanillaALS(self.K, alpha=0, max_iter=200)
            ALS.fit(X, verbose=False)
            W = ALS.W / np.linalg.norm(ALS.W, axis=0)

        # 如果 H 未定义,则随机初始化,或者使用 ALS 初始化
        if H is None:
            H = np.abs(np.random.rand(self.K, M)) if ALS is None else ALS.H

        # 断言确保矩阵形状正确
        assert W.shape == (N, self.K)
        assert H.shape == (self.K, M)

        # 将初始化后的 W 和 H 赋值给类属性
        self.H = H
        self.W = W

    # 计算重构损失函数,即 X 和 Xhat 之间的最小二乘损失
    def _loss(self, X, Xhat):
        """Return the least-squares reconstruction loss between X and Xhat"""
        return np.sum((X - Xhat) ** 2)

    # 更新 H 矩阵,使用快速 HALS 算法
    def _update_H(self, X, W, H):
        """Perform the fast HALS update for H"""
        eps = np.finfo(float).eps
        XtW = X.T @ W  # dim: (M, K)
        WtW = W.T @ W  # dim: (K, K)

        # 针对每个 K 值进行更新
        for k in range(self.K):
            H[k, :]  = XtW[:, k] - H.T @ WtW[:, k]
            H[k, :] = np.clip(H[k, :], eps, np.inf)  # 强制非负性
        return H
    def _update_W(self, X, W, H):
        """Perform the fast HALS update for W"""
        eps = np.finfo(float).eps
        XHt = X @ H.T  # 计算 X 与 H 转置的乘积,维度为 (N, K)
        HHt = H @ H.T  # 计算 H 与 H 转置的乘积,维度为 (K, K)

        for k in range(self.K):
            # 更新 W 矩阵的第 k 列
            W[:, k] = W[:, k] * HHt[k, k]   XHt[:, k] - W @ HHt[:, k]
            # 强制非负性,将小于 eps 的值设为 eps,大于 np.inf 的值设为 np.inf
            W[:, k] = np.clip(W[:, k], eps, np.inf)

            # 重新归一化新的列
            n = np.linalg.norm(W[:, k])
            W[:, k] /= n if n > 0 else 1.0
        return W

    def _fit(self, X, W, H, verbose):
        self._init_factor_matrices(X, W, H)

        W, H = self.W, self.H
        for i in range(self.max_iter):
            H = self._update_H(X, W, H)
            W = self._update_W(X, W, H)
            loss = self._loss(X, W @ H)

            if verbose:
                print("[Iter {}] Loss: {:.8f}".format(i   1, loss))

            if loss <= self.tol:
                break
        return W, H, loss

Factors

The factors.py module includes common approximate matrix-factorization algorithms including:

  • Regularized alternating least squares (ALS)
  • Non-negative matrix factorization via fast hierarchical least squares (HALS) (Cichocki & Phan, 2008)

numpy-mlnumpy_mlfactorization__init__.py

代码语言:javascript复制
# 导入近似矩阵分解的算法模块
from .factors import *

numpy-mlnumpy_mlgmmgmm.py

代码语言:javascript复制
# 导入必要的库
import numpy as np

# 导入自定义的辅助函数
from numpy_ml.utils.misc import logsumexp, log_gaussian_pdf

# 定义一个高斯混合模型类
class GMM(object):
    def __init__(self, C=3, seed=None):
        """
        通过期望最大化算法训练的高斯混合模型。

        参数
        ----------
        C : int
            GMM 中的簇数/混合成分数量。默认为 3。
        seed : int
            随机数生成器的种子。默认为 None。

        属性
        ----------
        N : int
            训练数据集中的示例数量。
        d : int
            训练数据集中每个示例的维度。
        pi : :py:class:`ndarray <numpy.ndarray>` of shape `(C,)`
            簇先验。
        Q : :py:class:`ndarray <numpy.ndarray>` of shape `(N, C)`
            变分分布 `q(T)`。
        mu : :py:class:`ndarray <numpy.ndarray>` of shape `(C, d)`
            簇均值。
        sigma : :py:class:`ndarray <numpy.ndarray>` of shape `(C, d, d)`
            簇协方差矩阵。
        """
        # 初始化 ELBO 和参数字典
        self.elbo = None
        self.parameters = {}
        # 初始化超参数字典
        self.hyperparameters = {
            "C": C,
            "seed": seed,
        }

        # 是否已拟合的标志
        self.is_fit = False

        # 如果有指定种子,则设置随机数种子
        if seed:
            np.random.seed(seed)

    def _initialize_params(self, X):
        """随机初始化起始 GMM 参数。"""
        # 获取训练数据集的示例数量和维度
        N, d = X.shape
        C = self.hyperparameters["C"]

        # 随机生成 C 个随机数
        rr = np.random.rand(C)

        # 初始化参数字典
        self.parameters = {
            "pi": rr / rr.sum(),  # 簇先验
            "Q": np.zeros((N, C)),  # 变分分布 q(T)
            "mu": np.random.uniform(-5, 10, C * d).reshape(C, d),  # 簇均值
            "sigma": np.array([np.eye(d) for _ in range(C)]),  # 簇协方差矩阵
        }

        # 重置 ELBO 和拟合标志
        self.elbo = None
        self.is_fit = False
    # 计算当前 GMM 参数下的 LLB(对数似然下界)
    def likelihood_lower_bound(self, X):
        """Compute the LLB under the current GMM parameters."""
        # 获取样本数量
        N = X.shape[0]
        # 获取模型参数
        P = self.parameters
        # 获取超参数中的聚类数
        C = self.hyperparameters["C"]
        # 获取模型参数中的 pi, Q, mu, sigma
        pi, Q, mu, sigma = P["pi"], P["Q"], P["mu"], P["sigma"]

        # 设置一个很小的数,用于避免除零错误
        eps = np.finfo(float).eps
        # 初始化两个期望值
        expec1, expec2 = 0.0, 0.0
        # 遍历每个样本
        for i in range(N):
            x_i = X[i]

            # 遍历每个聚类
            for c in range(C):
                # 获取当前聚类的 pi, Q, mu, sigma
                pi_k = pi[c]
                z_nk = Q[i, c]
                mu_k = mu[c, :]
                sigma_k = sigma[c, :, :]

                # 计算当前聚类的对数 pi 和对数概率密度
                log_pi_k = np.log(pi_k   eps)
                log_p_x_i = log_gaussian_pdf(x_i, mu_k, sigma_k)
                # 计算当前样本在当前聚类下的概率
                prob = z_nk * (log_p_x_i   log_pi_k)

                # 更新期望值
                expec1  = prob
                expec2  = z_nk * np.log(z_nk   eps)

        # 计算损失值
        loss = expec1 - expec2
        # 返回损失值
        return loss
    def fit(self, X, max_iter=100, tol=1e-3, verbose=False):
        """
        Fit the parameters of the GMM on some training data.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, d)`
            A collection of `N` training data points, each with dimension `d`.
        max_iter : int
            The maximum number of EM updates to perform before terminating
            training. Default is 100.
        tol : float
            The convergence tolerance. Training is terminated if the difference
            in VLB between the current and previous iteration is less than
            `tol`. Default is 1e-3.
        verbose : bool
            Whether to print the VLB at each training iteration. Default is
            False.

        Returns
        -------
        success : {0, -1}
            Whether training terminated without incident (0) or one of the
            mixture components collapsed and training was halted prematurely
            (-1).
        """
        # 初始化变量 prev_vlb 为负无穷
        prev_vlb = -np.inf
        # 初始化 GMM 参数
        self._initialize_params(X)

        # 迭代进行 EM 算法更新参数
        for _iter in range(max_iter):
            try:
                # E 步:计算后验概率
                self._E_step(X)
                # M 步:更新参数
                self._M_step(X)
                # 计算变分下界
                vlb = self.likelihood_lower_bound(X)

                # 如果 verbose 为 True,则打印当前迭代的变分下界
                if verbose:
                    print(f"{_iter   1}. Lower bound: {vlb}")

                # 判断是否收敛
                converged = _iter > 0 and np.abs(vlb - prev_vlb) <= tol
                # 如果变分下界为 NaN 或者已经收敛,则跳出循环
                if np.isnan(vlb) or converged:
                    break

                # 更新 prev_vlb
                prev_vlb = vlb

            except np.linalg.LinAlgError:
                # 捕获线性代数错误,表示某个混合成分崩溃
                print("Singular matrix: components collapsed")
                return -1

        # 将最终的变分下界赋值给 elbo,并标记已拟合完成
        self.elbo = vlb
        self.is_fit = True
        return 0
    def predict(self, X, soft_labels=True):
        """
        Return the log probability of each data point in `X` under each
        mixture components.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(M, d)`
            A collection of `M` data points, each with dimension `d`.
        soft_labels : bool
            If True, return the log probabilities of the M data points in X
            under each mixture component. If False, return only the ID of the
            most probable mixture. Default is True.

        Returns
        -------
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(M, C)` or `(M,)`
            If `soft_labels` is True, `y` is a 2D array where index (i,j) gives
            the log probability of the `i` th data point under the `j` th
            mixture component. If `soft_labels` is False, `y` is a 1D array
            where the `i` th index contains the ID of the most probable mixture
            component.
        """
        # 检查模型是否已经拟合
        assert self.is_fit, "Must call the `.fit` method before making predictions"

        # 获取模型参数和混合成分数量
        P = self.parameters
        C = self.hyperparameters["C"]
        mu, sigma = P["mu"], P["sigma"]

        # 初始化结果列表
        y = []
        # 遍历数据集中的每个数据点
        for x_i in X:
            # 计算每个数据点在每个混合成分下的对数概率
            cprobs = [log_gaussian_pdf(x_i, mu[c, :], sigma[c, :, :]) for c in range(C)]

            # 如果不需要软标签
            if not soft_labels:
                # 将最可能的混合成分的索引添加到结果列表中
                y.append(np.argmax(cprobs))
            else:
                # 将每个数据点在每个混合成分下的对数概率添加到结果列表中
                y.append(cprobs)

        # 将结果列表转换为 numpy 数组并返回
        return np.array(y)
    # 执行 E 步骤,更新隐藏变量 Q
    def _E_step(self, X):
        # 获取模型参数和超参数
        P = self.parameters
        C = self.hyperparameters["C"]
        pi, Q, mu, sigma = P["pi"], P["Q"], P["mu"], P["sigma"]

        # 遍历数据集 X 中的每个样本
        for i, x_i in enumerate(X):
            # 存储分母值的列表
            denom_vals = []
            # 遍历每个聚类中心
            for c in range(C):
                pi_c = pi[c]
                mu_c = mu[c, :]
                sigma_c = sigma[c, :, :]

                # 计算对数概率值
                log_pi_c = np.log(pi_c)
                log_p_x_i = log_gaussian_pdf(x_i, mu_c, sigma_c)

                # 计算分母值并添加到列表中
                denom_vals.append(log_p_x_i   log_pi_c)

            # 计算对数分母值
            log_denom = logsumexp(denom_vals)
            # 计算隐藏变量 Q
            q_i = np.exp([num - log_denom for num in denom_vals])
            # 确保 Q 的每行和为 1
            np.testing.assert_allclose(np.sum(q_i), 1, err_msg="{}".format(np.sum(q_i)))

            Q[i, :] = q_i

    # 执行 M 步骤,更新模型参数
    def _M_step(self, X):
        N, d = X.shape
        P = self.parameters
        C = self.hyperparameters["C"]
        pi, Q, mu, sigma = P["pi"], P["Q"], P["mu"], P["sigma"]

        # 计算每个聚类的总权重
        denoms = np.sum(Q, axis=0)

        # 更新聚类先验概率
        pi = denoms / N

        # 更新聚类均值
        nums_mu = [np.dot(Q[:, c], X) for c in range(C)]
        for ix, (num, den) in enumerate(zip(nums_mu, denoms)):
            mu[ix, :] = num / den if den > 0 else np.zeros_like(num)

        # 更新聚类协方差矩阵
        for c in range(C):
            mu_c = mu[c, :]
            n_c = denoms[c]

            outer = np.zeros((d, d))
            for i in range(N):
                wic = Q[i, c]
                xi = X[i, :]
                outer  = wic * np.outer(xi - mu_c, xi - mu_c)

            outer = outer / n_c if n_c > 0 else outer
            sigma[c, :, :] = outer

        # 确保聚类先验概率之和为 1
        np.testing.assert_allclose(np.sum(pi), 1, err_msg="{}".format(np.sum(pi)))

Gaussian Mixture Models

The gmm.py module implements the standard (ie., non-Bayesian) Gaussian mixture model with maximum-likelihood parameter estimates via the EM algorithm.

Plots

numpy-mlnumpy_mlgmm__init__.py

代码语言:javascript复制
# 从当前目录下的 gmm 模块中导入所有内容
from .gmm import *

numpy-mlnumpy_mlhmmhmm.py

代码语言:javascript复制
# 隐马尔可夫模型模块
import numpy as np
# 导入 logsumexp 函数
from numpy_ml.utils.misc import logsumexp

# 定义 MultinomialHMM 类
class MultinomialHMM:
    # 从 HMM 中生成一个序列
    def generate(self, n_steps, latent_state_types, obs_types):
        """
        从 HMM 中采样一个序列。

        参数
        ----------
        n_steps : int
            生成序列的长度
        latent_state_types : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            潜在状态的标签集合
        obs_types : :py:class:`ndarray <numpy.ndarray>` of shape `(V,)`
            观测值的标签集合

        返回
        -------
        states : :py:class:`ndarray <numpy.ndarray>` of shape `(n_steps,)`
            采样的潜在状态。
        emissions : :py:class:`ndarray <numpy.ndarray>` of shape `(n_steps,)`
            采样的观测值。
        """
        # 获取 HMM 参数
        P = self.parameters
        A, B, pi = P["A"], P["B"], P["pi"]

        # 采样初始潜在状态
        s = np.random.multinomial(1, pi).argmax()
        states = [latent_state_types[s]]

        # 生成给定潜在状态的观测值
        v = np.random.multinomial(1, B[s, :]).argmax()
        emissions = [obs_types[v]]

        # 采样潜在状态转移,重复此过程
        for i in range(n_steps - 1):
            s = np.random.multinomial(1, A[s, :]).argmax()
            states.append(latent_state_types[s])

            v = np.random.multinomial(1, B[s, :]).argmax()
            emissions.append(obs_types[v])

        return np.array(states), np.array(emissions)
    # 初始化模型参数
    def _initialize_parameters(self):
        # 获取参数字典
        P = self.parameters
        # 获取模型参数 A, B, pi
        A, B, pi = P["A"], P["B"], P["pi"]
        # 获取派生变量 N, V
        N, V = self.derived_variables["N"], self.derived_variables["V"]

        # 对隐藏状态的先验进行均匀初始化
        if pi is None:
            pi = np.ones(N)
            pi = pi / pi.sum()

        # 对转移矩阵 A 进行均匀初始化
        if A is None:
            A = np.ones((N, N))
            A = A / A.sum(axis=1)[:, None]

        # 对观测概率矩阵 B 进行随机初始化
        if B is None:
            B = np.random.rand(N, V)
            B = B / B.sum(axis=1)[:, None]

        # 更新模型参数字典中的 A, B, pi
        P["A"], P["B"], P["pi"] = A, B, pi

    # 拟合模型
    def fit(
        self,
        O,
        latent_state_types,
        observation_types,
        pi=None,
        tol=1e-5,
        verbose=False,

Hidden Markov model

The hmm.py module implements a standard (i.e., non-Bayesian) Hidden Markov model with maximum-likelihood parameter estimation via the EM-algorithm (specifically, Baum-Welch).

Plots

numpy-mlnumpy_mlhmm__init__.py

代码语言:javascript复制
# 从当前目录下的 hmm 模块中导入所有内容
from .hmm import *

numpy-mlnumpy_mlldalda.py

代码语言:javascript复制
import numpy as np
from scipy.special import digamma, polygamma, gammaln

class LDA(object):
    def __init__(self, T=10):
        """
        Vanilla (non-smoothed) LDA model trained using variational EM.
        Generates maximum-likelihood estimates for model paramters
        `alpha` and `beta`.

        Parameters
        ----------
        T : int
            Number of topics

        Attributes
        ----------
        D : int
            Number of documents
        N : list of length `D`
            Number of words in each document
        V : int
            Number of unique word tokens across all documents
        phi : :py:class:`ndarray <numpy.ndarray>` of shape `(D, N[d], T)`
            Variational approximation to word-topic distribution
        gamma : :py:class:`ndarray <numpy.ndarray>` of shape `(D, T)`
            Variational approximation to document-topic distribution
        alpha : :py:class:`ndarray <numpy.ndarray>` of shape `(1, T)`
            Parameter for the Dirichlet prior on the document-topic distribution
        beta  : :py:class:`ndarray <numpy.ndarray>` of shape `(V, T)`
            Word-topic distribution
        """
        # 初始化LDA模型,设置主题数T
        self.T = T

    def _maximize_phi(self):
        """
        Optimize variational parameter phi
        ϕ_{t, n} ∝ β_{t, w_n}  e^( Ψ(γ_t) )
        """
        # 获取模型参数
        D = self.D
        N = self.N
        T = self.T

        # 获取模型变量
        phi = self.phi
        beta = self.beta
        gamma = self.gamma
        corpus = self.corpus

        # 遍历文档
        for d in range(D):
            # 遍历文档中的单词
            for n in range(N[d]):
                # 遍历主题
                for t in range(T):
                    # 获取单词索引
                    w_n = int(corpus[d][n])
                    # 计算phi的值
                    phi[d][n, t] = beta[w_n, t] * np.exp(digamma(gamma[d, t]))

                # 对主题进行归一化
                phi[d][n, :] = phi[d][n, :] / np.sum(phi[d][n, :])
        return phi
    # 最大化变分参数 gamma
    def _maximize_gamma(self):
        """
        Optimize variational parameter gamma
        γ_t = α_t   sum_{n=1}^{N_d} ϕ_{t, n}
        """
        # 获取文档数量
        D = self.D
        # 获取文档-主题分布
        phi = self.phi
        # 获取先验参数 alpha
        alpha = self.alpha

        # 计算 gamma
        gamma = np.tile(alpha, (D, 1))   np.array(
            list(map(lambda x: np.sum(x, axis=0), phi))
        )
        return gamma

    # 最大化模型参数 beta
    def _maximize_beta(self):
        """
        Optimize model parameter beta
        β_{t, n} ∝ sum_{d=1}^D sum_{i=1}^{N_d} ϕ_{d, t, n} [ i = n]
        """
        # 获取主题数量
        T = self.T
        # 获取词汇表大小
        V = self.V

        # 获取文档-主题分布
        phi = self.phi
        # 获取模型参数 beta
        beta = self.beta
        # 获取语料库
        corpus = self.corpus

        # 遍历词汇表中的每个词
        for n in range(V):
            # 构建与 phi 相同形状的二进制掩码 [i == n]
            mask = [np.tile((doc == n), (T, 1)).T for doc in corpus]
            # 更新 beta
            beta[n, :] = np.sum(
                np.array(list(map(lambda x: np.sum(x, axis=0), phi * mask))), axis=0
            )

        # 对每个主题进行归一化
        for t in range(T):
            beta[:, t] = beta[:, t] / np.sum(beta[:, t])

        return beta
    # 使用 Blei 的 O(n) 牛顿-拉普拉斯修改来优化 alpha,针对具有特殊结构的 Hessian 矩阵
    def _maximize_alpha(self, max_iters=1000, tol=0.1):
        # 获取文档数和主题数
        D = self.D
        T = self.T

        # 获取当前的 alpha 和 gamma
        alpha = self.alpha
        gamma = self.gamma

        # 迭代最大次数
        for _ in range(max_iters):
            # 保存旧的 alpha
            alpha_old = alpha

            # 计算梯度
            g = D * (digamma(np.sum(alpha)) - digamma(alpha))   np.sum(
                digamma(gamma) - np.tile(digamma(np.sum(gamma, axis=1)), (T, 1)).T,
                axis=0,
            )

            # 计算 Hessian 对角线分量
            h = -D * polygamma(1, alpha)

            # 计算 Hessian 常数分量
            z = D * polygamma(1, np.sum(alpha))

            # 计算常数
            c = np.sum(g / h) / (z ** (-1.0)   np.sum(h ** (-1.0)))

            # 更新 alpha
            alpha = alpha - (g - c) / h

            # 检查收敛性
            if np.sqrt(np.mean(np.square(alpha - alpha_old))) < tol:
                break

        return alpha

    # E 步:最大化变分参数 γ 和 ϕ 以最大化 VLB
    def _E_step(self):
        # 最大化 ϕ
        self.phi = self._maximize_phi()
        # 最大化 γ
        self.gamma = self._maximize_gamma()

    # M 步:最大化模型参数 α 和 β 以最大化 VLB
    def _M_step(self):
        # 最大化 β
        self.beta = self._maximize_beta()
        # 最大化 alpha
        self.alpha = self._maximize_alpha()
    def VLB(self):
        """
        Return the variational lower bound associated with the current model
        parameters.
        """
        # 获取当前模型参数
        phi = self.phi
        alpha = self.alpha
        beta = self.beta
        gamma = self.gamma
        corpus = self.corpus

        D = self.D
        T = self.T
        N = self.N

        a, b, c, _d = 0, 0, 0, 0
        # 遍历文档
        for d in range(D):
            # 计算 a 部分
            a  = (
                gammaln(np.sum(alpha))
                - np.sum(gammaln(alpha))
                  np.sum([(alpha[t] - 1) * dg(gamma, d, t) for t in range(T)])
            )

            _d  = (
                gammaln(np.sum(gamma[d, :]))
                - np.sum(gammaln(gamma[d, :]))
                  np.sum([(gamma[d, t] - 1) * dg(gamma, d, t) for t in range(T)])
            )

            # 遍历文档中的词
            for n in range(N[d]):
                w_n = int(corpus[d][n])

                # 计算 b 部分
                b  = np.sum([phi[d][n, t] * dg(gamma, d, t) for t in range(T)])
                # 计算 c 部分
                c  = np.sum([phi[d][n, t] * np.log(beta[w_n, t]) for t in range(T)])
                # 计算 _d 部分
                _d  = np.sum([phi[d][n, t] * np.log(phi[d][n, t]) for t in range(T)])

        # 返回变分下界
        return a   b   c - _d

    def initialize_parameters(self):
        """
        Provide reasonable initializations for model and variational parameters.
        """
        T = self.T
        V = self.V
        N = self.N
        D = self.D

        # 初始化模型参数
        self.alpha = 100 * np.random.dirichlet(10 * np.ones(T), 1)[0]
        self.beta = np.random.dirichlet(np.ones(V), T).T

        # 初始化变分参数
        self.phi = np.array([1 / T * np.ones([N[d], T]) for d in range(D)])
        self.gamma = np.tile(self.alpha, (D, 1))   np.tile(N / T, (T, 1)).T
    def train(self, corpus, verbose=False, max_iter=1000, tol=5):
        """
        Train the LDA model on a corpus of documents (bags of words).

        Parameters
        ----------
        corpus : list of length `D`
            A list of lists, with each sublist containing the tokenized text of
            a single document.
        verbose : bool
            Whether to print the VLB at each training iteration. Default is
            True.
        max_iter : int
            The maximum number of training iterations to perform before
            breaking. Default is 1000.
        tol : int
            Break the training loop if the difference betwen the VLB on the
            current iteration and the previous iteration is less than `tol`.
            Default is 5.
        """
        # 设置文档数量 D 为输入语料库的长度
        self.D = len(corpus)
        # 设置词汇量 V 为语料库中所有不同词的数量
        self.V = len(set(np.concatenate(corpus)))
        # 计算每个文档的词数,存储在 N 中
        self.N = np.array([len(d) for d in corpus])
        # 存储输入的语料库
        self.corpus = corpus

        # 初始化模型参数
        self.initialize_parameters()
        # 初始化变分下界值为负无穷
        vlb = -np.inf

        # 迭代训练模型
        for i in range(max_iter):
            # 保存上一次迭代的变分下界值
            old_vlb = vlb

            # E 步:更新变分参数
            self._E_step()
            # M 步:更新模型参数
            self._M_step()

            # 计算当前迭代的变分下界值
            vlb = self.VLB()
            # 计算当前迭代变分下界值与上一次迭代的差值
            delta = vlb - old_vlb

            # 如果 verbose 为 True,则打印当前迭代的变分下界值和差值
            if verbose:
                print("Iteration {}: {:.3f} (delta: {:.2f})".format(i   1, vlb, delta))

            # 如果变分下界值的变化小于设定的阈值 tol,则结束训练
            if delta < tol:
                break
# 定义一个函数,计算 Dirichlet 分布中随机变量 X_t 的期望对数值,其中 X_t ~ Dirichlet
def dg(gamma, d, t):
    """
    E[log X_t] where X_t ~ Dir
    """
    # 返回 digamma(gamma[d, t]) 减去 digamma(np.sum(gamma[d, :])) 的结果
    return digamma(gamma[d, t]) - digamma(np.sum(gamma[d, :]))

numpy-mlnumpy_mlldalda_smoothed.py

代码语言:javascript复制
import numpy as np

# 定义 SmoothedLDA 类
class SmoothedLDA(object):
    def __init__(self, T, **kwargs):
        """
        A smoothed LDA model trained using collapsed Gibbs sampling. Generates
        posterior mean estimates for model parameters `phi` and `theta`.

        Parameters
        ----------
        T : int
            Number of topics

        Attributes
        ----------
        D : int
            Number of documents
        N : int
            Total number of words across all documents
        V : int
            Number of unique word tokens across all documents
        phi : :py:class:`ndarray <numpy.ndarray>` of shape `(N[d], T)`
            The word-topic distribution
        theta : :py:class:`ndarray <numpy.ndarray>` of shape `(D, T)`
            The document-topic distribution
        alpha : :py:class:`ndarray <numpy.ndarray>` of shape `(1, T)`
            Parameter for the Dirichlet prior on the document-topic distribution
        beta  : :py:class:`ndarray <numpy.ndarray>` of shape `(V, T)`
            Parameter for the Dirichlet prior on the topic-word distribution
        """
        # 初始化 T 属性
        self.T = T

        # 初始化 alpha 属性,如果 kwargs 中包含 alpha,则使用 kwargs 中的值,否则使用默认值
        self.alpha = (50.0 / self.T) * np.ones(self.T)
        if "alpha" in kwargs.keys():
            self.alpha = (kwargs["alpha"]) * np.ones(self.T)

        # 初始化 beta 属性,如果 kwargs 中包含 beta,则使用 kwargs 中的值,否则使用默认值
        self.beta = 0.01
        if "beta" in kwargs.keys():
            self.beta = kwargs["beta"]
    # 初始化模型参数
    def _init_params(self, texts, tokens):
        # 设置模型的 tokens 属性
        self.tokens = tokens
        # 获取文档数量 D
        self.D = len(texts)
        # 获取唯一 tokens 的数量 V
        self.V = len(np.unique(self.tokens))
        # 获取文档中 token 的总数 N
        self.N = np.sum(np.array([len(doc) for doc in texts]))
        # 初始化 word_document 矩阵
        self.word_document = np.zeros(self.N)

        # 根据 tokens 数量设置 beta
        self.beta = self.beta * np.ones(self.V)

        # 初始化计数器
        count = 0
        # 遍历文档和文档中的单词
        for doc_idx, doc in enumerate(texts):
            for word_idx, word in enumerate(doc):
                # 更新 word_idx
                word_idx = word_idx   count
                # 将文档索引存入 word_document 矩阵
                self.word_document[word_idx] = doc_idx
            # 更新计数器
            count = count   len(doc)

    # 训练主题模型
    def train(self, texts, tokens, n_gibbs=2000):
        """
        Trains a topic model on the documents in texts.

        Parameters
        ----------
        texts : array of length `(D,)`
            The training corpus represented as an array of subarrays, where
            each subarray corresponds to the tokenized words of a single
            document.
        tokens : array of length `(V,)`
            The set of unique tokens in the documents in `texts`.
        n_gibbs : int
            The number of steps to run the collapsed Gibbs sampler during
            training. Default is 2000.

        Returns
        -------
        C_wt : :py:class:`ndarray <numpy.ndarray>` of shape (V, T)
            The word-topic count matrix
        C_dt : :py:class:`ndarray <numpy.ndarray>` of shape (D, T)
            The document-topic count matrix
        assignments : :py:class:`ndarray <numpy.ndarray>` of shape (N, n_gibbs)
            The topic assignments for each word in the corpus on each Gibbs
            step.
        """
        # 初始化模型参数
        self._init_params(texts, tokens)
        # 运行 Gibbs 采样器
        C_wt, C_dt, assignments = self._gibbs_sampler(n_gibbs, texts)
        # 拟合参数
        self.fit_params(C_wt, C_dt)
        # 返回结果
        return C_wt, C_dt, assignments
    def what_did_you_learn(self, top_n=10):
        """
        打印每个主题下概率最高的 `top_n` 个单词
        """
        遍历每个主题
        for tt in range(self.T):
            对每个主题的单词分布进行排序,找出概率最高的 `top_n` 个单词的索引
            top_idx = np.argsort(self.phi[:, tt])[::-1][:top_n]
            根据索引获取对应的单词
            top_tokens = self.tokens[top_idx]
            打印当前主题下的概率最高的单词
            print("nTop Words for Topic %s:n" % (str(tt)))
            遍历每个概率最高的单词并打印
            for token in top_tokens:
                print("t%sn" % (str(token)))

    def fit_params(self, C_wt, C_dt):
        """
        估计 `phi`,即单词-主题分布,和 `theta`,即主题-文档分布。

        参数
        ----------
        C_wt : :py:class:`ndarray <numpy.ndarray>` of shape (V, T)
            单词-主题计数矩阵
        C_dt : :py:class:`ndarray <numpy.ndarray>` of shape (D, T)
            文档-主题计数矩阵

        返回
        -------
        phi : :py:class:`ndarray <numpy.ndarray>` of shape `(V, T)`
            单词-主题分布
        theta : :py:class:`ndarray <numpy.ndarray>` of shape `(D, T)`
            主题-文档分布
        """
        初始化单词-主题分布矩阵 phi 和主题-文档分布矩阵 theta
        self.phi = np.zeros([self.V, self.T])
        self.theta = np.zeros([self.D, self.T])

        获取超参数 beta 和 alpha
        b, a = self.beta[0], self.alpha[0]
        遍历每个单词
        for ii in range(self.V):
            遍历每个主题
            for jj in range(self.T):
                计算单词-主题分布 phi
                self.phi[ii, jj] = (C_wt[ii, jj]   b) / (
                    np.sum(C_wt[:, jj])   self.V * b
                )

        遍历每个文档
        for dd in range(self.D):
            遍历每个主题
            for jj in range(self.T):
                计算主题-文档分布 theta
                self.theta[dd, jj] = (C_dt[dd, jj]   a) / (
                    np.sum(C_dt[dd, :])   self.T * a
                )
        返回计算得到的 phi 和 theta
        return self.phi, self.theta
    # 估算给定条件下 token ii 被分配到主题 jj 的概率的近似值
    def _estimate_topic_prob(self, ii, d, C_wt, C_dt):
        """
        Compute an approximation of the conditional probability that token ii
        is assigned to topic jj given all previous topic assignments and the
        current document d: p(t_i = j | t_{-i}, w_i, d_i)
        """
        # 初始化概率向量
        p_vec = np.zeros(self.T)
        # 获取 beta 和 alpha 参数
        b, a = self.beta[0], self.alpha[0]
        # 遍历所有主题
        for jj in range(self.T):
            # 计算在主题 jj 下 token ii 的概率
            frac1 = (C_wt[ii, jj]   b) / (np.sum(C_wt[:, jj])   self.V * b)
            # 计算在文档 d 下主题 jj 的概率
            frac2 = (C_dt[d, jj]   a) / (np.sum(C_dt[d, :])   self.T * a)
            # 计算 token ii 被分配到主题 jj 的概率
            p_vec[jj] = frac1 * frac2
        # 对概率向量进行归一化处理
        return p_vec / np.sum(p_vec)
    # 定义一个私有方法,使用Collapsed Gibbs采样器估计主题分配的后验分布
    def _gibbs_sampler(self, n_gibbs, texts):
        """
        Collapsed Gibbs sampler for estimating the posterior distribution over
        topic assignments.
        """
        # 初始化计数矩阵
        C_wt = np.zeros([self.V, self.T])
        C_dt = np.zeros([self.D, self.T])
        assignments = np.zeros([self.N, n_gibbs   1])

        # 为单词随机初始化主题分配
        for ii in range(self.N):
            token_idx = np.concatenate(texts)[ii]
            assignments[ii, 0] = np.random.randint(0, self.T)

            doc = self.word_document[ii]
            C_dt[doc, assignments[ii, 0]]  = 1
            C_wt[token_idx, assignments[ii, 0]]  = 1

        # 运行Collapsed Gibbs采样器
        for gg in range(n_gibbs):
            print("Gibbs iteration {} of {}".format(gg   1, n_gibbs))
            for jj in range(self.N):
                token_idx = np.concatenate(texts)[jj]

                # 将计数矩阵减1
                doc = self.word_document[jj]
                C_wt[token_idx, assignments[jj, gg]] -= 1
                C_dt[doc, assignments[jj, gg]] -= 1

                # 从条件分布的近似中抽取新的主题
                p_topics = self._estimate_topic_prob(token_idx, doc, C_wt, C_dt)
                sampled_topic = np.nonzero(np.random.multinomial(1, p_topics))[0][0]

                # 更新计数矩阵
                C_wt[token_idx, sampled_topic]  = 1
                C_dt[doc, sampled_topic]  = 1
                assignments[jj, gg   1] = sampled_topic
        return C_wt, C_dt, assignments

Latent Dirichlet allocation

The lda.py module implements:

  1. Standard (ie., non-Bayesian) latent Dirichlet allocation with MLE parameter estimates via variational EM (Blei, Ng, & Jordan, 2003).
  2. Fully-Bayesian (ie., smoothed) latent Dirichlet allocation with MAP parameter estimates via collapsed Gibbs sampling (Griffiths & Steyvers, 2004).

Plots

Unsmoothed ### Smoothed TODO

numpy-mlnumpy_mllda__init__.py

代码语言:javascript复制
# 从当前目录中导入 lda 模块
from .lda import *
# 从当前目录中导入 lda_smoothed 模块
from .lda_smoothed import *

numpy-mlnumpy_mllinear_modelsbayesian_regression.py

代码语言:javascript复制
# 导入必要的库:numpy 和 scipy.stats
import numpy as np
import scipy.stats as stats

# 从自定义的测试模块中导入 is_number 和 is_symmetric_positive_definite 函数
from numpy_ml.utils.testing import is_number, is_symmetric_positive_definite

# 定义一个类 BayesianLinearRegressionUnknownVariance,用于贝叶斯线性回归模型
class BayesianLinearRegressionUnknownVariance:
    def fit(self, X, y):
        """
        Compute the posterior over model parameters using the data in `X` and
        `y`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset consisting of `N` examples, each of dimension `M`.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, K)`
            The targets for each of the `N` examples in `X`, where each target
            has dimension `K`.

        Returns
        -------
        self : :class:`BayesianLinearRegressionUnknownVariance<numpy_ml.linear_models.BayesianLinearRegressionUnknownVariance>` instance
        """  # noqa: E501
        # 如果需要拟合截距,将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]

        N, M = X.shape
        alpha, beta, V, mu = self.alpha, self.beta, self.V, self.mu

        if is_number(V):
            # 如果 V 是数字,将其转换为对角矩阵
            V *= np.eye(M)

        if is_number(mu):
            # 如果 mu 是数字,将其转换为长度为 M 的数组
            mu *= np.ones(M)

        # 计算 sigma
        I = np.eye(N)  # noqa: E741
        a = y - (X @ mu)
        b = np.linalg.inv(X @ V @ X.T   I)
        c = y - (X @ mu)

        shape = N   alpha
        # 计算 sigma
        sigma = (1 / shape) * (alpha * beta ** 2   a @ b @ c)
        scale = sigma ** 2

        # sigma 是 sigma^2 的逆 Gamma 先验的众数
        sigma = scale / (shape - 1)

        # 计算均值
        V_inv = np.linalg.inv(V)
        L = np.linalg.inv(V_inv   X.T @ X)
        R = V_inv @ mu   X.T @ y

        mu = L @ R
        cov = L * sigma

        # 计算 sigma^2 和 b 的后验分布
        self.posterior = {
            "sigma**2": stats.distributions.invgamma(a=shape, scale=scale),
            "b | sigma**2": stats.multivariate_normal(mean=mu, cov=cov),
        }
        return self
    # 预测目标与输入数据关联的最大后验概率(MAP)预测
    def predict(self, X):
        """
        返回与`X`相关联的目标的MAP预测。

        参数
        ----------
        X:形状为`(Z, M)`的 :py:class:`ndarray <numpy.ndarray>`
            由`Z`个新示例组成的数据集,每个示例的维度为`M`。

        返回
        -------
        y_pred:形状为`(Z, K)`的 :py:class:`ndarray <numpy.ndarray>`
            `X`中项目的模型预测。
        """
        # 如果拟合截距,则将X转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]

        # 创建单位矩阵I,形状为X的行数
        I = np.eye(X.shape[0])  # noqa: E741
        # 计算mu,即X与后验分布的均值的乘积
        mu = X @ self.posterior["b | sigma**2"].mean
        # 计算cov,即X与后验分布的协方差矩阵的乘积再加上单位矩阵I
        cov = X @ self.posterior["b | sigma**2"].cov @ X.T   I

        # MAP估计的y对应于后验预测的均值
        self.posterior_predictive = stats.multivariate_normal(mu, cov)
        # 返回mu作为预测结果
        return mu
class BayesianLinearRegressionKnownVariance:
    def fit(self, X, y):
        """
        Compute the posterior over model parameters using the data in `X` and
        `y`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset consisting of `N` examples, each of dimension `M`.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, K)`
            The targets for each of the `N` examples in `X`, where each target
            has dimension `K`.
        """
        # 如果需要拟合截距,将X转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]

        N, M = X.shape

        # 如果V是数字,将其转换为M维单位矩阵
        if is_number(self.V):
            self.V *= np.eye(M)

        # 如果mu是数字,将其转换为M维全1向量
        if is_number(self.mu):
            self.mu *= np.ones(M)

        V = self.V
        mu = self.mu
        sigma = self.sigma

        # 计算V的逆矩阵
        V_inv = np.linalg.inv(V)
        # 计算L矩阵
        L = np.linalg.inv(V_inv   X.T @ X)
        # 计算R矩阵
        R = V_inv @ mu   X.T @ y

        # 计算后验均值mu
        mu = L @ R
        # 计算协方差矩阵cov
        cov = L * sigma ** 2

        # 在给定sigma的条件下,计算b的后验分布
        self.posterior["b"] = stats.multivariate_normal(mu, cov)
    def predict(self, X):
        """
        Return the MAP prediction for the targets associated with `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(Z, M)`
            A dataset consisting of `Z` new examples, each of dimension `M`.

        Returns
        -------
        y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(Z, K)`
            The MAP predictions for the targets associated with the items in
            `X`.
        """
        # 如果需要拟合截距,将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]

        # 创建单位矩阵 I,形状与 X 的行数相同
        I = np.eye(X.shape[0])  # noqa: E741
        # 计算 mu,使用 X 与后验均值的矩阵乘法
        mu = X @ self.posterior["b"].mean
        # 计算 cov,使用 X 与后验协方差的矩阵乘法,加上单位矩阵 I
        cov = X @ self.posterior["b"].cov @ X.T   I

        # MAP 预测 y 对应于高斯后验预测分布的均值/众数
        # 将 mu 和 cov 作为参数创建多元正态分布对象
        self.posterior_predictive = stats.multivariate_normal(mu, cov)
        # 返回 mu 作为预测结果
        return mu

numpy-mlnumpy_mllinear_modelsglm.py

代码语言:javascript复制
# 导入 numpy 库
import numpy as np

# 导入线性回归模型
from numpy_ml.linear_models.linear_regression import LinearRegression

# 定义一个极小值
eps = np.finfo(float).eps

# 不同链接函数的字典
_GLM_LINKS = {
    "logit": {
        # 对数几率链接函数
        "link": lambda mu: np.log((mu   eps) / (1 - mu   eps)),
        "inv_link": lambda eta: 1.0 / (1.0   np.exp(-eta)),
        "link_prime": lambda x: (1 / (x   eps))   (1 / (1 - x   eps)),
        "theta": lambda mu: np.log((mu   eps) / (1 - mu   eps)),
        "phi": lambda x: np.ones(x.shape[0]),
        "a": lambda phi: phi,
        "b": lambda theta: np.log(1   np.exp(theta)),
        "p": 1,
        "b_prime": lambda theta: np.exp(theta) / (1   np.exp(theta)),
        "b_prime2": lambda theta: np.exp(theta) / ((1   np.exp(theta)) ** 2),
    },
    "identity": {
        # 恒等链接函数
        "link": lambda mu: mu,
        "inv_link": lambda eta: eta,
        "link_prime": lambda x: np.ones_like(x),
        "theta": lambda mu: mu,
        "phi": lambda x: np.var(x, axis=0),
        "a": lambda phi: phi,
        "b": lambda theta: 0.5 * theta ** 2,
        "p": 1,
        "b_prime": lambda theta: theta,
        "b_prime2": lambda theta: np.ones_like(theta),
    },
    "log": {
        # 对数链接函数
        "link": lambda mu: np.log(mu   eps),
        "inv_link": lambda eta: np.exp(eta),
        "link_prime": lambda x: 1 / (x   eps),
        "theta": lambda mu: np.log(mu   eps),
        "phi": lambda x: np.ones(x.shape[0]),
        "a": lambda phi: phi,
        "p": 1,
        "b": lambda theta: np.exp(theta),
        "b_prime": lambda theta: np.exp(theta),
        "b_prime2": lambda theta: np.exp(theta),
    },
}

# 广义线性模型类
class GeneralizedLinearModel:
    def fit(self, X, y):
        """
        Find the maximum likelihood GLM coefficients via IRLS.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset consisting of `N` examples, each of dimension `M`.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The targets for each of the `N` examples in `X`.

        Returns
        -------
        self : :class:`GeneralizedLinearModel <numpy_ml.linear_models.GeneralizedLinearModel>` instance
        """  # noqa: E501
        # 将 y 压缩为一维数组
        y = np.squeeze(y)
        # 断言 y 的维度为 1
        assert y.ndim == 1

        # 获取 X 的形状
        N, M = X.shape
        # 获取 GLM 链接函数
        L = _GLM_LINKS[self.link]

        # 初始化参数的起始值
        mu = np.ones_like(y) * np.mean(y)
        eta = L["link"](mu)
        theta = L["theta"](mu)

        # 如果拟合截距,将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(N), X]

        # 通过 IRLS 进行 GLM 拟合
        i = 0
        diff, beta = np.inf, np.inf
        while diff > (self.tol * M):
            if i > self.max_iter:
                print("Warning: Model did not converge")
                break

            # 计算一阶泰勒近似
            z = eta   (y - mu) * L["link_prime"](mu)
            w = L["p"] / (L["b_prime2"](theta) * L["link_prime"](mu) ** 2)

            # 对 z 执行加权最小二乘
            wlr = LinearRegression(fit_intercept=False)
            beta_new = wlr.fit(X, z, weights=w).beta.ravel()

            eta = X @ beta_new
            mu = L["inv_link"](eta)
            theta = L["theta"](mu)

            diff = np.linalg.norm(beta - beta_new, ord=1)
            beta = beta_new
            i  = 1

        self.beta = beta
        self._is_fit = True
        return self
    # 使用训练好的模型为数据集 X 中的数据点生成分布均值预测
    def predict(self, X):
        r"""
        Use the trained model to generate predictions for the distribution
        means, :math:`mu`, associated with the collection of data points in
        **X**.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(Z, M)`
            A dataset consisting of `Z` new examples, each of dimension `M`.

        Returns
        -------
        mu_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(Z,)`
            The model predictions for the expected value of the target
            associated with each item in `X`.
        """
        # 检查模型是否已经拟合
        assert self._is_fit, "Must call `fit` before generating predictions"
        # 从链接函数字典中获取链接函数
        L = _GLM_LINKS[self.link]

        # 如果使用截距,将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]

        # 计算模型预测的均值
        mu_pred = L["inv_link"](X @ self.beta)
        # 将结果展平为一维数组并返回
        return mu_pred.ravel()

numpy-mlnumpy_mllinear_modelslinear_regression.py

代码语言:javascript复制
# 导入 numpy 库
import numpy as np

# 定义线性回归类
class LinearRegression:
    # 初始化加权线性最小二乘回归模型
    def __init__(self, fit_intercept=True):
        r"""
        A weighted linear least-squares regression model.

        Notes
        -----
        在加权线性最小二乘回归中,一个实值目标向量 **y** 被建模为协变量 **X** 和模型系数 :math:`beta` 的线性组合:

        .. math::

            y_i = beta^top mathbf{x}_i   epsilon_i

        在这个方程中 :math:`epsilon_i sim mathcal{N}(0, sigma^2_i)` 是与示例 :math:`i` 相关的误差项,而 :math:`sigma^2_i` 是相应示例的方差。

        在这个模型下,回归系数 :math:`beta` 的最大似然估计为:

        .. math::

            hat{beta} = Sigma^{-1} mathbf{X}^top mathbf{Wy}

        其中 :math:`Sigma^{-1} = (mathbf{X}^top mathbf{WX})^{-1}`,**W** 是一个权重对角矩阵,每个条目与相应测量的方差成反比。当 **W** 是单位矩阵时,示例被等权重处理,模型简化为标准线性最小二乘 [2]_。

        参考资料
        ----------
        .. [1] https://en.wikipedia.org/wiki/Weighted_least_squares
        .. [2] https://en.wikipedia.org/wiki/General_linear_model

        参数
        ----------
        fit_intercept : bool
            是否在模型系数之外拟合一个截距项。默认为 True。

        属性
        ----------
        beta : :py:class:`ndarray <numpy.ndarray>` of shape `(M, K)` or None
            拟合的模型系数。
        sigma_inv : :py:class:`ndarray <numpy.ndarray>` of shape `(N, N)` or None
            数据协方差矩阵的逆矩阵。
        """
        # 初始化模型系数和数据协方差矩阵的逆矩阵
        self.beta = None
        self.sigma_inv = None
        # 设置是否拟合截距项
        self.fit_intercept = fit_intercept

        # 标记模型是否已经拟合
        self._is_fit = False
    # 对单个样本进行Sherman-Morrison更新
    def _update1D(self, x, y, w):
        """Sherman-Morrison update for a single example"""
        beta, S_inv = self.beta, self.sigma_inv

        # 如果拟合截距,将x转换为设计向量
        if self.fit_intercept:
            x = np.c_[np.diag(w), x]

        # 通过Sherman-Morrison更新协方差矩阵的逆
        S_inv -= (S_inv @ x.T @ x @ S_inv) / (1   x @ S_inv @ x.T)

        # 更新模型系数
        beta  = S_inv @ x.T @ (y - x @ beta)

    # 对多个样本进行Woodbury更新
    def _update2D(self, X, y, W):
        """Woodbury update for multiple examples"""
        beta, S_inv = self.beta, self.sigma_inv

        # 如果拟合截距,将X转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.diag(W), X]

        I = np.eye(X.shape[0])  # noqa: E741

        # 通过Woodbury恒等式更新协方差矩阵的逆
        S_inv -= S_inv @ X.T @ np.linalg.pinv(I   X @ S_inv @ X.T) @ X @ S_inv

        # 更新模型系数
        beta  = S_inv @ X.T @ (y - X @ beta)
    def fit(self, X, y, weights=None):
        r"""
        Fit regression coefficients via maximum likelihood.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset consisting of `N` examples, each of dimension `M`.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, K)`
            The targets for each of the `N` examples in `X`, where each target
            has dimension `K`.
        weights : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)` or None
            Weights associated with the examples in `X`. Examples
            with larger weights exert greater influence on model fit.  When
            `y` is a vector (i.e., `K = 1`), weights should be set to the
            reciporical of the variance for each measurement (i.e., :math:`w_i
            = 1/sigma^2_i`). When `K > 1`, it is assumed that all columns of
            `y` share the same weight :math:`w_i`. If None, examples are
            weighted equally, resulting in the standard linear least squares
            update.  Default is None.

        Returns
        -------
        self : :class:`LinearRegression <numpy_ml.linear_models.LinearRegression>` instance
        """  # noqa: E501
        N = X.shape[0]

        # 如果 weights 为 None,则将权重设置为全为 1 的数组
        weights = np.ones(N) if weights is None else np.atleast_1d(weights)
        # 如果 weights 不是一维数组,则将其转换为一维数组
        weights = np.squeeze(weights) if weights.size > 1 else weights
        err_str = f"weights must have shape ({N},) but got {weights.shape}"
        # 检查权重数组的形状是否为 (N,),如果不是则抛出异常
        assert weights.shape == (N,), err_str

        # 根据权重调整 X 和 y 的值
        W = np.diag(np.sqrt(weights))
        X, y = W @ X, W @ y

        # 如果需要拟合截距项,则将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.sqrt(weights), X]

        # 计算最大似然估计的回归系数
        self.sigma_inv = np.linalg.pinv(X.T @ X)
        self.beta = np.atleast_2d(self.sigma_inv @ X.T @ y)

        self._is_fit = True
        return self
    # 使用训练好的模型在新的数据集上生成预测结果

    # 将参数 X 转换为设计矩阵,如果我们正在拟合一个截距
    if self.fit_intercept:
        X = np.c_[np.ones(X.shape[0]), X]
    
    # 返回 X 与模型参数 beta 的矩阵乘积作为预测结果
    return X @ self.beta

numpy-mlnumpy_mllinear_modelslogistic.py

代码语言:javascript复制
"""Logistic regression module"""
import numpy as np


class LogisticRegression:
    def fit(self, X, y, lr=0.01, tol=1e-7, max_iter=1e7):
        """
        Fit the regression coefficients via gradient descent on the negative
        log likelihood.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset consisting of `N` examples, each of dimension `M`.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The binary targets for each of the `N` examples in `X`.
        lr : float
            The gradient descent learning rate. Default is 1e-7.
        max_iter : float
            The maximum number of iterations to run the gradient descent
            solver. Default is 1e7.
        """
        # convert X to a design matrix if we're fitting an intercept
        # 如果需要拟合截距,则将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]

        # initialize the previous loss to infinity
        l_prev = np.inf
        # initialize beta with random values
        self.beta = np.random.rand(X.shape[1])
        # iterate for a maximum number of iterations
        for _ in range(int(max_iter)):
            # calculate the predicted values using the sigmoid function
            y_pred = _sigmoid(X @ self.beta)
            # calculate the negative log likelihood loss
            loss = self._NLL(X, y, y_pred)
            # check if the improvement in loss is less than the tolerance
            if l_prev - loss < tol:
                return
            # update the previous loss
            l_prev = loss
            # update the beta coefficients using the gradient descent
            self.beta -= lr * self._NLL_grad(X, y, y_pred)
    # 计算当前模型下目标的惩罚负对数似然
    def _NLL(self, X, y, y_pred):
        # 获取样本数量和特征数量
        N, M = X.shape
        # 获取当前模型的 beta 和 gamma 参数
        beta, gamma = self.beta, self.gamma
        # 根据惩罚类型确定阶数
        order = 2 if self.penalty == "l2" else 1
        # 计算 beta 的 L2 或 L1 范数
        norm_beta = np.linalg.norm(beta, ord=order)

        # 计算负对数似然
        nll = -np.log(y_pred[y == 1]).sum() - np.log(1 - y_pred[y == 0]).sum()
        # 计算惩罚项
        penalty = (gamma / 2) * norm_beta ** 2 if order == 2 else gamma * norm_beta
        # 返回标准化后的惩罚负对数似然
        return (penalty   nll) / N

    # 计算惩罚负对数似然对 beta 的梯度
    def _NLL_grad(self, X, y, y_pred):
        """Gradient of the penalized negative log likelihood wrt beta"""
        # 获取样本数量和特征数量
        N, M = X.shape
        # 获取惩罚类型、beta 和 gamma 参数
        p, beta, gamma = self.penalty, self.beta, self.gamma
        # 计算惩罚项的梯度
        d_penalty = gamma * beta if p == "l2" else gamma * np.sign(beta)
        # 返回梯度
        return -((y - y_pred) @ X   d_penalty) / N

    # 使用训练好的模型在新数据集上生成预测概率
    def predict(self, X):
        """
        Use the trained model to generate prediction probabilities on a new
        collection of data points.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(Z, M)`
            A dataset consisting of `Z` new examples, each of dimension `M`.

        Returns
        -------
        y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(Z,)`
            The model prediction probabilities for the items in `X`.
        """
        # 如果拟合截距,则将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]
        # 返回预测概率
        return _sigmoid(X @ self.beta)
# 定义一个函数,计算 logistic sigmoid 函数的值
def _sigmoid(x):
    # logistic sigmoid 函数的定义:1 / (1   e^(-x))
    return 1 / (1   np.exp(-x))

numpy-mlnumpy_mllinear_modelsnaive_bayes.py

代码语言:javascript复制
# 导入 numpy 库
import numpy as np

# 定义高斯朴素贝叶斯分类器类
class GaussianNBClassifier:
    # 拟合模型参数,通过最大似然估计
    def fit(self, X, y):
        """
        Fit the model parameters via maximum likelihood.

        Notes
        -----
        The model parameters are stored in the :py:attr:`parameters
        <numpy_ml.linear_models.GaussianNBClassifier.parameters>` attribute.
        The following keys are present:

            "mean": :py:class:`ndarray <numpy.ndarray>` of shape `(K, M)`
                Feature means for each of the `K` label classes
            "sigma": :py:class:`ndarray <numpy.ndarray>` of shape `(K, M)`
                Feature variances for each of the `K` label classes
            "prior": :py:class:`ndarray <numpy.ndarray>` of shape `(K,)`
                Prior probability of each of the `K` label classes, estimated
                empirically from the training data

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset consisting of `N` examples, each of dimension `M`
        y: :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The class label for each of the `N` examples in `X`

        Returns
        -------
        self : :class:`GaussianNBClassifier <numpy_ml.linear_models.GaussianNBClassifier>` instance
        """  # noqa: E501
        
        # 获取模型参数和超参数
        P = self.parameters
        H = self.hyperparameters

        # 获取唯一的类别标签
        self.labels = np.unique(y)

        # 获取类别数量和特征维度
        K = len(self.labels)
        N, M = X.shape

        # 初始化均值、方差和先验概率
        P["mean"] = np.zeros((K, M))
        P["sigma"] = np.zeros((K, M))
        P["prior"] = np.zeros((K,))

        # 遍历每个类别,计算均值、方差和先验概率
        for i, c in enumerate(self.labels):
            X_c = X[y == c, :]

            P["mean"][i, :] = np.mean(X_c, axis=0)
            P["sigma"][i, :] = np.var(X_c, axis=0)   H["eps"]
            P["prior"][i] = X_c.shape[0] / N
        return self
    # 使用训练好的分类器对输入数据集 X 进行预测,返回每个样本的预测类别标签
    def predict(self, X):
        """
        Use the trained classifier to predict the class label for each example
        in **X**.

        Parameters
        ----------
        X: :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset of `N` examples, each of dimension `M`

        Returns
        -------
        labels : :py:class:`ndarray <numpy.ndarray>` of shape `(N)`
            The predicted class labels for each example in `X`
        """
        # 返回每个样本的预测类别标签,通过计算后验概率的对数值并取最大值确定
        return self.labels[self._log_posterior(X).argmax(axis=1)]

    # 计算每个类别的(未归一化的)对数后验概率
    def _log_posterior(self, X):
        r"""
        Compute the (unnormalized) log posterior for each class.

        Parameters
        ----------
        X: :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset of `N` examples, each of dimension `M`

        Returns
        -------
        log_posterior : :py:class:`ndarray <numpy.ndarray>` of shape `(N, K)`
            Unnormalized log posterior probability of each class for each
            example in `X`
        """
        # 获取类别数量 K
        K = len(self.labels)
        # 初始化一个全零数组,用于存储每个样本的对数后验概率
        log_posterior = np.zeros((X.shape[0], K))
        # 遍历每个类别,计算每个样本的对数后验概率
        for i in range(K):
            log_posterior[:, i] = self._log_class_posterior(X, i)
        # 返回每个样本的对数后验概率
        return log_posterior
    # 计算给定类别索引下的(未归一化的)对数后验概率
    def _log_class_posterior(self, X, class_idx):
        r"""
        Compute the (unnormalized) log posterior for the label at index
        `class_idx` in :py:attr:`labels <numpy_ml.linear_models.GaussianNBClassifier.labels>`.

        Notes
        -----
        Unnormalized log posterior for example :math:`mathbf{x}_i` and class
        :math:`c` is::

        .. math::

            log P(y_i = c mid mathbf{x}_i, theta)
                &propto log P(y=c mid theta)  
                    log P(mathbf{x}_i mid y_i = c, theta) \
                &propto log P(y=c mid theta)
                    sum{j=1}^M log P(x_j mid y_i = c, theta)

        In the Gaussian naive Bayes model, the feature likelihood for class
        :math:`c`, :math:`P(mathbf{x}_i mid y_i = c, theta)` is assumed to
        be normally distributed

        .. math::

            mathbf{x}_i mid y_i = c, theta sim mathcal{N}(mu_c, Sigma_c)

        Parameters
        ----------
        X: :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset of `N` examples, each of dimension `M`
        class_idx : int
            The index of the current class in :py:attr:`labels`

        Returns
        -------
        log_class_posterior : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            Unnormalized log probability of the label at index `class_idx`
            in :py:attr:`labels <numpy_ml.linear_models.GaussianNBClassifier.labels>`
            for each example in `X`
        """  # noqa: E501
        # 获取模型参数
        P = self.parameters
        # 获取当前类别的均值
        mu = P["mean"][class_idx]
        # 获取当前类别的先验概率
        prior = P["prior"][class_idx]
        # 获取当前类别的方差
        sigsq = P["sigma"][class_idx]

        # 计算对数似然 = 对数 X | N(mu, sigsq)
        log_likelihood = -0.5 * np.sum(np.log(2 * np.pi * sigsq))
        log_likelihood -= 0.5 * np.sum(((X - mu) ** 2) / sigsq, axis=1)
        # 返回对数似然加上对数先验概率,即未归一化的对数后验概率
        return log_likelihood   np.log(prior)

Linear Models

The linear_models module includes:

  1. OLS linear regression with maximum likelihood parameter estimates via the normal equation.
    • Includes optional weight arguments for weighted least squares
    • Supports batch and online coefficient updates.
  2. Ridge regression / Tikhonov regularization with maximum likelihood parameter estimates via the normal equation.
  3. Logistic regression with maximum likelihood parameter estimates via gradient descent.
  4. Bayesian linear regression with maximum a posteriori parameter estimates via conjugacy
    • Known coefficient prior mean and known error variance
    • Known coefficient prior mean and unknown error variance
  5. Naive Bayes classifier with Gaussian feature likelihoods.
  6. Generalized linear model with identity, log, and logit link functions.

Plots

numpy-mlnumpy_mllinear_modelsridge.py

代码语言:javascript复制
# 导入 numpy 库
import numpy as np

# 定义 RidgeRegression 类,用于执行岭回归
class RidgeRegression:
    # 初始化 Ridge 回归模型对象
    def __init__(self, alpha=1, fit_intercept=True):
        # Ridge 回归模型通过正规方程最大似然估计
        r"""
        A ridge regression model with maximum likelihood fit via the normal
        equations.

        Notes
        -----
        Ridge regression is a biased estimator for linear models which adds an
        additional penalty proportional to the L2-norm of the model
        coefficients to the standard mean-squared-error loss:

        .. math::

            mathcal{L}_{Ridge} = (mathbf{y} - mathbf{X} beta)^top
                (mathbf{y} - mathbf{X} beta)   alpha ||beta||_2^2

        where :math:`alpha` is a weight controlling the severity of the
        penalty.

        Given data matrix **X** and target vector **y**, the maximum-likelihood
        estimate for ridge coefficients, :math:`beta`, is:

        .. math::

            hat{beta} =
                left(mathbf{X}^top mathbf{X}   alpha mathbf{I} right)^{-1}
                    mathbf{X}^top mathbf{y}

        It turns out that this estimate for :math:`beta` also corresponds to
        the MAP estimate if we assume a multivariate Gaussian prior on the
        model coefficients, assuming that the data matrix **X** has been
        standardized and the target values **y** centered at 0:

        .. math::

            beta sim mathcal{N}left(mathbf{0}, frac{1}{2M} mathbf{I}right)

        Parameters
        ----------
        alpha : float
            L2 regularization coefficient. Larger values correspond to larger
            penalty on the L2 norm of the model coefficients. Default is 1.
        fit_intercept : bool
            Whether to fit an additional intercept term. Default is True.

        Attributes
        ----------
        beta : :py:class:`ndarray <numpy.ndarray>` of shape `(M, K)` or None
            Fitted model coefficients.
        """
        # 初始化模型系数和超参数
        self.beta = None
        self.alpha = alpha
        self.fit_intercept = fit_intercept
    def fit(self, X, y):
        """
        Fit the regression coefficients via maximum likelihood.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            A dataset consisting of `N` examples, each of dimension `M`.
        y : :py:class:`ndarray <numpy.ndarray>` of shape `(N, K)`
            The targets for each of the `N` examples in `X`, where each target
            has dimension `K`.

        Returns
        -------
        self : :class:`RidgeRegression <numpy_ml.linear_models.RidgeRegression>` instance
        """  # noqa: E501
        # 如果需要拟合截距,将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]

        # 创建正则化矩阵 A
        A = self.alpha * np.eye(X.shape[1])
        # 计算伪逆矩阵
        pseudo_inverse = np.linalg.inv(X.T @ X   A) @ X.T
        # 计算回归系数 beta
        self.beta = pseudo_inverse @ y
        # 返回 self 对象
        return self

    def predict(self, X):
        """
        Use the trained model to generate predictions on a new collection of
        data points.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(Z, M)`
            A dataset consisting of `Z` new examples, each of dimension `M`.

        Returns
        -------
        y_pred : :py:class:`ndarray <numpy.ndarray>` of shape `(Z, K)`
            The model predictions for the items in `X`.
        """
        # 如果需要拟合截距,将 X 转换为设计矩阵
        if self.fit_intercept:
            X = np.c_[np.ones(X.shape[0]), X]
        # 返回预测结果
        return np.dot(X, self.beta)

numpy-mlnumpy_mllinear_models__init__.py

代码语言:javascript复制
# 一个包含各种线性模型的模块

# 导入 Ridge 回归模型
from .ridge import RidgeRegression
# 导入广义线性模型
from .glm import GeneralizedLinearModel
# 导入逻辑回归模型
from .logistic import LogisticRegression
# 导入贝叶斯回归模型(已知方差)
from .bayesian_regression import (
    BayesianLinearRegressionKnownVariance,
    BayesianLinearRegressionUnknownVariance,
)
# 导入高斯朴素贝叶斯分类器
from .naive_bayes import GaussianNBClassifier
# 导入线性回归模型
from .linear_regression import LinearRegression

numpy-mlnumpy_mlneural_netsactivationsactivations.py

代码语言:javascript复制
# 用于构建神经网络的激活函数对象集合
from math import erf
from abc import ABC, abstractmethod
import numpy as np

# 定义激活函数基类
class ActivationBase(ABC):
    def __init__(self, **kwargs):
        """初始化 ActivationBase 对象"""
        super().__init__()

    def __call__(self, z):
        """将激活函数应用于输入"""
        if z.ndim == 1:
            z = z.reshape(1, -1)
        return self.fn(z)

    @abstractmethod
    def fn(self, z):
        """将激活函数应用于输入"""
        raise NotImplementedError

    @abstractmethod
    def grad(self, x, **kwargs):
        """计算激活函数相对于输入的梯度"""
        raise NotImplementedError

# 定义 Sigmoid 激活函数类,继承自 ActivationBase
class Sigmoid(ActivationBase):
    def __init__(self):
        """逻辑 Sigmoid 激活函数"""
        super().__init__()

    def __str__(self):
        """返回激活函数的字符串表示"""
        return "Sigmoid"

    def fn(self, z):
        """
        计算输入 z 上的逻辑 Sigmoid 函数值

        .. math::

            sigma(x_i) = frac{1}{1   e^{-x_i}}
        """
        return 1 / (1   np.exp(-z))

    def grad(self, x):
        """
        计算输入 x 上逻辑 Sigmoid 函数的一阶导数

        .. math::

            frac{partial sigma}{partial x_i} = sigma(x_i) (1 - sigma(x_i))
        """
        fn_x = self.fn(x)
        return fn_x * (1 - fn_x)

    def grad2(self, x):
        """
        计算输入 x 上逻辑 Sigmoid 函数的二阶导数

        .. math::

            frac{partial^2 sigma}{partial x_i^2} =
                frac{partial sigma}{partial x_i} (1 - 2 sigma(x_i))
        """
        fn_x = self.fn(x)
        return fn_x * (1 - fn_x) * (1 - 2 * fn_x)

# 定义 ReLU 激活函数类,继承自 ActivationBase
class ReLU(ActivationBase):
    """
    修正线性激活函数
    """
    Notes
    -----
    "ReLU units can be fragile during training and can "die". For example, a
    large gradient flowing through a ReLU neuron could cause the weights to
    update in such a way that the neuron will never activate on any datapoint
    again. If this happens, then the gradient flowing through the unit will
    forever be zero from that point on. That is, the ReLU units can
    irreversibly die during training since they can get knocked off the data
    manifold.

    For example, you may find that as much as 40% of your network can be "dead"
    (i.e. neurons that never activate across the entire training dataset) if
    the learning rate is set too high. With a proper setting of the learning
    rate this is less frequently an issue." [*]_

    References
    ----------
    .. [*] Karpathy, A. "CS231n: Convolutional neural networks for visual recognition."
    """

    # 初始化函数
    def __init__(self):
        super().__init__()

    # 返回激活函数的字符串表示
    def __str__(self):
        """Return a string representation of the activation function"""
        return "ReLU"

    # 计算输入 z 上的 ReLU 函数值
    def fn(self, z):
        r"""
        Evaulate the ReLU function on the elements of input `z`.

        .. math::

            text{ReLU}(z_i)
                &=  z_i     &&text{if }z_i > 0 \
                &=  0     &&text{otherwise}
        """
        return np.clip(z, 0, np.inf)

    # 计算输入 x 上的 ReLU 函数的一阶导数
    def grad(self, x):
        r"""
        Evaulate the first derivative of the ReLU function on the elements of input `x`.

        .. math::

            frac{partial text{ReLU}}{partial x_i}
                &=  1     &&text{if }x_i > 0 \
                &=  0       &&text{otherwise}
        """
        return (x > 0).astype(int)

    # 计算输入 x 上的 ReLU 函数的二阶导数
    def grad2(self, x):
        r"""
        Evaulate the second derivative of the ReLU function on the elements of
        input `x`.

        .. math::

            frac{partial^2 text{ReLU}}{partial x_i^2}  =  0
        """
        return np.zeros_like(x)
class LeakyReLU(ActivationBase):
    """
    'Leaky' version of a rectified linear unit (ReLU).

    Notes
    -----
    Leaky ReLUs [*]_ are designed to address the vanishing gradient problem in
    ReLUs by allowing a small non-zero gradient when `x` is negative.

    Parameters
    ----------
    alpha: float
        Activation slope when x < 0. Default is 0.3.

    References
    ----------
    .. [*] Mass, L. M., Hannun, A. Y, & Ng, A. Y. (2013). "Rectifier
       nonlinearities improve neural network acoustic models." *Proceedings of
       the 30th International Conference of Machine Learning, 30*.
    """

    def __init__(self, alpha=0.3):
        # 初始化 LeakyReLU 激活函数,设置斜率参数 alpha,默认为 0.3
        self.alpha = alpha
        # 调用父类的初始化方法
        super().__init__()

    def __str__(self):
        """Return a string representation of the activation function"""
        # 返回激活函数的字符串表示形式
        return "Leaky ReLU(alpha={})".format(self.alpha)

    def fn(self, z):
        r"""
        Evaluate the leaky ReLU function on the elements of input `z`.

        .. math::

            text{LeakyReLU}(z_i)
                &=  z_i     &&text{if } z_i > 0 \
                &=  alpha z_i     &&text{otherwise}
        """
        # 复制输入 z,避免修改原始数据
        _z = z.copy()
        # 对小于 0 的元素应用 Leaky ReLU 函数
        _z[z < 0] = _z[z < 0] * self.alpha
        return _z

    def grad(self, x):
        r"""
        Evaluate the first derivative of the leaky ReLU function on the elements
        of input `x`.

        .. math::

            frac{partial text{LeakyReLU}}{partial x_i}
                &=  1     &&text{if }x_i > 0 \
                &=  alpha     &&text{otherwise}
        """
        # 创建与输入 x 相同形状的全为 1 的数组
        out = np.ones_like(x)
        # 对小于 0 的元素应用 Leaky ReLU 函数的导数
        out[x < 0] *= self.alpha
        return out

    def grad2(self, x):
        r"""
        Evaluate the second derivative of the leaky ReLU function on the
        elements of input `x`.

        .. math::

            frac{partial^2 text{LeakyReLU}}{partial x_i^2}  =  0
        """
        # 返回与输入 x 相同形状的全为 0 的数组
        return np.zeros_like(x)


class GELU(ActivationBase):
    # 初始化函数,创建一个高斯误差线性单元(GELU)对象
    def __init__(self, approximate=True):
        r"""
        A Gaussian error linear unit (GELU). [*]_

        Notes
        -----
        A ReLU alternative. GELU weights inputs by their value, rather than
        gates inputs by their sign, as in vanilla ReLUs.

        References
        ----------
        .. [*] Hendrycks, D., & Gimpel, K. (2016). "Bridging nonlinearities and
           stochastic regularizers with Gaussian error linear units." *CoRR*.

        Parameters
        ----------
        approximate : bool
            Whether to use a faster but less precise approximation to the Gauss
            error function when calculating the unit activation and gradient.
            Default is True.
        """
        # 设置是否使用近似计算高斯误差函数的标志
        self.approximate = True
        # 调用父类的初始化函数
        super().__init__()

    # 返回激活函数的字符串表示
    def __str__(self):
        """Return a string representation of the activation function"""
        return f"GELU(approximate={self.approximate})"

    # 计算输入 z 的 GELU 函数值
    def fn(self, z):
        r"""
        Compute the GELU function on the elements of input `z`.

        .. math::

            text{GELU}(z_i) = z_i P(Z leq z_i) = z_i Phi(z_i)
                = z_i cdot frac{1}{2}(1   text{erf}(x/sqrt{2}))
        """
        # 导入数学库中的常数和函数
        pi, sqrt, tanh = np.pi, np.sqrt, np.tanh

        # 如果使用近似计算
        if self.approximate:
            # 计算近似的 GELU 函数值
            return 0.5 * z * (1   tanh(sqrt(2 / pi) * (z   0.044715 * z ** 3)))
        # 如果不使用近似计算
        return 0.5 * z * (1   erf(z / sqrt(2)))
    def grad(self, x):
        r"""
        Evaluate the first derivative of the GELU function on the elements
        of input `x`.

        .. math::

            frac{partial text{GELU}}{partial x_i}  =
                frac{1}{2}   frac{1}{2}left(text{erf}(frac{x}{sqrt{2}})  
                    frac{x   text{erf}'(frac{x}{sqrt{2}})}{sqrt{2}}right)

        where :math:`text{erf}'(x) = frac{2}{sqrt{pi}} cdot exp{-x^2}`.
        """
        # 导入所需的数学函数库
        pi, exp, sqrt, tanh = np.pi, np.exp, np.sqrt, np.tanh

        # 计算 x/sqrt(2)
        s = x / sqrt(2)
        # 定义 erf' 函数
        erf_prime = lambda x: (2 / sqrt(pi)) * exp(-(x ** 2))  # noqa: E731

        # 如果使用近似计算
        if self.approximate:
            # 计算近似值
            approx = tanh(sqrt(2 / pi) * (x   0.044715 * x ** 3))
            # 计算一阶导数
            dx = 0.5   0.5 * approx   ((0.5 * x * erf_prime(s)) / sqrt(2))
        else:
            # 计算一阶导数
            dx = 0.5   0.5 * erf(s)   ((0.5 * x * erf_prime(s)) / sqrt(2))
        return dx

    def grad2(self, x):
        r"""
        Evaluate the second derivative of the GELU function on the elements
        of input `x`.

        .. math::

            frac{partial^2 text{GELU}}{partial x_i^2} =
                frac{1}{2sqrt{2}} left[
                    text{erf}'(frac{x}{sqrt{2}})  
                    frac{1}{sqrt{2}} text{erf}''(frac{x}{sqrt{2}})
                right]

        where :math:`text{erf}'(x) = frac{2}{sqrt{pi}} cdot exp{-x^2}` and
        :math:`text{erf}''(x) = frac{-4x}{sqrt{pi}} cdot exp{-x^2}`.
        """
        # 导入所需的数学函数库
        pi, exp, sqrt = np.pi, np.exp, np.sqrt
        # 计算 x/sqrt(2)
        s = x / sqrt(2)

        # 定义 erf' 函数
        erf_prime = lambda x: (2 / sqrt(pi)) * exp(-(x ** 2))  # noqa: E731
        # 定义 erf'' 函数
        erf_prime2 = lambda x: -4 * x * exp(-(x ** 2)) / sqrt(pi)  # noqa: E731
        # 计算二阶导数
        ddx = (1 / 2 * sqrt(2)) * (1   erf_prime(s)   (erf_prime2(s) / sqrt(2)))
        return ddx
class Tanh(ActivationBase):
    def __init__(self):
        """初始化一个双曲正切激活函数。"""
        super().__init__()

    def __str__(self):
        """返回激活函数的字符串表示形式"""
        return "Tanh"

    def fn(self, z):
        """计算输入 `z` 中元素的双曲正切函数。"""
        return np.tanh(z)

    def grad(self, x):
        r"""
        计算输入 `x` 中元素的双曲正切函数的一阶导数。

        .. math::

            frac{partial tanh}{partial x_i}  =  1 - tanh(x)^2
        """
        return 1 - np.tanh(x) ** 2

    def grad2(self, x):
        r"""
        计算输入 `x` 中元素的双曲正切函数的二阶导数。

        .. math::

            frac{partial^2 tanh}{partial x_i^2} =
                -2 tanh(x) left(frac{partial tanh}{partial x_i}right)
        """
        tanh_x = np.tanh(x)
        return -2 * tanh_x * (1 - tanh_x ** 2)


class Affine(ActivationBase):
    def __init__(self, slope=1, intercept=0):
        """
        一个仿射激活函数。

        参数
        ----------
        slope: float
            激活斜率。默认为1。
        intercept: float
            截距/偏移项。默认为0。
        """
        self.slope = slope
        self.intercept = intercept
        super().__init__()

    def __str__(self):
        """返回激活函数的字符串表示形式"""
        return "Affine(slope={}, intercept={})".format(self.slope, self.intercept)

    def fn(self, z):
        r"""
        计算输入 `z` 中元素的仿射激活函数。

        .. math::

            text{Affine}(z_i)  =  text{slope} times z_i   text{intercept}
        """
        return self.slope * z   self.intercept
    # 计算 Affine 激活函数在输入 x 的每个元素上的一阶导数
    def grad(self, x):
        # 一阶导数公式:导数等于斜率
        return self.slope * np.ones_like(x)

    # 计算 Affine 激活函数在输入 x 的每个元素上的二阶导数
    def grad2(self, x):
        # 二阶导数公式:二阶导数等于0
        return np.zeros_like(x)
class Identity(Affine):
    def __init__(self):
        """
        Identity activation function.

        Notes
        -----
        :class:`Identity` is syntactic sugar for :class:`Affine` with
        slope = 1 and intercept = 0.
        """
        # 调用父类的构造函数,设置斜率为1,截距为0
        super().__init__(slope=1, intercept=0)

    def __str__(self):
        """Return a string representation of the activation function"""
        # 返回激活函数的字符串表示
        return "Identity"


class ELU(ActivationBase):
    def __init__(self, alpha=1.0):
        r"""
        An exponential linear unit (ELU).

        Notes
        -----
        ELUs are intended to address the fact that ReLUs are strictly nonnegative
        and thus have an average activation > 0, increasing the chances of internal
        covariate shift and slowing down learning. ELU units address this by (1)
        allowing negative values when :math:`x < 0`, which (2) are bounded by a value
        :math:`-alpha`. Similar to :class:`LeakyReLU`, the negative activation
        values help to push the average unit activation towards 0. Unlike
        :class:`LeakyReLU`, however, the boundedness of the negative activation
        allows for greater robustness in the face of large negative values,
        allowing the function to avoid conveying the *degree* of "absence"
        (negative activation) in the input. [*]_

        Parameters
        ----------
        alpha : float
            Slope of negative segment. Default is 1.

        References
        ----------
        .. [*] Clevert, D. A., Unterthiner, T., Hochreiter, S. (2016). "Fast
           and accurate deep network learning by exponential linear units
           (ELUs)". *4th International Conference on Learning
           Representations*.
        """
        # 设置 alpha 参数
        self.alpha = alpha
        # 调用父类的构造函数
        super().__init__()

    def __str__(self):
        """Return a string representation of the activation function"""
        # 返回激活函数的字符串表示,包括 alpha 参数的值
        return "ELU(alpha={})".format(self.alpha)
    # 定义一个函数 fn,用于计算 ELU 激活函数在输入 z 的元素上的值
    def fn(self, z):
        # ELU 激活函数的定义,根据输入 z 的值进行计算
        return np.where(z > 0, z, self.alpha * (np.exp(z) - 1))

    # 定义一个函数 grad,用于计算 ELU 激活函数在输入 x 的元素上的一阶导数
    def grad(self, x):
        # ELU 激活函数一阶导数的定义,根据输入 x 的值进行计算
        return np.where(x > 0, np.ones_like(x), self.alpha * np.exp(x))

    # 定义一个函数 grad2,用于计算 ELU 激活函数在输入 x 的元素上的二阶导数
    def grad2(self, x):
        # ELU 激活函数二阶导数的定义,根据输入 x 的值进行计算
        return np.where(x >= 0, np.zeros_like(x), self.alpha * np.exp(x))
class Exponential(ActivationBase):
    # 定义指数(以 e 为底)激活函数
    def __init__(self):
        """An exponential (base e) activation function"""
        # 调用父类的构造函数
        super().__init__()

    def __str__(self):
        """Return a string representation of the activation function"""
        # 返回激活函数的字符串表示
        return "Exponential"

    def fn(self, z):
        r"""
        Evaluate the activation function

        .. math::
            text{Exponential}(z_i) = e^{z_i}
        """
        # 计算激活函数的值
        return np.exp(z)

    def grad(self, x):
        r"""
        Evaluate the first derivative of the exponential activation on the elements
        of input `x`.

        .. math::

            frac{partial text{Exponential}}{partial x_i}  =  e^{x_i}
        """
        # 计算激活函数的一阶导数
        return np.exp(x)

    def grad2(self, x):
        r"""
        Evaluate the second derivative of the exponential activation on the elements
        of input `x`.

        .. math::

            frac{partial^2 text{Exponential}}{partial x_i^2}  =  e^{x_i}
        """
        # 计算激活函数的二阶导数
        return np.exp(x)


class SELU(ActivationBase):
    r"""
    A scaled exponential linear unit (SELU).

    Notes
    -----
    SELU units, when used in conjunction with proper weight initialization and
    regularization techniques, encourage neuron activations to converge to
    zero-mean and unit variance without explicit use of e.g., batchnorm.

    For SELU units, the :math:`alpha` and :math:`text{scale}` values are
    constants chosen so that the mean and variance of the inputs are preserved
    between consecutive layers. As such the authors propose weights be
    initialized using Lecun-Normal initialization: :math:`w_{ij} sim
    mathcal{N}(0, 1 / text{fan_in})`, and to use the dropout variant
    :math:`alpha`-dropout during regularization. [*]_

    See the reference for more information (especially the appendix ;-) ).

    References
    ----------
    # 定义 SELU 激活函数类
    """
    [*] Klambauer, G., Unterthiner, T., & Hochreiter, S. (2017).
    "Self-normalizing neural networks." *Advances in Neural Information
    Processing Systems, 30.*
    """

    def __init__(self):
        # 初始化 SELU 激活函数的 alpha 和 scale 参数
        self.alpha = 1.6732632423543772848170429916717
        self.scale = 1.0507009873554804934193349852946
        # 创建 ELU 激活函数对象
        self.elu = ELU(alpha=self.alpha)
        # 调用父类的初始化方法
        super().__init__()

    def __str__(self):
        """Return a string representation of the activation function"""
        # 返回 SELU 的字符串表示
        return "SELU"

    def fn(self, z):
        r"""
        Evaluate the SELU activation on the elements of input `z`.

        .. math::

            text{SELU}(z_i)  =  text{scale} times text{ELU}(z_i, alpha)

        which is simply

        .. math::

            text{SELU}(z_i)
                &= text{scale} times z_i     &&text{if }z_i > 0 \
                &= text{scale} times alpha (e^{z_i} - 1)     &&text{otherwise}
        """
        # 计算 SELU 激活函数在输入 z 的元素上的值
        return self.scale * self.elu.fn(z)

    def grad(self, x):
        r"""
        Evaluate the first derivative of the SELU activation on the elements
        of input `x`.

        .. math::

            frac{partial text{SELU}}{partial x_i}
                &=  text{scale}     &&text{if } x_i > 0 \
                &=  text{scale} times alpha e^{x_i}     &&text{otherwise}
        """
        # 计算 SELU 激活函数在输入 x 的元素上的一阶导数
        return np.where(
            x >= 0, np.ones_like(x) * self.scale, np.exp(x) * self.alpha * self.scale,
        )

    def grad2(self, x):
        r"""
        Evaluate the second derivative of the SELU activation on the elements
        of input `x`.

        .. math::

            frac{partial^2 text{SELU}}{partial x_i^2}
                &=  0     &&text{if } x_i > 0 \
                &=  text{scale} times alpha e^{x_i}     &&text{otherwise}
        """
        # 计算 SELU 激活函数在输入 x 的元素上的二阶导数
        return np.where(x > 0, np.zeros_like(x), np.exp(x) * self.alpha * self.scale)
class HardSigmoid(ActivationBase):
    def __init__(self):
        """
        A "hard" sigmoid activation function.

        Notes
        -----
        The hard sigmoid is a piecewise linear approximation of the logistic
        sigmoid that is computationally more efficient to compute.
        """
        # 初始化函数,创建一个“hard” sigmoid激活函数对象
        super().__init__()

    def __str__(self):
        """Return a string representation of the activation function"""
        # 返回激活函数的字符串表示
        return "Hard Sigmoid"

    def fn(self, z):
        r"""
        Evaluate the hard sigmoid activation on the elements of input `z`.

        .. math::

            text{HardSigmoid}(z_i)
                &= 0     &&text{if }z_i < -2.5 \
                &= 0.2 z_i   0.5     &&text{if }-2.5 leq z_i leq 2.5 \
                &= 1     &&text{if }z_i > 2.5
        """
        # 计算输入`z`的hard sigmoid激活值
        return np.clip((0.2 * z)   0.5, 0.0, 1.0)

    def grad(self, x):
        r"""
        Evaluate the first derivative of the hard sigmoid activation on the elements
        of input `x`.

        .. math::

            frac{partial text{HardSigmoid}}{partial x_i}
                &=  0.2     &&text{if } -2.5 leq x_i leq 2.5\
                &=  0     &&text{otherwise}
        """
        # 计算输入`x`的hard sigmoid激活函数的一阶导数
        return np.where((x >= -2.5) & (x <= 2.5), 0.2, 0)

    def grad2(self, x):
        r"""
        Evaluate the second derivative of the hard sigmoid activation on the elements
        of input `x`.

        .. math::

            frac{partial^2 text{HardSigmoid}}{partial x_i^2} =  0
        """
        # 计算输入`x`的hard sigmoid激活函数的二阶导数
        return np.zeros_like(x)


class SoftPlus(ActivationBase):
    def __init__(self):
        """
        A softplus activation function.

        Notes
        -----
        In contrast to :class:`ReLU`, the softplus activation is differentiable
        everywhere (including 0). It is, however, less computationally efficient to
        compute.

        The derivative of the softplus activation is the logistic sigmoid.
        """
        # 初始化函数,创建一个softplus激活函数对象
        super().__init__()
    # 返回激活函数的字符串表示形式
    def __str__(self):
        """Return a string representation of the activation function"""
        return "SoftPlus"

    # 计算输入 z 中每个元素的 softplus 激活函数值
    def fn(self, z):
        r"""
        Evaluate the softplus activation on the elements of input `z`.

        .. math::

            text{SoftPlus}(z_i) = log(1   e^{z_i})
        """
        return np.log(np.exp(z)   1)

    # 计算输入 x 中每个元素 softplus 激活函数的一阶导数值
    def grad(self, x):
        r"""
        Evaluate the first derivative of the softplus activation on the elements
        of input `x`.

        .. math::

            frac{partial text{SoftPlus}}{partial x_i} = frac{e^{x_i}}{1   e^{x_i}}
        """
        exp_x = np.exp(x)
        return exp_x / (exp_x   1)

    # 计算输入 x 中每个元素 softplus 激活函数的二阶导数值
    def grad2(self, x):
        r"""
        Evaluate the second derivative of the softplus activation on the elements
        of input `x`.

        .. math::

            frac{partial^2 text{SoftPlus}}{partial x_i^2} =
                frac{e^{x_i}}{(1   e^{x_i})^2}
        """
        exp_x = np.exp(x)
        return exp_x / ((exp_x   1) ** 2)

0 人点赞