注意
本示例相容 Gymnasium 1.2.0 版。
使用向量環境加速 A2C 訓練¶
本教程演示瞭如何使用向量環境來加速訓練。
注意¶
如果您遇到類似在 multiprocessing/spawn.py 中引發的 RuntimeError,請將從 gym.make_vec= 或 gym.vector.AsyncVectorEnv 開始到程式碼末尾的部分用 if__name__ == '__main__' 包裹起來。
已嘗試在當前程序完成其引導階段之前啟動新程序。
介紹¶
在本教程中,您將學習如何使用向量化環境來訓練一個優勢演員-評論家(Advantage Actor-Critic)智慧體。我們將使用 A2C,它是 A3C 演算法的同步版本 [1]。
向量化環境 [3] 可以透過允許同一環境的多個例項並行執行(在多個 CPU 上)來幫助實現更快、更穩健的訓練。這可以顯著減少方差,從而加快訓練速度。
我們將從頭開始實現一個優勢演員-評論家(Advantage Actor-Critic),以瞭解如何將批次狀態輸入到網路中以獲得動作向量(每個環境一個動作),並在過渡的小批次資料上計算演員和評論家的損失。每個小批次資料包含一個取樣階段的過渡:在 n_envs 個環境中並行執行 n_steps_per_update 步(兩者相乘可得到一個小批次資料中的過渡數量)。每個取樣階段之後,計算損失並執行一次梯度步。為了計算優勢,我們將使用廣義優勢估計(Generalized Advantage Estimation, GAE)方法 [2],它平衡了優勢估計的方差和偏差之間的權衡。
A2C 智慧體類初始化時需要輸入狀態的特徵數量、智慧體可以採取的動作數量、學習率以及並行執行以收集經驗的環境數量。演員和評論家網路被定義並初始化各自的最佳化器。網路的正向傳播接收批次狀態向量,並返回狀態值張量和動作 logits 張量。`select_action` 方法返回一個元組,包含所選動作、這些動作的對數機率,以及每個動作的狀態值。此外,它還返回策略分佈的熵,該熵稍後(透過加權因子 ent_coef)從損失中減去,以鼓勵探索。
`get_losses` 函式計算演員和評論家網路(使用 GAE)的損失,然後使用 `update_parameters` 函式進行更新。
# Author: Till Zemann
# License: MIT License
from __future__ import annotations
import os
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch import optim
from tqdm import tqdm
import gymnasium as gym
優勢演員-評論家 (A2C)¶
演員-評論家結合了基於價值和基於策略的方法的元素。在 A2C 中,智慧體有兩個獨立的神經網路:一個評論家網路(critic network)用於估計狀態價值函式,一個演員網路(actor network)用於輸出所有動作的分類機率分佈的 logits。評論家網路訓練的目標是最小化預測狀態值與智慧體實際獲得的收益之間的均方誤差(這等同於最小化平方優勢,因為動作的優勢定義為收益與狀態價值之間的差值:A(s,a) = Q(s,a) - V(s))。演員網路訓練的目標是透過選擇根據評論家網路具有高期望值的動作來最大化期望收益。
本教程的重點並非 A2C 本身的細節。相反,本教程將重點介紹如何使用向量化環境和領域隨機化來加速 A2C(以及其他強化學習演算法)的訓練過程。
class A2C(nn.Module):
"""
(Synchronous) Advantage Actor-Critic agent class
Args:
n_features: The number of features of the input state.
n_actions: The number of actions the agent can take.
device: The device to run the computations on (running on a GPU might be quicker for larger Neural Nets,
for this code CPU is totally fine).
critic_lr: The learning rate for the critic network (should usually be larger than the actor_lr).
actor_lr: The learning rate for the actor network.
n_envs: The number of environments that run in parallel (on multiple CPUs) to collect experiences.
"""
def __init__(
self,
n_features: int,
n_actions: int,
device: torch.device,
critic_lr: float,
actor_lr: float,
n_envs: int,
) -> None:
"""Initializes the actor and critic networks and their respective optimizers."""
super().__init__()
self.device = device
self.n_envs = n_envs
critic_layers = [
nn.Linear(n_features, 32),
nn.ReLU(),
nn.Linear(32, 32),
nn.ReLU(),
nn.Linear(32, 1), # estimate V(s)
]
actor_layers = [
nn.Linear(n_features, 32),
nn.ReLU(),
nn.Linear(32, 32),
nn.ReLU(),
nn.Linear(
32, n_actions
), # estimate action logits (will be fed into a softmax later)
]
# define actor and critic networks
self.critic = nn.Sequential(*critic_layers).to(self.device)
self.actor = nn.Sequential(*actor_layers).to(self.device)
# define optimizers for actor and critic
self.critic_optim = optim.RMSprop(self.critic.parameters(), lr=critic_lr)
self.actor_optim = optim.RMSprop(self.actor.parameters(), lr=actor_lr)
def forward(self, x: np.ndarray) -> tuple[torch.Tensor, torch.Tensor]:
"""
Forward pass of the networks.
Args:
x: A batched vector of states.
Returns:
state_values: A tensor with the state values, with shape [n_envs,].
action_logits_vec: A tensor with the action logits, with shape [n_envs, n_actions].
"""
x = torch.Tensor(x).to(self.device)
state_values = self.critic(x) # shape: [n_envs,]
action_logits_vec = self.actor(x) # shape: [n_envs, n_actions]
return (state_values, action_logits_vec)
def select_action(
self, x: np.ndarray
) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
"""
Returns a tuple of the chosen actions and the log-probs of those actions.
Args:
x: A batched vector of states.
Returns:
actions: A tensor with the actions, with shape [n_steps_per_update, n_envs].
action_log_probs: A tensor with the log-probs of the actions, with shape [n_steps_per_update, n_envs].
state_values: A tensor with the state values, with shape [n_steps_per_update, n_envs].
"""
state_values, action_logits = self.forward(x)
action_pd = torch.distributions.Categorical(
logits=action_logits
) # implicitly uses softmax
actions = action_pd.sample()
action_log_probs = action_pd.log_prob(actions)
entropy = action_pd.entropy()
return actions, action_log_probs, state_values, entropy
def get_losses(
self,
rewards: torch.Tensor,
action_log_probs: torch.Tensor,
value_preds: torch.Tensor,
entropy: torch.Tensor,
masks: torch.Tensor,
gamma: float,
lam: float,
ent_coef: float,
device: torch.device,
) -> tuple[torch.Tensor, torch.Tensor]:
"""
Computes the loss of a minibatch (transitions collected in one sampling phase) for actor and critic
using Generalized Advantage Estimation (GAE) to compute the advantages (https://arxiv.org/abs/1506.02438).
Args:
rewards: A tensor with the rewards for each time step in the episode, with shape [n_steps_per_update, n_envs].
action_log_probs: A tensor with the log-probs of the actions taken at each time step in the episode, with shape [n_steps_per_update, n_envs].
value_preds: A tensor with the state value predictions for each time step in the episode, with shape [n_steps_per_update, n_envs].
masks: A tensor with the masks for each time step in the episode, with shape [n_steps_per_update, n_envs].
gamma: The discount factor.
lam: The GAE hyperparameter. (lam=1 corresponds to Monte-Carlo sampling with high variance and no bias,
and lam=0 corresponds to normal TD-Learning that has a low variance but is biased
because the estimates are generated by a Neural Net).
device: The device to run the computations on (e.g. CPU or GPU).
Returns:
critic_loss: The critic loss for the minibatch.
actor_loss: The actor loss for the minibatch.
"""
T = len(rewards)
advantages = torch.zeros(T, self.n_envs, device=device)
# compute the advantages using GAE
gae = 0.0
for t in reversed(range(T - 1)):
td_error = (
rewards[t] + gamma * masks[t] * value_preds[t + 1] - value_preds[t]
)
gae = td_error + gamma * lam * masks[t] * gae
advantages[t] = gae
# calculate the loss of the minibatch for actor and critic
critic_loss = advantages.pow(2).mean()
# give a bonus for higher entropy to encourage exploration
actor_loss = (
-(advantages.detach() * action_log_probs).mean() - ent_coef * entropy.mean()
)
return (critic_loss, actor_loss)
def update_parameters(
self, critic_loss: torch.Tensor, actor_loss: torch.Tensor
) -> None:
"""
Updates the parameters of the actor and critic networks.
Args:
critic_loss: The critic loss.
actor_loss: The actor loss.
"""
self.critic_optim.zero_grad()
critic_loss.backward()
self.critic_optim.step()
self.actor_optim.zero_grad()
actor_loss.backward()
self.actor_optim.step()
使用向量化環境¶
當您僅在一個 epoch 內計算兩個神經網路的損失時,它可能具有高方差。使用向量化環境,我們可以並行地處理 n_envs 個環境,從而獲得接近線性加速(理論上意味著我們收集樣本的速度是原來的 n_envs 倍),我們可以利用這些樣本來計算當前策略和評論家網路的損失。當我們使用更多樣本來計算損失時,它的方差會更低,因此會導致更快的學習。
A2C 是一種同步方法,這意味著對網路的引數更新是確定性地進行的(在每個取樣階段之後),但我們仍然可以利用異步向量環境來生成多個程序以並行執行環境。
建立向量環境最簡單的方法是呼叫 gym.vector.make,它會建立同一環境的多個例項。
envs = gym.make_vec("LunarLander-v3", num_envs=3, max_episode_steps=600)
領域隨機化¶
如果我們想在訓練中對環境進行隨機化以獲得更魯棒的智慧體(能夠處理環境的不同引數化,因此可能具有更高的泛化程度),我們可以手動設定所需的引數,或使用偽隨機數生成器來生成它們。
手動設定 3 個具有不同引數的並行‘LunarLander-v3’環境
envs = gym.vector.SyncVectorEnv(
[
lambda: gym.make(
"LunarLander-v3",
gravity=-10.0,
enable_wind=True,
wind_power=15.0,
turbulence_power=1.5,
max_episode_steps=600,
),
lambda: gym.make(
"LunarLander-v3",
gravity=-9.8,
enable_wind=True,
wind_power=10.0,
turbulence_power=1.3,
max_episode_steps=600,
),
lambda: gym.make(
"LunarLander-v3", gravity=-7.0, enable_wind=False, max_episode_steps=600
),
]
)
隨機生成 3 個並行‘LunarLander-v3’環境的引數,使用 np.clip 保持在推薦的引數空間內
envs = gym.vector.SyncVectorEnv(
[
lambda: gym.make(
"LunarLander-v3",
gravity=np.clip(
np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
),
enable_wind=np.random.choice([True, False]),
wind_power=np.clip(
np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
),
turbulence_power=np.clip(
np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
),
max_episode_steps=600,
)
for i in range(3)
]
)
這裡我們使用正態分佈,將環境的標準引數化作為均值,並使用任意標準差(scale)。根據問題的不同,您也可以嘗試更高的方差和使用不同的分佈。
如果在整個訓練過程中您都在相同的 n_envs 個環境中進行訓練,並且 n_envs 是一個相對較小的數字(相對於環境的複雜程度而言),您可能仍然會對您選擇的特定引數化產生過擬合。為了緩解這種情況,您可以選擇大量隨機引數化的環境,或者每隔幾個取樣階段就重新建立您的環境,以生成一組新的偽隨機引數。
設定¶
# environment hyperparams
n_envs = 10
n_updates = 1000
n_steps_per_update = 128
randomize_domain = False
# agent hyperparams
gamma = 0.999
lam = 0.95 # hyperparameter for GAE
ent_coef = 0.01 # coefficient for the entropy bonus (to encourage exploration)
actor_lr = 0.001
critic_lr = 0.005
# Note: the actor has a slower learning rate so that the value targets become
# more stationary and are theirfore easier to estimate for the critic
# environment setup
if randomize_domain:
envs = gym.vector.AsyncVectorEnv(
[
lambda: gym.make(
"LunarLander-v3",
gravity=np.clip(
np.random.normal(loc=-10.0, scale=1.0), a_min=-11.99, a_max=-0.01
),
enable_wind=np.random.choice([True, False]),
wind_power=np.clip(
np.random.normal(loc=15.0, scale=1.0), a_min=0.01, a_max=19.99
),
turbulence_power=np.clip(
np.random.normal(loc=1.5, scale=0.5), a_min=0.01, a_max=1.99
),
max_episode_steps=600,
)
for i in range(n_envs)
]
)
else:
envs = gym.make_vec("LunarLander-v3", num_envs=n_envs, max_episode_steps=600)
obs_shape = envs.single_observation_space.shape[0]
action_shape = envs.single_action_space.n
# set the device
use_cuda = False
if use_cuda:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
else:
device = torch.device("cpu")
# init the agent
agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr, n_envs)
訓練 A2C 智慧體¶
對於我們的訓練迴圈,我們使用 RecordEpisodeStatistics 包裝器來記錄 episode 長度和收益,並且我們還儲存了損失和熵,以便在智慧體完成訓練後繪製它們。
您可能會注意到,我們並沒有像往常一樣在每個 episode 開始時重置向量化環境。這是因為每個環境在 episode 結束後會自動重置(每個環境由於隨機種子而需要不同數量的時間步來完成一個 episode)。因此,我們也不是按 episode 收集資料,而只是在每個環境中執行一定數量的步(n_steps_per_update)(例如,這可能意味著我們執行 20 個時間步來完成一個 episode,然後使用剩餘的時間步開始一個新的 episode)。
# create a wrapper environment to save episode returns and episode lengths
envs_wrapper = gym.wrappers.vector.RecordEpisodeStatistics(
envs, buffer_length=n_envs * n_updates
)
critic_losses = []
actor_losses = []
entropies = []
# use tqdm to get a progress bar for training
for sample_phase in tqdm(range(n_updates)):
# we don't have to reset the envs, they just continue playing
# until the episode is over and then reset automatically
# reset lists that collect experiences of an episode (sample phase)
ep_value_preds = torch.zeros(n_steps_per_update, n_envs, device=device)
ep_rewards = torch.zeros(n_steps_per_update, n_envs, device=device)
ep_action_log_probs = torch.zeros(n_steps_per_update, n_envs, device=device)
masks = torch.zeros(n_steps_per_update, n_envs, device=device)
# at the start of training reset all envs to get an initial state
if sample_phase == 0:
states, info = envs_wrapper.reset(seed=42)
# play n steps in our parallel environments to collect data
for step in range(n_steps_per_update):
# select an action A_{t} using S_{t} as input for the agent
actions, action_log_probs, state_value_preds, entropy = agent.select_action(
states
)
# perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
states, rewards, terminated, truncated, infos = envs_wrapper.step(
actions.cpu().numpy()
)
ep_value_preds[step] = torch.squeeze(state_value_preds)
ep_rewards[step] = torch.tensor(rewards, device=device)
ep_action_log_probs[step] = action_log_probs
# add a mask (for the return calculation later);
# for each env the mask is 1 if the episode is ongoing and 0 if it is terminated (not by truncation!)
masks[step] = torch.tensor([not term for term in terminated])
# calculate the losses for actor and critic
critic_loss, actor_loss = agent.get_losses(
ep_rewards,
ep_action_log_probs,
ep_value_preds,
entropy,
masks,
gamma,
lam,
ent_coef,
device,
)
# update the actor and critic networks
agent.update_parameters(critic_loss, actor_loss)
# log the losses and entropy
critic_losses.append(critic_loss.detach().cpu().numpy())
actor_losses.append(actor_loss.detach().cpu().numpy())
entropies.append(entropy.detach().mean().cpu().numpy())
繪圖¶
""" plot the results """
# %matplotlib inline
rolling_length = 20
fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(12, 5))
fig.suptitle(
f"Training plots for {agent.__class__.__name__} in the LunarLander-v3 environment \n \
(n_envs={n_envs}, n_steps_per_update={n_steps_per_update}, randomize_domain={randomize_domain})"
)
# episode return
axs[0][0].set_title("Episode Returns")
episode_returns_moving_average = (
np.convolve(
np.array(envs_wrapper.return_queue).flatten(),
np.ones(rolling_length),
mode="valid",
)
/ rolling_length
)
axs[0][0].plot(
np.arange(len(episode_returns_moving_average)) / n_envs,
episode_returns_moving_average,
)
axs[0][0].set_xlabel("Number of episodes")
# entropy
axs[1][0].set_title("Entropy")
entropy_moving_average = (
np.convolve(np.array(entropies), np.ones(rolling_length), mode="valid")
/ rolling_length
)
axs[1][0].plot(entropy_moving_average)
axs[1][0].set_xlabel("Number of updates")
# critic loss
axs[0][1].set_title("Critic Loss")
critic_losses_moving_average = (
np.convolve(
np.array(critic_losses).flatten(), np.ones(rolling_length), mode="valid"
)
/ rolling_length
)
axs[0][1].plot(critic_losses_moving_average)
axs[0][1].set_xlabel("Number of updates")
# actor loss
axs[1][1].set_title("Actor Loss")
actor_losses_moving_average = (
np.convolve(np.array(actor_losses).flatten(), np.ones(rolling_length), mode="valid")
/ rolling_length
)
axs[1][1].plot(actor_losses_moving_average)
axs[1][1].set_xlabel("Number of updates")
plt.tight_layout()
plt.show()
同步和異步向量化環境的效能分析¶
與同步環境相比,非同步環境可以帶來更快的訓練時間和更高的資料收集加速。這是因為非同步環境允許多個智慧體並行地與其環境互動,而同步環境則序列執行多個環境。這使得非同步環境具有更高的效率和更快的訓練時間。
根據 Karp-Flatt 指標(一種用於平行計算中,估計並行程序數量(此處為環境數量)擴充套件時的加速極限的指標),非同步環境的估計最大加速為 57,而同步環境的估計最大加速為 21。這表明非同步環境比同步環境具有顯著更快的訓練時間(參見圖表)。
然而,需要注意的是,在達到一定數量的環境後,增加並行向量環境的數量可能會導致訓練時間變慢(參見下圖,智慧體訓練到平均訓練收益高於 -120)。訓練時間變慢可能發生的原因是,在相對較少數量的環境下,環境的梯度已經足夠好(特別是當環境不是很複雜時)。在這種情況下,增加環境數量並不會提高學習速度,反而會增加執行時間,這可能是由於計算梯度所需的額外時間。對於 LunarLander-v3,表現最佳的配置是使用 10 個並行環境的 AsyncVectorEnv,但複雜度更高的環境可能需要更多並行環境才能達到最佳效能。
儲存/載入權重¶
save_weights = False
load_weights = False
actor_weights_path = "weights/actor_weights.h5"
critic_weights_path = "weights/critic_weights.h5"
if not os.path.exists("weights"):
os.mkdir("weights")
""" save network weights """
if save_weights:
torch.save(agent.actor.state_dict(), actor_weights_path)
torch.save(agent.critic.state_dict(), critic_weights_path)
""" load network weights """
if load_weights:
agent = A2C(obs_shape, action_shape, device, critic_lr, actor_lr)
agent.actor.load_state_dict(torch.load(actor_weights_path))
agent.critic.load_state_dict(torch.load(critic_weights_path))
agent.actor.eval()
agent.critic.eval()
展示智慧體¶
""" play a couple of showcase episodes """
n_showcase_episodes = 3
for episode in range(n_showcase_episodes):
print(f"starting episode {episode}...")
# create a new sample environment to get new random parameters
if randomize_domain:
env = gym.make(
"LunarLander-v3",
render_mode="human",
gravity=np.clip(
np.random.normal(loc=-10.0, scale=2.0), a_min=-11.99, a_max=-0.01
),
enable_wind=np.random.choice([True, False]),
wind_power=np.clip(
np.random.normal(loc=15.0, scale=2.0), a_min=0.01, a_max=19.99
),
turbulence_power=np.clip(
np.random.normal(loc=1.5, scale=1.0), a_min=0.01, a_max=1.99
),
max_episode_steps=500,
)
else:
env = gym.make("LunarLander-v3", render_mode="human", max_episode_steps=500)
# get an initial state
state, info = env.reset()
# play one episode
done = False
while not done:
# select an action A_{t} using S_{t} as input for the agent
with torch.no_grad():
action, _, _, _ = agent.select_action(state[None, :])
# perform the action A_{t} in the environment to get S_{t+1} and R_{t+1}
state, reward, terminated, truncated, info = env.step(action.item())
# update if the environment is done
done = terminated or truncated
env.close()
嘗試自己玩轉環境¶
# from gymnasium.utils.play import play
#
# play(gym.make('LunarLander-v3', render_mode='rgb_array'),
# keys_to_action={'w': 2, 'a': 1, 'd': 3}, noop=0)
參考文獻¶
[1] V. Mnih, A. P. Badia, M. Mirza, A. Graves, T. P. Lillicrap, T. Harley, D. Silver, K. Kavukcuoglu. “Asynchronous Methods for Deep Reinforcement Learning” ICML (2016).
[2] J. Schulman, P. Moritz, S. Levine, M. Jordan and P. Abbeel. “High-dimensional continuous control using generalized advantage estimation.” ICLR (2016).
[3] Gymnasium 文件:向量環境。(URL: https://gymnasium.llms.tw/api/vector/)