Automatic Differentiation: Core Idea and a Simple Implementation

2022-09-29

Reference: https://borgwang.github.io/dl/2019/09/15/autograd.html

Overview

When implementing idealflow, we had to explicitly define the forward and backward (gradient computation) logic for every layer. Essentially, though, these layers are just compositions of basic operators, and the derivative functions of those basic operators (addition, subtraction, multiplication, division, matrix transforms, and so on) are all quite simple. If we implement the derivative function of each basic operator once, and also record how the operators are connected (the computation graph), then we no longer need to hand-write the backward pass: we only need to compute the loss and, starting from the loss, let the gradient flow backward along the computation graph using the predefined derivative functions to obtain the gradients of the parameters. This is the core idea of automatic differentiation. idealflow has the layer concept partly because it matches our intuition and partly because it makes the implementation manageable without automatic differentiation. With automatic differentiation we can drop the layer abstraction entirely: training a neural network reduces to defining the network's computation graph, letting data flow forward, and letting gradients flow backward automatically.
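
To make the goal concrete, here is a minimal sketch of the interface we are building toward, assuming the Tensor class implemented in the rest of this post (the variable names are purely illustrative): build tensors, compose them with ordinary Python operators, call backward() on a scalar result, and read the accumulated gradients.

a = Tensor([[1.0, 2.0], [3.0, 4.0]], requires_grad=True)
b = Tensor([[1.0, 1.0], [1.0, 1.0]], requires_grad=True)

c = (a * b).sum()   # the forward pass implicitly records the computation graph
c.backward()        # gradients flow back along the recorded dependencies

print(a.grad)       # dc/da == b.values
print(b.grad)       # dc/db == a.values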

Helper functions to build backward context

def build_binary_ops_tensor(ts1, ts2, grad_fn_ts1, grad_fn_ts2, values):
    """for binary operator"""
    requires_grad = ts1.requires_grad or ts2.requires_grad
    dependency = []
    if ts1.requires_grad:
        dependency.append(dict(tensor=ts1, grad_fn=grad_fn_ts1))
    if ts2.requires_grad:
        dependency.append(dict(tensor=ts2, grad_fn=grad_fn_ts2))
    tensor_cls = ts1.__class__
    return tensor_cls(values, requires_grad, dependency)


def build_unary_ops_tensor(ts, grad_fn, values):
    """for unary operators"""
    requires_grad = ts.requires_grad
    dependency = []
    if ts.requires_grad:
        dependency.append(dict(tensor=ts, grad_fn=grad_fn))
    tensor_cls = ts.__class__
    return tensor_cls(values, requires_grad, dependency)

Define Tensor class

  • needs to define numerical operators
  • store its dependent tensors
  • store gradient functions w.r.t its dependent tensors
import numpy as np


def as_tensor(obj):
    if not isinstance(obj, Tensor):
        obj = Tensor(obj)
    return obj


class Tensor:
    
    def __init__(self, values, requires_grad=False, dependency=None):
        self._values = np.array(values)
        self.shape = self.values.shape
        
        self.grad = None
        if requires_grad:
            self.zero_grad()
        self.requires_grad = requires_grad
        
        if dependency is None:
            dependency = []
        self.dependency = dependency
            
    @property
    def values(self):
        return self._values
    
    @values.setter
    def values(self, new_values):
        self._values = np.array(new_values)
        self.grad = None
        
    def zero_grad(self):
        self.grad = np.zeros(self.shape)
        
    def __matmul__(self, other):
        """ self @ other """
        return _matmul(self, as_tensor(other))
        
    def __rmatmul__(self, other):
        """ other @ self """
        return _matmul(as_tensor(other), self)
    
    def __imatmul__(self, other):
        """ self @= other """
        self.values = self.values @ as_tensor(other).values
        return self
    
    def __add__(self, other):
        """ self + other """
        return _add(self, as_tensor(other))
    
    def __radd__(self, other):
        """ other + self """
        return _add(as_tensor(other), self)
    
    def __iadd__(self, other):
        """ self += other """
        self.values = self.values + as_tensor(other).values
        return self
       
    def __sub__(self, other):
        """ self - other """
        return _sub(self, as_tensor(other))
    
    def __rsub__(self, other):
        """ other - self """
        return _sub(as_tensor(other), self)
    
    def __isub__(self, other):
        """ self -= other """
        self.values = self.values - as_tensor(other).values
        return self
        
    def __mul__(self, other):
        """ self * other """
        return _mul(self, as_tensor(other))
    
    def __rmul__(self, other):
        """ other * self """
        return _mul(as_tensor(other), self)
    
    def __imul__(self, other):
        """ self *= other """
        self.values = self.values * as_tensor(other).values
        return self
    
    def __neg__(self):
        """ -self """
        return _neg(self)
    
    def sum(self, axis=None):
        return _sum(self, axis=axis)
    
    
    def backward(self, grad=None):
        assert self.requires_grad, "Cannot call backward() on a tensor that does not require grad."
        grad = 1.0 if grad is None else grad
        grad = np.array(grad)

        # accumulate gradient
        self.grad += grad

        # propagate the gradient to its dependencies
        for dep in self.dependency:
            grad_for_dep = dep["grad_fn"](grad)
            dep["tensor"].backward(grad_for_dep)
            
            
def _matmul(ts1, ts2):
    values = ts1.values @ ts2.values

    # c = a @ b
    # D_c / D_a = grad @ b.T
    # D_c / D_b = a.T @ grad
    def grad_fn_ts1(grad):
        return grad @ ts2.values.T

    def grad_fn_ts2(grad):
        return ts1.values.T @ grad

    return build_binary_ops_tensor(
        ts1, ts2, grad_fn_ts1, grad_fn_ts2, values)


def _add(ts1, ts2):
    values = ts1.values + ts2.values

    # c = a + b
    # D_c / D_a = 1.0
    # D_c / D_b = 1.0
    def grad_fn_ts1(grad):
        # handle broadcasting (5, 3) + (3,) -> (5, 3)
        for _ in range(grad.ndim - ts1.values.ndim):
            grad = grad.sum(axis=0)
        # handle broadcasting (5, 3) + (1, 3) -> (5, 3)
        for i, dim in enumerate(ts1.shape):
            if dim == 1:
                grad = grad.sum(axis=i, keepdims=True)
        return grad

    def grad_fn_ts2(grad):
        for _ in range(grad.ndim - ts2.values.ndim):
            grad = grad.sum(axis=0)
        for i, dim in enumerate(ts2.shape):
            if dim == 1:
                grad = grad.sum(axis=i, keepdims=True)
        return grad

    return build_binary_ops_tensor(
        ts1, ts2, grad_fn_ts1, grad_fn_ts2, values)


def _sub(ts1, ts2):
    return ts1 + (-ts2)


def _mul(ts1, ts2):
    values = ts1.values * ts2.values

    # c = a * b
    # D_c / D_a = b
    # D_c / D_b = a
    def grad_fn_ts1(grad):
        grad = grad * ts2.values
        for _ in range(grad.ndim - ts1.values.ndim):
            grad = grad.sum(axis=0)
        for i, dim in enumerate(ts1.shape):
            if dim == 1:
                grad = grad.sum(axis=i, keepdims=True)
        return grad

    def grad_fn_ts2(grad):
        grad = grad * ts1.values
        for _ in range(grad.ndim - ts2.values.ndim):
            grad = grad.sum(axis=0)
        for i, dim in enumerate(ts2.shape):
            if dim == 1:
                grad = grad.sum(axis=i, keepdims=True)
        return grad

    return build_binary_ops_tensor(
        ts1, ts2, grad_fn_ts1, grad_fn_ts2, values)


def _neg(ts):
    values = -ts.values

    def grad_fn(grad):
        return -grad

    return build_unary_ops_tensor(ts, grad_fn, values)


def _sum(ts, axis):
    values = ts.values.sum(axis=axis)
    if axis is not None:
        repeat = ts.values.shape[axis]

    def grad_fn(grad):
        if axis is None:
            grad = grad * np.ones_like(ts.values)
        else:
            grad = np.expand_dims(grad, axis)
            grad = np.repeat(grad, repeat, axis)
        return grad

    return build_unary_ops_tensor(ts, grad_fn, values)
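
Before using these ops for training, a quick sanity check one might add (not in the original post) is to compare an autograd gradient against a finite-difference estimate, e.g. for a matmul followed by a sum:

# numerical gradient check (illustrative only)
np.random.seed(0)
a = Tensor(np.random.randn(4, 3), requires_grad=True)
w = Tensor(np.random.randn(3, 2), requires_grad=True)

loss = (a @ w).sum()
loss.backward()

eps = 1e-6
numeric = np.zeros_like(w.values)
for i in range(w.values.shape[0]):
    for j in range(w.values.shape[1]):
        perturbed = w.values.copy()
        perturbed[i, j] += eps
        numeric[i, j] = ((a.values @ perturbed).sum() - loss.values) / eps

print(np.allclose(w.grad, numeric, atol=1e-4))  # expect True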

Training data

import matplotlib.pyplot as plt

x = Tensor(np.random.normal(0, 1.0, (100, 3)))
coef = Tensor(np.random.randint(0, 10, (3,)))
y = x * coef - 3 

params = {
    "w": Tensor(np.random.normal(0, 1.0, (3, 3)), requires_grad=True),
    "b": Tensor(np.random.normal(0, 1.0, 3), requires_grad=True)
}

learning_rate = 3e-4
loss_list = []
for e in range(101):
    # set gradient to zero
    for param in params.values():
        param.zero_grad()
    
    # forward
    predicted = x @ params["w"] + params["b"]
    err = predicted - y
    loss = (err * err).sum()
    
    # backward automatically
    loss.backward()
    
    # update parameters
    for param in params.values():
        param -= learning_rate * param.grad
        
    loss_list.append(loss.values)
    if e % 10 == 0:
        print("epoch-%i tloss: %.4f" % (e, loss.values))

plt.figure(figsize=(8, 5))
plt.plot(loss_list)
plt.grid()
plt.xlabel("epoch")
plt.ylabel("loss")
(Figure: training loss curve over 100 epochs.)
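
Since the data was generated as y = x * coef - 3, the learned parameters can also be checked against the generating process: ideally w approaches np.diag(coef.values) and b approaches [-3, -3, -3]. A quick check one might append after the loop (not in the original post):

print("learned w:\n", params["w"].values)   # should move toward np.diag(coef.values)
print("learned b:", params["b"].values)     # should move toward [-3, -3, -3]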
