第十三章:贪吃蛇AI编程
在本书第四章,我们学习了使用pygame来编写贪吃蛇的游戏。在本书第十一章,我们学习了深度强化学习算法DQN,它的核心思想是通过评估不同行动状态的价值,来选择行动方向。本章的目标就是把这两章的知识内容结合起来,用贪吃蛇游戏做为AI环境,用DQN来作为AI引擎,去驱动贪吃蛇自动找果实吃。听起来是不是很好玩。让我们开始吧
一、整体设计思路
在我们动手之前,我们需要想想应该怎么设计我们的代码?我们先想一想人工操作的贪吃蛇是怎样的一个交互逻辑。选择行动方向的实际上是人类的大脑,通过显示器上的游戏图像获取游戏信息,吃到果实后得到正的收益,碰到边界游戏结束后,我们得到负的收益。玩家的大脑驱动手指控制键盘,进而控制方向。下图显示了玩家和游戏之间的交互方式。
flowchart LR
大脑--键盘控制-->贪吃蛇游戏
贪吃蛇游戏--奖励-->大脑
贪吃蛇游戏--图像显示-->大脑
subgraph 人类玩家
大脑
end
AI驱动的游戏则有些不一样。首先,贪吃蛇游戏环境要做修改,之前的操作是由键盘输入的,要改成由算法来决策,也就是输入部分要修改。还需要把游戏的一些信息保存传递给算法。这个修改后的贪吃蛇环境我们称之为Snake_Env。其次,要增加AI引擎部分,需要设计算法DQN_Model,它负责处理信息输出决策,需要设计一个记忆模块,它负责接收游戏信息并保存起来,这个AI引擎称之为Snake_Agent。最后,我们还需要一个函数来负责Snake_Agent和Snake_Env之间的交互逻辑,让所有的东西跑起来。下面就让我们开始吧。
flowchart LR
Snake_Env--Reward-->DQN_Model
Snake_Env--State-->DQN_Model
subgraph Snake_Agent
Memory<-->DQN_Model
end
DQN_Model--Action-->Snake_Env
二、贪吃蛇环境改造
环境改造思路
我们先回顾一下之前人工操作游戏程序的代码结构是怎么样的。
classDiagram
Game *-- Snake : Composition
Game *-- Berry :Composition
Game *-- Wall : Composition
class Game {
snake
berry
wall
...
}
class Snake {
}
class Berry {
}
class Wall {
}
Snake类中,需要加载贪吃蛇图片资源文件,定义贪吃蛇身体,行动方向等属性。还需定义蛇的移动和绘图等函数。Berry类中,同样加载资源文件,定义位置属性,定义绘图函数。Wall类中,加载地图文件和图片资源,定义绘图函数。在Game类中,需要场景初始化。定义窗口,初始化上述三种对象。定义分值、时钟等属性。在方法中需要定义碰撞检测、绘图和游戏主循环函数。
对人工操作的游戏,我们需要修改四个主要的方面。
- 修改游戏退出和重启机制
- 提供游戏信息
- 修改决策和移动逻辑
- 修改核心运行机制,改造成单步运行
第一项是修改退出机制。当人工操作的游戏结束后,会退出游戏回到操作系统,但是在AI算法训练中,我们希望能持续不断的让AI来玩游戏,所以要对游戏退出机制进行修改,当AI玩游戏失败时,能马上重启新一轮游戏。
第二项是要给AI提供游戏信息。当人工操作游戏时,我们只需要用肉眼来观察显示器上的图片就可以了。但是在AI算法训练时,我们需要设计一些有用的信息发送传给AI引擎。这些信息在强化学习中称之为状态信息(state)。除了状态以外,还需要传送奖励信息和游戏是否结束的信息。因此第二项修改是需要新增提供游戏信息的代码部分。
第三项修改是要修改移动逻辑。人工操作游戏是通过键盘来输入操作的,但在AI算法控制游戏时,需要设计一个接口,AI算法的选择我们称之为动作(action),我们要把检测输入逻辑进行修改,之前贪吃蛇的移动依赖键盘,之后要修改成依赖AI的动作。那么第三项修改是修改决策和移动逻辑。
第四项修改是改造play函数。为了和AI引擎配合,需要将play函数改造成单步运行。因为游戏每运行一帧后,都需要给AI引擎提供信息,让它计算后续应该如何移动。所以play函数需要改造成只考虑运行一帧的情况。
编写改造代码
编写新的运行函数play_step来代替原有的play函数,此处体现了第一项和第四项的改造逻辑。在play_step函数中,当贪吃蛇发生碰撞时,不再退出程序,而是将game_over的变量设置为True,同时将reward设置为-10。这样游戏不会退出,游戏窗口仍然保持,节约AI训练的时间。而且play_step函数中不再有while循环,该函数只考虑运行一步游戏的所有任务。
def play_step(self, action):
game_over = False
self.reward = 0
for event in pygame.event.get():
if event.type == QUIT:
pygame.quit()
sys.exit()
self.total_step += 1
self.frame = (self.frame + 1) % 2
self.snake.handle_action(action)
self.berry_collision()
if (self.head_hit_wall() or
self.head_hit_body() or
self.total_step > 100*len(self.snake.blocks)):
game_over = True
self.reward = -10
return self.reward, game_over, self.score
self.draw()
self.Clock.tick(60)
return self.reward, game_over, self.score
此外,我们编写一个新的函数reset,它负责重启游戏。代码中重新实例化了一个新的Snake类,重新放置了果实,将一些游戏数据进行重置归零。
def reset(self):
self.score = 0
self.frame = 0
self.snake = Snake()
self.current_direction = Direction.right
self.positionBerry()
self.frame_iteration = 0
self.reward = 0
第二项改造是要增加提供信息的功能,我们需要提供三种信息给AI进行计算。
- 奖励数据 reward
- 状态数据 state
- 本轮游戏是否结束 game_over
在前面修改游戏退出逻辑时,我们已经提供了负值的reward和game_over,还需要考虑正值的reward,也就是当成功的吃掉一个果子的时候,我们把reward设置为10。在函数berry_collision中增加相应的代码部分。
def berry_collision(self):
head = self.snake.blocks[0]
if (head.x == self.berry.position.x and
head.y == self.berry.position.y):
self.position_berry()
self.score += 1
self.reward = 10
else:
self.snake.blocks.pop()
我们还需要提状态数据,为此需要修改碰撞函数,将headHitBody的参数增加一个position。这样增加了函数计算的灵活性,如果position没有参数输入的话,会默认计算贪吃蛇的头部和身体碰撞的情况,如果输入了position参数,它会计算任一个坐标和身体碰撞的情况。这样做的好处我们一会来讲。
def head_hit_body(self,position=None):
if position is None:
position = self.snake.blocks[0]
if position in self.snake.blocks[1:]:
return True
return False
def get_state(self):
head = self.snake.blocks[0]
point_l = Position(head.x - 1, head.y)
point_r = Position(head.x + 1, head.y)
point_u = Position(head.x, head.y - 1)
point_d = Position(head.x, head.y + 1)
danger_1b_r = self.head_hit_body(point_r),
danger_1b_l = self.head_hit_body(point_l),
danger_1b_u = self.head_hit_body(point_u),
danger_1b_d = self.head_hit_body(point_d),
danger_1w_r = self.head_hit_wall(point_r),
danger_1w_l = self.head_hit_wall(point_l),
danger_1w_u = self.head_hit_wall(point_u),
danger_1w_d = self.head_hit_wall(point_d),
points_l = [Position(i, head.y) for i in range(1,head.x)]
points_r = [Position(i, head.y) for i in range(head.x+1,self.Space_width)]
points_u = [Position(head.x , i) for i in range(1,head.y)]
points_d = [Position(head.x , i) for i in range(head.y+1,self.Space_height)]
danger_b_l = np.any(np.array([self.head_hit_body(point) for point in points_l]))
danger_b_r = np.any(np.array([self.head_hit_body(point) for point in points_r]))
danger_b_u = np.any(np.array([self.head_hit_body(point) for point in points_u]))
danger_b_d = np.any(np.array([self.head_hit_body(point) for point in points_d]))
dir_l = self.snake.current_direction == Direction.left
dir_r = self.snake.current_direction == Direction.right
dir_u = self.snake.current_direction == Direction.up
dir_d = self.snake.current_direction == Direction.down
计算果实相对于蛇头部坐标的位置逻辑值。
berry_l = self.berry.position.x < head.x
berry_r = self.berry.position.x > head.x
berry_u = self.berry.position.y < head.y
berry_d = self.berry.position.y > head.y
state = [
danger_1b_r,danger_1b_l,danger_1b_u,danger_1b_d,
danger_1w_r,danger_1w_l,danger_1w_u,danger_1w_d,
danger_b_l,danger_b_r,danger_b_u,danger_b_d,
dir_l,dir_r,dir_u,dir_d,
berry_l, berry_r,berry_u,berry_d
]
return np.array(state, dtype=int)
第三项修改是要修改决策和移动逻辑。我们不再需要handle_input函数,取而代之的是增加action作为游戏的输出,所以我们新建一个handle_action函数。action的取值并不是上下左右,因为贪吃蛇有一个身体在后面,所以当它往左走的时候,action的取值绝对不能向右,所以它的行动决策应该是三个方向选项,继续直行,向左,向右,表示和当前方向对比的相对方向。用一个0到2之间的数字来表示。如果action取值为0,表示方向不变向前走,如果取值1表示方向向右。
需要注意是的,action的三个相对方向和游戏四个绝对方向之间,需要做一些转换。这里使用了一个closck_wise列表,表示方向的顺时钟的顺序,想像一下绝对方向右,指向3点钟,方向下指向6点,方向左指向9点,方向上指向12点。当输入的action是方向不变时,我们也让原来的绝对方向不变。当action表示向右转弯时,也就是顺时钟方向转90度,也就是索引值idx加一。反之则是逆时钟方向转90度,也就是索引值idx减一。
def handle_action(self,action):
# action is one of [straight, right, left]
clock_wise = [Direction.right,Direction.down,Direction.left,Direction.up]
idx = clock_wise.index(self.current_direction)
if action == 0:
new_dir = clock_wise[idx] # no change
elif action == 1:
next_idx = (idx + 1) % 4
new_dir = clock_wise[next_idx] # right turn r -> d -> l -> u
elif action == 2:
next_idx = (idx - 1) % 4
new_dir = clock_wise[next_idx] # left turn r -> u -> l -> d
self.current_direction = new_dir
self.move()
三、AI引擎的设计和编写
DQN回顾
我们先回顾一下第十一章学习过的DQN算法。DQN的核心组件是神经网络,神经网络中的结构是事先给定的,网络参数是随机赋初始值。它需要外界给一个输入,计算到预测值,再将预测值和目标值之间计算损失。最后使用最优化方法,来调整网络参数,使得损失最小。
graph LR
subgraph 数据
输入
目标
end
subgraph 模型
网络结构
网络参数
end
输入-->模型-->预测
目标-->损失计算
预测-->损失计算
损失计算--参数优化-->模型
再回顾一下神经网络结合强化学习后的整体训练流程。神经网络会接受当前状态和下一状态两种信息输入,计算对应的两种价值,即当前状态行动价值和下一状态行动价值的最大值。并据此选择行动,和环境交互产生收益。计算出目标和损失,再用梯度下降来更新修正神经网络。
graph TD
当前状态-->神经网络-->当前状态行动价值
神经网络--选择行动-->环境-->产生收益
环境-->下一状态-->神经网络-->下一状态行动价值MAX
当前状态行动价值-->计算Loss
target-->计算Loss
下一状态行动价值MAX--> target
产生收益--> target
计算Loss--更新-->神经网络
对于贪吃蛇的AI引擎,我们可以直接借鉴第十一章的设计,将其各个类直接拿过来使用。只是需要注意其输入输出的数据,需要和贪吃蛇的游戏环境相匹配。
编写AI引擎
AI引擎的代码和第十一章介绍的类似。首先编写神经网络类。它是一个全连接的三层神经网络,包括了输入层,隐层和输出层。这里的输入层就是输入的状态信息。为了保持灵活性,我们使用三个变量来分别表示三层的元素个数,即input_size, hidden_size, output_size。在类中我们增加了save方法,可以将模型保存成本地文件。
class Linear_QNet(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super().__init__()
self.linear1 = nn.Linear(input_size, hidden_size)
self.linear2 = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = F.relu(self.linear1(x))
x = self.linear2(x)
return x
def save(self, file_name='model.pth'):
model_folder_path = './model'
if not os.path.exists(model_folder_path):
os.makedirs(model_folder_path)
file_name = os.path.join(model_folder_path, file_name)
torch.save(self.state_dict(), file_name)
训练模块会负责所有的训练逻辑,我们还是用QTrainer类来包装所有的代码。在初始化方法中将必要的超参数放进来。包括折现系数gamma,主要的神经网络model,以及克隆神经网络target_model,优化器optimizer,还有损失函数计算方法criterion。copy_model函数则是负责克隆模型的参数复制。
class QTrainer:
def __init__(self, lr, gamma,input_dim, hidden_dim, output_dim):
self.gamma = gamma
self.hidden_size = hidden_dim
self.model = Linear_QNet(input_dim, self.hidden_size, output_dim)
self.target_model = Linear_QNet(input_dim, self.hidden_size,output_dim)
self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
self.criterion = nn.MSELoss()
self.copy_model()
def copy_model(self):
self.target_model.load_state_dict(self.model.state_dict())
之后编写关键的训练方法train_step。在这个函数中我们读取的参数就是五个元素,分别是当前状态信息state,对应的行动action,得到的收益reward,下一个状态信息next_state,游戏是否结束done。将这些信息转换成torch可以计算的tensor格式。
然后是实现最为关键的部分,定义好我们的预测值pred,和标签值target。如前所述,pred是神经网络对当前状态作作的价值评估,target是由reward和max(model(next_state))两部分构成。如果当前状态完成后游戏结束的话,就没有下一上状态了,所以target只包括reward了。AI算法部分的代码可以参考snake_model.py文件。
def train_step(self, state, action, reward, next_state, done):
state = torch.tensor(state, dtype=torch.float)
next_state = torch.tensor(next_state, dtype=torch.float)
action = torch.tensor(action, dtype=torch.long)
action = torch.unsqueeze(action, -1)
reward = torch.tensor(reward, dtype=torch.float)
done = torch.tensor(done, dtype=torch.long)
Q_value = self.model(state).gather(-1, action).squeeze()
Q_value_next = self.target_model(next_state).detach().max(-1)[0]
target = (reward + self.gamma * Q_value_next * (1 - done)).squeeze()
self.optimizer.zero_grad()
loss = self.criterion(Q_value,target)
loss.backward()
self.optimizer.step()
最后是编写Agent类,初始参数包括记录游戏轮数n_games,用于随机决策的参数epsilon,折现系数gamma。然后是用于记录历史游戏数据的双向队列memory,神经网络model,模型训练器trainer。
class Agent:
def __init__(self,nS,nA,max_explore=100, gamma = 0.9,
max_memory=5000, lr=0.001, hidden_dim=128):
self.max_explore = max_explore
self.memory = deque(maxlen=max_memory)
self.nS = nS
self.nA = nA
self.n_game = 0
self.trainer = QTrainer(lr, gamma, self.nS, hidden_dim, self.nA)
在类方法里面,定义了记录游戏数据的函数remember,这个函数的输入和模型训练的输入是一致的,它将五个要素组成一个元组,再塞到队列memory里面去。之后定义训练函数train_long_memory,此函数的核心就是调用在训练器trainer中的train_step函数,只不过我们会用memory中的数据来进行训练。再定义另一个训练函数train_short_memory,它并不利用memory数据训练,而是会用游戏即时信息来训练。
之所以用两种训练函数,为了让长期信息和短期信息优势互补。即时的短期信息让训练可以及时进行,不必等到游戏结束。长期信息的训练可以让历史信息得到反复利用,训练更稳定。我们人类也有类似的操作,比如在课堂上学习的时候,我们会马上吸收到老师即时讲述的知识。在学期快结束的时候,我们会复习所有的知识,进行巩固和反刍。
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done))
def train_long_memory(self,batch_size):
if len(self.memory) > batch_size:
mini_sample = random.sample(self.memory, batch_size) # list of tuples
else:
mini_sample = self.memory
states, actions, rewards, next_states, dones = zip(*mini_sample)
states = np.array(states)
next_states = np.array(next_states)
self.trainer.train_step(states, actions, rewards, next_states, dones)
def train_short_memory(self, state, action, reward, next_state, done):
self.trainer.train_step(state, action, reward, next_state, done)
最后要编写的重要方法是get_actioin,它的核心部分是调用模型model来进行预测,输出在某个状态下,三个动作的价值评分,然后算出价值最大的动作,得到最终的行动项final_move。需要补充的是,在游戏前期模型还没有充分训练时,模型效果是比较差的,此时更多需要随机游走探索世界。所以在80轮游戏前,epsilon的值是正值,我们用随机选择的方式来选择行动的方向。
def get_action(self, state, n_game, explore=True):
state = torch.tensor(state, dtype=torch.float)
prediction = self.trainer.model(state).detach().numpy().squeeze()
epsilon = self.max_explore - n_game
if explore and random.randint(0, self.max_explore) < epsilon:
prob = np.exp(prediction)/np.exp(prediction).sum()
final_move = np.random.choice(len(prob), p=prob)
else:
final_move = prediction.argmax()
return final_move
到此为止,所有的AI引擎部分代码编写完成,完整的代码可以参见snake_agent.py。
四、环境与AI引擎的组装运行
经过前面两节的努力,我们已经即将完工。根据之前的描述,我们需要两个大的组件,一个是游戏部分的代码,承担AI环境的角色,它负责接收AI的行动选择,运行游戏逻辑,产生每一步的状态、收益等信息。这部分已经在第二节完成了。另一个是AI引擎部分的代码,它负责接收环境信息,运行算法,产生每一步的行动选择。AI引擎的核心是DQN算法,这个模块还包括了训练模块及其记忆模块。这部分已经在第三小节完成。
此外,还需要一个函数,把AI环境和AI引擎这两大组件进行交互,让信息在两方之间传送交互,让游戏环境运行起来,游戏结束时自动重开,让算法接收数据进行计算等等。这部分我们会用一个train函数来进行最终的组装。
train函数主要功能就是把前述的两个核心类进行实例化。得到AI引擎agent和AI环境game。之后进入游戏主循环,从环境game中得到当前状态信息state_old,再放到引擎中得到行动决策final_move,游戏环境拿到这个决策运行一步游戏,得到这个行动相应的收益、是否结束、分数信息。此时游戏运行一步后,贪吃蛇位置发生变化,状态也不一样了,再次调用get_state函数拿到新的状态。用remember函数把这五个数据保存起来,然后把上述五个数据一起输入给AI引擎,用训练函数train_long_memory来训练。
def train():
plot_scores = []
plot_mean_scores = []
record = 0
total_step = 0
game = Game()
agent = Agent(game.nS,game.nA)
state_new = game.get_state()
while True:
state_old = state_new
final_move = agent.get_action(state_old,agent.n_game)
reward, done, score = game.play_step(final_move)
state_new = game.get_state()
agent.remember(state_old, final_move, reward, state_new, done)
agent.train_long_memory(batch_size=256)
total_step += 1
if total_step % 10 == 0:
agent.trainer.copy_model()
如果本轮游戏结束,将所有环境数据重置。如果本轮游戏吃到的果子超过历史最高记录,则更新这个记录,同时把模型保存。之后我们把有关的几个数据进行绘图,方便我们监测观察游戏过程中,AI引擎的学习过程。
if done:
game.reset()
agent.n_game += 1
if score > record:
record = score
agent.trainer.model.save()
print('Game', agent.n_game, 'Score', score, 'Record:', record)
plot_scores.append(score)
mean_scores = np.mean(plot_scores[-10:])
plot_mean_scores.append(mean_scores)
plot(plot_scores, plot_mean_scores)
这个部分的代码可以参考snake_ai.py文件,这个文件也是程序的运行入口。
然后我们来运行游戏,在终端输入如下命令
系统会调用pygame打开游戏窗口,调用算法模块来驱动贪吃蛇自行移动。可以观察到起初贪吃蛇处于一种无规律的随机移动,也许会半天吃不到一个果子,也许会很快就碰到边界结束本轮游戏。这是因为最初的算法还没有学习到足够有用的信息,算法参数还是初始化时随机参数。
当游戏进行了100轮左右的时候,AI算法在四周边界上已经碰得头破血流,它已经知道当自己靠近边界的时候,收益会很差,同时通过一些随机游走,碰巧吃到几个果子,得到了大量的正收益,它也开始明白往果子坐标方向走会有好处。此时,算法参数已经知道了趋利避害。贪吃蛇明显变聪明了,它的身体可以保持一定长度了。
我们可以看看绘图窗口的结果,横轴是我们游戏的轮数,纵轴是每轮游戏吃到的果实数量。这个统计图可以帮我们从全局上监测贪吃蛇AI的学习进度。一开始的时候成绩比较差,到了200轮左右,最高已经可以吃到50个果实,整体的平均成绩也稳步增长。贪吃蛇AI的确从环境交互中学习了知识,知道如何处理状态信息,如何来拿到较好的收益。
本章小结:
通过本章介绍,我们学习到了三方面内容。第一,我们是如何来修改之前人工操作的贪吃蛇游戏,打造一个游戏环境Game类。关键改造点是要让游戏环境能接受action信息,并输出state信息。模块代码为snake_env.py文件。第二,我们是如何来编写AI引擎,也就是Agent类,关键点是编写DQN算法,接收state信息,输出action给环境。其中理解损失函数的计算是极为重要的。模块代码为snake_agent文件。第三,我们把这两个类实例化后用train函数组装起来,让它们产生交互。代码为snake_ai文件。
从游戏效果上看,AI控制的贪吃蛇还不错,贪吃蛇AI的确能吃到相当数量的果实。那我们还可以进一步想想,还可以怎么玩?读者可以尝试自己做一些修改。一方面是去尝试修改环境,目前的果实是静止不动的,如果改成会移动的果实,贪吃蛇还会吃到吗?另一方面,可以去修改AI引擎,目前我们使用了20个特征来计算状态信息,你能否设计一些更有用的状态信息?我们还发现,早期贪吃蛇AI能否碰巧吃到果实是很重要的,万事开头难。如果前面好些轮都碰巧没吃到果实,它的学习过程会比较艰难。能否让玩家人工操作,记录下用户操作信息,然后让AI去学习呢?这就是模仿学习的思路了。这些方向都是读者可以进一步尝试的,祝你玩的开心。