上文我们介绍了使用简单的Random Guessing Algorithm & Hill Climbing 算法来解决CartPole问题,主要在决策动作这个步骤进行了修改,但是上文介绍的方法都是随机改变权重,针对简单问题参数量比较少的问题可能会得到比较好的效果,但是如果问题复杂,需要参数量多的话,这种方法就不太理想。本文主要介绍基于PolicyGradient方案如何解决CartPole问题。
PolicyGradient实例
基于策略的方案我们在算法介绍一章中已经进行了介绍,是直接对策略进行建模,用一个神经网络表示策略,对动作输出一个输出概率来表示。
我们还是基于上文的学习框架,只是在最重要的choose_action步骤中,调整为PolicyGradient模型预测的action。
首先我们看下学习过程,其中主要逻辑都添加到代码注释中。
探索过程
代码语言:python代码运行次数:0复制#学习过程,探索1000次
for i_episode in range(1000):
# 每次探索重置环境
observation = env.reset()
while True:
if RENDER: env.render()
# 根据策略模型决策动作
action = RL.choose_action(observation)
# 执行动作,返回执行动作后的观测状态,reward等信息
observation_, reward, done, info = env.step(action)
# 将观测,动作和回报存储起来。需要用这些序列值进行模型学习
RL.store_transition(observation, action, reward)
# 本次探索结束
if done:
ep_rs_sum = sum(RL.ep_rs)
if 'running_reward' not in globals():
running_reward = ep_rs_sum
else:
# 累计每次探索的回报值
running_reward = running_reward * 0.99 ep_rs_sum * 0.01
# reward大于阈值开始渲染,否则再学学
if running_reward > DISPLAY_REWARD_THRESHOLD: RENDER = True
print("episode:", i_episode, "rewards:", int(running_reward), "RENDER", RENDER)
# 每次探索学习一次
vt = RL.learn()
break
# 智能体探索一步
observation = observation_
模型更新过程
其中最重要的逻辑代码就是action = RL.choose_action(observation)
已经在每次探索中的vt = RL.learn()
其中RL就是PolicyGradient,下面我们再重点看下PolicyGradient模型代码,以及代码分析:
代码语言:python代码运行次数:0复制class PolicyGradient:
def __init__(
self,
n_actions,
n_features,
learning_rate=0.01,
reward_decay=0.95,
output_graph=False,
):
# 动作空间的维数--2
self.n_actions = n_actions
# 状态特征的维数--4
self.n_features = n_features
# 学习速率
self.lr = learning_rate
# 回报衰减率
self.gamma = reward_decay
# 一次探索的观测值,动作值,和回报值
self.ep_obs, self.ep_as, self.ep_rs = [], [], []
# 创建策略网络
self._build_net()
self.sess = tf.Session()
self.sess.run(tf.global_variables_initializer())
if output_graph:
tf.summary.FileWriter("logs/", self.sess.graph)
def _build_net(self):
""" 创建策略网络的实现
"""
# 2.x版本和1.x版本兼容性问题
tf.disable_eager_execution()
with tf.name_scope('input'):
# 观察状态--[B, 4]
self.tf_obs = tf.placeholder(tf.float32, [None, self.n_features], name="observations")
# 执行动作--[B, ]
self.tf_acts = tf.placeholder(tf.int32, [None, ], name="actions_num")
# 累计回报值--[B,]
self.tf_vt = tf.placeholder(tf.float32, [None, ], name="actions_value")
# 网络结构,两层全连接层
layer = tf.layers.dense(
inputs=self.tf_obs,
units=10,
activation=tf.nn.tanh,
kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
bias_initializer=tf.constant_initializer(0.1),
name='fc1',
)
all_act = tf.layers.dense(
inputs=layer,
units=self.n_actions,
activation=None,
kernel_initializer=tf.random_normal_initializer(mean=0, stddev=0.3),
bias_initializer=tf.constant_initializer(0.1),
name='fc2'
)
# 利用softmax函数预测每个动作的概率
self.all_act_prob = tf.nn.softmax(all_act, name='act_prob')
# 定义损失函数
with tf.name_scope('loss'):
# to maximize total reward (log_p * R) is to minimize -(log_p * R), and the tf only have minimize(loss)
# 目标是最大化(log_p * R) 等价于优化器最小化-(log_p * R)
neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=all_act, labels=self.tf_acts)
# or in this way:
# neg_log_prob = tf.reduce_sum(-tf.log(self.all_act_prob)*tf.one_hot(self.tf_acts, self.n_actions), axis=1)
loss = tf.reduce_mean(neg_log_prob * self.tf_vt)
#定义训练,更新参数
with tf.name_scope('train'):
self.train_op = tf.train.AdamOptimizer(self.lr).minimize(loss)
def choose_action(self, observation, type="random"):
""" 定义如何选择行为,即状态s处的行为采样.根据当前的行为概率分布进行采样
:param observation: 当前观察值
:return: 根据策略选取的动作
"""
prob_weights = self.sess.run(self.all_act_prob, feed_dict={self.tf_obs: observation[np.newaxis, :]})
# 按照给定的概率采样, 或者直接取最大。(random方式增加更多随机和探索性)
if type == "random":
action = np.random.choice(range(prob_weights.shape[1]), p=prob_weights.ravel())
else:
action = np.argmax(prob_weights.ravel())
return action
def store_transition(self, s, a, r):
""" 定义存储,将一个回合的状态,动作和回报都保存
:param s: 每步的观察值
:param a: 每步的动作值
:param r: 每步的reward
"""
self.ep_obs.append(s)
self.ep_as.append(a)
self.ep_rs.append(r)
def learn(self):
""" 每次探索获取数据后,进行学习更新策略网络参数
"""
# 计算一次探索的累计折扣回报
discounted_ep_rs_norm = self._discount_and_norm_rewards()
# 调用训练函数更新参数
self.sess.run(self.train_op, feed_dict={
self.tf_obs: np.vstack(self.ep_obs),
self.tf_acts: np.array(self.ep_as),
self.tf_vt: discounted_ep_rs_norm,
})
# 清空episode数据,等待下一次探索学习
self.ep_obs, self.ep_as, self.ep_rs = [], [], []
return discounted_ep_rs_norm
def _discount_and_norm_rewards(self):
""" 衰减回合的reward
"""
discounted_ep_rs = np.zeros_like(self.ep_rs)
running_add = 0
# 由于要考虑长期的累计reward,这里是逆序。t时刻reward:当前t时刻reward * gamma (t 1)时刻的reward。
for t in reversed(range(0, len(self.ep_rs))):
running_add = running_add * self.gamma self.ep_rs[t]
discounted_ep_rs[t] = running_add
# 进行归一化
discounted_ep_rs -= np.mean(discounted_ep_rs)
discounted_ep_rs /= np.std(discounted_ep_rs)
return discounted_ep_rs
总结
通过以上逻辑,总结下整个探索和学习过程的框架大致如下:
代码语言:shell复制#学习过程,探索N次
for i_episode in range(N):
observation = env.reset()
while True:
# 决策动作(可替换模块)
action = choose_action(observation)
observation_, reward, done, _ = env.step(action)
# 存储探索序列信息
store_transition(observation, action, reward)
if done:
# 模型学习(可替换模块)
vt = learn()
break
# 智能体探索一步,更新观测值
observation = observation_
其中最重要的有一个决策模型,能通过当前观测状态值得到最好的指导动作,让长期受益最大。
而决策模型中最重要的部分就是网络的设计(本文代码使用的比较简单的两层全链接,可以设计更为复杂的网络),以及loss部分设计(目标是使长期受益最大)。
在下一篇中,我们将介绍基于策略和基于值结合的Actor-Critic方案。
代码参考:
https://github.com/gxnk/reinforcement-learning-code/tree/master/第一讲 gym 学习及二次开发
中国女足真牛逼!~