Commit 4c1f39f3 authored by Dipam Chakraborty

old ppo - save buffer + scale reward

parent ac409306
......@@ -126,7 +126,7 @@ class CustomTorchPolicy(TorchPolicy):
else:
mb_rewards = unroll(samples['rewards'], ts)
# Weird hack that helps in many envs (Yes keep it after normalization)
# Weird hack that helps in many envs (Yes keep it after reward normalization)
rew_scale = self.config["scale_reward"]
if rew_scale != 1.0:
mb_rewards *= rew_scale
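A minimal sketch of the ordering the comment asks for, using hypothetical names (`normalizer`, `dones`, `config`) rather than the commit's exact code path; the normalizer itself is the RewardNormalizer shown later in this diff.
# Sketch only (hypothetical names): normalize the rollout rewards first, then apply the constant factor.
mb_rewards = normalizer.normalize(mb_rewards, dones)   # running-return normalization (utils.RewardNormalizer)
rew_scale = config["scale_reward"]                     # defaults to 1.0; the experiment yaml below sets 0.6
if rew_scale != 1.0:
    mb_rewards *= rew_scale                            # keep this after normalization, per the comment above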
......@@ -253,7 +253,9 @@ class CustomTorchPolicy(TorchPolicy):
for ep in range(retune_epochs):
for slices in self.retune_selector.make_minibatches_with_rollouts(self.exp_replay, self.vtarg_replay, replay_pi):
self.tune_policy(slices[0], self.to_tensor(slices[1]), self.to_tensor(slices[2]))
self.exp_replay.fill(0)
self.vtarg_replay.fill(0)
self.retune_selector.retune_done()
def tune_policy(self, obs, target_vf, target_pi):
......@@ -335,7 +337,6 @@ class CustomTorchPolicy(TorchPolicy):
"best_weights": self.best_weights,
"reward_deque": self.reward_deque,
"batch_end_time": self.batch_end_time,
# "retune_selector": self.retune_selector,
"gamma": self.gamma,
"maxrewep_lenbuf": self.maxrewep_lenbuf,
"lr": self.lr,
......@@ -352,7 +353,6 @@ class CustomTorchPolicy(TorchPolicy):
self.best_weights = custom_state_vars["best_weights"]
self.reward_deque = custom_state_vars["reward_deque"]
self.batch_end_time = custom_state_vars["batch_end_time"]
# self.retune_selector = custom_state_vars["retune_selector"]
self.gamma = self.adaptive_discount_tuner.gamma = custom_state_vars["gamma"]
self.maxrewep_lenbuf = custom_state_vars["maxrewep_lenbuf"]
self.lr = custom_state_vars["lr"]
......
......@@ -68,7 +68,7 @@ class CustomTorchPolicy(TorchPolicy):
skips = self.config['retune_skips'],
replay_size = self.config['retune_replay_size'],
num_retunes = self.config['num_retunes'])
self.exp_replay = np.zeros((self.retune_selector.replay_size, *observation_space.shape), dtype=np.uint8)
self.target_timesteps = 8_000_000
self.buffer_time = 20 # TODO: Could try to do a median or mean time step check instead
......@@ -81,6 +81,7 @@ class CustomTorchPolicy(TorchPolicy):
self.ent_coef = config['entropy_coeff']
self.last_dones = np.zeros((nw * self.config['num_envs_per_worker'],))
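# Records which buffer-save path the trainer's __getstate__ (below) took:
# 1 = raw buffer, 2 = sliced frames, 3 = zlib-compressed slice, -1 = save failed.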
self.save_success = 0
def to_tensor(self, arr):
return torch.from_numpy(arr).to(self.device)
......@@ -118,20 +119,17 @@ class CustomTorchPolicy(TorchPolicy):
else:
mb_rewards = unroll(samples['rewards'], ts)
# Weird hack that helps in many envs (Yes keep it after reward normalization)
rew_scale = self.config["scale_reward"]
if rew_scale != 1.0:
mb_rewards *= rew_scale
should_skip_train_step = self.best_reward_model_select(samples)
if should_skip_train_step:
self.update_batch_time()
return {} # Not doing last optimization step - This is intentional due to noisy gradients
obs = samples['obs']
## Distill with augmentation
should_retune = self.retune_selector.update(obs)
if should_retune:
self.retune_with_augmentation(obs)
self.update_batch_time()
return {}
## Value prediction
next_obs = unroll(samples['new_obs'], ts)[-1]
......@@ -141,8 +139,6 @@ class CustomTorchPolicy(TorchPolicy):
end = start + nbatch_train
values[start:end], _ = self.model.vf_pi(samples['obs'][start:end], ret_numpy=True, no_grad=True, to_torch=True)
## GAE
mb_values = unroll(values, ts)
mb_returns = np.zeros_like(mb_rewards)
......@@ -189,7 +185,14 @@ class CustomTorchPolicy(TorchPolicy):
apply_grad = (optim_count % self.accumulate_train_batches) == 0
self._batch_train(apply_grad, self.accumulate_train_batches,
lrnow, cliprange, vfcliprange, max_grad_norm, ent_coef, vf_coef, *slices)
## Distill with augmentation
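# RetuneSelector.update copies this batch of observations into self.exp_replay,
# which is now owned by the policy rather than the selector (see the RetuneSelector changes below).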
should_retune = self.retune_selector.update(obs, self.exp_replay)
if should_retune:
self.retune_with_augmentation()
self.update_batch_time()
return {}
self.update_gamma(samples)
self.update_lr()
self.update_ent_coef()
......@@ -208,9 +211,6 @@ class CustomTorchPolicy(TorchPolicy):
for g in self.optimizer.param_groups:
g['lr'] = lr
# Advantages are normalized with full size batch instead of memory limited batch
# advs = returns - values
# advs = (advs - torch.mean(advs)) / (torch.std(advs) + 1e-8)
vpred, pi_logits = self.model.vf_pi(obs, ret_numpy=False, no_grad=False, to_torch=False)
neglogpac = neglogp_actions(pi_logits, actions)
entropy = torch.mean(pi_entropy(pi_logits))
......@@ -236,7 +236,7 @@ class CustomTorchPolicy(TorchPolicy):
self.optimizer.zero_grad()
def retune_with_augmentation(self, obs):
def retune_with_augmentation(self):
nbatch_train = self.mem_limited_batch_size
retune_epochs = self.config['retune_epochs']
replay_size = self.retune_selector.replay_size
......@@ -246,13 +246,13 @@ class CustomTorchPolicy(TorchPolicy):
# Store current value function and policy logits
for start in range(0, replay_size, nbatch_train):
end = start + nbatch_train
replay_batch = self.retune_selector.exp_replay[start:end]
replay_batch = self.exp_replay[start:end]
replay_vf[start:end], replay_pi[start:end] = self.model.vf_pi(replay_batch,
ret_numpy=True, no_grad=True, to_torch=True)
optim_count = 0
# Tune vf and pi heads to older predictions with augmented observations
inds = np.arange(len(self.retune_selector.exp_replay))
inds = np.arange(len(self.exp_replay))
for ep in range(retune_epochs):
np.random.shuffle(inds)
for start in range(0, replay_size, nbatch_train):
......@@ -260,11 +260,12 @@ class CustomTorchPolicy(TorchPolicy):
mbinds = inds[start:end]
optim_count += 1
apply_grad = (optim_count % self.accumulate_train_batches) == 0
slices = [self.retune_selector.exp_replay[mbinds],
slices = [self.exp_replay[mbinds],
self.to_tensor(replay_vf[mbinds]),
self.to_tensor(replay_pi[mbinds])]
self.tune_policy(apply_grad, *slices, 0.5)
self.exp_replay.fill(0)
self.retune_selector.retune_done()
def tune_policy(self, apply_grad, obs, target_vf, target_pi, retune_vf_loss_coeff):
......@@ -343,7 +344,6 @@ class CustomTorchPolicy(TorchPolicy):
"best_weights": self.best_weights,
"reward_deque": self.reward_deque,
"batch_end_time": self.batch_end_time,
"num_retunes": self.retune_selector.num_retunes,
"gamma": self.gamma,
"maxrewep_lenbuf": self.maxrewep_lenbuf,
"lr": self.lr,
......@@ -360,7 +360,6 @@ class CustomTorchPolicy(TorchPolicy):
self.best_weights = custom_state_vars["best_weights"]
self.reward_deque = custom_state_vars["reward_deque"]
self.batch_end_time = custom_state_vars["batch_end_time"]
self.retune_selector.set_num_retunes(custom_state_vars["num_retunes"])
self.gamma = self.adaptive_discount_tuner.gamma = custom_state_vars["gamma"]
self.maxrewep_lenbuf = custom_state_vars["maxrewep_lenbuf"]
self.lr = custom_state_vars["lr"]
......
import logging
import os
import time
from ray.rllib.agents.trainer import Trainer, COMMON_CONFIG
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.utils import add_mixins
from ray.rllib.utils.annotations import override, DeveloperAPI
import numpy as np
from zlib import compress, decompress
from sys import getsizeof
logger = logging.getLogger(__name__)
@DeveloperAPI
def build_trainer(name,
default_policy,
default_config=None,
validate_config=None,
get_initial_state=None,
get_policy_class=None,
before_init=None,
make_workers=None,
make_policy_optimizer=None,
after_init=None,
before_train_step=None,
after_optimizer_step=None,
after_train_result=None,
collect_metrics_fn=None,
before_evaluate_fn=None,
mixins=None,
execution_plan=None):
"""Helper function for defining a custom trainer.
Functions will be run in this order to initialize the trainer:
1. Config setup: validate_config, get_initial_state, get_policy
2. Worker setup: before_init, make_workers, make_policy_optimizer
3. Post setup: after_init
Arguments:
name (str): name of the trainer (e.g., "PPO")
default_policy (cls): the default Policy class to use
default_config (dict): The default config dict of the algorithm,
otherwise uses the Trainer default config.
validate_config (func): optional callback that checks a given config
for correctness. It may mutate the config as needed.
get_initial_state (func): optional function that returns the initial
state dict given the trainer instance as an argument. The state
dict must be serializable so that it can be checkpointed, and will
be available as the `trainer.state` variable.
get_policy_class (func): optional callback that takes a config and
returns the policy class to override the default with
before_init (func): optional function to run at the start of trainer
init that takes the trainer instance as argument
make_workers (func): override the method that creates rollout workers.
This takes in (trainer, env_creator, policy, config) as args.
make_policy_optimizer (func): optional function that returns a
PolicyOptimizer instance given (WorkerSet, config)
after_init (func): optional function to run at the end of trainer init
that takes the trainer instance as argument
before_train_step (func): optional callback to run before each train()
call. It takes the trainer instance as an argument.
after_optimizer_step (func): optional callback to run after each
step() call to the policy optimizer. It takes the trainer instance
and the policy gradient fetches as arguments.
after_train_result (func): optional callback to run at the end of each
train() call. It takes the trainer instance and result dict as
arguments, and may mutate the result dict as needed.
collect_metrics_fn (func): override the method used to collect metrics.
It takes the trainer instance as argument.
before_evaluate_fn (func): callback to run before evaluation. This
takes the trainer instance as argument.
mixins (list): list of any class mixins for the returned trainer class.
These mixins will be applied in order and will have higher
precedence than the Trainer class
execution_plan (func): Experimental distributed execution
API. This overrides `make_policy_optimizer`.
Returns:
a Trainer instance that uses the specified args.
"""
original_kwargs = locals().copy()
base = add_mixins(Trainer, mixins)
class trainer_cls(base):
_name = name
_default_config = default_config or COMMON_CONFIG
_policy = default_policy
def __init__(self, config=None, env=None, logger_creator=None):
Trainer.__init__(self, config, env, logger_creator)
def _init(self, config, env_creator):
if validate_config:
validate_config(config)
if get_initial_state:
self.state = get_initial_state(self)
else:
self.state = {}
if get_policy_class is None:
self._policy = default_policy
else:
self._policy = get_policy_class(config)
if before_init:
before_init(self)
use_exec_api = (execution_plan
and (self.config["use_exec_api"]
or "RLLIB_EXEC_API" in os.environ))
# Creating all workers (excluding evaluation workers).
if make_workers and not use_exec_api:
self.workers = make_workers(self, env_creator, self._policy,
config)
else:
self.workers = self._make_workers(env_creator, self._policy,
config,
self.config["num_workers"])
self.train_exec_impl = None
self.optimizer = None
self.execution_plan = execution_plan
if use_exec_api:
logger.warning(
"The experimental distributed execution API is enabled "
"for this algorithm. Disable this by setting "
"'use_exec_api': False.")
self.train_exec_impl = execution_plan(self.workers, config)
elif make_policy_optimizer:
self.optimizer = make_policy_optimizer(self.workers, config)
else:
optimizer_config = dict(
config["optimizer"],
**{"train_batch_size": config["train_batch_size"]})
self.optimizer = SyncSamplesOptimizer(self.workers,
**optimizer_config)
if after_init:
after_init(self)
@override(Trainer)
def _train(self):
if self.train_exec_impl:
return self._train_exec_impl()
if before_train_step:
before_train_step(self)
prev_steps = self.optimizer.num_steps_sampled
start = time.time()
optimizer_steps_this_iter = 0
while True:
fetches = self.optimizer.step()
optimizer_steps_this_iter += 1
if after_optimizer_step:
after_optimizer_step(self, fetches)
if (time.time() - start >= self.config["min_iter_time_s"]
and self.optimizer.num_steps_sampled - prev_steps >=
self.config["timesteps_per_iteration"]):
break
if collect_metrics_fn:
res = collect_metrics_fn(self)
else:
res = self.collect_metrics()
res.update(
optimizer_steps_this_iter=optimizer_steps_this_iter,
timesteps_this_iter=self.optimizer.num_steps_sampled -
prev_steps,
info=res.get("info", {}))
if after_train_result:
after_train_result(self, res)
return res
def _train_exec_impl(self):
if before_train_step:
logger.debug("Ignoring before_train_step callback")
res = next(self.train_exec_impl)
if after_train_result:
logger.debug("Ignoring after_train_result callback")
return res
@override(Trainer)
def _before_evaluate(self):
if before_evaluate_fn:
before_evaluate_fn(self)
def __getstate__(self):
state = Trainer.__getstate__(self)
state["trainer_state"] = self.state.copy()
policy = Trainer.get_policy(self)
state["custom_state_vars"] = policy.get_custom_state_vars()
state["optimizer_state"] = {k: v for k, v in policy.optimizer.state_dict().items()}
## Ugly hack to save the replay buffer because the organizers are taking forever to provide a fix for spot instances
save_success = False
max_size = 3_700_000_000
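# Three fallbacks, tried in order: (1) pickle the raw uint8 buffer if it is under max_size,
# (2) exploit frame-stack overlap and store only a 3-channel slice plus the mismatching frames,
# (3) zlib-compress that slice. buffer_saved / save_success record which path succeeded (-1 on failure).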
if policy.exp_replay.nbytes < max_size:
state["replay_buffer"] = policy.exp_replay
state["buffer_saved"] = 1
policy.save_success = 1
save_success = True
elif policy.exp_replay.shape[-1] == 6: # only for frame stack = 2
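# With frame stack 2 (3 channels per frame), obs[i][..., :3] is the older frame and normally equals
# obs[i-1][..., -3:]; eq marks where that overlap holds, so only the mismatching observations (non_eq)
# need to be kept in full alongside the first/last frames and the [1:, ..., :3] slice.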
eq = np.all(policy.exp_replay[1:,...,:3] == policy.exp_replay[:-1,...,-3:], axis=(-3,-2,-1))
non_eq = np.where(1 - eq)
images_non_eq = policy.exp_replay[non_eq]
images_last = policy.exp_replay[-1,...,-3:]
images_first = policy.exp_replay[0,...,:3]
if policy.exp_replay[1:,...,:3].nbytes < max_size:
state["sliced_buffer"] = policy.exp_replay[1:,...,:3]
state["buffer_saved"] = 2
policy.save_success = 2
save_success = True
else:
comp = compress(policy.exp_replay[1:,...,:3].copy(), level=9)
if getsizeof(comp) < max_size:
state["compressed_buffer"] = comp
state["buffer_saved"] = 3
policy.save_success = 3
save_success = True
if save_success:
state["matched_frame_data"] = [non_eq, images_non_eq, images_last, images_first]
if not save_success:
state["buffer_saved"] = -1
policy.save_success = -1
print("####################### BUFFER SAVE FAILED #########################")
else:
state["retune_selector"] = policy.retune_selector
if self.train_exec_impl:
state["train_exec_impl"] = (
self.train_exec_impl.shared_metrics.get().save())
return state
def __setstate__(self, state):
Trainer.__setstate__(self, state)
policy = Trainer.get_policy(self)
self.state = state["trainer_state"].copy()
policy.set_optimizer_state(state["optimizer_state"])
policy.set_custom_state_vars(state["custom_state_vars"])
## Ugly hack to save the replay buffer because the organizers are taking forever to provide a fix for spot instances
buffer_saved = state.get("buffer_saved", -1)
policy.save_success = buffer_saved
if buffer_saved == 1:
policy.exp_replay = state["replay_buffer"]
elif buffer_saved > 1:
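# Reverse of the save above: restore the mismatching observations in full, then the last/first frames,
# then the leading 3 channels of every observation from the sliced (or decompressed) save.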
non_eq, images_non_eq, images_last, images_first = state["matched_frame_data"]
policy.exp_replay[non_eq] = images_non_eq
policy.exp_replay[-1,...,-3:] = images_last
policy.exp_replay[0,...,:3] = images_first
if buffer_saved == 2:
policy.exp_replay[1:,...,:3] = state["sliced_buffer"]
elif buffer_saved == 3:
ts = policy.exp_replay[1:,...,:3].shape
dt = policy.exp_replay.dtype
decomp = decompress(state["compressed_buffer"])
policy.exp_replay[1:,...,:3] = np.array(np.frombuffer(decomp, dtype=dt).reshape(ts))
if buffer_saved > 0:
policy.retune_selector = state["retune_selector"]
if self.train_exec_impl:
self.train_exec_impl.shared_metrics.get().restore(
state["train_exec_impl"])
def with_updates(**overrides):
"""Build a copy of this trainer with the specified overrides.
Arguments:
overrides (dict): use this to override any of the arguments
originally passed to build_trainer() for this policy.
"""
return build_trainer(**dict(original_kwargs, **overrides))
trainer_cls.with_updates = staticmethod(with_updates)
trainer_cls.__name__ = name
trainer_cls.__qualname__ = name
return trainer_cls
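For orientation, a hedged usage sketch of the build_trainer helper above. CustomTorchPolicy and DEFAULT_CONFIG come from elsewhere in this commit, but the import path, trainer name, and env id here are hypothetical placeholders rather than the project's actual registration code.
# Sketch only: how a trainer class could be assembled with this helper.
from .custom_torch_policy import CustomTorchPolicy   # policy modified in this commit
from .ppo import DEFAULT_CONFIG                      # hypothetical module path for the config dict

CustomPPOTrainer = build_trainer(
    name="CustomPPO",                                # hypothetical trainer name
    default_config=DEFAULT_CONFIG,
    default_policy=CustomTorchPolicy,
)

trainer = CustomPPOTrainer(config={"env": "procgen_env_wrapper"})  # hypothetical env id
result = trainer.train()                             # runs the _train loop defined above, returns a result dict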
......@@ -2,7 +2,9 @@ import logging
from ray.rllib.agents import with_common_config
from .custom_torch_policy import CustomTorchPolicy
from ray.rllib.agents.trainer_template import build_trainer
# from ray.rllib.agents.trainer_template import build_trainer
from .custom_trainer_template import build_trainer
logger = logging.getLogger(__name__)
......@@ -83,7 +85,8 @@ DEFAULT_CONFIG = with_common_config({
"entropy_schedule": True,
"max_minibatch_size": 2048,
"updates_per_batch": 8,
"updates_per_batch": 8,
"scale_reward": 1.0,
})
# __sphinx_doc_end__
# yapf: enable
......
......@@ -94,7 +94,6 @@ class RetuneSelector:
def __init__(self, nbatch, ob_space, ac_space, skips = 800_000, replay_size = 200_000, num_retunes = 5):
self.skips = skips + (-skips) % nbatch
self.replay_size = replay_size + (-replay_size) % nbatch
self.exp_replay = np.empty((self.replay_size, *ob_space.shape), dtype=np.uint8)
self.batch_size = nbatch
self.batches_in_replay = self.replay_size // nbatch
......@@ -106,7 +105,7 @@ class RetuneSelector:
self.replay_index = 0
self.buffer_full = False
def update(self, obs_batch):
def update(self, obs_batch, exp_replay):
if self.num_retunes == 0:
return False
......@@ -116,7 +115,7 @@ class RetuneSelector:
start = self.replay_index * self.batch_size
end = start + self.batch_size
self.exp_replay[start:end] = obs_batch
exp_replay[start:end] = obs_batch
self.replay_index = (self.replay_index + 1) % self.batches_in_replay
self.buffer_full = self.buffer_full or (self.replay_index == 0)
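A tiny worked example of the ring-buffer indexing above, with illustrative sizes only:
# Sketch: replay_size=6, batch_size=2 -> batches_in_replay=3; write slots cycle 0:2, 2:4, 4:6, then wrap.
replay_size, batch_size = 6, 2
batches_in_replay = replay_size // batch_size        # 3
replay_index, buffer_full = 0, False
for call in range(4):
    start = replay_index * batch_size
    print(call, (start, start + batch_size))         # (0, 2), (2, 4), (4, 6), back to (0, 2)
    replay_index = (replay_index + 1) % batches_in_replay
    buffer_full = buffer_full or (replay_index == 0) # flips True once the third write wraps the index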
......@@ -144,9 +143,8 @@ class RewardNormalizer(object):
def normalize(self, rews, news):
self.ret = self.ret * self.gamma + rews
self.ret_rms.update(self.ret)
# rews = np.clip(rews / np.sqrt(self.ret_rms.var + self.epsilon), -self.cliprew, self.cliprew)
# self.ret[np.array(news, dtype=bool)] = 0. ## Values should be True of False to set positional index
rews = np.float32(rews > 0)
rews = np.clip(rews / np.sqrt(self.ret_rms.var + self.epsilon), -self.cliprew, self.cliprew)
self.ret[np.array(news, dtype=bool)] = 0. ## Values should be True or False to set positional index
return rews
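Written out, the two re-enabled lines implement: R_t = gamma * R_{t-1} + r_t (with R reset to 0 where news is True), ret_rms tracks the variance of R, and the reward actually returned is clip(r_t / sqrt(Var[R] + epsilon), -cliprew, cliprew).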
class RunningMeanStd(object):
......
......@@ -9,7 +9,7 @@ procgen-ppo:
# === Settings for Checkpoints ===
checkpoint_freq: 1
checkpoint_freq: 25
checkpoint_at_end: True
keep_checkpoints_num: 5
......@@ -51,6 +51,7 @@ procgen-ppo:
num_retunes: 8
retune_epochs: 3
standardize_rewards: True
scale_reward: 0.6
adaptive_gamma: False
final_lr: 2.0e-4
......@@ -76,8 +77,6 @@ procgen-ppo:
model:
custom_model: impala_torch_custom
custom_options:
# depths: [64, 128, 128]
# nlatents: 1024
depths: [32, 64, 64]
nlatents: 512
init_normed: False
......
......@@ -6,8 +6,8 @@ set -e
#########################################
# export EXPERIMENT_DEFAULT="experiments/impala-baseline.yaml"
# export EXPERIMENT_DEFAULT="experiments/custom-torch-ppo.yaml"
export EXPERIMENT_DEFAULT="experiments/custom-ppg.yaml"
export EXPERIMENT_DEFAULT="experiments/custom-torch-ppo.yaml"
# export EXPERIMENT_DEFAULT="experiments/custom-ppg.yaml"
export EXPERIMENT=${EXPERIMENT:-$EXPERIMENT_DEFAULT}
if [[ -z $AICROWD_IS_GRADING ]]; then
......