详解PLANET代码(tensorflow)如何加入SAC功能

2019-07-17 16:58:54 浏览数 (1)

先说为什么要加?

SAC 算法本质是经过熵强化的回报值最大化算法。在我们单独跑的其他实验中,包括SAC RNN表现出很好的性能,1.replay buffer使它的采样效率增高 2.尤其在高维连续动作空间,对动作的稳定性连续性有比较好的提升。

1.思路:

整理记录一下思路。如果你拿到的原代码盘根错节地耦合在一起,这种复杂的整体就像瑞士手表的内部结构,牵一发而动全身。而你现在需要加入新功能,且不是类比和可模仿的添加,而是加入如sac这种原代码中不存在的功能,那你可以参考如下:

需要对原代码足够清晰,不是大概逻辑,而是从数据收集,存取,使用到模型中,模型如何运转 的每一步细节非常清晰,最好画成精确的图。因为对于tensorflow,你需要始终有一个概念,tensorflow 是一个静态图,它像一个整体的精密的仪器,每个结点之间如何链接(data dependency and control dependency),所有这些关系你先要设计确定好,这一步叫define model. 等图画好了,建立session,feed数据,去run它一次,你要想象数据流在整个仪器中按你设计好的方式流动一次,run 50000 step就是50000个batch的数据一一进到装置中流动。

2. 如何搞定变态

planet代码写的有点变态,它不是模块化清晰地呈现出 数据模型设计session.run,和模型存取。下面就这四个模块来描述下planet代码分别是如何实现这四个模块的:

2.1  session.run部分

如图中横轴(图给自己看的,看不清没关系,下面文字描述):

2.1.1

session.run每run一个step,数据就在装置中流动一次。它设计了phase这个概念,它加入train和test两个phase. train phase 有50000 个step ,test phase 有100个 step. 

加入好phase后,它每一次run,就会判断这是什么阶段,前50000步都是train,所以每一个step都会这么流动: 1会从train_episode目录中读取数据,计算loss,2接着计算gradient, 3做planning给出action 从而得到一个episode 的数据存入train_episodes目录。

50000 step之后是100 step , 这100 step每run step 会这么流动:会从test_episode目录中读取数据,但并没用上。loss为0所以没有反向传播不会计算和更新gradient,只会做planning输入action从而得到一个episode数据存入test_episode目录。

以上这些控制是通过tf.cond阀门来控制数据流向的,阀门决定了哪一条路是通路,run的是最后的结点,与这个最后结点依次建立关系的且通顺的路才能有数据流过,好像电路一样。

代码语言:javascript复制
train_loss = tf.cond(
    tf.equal(phase, 'train'),
    lambda: loss,
    lambda: 0 * tf.get_variable('dummy_loss', (), tf.float32))

2.1.2

如果一开始没有生成目录,从start函数进入,会进入random_episode,从而分别收集到5个episode存入train_episode和test_episode目录。

代码语言:javascript复制
def random_episodes(env_ctor, num_episodes, output_dir=None):

综合上述两点,整个代码在Testing=False,即正常训练时是这样的:

train_episode和test_episode目录先各收集到5个episode,然后开始5万步,每一步从train_episode取数据,去求gradient,再planning出一个episode。5万步之后,是100步的test phase,会从test_episode得数据,不求gradient,或者更准备的描述法是:没有数据流动经过装置中的求梯度结点。它会planning出一个episode (这步是由下图代码决定的).

代码语言:javascript复制
with tf.variable_scope('simulation'):
  sim_returns = []
  for name, params in config.sim_summaries.items():
    # These are expensive and equivalent for train and test phases, so only
    # do one of them.
    sim_summary, sim_return = tf.cond(
        tf.equal(graph.phase, 'test'),
        lambda: utility.simulate_episodes(config, params, graph, name),
        lambda: ('', 0.0),
        name='should_simulate_'   params.task.name)
    summaries.append(sim_summary)
    sim_returns.append(sim_return)

2.2 数据部分:

2.2.1

首先若进入start 函数是先收集随机数据存到目录中,若进入resume函数,是直接从目录中读取 .npz

2.2.2

数据的存:从env得到并存数据到目录:这个env其实是封装好的,具体:它发出命令与env进程间通信,得到反馈。(另外env那个进程本身还有子进程,分为carla server和carla client不展开说)。

代码语言:javascript复制
def random_episodes(env_ctor, num_episodes, output_dir=None):
  env = env_ctor()  # env is an <ExternalProcess object>.
  env = wrappers.CollectGymDataset(env, output_dir)
  episodes = []
  for _ in range(num_episodes):
    policy = lambda env, obs: env.action_space.sample()
    done = False
    obs = env.reset()
    # cnt = 0
    while not done:
      action = policy(env, obs)
      obs, _, done, info = env.step(action)  # env.step
    #   cnt  = 1
    # print(cnt)
    episodes.append(info['episode'])  # if done is True, info stores the 'episode' information and 'episode' is written in a file(e.g. "~/planet/log_debug/00001/test_episodes").
  return episodes

step层层进去得到数据,然后

代码语言:javascript复制
def _process_step(self, action, observ, reward, done, info):

这个函数是将数据整理放到目录中。而我改的就是这个函数。

我需要的数据是:[o,a,r,o_next,d]. 考虑到restore还有它后面模型结构的情况,要改动最小,所以我决定只加上o_next(即代码中image_next),其他原数据保持不变。

数据的取:

代码语言:javascript复制
def _read_episodes_scan(
    directory, batch_size, every, max_episodes=None, **kwargs):
  recent = {}
  cache = {}
  while True:
    index = 0
    for episode in np.random.permutation(list(recent.values())):
      yield episode
      index  = 1
      if index > every / 2:
        break
    for episode in np.random.permutation(list(cache.values())):
      index  = 1
      yield episode
      if index > every:
        break
    # At the end of the epoch, add new episodes to the cache.
    # print(index, len(recent), max_episodes, len(cache))
    cache.update(recent)
    recent = {}
    filenames = tf.gfile.Glob(os.path.join(directory, '*.npz'))
    filenames = [filename for filename in filenames if filename not in cache]
    if max_episodes:
      filenames = filenames[:max_episodes - len(cache)]
    for filename in filenames:
      recent[filename] = _read_episode(filename, **kwargs)

这个函数做了什么:

一开始目录里的数据全读取,之后再每次得到batch数据时,会分为recent和cache,cache是已经读过的文件列表,recent是目录里新出现的文件。最多各读取5个。这样兼顾了新旧数据,有replay buffer的功效。具体新旧如何配置,可以自行调整。

数据读取利用了这个dataset机制,是一个迭代器,每次从生成器取出batchsize个chunk.

代码语言:javascript复制
train = tf.data.Dataset.from_generator(                                         # train.output_shapes: e.g. 'image' shape(?, 64, 64, 3)
    functools.partial(loader, train_dir, shape[0], **kwargs), dtypes, shapes)

通过这个函数得到batch数据,注意,这些都是整个图中结点,都用最后的run每一step,来真正的生成数据。

代码语言:javascript复制
data = get_batch(datasets, trainer.phase, trainer.reset)

2.2.3

SAC算法中数据分两部分:随机部分和用policy生成的部分。

随机部分对比下改好的数据和原始数据:

policy生成部分想了两个方案:1把原始代码的planning生成a替换为policy生成a,但我觉得这里面很冗余,细节也不敢随意更改又是个大坑,本着改动最小,因此放弃 2.所以在define model 函数中加入类似如下函数,可以让他在每一个step去生成一个或若干个episode的数据。

代码语言:javascript复制
def random_episodes(env_ctor, num_episodes, output_dir=None):
  env = env_ctor()  # env is an <ExternalProcess object>.
  env = wrappers.CollectGymDataset(env, output_dir)
  episodes = []
  for _ in range(num_episodes):
    policy = lambda env, obs: env.action_space.sample()
    done = False
    obs = env.reset()
    # cnt = 0
    while not done:
      action = policy(env, obs)
      obs, _, done, info = env.step(action)  # env.step
    #   cnt  = 1
    # print(cnt)
    episodes.append(info['episode'])  # if done is True, info stores the 'episode' information and 'episode' is written in a file(e.g. "~/planet/log_debug/00001/test_episodes").
  return episodes

模型设计部分:

不改动它原来的结构,用tf.cond phase去控制数据不往它的loss流动,主要改动会在这个函数:模仿它的写法,1在单独的文件写好sac的模型,2在config 时配置好heads。其他基本就没什么大问题了。

代码语言:javascript复制
def compute_losses(
    loss_scales, cell, heads, step, target, prior, posterior, mask,
    free_nats=None, debug=False):
  features = cell.features_from_state(posterior)      # [s,h]
  losses = {}
  for key, scale in loss_scales.items():
    # Skip losses with zero or None scale to save computation.
    if not scale:
      continue
    elif key == 'divergence':
      loss = cell.divergence_from_states(posterior, prior, mask)
      if free_nats is not None:
        loss = tf.maximum(tf.cast(free_nats, tf.float32), loss)
      loss = tf.reduce_sum(loss, 1) / tf.reduce_sum(tf.to_float(mask), 1)
    elif key == 'global_divergence':
      global_prior = {
          'mean': tf.zeros_like(prior['mean']),
          'stddev': tf.ones_like(prior['stddev'])}
      loss = cell.divergence_from_states(posterior, global_prior, mask)
      loss = tf.reduce_sum(loss, 1) / tf.reduce_sum(tf.to_float(mask), 1)
    elif key in heads:
      output = heads[key](features)   # decoder is used.
      loss = -tools.mask(output.log_prob(target[key]), mask)
    else:
      message = "Loss scale of head '{}' is not used."
      print(message.format(key))
      continue
    # Average over the batch and normalize by the maximum chunk length.
    loss = tf.reduce_mean(loss)
    losses[key] = tf.check_numerics(loss, key) if debug else loss
  return losses

restore 部分:

注意用到filter variable功能去restore 它原有的参数,致使与我新加的参数无关。

代码语言:javascript复制
def add_saver(
    self, include=r'.*', exclude=r'.^', logdir=None, load=True, save=True,
    checkpoint=None):
  """Add a saver to save or load variables.

  Args:
    include: One or more regexes to match variable names to include.
    exclude: One or more regexes to match variable names to exclude.
    logdir: Directory for saver to store and search for checkpoints.
    load: Whether to use the saver to restore variables.
    save: Whether to use the saver to save variables.
    checkpoint: Checkpoint name to load; None for newest.
  """
  variables = tools.filter_variables(include, exclude)
  saver = tf.train.Saver(variables, keep_checkpoint_every_n_hours=2)
  if load:

思路和细节就是这样的,接下来就开始加代码吧。

0 人点赞