Commit d28d29d6 authored by Dipam Chakraborty's avatar Dipam Chakraborty
Browse files

ppo vf coeff experiment

parent eb956d0f
#from ray.rllib.agents.ppo.ppo import PPOTrainer, DEFAULT_CONFIG
#from ppo_tf_policy import PPOTFPolicy
#__all__ = [
# "PPOTFPolicy",
# "PPOTrainer",
This diff is collapsed.
import logging
import os
import time
from ray.rllib.agents.trainer import Trainer, COMMON_CONFIG
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.utils import add_mixins
from ray.rllib.utils.annotations import override, DeveloperAPI
import numpy as np
from zlib import compress, decompress
from sys import getsizeof
logger = logging.getLogger(__name__)
def build_trainer(name,
"""Helper function for defining a custom trainer.
Functions will be run in this order to initialize the trainer:
1. Config setup: validate_config, get_initial_state, get_policy
2. Worker setup: before_init, make_workers, make_policy_optimizer
3. Post setup: after_init
name (str): name of the trainer (e.g., "PPO")
default_policy (cls): the default Policy class to use
default_config (dict): The default config dict of the algorithm,
otherwise uses the Trainer default config.
validate_config (func): optional callback that checks a given config
for correctness. It may mutate the config as needed.
get_initial_state (func): optional function that returns the initial
state dict given the trainer instance as an argument. The state
dict must be serializable so that it can be checkpointed, and will
be available as the `trainer.state` variable.
get_policy_class (func): optional callback that takes a config and
returns the policy class to override the default with
before_init (func): optional function to run at the start of trainer
init that takes the trainer instance as argument
make_workers (func): override the method that creates rollout workers.
This takes in (trainer, env_creator, policy, config) as args.
make_policy_optimizer (func): optional function that returns a
PolicyOptimizer instance given (WorkerSet, config)
after_init (func): optional function to run at the end of trainer init
that takes the trainer instance as argument
before_train_step (func): optional callback to run before each train()
call. It takes the trainer instance as an argument.
after_optimizer_step (func): optional callback to run after each
step() call to the policy optimizer. It takes the trainer instance
and the policy gradient fetches as arguments.
after_train_result (func): optional callback to run at the end of each
train() call. It takes the trainer instance and result dict as
arguments, and may mutate the result dict as needed.
collect_metrics_fn (func): override the method used to collect metrics.
It takes the trainer instance as argumnt.
before_evaluate_fn (func): callback to run before evaluation. This
takes the trainer instance as argument.
mixins (list): list of any class mixins for the returned trainer class.
These mixins will be applied in order and will have higher
precedence than the Trainer class
execution_plan (func): Experimental distributed execution
API. This overrides `make_policy_optimizer`.
a Trainer instance that uses the specified args.
original_kwargs = locals().copy()
base = add_mixins(Trainer, mixins)
class trainer_cls(base):
_name = name
_default_config = default_config or COMMON_CONFIG
_policy = default_policy
def __init__(self, config=None, env=None, logger_creator=None):
Trainer.__init__(self, config, env, logger_creator)
def _init(self, config, env_creator):
if validate_config:
if get_initial_state:
self.state = get_initial_state(self)
self.state = {}
if get_policy_class is None:
self._policy = default_policy
self._policy = get_policy_class(config)
if before_init:
use_exec_api = (execution_plan
and (self.config["use_exec_api"]
or "RLLIB_EXEC_API" in os.environ))
# Creating all workers (excluding evaluation workers).
if make_workers and not use_exec_api:
self.workers = make_workers(self, env_creator, self._policy,
self.workers = self._make_workers(env_creator, self._policy,
self.train_exec_impl = None
self.optimizer = None
self.execution_plan = execution_plan
if use_exec_api:
"The experimental distributed execution API is enabled "
"for this algorithm. Disable this by setting "
"'use_exec_api': False.")
self.train_exec_impl = execution_plan(self.workers, config)
elif make_policy_optimizer:
self.optimizer = make_policy_optimizer(self.workers, config)
optimizer_config = dict(
**{"train_batch_size": config["train_batch_size"]})
self.optimizer = SyncSamplesOptimizer(self.workers,
if after_init:
policy = Trainer.get_policy(self)
def _train(self):
if self.train_exec_impl:
return self._train_exec_impl()
if before_train_step:
prev_steps = self.optimizer.num_steps_sampled
start = time.time()
optimizer_steps_this_iter = 0
while True:
fetches = self.optimizer.step()
optimizer_steps_this_iter += 1
if after_optimizer_step:
after_optimizer_step(self, fetches)
if (time.time() - start >= self.config["min_iter_time_s"]
and self.optimizer.num_steps_sampled - prev_steps >=
if collect_metrics_fn:
res = collect_metrics_fn(self)
res = self.collect_metrics()
timesteps_this_iter=self.optimizer.num_steps_sampled -
info=res.get("info", {}))
if after_train_result:
after_train_result(self, res)
return res
def _train_exec_impl(self):
if before_train_step:
logger.debug("Ignoring before_train_step callback")
res = next(self.train_exec_impl)
if after_train_result:
logger.debug("Ignoring after_train_result callback")
return res
def _before_evaluate(self):
if before_evaluate_fn:
def __getstate__(self):
state = Trainer.__getstate__(self)
state["trainer_state"] = self.state.copy()
policy = Trainer.get_policy(self)
state["custom_state_vars"] = policy.get_custom_state_vars()
state["optimizer_state"] = {k: v for k, v in policy.optimizer.state_dict().items()}
state["amp_scaler_state"] = {k: v for k, v in policy.amp_scaler.state_dict().items()}
print("################# WARNING: SAVING STATE VARS AND OPTIMIZER FAILED ################")
if self.train_exec_impl:
state["train_exec_impl"] = (
return state
def __setstate__(self, state):
Trainer.__setstate__(self, state)
policy = Trainer.get_policy(self)
self.state = state["trainer_state"].copy()
policy.set_optimizer_state(state["optimizer_state"], state["amp_scaler_state"])
print("################# WARNING: LOADING STATE VARS AND OPTIMIZER FAILED ################")
if self.train_exec_impl:
def with_updates(**overrides):
"""Build a copy of this trainer with the specified overrides.
overrides (dict): use this to override any of the arguments
originally passed to build_trainer() for this policy.
return build_trainer(**dict(original_kwargs, **overrides))
trainer_cls.with_updates = staticmethod(with_updates)
trainer_cls.__name__ = name
trainer_cls.__qualname__ = name
return trainer_cls
import logging
from ray.rllib.agents import with_common_config
from .custom_torch_policy import CustomTorchPolicy
# from ray.rllib.agents.trainer_template import build_trainer
from .custom_trainer_template import build_trainer
logger = logging.getLogger(__name__)
# yapf: disable
# __sphinx_doc_begin__
DEFAULT_CONFIG = with_common_config({
# Should use a critic as a baseline (otherwise don't use value baseline;
# required for using GAE).
"use_critic": True,
# If true, use the Generalized Advantage Estimator (GAE)
# with a value function, see
"use_gae": True,
# The GAE(lambda) parameter.
"lambda": 1.0,
# Initial coefficient for KL divergence.
"kl_coeff": 0.2,
# Size of batches collected from each worker.
"rollout_fragment_length": 200,
# Number of timesteps collected for each SGD round. This defines the size
# of each SGD epoch.
"train_batch_size": 4000,
# Total SGD batch size across all devices for SGD. This defines the
# minibatch size within each epoch.
"sgd_minibatch_size": 128,
# Whether to shuffle sequences in the batch when training (recommended).
"shuffle_sequences": True,
# Number of SGD iterations in each outer loop (i.e., number of epochs to
# execute per train batch).
"num_sgd_iter": 30,
# Stepsize of SGD.
"lr": 5e-5,
# Learning rate schedule.
"lr_schedule": None,
# Share layers for value function. If you set this to True, it's important
# to tune vf_loss_coeff.
"vf_share_layers": False,
# Coefficient of the value function loss. IMPORTANT: you must tune this if
# you set vf_share_layers: True.
"vf_loss_coeff": 1.0,
# Coefficient of the entropy regularizer.
"entropy_coeff": 0.0,
# Decay schedule for the entropy regularizer.
"entropy_coeff_schedule": None,
# PPO clip parameter.
"clip_param": 0.3,
# Clip param for the value function. Note that this is sensitive to the
# scale of the rewards. If your expected V is large, increase this.
"vf_clip_param": 10.0,
# If specified, clip the global norm of gradients by this amount.
"grad_clip": None,
# Target value for KL divergence.
"kl_target": 0.01,
# Whether to rollout "complete_episodes" or "truncate_episodes".
"batch_mode": "truncate_episodes",
# Which observation filter to apply to the observation.
"observation_filter": "NoFilter",
# Uses the sync samples optimizer instead of the multi-gpu one. This is
# usually slower, but you might want to try it if you run into issues with
# the default optimizer.
"simple_optimizer": False,
# Whether to fake GPUs (using CPUs).
# Set this to True for debugging on non-GPU machines (set `num_gpus` > 0).
"_fake_gpus": False,
# Use PyTorch as framework?
"use_pytorch": True,
# Custom swithches
"retune_skips": 300000,
"retune_replay_size": 200000,
"num_retunes": 6,
"retune_epochs": 3,
"standardize_rewards": False,
"accumulate_train_batches": 1,
"adaptive_gamma": False,
"final_lr": 1e-4,
"lr_schedule": True,
"final_entropy_coeff": 0.002,
"entropy_schedule": True,
"max_minibatch_size": 2048,
"updates_per_batch": 8,
"scale_reward": 1.0,
"return_reset": True,
"aux_phase_mixed_precision": False,
"max_time": 100000000,
# __sphinx_doc_end__
# yapf: enable
PPOTrainer = build_trainer(
import numpy as np
from ray.rllib.utils import try_import_torch
from collections import deque
from skimage.util import view_as_windows
torch, nn = try_import_torch()
def neglogp_actions(pi_logits, actions):
return nn.functional.cross_entropy(pi_logits, actions, reduction='none')
def sample_actions(logits, device):
u = torch.rand(logits.shape, dtype=logits.dtype).to(device)
return torch.argmax(logits - torch.log(-torch.log(u)), dim=1)
def pi_entropy(logits):
a0 = logits - torch.max(logits, dim=1, keepdim=True)[0]
ea0 = torch.exp(a0)
z0 = torch.sum(ea0, dim=1, keepdim=True)
p0 = ea0 / z0
return torch.sum(p0 * (torch.log(z0) - a0), axis=1)
def roll(arr):
s = arr.shape
return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:])
def unroll(arr, targetshape):
s = arr.shape
return arr.reshape(*targetshape, *s[1:]).swapaxes(0, 1)
def safe_mean(xs):
return -np.inf if len(xs) == 0 else np.mean(xs)
def pad_and_random_crop(imgs, out, pad):
Vectorized pad and random crop
Assumes square images?
imgs: shape (B,H,W,C)
out: output size (e.g. 64)
# n: batch size.
imgs = np.pad(imgs, [[0, 0], [pad, pad], [pad, pad], [0, 0]])
n = imgs.shape[0]
img_size = imgs.shape[1] # e.g. 64
crop_max = img_size - out
w1 = np.random.randint(0, crop_max, n)
h1 = np.random.randint(0, crop_max, n)
# creates all sliding window
# combinations of size (out)
windows = view_as_windows(imgs, (1, out, out, 1))[..., 0,:,:, 0]
# selects a random window
# for each batch element
cropped = windows[np.arange(n), w1, h1]
cropped = cropped.transpose(0,2,3,1)
return cropped
def random_cutout_color(imgs, min_cut, max_cut):
n, h, w, c = imgs.shape
w1 = np.random.randint(min_cut, max_cut, n)
h1 = np.random.randint(min_cut, max_cut, n)
cutouts = np.empty((n, h, w, c), dtype=imgs.dtype)
rand_box = np.random.randint(0, 255, size=(n, c), dtype=imgs.dtype)
for i, (img, w11, h11) in enumerate(zip(imgs, w1, h1)):
cut_img = img.copy()
# add random box
cut_img[h11:h11 + h11, w11:w11 + w11, :] = rand_box[i]
cutouts[i] = cut_img
return cutouts
def linear_schedule(initial_val, final_val, current_steps, total_steps):
frac = 1.0 - current_steps / total_steps
return (initial_val-final_val) * frac + final_val
def horizon_to_gamma(horizon):
return 1.0 - 1.0/horizon
class AdaptiveDiscountTuner:
def __init__(self, gamma, momentum=0.98, eplenmult=1):
self.gamma = gamma
self.momentum = momentum
self.eplenmult = eplenmult
def update(self, horizon):
if horizon > 0:
htarg = horizon * self.eplenmult
gtarg = horizon_to_gamma(htarg)
self.gamma = self.gamma * self.momentum + gtarg * (1-self.momentum)
return self.gamma
class RetuneSelector:
def __init__(self, nbatch, ob_space, ac_space, skips = 800_000, replay_size = 200_000, num_retunes = 5):
self.skips = skips + (-skips) % nbatch
self.replay_size = replay_size + (-replay_size) % nbatch
self.batch_size = nbatch
self.batches_in_replay = self.replay_size // nbatch
self.num_retunes = num_retunes
self.ac_space = ac_space
self.ob_space = ob_space
self.cooldown_counter = self.skips // self.batch_size
self.replay_index = 0
self.buffer_full = False
def update(self, obs_batch, exp_replay):
if self.num_retunes == 0:
return False
if self.cooldown_counter > 0:
self.cooldown_counter -= 1
return False
start = self.replay_index * self.batch_size
end = start + self.batch_size
exp_replay[start:end] = obs_batch
self.replay_index = (self.replay_index + 1) % self.batches_in_replay
self.buffer_full = self.buffer_full or (self.replay_index == 0)
return self.buffer_full
def retune_done(self):
self.cooldown_counter = self.skips // self.batch_size
self.num_retunes -= 1
self.replay_index = 0
self.buffer_full = False
def set_num_retunes(self, nr):
self.num_retunes = nr
class RewardNormalizer(object):
def __init__(self, gamma=0.99, cliprew=10.0, epsilon=1e-8):
self.epsilon = epsilon
self.gamma = gamma
self.ret_rms = RunningMeanStd(shape=())
self.cliprew = cliprew
self.ret = 0. # size updates after first pass
def normalize(self, rews, news, reset_returns=True):
self.ret = self.ret * self.gamma + rews
rews = np.clip(rews / np.sqrt(self.ret_rms.var + self.epsilon), -self.cliprew, self.cliprew)
if reset_returns:
self.ret[np.array(news, dtype=bool)] = 0. ## Values should be True of False to set positional index
return rews
class RunningMeanStd(object):
def __init__(self, epsilon=1e-4, shape=()):
self.mean = np.zeros(shape, 'float64')
self.var = np.ones(shape, 'float64')
self.count = epsilon
def update(self, x):
batch_mean = np.mean(x, axis=0)
batch_var = np.var(x, axis=0)
batch_count = x.shape[0]
self.update_from_moments(batch_mean, batch_var, batch_count)
def update_from_moments(self, batch_mean, batch_var, batch_count):
self.mean, self.var, self.count = update_mean_var_count_from_moments(
self.mean, self.var, self.count, batch_mean, batch_var, batch_count)
def update_mean_var_count_from_moments(mean, var, count, batch_mean, batch_var, batch_count):
delta = batch_mean - mean
tot_count = count + batch_count
new_mean = mean + delta * batch_count / tot_count
m_a = var * count
m_b = batch_var * batch_count
M2 = m_a + m_b + np.square(delta) * count * batch_count / tot_count
new_var = M2 / tot_count
new_count = tot_count
return new_mean, new_var, new_count
\ No newline at end of file
env: frame_stacked_procgen
run: PPOExperimental
disable_evaluation_worker: True
# === Stop Conditions ===
timesteps_total: 8000000
time_total_s: 7200
# === Settings for Checkpoints ===
checkpoint_freq: 100
checkpoint_at_end: True
keep_checkpoints_num: 5
# === Settings for the Procgen Environment ===
env_name: coinrun
num_levels: 0
start_level: 0
paint_vel_info: False
use_generated_assets: False
center_agent: True
use_sequential_levels: False
distribution_mode: easy
frame_stack: 2
return_min: 0
return_blind: 1
return_max: 10
gamma: 0.996
lambda: 0.95
lr: 5.0e-4
# Number of SGD iterations in each outer loop
num_sgd_iter: 3
vf_loss_coeff: 0.5
entropy_coeff: 0.01
clip_param: 0.2
vf_clip_param: 0.2
grad_clip: 0.5
observation_filter: NoFilter
vf_share_layers: True
horizon: null
soft_horizon: False
no_done_at_end: False
# Custom switches
retune_skips: 350000
retune_replay_size: 200000
num_retunes: 13
retune_epochs: 3
standardize_rewards: True
scale_reward: 1.0
return_reset: False
aux_phase_mixed_precision: False
max_time: 7200
adaptive_gamma: False
final_lr: 5.0e-5
lr_schedule: 'linear'
final_entropy_coeff: 0.002
entropy_schedule: False
# Memory management, if batch size overflow, batch splitting is done to handle it