回顾上文中的DDPG,DDPG是源于DQN,它使用神经网络替换maxQ(s',a')的功能来解决连续空间问题。也就是说DDPG的Actor网络输出的是一个动作,他的目标是输出一个动作,这个动作输入到Critic后,能过获得最大的Q值。和DQN一样,更新的时候如果更新目标在不断变化会使学习过程困难,所以需要固定目标网络,求target的网络更新后再赋值参数,所以需要四个网络。
本文介绍的PPO和DDPG一样,也是AC的架构,但是和DDPG不一样,PPO的actor输出的不是一个动作,而是一个策略,另外DDPG是off_policy,而PPO是on_policy。
下面我们开始详细介绍PPO。
PPO
PPO(Proximal Policy Optimization)近端策略优化算法。
论文:https://arxiv.org/pdf/1707.06347.pdf
代码:https://github.com/openai/baselines/tree/master/baselines
在PG中,我们使用策略pi收集的样本进行参数更新后,数据不能重复利用,需要使用新策略pi和环境互动后再收集数据,非常耗时而且浪费数据。
在DQN中,我们能够使用off_policy,因为我们输出的是动作和策略没有直接的关系,数据能够重复利用。
为了解决数据问题,PPO通过重要性采样方案,重复使用样本。
重要性采样
重要性采样是我们从一个分布中采样比较困难时,可以从另一个容易的分布中采样,两种采样分布不同,所以需要将采样到的样本进行一个修正。在PG中,我们用策略pi和环境互动,收集一批数据,然后使用pi学习这些数据后更新策略为pii,策略进行了更新。如果我们想要新策略pii继续学习这一批数据,就需要修正数据,在更新梯度中新增frac{p(a|s)}{p'(a|s)} 。分子就是当前策略,分母是采样策略。我们用采样策略收集数据,让p去学习,当p变化时只需要用修正项修正就好了。
注意这里important sampling不能算是off-policy,PPO里面的 important sampling采样的过程仍然是在同一个策略生成的样本,并未使用其他策略产生的样本,因此它是on-policy的。而DDPG这种使用其他策略产生的数据来更新另一个策略的方式才是off-policy。
在PPO中,如果我们想使用策略B抽样出来的数据,来更新策略P,可以将td_error乘一个重要性权重:
IW=P(a)/B(a)
就是目标策略动作概率a的概率除以行为策略出现a的概率。
通过重要性采样方案,可以解决数据使用效率低的问题。
clip_loss
另外策略更新还有一个不稳定的问题是,如果新旧策略差异大,更新不稳定。所有PPO是在策略梯度更新的基础上,添加一个约束,希望每次更新策略差异不要太大。TRPO使用的约束是直接约束,更新前后两个策略的KL距离不要超过一定的阈值;而PPO是在loss上进行约束。下面我们结合论文公式了解策略优化算法。
首先PG 策略的目标是:
TRPO使用新旧策略的重要性采样优化数据利用,其主要目标是,最大化目标函数,同时通过KL散度表示新旧策略差异,小于一定的置信域。
使用惩罚项替换约束,有公式
但是TRPO算法存在一个问题是约束计算KL散度需要计算二阶梯度,如果目标函数维度高计算量比较大,而且/beta值不好确认。
PPO算法的优化使用了如下公式:
其中,
通过引入clip函数,使第二项(蓝色虚线),必须在1-epsilon 和1 epsilon 之间
分下上图:绿色虚线表示min中第一项,不做任何处理;蓝色虚线表示第二项,如果新旧分布差异大,进行clip操作;红色虚线表示取min后函数分布情况。
如果A>0,说明优势函数为正,应该向这个动作方向学习,即增大r_t,横轴向右移动,红色线也提升,但是不能超过1 epsilon,保证新旧分布超逸不会相差太大;
如果A<0,说明优势函数为负,应该少向这个动作学习,即减小r_t,横轴向左移动,红色线也下降,但是不能小于1-epsilon,保证新旧分布超逸不会相差太大。
代码分析
下面从代码上进一步理解理论,具体逻辑通过注释介绍。
代码参考: https://github.com/nikhilbarhate99/PPO-PyTorch/blob/master/PPO.py
训练核心代码:
代码语言:python代码运行次数:2复制# 初始化PPO agent
ppo_agent = PPO(state_dim, action_dim, lr_actor, lr_critic, gamma, K_epochs, eps_clip, has_continuous_action_space, action_std)
# training loop
while time_step <= max_training_timesteps:
state = env.reset()
current_ep_reward = 0
# 一次探索
for t in range(1, max_ep_len 1):
# 使用策略选取动作,同时会存储每个step的状态和选取动作的概率分布
action = ppo_agent.select_action(state)
# 执行动作后返回状态,reward, 是否结束
state, reward, done, _ = env.step(action)
# 存储每个step的即刻reward
ppo_agent.buffer.rewards.append(reward)
ppo_agent.buffer.is_terminals.append(done)
time_step =1
current_ep_reward = reward
# update PPO_agent
if time_step % update_timestep == 0:
reports = ppo_agent.update()
学习核心代码:
代码语言:python代码运行次数:3复制def update(self):
# 通过存储的buffer.rewards,计算累计rewards
rewards = []
discounted_reward = 0
for reward, is_terminal in zip(reversed(self.buffer.rewards), reversed(self.buffer.is_terminals)):
if is_terminal:
discounted_reward = 0
discounted_reward = reward (self.gamma * discounted_reward)
rewards.insert(0, discounted_reward)
# 进行归一化
rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
rewards = (rewards - rewards.mean()) / (rewards.std() 1e-7)
# 旧策略的状态,动作,log概率--转tensor
old_states = torch.squeeze(torch.stack(self.buffer.states, dim=0)).detach().to(device)
old_actions = torch.squeeze(torch.stack(self.buffer.actions, dim=0)).detach().to(device)
old_logprobs = torch.squeeze(torch.stack(self.buffer.logprobs, dim=0)).detach().to(device)
# K epochs个迭代,使用旧策略数据优化新策略
for _ in range(self.K_epochs):
# 使用旧策略的状态和action,使用新策略进行evaluate
logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)
state_values = torch.squeeze(state_values)
# 计算 ratios = (pi_theta / pi_theta_old)
ratios = torch.exp(logprobs - old_logprobs.detach())
# 计算优势函数:Q(s,a) - V(s), Q(s,a)在环境中执行了a得到的Q值函数,V(s) is critic得到的V值函数
# advantage>0,说明这个执行此动作为好的方向
advantages = rewards - state_values.detach()
# ppo_loss
surr1 = ratios * advantages
surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1 self.eps_clip) * advantages
ppo_loss = -torch.min(surr1, surr2)
# critic_loss
critic_loss = 0.5 * self.MseLoss(state_values, rewards)
# entropy_loss,防止陷入次优解,entropy_loss可以让分布不要过度集中
entropy_loss = - 0.01 * dist_entropy
# 论文中对三个loss进行加权处理,最大化转最小化,所以取了负数
# loss = -torch.min(surr1, surr2) 0.5*self.MseLoss(state_values, rewards) - 0.01*dist_entropy
loss = ppo_loss critic_loss entropy_loss
self.optimizer.zero_grad()
loss.mean().backward()
self.optimizer.step()
ratios_report = ratios.mean().detach()
advantages_report = torch.abs(advantages).mean().detach()
loss_report = (ppo_loss entropy_loss).mean().detach()
ppo_loss_report = ppo_loss.mean().detach()
entropy_loss_report = entropy_loss.mean().detach()
# 更新旧策略权重
self.policy_old.load_state_dict(self.policy.state_dict())
self.buffer.clear()
return loss_report, ppo_loss_report, entropy_loss_report, advantages_report, ratios_report