跳转至

第十一章:深度强化学习DQN

上一章我们学习了强化学习算法,其核心思想是要找到一个最优策略来和环境交互,最优策略意味着每轮游戏都尽可能拿到最大的回报。之前的例子中,策略函数都是以一个矩阵的形式保存,也就是Q表。神经网络也是一种函数,本章将介绍如何使用神经网络来代替Q表,让神经网络处理状态输入,计算得到动作输出,这就是深度强化学习的经典算法DQN。但是直接把神经网络和强化学习进行结合也会遇到一些问题,本章后面也会介绍如何来克服这些问题。

一、什么是深度强化学习

我们再来回顾一下上一章强化学习的几个要素。

  • 主体(agent):是进行试错、学习的主体。在冰湖问题中主体就是走迷宫的机器人。

  • 环境(environment):对主体的行动作出反馈的部分,在冰湖问题中就是迷宫本身。

  • 动作(action):主体可以选择的行动有哪些,在冰湖问题中就是上下左右四个可以选择的动作。

  • 状态(state):反映主体的一系列状况,在冰湖问题中某一时间的状态就是主体所处的位置坐标。

  • 策略 (strategy):主体的应对方案,是从输入状态,到输出动作的一个函数。在冰湖问题中就是一个Q表,它的输入是位置坐标,输出是四种方向动作。这个Q表是需要主体和环境交互来学习得到的。

  • 回报(reward):主体在环境中进行了某些动作之后得到的回报,在冰湖问题中,如果到达右下角终点可以获得+1的回报,其它情况下都是0的回报。

用大白话讲强化学习就是吃一堑长一智。主体刚被扔到环境中时是一无所知的,它通过不断的在环境中探索,从而得到反馈,对不同的反馈来反思自己之前的行动,来修正自己的策略。在上一章的例子中,主体是用Q表的方式来保存策略,很简单方便,但是也有点傻,有点死记硬背的感觉。

换一个日常生活中的例子。当温度升高时,我们需要减少身上的衣服,当温度降低时,我们需要增加身上的衣服,这是一个简单的生活策略。状态是外界温度,动作是增加或减少衣物。如果是用Q表的话,那我们可能要保存很多个温度状态。但我们大脑中可能只有几个关键的温度,例如30度以上就只穿短袖。10度以下就要加羽绒服了。人脑的策略更类似于一个数学函数。这个函数在可以更好的反映状态和动作之间的关系。

我们回想一下第八章内容,神经网络可以拟合任何函数,所以也可以用于强化学习问题。当我们用一个神经网络来表示策略函数,并用强化学习思路来学习策略时,就成为深度强化学习算法了。我们先回忆一下第八章神经网络是如何来训练的。神经网络中的结构是事先给定的,网络参数是随机赋初始值。它需要外界给一个输入,计算到预测值,再将预测值和目标值之间计算损失。最后使用最优化方法,来调整网络参数,使得损失最小。

graph LR
subgraph 数据
输入
目标
end
subgraph 模型
网络结构
网络参数
end

    输入-->模型-->预测
    目标-->损失计算
    预测-->损失计算
    损失计算--参数优化-->模型

如果把神经网络用于强化学习的策略函数,我们自然想到用它来代替原来的Q表。神经网络的输入就是每个行动的状态,那么其目标应该是什么呢?自然想到可以用Q-Learning中的target来当做目标了。下图是神经网络结合强化学习后的整体训练流程,神经网络会接受当前状态和下一状态两种信息输入,计算对应的两种价值,即当前状态行动价值和下一状态行动价值的最大值。并据此选择行动,和环境交互产生收益。根据和上一章同样的公式计算出目标和损失,再用梯度下降来更新修正神经网络。因为用了神经网络来代替Q-Learning中的Q表,所以这一套流程方法就称之为Deep Q-Network,简称为DQN。

graph TD
    当前状态-->神经网络-->当前状态行动价值
    神经网络--选择行动-->环境-->产生收益
    环境-->下一状态-->神经网络-->下一状态行动价值MAX
    当前状态行动价值-->计算Loss
    target-->计算Loss
    下一状态行动价值MAX--> target
    产生收益--> target
    计算Loss--更新-->神经网络

二、DQN解决冰湖问题

下面我们来看看具体到冰湖问题,是如何来和神经网络结合的。

首先我们需要将状态进行一些处理。位置状态是一种离散的整数变量,在表格中可以用第一行和第二行为分别保存Q值,但是它并不适合直接输入到神经网络中。为什么呢?就好像一个学校中,有若干个班级,1班和5班中的数字只是表示班级名称的数字,并不意味着5班大于1班。所以要将它转换成独热编码,方便后续处理。独热编码的思路是,将 1 班编码为向量 [1,0,0,0,0],它在第 1 个位置上取 1,其他位置取 0。将 5 班编码为向量[0,0,0,0,1]。这种编码被称为独热编码。

下面我们编写了one_hot函数来实现这个功能。

def one_hot(x,size):
    result = np.zeros(size)
    result[x] = 1
    return result 
我们在第八章里使用了pytorch模块来处理神经网络的训练,因此,所有的处理数据均需要转换成pytorch的格式,也就是tensor格式。在conv2tensor函数中,先将输入参数转成独热编码,再使用from_numpy函数转换成tensor格式。
def conv2tensor(x):
    x = one_hot(x,16)
    x = torch.from_numpy(x).float()
    return x

和第十章的思路类似,我们需要一个行动选择函数,get_action的功能是要能探索环境和利用环境。函数get_action有两个参数,一个是q_value,这个参数是神经网络输出的Q值,因为它是神经网络的输出值,它天然是带有梯度信息,而我们在使用这个值时,是不需要它带上梯度信息的,因此使用detach来实现这个目的。再转换成numpy的格式,最后是用squeeze函数去除多余的维度。这种连续调用函数方式称为链式调用。另一个参数是n_game,它是表示当前游戏进行的轮次数量。在游戏前期,我们需要进行更多的探索来获取信息,而在游戏后期,则需要进行更多的利用。所以会根据游戏轮次来调整二者的比例。当游戏早期时,n_game较少,epsilon会比较大,随机值小于epsilon的概率比较大,final_move会更多的从随机产生。当游戏轮次越多,n_game较大,则final_move会基于最大值来选择行动。

def get_action(q_value, n_game):
    q_value_np = q_value.clone().detach().numpy().squeeze()
    epsilon = 2000 - n_game
    if random.randint(0, 2000) < epsilon:
        prob = np.exp(q_value_np)/np.exp(q_value_np).sum()
        final_move = np.random.choice(len(prob), p=prob)
    else:
        final_move = q_value_np.argmax()
    return final_move
下面来看最重要的函数,Simple_DQN负责和环境交互,并训练一个神经网络。这个函数的输入参数比较多,包括交互环境evn,学习率lr,游戏迭代的次数episodes,单次游戏中的最大频数max_step,折现系数gamma,用于策略验证的频率test_policy_freq。

函数首先会拿到环境的两个属性值,分别是状态空间的大小,以及行动空间的大小。根据这两个值来定义一个线性模型model。这个模型model中的权重参数是随机产生的,需要通过强化学习来收敛到正确的参数。同时也定义好模型的损失函数和优化器,并定义一个空列表用于存放结果。

def Simple_DQN(env,lr = 0.001,episodes=100, max_step = 100,gamma=0.9,test_policy_freq=100):
    nS, nA = env.observation_space.n, env.action_space.n
    model = Linear(nS, nA)
    loss_fn = torch.nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    results = []
之后进行多轮游戏循环,在每一次for循环中,会重置环境,开始新一轮游戏,将初始状态转换成需要的格式,然后进入第二层循环,第二层循环内会对一局游戏的每一步进行计算。内层循环中会先根据初始状态来计算q_value,然后基于get_action来选择某个行动方向,再根据得到的action输入到环境中和环境交互,得到新的状态和回报等信息。和上一章Q学习方法思路一致,我们需要用一步之后的Q值来回推计算出目标值。所以此处将next_state再次输入model得到一步之后的Q值,求最大值后折现,再加上reward,计算出目标值。将这个目标值当做标签值,让q_value和目标值一起计算损失,再用torch的自动梯度功能,来调整神经网络的权重参数,让这个损失变小。这几步就是最核心的代码步骤。

    for i in range(episodes): 
        state, _ = env.reset()
        state = conv2tensor(state,nS)
        finished = False
        step = 0
        while not finished :
            q_value = model(state)

            # take action
            action = get_action(q_value,n_game=i)
            next_state, reward, finished, _, _ = env.step(action)
            next_state = conv2tensor(next_state,nS)

            # find target
            target = q_value.clone().detach()
            q_value_next = model(next_state).detach().numpy().squeeze()
            td_target = reward + gamma * q_value_next.max() * (not finished)
            target[action] = td_target

            optimizer.zero_grad()
            td_error = loss_fn(q_value,target)
            td_error.backward()
            optimizer.step()
            state = next_state
            step += 1
            if step >= max_step:
           break
当finished条件满足时,说明一轮游戏已经完成,将reward保存起来,如果游戏经过了一定的轮数,我们会来计算一下过去100轮游戏里面,有多少轮的游戏是带有非零的回报的。从而验证模型的效果。

        if finished:
            results.append(reward)

        if (i>0) and (i % test_policy_freq == 0):
            results_array = np.array(results)
            print("Running episode  {} Reaches goal {:.2f}%. ".format(
                i, 
                results_array[-100:].mean()*100))

    return 
下面是真正的运行这个函数来跑一下看看。
env = gym.make('FrozenLake-v1')
Simple_DQN(env,lr = 0.001,episodes=10000, max_step = 100,gamma=0.9,test_policy_freq=200)

Running episode  200 Reaches goal 0.00%. 
Running episode  400 Reaches goal 1.00%. 
Running episode  600 Reaches goal 5.00%. 
Running episode  800 Reaches goal 2.00%. 
Running episode  1000 Reaches goal 2.00%. 
Running episode  1200 Reaches goal 8.00%. 
Running episode  1400 Reaches goal 7.00%. 
Running episode  1600 Reaches goal 12.00%. 
Running episode  1800 Reaches goal 20.00%. 
Running episode  2000 Reaches goal 38.00%. 
Running episode  2200 Reaches goal 64.00%.

对于这个简单的迷宫环境,这个简单版本的深度强化学习表现的不错。成功率可以达到60%以上。和上一章的Q表方法差不多。我们再尝试一下更复杂的迷宫环境。将大小从4乘4的迷宫换成8乘8的迷宫。

env = gym.make('FrozenLake-v1',map_name="8x8")
Simple_DQN(env,lr = 0.001,episodes=10000, max_step = 100,gamma=0.9,test_policy_freq=200)
结果你会发现,效果并不好,有些时候成功率一直在0位置徘徊。其原因在于训练方式,该神经网络一次只学习了一条数据,随后就扔掉不再使用了,这种方式在强化学习环境中并不稳定。我们可以想一些办法来优化它。

三、DQN的完整结构和优化

下面我们来看看如何优化上面的代码。首先我们将使用类的方式来构造DQN的完整结构。构造一个Linear_QNet类,这个类是继承了pytorch的Module类。我们只需要写两个类方法即可。一个是初始化方法,另一个是前向方法。

class Linear_QNet(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        self.linear = nn.Linear(input_size,output_size)

    def forward(self, x):
        x = self.linear(x)
        return x
初始化方法中,我们仅仅是定义了linear函数,然后在前向方法中,用linear函数来计算输出值。

之后我们来定义一个QTrainer类,负责模型的训练相关任务。初始化方法中定义了折现系数gamma,学习率lr,输入维度input_dim和输出维度output_dim。同时定义了模型对象model。定义了优化器和损失函数。注意到在初始化这里,我们又定义了另一个模型target_model。它的用处是作为model的一个复制克隆体,具体好处我们会在后续讲解。

class QTrainer:
    def __init__(self, lr, gamma,input_dim, output_dim):
        self.gamma = gamma
        self.model = Linear_QNet(input_dim, output_dim)
        self.target_model = Linear_QNet(input_dim, output_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.SmoothL1Loss()
        self.copy_model()
target_model是model的克隆体,所以需要将model的所有权重参数完整复制给target_model。让它们的参数一模一样。

    def copy_model(self):
        self.target_model.load_state_dict(self.model.state_dict())
之后是最重要的训练函数train_step,它负责将所有输入信息进行转换,转换成torch的tensor格式,然后进行算法的训练。Q_value是模型对于当前state条件下,某个选择的行动计算的Q值,Q_value_next是下一个state下,最有利行动对应的Q值。注意到这里计算时使用了target_model,其原因在于,原有的方式损失函数的两个元素中都有模型计算的变量,每一步模型都会变化,而target也会有变化,有点类似于小狗追着自己的尾巴,容易出现不稳定的情况。此外用克隆体来计算,会让一定时间内的训练较为稳定。之后计算target则和之前逻辑一致,将回报加上折现后的下一步Q值。大家可以基于之前的代码和流程图进行理解。

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        action = torch.unsqueeze(action, -1)
        reward = torch.tensor(reward, dtype=torch.float)
        done = torch.tensor(done, dtype=torch.long)

        Q_value = self.model(state).gather(-1, action).squeeze()
        Q_value_next = self.target_model(next_state).detach().max(-1)[0]
        target =  (reward + self.gamma * Q_value_next * (1 - done)).squeeze()

        self.optimizer.zero_grad()
        loss = self.criterion(Q_value,target)
        loss.backward()
        self.optimizer.step()
随后我们再定义一个类Agent,它会定义强化学习中主体的功能。在初始化方法中,定义了几个常见参数,其中max_explore是指我们需要探索多少轮。然后定义了一个队列memory用于保存过去的经验。因为前面简单版本的深度强化学习的一个缺点,就是拿到一个数据就训练,而丢弃了之前的历史经验。这里将过去经验保留下来,就像是我们人类具备了历史记忆一样,让训练更稳定。最后是将前面介绍的训练器实例化,其它参数是显而易见的含义。
class Agent:
    def __init__(self,env,max_explore=1000, gamma = 0.9,
                max_memory=5000, lr=0.001):
        self.max_explore = max_explore 
        self.memory = deque(maxlen=max_memory) 
        self.nS = env.observation_space.n
        self.nA = env.action_space.n
        self.step = 0
        self.n_game=0
        self.trainer = QTrainer(lr, gamma, self.nS, self.nA)
Agent类中编写了remember方法,用于将交互的信息进行保存,train_long_memory则是从经验中进行数据抽样,用trainer中的训练函数进行训练。
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done)) 

    def train_long_memory(self,batch_size):
        if len(self.memory) > batch_size:
            mini_sample = random.sample(self.memory, batch_size) # list of tuples
        else:
            mini_sample = self.memory
        states, actions, rewards, next_states, dones = zip(*mini_sample)
        states = np.array(states)
        next_states = np.array(next_states)
        self.trainer.train_step(states, actions, rewards, next_states, dones)
get_action方法和之前遇到的函数功能一致,用于环境探索和环境利用。
    def get_action(self, state, n_game, explore=True):
        state = torch.tensor(state, dtype=torch.float)
        prediction = self.trainer.model(state).detach().numpy().squeeze()
        epsilon = self.max_explore - n_game
        if explore and random.randint(0, self.max_explore) < epsilon:
            prob = np.exp(prediction)/np.exp(prediction).sum()
            final_move = np.random.choice(len(prob), p=prob)
        else:
            final_move = prediction.argmax()
        return final_move
最后还会在Agent类中加入一个独热编码转换的函数,因为它不涉及到类中的数据,所以把它修饰成一个静态方法。
    @staticmethod
    def one_hot(x,size):
        result = np.zeros(size)
        result[x] = 1
        return result 
最后我们来编写一个函数,调用Agent和环境相关的代码,让他们交互起来。和前面代码的区别在于,这里没有再用两层循环,而是只有一个while循环,每次循环都是某轮游戏中的某一步。每一步中agent和环境交互,产生新的状态,并将这些交互信息记录下来,train_long_memory用于训练。

def train(env, max_game=5000, max_step=100):
    nS = env.observation_space.n
    agent = Agent(env, max_explore=2000, gamma = 0.9,
                max_memory=50000, lr=0.001)
    results = []
    state_new, _ = env.reset()
    state_new = Agent.one_hot(state_new,nS)
    done = False
    total_step = 0
    while agent.n_game <= max_game:
        state_old = state_new
        action = agent.get_action(state_old,agent.n_game,explore=True)
        state_new, reward, done, _, _ = env.step(action)
        state_new = Agent.one_hot(state_new,nS)
        agent.remember(state_old, action, reward, state_new, done)
        agent.train_long_memory(batch_size=256)
        agent.step += 1
        total_step += 1
在while循环中,如果游戏进行了10步,我们就将模型参数拷贝给分身模型。如果某轮游戏步数太多或者游戏结束,将会把回报进行记录,重置游戏环境。最后部分的代码计算最近100轮游戏的成功率。
        if total_step % 10 == 0:
            agent.trainer.copy_model()

        if done or agent.step>max_step:
            results.append(reward>0)
            state_new, _ = env.reset()
            state_new = Agent.one_hot(state_new,nS)
            agent.step = 0
            agent.n_game += 1

            if (agent.n_game>0) and (agent.n_game % 100 ==0):         
                print("Running episode  {}, step {} Reaches goal {:.2f}%. ".format(
                    agent.n_game, total_step,np.sum(results[-100:])))

让我们实际运行上面的函数,跑一下代码。

env = gym.make('FrozenLake-v1',map_name="8x8")
train(env, 5000)
让我们看看效果,可以看到效果不错,成功率达到40%左右。
Running episode  100, step 2953 Reaches goal 0.00%. 
Running episode  200, step 6285 Reaches goal 0.00%. 
Running episode  300, step 9401 Reaches goal 1.00%. 
Running episode  400, step 12646 Reaches goal 1.00%. 
Running episode  500, step 16262 Reaches goal 1.00%. 
Running episode  600, step 19583 Reaches goal 2.00%. 
Running episode  700, step 23532 Reaches goal 2.00%. 
Running episode  800, step 27180 Reaches goal 1.00%. 
Running episode  900, step 30931 Reaches goal 3.00%. 
Running episode  1000, step 34597 Reaches goal 2.00%. 
Running episode  1100, step 38435 Reaches goal 2.00%. 
Running episode  1200, step 42254 Reaches goal 2.00%. 
Running episode  1300, step 46513 Reaches goal 10.00%. 
Running episode  1400, step 50562 Reaches goal 1.00%. 
Running episode  1500, step 55466 Reaches goal 6.00%. 
Running episode  1600, step 59880 Reaches goal 13.00%. 
Running episode  1700, step 64269 Reaches goal 13.00%. 
Running episode  1800, step 68950 Reaches goal 18.00%. 
Running episode  1900, step 74121 Reaches goal 27.00%. 
Running episode  2000, step 78896 Reaches goal 32.00%. 
Running episode  2100, step 83524 Reaches goal 35.00%. 
Running episode  2200, step 88514 Reaches goal 29.00%

本章小结:

本章介绍了深度强化学习算法DQN,DQN的基本思想和第十章的Q-Learning一致,只不过是用神经网络来代替Q表,让神经网络处理状态输入,计算得到动作输出。这样把第八章的神经网络和第十章的强化学习Q-Learning进行了完美的结合。它相对于Q表的好处在于泛化性能更好,可以处理较复杂的环境。

本章使用简单版本的DQN来处理冰湖问题,由于神经网络是依赖pytorch模块的数据格式,所以会有一些格式转换的代码。我们也发现,在很复杂环境中使用DQN,会遇到训练不稳定的情况。所以我们对简单版本的DQN进行升级,介绍了完整版本的DQN训练方法。主要是增加了记忆功能和克隆模型,它们一起增强了针对复杂环境的训练稳定性。

我们将在第十三章使用DQN来玩贪吃蛇游戏。让神经网络来驱动贪吃蛇自己去找到果实。不过在下一章,我们还要再学习一种人工智能算法,也就是遗传算法。