强化学习系列(八)--PPO

2024-05-09 21:17:19 浏览数 (2)

回顾上文中的DDPG,DDPG是源于DQN,它使用神经网络替换maxQ(s',a')的功能来解决连续空间问题。也就是说DDPG的Actor网络输出的是一个动作,他的目标是输出一个动作,这个动作输入到Critic后,能过获得最大的Q值。和DQN一样,更新的时候如果更新目标在不断变化会使学习过程困难,所以需要固定目标网络,求target的网络更新后再赋值参数,所以需要四个网络。

本文介绍的PPO和DDPG一样,也是AC的架构,但是和DDPG不一样,PPO的actor输出的不是一个动作,而是一个策略,另外DDPG是off_policy,而PPO是on_policy。

下面我们开始详细介绍PPO。

PPO

PPO(Proximal Policy Optimization)近端策略优化算法。

论文:https://arxiv.org/pdf/1707.06347.pdf

代码:https://github.com/openai/baselines/tree/master/baselines

在PG中,我们使用策略pi收集的样本进行参数更新后,数据不能重复利用,需要使用新策略pi和环境互动后再收集数据,非常耗时而且浪费数据。

在DQN中,我们能够使用off_policy,因为我们输出的是动作和策略没有直接的关系,数据能够重复利用。

为了解决数据问题,PPO通过重要性采样方案,重复使用样本。

重要性采样

重要性采样是我们从一个分布中采样比较困难时,可以从另一个容易的分布中采样,两种采样分布不同,所以需要将采样到的样本进行一个修正。在PG中,我们用策略pi和环境互动,收集一批数据,然后使用pi学习这些数据后更新策略为pii,策略进行了更新。如果我们想要新策略pii继续学习这一批数据,就需要修正数据,在更新梯度中新增frac{p(a|s)}{p'(a|s)} 。分子就是当前策略,分母是采样策略。我们用采样策略收集数据,让p去学习,当p变化时只需要用修正项修正就好了。

注意这里important sampling不能算是off-policy,PPO里面的 important sampling采样的过程仍然是在同一个策略生成的样本,并未使用其他策略产生的样本,因此它是on-policy的。而DDPG这种使用其他策略产生的数据来更新另一个策略的方式才是off-policy。

在PPO中,如果我们想使用策略B抽样出来的数据,来更新策略P,可以将td_error乘一个重要性权重:

IW=P(a)/B(a)

就是目标策略动作概率a的概率除以行为策略出现a的概率。

通过重要性采样方案,可以解决数据使用效率低的问题。

clip_loss

另外策略更新还有一个不稳定的问题是,如果新旧策略差异大,更新不稳定。所有PPO是在策略梯度更新的基础上,添加一个约束,希望每次更新策略差异不要太大。TRPO使用的约束是直接约束,更新前后两个策略的KL距离不要超过一定的阈值;而PPO是在loss上进行约束。下面我们结合论文公式了解策略优化算法。

首先PG 策略的目标是:

TRPO使用新旧策略的重要性采样优化数据利用,其主要目标是,最大化目标函数,同时通过KL散度表示新旧策略差异,小于一定的置信域。

使用惩罚项替换约束,有公式

但是TRPO算法存在一个问题是约束计算KL散度需要计算二阶梯度,如果目标函数维度高计算量比较大,而且/beta值不好确认。

PPO算法的优化使用了如下公式:

其中,

通过引入clip函数,使第二项(蓝色虚线),必须在1-epsilon1 epsilon 之间

分下上图:绿色虚线表示min中第一项,不做任何处理;蓝色虚线表示第二项,如果新旧分布差异大,进行clip操作;红色虚线表示取min后函数分布情况。

如果A>0,说明优势函数为正,应该向这个动作方向学习,即增大r_t,横轴向右移动,红色线也提升,但是不能超过1 epsilon,保证新旧分布超逸不会相差太大;

如果A<0,说明优势函数为负,应该少向这个动作学习,即减小r_t,横轴向左移动,红色线也下降,但是不能小于1-epsilon,保证新旧分布超逸不会相差太大。

代码分析

下面从代码上进一步理解理论,具体逻辑通过注释介绍。

代码参考: https://github.com/nikhilbarhate99/PPO-PyTorch/blob/master/PPO.py

训练核心代码:

代码语言:python代码运行次数:2复制
# 初始化PPO agent
ppo_agent = PPO(state_dim, action_dim, lr_actor, lr_critic, gamma, K_epochs, eps_clip, has_continuous_action_space, action_std)
# training loop
while time_step <= max_training_timesteps:

    state = env.reset()
    current_ep_reward = 0
    # 一次探索
    for t in range(1, max_ep_len 1):

        # 使用策略选取动作,同时会存储每个step的状态和选取动作的概率分布
        action = ppo_agent.select_action(state)
        # 执行动作后返回状态,reward, 是否结束
        state, reward, done, _ = env.step(action)

        # 存储每个step的即刻reward
        ppo_agent.buffer.rewards.append(reward)
        ppo_agent.buffer.is_terminals.append(done)

        time_step  =1
        current_ep_reward  = reward

        # update PPO_agent
        if time_step % update_timestep == 0:
            reports = ppo_agent.update()

学习核心代码:

代码语言:python代码运行次数:3复制
def update(self):

    # 通过存储的buffer.rewards,计算累计rewards
    rewards = []
    discounted_reward = 0
    for reward, is_terminal in zip(reversed(self.buffer.rewards), reversed(self.buffer.is_terminals)):
        if is_terminal:
            discounted_reward = 0
        discounted_reward = reward   (self.gamma * discounted_reward)
        rewards.insert(0, discounted_reward)

    # 进行归一化
    rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
    rewards = (rewards - rewards.mean()) / (rewards.std()   1e-7)

    # 旧策略的状态,动作,log概率--转tensor
    old_states = torch.squeeze(torch.stack(self.buffer.states, dim=0)).detach().to(device)
    old_actions = torch.squeeze(torch.stack(self.buffer.actions, dim=0)).detach().to(device)
    old_logprobs = torch.squeeze(torch.stack(self.buffer.logprobs, dim=0)).detach().to(device)

    # K epochs个迭代,使用旧策略数据优化新策略
    for _ in range(self.K_epochs):
        # 使用旧策略的状态和action,使用新策略进行evaluate
        logprobs, state_values, dist_entropy = self.policy.evaluate(old_states, old_actions)

        state_values = torch.squeeze(state_values)

        # 计算 ratios = (pi_theta / pi_theta_old)
        ratios = torch.exp(logprobs - old_logprobs.detach())

        # 计算优势函数:Q(s,a) - V(s), Q(s,a)在环境中执行了a得到的Q值函数,V(s) is critic得到的V值函数
        # advantage>0,说明这个执行此动作为好的方向
        advantages = rewards - state_values.detach()
        # ppo_loss
        surr1 = ratios * advantages
        surr2 = torch.clamp(ratios, 1 - self.eps_clip, 1   self.eps_clip) * advantages
        ppo_loss = -torch.min(surr1, surr2)
        # critic_loss
        critic_loss = 0.5 * self.MseLoss(state_values, rewards)
        # entropy_loss,防止陷入次优解,entropy_loss可以让分布不要过度集中
        entropy_loss = - 0.01 * dist_entropy

        # 论文中对三个loss进行加权处理,最大化转最小化,所以取了负数
        # loss = -torch.min(surr1, surr2)   0.5*self.MseLoss(state_values, rewards) - 0.01*dist_entropy
        loss = ppo_loss   critic_loss   entropy_loss

        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()
        ratios_report = ratios.mean().detach()
        advantages_report = torch.abs(advantages).mean().detach()
        loss_report = (ppo_loss   entropy_loss).mean().detach()
        ppo_loss_report = ppo_loss.mean().detach()
        entropy_loss_report = entropy_loss.mean().detach()

    # 更新旧策略权重
    self.policy_old.load_state_dict(self.policy.state_dict())

    self.buffer.clear()
    return loss_report, ppo_loss_report, entropy_loss_report, advantages_report, ratios_report

0 人点赞