Bandit methods are used in many fields, such as reinforcement learning and recommender systems. In recommender systems they can be used for cold start and for balancing exploration with exploitation. The methods themselves were introduced in earlier posts, so I won't repeat them here; this post shares concrete implementations.
Bandit Algorithms: Learning and Summary (Part 1)
Bandit Algorithms: Learning and Summary (Part 2): Disjoint LinUCB, Hybrid LinUCB
Full code and test code: https://github.com/dqdallen/recommendation/tree/main/Bandit
The code is fairly simple, so the repo doesn't carry many comments.
Parent class
This class implements the functionality shared by all strategies, which keeps the concrete classes short. The key members are arms, counts, and values, which store the arms, the number of times each arm has been pulled, and each arm's accumulated reward statistic, respectively.
from abc import ABCMeta, abstractmethod


class BanditFather(metaclass=ABCMeta):
    def __init__(self, data):
        """
        Args:
            data: array, the collection of arms, e.g. items
        """
        self.arms = data
        # store the number of visits per arm
        self.counts = [0] * len(self.arms)
        # store the average reward or wins of each arm
        self.values = [0.] * len(self.arms)

    @abstractmethod
    def excute(self):
        # choose and return the index of the arm to pull
        pass

    @abstractmethod
    def update(self, reward, arm_idx):
        # record the observed reward for the pulled arm
        pass
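Any concrete strategy therefore only has to implement excute (choose an arm index) and update (record the reward). As a toy illustration of that contract, here is a hypothetical subclass that pulls arms uniformly at random and maintains values as an incremental mean; RandomBandit is not part of the repo, just a sketch:

import numpy as np
from bandit_father import BanditFather


class RandomBandit(BanditFather):
    # hypothetical example, not in the repo: uniform random pulls
    def excute(self):
        return np.random.randint(len(self.arms))

    def update(self, reward, arm_idx):
        # incremental mean: avg_n = avg_{n-1} + (r - avg_{n-1}) / n
        self.counts[arm_idx] += 1
        self.values[arm_idx] += (reward - self.values[arm_idx]) / self.counts[arm_idx]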
EpsilonGreedy
import numpy as np
from bandit_father import BanditFather


class EpsilonGreedy(BanditFather):
    def __init__(self, data, eps):
        """The implementation of epsilon-greedy.

        Args:
            data: array, the collection of arms, e.g. items
            eps: the exploration probability epsilon
        """
        super(EpsilonGreedy, self).__init__(data)
        self.eps = eps

    def get_best_armidx(self):
        # exploit: the arm with the highest average reward so far
        v = max(self.values)
        return self.values.index(v)

    def get_random_armidx(self):
        # explore: a uniformly random arm
        idx = np.random.randint(len(self.arms))
        return idx

    def update(self, reward, arm_idx):
        # incremental mean update of the chosen arm's average reward
        self.counts[arm_idx] += 1
        new_value = self.values[arm_idx] * (self.counts[arm_idx] - 1.) + reward
        new_value = new_value / self.counts[arm_idx]
        self.values[arm_idx] = new_value

    def excute(self):
        # with probability 1 - eps exploit, otherwise explore
        if np.random.random() > self.eps:
            arm_idx = self.get_best_armidx()
        else:
            arm_idx = self.get_random_armidx()
        return arm_idx
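To sanity-check the class, you can run it against a simulated Bernoulli environment. This harness is not from the repo; the win rates in true_probs are made up for illustration. With eps=0.1 the best arm should collect most of the pulls, and values should approach the true rates:

import numpy as np

np.random.seed(0)
true_probs = [0.2, 0.5, 0.8]  # made-up Bernoulli win rates
bandit = EpsilonGreedy(data=['a', 'b', 'c'], eps=0.1)

for _ in range(10000):
    idx = bandit.excute()
    reward = 1 if np.random.random() < true_probs[idx] else 0
    bandit.update(reward, idx)

print(bandit.counts)                         # the 0.8 arm dominates
print([round(v, 2) for v in bandit.values])  # close to true_probs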
Thompson Sampling
import numpy as np
from bandit_father import BanditFather


class ThompsonSampling(BanditFather):
    def __init__(self, data):
        super(ThompsonSampling, self).__init__(data)

    def update(self, reward, arm_idx):
        # here values stores the number of wins, counts the number of pulls
        if reward == 1:
            self.values[arm_idx] += 1
        self.counts[arm_idx] += 1

    def excute(self):
        # sample each arm from its Beta(wins + 1, losses + 1) posterior
        pbeta = [np.random.beta(a + 1, b - a + 1)
                 for a, b in zip(self.values, self.counts)]
        arm_idx = np.argmax(pbeta)
        return arm_idx
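The same kind of Bernoulli simulation works here (true_probs is again made up, not from the repo). Since values holds wins and counts holds pulls, values / counts gives each arm's empirical win rate:

import numpy as np

np.random.seed(0)
true_probs = [0.2, 0.5, 0.8]  # made-up Bernoulli win rates
ts = ThompsonSampling(data=['a', 'b', 'c'])

for _ in range(10000):
    idx = ts.excute()
    ts.update(int(np.random.random() < true_probs[idx]), idx)

print(ts.counts)  # most pulls should go to the 0.8 arm
print([w / max(c, 1) for w, c in zip(ts.values, ts.counts)])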
UCB
import numpy as np
from bandit_father import BanditFather


class UCB(BanditFather):
    def __init__(self, data):
        super(UCB, self).__init__(data)

    def excute(self):
        # play every arm once before applying the UCB rule
        for i in range(len(self.arms)):
            if self.counts[i] == 0:
                return i
        max_ind = 0
        max_ucb = -1
        for i in range(len(self.arms)):
            # average reward plus the upper confidence bound
            v = self.values[i] + np.sqrt(
                2 * np.log(sum(self.counts)) / self.counts[i])
            if v > max_ucb:
                max_ucb = v
                max_ind = i
        return max_ind

    def update(self, reward, arm_idx):
        # incremental mean update
        self.counts[arm_idx] += 1
        value = self.values[arm_idx] * (self.counts[arm_idx] - 1) + reward
        self.values[arm_idx] = value / self.counts[arm_idx]
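UCB needs no randomness of its own, so the harness stays the same (win rates made up as before). Note that the first len(arms) rounds just pull each arm once:

import numpy as np

np.random.seed(0)
true_probs = [0.2, 0.5, 0.8]  # made-up Bernoulli win rates
ucb = UCB(data=['a', 'b', 'c'])

for _ in range(10000):
    idx = ucb.excute()
    ucb.update(int(np.random.random() < true_probs[idx]), idx)

print(ucb.counts)  # the confidence bound shrinks fastest on the best arm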
Disjoint LinUCB
You can read this alongside the pseudocode in the earlier post; the implementation basically follows it line by line.
import numpy as np


class DisjointLibUCB:
    def __init__(self, data, dim, alpha=0.25):
        self.arms = data
        self.dim = dim  # feature dimension
        self.alpha = alpha
        self.A = []  # per-arm A matrix
        self.invA = []  # inverse of each A matrix
        self.b = []
        self.theta = []
        self.initialize()

    def initialize(self):
        for i in range(len(self.arms)):
            self.A.append(np.eye(self.dim))
            self.invA.append(np.eye(self.dim))
            self.b.append(np.zeros((self.dim, 1)))

    def excute(self, features):
        # features: (dim, n_arms), one feature column per arm
        max_prob = -1
        max_ind = 0
        for i in range(len(self.invA)):
            theta = np.dot(self.invA[i], self.b[i])
            score = np.dot(theta.T, features[:, i])[0]
            bound = self.alpha * np.sqrt(np.dot(
                np.dot(features[:, i].T, self.invA[i]), features[:, i]))
            prob_tmp = score + bound
            if max_prob < prob_tmp:
                max_prob = prob_tmp
                max_ind = i
        return max_ind

    def update(self, features, reward, arm_idx):
        # use a column vector so the outer product has shape (dim, dim)
        fea_tmp = features[:, arm_idx].reshape(-1, 1)
        self.A[arm_idx] += np.dot(fea_tmp, fea_tmp.T)
        self.b[arm_idx] += reward * fea_tmp
        self.invA[arm_idx] = np.linalg.inv(self.A[arm_idx])
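A minimal way to exercise the class is a synthetic linear environment; the hidden weight matrix true_theta and the reward model below are made up for illustration and are not from the repo. features holds one column per arm, matching the (dim, n_arms) layout the code expects:

import numpy as np

np.random.seed(0)
n_arms, dim = 3, 5
true_theta = np.random.rand(dim, n_arms)  # made-up hidden weights per arm
model = DisjointLibUCB(data=list(range(n_arms)), dim=dim)

for _ in range(5000):
    features = np.random.rand(dim, n_arms)  # one feature column per arm
    idx = model.excute(features)
    # Bernoulli reward whose mean is the normalized linear score
    p = np.dot(features[:, idx], true_theta[:, idx]) / dim
    model.update(features, int(np.random.random() < p), idx)

# ridge estimate for arm 0; should be roughly true_theta[:, 0] / dim
theta_hat = np.dot(model.invA[0], model.b[0]).ravel()
print(np.round(theta_hat, 3), np.round(true_theta[:, 0] / dim, 3))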