跳转至

第十四章:打砖块AI编程

在第十三章内容中,我们学习了用贪吃蛇游戏做为AI环境,用DQN来作为AI引擎,去驱动贪吃蛇自动找果实吃。本章我们将实现一个相同设计的程序,只不过是将贪吃蛇游戏换成打转块游戏,之前的AI部分不会有很多的改变。我们仍然会有三部分的代码文件来实现这个编程功能,第一部分代码是改造之前的打砖块游戏程序,第二部分代码是将之前贪吃蛇的AI引擎移植过来,做一点少许的修改。第三部分代码是构造一个函数来组装所有的逻辑,让所有的东西跑起来。下面就让我们开始吧。

一、打砖块环境改造

环境改造思路

先回顾一下打砖块游戏程序的代码结构是怎么样的。

classDiagram

    Game *-- Ball : Composition
    Game *-- Bat :Composition
    Game *-- Brick : Composition

    class Game {
    ball
    bat
    brick
    ...
    }

    class Ball {
    }

    class Bat {
    }

    class Brick {
    }

我们的代码定义了四个类,分别对应了球、球板、砖块以及游戏机制这四个模块。前三个类都使用初始化方法来加载图片资源,这三个类都拥有绘图draw方法。然后球和球板类是拥有update方法,用于更新其位置。最后一个类,也是最重要的是游戏机制即Game类,它在初始化过程中,将一些重要数据进行了初始化。然后在play函数中构造了游戏主循环。

对于人工操作的游戏,我们需要修改四个主要的方面。这些修改的地方和贪吃蛇是差不多的。

  1. 修改游戏重启机制
  2. 提供奖励、状态等游戏信息
  3. 增加决策函数,修改球板控制逻辑
  4. 修改核心运行机制,改造成单步运行

第一项是修改重启机制。在人工操作的游戏结束后,会将球重新放置在相同的起始位置以相同的角度出发,但是在AI算法训练中,我们希望球能在随机的位置出发,开始重新一局。

第二项是要提供游戏信息。人工操作游戏时,我们只需要用肉眼来观察显示器上的图片就可以了。但是在AI算法训练时,我们需要设计一些有用的信息发送传出去。

第三项是修改球板控制逻辑。人工操作游戏是通过鼠标来控制球板运动的,但在AI算法控制游戏时,我们要把检测输入逻辑进行修改,要修改成依赖AI的动作。

第四项是修改成单步运行游戏。为了和AI引擎配合,需要将play函数改造成单步运行。

改造代码实现

首先我们来实现第一项修改。编写一个新的函数reset,它负责在游戏重新开始时,将一些游戏数据进行重置,同时也实例化了新的球板、砖块和球这三种对象。这个函数会用在game类的初始化函数中,也会用在后续介绍的组装代码中。

    def reset(self):
        self.bat = Bat()
        self.ball = Ball(self.Win_width) 
        self.bricks = Bricks() 
        self.score = 0
        self.reward = 0

然后考虑第二项修改,我们需要提供三种信息给AI进行计算。 - 奖励数据 reward - 状态数据 state - 本轮游戏是否结束 game_over

首先考虑正值的reward,也就是当成功的用球板拦截住球的时候,我们把reward设置为10。这个逻辑我们放在bat_collision函数中,其它主要内容和之前的一致。

    def bat_collision(self):
        if self.ball.rect.colliderect(self.bat.rect):
            self.reward = 10
            self.ball.rect.bottom = self.bat.rect.top
            diff_x = self.ball.rect.centerx - self.bat.rect.centerx
            diff_ratio = min(0.95,abs(diff_x)/(0.5*self.bat.rect.width))
            theta = asin(diff_ratio)
            self.ball.speedX = self.ball.speed * sin(theta)
            self.ball.speedY = self.ball.speed * cos(theta)
            self.ball.speedY *= -1
            if (diff_x<0 and self.ball.speedX>0) or (diff_x>0 and self.ball.speedX<0):
                self.ball.speedX *= -1

我们还需要提状态数据,这里的状态信息计算比较简单,只计算球的中心横轴位置相对于球板两端的关系。

    def get_state(self):
        states = [
            self.ball.rect.centerx<self.bat.rect.left-self.bat.rect.width,
            self.ball.rect.centerx<self.bat.rect.left,
            self.ball.rect.centerx>self.bat.rect.right,
            self.ball.rect.centerx>self.bat.rect.right+self.bat.rect.width,
            self.ball.rect.centerx>self.bat.rect.centerx
        ]
        return np.array(states, dtype=float)

再来看第三项修改,在行动决策方面,我们不再需要用鼠标来控制球板运行,因此bat类的update函数会更为简单。它只负责基于speedX来更新球板的位置。

    def update(self,win_width):
        self.positionX += self.speedX
        if self.positionX < 0:
            self.positionX = 0
        if (self.positionX > win_width - self.rect.width):
            self.positionX = win_width - self.rect.width
        self.rect.topleft = (self.positionX, self.positionY)

此外是增加handle_action函数来控制speedX的大小,以此来控制球板的方向,action的取值是三个方向选项,保持不动、向左、向右。

    def handle_action(self, action):
        if action==0:
            self.bat.speedX = 0
        elif action==1:
            self.bat.speedX = -6
        else: 
            self.bat.speedX = 6
        self.bat.update(self.Win_width)

第四项需要修改的是play_step函数。这个函数负责游戏运行一个迭代的所有运算。hand_action来输入AI决策,ball.update来更新球的位置,bat_collision和bricks_collision来判断球板和砖块的碰撞。如果球掉落未被球板接住,check_failed函数会返回逻辑真值,此时将奖励值reward设置为-10。

    def play_step(self,action):
        game_over = False
        self.reward = 0

        for event in pygame.event.get():
            if event.type == QUIT:
                pygame.quit()
                sys.exit()

        self.handle_action(action)    
        self.ball.update(self.Win_width)
        self.bat_collision()
        self.bricks_collision()

        if self.check_failed():
            game_over = True
            self.reward = -10
            return self.reward, game_over, self.score

        if len(self.bricks.contains)==0:
            self.reward = 10
            game_over = True
            return self.reward, game_over, self.score

        self.draw()
        self.Clock.tick(60)
        return self.reward, game_over, self.score

改造后的完整代码参考brick_env.py文件。

二、AI引擎的设计和编写

这里仍然使用基于DQN的AI引擎来玩游戏,所以此处的AI算法代码和上一章是类似的。打砖块的AI引擎也是包括了三个类,分别负责神经网络结构的Linear_QNet类,负责训练器的QTrainer类,以及负责决策的Agent类。

神经网络的结构仍然是一个全连接的三层神经网络。代码内容和第十三章的内容没有区别,但是注意在实际运行的时候,这里的输入层和隐藏层的神经元参数是不一样的。

class Linear_QNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        return x

    def save(self, file_name='model.pth'):
        model_folder_path = './model'
        if not os.path.exists(model_folder_path):
            os.makedirs(model_folder_path)

        file_name = os.path.join(model_folder_path, file_name)
        torch.save(self.state_dict(), file_name)

训练模块代码和第十三章也没有区别。

class QTrainer:
    def __init__(self, lr, gamma,input_dim, hidden_dim, output_dim):
        self.gamma = gamma
        self.hidden_size = hidden_dim
        self.model = Linear_QNet(input_dim, self.hidden_size, output_dim)
        self.target_model = Linear_QNet(input_dim, self.hidden_size,output_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.criterion = nn.MSELoss()
        self.copy_model()

    def copy_model(self):
        self.target_model.load_state_dict(self.model.state_dict())

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        action = torch.unsqueeze(action, -1)
        reward = torch.tensor(reward, dtype=torch.float)
        done = torch.tensor(done, dtype=torch.long)

        Q_value = self.model(state).gather(-1, action).squeeze()
        Q_value_next = self.target_model(next_state).detach().max(-1)[0]
        target =  (reward + self.gamma * Q_value_next * (1 - done)).squeeze()

        self.optimizer.zero_grad()
        loss = self.criterion(Q_value,target)
        loss.backward()
        self.optimizer.step()

Agent类代码的内容和第十三章也没有区别。

class Agent:
    def __init__(self,nS,nA,max_explore=100, gamma = 0.9,
                max_memory=5000, lr=0.001, hidden_dim=10):
        self.max_explore = max_explore 
        self.memory = deque(maxlen=max_memory) 
        self.nS = nS
        self.nA = nA
        self.n_game = 0
        self.trainer = QTrainer(lr, gamma, self.nS, hidden_dim, self.nA)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done)) 

    def train_long_memory(self,batch_size):
        if len(self.memory)>0:
            if len(self.memory) > batch_size:
                mini_sample = random.sample(self.memory, batch_size) # list of tuples
            mini_sample = self.memory
            states, actions, rewards, next_states, dones = zip(*mini_sample)
            states = np.array(states)
            next_states = np.array(next_states)
            self.trainer.train_step(states, actions, rewards, next_states, dones)

    def get_action(self, state, n_game, explore=True):
        state = torch.tensor(state, dtype=torch.float)
        prediction = self.trainer.model(state).detach().numpy().squeeze()
        epsilon = self.max_explore - n_game
        if explore and random.randint(0, self.max_explore) < epsilon:
            prob = np.exp(prediction)/np.exp(prediction).sum()
            final_move = np.random.choice(len(prob), p=prob)
        else:
            final_move = prediction.argmax()
        return final_move

完整的代码参看brick_agent.py文件。

三、环境和AI引擎的组装运行

我们已经有了一个游戏环境,有了一个AI引擎Agent。下面需要用一个函数将二者组装在一起进行交互。我们编写一个训练函数train,这个函数内容和第十三章的相应内容基本一致。

在train函数内将Game类和Agent类进行实例化,得到两个对象。通过get_state函数来获取初始状态。之后是游戏主循环,在主循环中将状态信息输入给Agent得到相应的决策信息final_move,将这个决策再输入给环境得到这个决策运行对应的结果,包括奖励等信息。随后再获得新的游戏状态。注意在这里使用了一个函数ball_near_bat来执行一个判断条件,因为打砖块游戏中存在大量无关紧要的帧,例如球在砖块上飞行的时候,此时砖块的移动就和球的飞行没太大关系。所以在重要时候,才需要进行记忆的操作。

def train():
    plot_scores = []
    plot_mean_scores = []
    record = 0
    total_step = 0
    game = Game()
    agent = Agent(game.nS,game.nA)
    state_new = game.get_state()

    while True:
        state_old = state_new
        final_move = agent.get_action(state_old,agent.n_game)
        reward, done, score = game.play_step(final_move)
        state_new = game.get_state()
        if game.ball_near_bat():
            agent.remember(state_old, final_move, reward, state_new, done)
        agent.train_long_memory(batch_size=256)
        total_step += 1

        if total_step % 10 == 0:
            agent.trainer.copy_model()

如果本轮游戏结束,将所有环境数据重置。如果本轮游戏打掉的砖块超过历史最高记录,则更新这个分值记录,同时把模型保存。之后我们把有关的几个数据进行绘图,方便我们监测观察游戏过程中,AI引擎的学习过程。

        if done:
            game.reset()
            agent.n_game += 1
            if score > record:
                record = score
                agent.trainer.model.save()
            print('Game', agent.n_game, 'Score', score, 'Record:', record)
            plot_scores.append(score)
            mean_scores = np.mean(plot_scores[-10:])
            plot_mean_scores.append(mean_scores)
            plot(plot_scores, plot_mean_scores)

完整的代码可以参考brick_ai.py文件。

最后我们来运行游戏,在终端输入如下命令。

python snake_ai.py

游戏截图显示如下,你会观察到一些有趣的现象。例如在一开始,AI控制的球板不知道如何行动,它处于一种随机移动的状态。当它偶尔能接住球后,游戏会给AI一些正向的反馈。它会越来越明白应该如何根据球的位置来移动球板。

snake2

我们可以看看绘图窗口的结果,横轴是我们游戏的轮数,纵轴是每轮游戏得到的分数。这个统计图可以帮我们从全局上监测打砖块AI的学习进度。一开始的时候成绩比较差,到了100轮左右,最高已经可以得到60分,AI的确从环境交互中学习了知识,知道如何处理状态信息,如何来拿到较好的收益。

ch14_1

本章小结:

本章和上一章内容相似,可以让我们重复巩固已经学习到的内容。包括两个方面,一个是学习如何修改人工操作游戏,打造成一个可以和AI相配合的游戏环境。另一个是复用了上一章的代码,来构造了一个打砖块AI引擎。不过本章的代码还是有一些不一样的地方。重要的区别在于AI引擎的输入是不一样的,此处的输入信息只有5个特征,所以神经网络的结构也不一样了。另一个区别在于AI引擎在处理记忆时也不一样,它只会记忆重要的游戏环节。

我们还可以进一步想想,还可以怎么玩?读者可以尝试自己做一些修改。一方面是去尝试修改环境,目前的砖块是静止不动的,如果改成会移动的砖块,AI还会得到高分吗?另一方面,可以去修改AI引擎,目前我们使用了5个特征来计算状态信息,这里面没有考虑到砖块的位置信息。你能否设计一些更有用的状态信息?

下一章,我们将会把第六章笨鸟先飞的游戏环境和AI相结合。我们不仅会使用DQN来驱动小鸟飞行,还会尝试用遗传算法来生成一大群小鸟,是不是很壮观。