Commit d76a69e2 authored by nilabha

Changes for the pure, PPO, and imitation trainers, converted to use a custom train fn

parent 4c60e6c2
Pipeline #5014 passed with stage in 13 minutes and 1 second
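For context, the "custom train fn" referred to in the commit message follows RLlib's execution-plan pattern; the building blocks (ParallelRollouts, ConcatBatches, TrainOneStep, StandardMetricsReporting) are imported further down in this diff, and the trainer assigns self.execution_plan = default_execution_plan. A minimal sketch of such a plan, assuming the 0.8.x-era RLlib API (the commit's actual plan may differ), is:

from ray.rllib.execution.rollout_ops import ParallelRollouts, ConcatBatches
from ray.rllib.execution.train_ops import TrainOneStep
from ray.rllib.execution.metric_ops import StandardMetricsReporting

def default_execution_plan(workers, config):
    # Gather rollouts from all workers in bulk-synchronous mode.
    rollouts = ParallelRollouts(workers, mode="bulk_sync")
    # Concatenate samples into full train batches and apply one SGD step each.
    train_op = rollouts.combine(
        ConcatBatches(min_batch_size=config["train_batch_size"])
    ).for_each(TrainOneStep(workers))
    # Report the standard RLlib training metrics every iteration.
    return StandardMetricsReporting(train_op, workers, config)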
@@ -7,9 +7,11 @@ from ray.rllib.utils.annotations import override
from ray.rllib.agents.ppo.ppo import PPOTrainer
import ray
from ray import tune
from ray.tune.trainable import Trainable
import numpy as np
import os
import math
import ray
import yaml
@@ -27,11 +29,14 @@ tf = try_import_tf()
from ray.tune import registry
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.optimizers import PolicyOptimizer, SyncSamplesOptimizer
from ray.rllib.models import ModelCatalog
from utils.argparser import create_parser
from utils.loader import load_envs, load_models
from utils.loader import load_envs, load_models, load_algorithms
from envs.flatland import get_eval_config
from ray.rllib.utils import merge_dicts
from ray.rllib.evaluation.metrics import collect_metrics
# Custom wandb logger with hotfix to allow custom callbacks
from wandblogger import WandbLogger
import pandas as pd
@@ -51,6 +56,8 @@ from ray.rllib.execution.train_ops import TrainOneStep
from ray.rllib.execution.metric_ops import StandardMetricsReporting
import numpy as np
import logging
logger = logging.getLogger(__name__)
from flatland.envs.agent_utils import RailAgentStatus
@@ -86,6 +93,13 @@ ImitationTFPolicy = build_tf_policy(
optimizer_fn=adam_optimizer,
)
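The loss_fn and adam_optimizer passed to build_tf_policy above are defined outside this hunk. For readers unfamiliar with the pattern, a behavioral-cloning style loss would look roughly like the following sketch; imitation_loss is a hypothetical name and may not match the commit's actual loss function.

from ray.rllib.utils import try_import_tf
tf = try_import_tf()

def imitation_loss(policy, model, dist_class, train_batch):
    # Hypothetical sketch: maximize the log-likelihood of the expert
    # actions recorded in the sample batch (behavioral cloning).
    logits, _ = model.from_batch(train_batch)
    action_dist = dist_class(logits, model)
    return -tf.reduce_mean(action_dist.logp(train_batch["actions"]))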
class ImitationMetrics(PolicyOptimizer):
"""Adding metrics."""
@override(PolicyOptimizer)
def step(self):
pass
class ImitationAgent(PPOTrainer):
"""Policy that takes random actions and never learns."""
@@ -104,69 +118,95 @@ class ImitationAgent(PPOTrainer):
env_creator, self._policy, config, self.config["num_workers"])
self.execution_plan = default_execution_plan
self.train_exec_impl = self.execution_plan(self.workers, config)
def eval(self):
import tensorflow as tf
policy = self.get_policy()
steps = 0
all_scores = 0
all_completion = 0
eval_episodes = 2
for _ in range(eval_episodes):
env = self.env._env.rail_env
obs = self.env.reset()
num_outputs = env.action_space[0]
n_agents = env.get_num_agents()
# TODO : Update max_steps as per latest version
# https://gitlab.aicrowd.com/flatland/flatland-examples/blob/master/reinforcement_learning/multi_agent_training.py
# max_steps = int(4 * 2 * (env.height + env.width + (n_agents / n_cities))) - 1
max_steps = int(4 * 2 * (20 + env.height + env.width))
episode_steps = 0
episode_max_steps = 0
episode_num_agents = 0
episode_score = 0
episode_done_agents = 0
# obs = self.env.reset()
done = {}
done["__all__"] = False
action_dict = {i:2 for i in range(n_agents)}
# TODO: Support for batch update
# batch_size = 2
# logits, _ = policy.model.forward({"obs": np.vstack([obs[a],obs[a]])}, [], None)
for step in range(max_steps):
for a in range(n_agents):
if not done.get(a) and obs.get(a) is not None:
input_dict = {"obs": np.expand_dims(obs[a],0)}
input_dict['obs_flat'] = input_dict['obs']
logits, _ = policy.model.forward(input_dict, [], None)
model_logits = tf.squeeze(logits)
action_dict[a] = tf.math.argmax(model_logits).numpy()
obs, all_rewards, done, info = self.env.step(action_dict)
steps += 1
for agent, agent_info in info.items():
if episode_max_steps == 0:
episode_max_steps = agent_info["max_episode_steps"]
episode_num_agents = agent_info["num_agents"]
episode_steps = max(episode_steps, agent_info["agent_step"])
episode_score += agent_info["agent_score"]
if agent_info["agent_done"]:
episode_done_agents += 1
if done["__all__"]:
all_scores += episode_score
all_completion += float(episode_done_agents) / n_agents
break
return {
"episode_reward_mean": all_scores/eval_episodes,
"episode_completion_mean": all_completion/eval_episodes,
"timesteps_this_iter": steps,
}
self.optimizer = ImitationMetrics(self.workers)
# @override(Trainer)
# def collect_metrics(self, selected_workers=None):
# weights = ray.put(self.workers.local_worker().save())
# self.evaluation_workers.foreach_worker(
# lambda w: w.restore(ray.get(weights)))
# self.workers.foreach_worker(
# lambda w: w.restore(ray.get(weights)))
# ray.get([
# w.sample.remote()
# for w in self.workers.remote_workers()
# ])
# metrics = collect_metrics(self.workers.local_worker(),
# self.workers.remote_workers())
# res = self.optimizer.collect_metrics(
# self.config["collect_metrics_timeout"],
# min_history=self.config["metrics_smoothing_episodes"],
# selected_workers=selected_workers)
# return res
# @override(Trainer)
# def _evaluate(self):
# import tensorflow as tf
# policy = self.get_policy()
# steps = 0
# all_scores = 0
# all_completion = 0
# eval_episodes = self.config.get("evaluation_num_episodes",2)
# for _ in range(eval_episodes):
# env = self.env._env.rail_env
# obs = self.env.reset()
# num_outputs = env.action_space[0]
# n_agents = env.get_num_agents()
# # TODO : Update max_steps as per latest version
# # https://gitlab.aicrowd.com/flatland/flatland-examples/blob/master/reinforcement_learning/multi_agent_training.py
# # max_steps = int(4 * 2 * (env.height + env.width + (n_agents / n_cities))) - 1
# max_steps = int(4 * 2 * (20 + env.height + env.width))
# episode_steps = 0
# episode_max_steps = 0
# episode_num_agents = 0
# episode_score = 0
# episode_done_agents = 0
# # obs = self.env.reset()
# done = {}
# done["__all__"] = False
# action_dict = {i:2 for i in range(n_agents)}
# # TODO: Support for batch update
# # batch_size = 2
# # logits, _ = policy.model.forward({"obs": np.vstack([obs[a],obs[a]])}, [], None)
# for step in range(max_steps):
# for a in range(n_agents):
# if not done.get(a) and obs.get(a) is not None:
# input_dict = {"obs": np.expand_dims(obs[a],0)}
# input_dict['obs_flat'] = input_dict['obs']
# logits, _ = policy.model.forward(input_dict, [], None)
# model_logits = tf.squeeze(logits)
# action_dict[a] = tf.math.argmax(model_logits).numpy()
# obs, all_rewards, done, info = self.env.step(action_dict)
# steps += 1
# for agent, agent_info in info.items():
# if episode_max_steps == 0:
# episode_max_steps = agent_info["max_episode_steps"]
# episode_num_agents = agent_info["num_agents"]
# episode_steps = max(episode_steps, agent_info["agent_step"])
# episode_score += agent_info["agent_score"]
# if agent_info["agent_done"]:
# episode_done_agents += 1
# if done["__all__"]:
# all_scores += episode_score
# all_completion += float(episode_done_agents) / n_agents
# break
# result = {
# "episode_reward_mean": all_scores/eval_episodes,
# "episode_completion_mean": all_completion/eval_episodes,
# "timesteps_this_iter": steps,
# }
# return result
@override(Trainer)
@@ -174,7 +214,8 @@ class ImitationAgent(PPOTrainer):
import tensorflow as tf
policy = self.get_policy()
steps = 0
for _ in range(1):
n_episodes = 1
for _ in range(n_episodes):
env = self.env._env.rail_env
obs = self.env.reset()
num_outputs = env.action_space[0]
@@ -222,7 +263,7 @@ class ImitationAgent(PPOTrainer):
self.workers.local_worker().apply_gradients(gradients)
weights = ray.put(self.workers.local_worker().get_weights())
# print(self.workers.local_worker().get_weights())
# print(self.workers.local_worker().get_weights()['default_policy'][0][:4])
for e in self.workers.remote_workers():
e.set_weights.remote(weights)
@@ -242,9 +283,135 @@ class ImitationAgent(PPOTrainer):
print(float(episode_done_agents) / n_agents)
break
return {
"episode_reward_mean": episode_score,
result = {
"expert_episode_reward_mean": episode_score,
"episode_reward_mean" : episode_score,
"expert_episode_completion_mean": float(episode_done_agents) / n_agents,
"episodes_this_iter": n_episodes,
"timesteps_this_iter": steps,
}
# Code taken from _train method of trainer_template.py - TODO: Not working
# res = self.collect_metrics()
# res = {}
# res.update(
# optimizer_steps_this_iter=steps,
# episode_reward_mean=episode_score,
# info=res.get("info", {}))
# res.update(expert_scores = result)
return result
# @override(Trainable)
# def _save(self, checkpoint_dir):
# checkpoint_path = os.path.join(checkpoint_dir,
# "checkpoint-{}".format(self.iteration))
# pickle.dump(self.__getstate__(), open(checkpoint_path, "wb"))
# return checkpoint_path
# @override(Trainable)
# def save(self, checkpoint_dir):
# return self._save(checkpoint_dir)
if __name__ == "__main__":
# Copy this file to the root folder to run
from train import on_episode_end
exp = {}
exp['run']= "ImitationAgent"
exp['env']= "flatland_sparse"
# exp['stop'] = {"timesteps_total": 15000}
exp['stop'] = {"iterations": 4}
exp['checkpoint_freq'] = 2
# exp['checkpoint_at_end'] = True
# exp['keep_checkpoints_num']= 100
# exp['checkpoint_score_attr']: "episode_reward_mean"
# exp['num_samples']= 3
config = {
"num_workers": 1,
"num_envs_per_worker": 1,
"num_gpus": 0,
"clip_rewards": False,
"vf_clip_param": 500.0,
"entropy_coeff": 0.01,
# effective batch_size: train_batch_size * num_agents_in_each_environment [5, 10]
# see https://github.com/ray-project/ray/issues/4628
"train_batch_size": 1000, # 5000
"rollout_fragment_length": 50, # 100
"sgd_minibatch_size": 100, # 500
"vf_share_layers": False,
"env_config" : {
"observation": "tree",
"observation_config":{
"max_depth": 2,
"shortest_path_max_depth": 30},
"generator": "sparse_rail_generator",
"generator_config": "small_v0",
"eval_generator": "test_eval"},
"model" : {
"fcnet_activation": "relu",
"fcnet_hiddens": [256, 256],
"vf_share_layers": True }}
exp['config'] = config
exp['config']['callbacks'] = {
'on_episode_end': on_episode_end,
}
eval_configs = get_eval_config(exp['config'].get('env_config',\
{}).get('eval_generator',"default"))
eval_seed = eval_configs.get('evaluation_config',{}).get('env_config',{}).get('seed')
# add evaluation config to the current config
exp['config'] = merge_dicts(exp['config'],eval_configs)
if exp['config'].get('evaluation_config'):
exp['config']['evaluation_config']['env_config'] = exp['config'].get('env_config')
eval_env_config = exp['config']['evaluation_config'].get('env_config')
if eval_seed and eval_env_config:
# We override the env seed from the evaluation config
eval_env_config['seed'] = eval_seed
exp["config"]["eager"] = True
exp["config"]["use_pytorch"] = False
exp["config"]["log_level"] = "INFO"
verbose = 2
exp["config"]["eager_tracing"] = True
webui_host = "0.0.0.0"
# TODO should be in exp['config'] directly
exp['config']['env_config']['yaml_config'] = config
exp['loggers'] = [TBXLogger]
_default_config = with_common_config(
exp["config"])
ray.init(num_cpus=4,num_gpus=0)
trainer = ImitationAgent(_default_config,
env=exp['env'],)
# trainer = PPOTrainer(_default_config,
# env="flatland_sparse",)
for i in range(exp.get("stop",{}).get("iterations",5)):
result = trainer.train()
print("Results:",result)
if i % exp['checkpoint_freq']==0:
# eval_results = trainer._evaluate()
# print("Eval Results:",eval_results)
checkpoint = trainer.save()
# TODO: Loads weights but not optimizer state.
# Could be done by overriding _save to use model.save_weights(checkpoint);
# also override _restore. Ideally use workers to save/load weights.
# trainer.restore(checkpoint)
print("checkpoint saved at", checkpoint)
trainer.stop()
print("Test: OK")
flatland-sparse-small-tree-fc-apex-il-trainer:
run: ImitationAgent
env: flatland_sparse
stop:
timesteps_total: 15000000 # 1.5e7
checkpoint_freq: 50
checkpoint_at_end: True
keep_checkpoints_num: 50
checkpoint_score_attr: episode_reward_mean
num_samples: 3
config:
num_workers: 13
num_envs_per_worker: 5
num_gpus: 0
clip_rewards: False
vf_clip_param: 500.0
entropy_coeff: 0.01
# effective batch_size: train_batch_size * num_agents_in_each_environment [5, 10]
# see https://github.com/ray-project/ray/issues/4628
train_batch_size: 1000 # 5000
rollout_fragment_length: 50 # 100
sgd_minibatch_size: 100 # 500
vf_share_layers: False
env_config:
observation: tree
observation_config:
max_depth: 2
shortest_path_max_depth: 30
generator: sparse_rail_generator
generator_config: small_v0
wandb:
project: flatland-paper
entity: aicrowd
tags: ["small_v0", "tree_obs", "apex_rllib_il"] # TODO should be set programmatically
model:
fcnet_activation: relu
fcnet_hiddens: [256, 256]
vf_share_layers: False # Should match the PPO trainer's vf_share_layers setting
evaluation_num_workers: 2
# Enable evaluation, once per training iteration.
evaluation_interval: 3
evaluation_interval: 1
# Run 2 episodes each time evaluation runs.
evaluation_num_episodes: 2
# Override the env config for evaluation.
......
import numpy as np
from ray.rllib.agents.trainer import Trainer, with_common_config
from ray.rllib.agents.dqn import ApexTrainer,DQNTrainer
from ray.rllib.utils.annotations import override
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.tune.logger import pretty_print
import numpy as np
import ray
from ray import tune
import os
import ray
@@ -21,7 +18,6 @@ from ray.tune.logger import TBXLogger
from ray.tune.resources import resources_to_json
from ray.tune.tune import _make_scheduler
from ray.rllib.models.tf.tf_action_dist import Categorical
tf = try_import_tf()
from ray.tune import registry
@@ -30,67 +26,170 @@ from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.models import ModelCatalog
from utils.argparser import create_parser
from utils.loader import load_envs, load_models
from utils.loader import load_envs, load_models, load_algorithms
from algorithms import CUSTOM_ALGORITHMS
from envs.flatland import get_eval_config
from ray.rllib.utils import merge_dicts
# Custom wandb logger with hotfix to allow custom callbacks
from wandblogger import WandbLogger
import pandas as pd
"""
Note : This implementation has been adapted from various files in :
https://github.com/ray-project/ray/blob/master/rllib/examples
Note : This implementation has been adapted from :
https://github.com/ray-project/ray/blob/master/rllib/contrib/random_agent/random_agent.py
"""
from ray.rllib.policy import Policy,TFPolicy
from ray.rllib.policy.dynamic_tf_policy import DynamicTFPolicy
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.execution.rollout_ops import ParallelRollouts, ConcatBatches
from ray.rllib.execution.train_ops import TrainOneStep
from ray.rllib.execution.metric_ops import StandardMetricsReporting
import numpy as np
from flatland.envs.agent_utils import RailAgentStatus
import sys,os
from algorithms.imitation_agent.imitation_trainer import ImitationAgent
from train import on_episode_end
checkpoint_freq=10
keep_checkpoints_num=100
checkpoint_at_end=True
checkpoint_score_attr="episode_reward_mean"
verbose = 2
MAX_ITERATIONS = 1000000
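For reference, the CUSTOM_ALGORITHMS / load_algorithms pair imported above is what lets the YAML configs use run: ImitationAgent. Its exact implementation is not shown in this diff; a plausible sketch, assuming CUSTOM_ALGORITHMS maps names to trainer classes and using Tune's register_trainable, is:

from ray.tune import registry

def load_algorithms(custom_algorithms):
    # Assumed sketch: register each custom trainer (e.g. ImitationAgent)
    # under its name so that "run: <name>" in a YAML config resolves to it.
    for name, trainer_cls in custom_algorithms.items():
        registry.register_trainable(name, trainer_cls)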
if __name__ == "__main__":
# Register all necessary assets in tune registries
load_envs(os.getcwd()) # Load envs
load_models(os.getcwd()) # Load models
ppo = True
if ppo:
config_file = "small_tree_video/PPO_test.yaml"
def run(args, parser):
if args.config_file:
with open(args.config_file) as f:
experiments = yaml.safe_load(f)
else:
config_file = "small_tree_video/apex_test.yaml"
with open(config_file) as f:
exp = yaml.safe_load(f)
exp["config"]["eager"] = True
exp["config"]["use_pytorch"] = False
exp["config"]["log_level"] = "INFO"
verbose = 2
exp["config"]["eager_tracing"] = True
webui_host = "0.0.0.0"
# TODO should be in exp['config'] directly
exp['config']['env_config']['yaml_config'] = config_file
exp['loggers'] = [TBXLogger]
_default_config = with_common_config(
exp["config"])
ray.init(num_cpus=3,num_gpus=0)
imitation_trainer = ImitationAgent(_default_config,
env="flatland_sparse",)
# Note: keep this in sync with tune/config_parser.py
experiments = {
args.experiment_name: { # i.e. log to ~/ray_results/default
"run": args.run,
"checkpoint_freq": args.checkpoint_freq,
"keep_checkpoints_num": args.keep_checkpoints_num,
"checkpoint_score_attr": args.checkpoint_score_attr,
"local_dir": args.local_dir,
"resources_per_trial": (
args.resources_per_trial and
resources_to_json(args.resources_per_trial)),
"stop": args.stop,
"config": dict(args.config, env=args.env),
"restore": args.restore,
"num_samples": args.num_samples,
"upload_dir": args.upload_dir,
}
}
global verbose
verbose = 1
webui_host = "localhost"
for exp in experiments.values():
# Bazel makes it hard to find files specified in `args` (and `data`).
# Look for them here.
# NOTE: Some of our yaml files don't have a `config` section.
if exp.get("config", {}).get("input") and \
not os.path.exists(exp["config"]["input"]):
# This script runs in the ray/rllib dir.
rllib_dir = Path(__file__).parent
input_file = rllib_dir.absolute().joinpath(exp["config"]["input"])
exp["config"]["input"] = str(input_file)
if not exp.get("run"):
parser.error("the following arguments are required: --run")
if not exp.get("env") and not exp.get("config", {}).get("env"):
parser.error("the following arguments are required: --env")
if args.eager:
exp["config"]["eager"] = True
if args.torch:
exp["config"]["use_pytorch"] = True
if args.v:
exp["config"]["log_level"] = "INFO"
verbose = 2
if args.vv:
exp["config"]["log_level"] = "DEBUG"
verbose = 3
if args.trace:
if not exp["config"].get("eager"):
raise ValueError("Must enable --eager to enable tracing.")
exp["config"]["eager_tracing"] = True
if args.bind_all:
webui_host = "0.0.0.0"
if args.log_flatland_stats:
exp['config']['callbacks'] = {
'on_episode_end': on_episode_end,
}
if args.eval:
eval_configs = get_eval_config(exp['config'].get('env_config',\
{}).get('eval_generator',"default"))
eval_seed = eval_configs.get('evaluation_config',{}).get('env_config',{}).get('seed')
# add evaluation config to the current config
exp['config'] = merge_dicts(exp['config'],eval_configs)
if exp['config'].get('evaluation_config'):
exp['config']['evaluation_config']['env_config'] = exp['config'].get('env_config')
eval_env_config = exp['config']['evaluation_config'].get('env_config')
if eval_seed and eval_env_config:
# We override the env seed from the evaluation config
eval_env_config['seed'] = eval_seed
# Remove any wandb related configs
if eval_env_config:
if eval_env_config.get('wandb'):
del eval_env_config['wandb']
# Remove any wandb related configs
if exp['config']['evaluation_config'].get('wandb'):
del exp['config']['evaluation_config']['wandb']
if args.config_file:
# TODO should be in exp['config'] directly
exp['config']['env_config']['yaml_config'] = args.config_file
exp['loggers'] = [WandbLogger, TBXLogger]