AsyncVectorEnv¶
- class gymnasium.vector.AsyncVectorEnv(env_fns: Sequence[Callable[[], Env]], shared_memory: bool = True, copy: bool = True, context: str | None = None, daemon: bool = True, worker: Callable[[int, Callable[[], Env], Connection, Connection, bool, Queue], None] | None = None, observation_mode: str | Space = 'same', autoreset_mode: str | AutoresetMode = AutoresetMode.NEXT_STEP)[source]¶
並行執行多個環境的向量化環境。
它使用
multiprocessing程序和管道進行通訊。示例
>>> import gymnasium as gym >>> envs = gym.make_vec("Pendulum-v1", num_envs=2, vectorization_mode="async") >>> envs AsyncVectorEnv(Pendulum-v1, num_envs=2) >>> envs = gym.vector.AsyncVectorEnv([ ... lambda: gym.make("Pendulum-v1", g=9.81), ... lambda: gym.make("Pendulum-v1", g=1.62) ... ]) >>> envs AsyncVectorEnv(num_envs=2) >>> observations, infos = envs.reset(seed=42) >>> observations array([[-0.14995256, 0.9886932 , -0.12224312], [ 0.5760367 , 0.8174238 , -0.91244936]], dtype=float32) >>> infos {} >>> _ = envs.action_space.seed(123) >>> observations, rewards, terminations, truncations, infos = envs.step(envs.action_space.sample()) >>> observations array([[-0.1851753 , 0.98270553, 0.714599 ], [ 0.6193494 , 0.7851154 , -1.0808398 ]], dtype=float32) >>> rewards array([-2.96495728, -1.00214607]) >>> terminations array([False, False]) >>> truncations array([False, False]) >>> infos {}
- 引數:
env_fns – 用於建立環境的函式。
shared_memory – 如果為
True,則工作程序的觀測值透過共享變數傳回。如果觀測值很大(例如影像),這可以提高效率。copy – 如果為
True,則AsyncVectorEnv.reset()和AsyncVectorEnv.step()方法返回觀測值的副本。context – multiprocessing 的上下文。如果為
None,則使用預設上下文。daemon – 如果為
True,則子程序的daemon標誌將開啟;也就是說,如果主程序退出,它們也會退出。然而,daemon=True會阻止子程序生成子程序,因此對於某些環境,您可能希望將其設定為False。worker – 如果已設定,則在子程序中使用該工作器而不是預設工作器。這對於覆蓋某些內部向量環境邏輯很有用,例如,如何處理終止或截斷時的重置。
observation_mode – 定義環境觀測空間應如何批次處理。'same' 定義應有
n個相同空間的副本。'different' 定義可以有多個具有不同引數的觀測空間,但需要相同的形狀和資料型別,警告:可能會引發意外錯誤。傳遞Tuple[Space, Space]物件允許定義自定義的single_observation_space和observation_space,警告:可能會引發意外錯誤。autoreset_mode – 使用的自動重置模式,有關更多資訊,請參閱 https://farama.org/Vector-Autoreset-Mode。
警告
worker 是一個高階模式選項。它提供了高度的靈活性,但也極有可能讓你自食其果;因此,如果你正在編寫自己的 worker,建議從
_worker(或_async_worker)方法的程式碼開始,然後新增更改。- 引發:
RuntimeError – 如果某些子環境的觀測空間與 observation_space 不匹配(或者,預設情況下,與第一個子環境的觀測空間不匹配)。
ValueError – 如果 observation_space 是自定義空間(即不是 Gym 中的預設空間,例如 gymnasium.spaces.Box、gymnasium.spaces.Discrete 或 gymnasium.spaces.Dict)並且 shared_memory 為 True。
- reset(*, seed: int | list[int | None] | None = None, options: dict[str, Any] | None = None) tuple[ObsType, dict[str, Any]][source]¶
並行重置所有子環境,並返回一批連線的觀測值和資訊。
- 引數:
seed – 環境重置的種子。
options – 是否返回選項。
- 返回:
來自向量化環境的一批觀測值和資訊。
- step(actions: ActType) tuple[ObsType, ArrayType, ArrayType, ArrayType, dict[str, Any]][source]¶
為每個並行環境採取一個動作。
- 引數:
actions –
action_space動作批次的元素。- 返回:
批次 (觀測值,獎勵,終止,截斷,資訊)
- close(**kwargs: Any)¶
關閉所有並行環境並釋放資源。
它還會關閉所有現有的影像檢視器,然後呼叫
close_extras()並將closed設定為True。警告
此函式本身不關閉環境,應在
close_extras()中處理。這對於同步和異步向量化環境都是通用的。注意
這將在垃圾回收或程式退出時自動呼叫。
- 引數:
**kwargs – 傳遞給
close_extras()的關鍵字引數。
- call(name: str, *args: Any, **kwargs: Any) tuple[Any, ...][source]¶
使用 args 和 kwargs 呼叫每個並行環境中的方法。
- 引數:
name (str) – 要呼叫的方法或屬性的名稱。
*args – 應用於方法呼叫的位置引數。
**kwargs – 應用於方法呼叫的關鍵字引數。
- 返回:
每個環境呼叫方法或屬性的單獨結果列表。
- get_attr(name: str) tuple[Any, ...][source]¶
從每個並行環境獲取一個屬性。
- 引數:
name (str) – 要從每個獨立環境獲取的屬性的名稱。
- 返回:
具有名稱的屬性
- set_attr(name: str, values: list[Any] | tuple[Any] | object)[source]¶
設定子環境的屬性。
- 引數:
name – 要在每個獨立環境中設定的屬性名稱。
values – 要設定的屬性值。如果
values是列表或元組,則對應於每個獨立環境的值,否則為所有環境設定單個值。
- 引發:
ValueError – 值必須是列表或元組,且長度等於環境數量。
AlreadyPendingCallError – 在等待掛起呼叫完成時呼叫
set_attr()。
附加方法¶
- property AsyncVectorEnv.np_random: tuple[Generator, ...]¶
返回封裝環境中 numpy 隨機數生成器的元組。
- property AsyncVectorEnv.np_random_seed: tuple[int, ...]¶
返回所有封裝環境的 np_random 種子元組。