Deep Reinforcement learning – 2. 基于tensorflow的DDPG实现

2022-09-13 08:20:51 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

Deep Reinforcemen learning – 2. 基于tensorflow的DDPG实现

基于我上一篇博客的算法介绍, 使用tensorflow的代码实现,仿真环境使用gym torcs 为了快速训练出结果,我没有使用driver view图像作为输入,而是使用low dimension传感器数据作为输入, 总共29个数据,包括: – 赛车速度: speedX, speedY, speedZ. – 赛车在跑道中的位置 – 19个range finder的探测数据:车身与跑道边缘的距离 – 发动机转速 – 车轮速度

输出action有三个维度: – steer: 方向, 取值范围 [-1,1] – accel: 油门,取值范围 [0,1] – brake: 刹车,取值范围 [0,1]

训练1M steps的视频链接:ddpg视频

完整代码的github链接:https://github.com/kennethyu2017/ddpg

下面分模块讲解:

代码框架

再回顾一下ddpg算法的流程图:


actor

actor包含online policy和 target policy 两张神经网络, 其结构是一样的,由于使用low dimension 的数据输入,我没有使用卷积层, policy网络架构如下 ,取自tensorboard生成的 computation graph:

包括: – BatchNorm: input batch norm layer – fully_connected,fully_connected_1: 2个hidden layers – fully_connected_2: output layer

为了限定policy网络的输出action范围,使用tanh对steer,sigmoid对accelerate和brake,作为bound函数,进行范围限定:

Actor class的主要代码如下: 为了在调试时可以灵活的修改模型架构,policy 网络的模型架构、配置、参数,全部通过在实例化Actor时指定,包括: 对输入数据做归一化的batch norm层、全连接fully-connected层、输出层、 输出bound 函数等。

代码语言:javascript复制
import tensorflow as tf
from tensorflow.contrib.layers import fully_connected,batch_norm
from common.common import soft_update_online_to_target, copy_online_to_target
DDPG_CFG = tf.app.flags.FLAGS  # alias

class Actor(object):
  def __init__(self, action_dim, online_state_inputs, target_state_inputs,input_normalizer, input_norm_params, n_fc_units, fc_activations, fc_initializers, fc_normalizers, fc_norm_params, fc_regularizers, output_layer_initializer, output_layer_regularizer, output_normalizers, output_norm_params,output_bound_fns, learning_rate, is_training):
    self.a_dim = action_dim
    self.learning_rate = learning_rate
    ...

使用Adam作为gradient descent的算法:

代码语言:javascript复制
    self.optimizer = tf.train.AdamOptimizer(learning_rate=self.learning_rate)  # use beta1, beta2 default.

创建online policy网络:

代码语言:javascript复制
    self._online_action_outputs = self.create_policy_net(scope=DDPG_CFG.online_policy_net_var_scope,
                                                        state_inputs=self.online_state_inputs,
                                                        trainable=True)

    self.online_policy_net_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES,
                                                    scope=DDPG_CFG.online_policy_net_var_scope)
    self.online_policy_net_vars_by_name = {var.name.strip(DDPG_CFG.online_policy_net_var_scope):var
                                           for var in self.online_policy_net_vars}

创建target policy 网络,由于我们采用soft update的方法更新target 网络的参数,所以在创建target网络时指定其参数trainable为False。

代码语言:javascript复制
    self._target_action_outputs = self.create_policy_net(scope=DDPG_CFG.target_policy_net_var_scope,
                                                        state_inputs=self.target_state_inputs,
                                                        trainable=False)
    self.target_policy_net_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,
                                                    scope=DDPG_CFG.target_policy_net_var_scope)

    self.target_policy_net_vars_by_name = {var.name.strip(DDPG_CFG.target_policy_net_var_scope):var
                                           for var in self.target_policy_net_vars}

策略网络的创建函数 create_policy_net ,根据网络架构参数创建各个layer:

代码语言:javascript复制
  def create_policy_net(self, state_inputs, scope, trainable):
    with tf.variable_scope(scope):
      #input norm layer
      prev_layer = self.input_normalizer(state_inputs, **self.input_norm_params)

      ##fc layers
      for n_unit, activation, initializer, normalizer, norm_param, regularizer in zip(
          self.n_fc_units, self.fc_activations, self.fc_initializers,
        self.fc_normalizers,self.fc_norm_params, self.fc_regularizers):
        prev_layer = fully_connected(prev_layer, num_outputs=n_unit, activation_fn=activation,
                                     weights_initializer=initializer,
                                     weights_regularizer=regularizer,
                                     normalizer_fn=normalizer,
                                     normalizer_params=norm_param,
                                     biases_initializer=None, #skip bias when use norm.
                                     trainable=trainable)

      ##output layer
      output_layer = fully_connected(prev_layer, num_outputs=self.a_dim, activation_fn=None,
                                     weights_initializer=self.output_layer_initializer,
                                     weights_regularizer=self.output_layer_regularizer,
                                     normalizer_fn=self.output_normalizers,
                                     normalizer_params=self.output_norm_params,
                                     biases_initializer=None, # to skip bias
                                     trainable=trainable)
      ## bound and scale each action dim
      action_unpacked = tf.unstack(output_layer, axis=1)
      action_bounded = []
      for i in range(self.a_dim):
        action_bounded.append(self.output_bound_fns[i](action_unpacked[i]))
      action_outputs = tf.stack(action_bounded, axis=1)

    return action_outputs

policy网络的输出tensor,即action:

代码语言:javascript复制
  # of online net
  @property
  def online_action_outputs_tensor(self):
    return self._online_action_outputs

  # of target net
  @property
  def target_action_outputs_tensor(self):
    return self._target_action_outputs

定义计算online policy gradient的函数,由于我们只训练online 网络, 所以不用计算target网络的gradient :

代码语言:javascript复制
  def compute_online_policy_net_gradients(self, policy_loss):
    grads_and_vars = self.optimizer.compute_gradients(
      policy_loss,var_list=self.online_policy_net_vars)
    grads = [g for (g, _) in grads_and_vars if g is not None]
    compute_op = tf.group(*grads)

    return (grads_and_vars, compute_op)

定义函数,通过Adam optimize将计算好的gradient用于更新online policy网络的参数:

代码语言:javascript复制
  def apply_online_policy_net_gradients(self, grads_and_vars):
    vars_with_grad = [v for g, v in grads_and_vars if g is not None]
    if not vars_with_grad:
      raise ValueError(
        "$$ ddpg $$ policy net $$ No gradients provided for any variable, check your graph for ops"
        " that do not support gradients,variables %s." %
        ([str(v) for _, v in grads_and_vars]))
    return self.optimizer.apply_gradients(grads_and_vars)

从功能上来讲,上面这两步可以直接合并为使用 optimizer.minimize()函数,我是为了在调试过程中观察gradient的变化情况,所以拆开来实现, 这样可以在调试时取得gradient的tensor,观察其是否收敛。

定义函数,将online policy网络的参数在初始化时copy到target policy 网络,以及在训练时soft update到 target policy网络, 函数返回的是具体操作的op:

代码语言:javascript复制
  def soft_update_online_to_target(self):
    return soft_update_online_to_target(self.online_q_net_vars_by_name,
                                               self.target_q_net_vars_by_name)

  def copy_online_to_target(self):
    return copy_online_to_target(self.online_q_net_vars_by_name,
                                        self.target_q_net_vars_by_name)

critic

critic包含online q和 target q 两张神经网络, 其结构也是一样的,q网络结构如下,取自tensorboard生成的computation graph:

包括: – BatchNorm: input batch norm layer – fully_connected,fully_connected_1: 2个hidden layers – fully_connected_2: output layer

有两点需要注意: 1、由于q网络需要以action作为输入,且在第二层fully_connected_1 中(代码中通过DDPG_CFG.include_action_fc_layer配置参数指定) ,通过concat包含action输入; 2、对照上面的ddpg流程图,在训练q 网络时,q网络的action输入是采样自replay buffer的action batch, 而在训练 policy网络时, q网络的action输入是 μ(si) mu(s_{i}) , 即policy网络的action输出。我们为q 网络增加一个switch-merge的 控制逻辑–对应上图右下角部分–来同时接入上述两种action输入,并通过cond_training_q tensor控制接通哪一路action输入, 这样我们在训练q网络时,就feed cond_training_q tensor的值为True, 在训练policy网络时,feed cond_training_q tensor的值为False。

Critic class 主要代码如下:同Actor一样,为了在调试时可以灵活的修改模型架构,policy 网络的模型架构、配置、参数,全部通过在实例化Critic 时指定:

代码语言:javascript复制
class Critic(object):

  def __init__(self, online_state_inputs, target_state_inputs,input_normalizer,input_norm_params, online_action_inputs_training_q, online_action_inputs_training_policy, cond_training_q, target_action_inputs, n_fc_units, fc_activations, fc_initializers, fc_normalizers,fc_norm_params,fc_regularizers, output_layer_initializer,output_layer_regularizer,learning_rate):

    self.online_state_inputs = online_state_inputs
    self.target_state_inputs = target_state_inputs
    ...

同Actor一样 ,也使用Adam作为gradient descent的算法;接着创建online q 和 target q 网络, 其创建函数如下:

代码语言:javascript复制
  def create_q_net(self, state_inputs, # NHWC format. action_inputs_training_q, scope, trainable, action_inputs_training_policy=None, # None for target net. cond_training_q=None # bool to control switch. can be None for target net. ):
    with tf.variable_scope(scope):
      #input norm layer
      prev_layer=self.input_normalizer(state_inputs,**self.input_norm_params)

      ##fc layers
      l = 1  # start from fc-1 as 1
      for n_unit, activation, initializer, normalizer, norm_param,regularizer in zip(
              self.n_fc_units, self.fc_activations, self.fc_initializers,
              self.fc_normalizers, self.fc_norm_params, self.fc_regularizers):
        # include action_inputs
        if l == DDPG_CFG.include_action_fc_layer:
          if action_inputs_training_policy is None:  # target net
            actions = action_inputs_training_q
          else:  # add logic for selecting online net action inputs
            # switch return :(output_false, output_true)
            (_, sw_action_training_q) = switch(data=action_inputs_training_q,
                                                                   pred=cond_training_q,
                                                                   name='switch_actions_training_q')
            (sw_action_training_policy, _) = switch(data=action_inputs_training_policy,
                                                                        pred=cond_training_q,
                                                                        name='switch_actions_training_policy')
            (actions, _) = merge([sw_action_training_q, sw_action_training_policy])

          prev_layer = tf.concat([prev_layer, actions], axis=1)
        l  = 1
        prev_layer = fully_connected(prev_layer, num_outputs=n_unit, activation_fn=activation,
                                     weights_initializer=initializer,
                                     weights_regularizer=regularizer,
                                     normalizer_fn=normalizer, #when specify norm , bias will be ignored.
                                     normalizer_params=norm_param,
                                     trainable=trainable)

      # end fc layers

      ##output layer. fully_connected will create bias which is not wanted in output layer.
      output_layer = fully_connected(inputs=prev_layer,num_outputs=1,
                                     activation_fn=None,
                                     weights_initializer=self.output_layer_initializer,
                                     weights_regularizer=self.output_layer_regularizer,
                                     biases_initializer=None, # to skip bias in output layer
                                    trainable=trainable)

    # == == end with variable_scope() ==
    return output_layer

接着定义gradient的计算、应用函数,以及target q 网络soft update的函数,同Actor类似,不再赘述。


构建总体computation graph

我们知道,tensorflow是基于computation graph进行计算,当我们有了policy 网络、q 网络之后,就可以构架一张大的graph, 把 他们连接起来,构成一个完整的ddpg graph, 进行训练,如下图所示,取自tensorboard的graph显示:

可以看到online policy、online q网络,policy lossq_loss, 为back propagation添加的gradientsgradients_1 模块, target policy target q网络。

定义构建函数,入参包括actor实例 、 critic实例;两个placeholder:reward_inputs 和 terminated_inputs:在session run时, 通过feed将raplay buffer中采样的reward和terminated batch输入给训练网络;返回训练ddpg时需要运行的op:

代码语言:javascript复制
def build_ddpg_graph(actor, critic, reward_inputs, terminated_inputs,global_step_tensor):
  #定义label。 对于episode 结束的Transition数据,我们不计算q值, 所以对q值乘上(1.0 - terminated_inputs)
  y_i = reward_inputs   (1.0 - terminated_inputs) * DDPG_CFG.gamma * critic.target_q_outputs_tensor

  #定义q 网络的loss。 对q网络,我们一般采用l2 reg.
  q_reg_loss = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES,scope=DDPG_CFG.online_q_net_var_scope)
  q_loss = tf.add_n([tf.losses.mean_squared_error(labels=y_i, predictions=critic.online_q_outputs_tensor)]   q_reg_loss,
                    name='q_loss')

  #定义policy 网络的loss。 policy网络可以不用reg。
  policy_reg_loss = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES, scope=DDPG_CFG.online_policy_net_var_scope)
  policy_loss =tf.add_n([ -1.0 * tf.reduce_mean(critic.online_q_outputs_tensor)]   policy_reg_loss,
                        name='policy_loss')

  # 定义计算和应用online policy 网络gradient的op, 我们在训练时,运行该op即实现了对policy网络的训练。
  actor_g_and_v, actor_compute_grads_op = actor.compute_online_policy_net_gradients(policy_loss=policy_loss)
  actor_apply_grads_op = actor.apply_online_policy_net_gradients(grads_and_vars=actor_g_and_v)
  train_online_policy_op = actor_apply_grads_op

  # 定义计算和应用online q网络gradient的op, 运行该op即实现了对q网络的训练。
  critic_g_and_v, critic_compute_grads_op = critic.compute_online_q_net_gradients(q_loss=q_loss)
  critic_apply_grads_op = critic.apply_online_q_net_gradients(grads_and_vars=critic_g_and_v)
  train_online_q_op = critic_apply_grads_op

  #定义soft update 的op
  actor_update_target_op = actor.soft_update_online_to_target()
  critic_update_target_op = critic.soft_update_online_to_target()

  #创建control flow的依赖关系
  with tf.control_dependencies([actor_update_target_op, critic_update_target_op]):
    update_target_op = tf.assign_add(global_step_tensor, 1).op  # increment global step

  #copy online -> target,在初始化后进行。
  actor_init_target_op = actor.copy_online_to_target()
  critic_init_target_op = critic.copy_online_to_target()
  copy_online_to_target_op = tf.group(actor_init_target_op, critic_init_target_op)

  # 创建model saver
  saver = tf.train.Saver(keep_checkpoint_every_n_hours=0.5, max_to_keep=5)

  return (copy_online_to_target_op, train_online_policy_op, train_online_q_op, update_target_op, saver)

replay buffer

replay buffer存储我们收集到的transition数据样本,可以通过collections 包中的deque实现。为了反复训练,我增加了将buffer存盘的功能,可以在 下次训练时单独加载buffer文件,不用再一步步执行torcs环境,提高训练效率。

代码语言:javascript复制
class ReplayBuffer(object):
  def __init__(self, buffer_size, seed, save_segment_size=None, save_path=None):
    #The right side of the deque contains the most recent experiences
    self.buffer_size = buffer_size
    self.buffer = deque([], maxlen=buffer_size)
    if seed is not None:
      np.random.seed(seed)
    self.save=False
    if save_segment_size is not None:
      assert save_path is not None
      self.save = True
      self.save_segment_size = save_segment_size
      self.save_path = save_path
      self.save_data_cnt=0
      self.save_segment_cnt=0

存储transition数据,如果最近新增加的数据达到save_segment_size,就存盘:

代码语言:javascript复制
  def store(self, transition):
    #deque can take care of max len.
    T = copy.deepcopy(transition)
    self.buffer.append(T)
    if self.save:
      self.save_data_cnt =1
      if self.save_data_cnt >= self.save_segment_size:
        self.save_segment()
        self.save_data_cnt=0
    del transition

采样mini batch,使用uniform随机采样策略,注意一些边界条件:

代码语言:javascript复制
  def sample_batch(self, batch_size):
    indices = np.random.permutation(self.length - 1)[:batch_size]
    state_batch, action_batch, reward_batch, next_state_batch, terminated_batch = [], [], [], [], []
    for idx in indices:
      trans_1 = self.buffer[idx]
      if trans_1.terminated is not True:
        # the trans_2 : (a_2, r_2, term_2, s_3)
        trans_2 = self.buffer[idx   1]
        # we use the data (s_2, a_2, r_2, term_2, s_3)
        state_batch.append(trans_1.next_state)
        action_batch.append(trans_2.action)
        reward_batch.append(trans_2.reward)
        next_state_batch.append(trans_2.next_state)
        terminated_batch.append(trans_2.terminated)
      else:
        ##term_1 is true, so buffer[idx 1] is beginning of new episode,
        # we traverse back.
        if idx != 0:
          trans_0 = self.buffer[idx - 1]  # a_0, r_0, s_1, term_0 = self.buffer[idx - 1]
          if trans_0.terminated is True:  # give up
            continue
          # we use the data (s_1, a_1, r_1, term_1, s_2)
          # s_2 is not used to calc Q cause its terminated. but we still use
          # it to FF through mu_prime/Q_prime then Q*0. guarantee s_2 is accurate formatted.
          state_batch.append(trans_0.next_state)
          action_batch.append(trans_1.action)
          reward_batch.append(trans_1.reward)
          next_state_batch.append(trans_1.next_state)
          terminated_batch.append(trans_1.terminated)
        else:
          # head of buffer, we dont know the previous state , so give up.
          continue

    return (np.array(state_batch), np.array(action_batch), np.array(reward_batch), np.array(next_state_batch),np.array(terminated_batch))

接着定义buffer文件的存储和加载函数,使用pickle包来做deque的序列化存储:

代码语言:javascript复制
  def save_segment(self):
    self.save_segment_cnt =1

    data = []
    start = self.length - self.save_segment_size  #always save latest data of segment_size
    end = self.length

    for idx in range(start, end):
      data.append(self.buffer[idx])

    if not os.path.exists(self.save_path):
      os.makedirs(self.save_path)

    abs_file_name = os.path.abspath(os.path.join(self.save_path,
                            '_'.join([DDPG_CFG.replay_buffer_file_name,str(self.save_segment_cnt),time.ctime()])))

    with open(abs_file_name,'wb') as f:
      pkl.dump(data, f)


  def load(self, path):
    #load from file to buffer
    abs_file_pattern = os.path.abspath(os.path.join(path,
                            '_'.join([DDPG_CFG.replay_buffer_file_name,'*'])))
    buffer_files = glob.glob(abs_file_pattern)
    for f_name in buffer_files:
      with open(f_name,'rb') as f:
        data = pkl.load(f)
        self.buffer.extend(data)

train 和 evaluate

接下来,我们可以定义actor、critic的网络架构参数,训练用的hyper-parameter,包括learning rate、l2 regularization ratio、 buffer size等:

policy 网络架构定义,online 和 target 是同样的架构:

代码语言:javascript复制
import tensorflow as tf
from tensorflow.contrib.layers import variance_scaling_initializer,batch_norm,l2_regularizer

DDPG_CFG = tf.app.flags.FLAGS  # alias
is_training=tf.placeholder(tf.bool, shape=(), name='is_training')

DDPG_CFG.online_policy_net_var_scope = 'online_policy'
DDPG_CFG.target_policy_net_var_scope = 'target_policy'

#因为输入状态参数的scale不统一,我们使用一个batch norm 层,将输入数据的每个维度都
#归一化为均值为0、方差为1:
# -- 1 input norm layers -- 
DDPG_CFG.actor_input_normalizer = batch_norm
DDPG_CFG.actor_input_norm_params =  {
  
  'is_training':is_training,
                                       'data_format':'NHWC',
                                       'updates_collections':None,
                                        'scale':False,  # not gamma. let next fc layer to scale.
                                        'center':True  # beta.
                                        }
# -- 2层fully_connected layers --
DDPG_CFG.actor_n_fc_units = [400, 300]
DDPG_CFG.actor_fc_activations = [tf.nn.elu] * 2
DDPG_CFG.actor_fc_initializers = [variance_scaling_initializer()] * 2
DDPG_CFG.actor_fc_regularizers = [None] * 2
#加一个batch norm层,训练比较稳定
DDPG_CFG.actor_fc_normalizers = [batch_norm] * 2
DDPG_CFG.actor_fc_norm_params =  [{
  
  'is_training':is_training,
                                       'data_format':'NHWC',
                                       'updates_collections':None,
                                       'scale':False,
                                       'center':True
                                         }] *2

# -- 1层 output layer ,由于我们采用tanh和sigmoid函数对输出的action进行了bound,
# 为了避免tanh和sigmoid函数的饱和,在他们之前加一个batch norm层
DDPG_CFG.actor_output_layer_normalizers = batch_norm
DDPG_CFG.actor_output_layer_norm_params = {
  
  'is_training':is_training,
                                       'data_format':'NHWC',
                                       'updates_collections':None,
                                       'scale':False,
                                       'center':False}
DDPG_CFG.actor_output_layer_initializer=tf.random_uniform_initializer(-3e-3,3e-3)

q网络架构定义, online和target是相同的架构:

代码语言:javascript复制
DDPG_CFG.online_q_net_var_scope = 'online_q'
DDPG_CFG.target_q_net_var_scope = 'target_q'

#-- 1层 input norm layer --
DDPG_CFG.critic_input_normalizer = batch_norm
DDPG_CFG.critic_input_norm_params =  {
  
  'is_training':is_training,
                                       'data_format':'NHWC',
                                       'updates_collections':None,
                                       'scale': False,
                                       'center': True
                                      }

# -- 2层 fully-connected layer,使用l2 regularization --
DDPG_CFG.include_action_fc_layer = 2  # in this layer we include action inputs.
DDPG_CFG.critic_n_fc_units = [400, 300]
DDPG_CFG.critic_fc_activations = [tf.nn.elu] * 2
DDPG_CFG.critic_fc_initializers = [variance_scaling_initializer()] * 2
DDPG_CFG.critic_fc_regularizers = [l2_regularizer(scale=DDPG_CFG.critic_reg_ratio,scope=DDPG_CFG.online_q_net_var_scope)] * 2
DDPG_CFG.critic_fc_normalizers = [batch_norm, None]  # 2nd fc including action input and no BN but has bias.
DDPG_CFG.critic_fc_norm_params =  [{
  
  'is_training':is_training,
                                       'data_format':'NHWC',
                                       'updates_collections':None,
                                       'scale': False,
                                       'center': True
                                    }, None]

# -- 1 output layer --
DDPG_CFG.critic_output_layer_initializer = tf.random_uniform_initializer(-3e-3, 3e-3)

hyper parameter定义:

代码语言:javascript复制
## hyper-P
DDPG_CFG.actor_learning_rate = 1e-3
DDPG_CFG.critic_learning_rate = 1e-4
DDPG_CFG.critic_reg_ratio = 1e-2
DDPG_CFG.tau = 0.001
DDPG_CFG.gamma = 0.99
DDPG_CFG.num_training_steps = 25*(10**5)  # 2.5M steps total
DDPG_CFG.eval_freq = 3*10000
DDPG_CFG.num_eval_steps = 1000  # eval steps during training
DDPG_CFG.eval_steps_after_training=2000
DDPG_CFG.batch_size = 64
DDPG_CFG.replay_buff_size = 10**6  # 1M
DDPG_CFG.replay_buff_save_segment_size = 30*3000  # every 180,000 Transition data.
DDPG_CFG.greedy_accel_noise_steps = 3*(10**5)

定义train函数, 这个函数也提供在训练后,对得到的具体模型进行evaluation的模式,由入参eval_mode指定:

代码语言:javascript复制
def train(train_env, agent_action_fn, eval_mode=False):
  action_space = train_env.action_space
  obs_space = train_env.observation_space

  #########实例化 actor,critic, replay buffer#########
  # 训练online网络时的state 输入
  online_state_inputs = tf.placeholder(tf.float32,
                                       shape=(None, obs_space.shape[0]),
                                       name="online_state_inputs")

  # target网络需要的state 输入
  target_state_inputs = tf.placeholder(tf.float32,
                                       shape=online_state_inputs.shape,
                                       name="target_state_inputs")

  # 训练q 网络时的action输入
  online_action_inputs_training_q = tf.placeholder(tf.float32,
                                                   shape=(None, action_space.shape[0]),
                                                   name='online_action_batch_inputs'
                                                   )
  # 用于控制q 网络action输入的条件变量:
  # True: training q .
  # False: training policy.
  cond_training_q = tf.placeholder(tf.bool, shape=[], name='cond_training_q')

  terminated_inputs = tf.placeholder(tf.float32, shape=(None), name='terminated_inputs')
  reward_inputs = tf.placeholder(tf.float32, shape=(None), name='rewards_inputs')

  # for summary text
  summary_text_tensor = tf.convert_to_tensor(str('summary_text'), preferred_dtype=string)
  tf.summary.text(name='summary_text', tensor=summary_text_tensor, collections=[DDPG_CFG.log_summary_keys])

  ##实例化 actor, critic.
  actor = Actor(action_dim=action_space.shape[0],
                online_state_inputs=online_state_inputs,
                target_state_inputs=target_state_inputs,
                input_normalizer=DDPG_CFG.actor_input_normalizer,
                input_norm_params=DDPG_CFG.actor_input_norm_params,
                n_fc_units=DDPG_CFG.actor_n_fc_units,
                fc_activations=DDPG_CFG.actor_fc_activations,
                fc_initializers=DDPG_CFG.actor_fc_initializers,
                fc_normalizers=DDPG_CFG.actor_fc_normalizers,summary_text_tensor
                fc_norm_params=DDPG_CFG.actor_fc_norm_params,
                fc_regularizers=DDPG_CFG.actor_fc_regularizers,
                output_layer_initializer=DDPG_CFG.actor_output_layer_initializer,
                output_layer_regularizer=None,
                output_normalizers=DDPG_CFG.actor_output_layer_normalizers,
                output_norm_params=DDPG_CFG.actor_output_layer_norm_params,
                output_bound_fns=DDPG_CFG.actor_output_bound_fns,
                learning_rate=DDPG_CFG.actor_learning_rate,
                is_training=is_training)


  critic = Critic(online_state_inputs=online_state_inputs,
                  target_state_inputs=target_state_inputs,
                  input_normalizer=DDPG_CFG.critic_input_normalizer,
                  input_norm_params=DDPG_CFG.critic_input_norm_params,
                  online_action_inputs_training_q=online_action_inputs_training_q,
                  online_action_inputs_training_policy=actor.online_action_outputs_tensor,
                  cond_training_q=cond_training_q,
                  target_action_inputs=actor.target_action_outputs_tensor,
                  n_fc_units=DDPG_CFG.critic_n_fc_units,
                  fc_activations=DDPG_CFG.critic_fc_activations,
                  fc_initializers=DDPG_CFG.critic_fc_initializers,
                  fc_normalizers=DDPG_CFG.critic_fc_normalizers,
                  fc_norm_params=DDPG_CFG.critic_fc_norm_params,
                  fc_regularizers=DDPG_CFG.critic_fc_regularizers,
                  output_layer_initializer=DDPG_CFG.critic_output_layer_initializer,
                  output_layer_regularizer = None,
                  learning_rate=DDPG_CFG.critic_learning_rate)

  ## track updates.
  global_step_tensor = tf.train.create_global_step()

  ## 构建整个ddpg computation graph 
  copy_online_to_target_op, train_online_policy_op, train_online_q_op, update_target_op, saver 
    = build_ddpg_graph(actor, critic, reward_inputs, terminated_inputs, global_step_tensor)

  #实例化 replay buffer,指定是否将buffer数据保存到文件
  replay_buffer = ReplayBuffer(buffer_size=DDPG_CFG.replay_buff_size,
                               save_segment_size= DDPG_CFG.replay_buff_save_segment_size,
                               save_path=DDPG_CFG.replay_buffer_file_path,
                               seed=DDPG_CFG.random_seed
                               )
  #从文件加载buffer数据
  if DDPG_CFG.load_replay_buffer_set:
    replay_buffer.load(DDPG_CFG.replay_buffer_file_path)

  #使用summary监控训练中各项数据、参数的变化,并生成图表,在tensorboard中进行观察,非常方便
  sess = tf.Session(graph=tf.get_default_graph())
  summary_writer = tf.summary.FileWriter(logdir=os.path.join(DDPG_CFG.log_dir, "train"),
                                         graph=sess.graph)
  log_summary_op = tf.summary.merge_all(key=DDPG_CFG.log_summary_keys)

  sess.run(fetches=[tf.global_variables_initializer()])

  #copy init params from online to target
  sess.run(fetches=[copy_online_to_target_op])

  #加载之前保存的模型参数checkpoint:
  latest_checkpoint = tf.train.latest_checkpoint(DDPG_CFG.checkpoint_dir)
  if latest_checkpoint:
    tf.logging.info("==== Loading model checkpoint: {}".format(latest_checkpoint))
    saver.restore(sess, latest_checkpoint)

  ####### start training #########
  obs = train_env.reset()  
  transition = preprocess_low_dim(obs)

  n_episodes = 1
  if not eval_mode:
    #training 模式
    for step in range(1, DDPG_CFG.num_training_steps):
      #根据state参数,从online policy网络得到action
      policy_out=sess.run(fetches=[actor.online_action_outputs_tensor],
               feed_dict={online_state_inputs: transition.next_state[np.newaxis, :], is_training: False})[0]
      #通过仿真环境执行action,并保存Transition数据到replay buffer.
      # 这一步里会引入action的noise,参见下面的 <noise设计> 段落。
      transition=agent_action_fn(policy_out, replay_buffer,train_env)

      ##从replay buffer采样一个mini-batch
      state_batch, action_batch, reward_batch, next_state_batch, terminated_batch = 
       replay_buffer.sample_batch(DDPG_CFG.batch_size)

      # ---- 1. 训练 policy网络,注意feed的参数 -----------
      sess.run(fetches=[train_online_policy_op],
               feed_dict = {online_state_inputs: state_batch,
                            cond_training_q: False,
                            online_action_inputs_training_q: action_batch,  # feed but not used.
                            is_training: True
                            })

      # ---- 2. 训练 q 网络 --------------
      sess.run(fetches=[train_online_q_op],
                feed_dict={ online_state_inputs: state_batch,
                            cond_training_q: True,
                            online_action_inputs_training_q: action_batch,
                            target_state_inputs: next_state_batch,
                            reward_inputs: reward_batch,
                            terminated_inputs: terminated_batch,
                            is_training: True})

      # ----- 3. soft update target网络 ---------
      sess.run(fetches=[update_target_op], feed_dict=None)

      # 每隔 eval_freq steps,我们进行一次evaluation,以便在训练结束后选择好的模型:
      if step % DDPG_CFG.eval_freq == 0:
        evaluate(env=train_env,
                 num_eval_steps=DDPG_CFG.num_eval_steps,
                 preprocess_fn=preprocess_low_dim,
                 estimate_fn=lambda state: sess.run(fetches=[actor.online_action_outputs_tensor],
                                                      feed_dict={online_state_inputs:state,
                                                      is_training:False} ),
                 summary_writer=summary_writer,
                 saver=saver, sess=sess, global_step=step,
                 log_summary_op=log_summary_op,summary_text_tensor=summary_text_tensor)

      if transition.terminated:
        transition = preprocess_low_dim(train_env.reset())
        n_episodes  =1
        continue  # begin new episode

  #evaluate 模式
  else: 
    evaluate(env=train_env,
           num_eval_steps=DDPG_CFG.eval_steps_after_training,
           preprocess_fn=preprocess_low_dim,
           estimate_fn=lambda state: sess.run(fetches=[actor.online_action_outputs_tensor],
                                              feed_dict={online_state_inputs: state,
                                                         is_training: False}),
           summary_writer=summary_writer,
           saver=None, sess=sess, global_step=0,
           log_summary_op=log_summary_op, summary_text_tensor=summary_text_tensor)

  sess.close()
  train_env.close()

定义evaluate函数如下:

代码语言:javascript复制
def evaluate(env, num_eval_steps, preprocess_fn, estimate_fn, summary_writer, saver, sess,global_step,log_summary_op,summary_text_tensor):
  total_reward = 0
  episode_reward = 0
  max_episode_reward = 0
  n_episodes = 0
  n_rewards = 0
  terminated = False
  transition = preprocess_fn(state=env.reset())
  tf.logging.info(' ####### start evaluate @ global step:{}## '.format(global_step))

  for estep in range(1,num_eval_steps):      
    policy_out = estimate_fn(transition.next_state[np.newaxis,:])
    action = policy_output_to_deterministic_action(policy_out,env.action_space)
    (state, reward, terminated) = env_step(env, action)

    transition = preprocess_fn(state)

    # record every reward
    total_reward  = reward
    episode_reward  = reward

    if reward != 0:
      n_rewards  = 1  # can represent effective steps in episode

    if terminated:
      n_episodes  = 1
      if episode_reward > max_episode_reward:
        max_episode_reward = episode_reward
        episode_reward = 0            
      transition = preprocess_fn(env.reset())

  # -- end for estep ---
  avg_episode_reward = total_reward / max(1, n_episodes)
  avg_episode_steps = n_rewards / max(1, n_episodes)

  #训练时我们才保存model参数
  saved_name='eval_only_not_save_model'
  if saver is not None: 
    saved_name = save_model(saver, sess, global_step)
  write_summary(summary_writer, global_step, avg_episode_reward, max_episode_reward,
                avg_episode_steps,saved_name,sess,log_summary_op,
                summary_text_tensor)

我们将evaluation的结果,通过summary保存, 可以通过tensorboard进行查看:

代码语言:javascript复制
def write_summary(writer, global_step, avg_episode_reward, max_episode_reward, avg_episode_steps,saved_name,sess,log_summary_op, summary_text_tensor):
  eval_summary = tf.Summary()  # protocol buffer
  eval_summary.value.add(node_name='avg_episode_reward',simple_value=avg_episode_reward, tag="train_eval/avg_episode_reward")
  eval_summary.value.add(node_name='max_episode_reward', simple_value=max_episode_reward, tag="train_eval/max_episode_reward")
  eval_summary.value.add(node_name='avg_episode_steps', simple_value=avg_episode_steps, tag="train_eval/avg_episode_steps")
  writer.add_summary(summary=eval_summary, global_step=global_step)

  log_info = 'eval result : global_step:{} avg_episode_reward:{}  max_episode_reward:{} avg_episode_steps:{} n saved_file: {} '.format(global_step,
                                                                                     avg_episode_reward,
                                                                                      max_episode_reward,
                                                                                      avg_episode_steps,
                                                                                      saved_name)
  log_summary=sess.run(fetches=[log_summary_op],
                       feed_dict={summary_text_tensor:log_info})
  writer.add_summary(summary=log_summary[0], global_step=global_step)
  writer.flush()

通过saver对象保存模型参数:

代码语言:javascript复制
def save_model(saver, sess,global_step):
  # save model. will save both online and target networks.
  return saver.save(sess=sess, save_path=DDPG_CFG.checkpoint_dir, global_step=global_step)

接下来,我们就可以创建torcs环境,开始训练: 注意:为了加速训练,torcs使用text模式(-T选项) ,即没有画面显示。你可以更改下面的vision选项打开画面显示。 当你想选择赛道时,在shell里通过运行 sudo torcs , 进入 practice->configure race 界面进行配置,再退出,然后开始训练。

代码语言:javascript复制
if __name__ == "__main__":
  tf.logging.info("@@@ start ddpg training gym_torcs @@@ start time:{}".format(time.ctime()))
  # Generate a Torcs environment
  env_train = torcs_env_wrapper(vision=False, throttle=True, gear_change=False,port=3101)
  train(env_train,agent_action,eval_mode=False)

gym torcs的reward设计和episode终结条件:

我们定义reward和episode终结条件如下:

代码语言:javascript复制
- 每一步action执行时,汽车的当前速度投影到赛道正前方向的值,参照  [yanpanlau的描述](https://github.com/yanpanlau/DDPG-Keras-Torcs):
代码语言:javascript复制
- 当汽车与周围发生碰撞时, reward=-1 作为惩罚,但并不结束当前episode
- 当汽车在持续200步进展缓慢(前后两步的距离差小于0.05米)时,给出reward=-1作为惩罚,并结束当前episode
- 汽车如果掉头行驶,则结束当前episode

这样设计的目的也是了实现exploration,即让汽车尽量多的往前跑,去覆盖到赛道的各个部分,并且避免产生大量低速行驶的数据。

具体实现参见本项目中的gym_torcs.py文件

noise的设计

noise的引入是为了exploration,ddpg paper中使用的是OU noise,但经过我实践,标准的OU noise应用在torcs这种环境效果并不算好, 我参考了yanpanlau 的做法, 即在训练一开始,引入大量直线加速的noise, 然后随着训练步骤衰减该noise, 这样做的效果很明显,就是让模型快速学习到高速行驶状态(车速也是状态输入的一部分)下的q网络和policy网络参数, 而且在速度的保证下,可以往前开的尽量的远,探索到更多赛段。你可以探索更多有效的noise,去帮助ddpg寻找最优解。 我管这种noise 叫做greedy accelerate:

代码语言:javascript复制
def greedy_function(x, mu, theta, sigma):
        return theta * (mu - x)   sigma * np.random.randn()

epsilon=1
def policy_output_to_stochastic_action(output, action_space):
  global epsilon
  output = np.squeeze(output, axis=0)

  epsilon -= 1.0 / DDPG_CFG.greedy_accel_noise_steps

  greedy_noise=np.array( [max(epsilon, 0) * greedy_function(output[0], 0.0, 0.60, 0.30),  # steer
                  max(epsilon, 0) * greedy_function(output[1], 0.5, 1.00, 0.10),  # accel
                  max(epsilon, 0) * greedy_function(output[2], -0.1, 1.00, 0.05)]) # brake

  stochastic_action = greedy_noise   output
  bounded = np.clip(stochastic_action, action_space.low, action_space.high)
  return bounded

训练结果

使用机器配置:cpu i5-3450, gpu Nvidia 1050Ti, 训练1M steps 耗时 13小时, 基本可以得到一个跑分(evo-3-l赛道,Alpine赛道)在10k/episode 以上的模型。 ddpg paper中的模型训练2.5M steps后,得到的 torcs最好跑分为1840/episode, 当然paper中的模型是为了 通用(也用于atari、mujuco等仿真环境), 对于torcs没有特别的处理。 我得到的模型应该是有overfit torcs具体某一条赛道的成分存在。 观察 q_loss的曲线,是往下收敛的:

观察policy loss 曲线,虽然没有往下收敛,基本比较平稳,需要持续的训练才能明显的收敛。我们可以取一个较好的中间结果,也可以看到赛车可以30km左右的速度顺利过弯了。

再看一下training过程中evaluate的结果,可以看到中间有些步骤的跑分可以达到10k,由于我们每次evaluation都保存了模型参数,我们 可以取出对应的模型:

上述1M steps训练后得到的不算是最优模型: – 不能稳定跑完整条赛道,只能跑完前面一段 – 速度40km上下,还不能尽力加速, – 多赛道的表现不好,因为我只在一条赛道上进行训练, 有overfit存在。

我们验证了ddpg算法的正确性。 ddpg的训练过程存在很大的不稳定性,后期的训练过程很可能覆盖前期训练的结果,所以我们需要大量的反复训练,然后在过程中 寻找一个相对最优解。 如果你想自己动手训练一下,我建议两点:

代码语言:javascript复制
    -- 加上一些基本的Deep learning 技巧去调试:比如cross validation寻找最优hyper parameters,
        early stopping等。
    -- 算法上,更新Deepmind 2017年发表的Rainbow

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/149034.html原文链接:https://javaforall.cn

0 人点赞