第十五章:笨鸟先飞AI编程
在本书第六章,我们学习了编写游戏笨鸟先飞。在本章我们会给这只小鸟安装上AI引擎。我们会使用两种AI算法来让小鸟穿越钢管丛林。一种是使用我们熟悉的DQN,也就是深度强化学习。另一种是使用遗传算法,我们会构造一大群小鸟来进行游戏。看看哪种方法更好玩呢。
一、基于DQN的AI引擎
1.笨鸟先飞游戏代码改造
先复习一下之前第六章的内容,看看人工操作游戏程序的代码结构是怎么样的。
classDiagram
Game *-- Bird : Composition
Game *-- Pipe :Composition
Game *-- Button : Composition
class Game {
bird
pipes
button
...
}
class Bird {
}
class Pipe {
}
class Button {
}
我们的代码定义了四个类,首先是Bird类,用于处理用户输入和鸟的飞行功能。然后是Pipe类用于处理游戏中管道的生成和更新。之后是Button类,用于游戏结束后接受用户操作。最后是Game类,用于整合游戏玩法机制。在核心play_step函数中构造了游戏主体逻辑,包括控制输入、控制鸟的状态、碰撞检测、绘图等主要功能。
对于游戏环境的改造,和前面两章类似,依然会重点修改如下四项内容:
- 修改游戏重启机制
- 提供奖励、状态等游戏信息
- 增加决策函数,修改控制逻辑
- 修改核心运行机制
我们看一下代码是如何具体实现的。
第一项修改是修改游戏重启机制。我们不再需要Button类,也不再需要game_restart函数,所以在代码中删除对应内容。此外,在Game类中,编写一个新的函数reset,它负责在游戏重新开始时,将一些游戏数据进行重置。这个函数增加了reward的属性设置,重置flappy的初始位置和failed状态,并重新将管道对象进行重置,重要的是增加了get_pipe_dist函数,用于计算新出现管道的数据信息。这个函数和原有的reset_game函数功能类似。
def reset(self):
self.score = 0
self.reward = 0
self.flappy.rect.x = 100
self.flappy.rect.y = self.Win_height//2 + random.randrange(-200,200)
self.flappy.failed = False
self.pipe_group.empty()
self.new_pipes(time=0)
self.get_pipe_dist()
pygame.mixer.music.play()
第二项修改是增加信息获取函数,包括奖励、状态等游戏信息。首先考虑正值的reward,也就是当成功的越过一对管道的时候,我们把reward设置为10。这个逻辑我们放在check_pipe_pass函数中,其它内容和之前的一致。
def check_pipe_pass(self):
if self.flappy.rect.left >= self.observed['pipe_dist_right']:
self.score += 1
self.reward = 10
self.pipe_group.sprites()[0].passed = True
self.pipe_group.sprites()[1].passed = True
self.sounds['point'].play()
另外,考虑到这个游戏的难度比较大,一个随机决策的小鸟很难在开局时候飞过钢管。因此我们构造一个flying_good函数。如果小鸟能在管道所处的高度位置飞行,也可以得到少许的奖励。这些信息是用以辅助小鸟改进其内部的AI引擎能力。
def flying_good(self):
if (self.flappy.rect.top >= self.observed['pipe_dist_top']
and self.flappy.rect.bottom <= self.observed['pipe_dist_bottom'] ):
self.reward = 1
我们还需要提状态数据,这里的状态信息计算三项数值。分别是小鸟的速度,小鸟的飞行高度距离上下两侧钢管的距离。注意这些数值我们都计算的是相对值,因为绝对值会过大,不方便输入神经网络中计算。
def get_state(self):
states =np.array([float(self.flappy.vel)/self.flappy.cap,
(self.flappy.rect.top-self.observed['pipe_dist_top'])/Pipe.pipe_gap,
(self.observed['pipe_dist_bottom'] - self.flappy.rect.bottom)/Pipe.pipe_gap],dtype=float)
return states
第三项修改是增加决策函数,在行动方面,我们不再需要用鼠标点击来控制小鸟飞行,因此Bird类的handle_input函数会改造成handle_action函数。这个函数中如果action为1,则表示向上飞行,此时,我们修改小鸟的速度属性。
此外是修改一下类中的update函数,对它进行一些简化,将handle_action函数放到相应的位置上。
def update(self,action):
self.vel += 0.5
if self.vel > 8:
self.vel = 8
if not self.touch_ground():
self.rect.y += int(self.vel)
if not self.failed:
self.handle_action(action)
self.animation()
第四项修改的是play_step函数。这个函数负责游戏运行一个迭代的所有运算任务。bird_group.update用来处理AI决策并更新小鸟的位置,handle_collision用来判断碰撞。
def play_step(self,action):
game_over = False
self.reward = -1
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit()
self.bird_group.update(action)
self.handle_collision()
如果小鸟碰上管道或地面,failed属性会返回逻辑真值,此时游戏结束,并将奖励值reward设置为-20。如果小鸟还正常飞行,则更新游戏中各元素的状态,并绘制它们。
if self.flappy.failed:
game_over = True
pygame.mixer.music.stop()
self.reward = -20
return self.reward, game_over, self.score
self.pipe_update()
self.ground_update()
self.flying_good()
self.draw()
self.Clock.tick(60)
return self.reward, game_over, self.score
改造后的完整代码参考brick_env.py文件。
2.笨鸟先飞AI引擎的组装
游戏环境已经改造完成。游戏AI引擎方面,我们可以直接复用之前的代码,具体参考flappy_agent.py文件。剩下的工作就是用一个函数将二者组装在一起进行交互。我们编写一个训练函数train,函数会对Game类和Agent类进行实例化。通过get_state函数来获取初始状态。
在主循环中将状态信息输入给Agent得到相应的决策信息final_move,将这个决策再输入给环境得到这个决策运行对应的结果,包括奖励等信息。随后再获得新的游戏状态。
def train():
plot_scores = []
plot_mean_scores = []
record = 0
total_step = 0
game = Game()
agent = Agent(game.nS,game.nA)
state_new = game.get_state()
while True:
state_old = state_new
final_move = agent.get_action(state_old,agent.n_game)
reward, done, score = game.play_step(final_move)
state_new = game.get_state()
agent.remember(state_old, final_move, reward, state_new, done)
agent.train_long_memory(batch_size=256)
total_step += 1
if total_step % 10 == 0:
agent.trainer.copy_model()
如果游戏结束,则将游戏进行重启,记录下新的最高分值,显示绘制游戏信息,方便我们观察。
if done:
game.reset()
agent.n_game += 1
if score > record:
record = score
agent.trainer.model.save()
print('Game', agent.n_game, 'Score', score, 'Record:', record)
plot_scores.append(score)
mean_scores = np.mean(plot_scores[-10:])
plot_mean_scores.append(mean_scores)
plot(plot_scores, plot_mean_scores)
然后我们来运行游戏,在终端输入如下命令
二、基于遗传算法的AI引擎
1.整体设计思路
遗传算法是创造一个种群,利用环境选择来让种群进化。应用到笨鸟先飞游戏上,我们将会一次性的构造一大群小鸟,这样以构建出一个种群。每只小鸟的飞行决策仍然是一个神经网络来控制。但是神经网络的参数训练并非是通过强化学习进行的,这些参数可以看作是个体的基因,它们将通过交叉组合、随机变异等遗传算法的技术来更新优化。十只小鸟放飞后,由于先天的参数不一样,有些小鸟可能很快就死亡,有些运气不错飞行距离长一些。所以一种定义适应度的方法就是根据小鸟的飞行距离来算的,飞的越远,适应度越高。但是为了能通过管道,我们还需要对飞行高度加以考虑。因此更好的定义适应度的方法是看小鸟在合适的高度上能飞行多远。
归纳一下:
- 种群:游戏中生成的十只小鸟构成了种群。
- 个体:游戏中的每一只小鸟构成了个体的外壳。小鸟中封装了一个神经网络用于飞行控制,其参数可能看作是是其基因。
- 杂交:在游戏中是将两只小鸟的神经网络参数进行组合。
- 变异:在游戏中是对个神经网络的参数加入随机噪声。
- 适应度:在游戏中是对于某个小鸟的神经网络的优秀程度的判断,看它是否能根据状态来控制飞行决策。
- 父代和子代:在游戏中是游戏重启前和重启后,两组不同的小鸟。
2.神经网络类的改造
我们需要对Linear_Net类进行改造,增加get_weight函数,用于从类中获得神经网络的参数。编写set_weight函数,用于将输入的权重赋值给神经网络。
def get_weight(self):
return deepcopy([self.linear1.weight.data,
self.linear1.bias.data,
self.linear2.weight.data,
self.linear2.bias.data])
def set_weight(self,weights):
weights = deepcopy(weights)
self.linear1.weight = nn.Parameter(weights[0])
self.linear1.bias = nn.Parameter(weights[1])
self.linear2.weight = nn.Parameter(weights[2])
self.linear2.bias = nn.Parameter(weights[3])
3.Bird类的改造
下面我们需要改造Bird类,在初始化函数中,大部分代码和之前类似,重要的是增加三个类属性,分别是适应度fitness,分值score,以及模型属性model。
def __init__(self, x, y):
super().__init__()
self.images = []
self.index = 0
self.counter = 0
self.vel = 0
self.failed = False
for num in range (1, 4):
img = pygame.image.load(f"resources/bird{num}.png")
self.images.append(img)
self.image = self.images[self.index]
self.rect = self.image.get_rect()
self.rect.center = [x, y]
self.wing = pygame.mixer.Sound('resources\wing.wav')
self.fitness = 0
self.score = 0
self.model = Linear_Net(Bird.input_size, Bird.hidden_size, Bird.output_size)
此外,在Bird类中,我们还要增加三个成员函数。get_action函数用于基于状态信息来输出行动决策。get_state函数用于获取当前小鸟的状态信息。get_fitness函数用于计算适应度。
def get_action(self, state):
prediction = self.model(torch.Tensor(state))
prediction = prediction.detach().numpy().squeeze()
move = prediction.argmax()
return move
def get_state(self, observed):
return np.array([int(self.vel)/Bird.cap,
(self.rect.top - observed['pipe_dist_top'])/Pipe.pipe_gap,
(observed['pipe_dist_bottom'] - self.rect.bottom)/Pipe.pipe_gap],
dtype=float)
def get_fitness(self, observed):
if (self.rect.top - observed['pipe_dist_top']>0 and
observed['pipe_dist_bottom'] - self.rect.bottom > 0):
self.fitness += 1
4.Game类的改造
Game类中,需要增加new_birds函数,因为我们要一次性的放飞一群小鸟,因此会用一个循环来往bird_group中新增bird类。
def new_birds(self):
for i in range(self.generation_size):
bird_height = random.randint(-200, 200)
bird = Bird(100, int(self.Win_height / 2+bird_height))
self.bird_group.add(bird)
reset函数也需要修改,增加适应度fitness,用于存放种群的适应度,weights用于存放种群的权重参数,重置管道组和小鸟组。还需要使用循环和set_weight函数,对下一代的每只小鸟,赋予它们新的权重参数。
def reset(self,next_generation):
self.score = 0
self.fitness = []
self.weights = []
self.pipe_group.empty()
self.bird_group.empty()
self.new_pipes(time=0)
self.get_pipe_dist()
self.new_birds()
for i, bird in enumerate(self.bird_group.sprites()):
bird.model.set_weight(next_generation[i])
pygame.mixer.music.play(-1)
增加bird_update函数,对于种群中的每个小鸟,遍历它们的状态,如果没有失败,则获取它们的状态,根据其内部神经网络的预测值进行飞行决策,更新其飞行位置,计算其适应度。如果有哪只小鸟能通过管道,则分值加1。如果失败会将其内部神经网络的参数进行保存,适应度也进行保存,然后删除这只小鸟。
def birds_update(self):
for i, bird in enumerate(self.bird_group.sprites()):
if not bird.failed:
self.score += bird.score
state = bird.get_state(self.observed)
action = bird.get_action(state)
bird.update(action)
bird.get_fitness(self.observed)
if bird.rect.left >= self.observed['pipe_dist_right']:
bird.score += 1
self.pipe_group.sprites()[0].passed = True
self.pipe_group.sprites()[1].passed = True
self.sounds['point'].play()
if bird.score > 50:
bird.failed = True
if bird.failed:
self.weights.append(bird.model.get_weight())
self.fitness.append(bird.fitness)
bird.kill()
核心游戏函数play_step没有很大变化,只是将上面新增的函数补充进来。
def play_step(self):
game_over = False
self.score = 0
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit()
self.birds_update()
self.handle_collision()
if len(self.bird_group) == 0 or self.score>50:
game_over = True
return game_over, self.score
self.pipe_update()
self.ground_update()
self.draw()
self.Clock.tick(60)
return game_over, self.score
5.遗传算法函数编写
为了用遗传算法来玩这个游戏,我们编写一个GATrainer类。它负责控制整体算法的训练任务。其初始化函数中对Game类进行了实例化,并设置了三个超参数。迭代的次数generate_num,种群变异比例mutate_pop_rate,神经网络参数变异比例mutate_net_rate。
class GATrainer:
def __init__(self):
self.game = Game()
self.generate_num = 0
self.mutate_pop_rate = 0.2
self.mutate_net_rate = 0.1
GATrainer类中需要编写几个辅助函数。fitness_prob函数用于基于适应度来计算选择概率。list2tensor函数和tensor2list函数则用于list格式和tensor格式之间的转换
@staticmethod
def fitness_prob(fitness):
fitness = np.array(fitness)
return fitness/np.sum(fitness)
@staticmethod
def list2tensor(weights):
return torch.concat([weights[0].flatten(),weights[1],
weights[2].flatten(),weights[3]])
@staticmethod
def tensor2list(weights):
output_weights = []
index = [Bird.input_size*Bird.hidden_size,
Bird.input_size*Bird.hidden_size+Bird.hidden_size,
Bird.input_size*Bird.hidden_size+Bird.hidden_size+Bird.hidden_size*Bird.output_size]
output_weights.append(weights[:index[0]].reshape(Bird.hidden_size,Bird.input_size))
output_weights.append(weights[index[0]:index[1]])
output_weights.append(weights[index[1]:index[2]].reshape(Bird.output_size,Bird.hidden_size))
output_weights.append(weights[index[2]:])
然后是交叉和变异函数,函数输入两个个体的参数,对其进行交叉组合,然后进行随机噪声的添加。
def cross_mutate(self,weights_1, weights_2):
weights_1 = GATrainer.list2tensor(weights_1)
weights_2 = GATrainer.list2tensor(weights_2)
crossover_idx = random.randint(0, Game.parameter_len-1)
new_weights = torch.concat([weights_1[:crossover_idx] , weights_2[crossover_idx:]])
if (random.randint(0,self.game.generation_size-1) <=
self.game.generation_size*self.mutate_pop_rate):
mutate_num = int(self.mutate_net_rate*Game.parameter_len)
for _ in range(mutate_num):
i = random.randint(0,Game.parameter_len-1)
new_weights[i] += torch.randn(1).numpy()
output_weights = GATrainer.tensor2list(new_weights)
return output_weights
然后是编写reproduce函数,用于生成下一代的种群。核心逻辑并不复杂,先根据适应度来选择两个最优秀的个体,将其直接存入下一代中,这种保送的思路称为精英策略。然后再选择其他位置的人选,根据适应度来随机选择两个不错的个体,将其基因进行交叉变异,也就是组合两个神经网络的参数。
def reproduce(self):
next_generation = []
prob = GATrainer.fitness_prob(self.game.fitness)
second_index, first_index= list(np.argsort(prob)[-2:])
next_generation.append(self.game.weights[first_index])
next_generation.append(self.game.weights[second_index])
for _ in range(self.game.generation_size - 2):
p1, p2 = np.random.choice(len(prob),size=2, replace=False,p=prob)
next_generation.append(self.cross_mutate(self.game.weights[p1],self.game.weights[p2]))
return next_generation
最后我们编写函数,将所有的逻辑组装起来。
def run_GA(self):
while True:
game_over, score = self.game.play_step()
if game_over :
print(f"generate {self.generate_num} average fitness: {sum(self.game.fitness)/10}")
next_generation = self.reproduce()
self.game.reset(next_generation)
self.generate_num += 1
整体代码可以参见flappy_ga.py文件。然后我们来运行游戏,在终端输入如下命令
6.算法效果
下图是运行代码的游戏画面。我们一次性生成了一个小鸟种群。一开始的时候,很多小鸟直接撞上障碍失败了。有几只小鸟恰好飞行在钢管合适的高度上,获得了更高的适应度。这些相对优秀一点的小鸟通过杂交变异了产生了下一代。不到几次迭代,你会发现就产生了新的下一代能飞越几个钢管了。随后就会有越来越多的鸟能飞过更多的钢管。

本章小结:
本章使用了两种人工智能算法来驱动小鸟穿越钢管丛林。DQN的思路和之前两章类似,小鸟基于输入的状态信息和回报,来不断调整神经网络的参数。经过多轮游戏才能成功通关。遗传算法的思路则是基于种群的自然选择,种群中不同个体中包含的神经网络参数,就是代表了人体的基因。通过基因的交叉、变异和选择这三种操作。让种群中适应度较高的个体能相互取长补短,使后代能更好的适应这个钢管丛林的环境。