Commit fada618e authored by Chakraborty's avatar Chakraborty
Browse files

ppg run

parents 3d480be0 1fb3e4c4
......@@ -58,11 +58,11 @@ class CustomTorchPolicy(TorchPolicy):
aux_optim_params = list(network_params - value_params)
ppo_optim_params = list(network_params - aux_params - value_params)
if not self.config['single_optimizer']:
self.optimizer = torch.optim.Adam(ppo_optim_params, lr=self.config['lr'])
self.optimizer = torch.optim.Adam(ppo_optim_params, lr=self.config['lr'], weight_decay=self.config['l2_reg'])
self.optimizer = torch.optim.Adam(network_params, lr=self.config['lr'])
self.aux_optimizer = torch.optim.Adam(aux_optim_params, lr=self.config['aux_lr'])
self.value_optimizer = torch.optim.Adam(value_params, lr=self.config['value_lr'])
self.optimizer = torch.optim.Adam(network_params, lr=self.config['lr'], weight_decay=self.config['l2_reg'])
self.aux_optimizer = torch.optim.Adam(aux_optim_params, lr=self.config['aux_lr'], weight_decay=self.config['l2_reg'])
self.value_optimizer = torch.optim.Adam(value_params, lr=self.config['value_lr'], weight_decay=self.config['l2_reg'])
self.max_reward = self.config['env_config']['return_max']
self.rewnorm = RewardNormalizer(cliprew=self.max_reward) ## TODO: Might need to go to custom state
self.reward_deque = deque(maxlen=100)
......@@ -99,6 +99,7 @@ DEFAULT_CONFIG = with_common_config({
"max_time": 7200,
"pi_phase_mixed_precision": False,
"aux_num_accumulates": 1,
"l2_reg": 0.0,
# __sphinx_doc_end__
# yapf: enable
#from ray.rllib.agents.ppo.ppo import PPOTrainer, DEFAULT_CONFIG
#from ppo_tf_policy import PPOTFPolicy
#__all__ = [
# "PPOTFPolicy",
# "PPOTrainer",
This diff is collapsed.
import logging
import os
import time
from ray.rllib.agents.trainer import Trainer, COMMON_CONFIG
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.utils import add_mixins
from ray.rllib.utils.annotations import override, DeveloperAPI
import numpy as np
from zlib import compress, decompress
from sys import getsizeof
logger = logging.getLogger(__name__)
def build_trainer(name,
"""Helper function for defining a custom trainer.
Functions will be run in this order to initialize the trainer:
1. Config setup: validate_config, get_initial_state, get_policy
2. Worker setup: before_init, make_workers, make_policy_optimizer
3. Post setup: after_init
name (str): name of the trainer (e.g., "PPO")
default_policy (cls): the default Policy class to use
default_config (dict): The default config dict of the algorithm,
otherwise uses the Trainer default config.
validate_config (func): optional callback that checks a given config
for correctness. It may mutate the config as needed.
get_initial_state (func): optional function that returns the initial
state dict given the trainer instance as an argument. The state
dict must be serializable so that it can be checkpointed, and will
be available as the `trainer.state` variable.
get_policy_class (func): optional callback that takes a config and
returns the policy class to override the default with
before_init (func): optional function to run at the start of trainer
init that takes the trainer instance as argument
make_workers (func): override the method that creates rollout workers.
This takes in (trainer, env_creator, policy, config) as args.
make_policy_optimizer (func): optional function that returns a
PolicyOptimizer instance given (WorkerSet, config)
after_init (func): optional function to run at the end of trainer init
that takes the trainer instance as argument
before_train_step (func): optional callback to run before each train()
call. It takes the trainer instance as an argument.
after_optimizer_step (func): optional callback to run after each
step() call to the policy optimizer. It takes the trainer instance
and the policy gradient fetches as arguments.
after_train_result (func): optional callback to run at the end of each
train() call. It takes the trainer instance and result dict as
arguments, and may mutate the result dict as needed.
collect_metrics_fn (func): override the method used to collect metrics.
It takes the trainer instance as argumnt.
before_evaluate_fn (func): callback to run before evaluation. This
takes the trainer instance as argument.
mixins (list): list of any class mixins for the returned trainer class.
These mixins will be applied in order and will have higher
precedence than the Trainer class
execution_plan (func): Experimental distributed execution
API. This overrides `make_policy_optimizer`.
a Trainer instance that uses the specified args.
original_kwargs = locals().copy()
base = add_mixins(Trainer, mixins)
class trainer_cls(base):
_name = name
_default_config = default_config or COMMON_CONFIG
_policy = default_policy
def __init__(self, config=None, env=None, logger_creator=None):
Trainer.__init__(self, config, env, logger_creator)
def _init(self, config, env_creator):
if validate_config:
if get_initial_state:
self.state = get_initial_state(self)
self.state = {}
if get_policy_class is None:
self._policy = default_policy
self._policy = get_policy_class(config)
if before_init:
use_exec_api = (execution_plan
and (self.config["use_exec_api"]
or "RLLIB_EXEC_API" in os.environ))
# Creating all workers (excluding evaluation workers).
if make_workers and not use_exec_api:
self.workers = make_workers(self, env_creator, self._policy,
self.workers = self._make_workers(env_creator, self._policy,
self.train_exec_impl = None
self.optimizer = None
self.execution_plan = execution_plan
if use_exec_api:
"The experimental distributed execution API is enabled "
"for this algorithm. Disable this by setting "
"'use_exec_api': False.")
self.train_exec_impl = execution_plan(self.workers, config)
elif make_policy_optimizer:
self.optimizer = make_policy_optimizer(self.workers, config)
optimizer_config = dict(
**{"train_batch_size": config["train_batch_size"]})
self.optimizer = SyncSamplesOptimizer(self.workers,
if after_init:
policy = Trainer.get_policy(self)
def _train(self):
if self.train_exec_impl:
return self._train_exec_impl()
if before_train_step:
prev_steps = self.optimizer.num_steps_sampled
start = time.time()
optimizer_steps_this_iter = 0
while True:
fetches = self.optimizer.step()
optimizer_steps_this_iter += 1
if after_optimizer_step:
after_optimizer_step(self, fetches)
if (time.time() - start >= self.config["min_iter_time_s"]
and self.optimizer.num_steps_sampled - prev_steps >=
if collect_metrics_fn:
res = collect_metrics_fn(self)
res = self.collect_metrics()
timesteps_this_iter=self.optimizer.num_steps_sampled -
info=res.get("info", {}))
if after_train_result:
after_train_result(self, res)
return res
def _train_exec_impl(self):
if before_train_step:
logger.debug("Ignoring before_train_step callback")
res = next(self.train_exec_impl)
if after_train_result:
logger.debug("Ignoring after_train_result callback")
return res
def _before_evaluate(self):
if before_evaluate_fn:
def __getstate__(self):
state = Trainer.__getstate__(self)
state["trainer_state"] = self.state.copy()
policy = Trainer.get_policy(self)
state["custom_state_vars"] = policy.get_custom_state_vars()
state["optimizer_state"] = {k: v for k, v in policy.optimizer.state_dict().items()}
state["aux_optimizer_state"] = {k: v for k, v in policy.aux_optimizer.state_dict().items()}
state["value_optimizer_state"] = {k: v for k, v in policy.value_optimizer.state_dict().items()}
state["amp_scaler_state"] = {k: v for k, v in policy.amp_scaler.state_dict().items()}
print("################# WARNING: SAVING STATE VARS AND OPTIMIZER FAILED ################")
if self.train_exec_impl:
state["train_exec_impl"] = (
return state
def __setstate__(self, state):
Trainer.__setstate__(self, state)
policy = Trainer.get_policy(self)
self.state = state["trainer_state"].copy()
policy.set_optimizer_state(state["optimizer_state"], state["aux_optimizer_state"],
state["value_optimizer_state"], state["amp_scaler_state"])
print("################# WARNING: LOADING STATE VARS AND OPTIMIZER FAILED ################")
if self.train_exec_impl:
def with_updates(**overrides):
"""Build a copy of this trainer with the specified overrides.
overrides (dict): use this to override any of the arguments
originally passed to build_trainer() for this policy.
return build_trainer(**dict(original_kwargs, **overrides))
trainer_cls.with_updates = staticmethod(with_updates)
trainer_cls.__name__ = name
trainer_cls.__qualname__ = name
return trainer_cls
import logging
from ray.rllib.agents import with_common_config
from .custom_torch_ppg import CustomTorchPolicy
# from ray.rllib.agents.trainer_template import build_trainer
from .custom_trainer_template import build_trainer
logger = logging.getLogger(__name__)
# yapf: disable
# __sphinx_doc_begin__
DEFAULT_CONFIG = with_common_config({
# Should use a critic as a baseline (otherwise don't use value baseline;
# required for using GAE).
"use_critic": True,
# If true, use the Generalized Advantage Estimator (GAE)
# with a value function, see
"use_gae": True,
# The GAE(lambda) parameter.
"lambda": 1.0,
# Initial coefficient for KL divergence.
"kl_coeff": 0.2,
# Size of batches collected from each worker.
"rollout_fragment_length": 200,
# Number of timesteps collected for each SGD round. This defines the size
# of each SGD epoch.
"train_batch_size": 4000,
# Total SGD batch size across all devices for SGD. This defines the
# minibatch size within each epoch.
"sgd_minibatch_size": 128,
# Whether to shuffle sequences in the batch when training (recommended).
"shuffle_sequences": True,
# Number of SGD iterations in each outer loop (i.e., number of epochs to
# execute per train batch).
"num_sgd_iter": 30,
# Stepsize of SGD.
"lr": 5e-5,
# Learning rate schedule.
"lr_schedule": None,
# Share layers for value function. If you set this to True, it's important
# to tune vf_loss_coeff.
"vf_share_layers": False,
# Coefficient of the value function loss. IMPORTANT: you must tune this if
# you set vf_share_layers: True.
"vf_loss_coeff": 1.0,
# Coefficient of the entropy regularizer.
"entropy_coeff": 0.0,
# Decay schedule for the entropy regularizer.
"entropy_coeff_schedule": None,
# PPO clip parameter.
"clip_param": 0.3,
# Clip param for the value function. Note that this is sensitive to the
# scale of the rewards. If your expected V is large, increase this.
"vf_clip_param": 10.0,
# If specified, clip the global norm of gradients by this amount.
"grad_clip": None,
# Target value for KL divergence.
"kl_target": 0.01,
# Whether to rollout "complete_episodes" or "truncate_episodes".
"batch_mode": "truncate_episodes",
# Which observation filter to apply to the observation.
"observation_filter": "NoFilter",
# Uses the sync samples optimizer instead of the multi-gpu one. This is
# usually slower, but you might want to try it if you run into issues with
# the default optimizer.
"simple_optimizer": False,
# Whether to fake GPUs (using CPUs).
# Set this to True for debugging on non-GPU machines (set `num_gpus` > 0).
"_fake_gpus": False,
# Use PyTorch as framework?
"use_pytorch": True,
# Custom swithches
"skips": 0,
"n_pi": 32,
"num_retunes": 100,
"retune_epochs": 6,
"standardize_rewards": True,
"scale_reward": 1.0,
"accumulate_train_batches": 1,
"adaptive_gamma": False,
"final_lr": 2e-4,
"lr_schedule": 'None',
"final_entropy_coeff": 0.002,
"entropy_schedule": False,
"max_minibatch_size": 2048,
"updates_per_batch": 8,
"aux_mbsize": 4,
"augment_buffer": False,
"reset_returns": True,
"flattened_buffer": False,
"augment_randint_num": 6,
"aux_lr": 5e-4,
"value_lr": 1e-3,
"same_lr_everywhere": False,
"aux_phase_mixed_precision": False,
"single_optimizer": False,
"max_time": 7200,
"pi_phase_mixed_precision": False,
"aux_num_accumulates": 1,
# __sphinx_doc_end__
# yapf: enable
PPGTrainer = build_trainer(
import numpy as np
from ray.rllib.utils import try_import_torch
from collections import deque
from skimage.util import view_as_windows
torch, nn = try_import_torch()
import torch.distributions as td
from functools import partial
import itertools
def calculate_gae_buffer(values_buffer, dones_buffer, rewards_buffer, last_values, gamma, lam):
new_returns = np.empty_like(values_buffer)
lastgaelam = 0
nsegs, nsteps = values_buffer.shape[:2]
for s in reversed(range(nsegs)):
mb_values = values_buffer[s]
mb_rewards = rewards_buffer[s]
mb_dones = dones_buffer[s]
mb_returns, _ = calculate_gae(mb_values, mb_dones, mb_rewards,
last_values, gamma, lam)
new_returns[s] = mb_returns
last_values = mb_values[0]
return new_returns
def calculate_gae(mb_values, mb_dones, mb_rewards, last_values, gamma, lam):
lastgaelam = 0
nsteps = mb_values.shape[0]
mb_advs = np.empty_like(mb_values)
for t in reversed(range(nsteps)):
if t == nsteps - 1:
nextvalues = last_values
nextvalues = mb_values[t+1]
nextnonterminal = 1.0 - mb_dones[t]
delta = mb_rewards[t] + gamma * nextvalues * nextnonterminal - mb_values[t]
mb_advs[t] = lastgaelam = delta + gamma * lam * nextnonterminal * lastgaelam
mb_returns = mb_advs + mb_values
return mb_returns, mb_advs
def _make_categorical(x, ncat, shape):
x = x.reshape((x.shape[0], shape, ncat))
return td.Categorical(logits=x)
def dist_build(ac_space):
return partial(_make_categorical, shape=1, ncat=ac_space.n)
def neglogp_actions(pi_logits, actions):
return nn.functional.cross_entropy(pi_logits, actions, reduction='none')
def sample_actions(logits, device):
u = torch.rand(logits.shape, dtype=logits.dtype).to(device)
return torch.argmax(logits - torch.log(-torch.log(u)), dim=1)
def pi_entropy(logits):
a0 = logits - torch.max(logits, dim=1, keepdim=True)[0]
ea0 = torch.exp(a0)
z0 = torch.sum(ea0, dim=1, keepdim=True)
p0 = ea0 / z0
return torch.sum(p0 * (torch.log(z0) - a0), axis=1)
def roll(arr):
s = arr.shape
return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:])
def unroll(arr, targetshape):
s = arr.shape
return arr.reshape(*targetshape, *s[1:]).swapaxes(0, 1)
def safe_mean(xs):
return -np.inf if len(xs) == 0 else np.mean(xs)
def pad_and_random_crop(imgs, out, pad):
Vectorized pad and random crop
Assumes square images?
imgs: shape (B,H,W,C)
out: output size (e.g. 64)
# n: batch size.
imgs = np.pad(imgs, [[0, 0], [pad, pad], [pad, pad], [0, 0]])
n = imgs.shape[0]
img_size = imgs.shape[1] # e.g. 64
crop_max = img_size - out
w1 = np.random.randint(0, crop_max, n)
h1 = np.random.randint(0, crop_max, n)
# creates all sliding window
# combinations of size (out)
windows = view_as_windows(imgs, (1, out, out, 1))[..., 0,:,:, 0]
# selects a random window
# for each batch element
cropped = windows[np.arange(n), w1, h1]
cropped = cropped.transpose(0,2,3,1)
return cropped
def random_cutout_color(imgs, min_cut, max_cut):
n, h, w, c = imgs.shape
w1 = np.random.randint(min_cut, max_cut, n)
h1 = np.random.randint(min_cut, max_cut, n)
cutouts = np.empty((n, h, w, c), dtype=imgs.dtype)
rand_box = np.random.randint(0, 255, size=(n, c), dtype=imgs.dtype)
for i, (img, w11, h11) in enumerate(zip(imgs, w1, h1)):
cut_img = img.copy()
# add random box
cut_img[h11:h11 + h11, w11:w11 + w11, :] = rand_box[i]
cutouts[i] = cut_img
return cutouts
def linear_schedule(initial_val, final_val, current_steps, total_steps):
frac = 1.0 - current_steps / total_steps
return (initial_val-final_val) * frac + final_val
def horizon_to_gamma(horizon):
return 1.0 - 1.0/horizon
class AdaptiveDiscountTuner:
def __init__(self, gamma, momentum=0.98, eplenmult=1):
self.gamma = gamma
self.momentum = momentum
self.eplenmult = eplenmult
def update(self, horizon):
if horizon > 0:
htarg = horizon * self.eplenmult
gtarg = horizon_to_gamma(htarg)
self.gamma = self.gamma * self.momentum + gtarg * (1-self.momentum)
return self.gamma
def flatten01(arr):
return arr.reshape(-1, *arr.shape[2:])
def flatten012(arr):
return arr.reshape(-1, *arr.shape[3:])
class RetuneSelector:
def __init__(self, nenvs, ob_space, ac_space, replay_shape, skips = 0, n_pi = 32, num_retunes = 5, flat_buffer=False):
self.skips = skips
self.n_pi = n_pi
self.nenvs = nenvs
self.exp_replay = np.empty((*replay_shape, *ob_space.shape), dtype=np.uint8)
self.dones_replay = np.empty((*replay_shape,), dtype=np.bool)
self.rewards_replay = np.empty((*replay_shape,), dtype=np.float32)
self.replay_shape = replay_shape
self.num_retunes = num_retunes
self.ac_space = ac_space
self.ob_space = ob_space
self.cooldown_counter = skips
self.replay_index = 0
self.flat_buffer = flat_buffer
def update(self, obs_batch, dones_batch, rewards_batch):
if self.num_retunes == 0:
return False
if self.cooldown_counter > 0:
self.cooldown_counter -= 1
return False
self.exp_replay[self.replay_index] = obs_batch
self.dones_replay[self.replay_index] = dones_batch
self.rewards_replay[self.replay_index] = rewards_batch
self.replay_index = (self.replay_index + 1) % self.n_pi
return self.replay_index == 0
def retune_done(self):
self.cooldown_counter = self.skips
self.num_retunes -= 1
self.replay_index = 0
def make_minibatches(self, presleep_pi, returns_buffer, num_rollouts):
if not self.flat_buffer:
env_segs = list(itertools.product(range(self.n_pi), range(self.nenvs)))
env_segs = np.array(env_segs)
for idx in range(0, len(env_segs), num_rollouts):
esinds = env_segs[idx:idx+num_rollouts]
mbatch = [flatten01(arr[esinds[:,0], : , esinds[:,1]])
for arr in (self.exp_replay, returns_buffer, presleep_pi)]
yield mbatch
nsteps = returns_buffer.shape[1]
buffsize = self.n_pi * nsteps * self.nenvs
inds = np.arange(buffsize)
batchsize = num_rollouts * nsteps