本文来自云栖社区官方钉群 "Python 技术进阶 http://tb.cn/UQkRRHw", 了解相关信息可以关注 "Python 技术进阶 http://tb.cn/UQkRRHw".
1,Q-learning 回顾
Q-learning 的 算法过程如下图所示:
在 Q-learning 中, 我们维护一张 Q 值表, 表的维数为: 状态数 S * 动作数 A, 表中每个数代表在当前状态 S 下可以采用动作 A 可以获得的未来收益的折现和. 我们不断的迭代我们的 Q 值表使其最终收敛, 然后根据 Q 值表我们就可以在每个状态下选取一个最优策略.
Q 值表的更新公式为:
公式中, Q(S,A) 我们可以称做 Q 估计值, 即我们当前估计的 Q 值, 而:
称为 Q-target, 即我们使用贝尔曼方程加贪心策略认为实际应该得到的奖励, 我们的目标就是使我们的 Q 值不断的接近 Q-target 值.
2, 深度 Q 网络(Deep - Q - Network)
2.1 DQN 简介
为什么会出现 DQN 呢
在普通的 Q-learning 中, 当状态和动作空间是离散且维数不高时可使用 Q-Table 储存每个状态动作对的 Q 值, 而当状态和动作空间是高维连续时, 使用 Q-Table 不现实.
两篇 DQN 奠基之作
[1]Playing Atari with Deep Reinforcement Learning
[2]Human-level control through deep reinforcement learning
如何将原始的 Q-learning 转换成深度学习问题
将 Q-Table 的更新问题变成一个函数拟合问题, 相近的状态得到相近的输出动作. 如下式, 通过更新参数 θ 使 Q 函数逼近最优 Q 值 . 因此, DQN 就是要设计一个神经网络结构, 通过函数来拟合 Q 值, 即:
2.2 DL 和 RL 结合带来的问题
1,DL 需要大量带标签的样本进行监督学习; RL 只有 reward 返回值, 而且伴随着噪声, 延迟 (过了几十毫秒才返回), 稀疏(很多 State 的 reward 是 0) 等问题;
2,DL 的样本独立; RL 前后 state 状态相关;
3,DL 目标分布固定; RL 的分布一直变化, 比如你玩一个游戏, 一个关卡和下一个关卡的状态分布是不同的, 所以训练好了前一个关卡, 下一个关卡又要重新训练;
4, 过往的研究表明, 使用非线性网络表示值函数时出现不稳定等问题.
2.3 DQN 解决问题方法
那么 DQN 是如何解决上述问题的呢?
1, 通过 Q-Learning 使用 reward 来构造标签(对应问题 1)
2, 通过 experience replay(经验池)的方法来解决相关性及非静态分布问题(对应问题 2,3)
3, 使用一个神经网络产生当前 Q 值, 使用另外一个神经网络产生 Target Q 值(对应问题 4)
构造标签
对于函数优化问题, 监督学习的一般方法是先确定 Loss Function, 然后求梯度, 使用随机梯度下降等方法更新参数. DQN 则基于 Q-Learning 来确定 Loss Function. 我们想要使 q-target 值和 q-eval 值相差越小越好. DQN 中的损失函数是:
这里 yi 是根据上一个迭代周期或者说 target.NET 网络的参数计算出的 q-target 值, 跟当前网络结构中的参数无关, yi 的计算如下:
这样, 整个目标函数就可以通过随机梯度下降方法来进行优化:
经验回放
经验池的功能主要是解决相关性及非静态分布问题. 具体做法是把每个时间步 agent 与环境交互得到的转移样本 (st,at,rt,st+1) 储存到回放记忆单元, 要训练时就随机拿出一些 (minibatch) 来训练.(其实就是将游戏的过程打成碎片存储, 训练时随机抽取就避免了相关性问题)
双网络结构
在 Nature 2015 版本的 DQN 中提出了这个改进, 使用另一个网络 (这里称为 target_net) 产生 Target Q 值. 具体地, Q(s,a;θi) 表示当前网络 eval_net 的输出, 用来评估当前状态动作对的值函数; Q(s,a;θ−i) 表示 target_net 的输出, 代入上面求 TargetQ 值的公式中得到目标 Q 值. 根据上面的 Loss Function 更新 eval_net 的参数, 每经过 N 轮迭代, 将 MainNet 的参数复制给 target_net.
引入 target_net 后, 再一段时间里目标 Q 值使保持不变的, 一定程度降低了当前 Q 值和目标 Q 值的相关性, 提高了算法稳定性.
2.4 DQN 算法流程
NIPS 2013 版
Nature 2015 版
可以看到, 两版的 DQN 都使用了经验池, 而 2015 版的 DQN 增加了 target.NET, 提高了算法稳定性.
3,DQN 实现 DEMO
找了很多 DQN 的例子, 有原版的实现 Atari 的, 也有 Flappy Bird 的, 但是最简单的还是莫烦大神的 Demo,GitHub 地址是:.
在介绍整个 Demo 前, 我们介绍两种 DQN 的实现方式, 一种是将 s 和 a 输入到网络, 得到 q 值, 另一种是只将 s 输入到网络, 输出为 s 和每个 a 结合的 q 值. 这里莫烦大神的代码采取了后一种方式.
如果你对 DQN 的原理有比较深刻的认识, 那么读莫烦大神的代码也并不是十分困难. 这里我们想要实现的效果类似于寻宝.
其中, 红色的方块代表寻宝人, 黑色的方块代表陷阱, 黄色的方块代表宝藏, 我们的目标就是让寻宝人找到最终的宝藏.
这里, 我们的状态可以用横纵坐标表示, 而动作有上下左右四个动作. 使用 tkinter 来做这样一个动画效果. 宝藏的奖励是 1, 陷阱的奖励是 - 1, 而其他时候的奖励都为 0.
接下来, 我们重点看一下我们 DQN 相关的代码.
定义相关输入
这了, 我们用 s 代表当前状态, 用 a 代表当前状态下采取的动作, r 代表获得的奖励, s_代表转移后的状态.
- self.s = tf.placeholder(tf.float32,[None,self.n_features],name='s')
- self.s_ = tf.placeholder(tf.float32,[None,self.n_features],name='s_')
- self.r = tf.placeholder(tf.float32,[None,],name='r')
- self.a = tf.placeholder(tf.int32,[None,],name='a')
经验池
- def store_transition(self,s,a,r,s_):
- if not hasattr(self, 'memory_counter'):
- self.memory_counter = 0
- # hstack:Stack arrays in sequence horizontally
- transition = np.hstack((s,[a,r],s_))
- index = self.memory_counter % self.memory_size
- self.memory[index,:] = transition
- self.memory_counter += 1
双网络结构
target_net 和 eval_net 的网络结构必须保持一致, 这里我们使用的是两层全链接的神经网络, 值得注意的一点是对于 eval_net 来说, 网络的输入是当前的状态 s, 而对 target_net 网络来说, 网络的输入是下一个状态 s_, 因为 target_net 的输出要根据贝尔曼公式计算 q-target 值, 即
代码如下:
- w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)
- # ------------------ build evaluate_net ------------------
- with tf.variable_scope('eval_net'):
- e1 = tf.layers.dense(self.s,20,tf.nn.relu,kernel_initializer=w_initializer,
- bias_initializer=b_initializer,name='e1'
- )
- self.q_eval = tf.layers.dense(e1,self.n_actions,kernel_initializer=w_initializer,
- bias_initializer=b_initializer,name='q')
- # ------------------ build target_net ------------------
- with tf.variable_scope('target_net'):
- t1 = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer,
- bias_initializer=b_initializer, name='t1')
- self.q_next = tf.layers.dense(t1, self.n_actions, kernel_initializer=w_initializer,
- bias_initializer=b_initializer, name='t2')
每隔一定的步数, 我们就要将 target_net 中的参数复制到 eval_net 中:
- t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='target_net')
- e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='eval_net')
- with tf.variable_scope('soft_replacement'):
- self.target_replace_op = [tf.assign(t,e) for t,e in zip(t_params,e_params)]
计算损失并优化
首先, 对于 eval_net 来说, 我们只要得到当前的网络输出即可, 但是我们定义的网络输出是四个动作对应的 q-eval 值, 我们要根据实际的 a 来选择对应的 q-eval 值, 这一部分的代码如下:
- with tf.variable_scope('q_eval'):
- # tf.stack
- #a = tf.constant([1,2,3])
- # b = tf.constant([4,5,6])
- # c = tf.stack([a,b],axis=1)
- # [[1 4]
- # [2 5]
- # [3 6]]
- a_indices = tf.stack([tf.range(tf.shape(self.a)[0], dtype=tf.int32), self.a], axis=1)
- # 用 indices 从张量 params 得到新张量
- # indices = [[0, 0], [1, 1]]
- # params = [['a', 'b'], ['c', 'd']]
- # output = ['a', 'd']
- # 这里 self.q_eval 是 batch * action_number,a_indices 是 batch * 1, 也就是说选择当前估计每个动作的 Q 值
- self.q_eval_wrt_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
中间有几个函数不太了解的, 上面都有详细的注释, 如果还不是很理解的话, 大家可以百度或者阅读相应函数的源码.
对于 target_net 网络来说, 我们要根据下面的式子来计算 q-target 值:
第一部分的 R 我们是已经得到了的, 剩下的就是根据贪心策略选择四个输出中最大的一个即可:
- with tf.variable_scope('q_target'):
- q_target = self.r + self.gamma * tf.reduce_max(self.q_next,axis=1,name='Qmax_s_')
- # 一个节点被 stop 之后, 这个节点上的梯度, 就无法再向前 BP 了
- self.q_target = tf.stop_gradient(q_target)
接下来, 我们就可以定义我们的损失函数并选择优化器进行优化:
- with tf.variable_scope('loss'):
- self.loss = tf.reduce_mean(tf.squared_difference(self.q_target,self.q_eval_wrt_a,name='TD_error'))
- with tf.variable_scope('train'):
- self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
网络的训练
每隔一定的步数, 我们就要将 eval_net 中的参数复制到 target_net 中, 同时我们要从经验池中选择 batch 大小的数据输入到网络中进行训练.
- def learn(self):
- if self.learn_step_counter % self.replace_target_iter == 0:
- self.sess.run(self.target_replace_op)
- print('\ntarget_params_replaced\n')
- if self.memory_counter> self.memory_size:
- sample_index = np.random.choice(self.memory_size,size=self.batch_size)
- else:
- sample_index = np.random.choice(self.memory_counter,size = self.batch_size)
- batch_memory = self.memory[sample_index,:]
- _,cost = self.sess.run(
- [self._train_op,self.loss],
- feed_dict={
- self.s:batch_memory[:,:self.n_features],
- self.a:batch_memory[:,self.n_features],
- self.r:batch_memory[:,self.n_features+1],
- self.s_:batch_memory[:,-self.n_features:]
- }
- )
剩下的代码就不介绍啦, 大家不妨去 GitHub 上 fork 大神的代码, 跟着进行练习, 相信会对 DQN 的原理有一个更进一步的认识.
来源: https://yq.aliyun.com/articles/691458