Commit ade85808 authored by Dipam Chakraborty's avatar Dipam Chakraborty

ppg pre-changed code

parent d020573a
#from ray.rllib.agents.ppo.ppo import PPOTrainer, DEFAULT_CONFIG
#from ppo_tf_policy import PPOTFPolicy
#__all__ = [
# "DEFAULT_CONFIG",
# "PPOTFPolicy",
# "PPOTrainer",
#]
from ray.rllib.policy.torch_policy import TorchPolicy
import numpy as np
from ray.rllib.utils.torch_ops import convert_to_non_torch_type, convert_to_torch_tensor
from ray.rllib.utils import try_import_torch
from ray.rllib.models import ModelCatalog
from ray.rllib.utils.annotations import override
from collections import deque
from .utils import *
import time
torch, nn = try_import_torch()
class CustomTorchPolicy(TorchPolicy):
"""Example of a random policy
If you are using tensorflow/pytorch to build custom policies,
you might find `build_tf_policy` and `build_torch_policy` to
be useful.
Adopted from examples from https://docs.ray.io/en/master/rllib-concepts.html
"""
def __init__(self, observation_space, action_space, config):
self.config = config
self.device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
dist_class, logit_dim = ModelCatalog.get_action_dist(
action_space, self.config["model"], framework="torch")
self.model = ModelCatalog.get_model_v2(
obs_space=observation_space,
action_space=action_space,
num_outputs=logit_dim,
model_config=self.config["model"],
framework="torch",
device=self.device,
)
TorchPolicy.__init__(
self,
observation_space=observation_space,
action_space=action_space,
config=config,
model=self.model,
loss=None,
action_distribution_class=dist_class,
)
self.framework = "torch"
self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001)
self.max_reward = self.config['env_config']['return_max']
self.rewnorm = RewardNormalizer(cliprew=self.max_reward) ## TODO: Might need to go to custom state
self.reward_deque = deque(maxlen=100)
self.best_reward = -np.inf
self.best_weights = None
self.time_elapsed = 0
self.batch_end_time = time.time()
self.timesteps_total = 0
self.best_rew_tsteps = 0
nw = self.config['num_workers'] if self.config['num_workers'] > 0 else 1
self.nbatch = nw * self.config['num_envs_per_worker'] * self.config['rollout_fragment_length']
self.actual_batch_size = self.nbatch // self.config['updates_per_batch']
self.accumulate_train_batches = int(np.ceil( self.actual_batch_size / self.config['max_minibatch_size'] ))
self.mem_limited_batch_size = self.actual_batch_size // self.accumulate_train_batches
if self.nbatch % self.actual_batch_size != 0 or self.nbatch % self.mem_limited_batch_size != 0:
print("#################################################")
print("WARNING: MEMORY LIMITED BATCHING NOT SET PROPERLY")
print("#################################################")
self.retune_selector = RetuneSelector(self.nbatch, observation_space, action_space,
skips = self.config['retune_skips'],
replay_size = self.config['retune_replay_size'],
num_retunes = self.config['num_retunes'])
self.target_timesteps = 8_000_000
self.buffer_time = 20 # TODO: Could try to do a median or mean time step check instead
self.max_time = 7200
self.maxrewep_lenbuf = deque(maxlen=100)
self.gamma = self.config['gamma']
self.adaptive_discount_tuner = AdaptiveDiscountTuner(self.gamma, momentum=0.98, eplenmult=3)
self.lr = config['lr']
self.ent_coef = config['entropy_coeff']
self.last_dones = np.zeros((nw * self.config['num_envs_per_worker'],))
def to_tensor(self, arr):
return torch.from_numpy(arr).to(self.device)
@override(TorchPolicy)
def learn_on_batch(self, samples):
"""Fused compute gradients and apply gradients call.
Either this or the combination of compute/apply grads must be
implemented by subclasses.
Returns:
grad_info: dictionary of extra metadata from compute_gradients().
Examples:
>>> batch = ev.sample()
>>> ev.learn_on_batch(batch)
Reference: https://github.com/ray-project/ray/blob/master/rllib/policy/policy.py#L279-L316
"""
## Config data values
nbatch = self.nbatch
nbatch_train = self.mem_limited_batch_size
gamma, lam = self.gamma, self.config['lambda']
nsteps = self.config['rollout_fragment_length']
nenvs = nbatch//nsteps
ts = (nenvs, nsteps)
mb_dones = unroll(samples['dones'], ts)
## Reward normalization (optional) - leaving rewards unnormalized also works well for many envs
if self.config['standardize_rewards']:
mb_origrewards = unroll(samples['rewards'], ts)
mb_rewards = np.zeros_like(mb_origrewards)
mb_rewards[0] = self.rewnorm.normalize(mb_origrewards[0], self.last_dones)
for ii in range(1, nsteps):
mb_rewards[ii] = self.rewnorm.normalize(mb_origrewards[ii], mb_dones[ii-1])
self.last_dones = mb_dones[-1]
else:
mb_rewards = unroll(samples['rewards'], ts)
should_skip_train_step = self.best_reward_model_select(samples)
if should_skip_train_step:
self.update_batch_time()
return {} # Not doing last optimization step - This is intentional due to noisy gradients
obs = samples['obs']
## Distill with augmentation
should_retune = self.retune_selector.update(obs)
if should_retune:
self.retune_with_augmentation(obs)
self.update_batch_time()
return {}
## Value prediction
next_obs = unroll(samples['new_obs'], ts)[-1]
last_values, _ = self.model.vf_pi(next_obs, ret_numpy=True, no_grad=True, to_torch=True)
values = np.empty((nbatch,), dtype=np.float32)
for start in range(0, nbatch, nbatch_train): # chunked to avoid OOM when evaluating the whole batch at once
end = start + nbatch_train
values[start:end], _ = self.model.vf_pi(samples['obs'][start:end], ret_numpy=True, no_grad=True, to_torch=True)
## GAE
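# Generalized Advantage Estimation (Schulman et al., 2016), computed backwards over the rollout:
#   delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_t) - V(s_t)
#   A_t     = delta_t + gamma * lambda * (1 - done_t) * A_{t+1}
# The returns used as value targets are then A_t + V(s_t).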
mb_values = unroll(values, ts)
mb_returns = np.zeros_like(mb_rewards)
mb_advs = np.zeros_like(mb_rewards)
lastgaelam = 0
for t in reversed(range(nsteps)):
if t == nsteps - 1:
nextvalues = last_values
else:
nextvalues = mb_values[t+1]
nextnonterminal = 1.0 - mb_dones[t]
delta = mb_rewards[t] + gamma * nextvalues * nextnonterminal - mb_values[t]
mb_advs[t] = lastgaelam = delta + gamma * lam * nextnonterminal * lastgaelam
mb_returns = mb_advs + mb_values
## Data from config
cliprange, vfcliprange = self.config['clip_param'], self.config['vf_clip_param']
lrnow = self.lr
max_grad_norm = self.config['grad_clip']
ent_coef, vf_coef = self.ent_coef, self.config['vf_loss_coeff']
neglogpacs = -samples['action_logp'] ## stored logp matches a recompute (np.isclose holds); recompute here if that ever changes
noptepochs = self.config['num_sgd_iter']
actions = samples['actions']
returns = roll(mb_returns)
## Train multiple epochs
optim_count = 0
inds = np.arange(nbatch)
for _ in range(noptepochs):
np.random.shuffle(inds)
normalized_advs = returns - values
# Can do this because actual_batch_size is a multiple of mem_limited_batch_size
for start in range(0, nbatch, self.actual_batch_size):
end = start + self.actual_batch_size
mbinds = inds[start:end]
advs_batch = normalized_advs[mbinds].copy()
normalized_advs[mbinds] = (advs_batch - np.mean(advs_batch)) / (np.std(advs_batch) + 1e-8)
for start in range(0, nbatch, nbatch_train):
end = start + nbatch_train
mbinds = inds[start:end]
slices = (self.to_tensor(arr[mbinds]) for arr in (obs, returns, actions, values, neglogpacs, normalized_advs))
optim_count += 1
apply_grad = (optim_count % self.accumulate_train_batches) == 0
self._batch_train(apply_grad, self.accumulate_train_batches,
lrnow, cliprange, vfcliprange, max_grad_norm, ent_coef, vf_coef, *slices)
self.update_gamma(samples)
self.update_lr()
self.update_ent_coef()
self.update_batch_time()
return {}
def update_batch_time(self):
self.time_elapsed += time.time() - self.batch_end_time
self.batch_end_time = time.time()
def _batch_train(self, apply_grad, num_accumulate,
lr, cliprange, vfcliprange, max_grad_norm,
ent_coef, vf_coef,
obs, returns, actions, values, neglogpac_old, advs):
for g in self.optimizer.param_groups:
g['lr'] = lr
# Advantages are normalized with full size batch instead of memory limited batch
# advs = returns - values
# advs = (advs - torch.mean(advs)) / (torch.std(advs) + 1e-8)
vpred, pi_logits = self.model.vf_pi(obs, ret_numpy=False, no_grad=False, to_torch=False)
neglogpac = neglogp_actions(pi_logits, actions)
entropy = torch.mean(pi_entropy(pi_logits))
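# Clipped value loss (as in the OpenAI baselines PPO2 implementation): take the max of the
# unclipped squared error and the error of a prediction clipped to within vfcliprange of the
# old value estimate, so the value head cannot move too far in a single update.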
vpredclipped = values + torch.clamp(vpred - values, -vfcliprange, vfcliprange)
vf_losses1 = torch.pow((vpred - returns), 2)
vf_losses2 = torch.pow((vpredclipped - returns), 2)
vf_loss = .5 * torch.mean(torch.max(vf_losses1, vf_losses2))
ratio = torch.exp(neglogpac_old - neglogpac)
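# neglogpac_old - neglogpac = log pi_new(a|s) - log pi_old(a|s), so ratio is the PPO
# importance ratio pi_new / pi_old used in the clipped surrogate objective below.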
pg_losses1 = -advs * ratio
pg_losses2 = -advs * torch.clamp(ratio, 1-cliprange, 1+cliprange)
pg_loss = torch.mean(torch.max(pg_losses1, pg_losses2))
loss = pg_loss - entropy * ent_coef + vf_loss * vf_coef
loss = loss / num_accumulate
loss.backward()
if apply_grad:
nn.utils.clip_grad_norm_(self.model.parameters(), max_grad_norm)
self.optimizer.step()
self.optimizer.zero_grad()
def retune_with_augmentation(self, obs):
nbatch_train = self.mem_limited_batch_size
retune_epochs = self.config['retune_epochs']
replay_size = self.retune_selector.replay_size
replay_vf = np.empty((replay_size,), dtype=np.float32)
replay_pi = np.empty((replay_size, self.retune_selector.ac_space.n), dtype=np.float32)
# Store current value function and policy logits
for start in range(0, replay_size, nbatch_train):
end = start + nbatch_train
replay_batch = self.retune_selector.exp_replay[start:end]
replay_vf[start:end], replay_pi[start:end] = self.model.vf_pi(replay_batch,
ret_numpy=True, no_grad=True, to_torch=True)
optim_count = 0
# Tune vf and pi heads to older predictions with augmented observations
inds = np.arange(len(self.retune_selector.exp_replay))
for ep in range(retune_epochs):
np.random.shuffle(inds)
for start in range(0, replay_size, nbatch_train):
end = start + nbatch_train
mbinds = inds[start:end]
optim_count += 1
apply_grad = (optim_count % self.accumulate_train_batches) == 0
slices = [self.retune_selector.exp_replay[mbinds],
self.to_tensor(replay_vf[mbinds]),
self.to_tensor(replay_pi[mbinds])]
self.tune_policy(apply_grad, *slices, 0.5)
self.retune_selector.retune_done()
def tune_policy(self, apply_grad, obs, target_vf, target_pi, retune_vf_loss_coeff):
obs_aug = np.empty(obs.shape, obs.dtype)
aug_idx = np.random.randint(3, size=len(obs))
obs_aug[aug_idx == 0] = pad_and_random_crop(obs[aug_idx == 0], 64, 10)
obs_aug[aug_idx == 1] = random_cutout_color(obs[aug_idx == 1], 10, 30)
obs_aug[aug_idx == 2] = obs[aug_idx == 2]
obs_aug = self.to_tensor(obs_aug)
with torch.no_grad():
tpi_log_softmax = nn.functional.log_softmax(target_pi, dim=1)
tpi_softmax = torch.exp(tpi_log_softmax)
vpred, pi_logits = self.model.vf_pi(obs_aug, ret_numpy=False, no_grad=False, to_torch=False)
pi_log_softmax = nn.functional.log_softmax(pi_logits, dim=1)
pi_loss = torch.mean(torch.sum(tpi_softmax * (tpi_log_softmax - pi_log_softmax) , dim=1)) # kl_div torch 1.3.1 has numerical issues
vf_loss = .5 * torch.mean(torch.pow(vpred - target_vf, 2))
loss = retune_vf_loss_coeff * vf_loss + pi_loss
loss = loss / self.accumulate_train_batches
loss.backward()
if apply_grad:
self.optimizer.step()
self.optimizer.zero_grad()
def best_reward_model_select(self, samples):
self.timesteps_total += self.nbatch
## Best reward model selection
eprews = [info['episode']['r'] for info in samples['infos'] if 'episode' in info]
self.reward_deque.extend(eprews)
mean_reward = safe_mean(eprews) if len(eprews) >= 100 else safe_mean(self.reward_deque)
if self.best_reward < mean_reward:
self.best_reward = mean_reward
self.best_weights = self.get_weights()["current_weights"]
self.best_rew_tsteps = self.timesteps_total
if self.timesteps_total > self.target_timesteps or (self.time_elapsed + self.buffer_time) > self.max_time:
if self.best_weights is not None:
self.set_model_weights(self.best_weights)
return True
return False
def update_lr(self):
if self.config['lr_schedule'] == 'linear':
self.lr = linear_schedule(initial_val=self.config['lr'],
final_val=self.config['final_lr'],
current_steps=self.timesteps_total,
total_steps=self.target_timesteps)
elif self.config['lr_schedule'] == 'exponential':
self.lr = 0.997 * self.lr
def update_ent_coef(self):
if self.config['entropy_schedule']:
self.ent_coef = linear_schedule(initial_val=self.config['entropy_coeff'],
final_val=self.config['final_entropy_coeff'],
current_steps=self.timesteps_total,
total_steps=self.target_timesteps)
def update_gamma(self, samples):
if self.config['adaptive_gamma']:
epinfobuf = [info['episode'] for info in samples['infos'] if 'episode' in info]
self.maxrewep_lenbuf.extend([epinfo['l'] for epinfo in epinfobuf if epinfo['r'] >= self.max_reward])
sorted_nth = lambda buf, n: np.nan if len(buf) < 100 else sorted(buf)[n]
target_horizon = sorted_nth(self.maxrewep_lenbuf, 80)
self.gamma = self.adaptive_discount_tuner.update(target_horizon)
def get_custom_state_vars(self):
return {
"time_elapsed": self.time_elapsed,
"timesteps_total": self.timesteps_total,
"best_weights": self.best_weights,
"reward_deque": self.reward_deque,
"batch_end_time": self.batch_end_time,
"num_retunes": self.retune_selector.num_retunes,
"gamma": self.gamma,
"maxrewep_lenbuf": self.maxrewep_lenbuf,
"lr": self.lr,
"ent_coef": self.ent_coef,
"rewnorm": self.rewnorm,
"best_rew_tsteps": self.best_rew_tsteps,
"best_reward": self.best_reward,
"last_dones": self.last_dones,
}
def set_custom_state_vars(self, custom_state_vars):
self.time_elapsed = custom_state_vars["time_elapsed"]
self.timesteps_total = custom_state_vars["timesteps_total"]
self.best_weights = custom_state_vars["best_weights"]
self.reward_deque = custom_state_vars["reward_deque"]
self.batch_end_time = custom_state_vars["batch_end_time"]
self.retune_selector.set_num_retunes(custom_state_vars["num_retunes"])
self.gamma = self.adaptive_discount_tuner.gamma = custom_state_vars["gamma"]
self.maxrewep_lenbuf = custom_state_vars["maxrewep_lenbuf"]
self.lr = custom_state_vars["lr"]
self.ent_coef = custom_state_vars["ent_coef"]
self.rewnorm = custom_state_vars["rewnorm"]
self.best_rew_tsteps = custom_state_vars["best_rew_tsteps"]
self.best_reward = custom_state_vars["best_reward"]
self.last_dones = custom_state_vars["last_dones"]
@override(TorchPolicy)
def get_weights(self):
weights = {}
weights["current_weights"] = {
k: v.cpu().detach().numpy()
for k, v in self.model.state_dict().items()
}
weights["optimizer_state"] = {
k: v
for k, v in self.optimizer.state_dict().items()
}
weights["custom_state_vars"] = self.get_custom_state_vars()
return weights
@override(TorchPolicy)
def set_weights(self, weights):
self.set_model_weights(weights["current_weights"])
self.set_optimizer_state(weights["optimizer_state"])
self.set_custom_state_vars(weights["custom_state_vars"])
def set_optimizer_state(self, optimizer_state):
optimizer_state = convert_to_torch_tensor(optimizer_state, device=self.device)
self.optimizer.load_state_dict(optimizer_state)
def set_model_weights(self, model_weights):
model_weights = convert_to_torch_tensor(model_weights, device=self.device)
self.model.load_state_dict(model_weights)
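# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this commit): the gradient-accumulation
# pattern used by _batch_train and tune_policy above, reduced to a toy
# standalone example. The model and data here are hypothetical; only the loss
# scaling by the number of accumulation steps and the deferred
# optimizer.step()/zero_grad() mirror the policy code.
# ---------------------------------------------------------------------------
import torch

def accumulate_train(model, optimizer, minibatches, num_accumulate):
    """Accumulate gradients over `num_accumulate` minibatches per optimizer step."""
    for i, (x, y) in enumerate(minibatches, start=1):
        loss = torch.nn.functional.mse_loss(model(x), y)
        # Scale so the summed gradient matches one pass over the full batch.
        (loss / num_accumulate).backward()
        if i % num_accumulate == 0:
            optimizer.step()
            optimizer.zero_grad()

if __name__ == "__main__":
    toy_model = torch.nn.Linear(4, 1)
    toy_opt = torch.optim.Adam(toy_model.parameters(), lr=1e-3)
    data = [(torch.randn(8, 4), torch.randn(8, 1)) for _ in range(4)]
    accumulate_train(toy_model, toy_opt, data, num_accumulate=2)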
import logging
from ray.rllib.agents import with_common_config
from .custom_torch_policy import CustomTorchPolicy
from ray.rllib.agents.trainer_template import build_trainer
logger = logging.getLogger(__name__)
# yapf: disable
# __sphinx_doc_begin__
DEFAULT_CONFIG = with_common_config({
# Should use a critic as a baseline (otherwise don't use value baseline;
# required for using GAE).
"use_critic": True,
# If true, use the Generalized Advantage Estimator (GAE)
# with a value function, see https://arxiv.org/pdf/1506.02438.pdf.
"use_gae": True,
# The GAE(lambda) parameter.
"lambda": 1.0,
# Initial coefficient for KL divergence.
"kl_coeff": 0.2,
# Size of batches collected from each worker.
"rollout_fragment_length": 200,
# Number of timesteps collected for each SGD round. This defines the size
# of each SGD epoch.
"train_batch_size": 4000,
# Total SGD batch size across all devices for SGD. This defines the
# minibatch size within each epoch.
"sgd_minibatch_size": 128,
# Whether to shuffle sequences in the batch when training (recommended).
"shuffle_sequences": True,
# Number of SGD iterations in each outer loop (i.e., number of epochs to
# execute per train batch).
"num_sgd_iter": 30,
# Stepsize of SGD.
"lr": 5e-5,
# Learning rate schedule.
"lr_schedule": None,
# Share layers for value function. If you set this to True, it's important
# to tune vf_loss_coeff.
"vf_share_layers": False,
# Coefficient of the value function loss. IMPORTANT: you must tune this if
# you set vf_share_layers: True.
"vf_loss_coeff": 1.0,
# Coefficient of the entropy regularizer.
"entropy_coeff": 0.0,
# Decay schedule for the entropy regularizer.
"entropy_coeff_schedule": None,
# PPO clip parameter.
"clip_param": 0.3,
# Clip param for the value function. Note that this is sensitive to the
# scale of the rewards. If your expected V is large, increase this.
"vf_clip_param": 10.0,
# If specified, clip the global norm of gradients by this amount.
"grad_clip": None,
# Target value for KL divergence.
"kl_target": 0.01,
# Whether to rollout "complete_episodes" or "truncate_episodes".
"batch_mode": "truncate_episodes",
# Which observation filter to apply to the observation.
"observation_filter": "NoFilter",
# Uses the sync samples optimizer instead of the multi-gpu one. This is
# usually slower, but you might want to try it if you run into issues with
# the default optimizer.
"simple_optimizer": False,
# Whether to fake GPUs (using CPUs).
# Set this to True for debugging on non-GPU machines (set `num_gpus` > 0).
"_fake_gpus": False,
# Use PyTorch as framework?
"use_pytorch": True,
# Custom switches
"retune_skips": 300000,
"retune_replay_size": 200000,
"num_retunes": 6,
"retune_epochs": 3,
"standardize_rewards": False,
"accumulate_train_batches": 1,
"adaptive_gamma": False,
"final_lr": 1e-4,
"lr_schedule": True,
"final_entropy_coeff": 0.002,
"entropy_schedule": True,
"max_minibatch_size": 2048,
"updates_per_batch": 8,
})
# __sphinx_doc_end__
# yapf: enable
PPOTrainer = build_trainer(
name="CustomTorchPPOAgent",
default_config=DEFAULT_CONFIG,
default_policy=CustomTorchPolicy)
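# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this commit): launching the trainer built
# above through ray.tune. The env name "procgen_env_wrapper" and the config
# values below are assumptions; env_config must at least provide "return_max",
# which CustomTorchPolicy reads for reward clipping.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import ray
    from ray import tune

    ray.init()
    tune.run(
        PPOTrainer,
        stop={"timesteps_total": 8_000_000},
        config={
            "env": "procgen_env_wrapper",        # assumed registered env name
            "env_config": {"return_max": 10.0},  # required by CustomTorchPolicy
            "num_workers": 4,
            "num_envs_per_worker": 16,
            "rollout_fragment_length": 256,
        },
    )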
import numpy as np
from ray.rllib.utils import try_import_torch
from collections import deque
from skimage.util import view_as_windows
torch, nn = try_import_torch()
def neglogp_actions(pi_logits, actions):
return nn.functional.cross_entropy(pi_logits, actions, reduction='none')
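# sample_actions uses the Gumbel-max trick: adding -log(-log(u)) noise to the logits
# and taking the argmax draws a sample from the corresponding softmax distribution.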
def sample_actions(logits, device):
u = torch.rand(logits.shape, dtype=logits.dtype).to(device)
return torch.argmax(logits - torch.log(-torch.log(u)), dim=1)
def pi_entropy(logits):
a0 = logits - torch.max(logits, dim=1, keepdim=True)[0]
ea0 = torch.exp(a0)
z0 = torch.sum(ea0, dim=1, keepdim=True)
p0 = ea0 / z0
return torch.sum(p0 * (torch.log(z0) - a0), dim=1)
def roll(arr):
s = arr.shape
return arr.swapaxes(0, 1).reshape(s[0] * s[1], *s[2:])
def unroll(arr, targetshape):
s = arr.shape
return arr.reshape(*targetshape, *s[1:]).swapaxes(0, 1)
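# roll/unroll convert between RLlib's flat, env-major batch layout (nenvs * nsteps, ...)
# and the time-major layout (nsteps, nenvs, ...) used for GAE;
# unroll(arr, (nenvs, nsteps)) and roll are inverses of each other.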
def safe_mean(xs):
return -np.inf if len(xs) == 0 else np.mean(xs)
def pad_and_random_crop(imgs, out, pad):
"""
Vectorized pad and random crop
Assumes square images?
args:
imgs: shape (B,H,W,C)
out: output size (e.g. 64)
"""
# n: batch size.
imgs = np.pad(imgs, [[0, 0], [pad, pad], [pad, pad], [0, 0]])
n = imgs.shape[0]
img_size = imgs.shape[1] # padded size, e.g. out + 2*pad
crop_max = img_size - out
w1 = np.random.randint(0, crop_max, n)
h1 = np.random.randint(0, crop_max, n)
# creates all sliding window
# combinations of size (out)
windows = view_as_windows(imgs, (1, out, out, 1))[..., 0,:,:, 0]
# selects a random window
# for each batch element
cropped = windows[np.arange(n), w1, h1]
cropped = cropped.transpose(0,2,3,1)
return cropped
def random_cutout_color(imgs, min_cut, max_cut):
n, h, w, c = imgs.shape
w1 = np.random.randint(min_cut, max_cut, n)
h1 = np.random.randint(min_cut, max_cut, n)
cutouts = np.empty((n, h, w, c), dtype=imgs.dtype)
rand_box = np.random.randint(0, 255, size=(n, c), dtype=imgs.dtype)
for i, (img, w11, h11) in enumerate(zip(imgs, w1, h1)):
cut_img = img.copy()
# add random box
cut_img[h11:h11 + h11, w11:w11 + w11, :] = rand_box[i]
cutouts[i] = cut_img
return cutouts
def linear_schedule(initial_val, final_val, current_steps, total_steps):
frac = 1.0 - current_steps / total_steps
return (initial_val-final_val) * frac + final_val
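# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this commit): quick sanity checks for the
# helpers above. Shapes and values here are arbitrary examples.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    nenvs, nsteps = 4, 8
    flat = np.arange(nenvs * nsteps, dtype=np.float32)
    time_major = unroll(flat, (nenvs, nsteps))      # shape (nsteps, nenvs)
    assert time_major.shape == (nsteps, nenvs)
    assert np.array_equal(roll(time_major), flat)   # roll inverts unroll

    # linear_schedule interpolates from initial_val to final_val.
    assert linear_schedule(1.0, 0.0, current_steps=0, total_steps=100) == 1.0
    assert linear_schedule(1.0, 0.0, current_steps=100, total_steps=100) == 0.0

    # Augmentations keep the (B, H, W, C) layout.
    imgs = np.random.randint(0, 255, size=(5, 64, 64, 3), dtype=np.uint8)
    assert pad_and_random_crop(imgs, 64, 10).shape == imgs.shape
    assert random_cutout_color(imgs, 10, 30).shape == imgs.shape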