【二】gym初次入门一学就会---代码详细解析简明教程----平衡杆案例

2022-12-01 16:48:27 浏览数 (1)

相关文章:

【一】gym环境安装以及安装遇到的错误解决

【二】gym初次入门一学就会-简明教程

【三】gym简单画图

【四】gym搭建自己的环境,全网最详细版本,3分钟你就学会了!

【五】gym搭建自己的环境____详细定义自己myenv.py文件

【六】gym搭建自己环境升级版设计,动态障碍------强化学习


gym简明教程

创建CartPole-v0的环境.

代码语言:javascript复制
import gym
env = gym.make('CartPole-v0')
env.reset()
for i in range(1000):
    env.render()
    env.step(env.action_space.sample()) # take a random action
env.close()

代码含义:

  • reset(self):重置环境的状态,返回观察。
  • step(self, action):推进一个时间步长,返回observation, reward, done, info。
  • render(self, mode=‘human’, close=False):重绘环境的一帧。默认模式一般比较友好,如弹出一个窗口。
  • close(self):关闭环境,并清除内存。

注释:导入gym库,第2行创建CartPole-v0环境,并在第3行重置环境状态。在for循环中进行1000个时间步长(timestep)的控制,第5行刷新每个时间步长环境画面,第6行对当前环境状态采取一个随机动作(0或1),最后第7行循环结束后关闭仿真环境。

同时本地会渲染出一个窗口进行模拟如下图:

关于Space的说明 在上面的代码中, 我们可以看到我们每一次的action都是随机进行取值的. 事实上, 每一个环境都有action_space和observation_space.(Every environment comes with an action_space and an observation_space) 以CartPole-v0来作为例子. 首先我们来看action_spaces, 这个代表可以采取的action的种类, 在CartPole-v0的例子中, 可以采取的action的种类只有两种. 我们看一下下面的示例.

代码语言:javascript复制
 import gym
env = gym.make('CartPole-v0')
print(env.action_space)
#> Discrete(2)
print(env.observation_space)
#> Box(4,)

  • action_space 是一个离散Discrete类型,从discrete.py源码可知,范围是一个{0,1,…,n-1} 长度为 n 的非负整数集合,在CartPole-v0例子中,动作空间表示为{0,1}。

对于observation_space. 则查看这个space的shape四个边界的上界和下界(能取到的最大值和最小值)

代码语言:javascript复制
print(env.observation_space.high)
print(env.observation_space.low)
[4.8000002e 00 3.4028235e 38 4.1887903e-01 3.4028235e 38]
[-4.8000002e 00 -3.4028235e 38 -4.1887903e-01 -3.4028235e 38]

observation_space 是一个Box类型,从box.py源码可知,表示一个 n 维的盒子,所以在上一节打印出来的observation是一个长度为 4 的数组。数组中的每个元素都具有上下界。

利用运动空间和观测空间的定义和范围,在许多仿真环境中,BoxDiscrete是最常见的空间描述,在智体每次执行动作时,都属于这些空间范围内,代码示例为:

代码语言:javascript复制
from gym import spaces
space = spaces.Discrete(6) 
# Set with 6 elements {0, 1, 2, ..., 6}
x = space.sample()
print(space.contains(x)) 
print(space.n == 6)
True
True

CartPole-v0栗子中,运动只能选择左和右,分别用{0,1}表示

对于step的详细说明 上面我们只是每次做随机的action, 为了更好的进行action, 我们需要知道每一步step之后的返回值. 事实上, step会返回四个值. 下面我们一一进行介绍.

  • 观测 Observation (Object):当前step执行后,环境的观测(类型为对象)。例如,从相机获取的像素点,机器人各个关节的角度或棋盘游戏当前的状态等;
  • 奖励 Reward (Float): 执行上一步动作(action)后,智能体( agent)获得的奖励(浮点类型),不同的环境中奖励值变化范围也不相同,但是强化学习的目标就是使得总奖励值最大;
  • 完成 Done (Boolen): 表示是否需要将环境重置 env.reset。大多数情况下,当 Done 为True 时,就表明当前回合(episode)或者试验(tial)结束。例如当机器人摔倒或者掉出台面,就应当终止当前回合进行重置(reset);
  • 信息 Info (Dict): 针对调试过程的诊断信息。在标准的智体仿真评估当中不会使用到这个info,

在 Gym 仿真中,每一次回合开始,需要先执行 reset() 函数,返回初始观测信息,然后根据标志位 done 的状态,来决定是否进行下一次回合。所以更恰当的方法是遵守done的标志.

代码语言:javascript复制
import gym
env = gym.make('CartPole-v0')
for i_episode in range(20):
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t 1))
            break
env.close()

done 为true时,控制失败,此阶段episode 结束。可以计算每 episode 的回报就是其坚持的t 1时间,坚持的越久回报越大.在上面算法中,agent 的行为选择是随机的,平均回报为20左右。

*再次说明gym模块中环境的常用函数 gym的初始化

代码语言:javascript复制
env = gym.make('CartPole-v0')   
# 定义使用gym库中的某一个环境,'CartPole-v0'可以改为其它环境
env = env.unwrapped             
# unwrapped是打开限制的意思

gym的各个参数的获取

代码语言:javascript复制
env.action_space   		
# 查看这个环境中可用的action有多少个,返回Discrete()格式
env.observation_space   
# 查看这个环境中observation的特征,返回Box()格式
n_actions=env.action_space.n 
# 查看这个环境中可用的action有多少个,返回int
n_features=env.observation_space.shape[0] 
# 查看这个环境中observation的特征有多少个,返回int

刷新环境

代码语言:javascript复制
env.reset()
# 用于一个done后环境的重启,获取回合的第一个observation
env.render()	
# 用于每一步后刷新环境状态
observation_, reward, done, info = env.step(action)
# 获取下一步的环境、得分、检测是否完成。

实例应用 平衡杆测试代码:以AC算法为例,详细解析看下面链接分析。

代码语言:javascript复制
  
import numpy as np
import tensorflow as tf
import gym
 
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
 
tf.compat.v1.disable_eager_execution()  #这句话可有可无
 
np.random.seed(2)
tf.set_random_seed(2)  # reproducible
 
# Superparameters
OUTPUT_GRAPH = False
MAX_EPISODE = 3000
DISPLAY_REWARD_THRESHOLD = 200  # renders environment if total episode reward is greater then this threshold
MAX_EP_STEPS = 1000   # maximum time step in one episode
RENDER = False  # rendering wastes time
GAMMA = 0.9     # reward discount in TD error
LR_A = 0.001    # learning rate for actor
LR_C = 0.01     # learning rate for critic
 
env = gym.make('CartPole-v0')
env.seed(1)  # reproducible
env = env.unwrapped
 
N_F = env.observation_space.shape[0]
N_A = env.action_space.n
 
 
class Actor(object):
    def __init__(self, sess, n_features, n_actions, lr=0.001):
        self.sess = sess
 
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.a = tf.placeholder(tf.int32, None, "act")
        self.td_error = tf.placeholder(tf.float32, None, "td_error")  # TD_error
 
        with tf.variable_scope('Actor'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,    # number of hidden units
                activation=tf.nn.relu,
                kernel_initializer=tf.random_normal_initializer(0., .1),    # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )
 
            self.acts_prob = tf.layers.dense(
                inputs=l1,
                units=n_actions,    # output units
                activation=tf.nn.softmax,   # get action probabilities
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='acts_prob'
            )
 
        with tf.variable_scope('exp_v'):
            log_prob = tf.log(self.acts_prob[0, self.a])
            self.exp_v = tf.reduce_mean(log_prob * self.td_error)  # advantage (TD_error) guided loss
 
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(-self.exp_v)  # minimize(-exp_v) = maximize(exp_v)
 
    def learn(self, s, a, td):
        s = s[np.newaxis, :]
 
        feed_dict = {self.s: s, self.a: a, self.td_error: td}
 
        _, exp_v = self.sess.run([self.train_op, self.exp_v], feed_dict)
 
        return exp_v
 
    def choose_action(self, s):
        s = s[np.newaxis, :]
        probs = self.sess.run(self.acts_prob, {self.s: s})   # get probabilities for all actions
        return np.random.choice(np.arange(probs.shape[1]), p=probs.ravel())   # return a int
 
 
class Critic(object):
    def __init__(self, sess, n_features, lr=0.01):
        self.sess = sess
 
        self.s = tf.placeholder(tf.float32, [1, n_features], "state")
        self.v_ = tf.placeholder(tf.float32, [1, 1], "v_next")
        self.r = tf.placeholder(tf.float32, None, 'r')
 
        with tf.variable_scope('Critic'):
            l1 = tf.layers.dense(
                inputs=self.s,
                units=20,  # number of hidden units
                activation=tf.nn.relu,  # None
                # have to be linear to make sure the convergence of actor.
                # But linear approximator seems hardly learns the correct Q.
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='l1'
            )
 
            self.v = tf.layers.dense(
                inputs=l1,
                units=1,  # output units
                activation=None,
                kernel_initializer=tf.random_normal_initializer(0., .1),  # weights
                bias_initializer=tf.constant_initializer(0.1),  # biases
                name='V'
            )
 
        with tf.variable_scope('squared_TD_error'):
            self.td_error = self.r   GAMMA * self.v_ - self.v
            self.loss = tf.square(self.td_error)    # TD_error = (r gamma*V_next) - V_eval
        with tf.variable_scope('train'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)
 
    def learn(self, s, r, s_):
        s, s_ = s[np.newaxis, :], s_[np.newaxis, :]
 
        v_ = self.sess.run(self.v, {self.s: s_})
        td_error, _ = self.sess.run([self.td_error, self.train_op],
                                          {self.s: s, self.v_: v_, self.r: r})
        return td_error
 
 
sess = tf.Session()
 
actor = Actor(sess, n_features=N_F, n_actions=N_A, lr=LR_A)
critic = Critic(sess, n_features=N_F, lr=LR_C)     # we need a good teacher, so the teacher should learn faster than the actor
 
sess.run(tf.global_variables_initializer())
 
if OUTPUT_GRAPH:
    tf.summary.FileWriter("logs/", sess.graph)
 
for i_episode in range(MAX_EPISODE):
    s = env.reset()
    t = 0
    track_r = []
    while True:
        if RENDER: env.render()
 
        a = actor.choose_action(s)
 
        s_, r, done, info = env.step(a)
 
        if done: r = -20
 
        track_r.append(r)
 
        td_error = critic.learn(s, r, s_)  # gradient = grad[r   gamma * V(s_) - V(s)]
        actor.learn(s, a, td_error)     # true_gradient = grad[logPi(s,a) * td_error]
 
        s = s_
        t  = 1
 
        if done or t >= MAX_EP_STEPS:
            ep_rs_sum = sum(track_r)
 
            if 'running_reward' not in globals():
                running_reward = ep_rs_sum
            else:
                running_reward = running_reward * 0.95   ep_rs_sum * 0.05
            if running_reward > DISPLAY_REWARD_THRESHOLD: RENDER = True  # rendering
            print("episode:", i_episode, "  reward:", int(running_reward))
            break
 

更多实例教程可以参考我下面的文章在本地或者在parl中制作自己的游戏环境:

PaddlePaddlle强化学习及PARL框架{飞桨}

【一】-环境配置 python入门教学

【二】-Parl基础命令

【三】-Notebook、&pdb、ipdb 调试

【四】-强化学习入门简介

【五】-Sarsa&Qlearing详细讲解

【六】-DQN

【七】-Policy Gradient

【八】-DDPG

【九】-四轴飞行器仿真

都有详细原理分析和码源解释的。

0 人点赞