强化学习之不基于模型的控制(五)

2021-01-04 10:43:17 浏览数 (1)

  • 前一讲讲解了智能体如何在不基于模型的情况下如何进行预测,也就是求解在给定策略下的状态价值或行为价值函数.本章则主要讲解在不基于模型的条件下如何通过个体的学习优化价值函数,同时改善自身行为的策略以最大化获得累积奖励的过程,这一过程也被称为不基于模型的控制.

目录

  • 简介
  • 行为价值函数的重要性
epsilon

-贪婪策略

  • 现时策略蒙特卡洛控制
  • 现时策略时序差分控制
  • 离线策略学习
  • 编程实践(蒙特卡洛学习求二十一点游戏最优策略)
  • 编程实践(构建基于gym的有风格子世界,个体以及交互
  • 参考

简介

  • 生活中有很多关于优化控制的问题,比如控制一个大厦内的多个电梯使得效率最高;控制直升机的特技飞行,围棋游戏等等.这类问题要么就是我们无法掌握他们的环境动力学特征,但是我们可以去经历,在实践中尝试构建理解环境的模型;要么虽然问题的环境动力学特征是已知的,但由问题的规模太大以至于计算机根据一般算法无法高效求解,除非使用采样的方法.
  • 在学习动态规划进行策略评估,优化时,我们能体会到:个体在于环境进行交互时,其实际交互的行为需要基于一个策略产生.在评估一个状态或行为的价值时,也需要基于一个策略,因为不同的策略下同一个状态或状态行为对的价值是不同的.我们把用来指导个体产生与环境进行实际交互行为的策略称为行为策略,把用来评价状态或行为价值的策略或待优化的策略成为目标策略.
  • 如果个体在学习的过程中优化策略与自己的行为策略是同一个策略时,这种学习方法称为现时策略学习(on-policy learning),如果个体在学习过程中优化的策略与自己的行为策略是不同的策略时,这种学习方式称为借鉴策略学习(off-policy learning)

行为价值函数的重要性

  • 在不基于模型的控制时,我们将无法通过分析,比较基于状态的价值来改善贪婪策略,这是因为基于状态价值的贪婪策略改善需要知晓状态间的转移概率(已知环境动力学):
  • 生活中也是如此,有时候一个人给自己制定了一个价值很高的目标,却发现不知采取如何的行为来达到这个目标.与其花时间比较目标与现实的差距,倒不如立足于当下,在所有可用的行为中选择一个最高价值的行为.因此如果能够确定某状态下所有状态行为对的价值,那么自然就比较容易从中选出一个最优价值对应的行为了.实践证明,在不基于模型的强化学习问题中,确定状态行为对的价值要容易得多.简化过程如下图所示:
  • 这样做就可以通过状态行为对的价值来改善策略而不需要知道整个模型,只需要知道在某个状态下采取什么样的行为价值最大即可.

epsilon

-贪婪策略

  • 在不基于模型,基于采样的蒙特卡洛或时序差分学习中使用贪婪算法通常不能收敛至最优策略.虽然DP,MT,TD算法都采用通过后续状态价值回溯的办法确定当前状态价值,但动态规划算法是考虑了一个状态后续所有状态价值的.而后两者则仅能考虑到有限次数,已经采样经历过的状态,那些事实存在但还没经历过的状态对于后两者算法来说都是未探索的不被考虑的状态,有些状态虽然经历过,但由于经历次数不多对其价值的估计也不一定准确.如果存在一些价值更高的未被探索的状态使用贪婪算法将无法探索到这些状态,而已经经历过但价值比较低的状态也很难再次被经历,如此将无法得到最优策略.我们使用一个例子来解释:
  • 如上图所示,在你面前有两扇门,考虑如下的行为,奖励并使用贪婪算法改善策略:
  • 你打开左侧门得到即时奖励为0:
V(left) = 0
  • 你打开右侧门得到即时奖励为1:
V(right) = 1
  • 在使用贪婪算法时,接下来你将会继续打开右侧的门,而不会尝试打开左侧的门
  • 你打开右侧门得到即时奖励为3:
V(right) = 2
  • 你打开右侧门得到即时奖励为2:
V(right) = 2
cdots
  • 这种情况下,打开右侧门显然不一定是最好的选择(因为左侧门第一次探索的价值低就永远不打开左侧门).贪婪策略产生问题的根源是无法保证持续的探索,为了解决这个问题,一种不完全的贪婪策略(
epsilon

-贪婪策略)被提出,其基本思想就是使得某一状态下所有可能的行为都有几率被选中执行,具体通过设置一个比较小的

epsilon

值,使用

1-epsilon

的概率贪婪地选择目前认为是最大行为价值的行为,而用

epsilon

的概率随机从所有m个可选行为中选择行为,即:

现时策略蒙特卡洛控制

  • 现时策略蒙特卡洛控制通过
epsilon

-贪婪策略采样一个或多个完整的状态序列后,平均得出某一状态行为对的价值,并持续进行策略评估和改善(通过状态行为对的价值Q).如下图所示:

  • 图中每一个向上或者向下的箭头都对应着一个或多个Episode.也就是说我们一般在经历了一个或多个Episode之后才进行依次Q函数更新或策略改善.但使用
epsilon

-贪婪策略进行现时蒙特卡洛控制仍然只能得到基于该策略的近似行为价值函数,这是因为该策略一直在进行探索,而没有一个终止条件.因此我们必须关注以下两个方面:一方面我们不想丢掉任何更好信息和状态,另一方面随着我们策略的改善我们最终希望能终止于某一个最优策略.为此引入了另一个理论概念GLIE(greedy in the Limit with Infinite Exploration).它包含两层意思,一是所有的状态行为对会被无限次探索:

  • 二是另外随着采样趋向无穷多,策略收敛至一个贪婪策略:
  • 存在如下的定理:GLIE蒙特卡洛控制能收敛至最优的状态行为价值函数:
  • 如果在使用
epsilon

-贪婪策略时,能令

epsilon

随采样次数的无限增加而趋向于0就符合GLIE.这样基于GLIE的蒙特卡洛控制流程如下:

  1. 基于给定策略
pi

,采样第

k

个完整的状态序列:{

S_1,A_1,R_2,cdots ,S_T

}

  1. 对于该状态序列里出现的每一个状态行为对
(S_t,A_t)

,更新其计数N和行为价值函数Q:

  1. 基于新的行为价值Q以如下方式改善策略:
  • 在实际应用中,
epsilon

的取值可不局限于取

1/k

,只要符合GLIE特性的设计均可以收敛至最优策略(价值)

现时策略时序差分控制

  • 通过上一章关于预测的学习,我们体会到时序差分(TD)学习相比蒙特卡洛(MC)学习有很多优点:低变异性,可以在线实时学习,可以学习不完整状态序列等.在控制问题上使用TD学习同样具备上述的一些优点.我们将介绍Sarsa算法和Sarsa(
lambda

)算法

Sarsa算法

  • Sarsa的名称来源于下图所示的序列描述:针对一个状态S,个体通过行为策略产生一个行为A,执行该行为进而产生一个状态行为对(S,A),环境收到个体的行为后会告诉个体即时奖励R以及后续进入的状态S';个体在状态S'时遵循当前行为策略产生一个新行为A',个体此时并不执行该行为,而是通过行为价值函数得到后一个状态行为对(S',A')的价值,利用这个新的价值和即时奖励R来更新前一个状态行为对(S,A)的价值.
  • 与MC算法不同的是,Sarsa算法在单个状态序列内每一个时间步,在状态S下采取一个行为A到达状态S'后都要更新状态行为对(S,A)的价值Q(S,A).这一过程同样使用
epsilon

-贪婪策略进行策略迭代:

  • Sarsa的算法流程如算法1所述:
  • 在Sarsa算法中,Q(S,A)的值使用一张大表来存储,这不是很适合解决规模很大的问题;对于每一个状态序列,在S状态时采取的行为A是基于当前的行为策略的,也就是该行为是与环境进行交互实际使用的行为.在更新状态行为对(S,A)的价值循环里,在个体状态s'下也依据该行为策略产生了一个行为A',该行为在当前循环周期里用来得到状态行为对(S',A')的价值,并借此来更新状态行为对(S,A)的价值,在下一个循环周期(时间步)内,状态S'和行为A'将转换身份为当前状态和当前行为,该行为将被执行.
  • 在更新行为价值时,参数
alpha

是学习速率参数,

gamma

是衰减因子.当行为策略满足前文所述GLIE特性同时学习率参数

alpha

满足:

  • 时,Sarsa算法将收敛至最优策略和最优价值函数.
  • 我们使用一个经典环境,一个有风格子世界来解释Sarsa算法的学习过程.如下图所示一个10 * 7的长方形格子世界,标记有一个起始位置S和一个终止目标位置G,格子下方的数字表示对应的列中一定强度的风.当个体进入该列的某个格子时,会按图中箭头所示的方向自动移动数字表示的格数,借此来模拟世界中风的作用.同样格子世界是有边界的,个体任意时刻只能处在世界内部的一个格子中.个体并不清楚这个世界的构造以及有风,也就是说它不知道格子是长方形的,也不知道边界在哪里,也不知道自己在里面移动移步后下一个格子与之前格子的相对位置关系,当然它也不清楚起始位置,终止目标的具体位置.但是个体会记住曾经经过的格子,下次在进入这个格子时,它能准确地辨认出这个格子曾经什么时候来过.格子可以执行的行为是朝上,下,左,右移动一步.现在要求解的问题是个体应该遵循怎样的策略才能尽快的从起始位置到达目标位置.
  • 为了使用计算机程序解决这个问题,我们首先将这个问题用强化学习的语言再描述一遍.这是一个不基于模型的控制问题,也就是要在不掌握马尔科夫决策过程的情况下寻找最优策略.环境世界中每一个格子可以用水平和垂直坐标来描述,如此构成拥有70个状态的状态空间S.行为空间A具有四个基本行为.环境的动力学特征不被个体掌握,但个体每执行一个行为,会进入一个新的状态,该状态由环境告知个体,但环境不会直接告诉个体该状态的坐标位置.即时奖励是根据任务目标设定,现要求尽快从初始位置移动到目标位置,我们可以设定每移动一步只要不是进入目标位置都给予一个-1的惩罚,直至进入目标位置后获得奖励0同时永远停留在该位置.
  • 我们将在编程实践环节给出用Sarsa算法解决有风格子世界问题的完整代码,这里先给出最优策略为依次采取如下的行为序列:
  • 右、右、右、右、右、右、右、右、右、下、下、下、下、左、左
  • 个体找到该最优策略的进度以及最优策略下个体从初始状态到目标状态的行为轨迹如下图:
  • 可以看出个体在一开始的几百甚至上千步都在尝试各种操作而没有完成一次从起始位置到目标位置的经历.不过一旦个体找到一次目标位置后,它的学习过程将明显加速,最终找到了一条只需要15步的最短路径.由于世界的构造以及其内部风的影响,个体两次利用风的影响,先向右并北漂到达最上角后折返南下才找到这条最短路径.其他路径均比该路径所花费的步数要多

Sarsa

lambda

算法

  • 在前一章,我们学习了n-步收获,这里引出一个n-步Sarsa的概念.观察下面的式子:
  • 这里
q_t

对应的是一个状态行为对

(s_t,a_t)

,表示的是在某个状态下采取某个行为的价值大小.如果n=1,则表示状态行为对

(s_t,a_t)

的Q价值可以用两部分表示,一部分是离开状态

s_t

得到的即时奖励,

R_{t 1}

,即时奖励只与状态有关,与该状态下采取的行为无关;另一部分是考虑了衰减因子的状态行为对

(s_{t 1},a_{t 1})

的价值:环境给了个体一个后续状态

s_{t 1}

,观察在该状态基于当前策略得到的行为

a_{t 1}

时的价值

Q(s_{t 1},a_{t 1})

.当n=2时,就用前2步的即时奖励,然后再用后续的

Q(s_{t 2},a_{t 2})

代替;如果n趋向于无穷大,则表示一直用带衰减因子的即时奖励计算Q值,直至状态序列结束.

  • 定义n-步Q收获(Q-return)为
  • 有了如上定义,可以把n-步Sarsa用n-步Q收获来表示,如下式
  • 类似于TD(
lambda

),可以给n-步Q收获中的每一步收获分配一个权重,并按权重对每一步Q收获求和,那么将得到

q^{lambda}_t

收获,它结合了所有n-步Q收获:

  • 如果使用某一状态
q^{lambda}_t

收获来更新状态行为对的Q值,那么可以表示成如下的形式:

  • 这个公式即是Sarsa
(lambda)

的前向认识,使用它更新Q价值需要遍历完整的状态序列.与TD(

lambda

)类似,我们也可以反向理解Sarsa

(lambda)

.同样引入效用追迹(eligibility traces,ET),不同的是这次的E值不是针对一个状态,而是一个状态行为对:

  • 它体现的是一个结果与某一个状态行为对的因果关系,与得到该结果最近的状态行为对,以及那些在此之前频繁发生的状态行为对得到这个结果的影响最大..
  • 下式是引入ET概念之后的Sarsa(
lambda

)算法中对Q值更新的描述:

  • 这个公式便是反向认识的Sarsa(
lambda

),基于反向认识的Sarsa(

lambda

)算法将可以有效地在线学习,数据学习完即可丢弃(因为不需要学习完整的Episode).

  • Sarsa(
lambda

)的算法流程如算法2描述:

  • 这里要注意一下,
E(s,a)

是在每浏览完一个Episode后需要重新置0,这体现了ET仅在一个Episode中发挥作用;其次要提及的是算法更新Q和E的时候针对的不是某个Episode里的Q或E,而是针对个体掌握的整个状态空间和行为空间产生的Q和E.

比较Sarsa和Sarsa

(lambda)
  • 下图用格子世界的例子具体解释了Sarsa和Sarsa(
lambda

)算法的区别:假设图最左侧描述的路线是个体采取两种算法中的一个得到的一个完整状态序列的路径:

  • 为了下文更方便描述,解释两个算法之间的区别,先做几个小约定:
  1. 认定每一步的即时奖励为0,直到终点处即时奖励为1;
  2. 根据算法,除了终点以外的任何状态行为对的Q值可以在初始时设为任意的,但我们设定所有的Q值均为0;
  3. 该路线是个体第一次找到终点的路线.
  • Sarsa算法:由于是现时策略学习(on-policy),一开始个体对环境一无所知,即所有的Q值均为0,它将随机选取移步行为.在到达终点前的每一个位置S,个体依据当前策略,产生一个移步行为,执行该行为,环境会将其放置到一个新位置
S'

,同时给以即时奖励0,在这个新位置上,根据当前的策略它会产生新位置下的一个行为,个体不执行该行为,仅仅在表中查找新状态下新行为的Q值,由于Q=0,依据更新公式,它将把刚才离开的位置以及对应行为的状态行为对价值Q更新为0.如此直到个体到达终点位置

S_G

,它获得一个即时奖励1,此时个体会依据公式更新其到达终点位置前所在的那个位置(暂用

S_H

表示,也就是终点位置下方,向上的箭头所在的位置)时采取向上移步的那个状态行为对价值

Q(S_H,A_{up})

,它将不再是0,这是个体在这个状态序列中唯一一次用非0数值来更新Q值.这样完成一个状态序列,此时个体已经并只进行了一次有意义的行为价值函数的更新;同时依据新的价值函数产生了新的策略.这个策略绝大多数与之前的相同,只是当个体处在特殊位置

S_H

时将会有一个近乎确定的向上的行为.这里请不要误认为Sarsa算法只在经历一个完整的状态序列之后才会更新,在这个例子中,由于我们的设定,它每走一步都会更新,只是多数时候更新的数据与原来的一样罢了.

  • 此时如果要求个体继续学习,则环境将其放入起点.个体的第二次寻路过程一开始与首次一样都是盲目随机的,直到其进入终点位置下方的位置
S_H

,在这个位置,个体更新的策略将使其有非常大的几率选择向上的行为直接进入终点位置

S_G

.

  • 同样,经过第二次寻路,个体了解到终点下方的位置
S_H

价值比较大,因为在这个位置直接采取向上移步的行为就可以拿到终点的即时奖励.因此它会将那些通过移动一步就可以到达

S_H

位置的其他位置以及相应的到达该位置所要采取的行为对所对应的价值进行提升.如此反复,如果采用贪婪策略更新,个体最终将得到一条到达终点的路径,不过这条路径的倒数第二步永远是在终点位置的下方.如果采用

epsilon

-贪婪策略更新,那么个体还会尝试到终点位置的左上右等其他方向的相邻位置价值也比较大,此时个体每次完成的路径可能都不一样.通过重复多次搜索,这种Q值的实质有意义的更新将覆盖越来越多的状态行为对,个体在早起采取的随机行为步数将越来越少,直至最终实质性的更新覆盖到起始位置.此时个体将能直接给出一条确定的从起点到终点的路径.

  • Sarsa
(lambda)

算法:该算法同时还针对每一次状态序列维护一个关于状态行为对

(S,A)

的E表,初始时E表值均为0.当个体首次在起点

S_0

决定移动一步

A_0

(假设向右)时,它环境告知新位置为

S_1

,此时发生如下事情:首先个体会做一个标记,使

E(S_0,A_0)

的值增加1,表明个体刚刚经历过这个事件

(S_0,A_0)

;其次它要估计这个事件对于解决整个问题的价值,也就是估计TD误差,此时依据公式结果为0,说明个体认为在起点处向右走没什么价值,这个"没有什么价值"有两层含义:不仅说明在

S_0

处向右走没什么价值,同时表明个体认为所有能够到达

S_0

状态的状态行为对的价值没有任何积极或消极的变化.随后个体将要更新该状态序列中所有已经经历的

Q(S,A)

值,由于存在E值,那些在

(S_0,A_0)

之前近期发生或频繁发生的

(S,A)

将改变得比其他Q值明显些,此外个体还要更新其E值,以备下次使用.对于刚从起点出发的个体,这次更新没有使得任何Q值发生变化,仅仅在

E(S_0,A_0)

处有了一个实质的变化.随后的过程类似,个体有意义的发现就是对路径有了一个记忆,体现在E里,具体的Q值没发生变化.这一情况直到个体到达终点位置时发生改变.此时个体得到了一个即时奖励1,它会发现这一次变化(从

S_H

采取向上行为

A_{up}

到达

S_G

)价值明显,它会计算这个TD误差为1,同时告诉整个经历过程中所有(S,A),根据其与

(S_H,A_{up})

的密切关系更新这些状态行为对的价值Q,个体在这个状态序列中经历的所有状态行为对的Q值都将得到一个非0的更新,但是那些在个体到达

S_H

之前就近发生以及频繁发生的状态行为对的价值提升会更加明显.

  • 在图示的例子中没有显示某一状态行为频发的情况,如果个体在寻路的过程中绕过一些弯,多次到达同一个位置,并在该位置采取的相同的动作,最终个体到达终止状态时,就产生了多次发生的
(S,A)

,这时的

(S,A)

的价值也会得到较多提升.也就是说,个体每得到一个即时奖励,同时会对所有历史事件的价值进行依次更新,当然那些与该事件关系紧密的事件价值改变的较为明显.这里的事件指的就是状态行为对.在同一状态采取不同行为是不同的事件.

  • 当个体重新从起点第二次出发时,它会发现起点处向右走的价值不再是0.如果采用贪婪策略更新,个体将根据上次经验得到的新策略直接选择右走,并且一直按照原路找到终点.如果采用
epsilon

-贪婪策略更新,那么个体还会尝试新的路线.由于为了解释方便,做了一些约定,这会导致问题并不要求个体找到最短一条路径,如果需要找最短路径,需要在每一次状态转移时给个体一个负的奖励.

  • 可以看出Sarsa(
lambda

)算法在状态每发生一次变化都对整个状态空间和行为空间的Q和E值进行更新,而事实上在每一个状态序列里,只有个体经历过的状态行为对的E值才可能不为0,为什么不仅仅对该状态序列涉及到的状态行为对进行更新呢?理论上是可以仅对该状态序列里设计的状态行为对的E和Q进行更新的,不过这要额外维护一个表,而往这个额外的表里添加新的状态行为对的E和Q值比更新总的状态行为空间要麻烦,特别是在早期个体没有一个较好的策略的时候需要花费很长很长时间才能找到终点位置,这在一定程度上反而没有更新状态空间省时.不过随着学习深入,策略得到优化,此表的规模会变小.

离线策略学习 off-policy learning

  • 现时策略学习的特点就是产生实际行为额策略与更新价值(评价)所使用的策略是同一个策略,而离线策略学习(off-policy learning)中产生指导自身行为的策略
mu(a|s)

与评价策略

pi(a|s)

是不同的策略,具体地说,个体通过策略

mu(a|s)

生成行为与环境发生实际交互,但是在更新这个状态行为对价值时使用的是目标策略

pi(a|s)

.目标策略

pi(a|s)

多数是已经具备一定能力的策略,例如人类已有的经验或个其他个体学习到的经验.离线策略学习相当于站在目标策略

pi(a|s)

的"肩膀"上学习.离线策略学习根据是否经历完整的状态序列可以将其分为基于蒙特卡洛的和基于TD的.基于蒙特卡洛的离线测了学习目前认为仅有理论上的研究价值,在实际中用处不大,这里主要讲解常用的离线策略TD学习.

  • 离线策略TD学习的任务就是使用TD方法在目标策略
mu(a|s)

的基础上更新行为价值,进而优化行为策略:

  • 对于上式,我们可以这样理解:个体处在状态
S_t

中,基于行为策略

mu

产生了一个行为

A_t

,执行该行为后进入新的状态

S_{t 1}

,离线策略学习要做的事情就是,比较评价策略和行为策略在状态

S_t

下产生同样的行为

A_t

的概率的比值,如果这个比值接近1,说明两个策略在状态

S_t

下采取行为

A_t

的概率差不多,此次对于状态

S_t

价值的更新同时得到两个策略的支持.如果这一概率比值很小,则表明评价策略

pi

在状态

S_t

下采取行为

A_t

的概率差不多,此次对于状态

S_t

价值的更新同时得到两个策略的支持.如果这一概率比值很小,则表明评价策略

pi

在状态

S_t

下选择

A_t

的机会要小一些,此时为了从评价策略学习,我们认为这一步状态价值的更新不是很符合评价策略,因而在更新时打些折扣.类似的,如果这个概率比值大于1,说明按照评价策略,选择行为

A_t

的几率要大于当前行为策略产生

A_t

的概率,此时应该对该状态的价值更新就可以大胆些.

  • 离线策略TD学习中一个典型的行为策略
mu

是基于行为价值函数

Q(s,a) epsilon

-贪婪策略,评价策略

pi

则是基于

Q(s,a)

的完全贪婪策略,这种学习方法称为Q学习(Q-learning)

  • Q学习的目标是得到最优价值
Q(s,a)

,在Q学习的过程中,t时刻的与环境进行交互的行为

A_t

由策略

mu

产生:

  • 其中策略
mu

是一个

epsilon

-贪婪策略.

t 1

时刻用来更新的行为

A'_{t 1}

由下式产生:

  • 其中策略
pi

是一个完全贪婪策略.

Q(S_t,A-t)

按下式更新:

  • 其中红色部分的TD目标是基于评价策略
pi

产生的行为A'得到的Q值.根据这种价值更新的方式,状态

S_t

根据

epsilon

-贪婪策略得到的行为

A_t

的价值将朝着

S_{t 1}

状态下贪婪策略确定的最大行为价值的方向做一定比例的更新.这种算法能够使个体的行为策略

mu

更加接近贪婪策略,同时保证个体能持续探索并经历足够丰富的新状态,并最终收敛至最优策略和最优行为价值函数.

  • 这样Q学习的TD目标值就可以被简化:
  • 所以Q学习具体行为价值更新的公式为:
  • Q学习的算法流程如算法3所述:
  • 这里通过悬崖行走的例子简要讲解Sarsa算法与Q学习算法在学习过程中的差别.任务要求个体从悬崖的一端以尽可能快的速度行走到悬崖的另一端,每多走一步给以-1的奖励.图中悬崖用灰色的长方形表示,在其两端一个是起点,一个是目标终点.个体如果坠入悬崖将得到一个非常大的负向奖励(-100)回到起点.可以看出最优路线是贴着悬崖上方行走.Q学习算法可以较快的学习到这个最优策略,但是Sarsa算法那学到的是与悬崖保持一定距离的安全路线.在两种学习算法中,由于生成行为的策略依然是
epsilon

贪婪的,因此都会偶尔发生坠入悬崖的情况,如果

epsilon

贪婪策略中的

epsilon

随着经历的增加而趋向于0,则两种算法都将最后收敛至最优策略.

编程实践:蒙特卡罗学习求二十一点游戏最优策略

  • 在本节的实战中,我们将继续使用前一章二十一点游戏的例子,这次我们要使用基于现时策略蒙特卡洛控制(on-policy)的方法来求解二十一点游戏玩家的最优策略.我们把上一章编写的Dealer,Player和Arena类保存至文件blackjack.py中,并加载这些类和其他一些需要的库和方法:
代码语言:javascript复制
from blackjack import Player, Dealer, Arena
from utils import str_key,set_dict,get_dict
from utils import draw_value, draw_policy
from utils import epsilon_greedy_policy
import math
  • 目前的Player类不具备策略评估和更新策略的能力,我们基于Player类编写一个MC_Player类,使其具备使用蒙特卡洛控制算法进行策略更新的能力,代码如下:
代码语言:javascript复制
class MC_Player(Player):
    '''
    具备蒙特卡洛控制能力的玩家
    '''
    def __init__(self, name = "", A = None, display = False):
        super(MC_Player, self).__init__(name, A, display)
        self.Q = {} #某一状态行为对的价值,策略迭代时使用
        self.Nsa = {} #Nsa的计数:某一状态行为对出现的次数
        self.total_learning_times = 0
        self.policy = self.epsilon_greedy_policy
        self.lerning_method = self.learn_Q #有了自己的学习方法

    def learn_Q(self, episode, r): #从状态序列来学习Q值
        '''
        从一个episode学习
        :param episode:
        :param r:reward
        :return:None
        '''
        for s, a in episode:
            nsa = get_dict(self.Nsa, s, a)
            set_dict(self.Nsa, nsa 1, s, a)
            q = get_dict(self.Q, s, a)
            set_dict(self.Q, q (r-q)/(nsa 1), s, a)
        self.total_learning_times  = 1

    def reset_memory(self):
        '''
        忘记既往学习经历
        :return:None
        '''
        self.Q.clear()
        self.Nsa.clear()
        self.total_learning_times = 0

    def epsilon_greedy_policy(self, dealer, epsilon = None):
        '''
        这里的贪婪策略是带有epsilon参数的
        :param dealer:庄家
        :param epsilon:贪婪率
        :return:根据epsilon-greedy选出的策略
        '''
        player_points, _ = self.get_points()
        if player_points >= 21:
            return self.A[1]
        if player_points < 12:
            return self.A[0]
        else:
            A, Q = self.A, self.Q
            s = self.get_state_name(dealer)
            if epsilon is None:
                epsilon = 1.0/(1   4 * math.log10(1 player.total_learning_times))
        return epsilon_greedy_policy(A, s, Q, epsilon)
  • 这样,MC_Player类就具备了学习Q值的方法和一个
epsilon

-贪婪策略.接下来我们使用MC_Player类来生成对局,庄家的策略仍然不变.

代码语言:javascript复制
A = ["继续叫牌", "停止叫牌"]
display = False
player = MC_Player(A = A, display = display)
dealer = Dealer(A = A, display = display)
arena = Arena(A = A, display = display)
arena.play_games(dealer = dealer, player = player, num = 200000, show_statistic=True)
# 共玩了200000局,玩家赢84463局,和17295局,输98242局,胜率:0.42,不输率:0.51
# 100%|██████████| 200000/200000 [00:24<00:00, 8135.69it/s]
  • MC学习到的行为价值函数和最优策略可以使用下面的代码绘制:
代码语言:javascript复制
draw_value(player.Q, useable_ace = True, is_q_dict=True, A = player.A)
draw_policy(epsilon_greedy_policy, player.A, player.Q, epsilon = 1e-10, useable_ace = True)
draw_value(player.Q, useable_ace = False, is_q_dict=True, A = player.A)
draw_policy(epsilon_greedy_policy, player.A, player.Q, epsilon = 1e-10, useable_ace = False)
  • 绘制结果下图所示。策略图中深色部分 (上半部) 为“停止叫牌”,浅色部分 (下半部) 为“继续交牌”。基于前一章介绍的二十一点游戏规则,迭代 20 万次后得到的贪婪策略为:当玩家手中有可用的 Ace 时,在牌点数达到 17 点,仍可选择叫牌;而当玩家手中没有可用的 Ace 时,当庄家的明牌在 2-7 点间,最好停止叫牌,当庄家的明牌为 A 或者超过 7 点时,可以选择继续交牌至玩家手中的牌点数到达 16 为止。由于训练次数并不多,策略图中还有一些零星的散点.

编程实践:构建基于gym的有风格子世界及个体

  • 强化学习讲究个体与环境的交互,强化学习算法聚焦于如何提高个体在与环境交互中的智能水平,我们在进行编程实践时需要实现这些算法.为了验证这些算法的有效性,我们需要有相应的环境.我们既可以自己编写环境,像前面介绍的二十一点游戏那样,也可以借助一些别人编写的环境,把终点放在个体学习算法的实现上.

环境序列的管理

  • 个体与环境进行交互时会生成一个或多个甚至大量的状态序列,如何管理好这些状态序列是编程实践环节的一个比较重要的任务。状态序列是时间序列,在每一个时间步,个体与环境交互的信息包括个体的状态 (
S_t

)、采取的行为 (

A_t

)、上一个行为得到的奖励

R_{t 1}

这三个方面。描述一个完整的状态转换还应包括这两个信息:下一时刻个体的状态 (

S_{t 1}

) 和下一时刻的状态是否是终止状态 (is_end).

  • 多个相邻的状态转换构成了一个状态序列。多个完整的状态序列形成了个体的整体记忆,用Memory 或 Expeirience 表示。通常一个个体的记忆容量不是无限的,在记忆的容量用满的情况下如果个体需要记录新发生的状态序列则通常忘记最早的一些状态
  • 在强化学习的个体训练中,如果使用 MC 学习算法,则需要学习完整的序列;如果使用 TD学习,最小的学习单位则是一个状态转换。特别的,许多常用的 TD 学习算法刻意选择不连续的状态转换来学习来降低 TD 学习在一个序列中的偏差。在这种情况下是否把状态转换按时间次序以状态序列的形式进行管理就显得不那么重要了,本章为了解释一些 MC 学习类算法仍然采取了使用状态序列这一中间形式来管理个体的记忆.
  • 基于上述考虑,我们依次设计了 Transition 类、Episode 类和 Experience 类来综合管理个体与环境交互时产生的多个状态序列。
代码语言:javascript复制
class State(object):
    def __init__(self, name):
        self.name = name


class Transition(object):
    def __init__(self, s0, a0, reward: float, is_done: bool, s1):
        self.data = [s0, a0, reward, is_done, s1]

    def __iter__(self):
        return iter(self.data)

    def __str__(self):
        return "s:{0:<3} a:{1:<3} r:{2:<4} is_end:{3:<5} s1:{4:<3}". 
            format(self.data[0], self.data[1], self.data[2], 
                   self.data[3], self.data[4])

    @property
    def s0(self): return self.data[0]

    @property
    def a0(self): return self.data[1]

    @property
    def reward(self): return self.data[2]

    @property
    def is_done(self): return self.data[3]

    @property
    def s1(self): return self.data[4]


class Episode(object):
    def __init__(self, e_id: int = 0) -> None:
        self.total_reward = 0  # 总的获得的奖励
        self.trans_list = []  # 状态转移列表
        self.name = str(e_id)  # 可以给Episode起个名字:"成功闯关,黯然失败?"

    def push(self, trans: Transition) -> float:
        self.trans_list.append(trans)
        self.total_reward  = trans.reward  # 不计衰减的总奖励
        return self.total_reward

    @property
    def len(self):
        return len(self.trans_list)

    def __str__(self):
        return "episode {0:<4} {1:>4} steps,total reward:{2:<8.2f}". 
            format(self.name, self.len, self.total_reward)

    def print_detail(self):
        print("detail of ({0}):".format(self))
        for i, trans in enumerate(self.trans_list):
            print("step{0:<4} ".format(i), end=" ")
            print(trans)

    def pop(self) -> Transition:
        '''normally this method shouldn't be invoked.
        '''
        if self.len > 1:
            trans = self.trans_list.pop()
            self.total_reward -= trans.reward
            return trans
        else:
            return None

    def is_complete(self) -> bool:
        '''check if an episode is an complete episode
        '''
        if self.len == 0:
            return False
        return self.trans_list[self.len - 1].is_done

    def sample(self, batch_size=1):
        '''随即产生一个trans
        '''
        return random.sample(self.trans_list, k=batch_size)

    def __len__(self) -> int:
        return self.len


class Experience(object):
    '''this class is used to record the whole experience of an agent organized
    by an episode list. agent can randomly sample transitions or episodes from
    its experience.
    '''

    def __init__(self, capacity: int = 20000):
        self.capacity = capacity  # 容量:指的是trans总数量
        self.episodes = []  # episode列表
        self.next_id = 0  # 下一个episode的Id
        self.total_trans = 0  # 总的状态转换数量

    def __str__(self):
        return "exp info:{0:5} episodes, memory usage {1}/{2}". 
            format(self.len, self.total_trans, self.capacity)

    def __len__(self):
        return self.len

    @property
    def len(self):
        return len(self.episodes)

    def _remove(self, index=0):
        '''扔掉一个Episode,默认第一个。
           remove an episode, defautly the first one.
           args:
               the index of the episode to remove
           return:
               if exists return the episode else return None
        '''
        if index > self.len - 1:
            raise (Exception("invalid index"))
        if self.len > 0:
            episode = self.episodes[index]
            self.episodes.remove(episode)
            self.total_trans -= episode.len
            return episode
        else:
            return None

    def _remove_first(self):
        self._remove(index=0)

    def push(self, trans):
        '''压入一个状态转换
        '''
        if self.capacity <= 0:
            return
        while self.total_trans >= self.capacity:  # 可能会有空episode吗?
            episode = self._remove_first()
        cur_episode = None
        if self.len == 0 or self.episodes[self.len - 1].is_complete():
            cur_episode = Episode(self.next_id)
            self.next_id  = 1
            self.episodes.append(cur_episode)
        else:
            cur_episode = self.episodes[self.len - 1]
        self.total_trans  = 1
        return cur_episode.push(trans)  # return  total reward of an episode

    def sample(self, batch_size=1):  # sample transition
        '''randomly sample some transitions from agent's experience.abs
        随机获取一定数量的状态转化对象Transition
        args:
            number of transitions need to be sampled
        return:
            list of Transition.
        '''
        sample_trans = []
        for _ in range(batch_size):
            index = int(random.random() * self.len)
            sample_trans  = self.episodes[index].sample()
        return sample_trans

    def sample_episode(self, episode_num=1):  # sample episode
        '''随机获取一定数量完整的Episode
        '''
        return random.sample(self.episodes, k=episode_num)

    @property
    def last_episode(self):
        if self.len > 0:
            return self.episodes[self.len - 1]
        return None

个体基类的编写

  • 我们把重点放在编写一个个体基类 (Agent) 上,为后续实现各种强化学习算法提供一个基础。这个基类符合 gym 库的接口规范,具备个体最基本的功能,同时我们希望个体具有一定容量的记忆功能,能够记住曾经经历过的一些状态序列。我们还希望个体在学习时能够记住一些学习过程,便于分析个体的学习效果等。有了个体基类之后,在讲解具体一个强化学习算法时仅需实现特定的方法就可以了。
  • 在第一章讲解强化学习初步概念的时候,我们对个体类进行了一个初步的建模,这次我们要构建的是符合 gym 借口规范的 Agent 基类,其中一个最基本的要求个体类对象在构造时接受环境对象作为参数,内部也有一个成员变量建立对这个环境对象的引用。在我们设计的个体基类中,其成员变量包括对环境对象的应用、状态和行为空间、与环境交互产生的经历、当前状态等。此外对于个体来说,他还具备的能力有:遵循策略产生一个行为、执行一个行为与环境交互、采用什么学习方法、具体如何学习这几个关键功能,其中最关键的是个体执行行为与环境进行交互的方法。下面的代码实现了我们的需求.
代码语言:javascript复制
class Agent(object):
    '''Base Class of Agent
    '''

    def __init__(self, env: Env = None,
                 capacity=10000):
        # 保存一些Agent可以观测到的环境信息以及已经学到的经验
        self.env = env  # 建立对环境对象的引用
        self.obs_space = env.observation_space if env is not None else None
        self.action_space = env.action_space if env is not None else None
        self.S = [i for i in range(self.obs_space.n)]
        self.A = [i for i in range(self.action_space.n)]
        self.experience = Experience(capacity=capacity)
        # 有一个变量记录agent当前的state相对来说还是比较方便的。要注意对该变量的维护、更新
        self.state = None  # 个体的当前状态

    def policy(self, A, s=None, Q=None, epsilon=None):
        '''均一随机策略
        '''
        return random.sample(self.A, k=1)[0]

    def perform_policy(self, s, Q=None, epsilon=0.05):
        action = self.policy(self.A, s, Q, epsilon)
        return int(action)

    def act(self, a0):
        s0 = self.state
        s1, r1, is_done, info = self.env.step(a0)
        # TODO add extra code here
        trans = Transition(s0, a0, r1, is_done, s1)
        total_reward = self.experience.push(trans)
        self.state = s1
        return s1, r1, is_done, info, total_reward

    def learning_method(self, lambda_=0.9, gamma=0.9, alpha=0.5, epsilon=0.2, display=False):
        '''这是一个没有学习能力的学习方法
        具体针对某算法的学习方法,返回值需是一个二维元组:(一个状态序列的时间步、该状态序列的总奖励)
        '''
        self.state = self.env.reset()
        s0 = self.state
        if display:
            self.env.render()
        a0 = self.perform_policy(s0, epsilon)
        time_in_episode, total_reward = 0, 0
        is_done = False
        while not is_done:
            # add code here
            s1, r1, is_done, info, total_reward = self.act(a0)
            if display:
                self.env.render()
            a1 = self.perform_policy(s1, epsilon)
            # add your extra code here
            s0, a0 = s1, a1
            time_in_episode  = 1
        if display:
            print(self.experience.last_episode)
        return time_in_episode, total_reward

    def learning(self, lambda_=0.9, epsilon=None, decaying_epsilon=True, gamma=0.9,
                 alpha=0.1, max_episode_num=800, display=False):
        total_time, episode_reward, num_episode = 0, 0, 0
        total_times, episode_rewards, num_episodes = [], [], []
        for i in tqdm(range(max_episode_num)):
            if epsilon is None:
                epsilon = 1e-10
            elif decaying_epsilon:
                epsilon = 1.0 / (1   num_episode)
            time_in_episode, episode_reward = self.learning_method(lambda_=lambda_, 
                                                                   gamma=gamma, alpha=alpha, epsilon=epsilon,
                                                                   display=display)
            total_time  = time_in_episode
            num_episode  = 1
            total_times.append(total_time)
            episode_rewards.append(episode_reward)
            num_episodes.append(num_episode)
        # self.experience.last_episode.print_detail()
        return total_times, episode_rewards, num_episodes

    def sample(self, batch_size=64):
        '''随机取样
        '''
        return self.experience.sample(batch_size)

    @property
    def total_trans(self):
        '''得到Experience里记录的总的状态转换数量
        '''
        return self.experience.total_trans

    def last_episode_detail(self):
        self.experience.last_episode.print_detail()
  • 不难看出Agent类的策略是最原始的均一随机策略,不具备学习能力.不过它已经具备了同gym环境进行交互的能力了.由于该个体不具备学习能力,可以编写如下的代码来观察均一策略下个体在有风格子世界里的交互情况:
代码语言:javascript复制
import gym
from gym import Env
from gridworld import WindyGridWorld#导入有风格子世界环境
from core import Agent#导入个体基类
import time

env = WindyGridWorld()#生成有风格子世界环境对象
env.reset()#重置环境对象
env.render()#显示环境对象可视化界面

agent = Agent(env, capacity=10000)#创建Agent对象
data = agent.learning(max_episode_num=180, display=True)
time.sleep(10)
env.close()
  • 运行上述代码将显示下面这个界面.图中多数格子用粉红色绘制,表示个体在离开该格子时将获得 -1 的即时奖励,白色的格子对应的即时奖励为 0。有蓝色边框的格子是个体的起始状态,金黄色边框对应的格子是终止状态。个体在格子世界中用黄色小圆形来表示。风的效果并未反映在可视化界面上,它将实实在在的影响个体采取一个行为后的后续状态 (位置)。
  • 由于可视化交互在进行多次尝试时浪费计算资源,我们在随后进行 180 次的尝试期间选择不显示个体的动态活动。其 180 次的交互信息存储在对象 data 内。图 5.9 是依据 data 绘制的该个体与环境交互产生的的状态序列时间步数与状态序列次数的关系图,可以看出个体最多曾用了三万多步才完成一个完整的状态序列.
  • 接下来我们就可以在Agent基类的基础上分别建立具有Sarsa学习,Sarsa(
lambda

)和Q学习能力的三个个体子类,我们只要分别实现其策略方法以及学习方法就可以了.

  • 对于这三类学习算法要用到贪婪策略或 ϵ-贪婪策略,由于我们计划使用字典来存储行为价值函数数据,还会用到之前编写的根据状态生成键以及读取字典的方法,本节中这些方法都被放在一个名为 utls.py 的文件中,有风格子世界环境类在文件 gridworld.py 文件中实现,个体基类的实现代码存放在 core.py 中。将这三个文件放在当前工作目录下。下面的代码将从这些文件中导入要使用的类和方法.
代码语言:javascript复制
from random import random,choice
from core import Agent
from gym import Env
import gym
from gridworld import WindyGridWorld,SimpleGridWorld
from utils import str_key,set_dict,get_dict
from utils import epsilon_greedy_policy,epsilon_greedy_pi
from utils import greedy_policy,learning_curve

Sarsa算法

  • 根据Sarsa算法的流程,我们不难得到:
代码语言:javascript复制
class SarsaAgent(Agent):
    def __init__(self, env:Env, capacity:int = 20000):
        super(SarsaAgent, self).__init__(env,capacity)
        self.Q = {}#增加Q字典存储行为价值

    def policy(self, A, s=None, Q=None, epsilon=None):
        '''
        使用epsilon-贪婪策略
        '''
        return epsilon_greedy_policy(A, s, Q, epsilon)

    def learning_method(self, gamma =0.9, alpha=0.1, epsilon=1e-5, display=False, lambda_ = None):
        self.state = self.env.reset()
        s0 = self.state
        if display:
            self.env.render()
        a0 = self.perform_policy(s0, self.Q, epsilon)
        time_in_episode, total_reward = 0, 0
        is_done = False
        while not is_done:
            s1, r1, is_done, info, total_reward = self.act(a0)
            if display:
                self.env.render()
            a1 = self.perform_policy(s1, self.Q, epsilon)
            old_q = get_dict(self.Q, s0, a0)
            q_prime = get_dict(self.Q, s1, a1)
            td_target = r1   gamma * q_prime
            new_q = old_q   alpha * (td_target - old_q)
            set_dict(self.Q, new_q, s0, a0)
            s0, a0 = s1, a1
            time_in_episode  = 1
        if display:
            print(self.experience.last_episode)
        return time_in_episode, total_reward

Sarsa(

lambda

)

代码语言:javascript复制
class SarsaLambdaAgent(Agent):
    def __init__(self, env:Env, capacity:int = 20000):
        super(SarsaLambdaAgent,self).__init__(env,capacity)
        self.Q = {}

    def policy(self, A, s=None, Q=None, epsilon=None):
        return epsilon_greedy_policy(A, s, Q, epsilon)

    def learning_method(self, lambda_=0.9, gamma=0.9, alpha=0.1, epsilon=1e-5, display=False):
        self.state = self.env.reset()
        s0 = self.state
        if display:
            self.env.reset()
        a0 = self.perform_policy(s0, self.Q, epsilon)
        time_in_episode, total_reward = 0,0
        is_done = False
        E = {}#效用值
        while not is_done:
            s1, r1, is_done, info, total_reward = self.act(a0)
            if display:
                self.env.render()
            a1 = self.perform_policy(s1, self.Q, epsilon)
            q = get_dict(self.Q, s0, a0)
            q_prime = get_dict(self.Q, s1, a1)
            delta = r1   gamma * q_prime -q
            e = get_dict(E, s0, a0)
            e = 1
            set_dict(E, e, s0, a0)

            for s in self.S:#对所有可能的Q(s,a)进行更新
                for a in self.A:
                    e_value = get_dict(E, s, a)
                    old_q = get_dict(self.Q, s, a)
                    new_q = old_q   alpha * delta * e_value
                    new_e = gamma * lambda_ * e_value
                    set_dict(self.Q, new_q, s, a)
                    set_dict(E, new_e, s, a)
            s0, a0 = s1, a1
            time_in_episode  = 1
        if display:
            print(self.experience.last_episode)
        return time_in_episode,total_reward

Q学习算法

代码语言:javascript复制
class QAgent(Agent):
    def __init__(self, env:Env, capacity:int = 20000):
        super(QAgent, self).__init__(env, capacity)
        self.Q = {}

    def policy(self, A, s=None, Q=None, epsilon=None):
        return epsilon_greedy_policy(A, s, Q, epsilon)

    def learning_method(self,gamma=0.9, alpha=0.1, epsilon=1e-5, display=False, lambda_ = None):
        self.state = self.env.reset()
        s0 = self.state
        if display:
            self.env.render()
        time_in_episode, total_reward = 0,0
        is_done = False
        while not is_done:
            self.policy = epsilon_greedy_policy#行为策略
            a0 = self.perform_policy(s0, self.Q, epsilon)
            s1, r1, is_done, info, total_reward = self.act(a0)
            if display:
                self.env.render()
            self.policy = greedy_policy
            a1 = greedy_policy(self.A, s1, self.Q)#借鉴策略

            old_q = get_dict(self.Q, s0, a0)
            q_prime = get_dict(self.Q, s1, a1)
            td_target = r1   gamma * q_prime
            new_q = old_q   alpha * (td_target - old_q)
            set_dict(self.Q, new_q, s0, a0)

            s0 = s1
            time_in_episode  = 1
        if display:
            print(self.experience.last_episode)
        return time_in_episode,total_reward
  • 我们可借鉴 Agent 基类与环境交互的代码来实现拥有各种不同学习能力的子类与有风格子世界进行交互的代码,体会三种学习算法的区别。以 Sarsa(λ) 算法为例,下面的代码实现其与有风格子世界环境的交互.
代码语言:javascript复制
env = WindyGridWorld()
agent = SarsaLambdaAgent(env, capacity = 10000)
statistics = agent.learning(gamma = 1.0,
                            epsilon = 1,
                            decaying_epsilon = True,
                            alpha = 0.5,
                            max_episode_num = 800,
                            display = False)
  • 如果对个体可视化感兴趣,可以将learning方法内的参数display设置为True.下面的代码可视化展示个体两次完整的交互经历
代码语言:javascript复制
agent.learning_method(epsilon = 0.01, display = True)
  • 需要指出的这三种学习算法在完成第一个完整状态序列时都可能会花费较长的时间步数,特别是对于 Sarsa(λ) 算法来说,由于在每一个时间步都要做大量的计算工作,因而花费的计算资源更多,该算法的优势是在线实时学习。

参考

  • David silver的课程https://www.bilibili.com/video/BV1kb411i7KG?p=1
  • 主要来自叶强的笔记:https://www.zhihu.com/column/reinforce

0 人点赞