PPO Agent playing BipedalWalker-v3
This is a trained model of a PPO agent playing BipedalWalker-v3 using the stable-baselines3 library.
Usage (with Stable-baselines3)
from huggingface_sb3 import load_from_hub
from stable_baselines3 import PPO

checkpoint = load_from_hub("Mahanthesh0r/BipedalWalker-RL", "ppo-BipedalWalker-v3.zip")
model = PPO.load(checkpoint)
"""# **1. Setup**
### **Install Packages**
"""
# Install necessary packages
!apt install swig cmake ffmpeg xvfb python3-opengl
!pip install stable-baselines3==2.0.0a5 gymnasium[box2d] huggingface_sb3 pyvirtualdisplay imageio[ffmpeg]
"""The Next Cell will force the notebook runtime to restart. This is to ensure all the new libraries installed will be used."""
import os
os.kill(os.getpid(), 9)
"""### **Start Virtual Display**"""
from pyvirtualdisplay import Display
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()
"""### **Setup Environment**"""
import gymnasium as gym
env = gym.make("BipedalWalker-v3", hardcore=True)
env.reset()
"""### **Observation Space**
The observation is a vector of size 24 (shape `(24,)`), where each value carries a different piece of information about the walker:
- **Hull Angle Speed**: The speed at which the main body of the walker is rotating.
- **Angular Velocity**: The rate of change of the angular position of the walker.
- **Horizontal Speed**: The speed at which the walker is moving horizontally.
- **Vertical Speed**: The speed at which the walker is moving vertically.
- **Position of Joints**: The positions (angles) of the walker's joints. With 4 joints, these take up 4 values.
- **Joints Angular Speed**: The rate of change of the angular position of each joint; again, 4 values for the 4 joints.
- **Legs Contact with Ground**: Flags indicating whether each leg is in contact with the ground. With two legs, this contains 2 values.
- **10 Lidar Rangefinder Measurements**: These are distance measurements to detect obstacles or terrain features around the walker. There are 10 of these values.
"""
print("_____OBSERVATION SPACE_____ \n")
print("Observation Space Shape", env.observation_space.shape)
print("Sample observation", env.observation_space.sample()) # Get a random observation
"""### **Action Space**
Actions are motor speed values in the [-1, 1] range, one for each of the 4 joints (two hips and two knees).
"""
print("\n _____ACTION SPACE_____ \n")
print("Action Space Shape", env.action_space.shape)
print("Action Space Sample", env.action_space.sample()) # Take a random action
"""### **Vectorized Environment**
Create a vectorized environment that stacks 16 independent environments into a single one, so each rollout collects more diverse experience.
"""
from stable_baselines3.common.env_util import make_vec_env
env = make_vec_env("BipedalWalker-v3", n_envs=16, env_kwargs=dict(hardcore=True))  # match the hardcore variant used above and for evaluation
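"""The vectorized environment batches everything: `reset()` returns one observation row per sub-environment."""
obs = env.reset()
print(obs.shape)  # (16, 24): 16 environments, each with a 24-dimensional observation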
"""# **2. Building the Model**"""
from stable_baselines3 import PPO
model = PPO(
    policy="MlpPolicy",
    env=env,
    n_steps=2048,
    batch_size=128,
    n_epochs=6,
    gamma=0.999,
    gae_lambda=0.98,
    ent_coef=0.01,
    verbose=1,
)
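"""With these hyperparameters, each rollout collects `n_steps` × `n_envs` = 2048 × 16 = 32,768 transitions, which are then split into minibatches of 128 and reused for 6 optimization epochs (`n_epochs`)."""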
"""# 3.**Video Generation**"""
import os
import tempfile
from pathlib import Path

import numpy as np
from wasabi import Printer
from stable_baselines3.common.base_class import BaseAlgorithm
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import (
    DummyVecEnv,
    VecEnv,
    VecVideoRecorder,
)
msg = Printer()
def generate_replay(
    model: BaseAlgorithm,
    eval_env: VecEnv,
    video_length: int,
    is_deterministic: bool,
    local_path: Path,
):
    """
    Generate a replay video of the agent.

    :param model: trained model
    :param eval_env: environment used to evaluate the agent
    :param video_length: length of the video (in timesteps)
    :param is_deterministic: use deterministic or stochastic actions
    :param local_path: path of the local repository
    """
    # Record into a temporary directory: SB3 creates -step-0-to-... meta
    # files and other artifacts that we don't want in the repo.
    with tempfile.TemporaryDirectory() as tmpdirname:
        # Step 1: Create the VecVideoRecorder
        env = VecVideoRecorder(
            eval_env,
            tmpdirname,
            record_video_trigger=lambda x: x == 0,
            video_length=video_length,
            name_prefix="",
        )
        obs = env.reset()
        lstm_states = None
        episode_starts = np.ones((env.num_envs,), dtype=bool)
        try:
            for _ in range(video_length):
                action, lstm_states = model.predict(
                    obs,
                    state=lstm_states,
                    episode_start=episode_starts,
                    deterministic=is_deterministic,
                )
                obs, _, episode_starts, _ = env.step(action)
            # Save the video
            env.close()
            # Re-encode the video with the x264 codec
            inp = env.video_recorder.path
            out = local_path
            os.system(f"ffmpeg -y -i {inp} -vcodec h264 {out}")
            print(f"Video saved to: {out}")
        except KeyboardInterrupt:
            pass
        except Exception as e:
            msg.fail(str(e))
            # Report that video generation failed
            msg.fail("We are unable to generate a replay of your agent")
"""# **4. Training, Saving and Record the Videos**"""
import os
# Create a directory to save the videos
video_dir = "/content/videos"
os.makedirs(video_dir, exist_ok=True)
env_id = "BipedalWalker-v3"
# Train and generate a video every 100,000 steps; adjust the total timesteps to your liking
for i in range(0, 2000000, 100000):
    # Keep the timestep count cumulative across learn() calls instead of resetting the logger
    model.learn(total_timesteps=100000, reset_num_timesteps=False)
    # Save the model
    model_name = "ppo-BipedalWalker-v3"
    model.save(model_name)
    video_name = f"replay_{i + 100000}.mp4"
    generate_replay(
        model=model,
        eval_env=DummyVecEnv([lambda: Monitor(gym.make(env_id, hardcore=True, render_mode="rgb_array"))]),
        video_length=1000,
        is_deterministic=True,
        local_path=os.path.join(video_dir, video_name),
    )
model_name = "ppo-BipedalWalker-v3"
model.save(model_name)
with open(os.path.join(video_dir, "filelist.txt"), "w") as f:
    for i in range(0, 2000000, 100000):
        video_name = f"replay_{i + 100000}.mp4"
        f.write(f"file '{os.path.join(video_dir, video_name)}'\n")
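"""`filelist.txt` now lists one clip per line in the form `file '/content/videos/replay_100000.mp4'`, which is the input format expected by ffmpeg's concat demuxer used below."""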
# Concatenate all the videos into one
os.system(f"ffmpeg -f concat -safe 0 -i {os.path.join(video_dir, 'filelist.txt')} -c copy {os.path.join(video_dir, 'replay_all.mp4')}")
"""# **5. Visualize Final Video**"""
from IPython.display import HTML
from base64 import b64encode
mp4 = open(os.path.join(video_dir, "replay_all.mp4"), "rb").read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=600 controls>
<source src="%s" type="video/mp4">
</video>
""" % data_url)
"""# **6. Evaluate the Model**"""
from stable_baselines3.common.evaluation import evaluate_policy
eval_env = Monitor(gym.make("BipedalWalker-v3", hardcore=True))  # evaluate on the same hardcore variant used for training
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10, deterministic=True)
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")
"""# **7. Upload to HuggingFace**"""
from huggingface_sb3 import load_from_hub, package_to_hub
from huggingface_hub import notebook_login  # Log in to your Hugging Face account so you can upload models to the Hub
notebook_login()
!git config --global credential.helper store
env_id = "BipedalWalker-v3"
model_name = "ppo-BipedalWalker-v3"
model_architecture = "PPO"
repo_id = "Mahanthesh0r/BipedalWalker-RL" # Change with your repo id
# Define the commit message
commit_message = "Upload PPO BipedalWalker-v3 trained agent"
# Create the evaluation env and set the render_mode="rgb_array"
eval_env = DummyVecEnv([lambda: gym.make(env_id, hardcore=True, render_mode="rgb_array")])
package_to_hub(
    model=model,  # Our trained model
    model_name=model_name,  # The name of our trained model
    model_architecture=model_architecture,  # The model architecture we used: in our case PPO
    env_id=env_id,  # Name of the environment
    eval_env=eval_env,  # Evaluation environment
    repo_id=repo_id,  # Repo id on the Hugging Face Hub
    commit_message=commit_message,
)
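"""`package_to_hub` evaluates the agent, records a replay video, generates a model card, and pushes the model plus these artifacts to the Hub repository."""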
"""# **8. Load Models from HuggingFace (Optional)**"""
from huggingface_sb3 import load_from_hub
repo_id = "Mahanthesh0r/BipedalWalker-RL" # The repo_id
filename = "ppo-BipedalWalker-v3.zip" # The model filename.zip
checkpoint = load_from_hub(repo_id, filename)
model = PPO.load(checkpoint, print_system_info=True)
eval_env = Monitor(gym.make("BipedalWalker-v3", hardcore=True))
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=10, deterministic=True)
print(f"mean_reward={mean_reward:.2f} +/- {std_reward}")
...
Evaluation results
- mean_reward on BipedalWalker-v3 (self-reported): -58.54 +/- 39.24