动手实现Bandit算法

2022-09-19 10:13:49 浏览数 (1)

关注我们,一起学习~

Bandit方法在很多领域都有应用,比如强化学习,推荐系统。在推荐系统中可以采用Bandit方法进行冷启动,探索与利用的平衡,具体的方法介绍在之前的文章中已经介绍了,这里不在赘述,这次和大家分享相关方法的具体实现。

Bandit算法学习与总结(一)

Bandit算法学习与总结(二):Disjoint LinUCB、Hybrid LinUCB

完整代码和测试代码详见:https://github.com/dqdallen/recommendation/tree/main/Bandit

代码相对简单,所以就没写多少注释

父类

该类用于实现一些通用的方法,从而简化具体类实现时的代码。主要是arms,counts,values分别存储臂,每个臂被访问的次数以及每个臂对应的收益。

代码语言:javascript复制
from abc import ABCMeta, abstractmethod


class BanditFather(metaclass=ABCMeta):
    def __init__(self, data):
        """
        Args:
            data: array, the collection of arm, eg items
        """
        self.arms = data
        # store the number of visits per arm
        self.counts = [0] * len(self.arms)
        # store the average reward or wins of each arm
        self.values = [0.] * len(self.arms)

    @abstractmethod
    def excute(self):
        pass

    @abstractmethod
    def update(self, reward, arm_idx):
        pass

EpsilonGreedy

代码语言:javascript复制
import numpy as np
from bandit_father import BanditFather


class EpsilonGreedy(BanditFather):
    def __init__(self, data, eps):
        """The implementation of epsilon_greedy
        Args:
            eps: the epsilon
        """
        super(EpsilonGreedy, self).__init__(data)
        self.eps = eps

    def get_best_armidx(self):
        v = max(self.values)
        return self.values.index(v)

    def get_random_armidx(self):
        idx = np.random.randint(len(self.arms))
        return idx

    def update(self, reward, arm_idx):
        self.counts[arm_idx]  = 1
        new_value = self.values[arm_idx] * (self.counts[arm_idx] - 1.)   reward
        new_value = new_value / self.counts[arm_idx]
        self.values[arm_idx] = new_value

    def excute(self):
        if np.random.random() > self.eps:
            arm_idx = self.get_best_armidx()
        else:
            arm_idx = self.get_random_armidx()

        return arm_idx

汤普森采样

代码语言:javascript复制
import numpy as np
from bandit_father import BanditFather


class ThompsonSampling(BanditFather):
    def __init__(self, data):
        super(ThompsonSampling, self).__init__(data)

    def update(self, reward, arm_idx):
        if reward == 1:
            self.values[arm_idx]  = 1
        self.counts[arm_idx]  = 1

    def excute(self):
        # sample from beta distribution
        pbeta = [np.random.beta(a   1, b - a   1)
                 for a, b in zip(self.values, self.counts)]
        arm_idx = np.argmax(pbeta)
        return arm_idx

UCB

代码语言:javascript复制
import numpy as np
from bandit_father import BanditFather


class UCB(BanditFather):
    def __init__(self, data):
        super(UCB, self).__init__(data)

    def excute(self):
        for i in range(len(self.arms)):
            if self.counts[i] == 0:
                return i
        max_ind = 0
        max_ucb = -1
        for i in range(len(self.arms)):
            v = self.values[i]   np.sqrt(
                2 * np.log(sum(self.counts)) / self.counts[i])
            if v > max_ucb:
                max_ucb = v
                max_ind = i
        return max_ind

    def update(self, reward, arm_idx):
        self.counts[arm_idx]  = 1
        value = self.values[arm_idx] * (self.counts[arm_idx] - 1)   reward
        self.values[arm_idx] = value / self.counts[arm_idx]

LibUCB

可以对照着伪代码看,基本就是跟着伪代码写的。

代码语言:javascript复制
import numpy as np


class DisjointLibUCB:
    def __init__(self, data, dim, alpha=0.25):
        self.arms = data
        self.dim = dim  # 特征维度
        self.alpha = alpha
        self.A = []     # 存储每个臂的A矩阵
        self.invA = []  # 矩阵A的逆
        self.b = []
        self.theta = []
        self.initialize()

    def initialize(self):
        for i in range(len(self.arms)):
            self.A.append(np.eye(self.dim))
            self.invA.append(np.eye(self.dim))
            self.b.append(np.zeros((self.dim, 1)))

    def excute(self, features):
        max_prob = -1
        max_ind = 0
        for i in range(len(self.invA)):
            theta = np.dot(self.invA[i], self.b[i])
            score = np.dot(theta.T, features[:, i])[0]
            bound = self.alpha * np.sqrt(np.dot(
                np.dot(features[:, i].T, self.invA[i]), features[:, i]))
            prob_tmp = score   bound
            if max_prob < prob_tmp:
                max_prob = prob_tmp
                max_ind = i

        return max_ind

    def update(self, features, reward, arm_idx):
        fea_tmp = features[:, arm_idx]
        self.A[arm_idx] = self.A[arm_idx]   np.dot(fea_tmp, fea_tmp.T)
        self.b[arm_idx] = self.b[arm_idx]   reward * fea_tmp
        self.invA[arm_idx] = np.linalg.inv(self.A[arm_idx])

0 人点赞