跳转至

第十五章:笨鸟先飞AI编程

在本书第六章,我们学习了编写游戏笨鸟先飞。在本章我们会给这只小鸟安装上AI引擎。我们会使用两种AI算法来让小鸟穿越钢管丛林。一种是使用我们熟悉的DQN,也就是深度强化学习。另一种是使用遗传算法,我们会构造一大群小鸟来进行游戏。看看哪种方法更好玩呢。

一、基于DQN的AI引擎

1.笨鸟先飞游戏代码改造

先复习一下之前第六章的内容,看看人工操作游戏程序的代码结构是怎么样的。

classDiagram

    Game *-- Bird : Composition
    Game *-- Pipe :Composition
    Game *-- Button : Composition

    class Game {
    bird
    pipes
    button
    ...
    }

    class Bird {
    }

    class Pipe {
    }

    class Button {
    }

我们的代码定义了四个类,首先是Bird类,用于处理用户输入和鸟的飞行功能。然后是Pipe类用于处理游戏中管道的生成和更新。之后是Button类,用于游戏结束后接受用户操作。最后是Game类,用于整合游戏玩法机制。在核心play_step函数中构造了游戏主体逻辑,包括控制输入、控制鸟的状态、碰撞检测、绘图等主要功能。

对于游戏环境的改造,和前面两章类似,依然会重点修改如下四项内容:

  • 修改游戏重启机制
  • 提供奖励、状态等游戏信息
  • 增加决策函数,修改控制逻辑
  • 修改核心运行机制

我们看一下代码是如何具体实现的。

第一项修改是修改游戏重启机制。我们不再需要Button类,也不再需要game_restart函数,所以在代码中删除对应内容。此外,在Game类中,编写一个新的函数reset,它负责在游戏重新开始时,将一些游戏数据进行重置。这个函数增加了reward的属性设置,重置flappy的初始位置和failed状态,并重新将管道对象进行重置,重要的是增加了get_pipe_dist函数,用于计算新出现管道的数据信息。这个函数和原有的reset_game函数功能类似。

    def reset(self):
        self.score = 0
        self.reward = 0
        self.flappy.rect.x = 100
        self.flappy.rect.y = self.Win_height//2 + random.randrange(-200,200)
        self.flappy.failed = False
        self.pipe_group.empty()
        self.new_pipes(time=0)
        self.get_pipe_dist()
        pygame.mixer.music.play()

第二项修改是增加信息获取函数,包括奖励、状态等游戏信息。首先考虑正值的reward,也就是当成功的越过一对管道的时候,我们把reward设置为10。这个逻辑我们放在check_pipe_pass函数中,其它内容和之前的一致。

    def check_pipe_pass(self):
        if self.flappy.rect.left >= self.observed['pipe_dist_right']:
            self.score += 1
            self.reward = 10
            self.pipe_group.sprites()[0].passed = True
            self.pipe_group.sprites()[1].passed = True
            self.sounds['point'].play()

另外,考虑到这个游戏的难度比较大,一个随机决策的小鸟很难在开局时候飞过钢管。因此我们构造一个flying_good函数。如果小鸟能在管道所处的高度位置飞行,也可以得到少许的奖励。这些信息是用以辅助小鸟改进其内部的AI引擎能力。

    def flying_good(self):
        if (self.flappy.rect.top >= self.observed['pipe_dist_top'] 
        and self.flappy.rect.bottom <= self.observed['pipe_dist_bottom'] ):
            self.reward = 1

我们还需要提状态数据,这里的状态信息计算三项数值。分别是小鸟的速度,小鸟的飞行高度距离上下两侧钢管的距离。注意这些数值我们都计算的是相对值,因为绝对值会过大,不方便输入神经网络中计算。

    def get_state(self):
        states =np.array([float(self.flappy.vel)/self.flappy.cap, 
                (self.flappy.rect.top-self.observed['pipe_dist_top'])/Pipe.pipe_gap,
                (self.observed['pipe_dist_bottom'] - self.flappy.rect.bottom)/Pipe.pipe_gap],dtype=float)
        return states

第三项修改是增加决策函数,在行动方面,我们不再需要用鼠标点击来控制小鸟飞行,因此Bird类的handle_input函数会改造成handle_action函数。这个函数中如果action为1,则表示向上飞行,此时,我们修改小鸟的速度属性。

    def handle_action(self,action):
        if action == 1:
            self.vel = -1 * self.cap
            self.wing.play()

此外是修改一下类中的update函数,对它进行一些简化,将handle_action函数放到相应的位置上。

    def update(self,action):
        self.vel += 0.5
        if self.vel > 8:
            self.vel = 8
        if not self.touch_ground():
            self.rect.y += int(self.vel)

        if not self.failed:
            self.handle_action(action)
            self.animation()

第四项修改的是play_step函数。这个函数负责游戏运行一个迭代的所有运算任务。bird_group.update用来处理AI决策并更新小鸟的位置,handle_collision用来判断碰撞。

    def play_step(self,action):
        game_over = False
        self.reward = -1
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()

        self.bird_group.update(action)
        self.handle_collision()

如果小鸟碰上管道或地面,failed属性会返回逻辑真值,此时游戏结束,并将奖励值reward设置为-20。如果小鸟还正常飞行,则更新游戏中各元素的状态,并绘制它们。

        if self.flappy.failed:
            game_over = True
            pygame.mixer.music.stop()
            self.reward = -20
            return self.reward, game_over, self.score

        self.pipe_update()
        self.ground_update()
        self.flying_good()
        self.draw()
        self.Clock.tick(60)
        return self.reward, game_over, self.score 

改造后的完整代码参考brick_env.py文件。

2.笨鸟先飞AI引擎的组装

游戏环境已经改造完成。游戏AI引擎方面,我们可以直接复用之前的代码,具体参考flappy_agent.py文件。剩下的工作就是用一个函数将二者组装在一起进行交互。我们编写一个训练函数train,函数会对Game类和Agent类进行实例化。通过get_state函数来获取初始状态。

在主循环中将状态信息输入给Agent得到相应的决策信息final_move,将这个决策再输入给环境得到这个决策运行对应的结果,包括奖励等信息。随后再获得新的游戏状态。

def train():
    plot_scores = []
    plot_mean_scores = []
    record = 0
    total_step = 0
    game = Game()
    agent = Agent(game.nS,game.nA)
    state_new = game.get_state()

    while True:
        state_old = state_new
        final_move = agent.get_action(state_old,agent.n_game)
        reward, done, score = game.play_step(final_move)
        state_new = game.get_state()
        agent.remember(state_old, final_move, reward, state_new, done)
        agent.train_long_memory(batch_size=256)
        total_step += 1

        if total_step % 10 == 0:
            agent.trainer.copy_model()

如果游戏结束,则将游戏进行重启,记录下新的最高分值,显示绘制游戏信息,方便我们观察。

        if done:
            game.reset()
            agent.n_game += 1
            if score > record:
                record = score
                agent.trainer.model.save()
            print('Game', agent.n_game, 'Score', score, 'Record:', record)
            plot_scores.append(score)
            mean_scores = np.mean(plot_scores[-10:])
            plot_mean_scores.append(mean_scores)
            plot(plot_scores, plot_mean_scores)

然后我们来运行游戏,在终端输入如下命令

python flappy_ai.py

二、基于遗传算法的AI引擎

1.整体设计思路

遗传算法是创造一个种群,利用环境选择来让种群进化。应用到笨鸟先飞游戏上,我们将会一次性的构造一大群小鸟,这样以构建出一个种群。每只小鸟的飞行决策仍然是一个神经网络来控制。但是神经网络的参数训练并非是通过强化学习进行的,这些参数可以看作是个体的基因,它们将通过交叉组合、随机变异等遗传算法的技术来更新优化。十只小鸟放飞后,由于先天的参数不一样,有些小鸟可能很快就死亡,有些运气不错飞行距离长一些。所以一种定义适应度的方法就是根据小鸟的飞行距离来算的,飞的越远,适应度越高。但是为了能通过管道,我们还需要对飞行高度加以考虑。因此更好的定义适应度的方法是看小鸟在合适的高度上能飞行多远。

归纳一下:

  • 种群:游戏中生成的十只小鸟构成了种群。
  • 个体:游戏中的每一只小鸟构成了个体的外壳。小鸟中封装了一个神经网络用于飞行控制,其参数可能看作是是其基因。
  • 杂交:在游戏中是将两只小鸟的神经网络参数进行组合。
  • 变异:在游戏中是对个神经网络的参数加入随机噪声。
  • 适应度:在游戏中是对于某个小鸟的神经网络的优秀程度的判断,看它是否能根据状态来控制飞行决策。
  • 父代和子代:在游戏中是游戏重启前和重启后,两组不同的小鸟。

2.神经网络类的改造

我们需要对Linear_Net类进行改造,增加get_weight函数,用于从类中获得神经网络的参数。编写set_weight函数,用于将输入的权重赋值给神经网络。

    def get_weight(self):
        return deepcopy([self.linear1.weight.data,
                self.linear1.bias.data,
                self.linear2.weight.data,
                self.linear2.bias.data])

    def set_weight(self,weights):
        weights = deepcopy(weights)
        self.linear1.weight = nn.Parameter(weights[0])
        self.linear1.bias = nn.Parameter(weights[1])
        self.linear2.weight = nn.Parameter(weights[2])
        self.linear2.bias = nn.Parameter(weights[3])

3.Bird类的改造

下面我们需要改造Bird类,在初始化函数中,大部分代码和之前类似,重要的是增加三个类属性,分别是适应度fitness,分值score,以及模型属性model。

    def __init__(self, x, y):
        super().__init__()
        self.images = []
        self.index = 0
        self.counter = 0
        self.vel = 0
        self.failed = False
        for num in range (1, 4):
            img = pygame.image.load(f"resources/bird{num}.png")
            self.images.append(img)
        self.image = self.images[self.index]
        self.rect = self.image.get_rect()
        self.rect.center = [x, y]
        self.wing = pygame.mixer.Sound('resources\wing.wav')

        self.fitness = 0
        self.score = 0
        self.model = Linear_Net(Bird.input_size, Bird.hidden_size, Bird.output_size)

此外,在Bird类中,我们还要增加三个成员函数。get_action函数用于基于状态信息来输出行动决策。get_state函数用于获取当前小鸟的状态信息。get_fitness函数用于计算适应度。

    def get_action(self, state):
        prediction = self.model(torch.Tensor(state)) 
        prediction = prediction.detach().numpy().squeeze()
        move = prediction.argmax()
        return move

    def get_state(self, observed):
        return np.array([int(self.vel)/Bird.cap, 
                    (self.rect.top - observed['pipe_dist_top'])/Pipe.pipe_gap,
                    (observed['pipe_dist_bottom'] - self.rect.bottom)/Pipe.pipe_gap],
                    dtype=float)

    def get_fitness(self, observed):
        if (self.rect.top - observed['pipe_dist_top']>0 and 
            observed['pipe_dist_bottom'] - self.rect.bottom > 0):
            self.fitness += 1

4.Game类的改造

Game类中,需要增加new_birds函数,因为我们要一次性的放飞一群小鸟,因此会用一个循环来往bird_group中新增bird类。

    def new_birds(self):
        for i in range(self.generation_size):
            bird_height = random.randint(-200, 200)
            bird = Bird(100, int(self.Win_height / 2+bird_height))
            self.bird_group.add(bird)

reset函数也需要修改,增加适应度fitness,用于存放种群的适应度,weights用于存放种群的权重参数,重置管道组和小鸟组。还需要使用循环和set_weight函数,对下一代的每只小鸟,赋予它们新的权重参数。

    def reset(self,next_generation):
        self.score = 0
        self.fitness = []
        self.weights = []
        self.pipe_group.empty()
        self.bird_group.empty()
        self.new_pipes(time=0)
        self.get_pipe_dist()
        self.new_birds()
        for i, bird in enumerate(self.bird_group.sprites()):
            bird.model.set_weight(next_generation[i])
        pygame.mixer.music.play(-1)

增加bird_update函数,对于种群中的每个小鸟,遍历它们的状态,如果没有失败,则获取它们的状态,根据其内部神经网络的预测值进行飞行决策,更新其飞行位置,计算其适应度。如果有哪只小鸟能通过管道,则分值加1。如果失败会将其内部神经网络的参数进行保存,适应度也进行保存,然后删除这只小鸟。

    def birds_update(self):
        for i, bird in enumerate(self.bird_group.sprites()):
            if not bird.failed:
                self.score += bird.score
                state = bird.get_state(self.observed)
                action = bird.get_action(state)
                bird.update(action)
                bird.get_fitness(self.observed)
                if bird.rect.left >= self.observed['pipe_dist_right']:
                    bird.score += 1
                    self.pipe_group.sprites()[0].passed = True
                    self.pipe_group.sprites()[1].passed = True
                    self.sounds['point'].play()
                if bird.score > 50:
                    bird.failed = True

            if bird.failed:
                self.weights.append(bird.model.get_weight())
                self.fitness.append(bird.fitness)
                bird.kill()

核心游戏函数play_step没有很大变化,只是将上面新增的函数补充进来。

    def play_step(self):
        game_over = False
        self.score = 0

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                sys.exit()

        self.birds_update()
        self.handle_collision()

        if len(self.bird_group) == 0 or self.score>50:
            game_over = True
            return game_over, self.score

        self.pipe_update()
        self.ground_update()
        self.draw()
        self.Clock.tick(60)
        return game_over, self.score 

5.遗传算法函数编写

为了用遗传算法来玩这个游戏,我们编写一个GATrainer类。它负责控制整体算法的训练任务。其初始化函数中对Game类进行了实例化,并设置了三个超参数。迭代的次数generate_num,种群变异比例mutate_pop_rate,神经网络参数变异比例mutate_net_rate。

class GATrainer:
    def __init__(self):
        self.game = Game()
        self.generate_num = 0
        self.mutate_pop_rate = 0.2
        self.mutate_net_rate = 0.1

GATrainer类中需要编写几个辅助函数。fitness_prob函数用于基于适应度来计算选择概率。list2tensor函数和tensor2list函数则用于list格式和tensor格式之间的转换

    @staticmethod
    def fitness_prob(fitness):
        fitness = np.array(fitness)
        return fitness/np.sum(fitness)

    @staticmethod
    def list2tensor(weights):
        return torch.concat([weights[0].flatten(),weights[1],
                                weights[2].flatten(),weights[3]])

    @staticmethod
    def tensor2list(weights):
        output_weights = []
        index  = [Bird.input_size*Bird.hidden_size, 
                Bird.input_size*Bird.hidden_size+Bird.hidden_size,
                Bird.input_size*Bird.hidden_size+Bird.hidden_size+Bird.hidden_size*Bird.output_size]
        output_weights.append(weights[:index[0]].reshape(Bird.hidden_size,Bird.input_size))
        output_weights.append(weights[index[0]:index[1]])
        output_weights.append(weights[index[1]:index[2]].reshape(Bird.output_size,Bird.hidden_size))
        output_weights.append(weights[index[2]:])

然后是交叉和变异函数,函数输入两个个体的参数,对其进行交叉组合,然后进行随机噪声的添加。

    def cross_mutate(self,weights_1, weights_2):
        weights_1 = GATrainer.list2tensor(weights_1)
        weights_2 = GATrainer.list2tensor(weights_2)
        crossover_idx = random.randint(0, Game.parameter_len-1)
        new_weights = torch.concat([weights_1[:crossover_idx] , weights_2[crossover_idx:]])
        if (random.randint(0,self.game.generation_size-1) <= 
            self.game.generation_size*self.mutate_pop_rate):
            mutate_num = int(self.mutate_net_rate*Game.parameter_len)
            for _ in range(mutate_num):
                i = random.randint(0,Game.parameter_len-1)
                new_weights[i] += torch.randn(1).numpy()
        output_weights = GATrainer.tensor2list(new_weights)
        return  output_weights

然后是编写reproduce函数,用于生成下一代的种群。核心逻辑并不复杂,先根据适应度来选择两个最优秀的个体,将其直接存入下一代中,这种保送的思路称为精英策略。然后再选择其他位置的人选,根据适应度来随机选择两个不错的个体,将其基因进行交叉变异,也就是组合两个神经网络的参数。

    def reproduce(self):
        next_generation = []
        prob = GATrainer.fitness_prob(self.game.fitness) 
        second_index, first_index= list(np.argsort(prob)[-2:])
        next_generation.append(self.game.weights[first_index])
        next_generation.append(self.game.weights[second_index])
        for _ in range(self.game.generation_size - 2):
            p1, p2 = np.random.choice(len(prob),size=2, replace=False,p=prob) 
            next_generation.append(self.cross_mutate(self.game.weights[p1],self.game.weights[p2]))
        return next_generation

最后我们编写函数,将所有的逻辑组装起来。

    def run_GA(self):
        while True:
            game_over, score  = self.game.play_step()
            if game_over :
                print(f"generate {self.generate_num} average fitness: {sum(self.game.fitness)/10}")
                next_generation = self.reproduce()
                self.game.reset(next_generation)
                self.generate_num += 1

整体代码可以参见flappy_ga.py文件。然后我们来运行游戏,在终端输入如下命令

python flappy_ga.py

6.算法效果

下图是运行代码的游戏画面。我们一次性生成了一个小鸟种群。一开始的时候,很多小鸟直接撞上障碍失败了。有几只小鸟恰好飞行在钢管合适的高度上,获得了更高的适应度。这些相对优秀一点的小鸟通过杂交变异了产生了下一代。不到几次迭代,你会发现就产生了新的下一代能飞越几个钢管了。随后就会有越来越多的鸟能飞过更多的钢管。

ch15-01

本章小结:

本章使用了两种人工智能算法来驱动小鸟穿越钢管丛林。DQN的思路和之前两章类似,小鸟基于输入的状态信息和回报,来不断调整神经网络的参数。经过多轮游戏才能成功通关。遗传算法的思路则是基于种群的自然选择,种群中不同个体中包含的神经网络参数,就是代表了人体的基因。通过基因的交叉、变异和选择这三种操作。让种群中适应度较高的个体能相互取长补短,使后代能更好的适应这个钢管丛林的环境。