您的位置:首页 >环保 >

用深度强化学习玩超级马里奥兄弟

时间:2021-08-19 16:16:40 来源:

介绍

从本文中,你将学习如何使用 Deep Q-Network 和 Double Deep Q-Network(带代码!)玩超级马里奥兄弟。

超级马里奥是任天堂在 1980 年代开发和发行的著名游戏。它是历经多年无需解释的经典游戏名称之一。这是一款2D横向卷轴游戏,让玩家可以控制主角——马里奥。游戏玩法包括从左到右移动马里奥,从反派中生存下来,获得硬币,以及到达旗帜以清除关卡。马里奥最终需要拯救公主。这些有不同的奖励系统、硬币、反派、漏洞和完成时间。游戏环境取自 OpenAI Gym,使用 Nintendo Entertainment System (NES) python 模拟器。在本文中,我将展示如何使用深度 Q 网络 (DQN) 和深度双 Q 网络 (DDQN) 算法和PyTorch 库来实现强化学习算法,以检查它们各自的性能。然后评估对每种算法进行的实验。数据理解和预处理超级马里奥兄弟的原始观察空间是 240 x 256 x 3 的 RGB 图像。动作空间是 256,这意味着能够采取 256 种不同的可能动作。为了加快我们模型的训练时间,我们使用了gym的包装器函数对原始环境应用了某些转换:在 4 帧上重复代理的每个动作并减小视频帧大小,即环境中的每个状态都是 4 x 84 x 84 x 1(4 个连续 84 x 84 灰度像素帧的列表)将像素值归一化到 0 到 1 的范围内将动作次数减少到 5(仅右)、7(简单动作)和 12(复杂动作)理论结果最初,我想使用 Q-learning 执行一个实验,该实验使用二维数组来存储状态和动作对值的所有可能组合。但是,在这种环境设置中,我意识到应用 Q-learning 是不可能的,因为需要存储非常大的 Q-table, 而这是不可行的。因此,本项目使用 DQN 算法作为基线模型。DQN 算法使用 Q-learning 来学习在给定状态下采取的最佳动作,并使用深度神经网络来估计 Q 值函数。我使用的深度神经网络类型是一个 3 层卷积神经网络,后跟两个完全连接的线性层,每个可能的动作都有一个输出。该网络的工作原理类似于 Q-Learning 算法中的 Q-table。我们使用的目标损失函数是 Huber 损失或 Q 值的平滑平均绝对误差。Huber loss 结合了 MSE 和 MAE 来最小化目标函数。我们用来优化目标函数的优化器是 Adam。但是,DQN 网络存在高估的问题。

图 1:说明 DQN 网络如何被高估如图1所示,高估的主要原因有两个。第一个原因是由于用于计算目标值的最大化函数。假设action值为True, 表示为:x(a?) … x(a?)。由 DQN 做出的噪声估计由 Q(s,a?;w), ... Q(s, a?;w) 表示,在数学上,

因此它高估了真实的 Q 值。第二个原因是高估的 Q 值再次被用于通过反向传播更新 Q 网络的权重。这使得高估更加严重。高估的主要缺点是由于 DQN 所做的非均匀高估。直观的感觉是,一个特定的状态、操作对在重放缓冲区中出现的频率越高,对该状态-操作对的高估就越高。为了获得更准确的 Q 值,我们想在我们的问题上使用 DDQN 网络,然后将实验结果与之前的 DQN 网络进行比较。为了减轻由最大化引起的高估,DDQN 使用 2 个 Q 网络,一个用于获取动作,另一个用于通过反向传播更新权重。DDQN Q-learning更新方程为:

Q* 用于更新权重,Q^ 用于获取特定状态的动作。Q^ 只是每 n 步复制 Q* 的值。实验结果使用 2 种算法 DQN 和 DDQN,基于智能体的不同运动进行了 5 次实验。不同的动作是复杂动作、简单动作和仅右动作。参数设置如下:观察空间:4 x 84 x 84 x 1动作空间:12(复杂动作)或7(简单动作)或5(仅右动作)损失函数:HuberLoss,δ = 1优化器:Adam,lr = 0.00025betas = (0.9, 0.999)批大小 = 64 Dropout = 0.2gamma = 0.9体验回放的最大内存大小 = 30000对于 epsilon greedy:探索衰减 = 0.99,探索最小值 = 0.05在探索开始时,max = 1,代理将采取随机动作。在每一次动作之后,它将以探索衰减率衰减,直到达到 0.05 的探索最小值。实验一进行的第一个实验是比较 DDQN 和 DQN 算法用于智能体的复杂运动。

实验二进行的第二个实验是比较 DDQN 和 DQN 算法对于智能体的简单移动。

实验三进行的第三个实验是比较 DDQN 和 DQN 算法仅适用于代理的右运动。

从以上 3 个实验结果可以看出,在所有情况下,DQN 在第 10,000 集的性能与 DDQN 在第 2,000 集的性能大致相同。因此,我们可以得出结论,DDQN 网络有助于消除由 DQN 网络引起的高估问题。使用 DDQN 和 DQN 对 3 种不同运动进行了进一步的实验。实验四进行的第四个实验是在所有 3 个不同的动作上使用 DDQN 算法。

实验五进行的第五个实验是对所有 3 个不同的动作使用 DQN 算法

从以上 2 个实验结果,我们可以得出结论,该网络能够在仅允许代理在仅右运动的动作空间上进行更好的训练。代码import torch

import torch.nn as nn

import random

from nes_py.wrappers import JoypadSpace

import gym_super_mario_bros

from tqdm import tqdm

import pickle

from gym_super_mario_bros.actions import RIGHT_ONLY, SIMPLE_MOVEMENT, COMPLEX_MOVEMENT

import gym

import numpy as np

import collections

import cv2

import matplotlib.pyplot as plt

%matplotlib inline

import time

import pylab as pl

from IPython import display

class MaxAndSkipEnv(gym.Wrapper):

"""

Each action of the agent is repeated over skip frames

return only every `skip`-th frame

"""

def __init__(self, env=None, skip=4):

super(MaxAndSkipEnv, self).__init__(env) # most recent raw observations (for max pooling across time steps) self._obs_buffer = collections.deque(maxlen=2) self._skip = skip def step(self, action): total_reward = 0.0 done = None for _ in range(self._skip): obs, reward, done, info = self.env.step(action) self._obs_buffer.append(obs) total_reward += reward if done: break max_frame = np.max(np.stack(self._obs_buffer), axis=0) return max_frame, total_reward, done, info def reset(self): """Clear past frame buffer and init to first obs""" self._obs_buffer.clear() obs = self.env.reset() self._obs_buffer.append(obs) return obsclass MarioRescale84x84(gym.ObservationWrapper): """ Downsamples/Rescales each frame to size 84x84 with greyscale """ def __init__(self, env=None): super(MarioRescale84x84, self).__init__(env) self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 1), dtype=np.uint8) def observation(self, obs): return MarioRescale84x84.process(obs) @staticmethod def process(frame): if frame.size == 240 * 256 * 3: img = np.reshape(frame, [240, 256, 3]).astype(np.float32) else: assert False, "Unknown resolution." # image normalization on RBG img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114 resized_screen = cv2.resize(img, (84, 110), interpolation=cv2.INTER_AREA) x_t = resized_screen[18:102, :] x_t = np.reshape(x_t, [84, 84, 1]) return x_t.astype(np.uint8)class ImageToPyTorch(gym.ObservationWrapper): """ Each frame is converted to PyTorch tensors """ def __init__(self, env): super(ImageToPyTorch, self).__init__(env) old_shape = self.observation_space.shape self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(old_shape[-1], old_shape[0], old_shape[1]), dtype=np.float32) def observation(self, observation): return np.moveaxis(observation, 2, 0) class BufferWrapper(gym.ObservationWrapper): """ Only every k-th frame is collected by the buffer """ def __init__(self, env, n_steps, dtype=np.float32): super(BufferWrapper, self).__init__(env) self.dtype = dtype old_space = env.observation_space self.observation_space = gym.spaces.Box(old_space.low.repeat(n_steps, axis=0), old_space.high.repeat(n_steps, axis=0), dtype=dtype) def reset(self): self.buffer = np.zeros_like(self.observation_space.low, dtype=self.dtype) return self.observation(self.env.reset()) def observation(self, observation): self.buffer[:-1] = self.buffer[1:] self.buffer[-1] = observation return self.bufferclass PixelNormalization(gym.ObservationWrapper): """ Normalize pixel values in frame --> 0 to 1 """ def observation(self, obs): return np.array(obs).astype(np.float32) / 255.0def create_mario_env(env): env = MaxAndSkipEnv(env) env = MarioRescale84x84(env) env = ImageToPyTorch(env) env = BufferWrapper(env, 4) env = PixelNormalization(env) return JoypadSpace(env, SIMPLE_MOVEMENT) class DQNSolver(nn.Module): """ Convolutional Neural Net with 3 conv layers and two linear layers """ def __init__(self, input_shape, n_actions): super(DQNSolver, self).__init__() self.conv = nn.Sequential( nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4), nn.ReLU(), nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(), nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU() ) conv_out_size = self._get_conv_out(input_shape) self.fc = nn.Sequential( nn.Linear(conv_out_size, 512), nn.ReLU(), nn.Linear(512, n_actions) ) def _get_conv_out(self, shape): o = self.conv(torch.zeros(1, *shape)) return int(np.prod(o.size())) def forward(self, x): conv_out = self.conv(x).view(x.size()[0], -1) return self.fc(conv_out)class DQNAgent: def __init__(self, state_space, action_space, max_memory_size, batch_size, gamma, lr, dropout, exploration_max, exploration_min, exploration_decay, double_dqn, pretrained): # Define DQN Layers self.state_space = state_space self.action_space = action_space self.double_dqn = double_dqn self.pretrained = pretrained self.device = 'cuda' if torch.cuda.is_available() else 'cpu' # Double DQN network if self.double_dqn: self.local_net = DQNSolver(state_space, action_space).to(self.device) self.target_net = DQNSolver(state_space, action_space).to(self.device) if self.pretrained: self.local_net.load_state_dict(torch.load("DQN1.pt", map_location=torch.device(self.device))) self.target_net.load_state_dict(torch.load("DQN2.pt", map_location=torch.device(self.device))) self.optimizer = torch.optim.Adam(self.local_net.parameters(), lr=lr) self.copy = 5000 # Copy the local model weights into the target network every 5000 steps self.step = 0 # DQN network else: self.dqn = DQNSolver(state_space, action_space).to(self.device) if self.pretrained: self.dqn.load_state_dict(torch.load("DQN.pt", map_location=torch.device(self.device))) self.optimizer = torch.optim.Adam(self.dqn.parameters(), lr=lr) # Create memory self.max_memory_size = max_memory_size if self.pretrained: self.STATE_MEM = torch.load("STATE_MEM.pt") self.ACTION_MEM = torch.load("ACTION_MEM.pt") self.REWARD_MEM = torch.load("REWARD_MEM.pt") self.STATE2_MEM = torch.load("STATE2_MEM.pt") self.DONE_MEM = torch.load("DONE_MEM.pt") with open("ending_position.pkl", 'rb') as f: self.ending_position = pickle.load(f) with open("num_in_queue.pkl", 'rb') as f: self.num_in_queue = pickle.load(f) else: self.STATE_MEM = torch.zeros(max_memory_size, *self.state_space) self.ACTION_MEM = torch.zeros(max_memory_size, 1) self.REWARD_MEM = torch.zeros(max_memory_size, 1) self.STATE2_MEM = torch.zeros(max_memory_size, *self.state_space) self.DONE_MEM = torch.zeros(max_memory_size, 1) self.ending_position = 0 self.num_in_queue = 0 self.memory_sample_size = batch_size # Learning parameters self.gamma = gamma self.l1 = nn.SmoothL1Loss().to(self.device) # Also known as Huber loss self.exploration_max = exploration_max self.exploration_rate = exploration_max self.exploration_min = exploration_min self.exploration_decay = exploration_decay def remember(self, state, action, reward, state2, done): """Store the experiences in a buffer to use later""" self.STATE_MEM[self.ending_position] = state.float() self.ACTION_MEM[self.ending_position] = action.float() self.REWARD_MEM[self.ending_position] = reward.float() self.STATE2_MEM[self.ending_position] = state2.float() self.DONE_MEM[self.ending_position] = done.float() self.ending_position = (self.ending_position + 1) % self.max_memory_size # FIFO tensor self.num_in_queue = min(self.num_in_queue + 1, self.max_memory_size) def batch_experiences(self): """Randomly sample 'batch size' experiences""" idx = random.choices(range(self.num_in_queue), k=self.memory_sample_size) STATE = self.STATE_MEM[idx] ACTION = self.ACTION_MEM[idx] REWARD = self.REWARD_MEM[idx] STATE2 = self.STATE2_MEM[idx] DONE = self.DONE_MEM[idx] return STATE, ACTION, REWARD, STATE2, DONE def act(self, state): """Epsilon-greedy action""" if self.double_dqn: self.step += 1 if random.random() < self.exploration_rate: return torch.tensor([[random.randrange(self.action_space)]]) if self.double_dqn: # Local net is used for the policy return torch.argmax(self.local_net(state.to(self.device))).unsqueeze(0).unsqueeze(0).cpu() else: return torch.argmax(self.dqn(state.to(self.device))).unsqueeze(0).unsqueeze(0).cpu() def copy_model(self): """Copy local net weights into target net for DDQN network""" self.target_net.load_state_dict(self.local_net.state_dict()) def experience_replay(self): """Use the double Q-update or Q-update equations to update the network weights""" if self.double_dqn and self.step % self.copy == 0: self.copy_model() if self.memory_sample_size > self.num_in_queue: return # Sample a batch of experiences STATE, ACTION, REWARD, STATE2, DONE = self.batch_experiences() STATE = STATE.to(self.device) ACTION = ACTION.to(self.device) REWARD = REWARD.to(self.device) STATE2 = STATE2.to(self.device) DONE = DONE.to(self.device) self.optimizer.zero_grad() if self.double_dqn: # Double Q-Learning target is Q*(S, A) <- r + γ max_a Q_target(S', a) target = REWARD + torch.mul((self.gamma * self.target_net(STATE2).max(1).values.unsqueeze(1)), 1 - DONE) current = self.local_net(STATE).gather(1, ACTION.long()) # Local net approximation of Q-value else: # Q-Learning target is Q*(S, A) <- r + γ max_a Q(S', a) target = REWARD + torch.mul((self.gamma * self.dqn(STATE2).max(1).values.unsqueeze(1)), 1 - DONE) current = self.dqn(STATE).gather(1, ACTION.long()) loss = self.l1(current, target) loss.backward() # Compute gradients self.optimizer.step() # Backpropagate error self.exploration_rate *= self.exploration_decay # Makes sure that exploration rate is always at least 'exploration min' self.exploration_rate = max(self.exploration_rate, self.exploration_min) def show_state(env, ep=0, info=""): """While testing show the mario playing environment""" plt.figure(3) plt.clf() plt.imshow(env.render(mode='rgb_array')) plt.title("Episode: %d %s" % (ep, info)) plt.axis('off') display.clear_output(wait=True) display.display(plt.gcf()) def run(training_mode, pretrained, double_dqn, num_episodes=1000, exploration_max=1): env = gym_super_mario_bros.make('SuperMarioBros-1-1-v0') env = create_mario_env(env) # Wraps the environment so that frames are grayscale observation_space = env.observation_space.shape action_space = env.action_space.n agent = DQNAgent(state_space=observation_space, action_space=action_space, max_memory_size=30000, batch_size=32, gamma=0.90, lr=0.00025, dropout=0.2, exploration_max=1.0, exploration_min=0.02, exploration_decay=0.99, double_dqn=double_dqn, pretrained=pretrained) # Restart the enviroment for each episode num_episodes = num_episodes env.reset() total_rewards = [] if training_mode and pretrained: with open("total_rewards.pkl", 'rb') as f: total_rewards = pickle.load(f) for ep_num in tqdm(range(num_episodes)): state = env.reset() state = torch.Tensor([state]) total_reward = 0 steps = 0 while True: if not training_mode: show_state(env, ep_num) action = agent.act(state) steps += 1 state_next, reward, terminal, info = env.step(int(action[0])) total_reward += reward state_next = torch.Tensor([state_next]) reward = torch.tensor([reward]).unsqueeze(0) terminal = torch.tensor([int(terminal)]).unsqueeze(0) if training_mode: agent.remember(state, action, reward, state_next, terminal) agent.experience_replay() state = state_next if terminal: break total_rewards.append(total_reward) if ep_num != 0 and ep_num % 100 == 0: print("Episode {} score = {}, average score = {}".format(ep_num + 1, total_rewards[-1], np.mean(total_rewards))) num_episodes += 1 print("Episode {} score = {}, average score = {}".format(ep_num + 1, total_rewards[-1], np.mean(total_rewards))) # Save the trained memory so that we can continue from where we stop using 'pretrained' = True if training_mode: with open("ending_position.pkl", "wb") as f: pickle.dump(agent.ending_position, f) with open("num_in_queue.pkl", "wb") as f: pickle.dump(agent.num_in_queue, f) with open("total_rewards.pkl", "wb") as f: pickle.dump(total_rewards, f) if agent.double_dqn: torch.save(agent.local_net.state_dict(), "DQN1.pt") torch.save(agent.target_net.state_dict(), "DQN2.pt") else: torch.save(agent.dqn.state_dict(), "DQN.pt") torch.save(agent.STATE_MEM, "STATE_MEM.pt") torch.save(agent.ACTION_MEM, "ACTION_MEM.pt") torch.save(agent.REWARD_MEM, "REWARD_MEM.pt") torch.save(agent.STATE2_MEM, "STATE2_MEM.pt") torch.save(agent.DONE_MEM, "DONE_MEM.pt") env.close()

# For training

run(training_mode=True, pretrained=False, double_dqn=True, num_episodes=1, exploration_max = 1)

# For Testing

run(training_mode=False, pretrained=True, double_dqn=True, num_episodes=1, exploration_max = 0.05)

结论与DQN相比,DDQN需要的训练片段要少得多。因此,DDQN网络有助于消除DQN网络中存在的高估问题。DQN和DDQN网络相比简单和复杂的运动动作空间,都能更好地进行仅右运动训练。


郑重声明:文章仅代表原作者观点,不代表本站立场;如有侵权、违规,可直接反馈本站,我们将会作修改或删除处理。