Commit d4fe3086 authored by nilabha

cleanup comments, add logger

parent d76a69e2
Pipeline #5015 passed with stage in 4 minutes and 51 seconds
@@ -144,71 +144,6 @@ class ImitationAgent(PPOTrainer):
# return res
# @override(Trainer)
# def _evaluate(self):
# import tensorflow as tf
# policy = self.get_policy()
# steps = 0
# all_scores = 0
# all_completion = 0
# eval_episodes = self.config.get("evaluation_num_episodes",2)
# for _ in range(eval_episodes):
# env = self.env._env.rail_env
# obs = self.env.reset()
# num_outputs = env.action_space[0]
# n_agents = env.get_num_agents()
# # TODO : Update max_steps as per latest version
# # https://gitlab.aicrowd.com/flatland/flatland-examples/blob/master/reinforcement_learning/multi_agent_training.py
# # max_steps = int(4 * 2 * (env.height + env.width + (n_agents / n_cities))) - 1
# max_steps = int(4 * 2 * (20 + env.height + env.width))
# episode_steps = 0
# episode_max_steps = 0
# episode_num_agents = 0
# episode_score = 0
# episode_done_agents = 0
# # obs = self.env.reset()
# done = {}
# done["__all__"] = False
# action_dict = {i:2 for i in range(n_agents)}
# # TODO: Support for batch update
# # batch_size = 2
# # logits, _ = policy.model.forward({"obs": np.vstack([obs[a],obs[a]])}, [], None)
# for step in range(max_steps):
# for a in range(n_agents):
# if not done.get(a) and obs.get(a) is not None:
# input_dict = {"obs": np.expand_dims(obs[a],0)}
# input_dict['obs_flat'] = input_dict['obs']
# logits, _ = policy.model.forward(input_dict, [], None)
# model_logits = tf.squeeze(logits)
# action_dict[a] = tf.math.argmax(model_logits).numpy()
# obs, all_rewards, done, info = self.env.step(action_dict)
# steps += 1
# for agent, agent_info in info.items():
# if episode_max_steps == 0:
# episode_max_steps = agent_info["max_episode_steps"]
# episode_num_agents = agent_info["num_agents"]
# episode_steps = max(episode_steps, agent_info["agent_step"])
# episode_score += agent_info["agent_score"]
# if agent_info["agent_done"]:
# episode_done_agents += 1
# if done["__all__"]:
# all_scores += episode_score
# all_completion += float(episode_done_agents) / n_agents
# break
# result = {
# "episode_reward_mean": all_scores/eval_episodes,
# "episode_completion_mean": all_completion/eval_episodes,
# "timesteps_this_iter": steps,
# }
# return result
@override(Trainer)
def _train(self):
import tensorflow as tf
@@ -301,18 +236,6 @@ class ImitationAgent(PPOTrainer):
# res.update(expert_scores = result)
return result
# @override(Trainable)
# def _save(self, checkpoint_dir):
# checkpoint_path = os.path.join(checkpoint_dir,
# "checkpoint-{}".format(self.iteration))
# pickle.dump(self.__getstate__(), open(checkpoint_path, "wb"))
# return checkpoint_path
# @override(Trainable)
# def save(self, checkpoint_dir):
# return self._save(checkpoint_dir)
if __name__ == "__main__":
@@ -173,14 +173,6 @@ def run(args, parser):
return experiments
# run_experiments(
# experiments,
# scheduler=_make_scheduler(args),
# queue_trials=args.queue_trials,
# resume=args.resume,
# verbose=verbose,
# concurrent=True)
def imitation_train_fn(config,reporter=None):
imitation_trainer = ImitationAgent(config,
@@ -219,7 +211,7 @@ def imitation_train_fn(config,reporter=None):
imitation_trainer.stop()
ppo_trainer.stop()
print("Test: OK")
print("Completed: OK")
if __name__ == "__main__":
@@ -227,24 +219,15 @@ if __name__ == "__main__":
args = parser.parse_args()
experiments = run(args, parser)
# exp['stop'] = {"timesteps_total": 15000}
# exp['stop'] = {"iterations": 4}
for exp in experiments.values():
_default_config = with_common_config(exp["config"])
_default_config['env'] = exp.get('env')
resources = PPOTrainer.default_resource_request(_default_config).to_json()
tune.run(imitation_train_fn, resources_per_trial=resources, config=_default_config,
#loggers=exp.get('loggers'),
loggers=exp.get('loggers'),
stop=exp.get('stop'),
num_samples=exp.get('num_samples',1),
# checkpoint_freq=exp.get('checkpoint_freq'),
# keep_checkpoints_num=exp.get('keep_checkpoints_num'),
# checkpoint_at_end=exp.get('checkpoint_at_end'),
# checkpoint_score_attr=exp.get('checkpoint_score_attr'),
scheduler=_make_scheduler(args),
queue_trials=args.queue_trials,
resume=args.resume,
verbose=verbose)
# imitation_train_fn(_default_config)
verbose=verbose)
\ No newline at end of file
from ray.rllib.agents.trainer import Trainer, with_common_config
from ray.rllib.agents.dqn import ApexTrainer,DQNTrainer
from ray.rllib.utils.annotations import override
from ray.rllib.agents.ppo.ppo import PPOTrainer
import ray
from ray import tune
import os
import yaml
from pathlib import Path
from ray.cluster_utils import Cluster
from ray.rllib.evaluation import MultiAgentEpisode
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.tune import run_experiments
from ray.tune.logger import TBXLogger
from ray.tune.resources import resources_to_json
from ray.tune.tune import _make_scheduler
tf = try_import_tf()
from ray.tune import registry
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.models import ModelCatalog
from utils.argparser import create_parser
from utils.loader import load_envs, load_models, load_algorithms
from algorithms import CUSTOM_ALGORITHMS
from envs.flatland import get_eval_config
from ray.rllib.utils import merge_dicts
# Custom wandb logger with hotfix to allow custom callbacks
from wandblogger import WandbLogger
import pandas as pd
"""
Note: This implementation has been adapted from:
https://github.com/ray-project/ray/blob/master/rllib/contrib/random_agent/random_agent.py
"""
import numpy as np
from flatland.envs.agent_utils import RailAgentStatus
from algorithms.imitation_agent.imitation_trainer import ImitationAgent
from train import on_episode_end
checkpoint_freq=10
keep_checkpoints_num=100
checkpoint_at_end=True
checkpoint_score_attr="episode_reward_mean"
verbose = 2
MAX_ITERATIONS = 1000000
def run(args, parser):
if args.config_file:
with open(args.config_file) as f:
experiments = yaml.safe_load(f)
else:
# Note: keep this in sync with tune/config_parser.py
experiments = {
args.experiment_name: { # i.e. log to ~/ray_results/default
"run": args.run,
"checkpoint_freq": args.checkpoint_freq,
"keep_checkpoints_num": args.keep_checkpoints_num,
"checkpoint_score_attr": args.checkpoint_score_attr,
"local_dir": args.local_dir,
"resources_per_trial": (
args.resources_per_trial and
resources_to_json(args.resources_per_trial)),
"stop": args.stop,
"config": dict(args.config, env=args.env),
"restore": args.restore,
"num_samples": args.num_samples,
"upload_dir": args.upload_dir,
}
}
global verbose
verbose = 1
webui_host = "localhost"
for exp in experiments.values():
# Bazel makes it hard to find files specified in `args` (and `data`).
# Look for them here.
# NOTE: Some of our yaml files don't have a `config` section.
if exp.get("config", {}).get("input") and \
not os.path.exists(exp["config"]["input"]):
# This script runs in the ray/rllib dir.
rllib_dir = Path(__file__).parent
input_file = rllib_dir.absolute().joinpath(exp["config"]["input"])
exp["config"]["input"] = str(input_file)
if not exp.get("run"):
parser.error("the following arguments are required: --run")
if not exp.get("env") and not exp.get("config", {}).get("env"):
parser.error("the following arguments are required: --env")
if args.eager:
exp["config"]["eager"] = True
if args.torch:
exp["config"]["use_pytorch"] = True
if args.v:
exp["config"]["log_level"] = "INFO"
verbose = 2
if args.vv:
exp["config"]["log_level"] = "DEBUG"
verbose = 3
if args.trace:
if not exp["config"].get("eager"):
raise ValueError("Must enable --eager to enable tracing.")
exp["config"]["eager_tracing"] = True
if args.bind_all:
webui_host = "0.0.0.0"
if args.log_flatland_stats:
exp['config']['callbacks'] = {
'on_episode_end': on_episode_end,
}
if args.eval:
eval_configs = get_eval_config(exp['config'].get('env_config',\
{}).get('eval_generator',"default"))
eval_seed = eval_configs.get('evaluation_config',{}).get('env_config',{}).get('seed')
# add evaluation config to the current config
exp['config'] = merge_dicts(exp['config'],eval_configs)
if exp['config'].get('evaluation_config'):
exp['config']['evaluation_config']['env_config'] = exp['config'].get('env_config')
eval_env_config = exp['config']['evaluation_config'].get('env_config')
if eval_seed and eval_env_config:
# We override the env seed from the evaluation config
eval_env_config['seed'] = eval_seed
# Remove any wandb related configs
if eval_env_config:
if eval_env_config.get('wandb'):
del eval_env_config['wandb']
# Remove any wandb related configs
if exp['config']['evaluation_config'].get('wandb'):
del exp['config']['evaluation_config']['wandb']
if args.config_file:
# TODO should be in exp['config'] directly
exp['config']['env_config']['yaml_config'] = args.config_file
exp['loggers'] = [WandbLogger, TBXLogger]
global checkpoint_freq,keep_checkpoints_num,checkpoint_score_attr,checkpoint_at_end
checkpoint_freq = exp['checkpoint_freq']
keep_checkpoints_num = exp['keep_checkpoints_num']
checkpoint_score_attr = exp['checkpoint_score_attr']
checkpoint_at_end = exp['checkpoint_at_end']
if args.ray_num_nodes:
cluster = Cluster()
for _ in range(args.ray_num_nodes):
cluster.add_node(
num_cpus=args.ray_num_cpus or 1,
num_gpus=args.ray_num_gpus or 0,
object_store_memory=args.ray_object_store_memory,
memory=args.ray_memory,
redis_max_memory=args.ray_redis_max_memory)
ray.init(address=cluster.address)
else:
ray.init(
address=args.ray_address,
object_store_memory=args.ray_object_store_memory,
memory=args.ray_memory,
redis_max_memory=args.ray_redis_max_memory,
num_cpus=args.ray_num_cpus,
num_gpus=args.ray_num_gpus,
webui_host=webui_host)
return experiments
# run_experiments(
# experiments,
# scheduler=_make_scheduler(args),
# queue_trials=args.queue_trials,
# resume=args.resume,
# verbose=verbose,
# concurrent=True)
def imitation_train_fn(config,reporter=None):
imitation_trainer = ImitationAgent(config,
env=config.get("env"),)
# eval_results_all = pd.DataFrame.from_dict([{'episode_reward_mean': np.nan, 'episode_completion_mean': np.nan, 'timesteps_this_iter': 0}])
for i in range(MAX_ITERATIONS):
result = imitation_trainer.train()
if reporter:
reporter(**result)
if i % checkpoint_freq == 0:
# eval_results = imitation_trainer._evaluate()
# print("Eval Results:",eval_results)
checkpoint = imitation_trainer.save()
# checkpoint_dir = os.sep.join(checkpoint.split(os.sep)[:-1])
# print(checkpoint_dir)
# eval_results_all = pd.concat([eval_results_all, pd.DataFrame.from_dict([eval_results])])
# eval_results_all.to_csv(os.path.join(checkpoint_dir, 'EvalResults.csv'))
# A weights-only checkpoint (loads weights but not optimizer state) could be
# achieved by overriding _save to use model.save_weights(checkpoint) and by
# overriding _restore accordingly. Ideally use workers to save/load weights
# (see the sketch after this function).
# imitation_trainer.restore(checkpoint)
print("checkpoint saved at", checkpoint)
imitation_trainer.stop()
print("Test: OK")
if __name__ == "__main__":
parser = create_parser()
args = parser.parse_args()
experiments = run(args, parser)
# exp['stop'] = {"timesteps_total": 15000}
# exp['stop'] = {"iterations": 4}
for exp in experiments.values():
_default_config = with_common_config(exp["config"])
_default_config['env'] = exp.get('env')
resources = PPOTrainer.default_resource_request(_default_config).to_json()
tune.run(imitation_train_fn, resources_per_trial=resources, config=_default_config,
#loggers=exp.get('loggers'),
stop=exp.get('stop'),
num_samples=exp.get('num_samples',1),
# checkpoint_freq=exp.get('checkpoint_freq'),
# keep_checkpoints_num=exp.get('keep_checkpoints_num'),
# checkpoint_at_end=exp.get('checkpoint_at_end'),
# checkpoint_score_attr=exp.get('checkpoint_score_attr'),
scheduler=_make_scheduler(args),
queue_trials=args.queue_trials,
resume=args.resume,
verbose=verbose)
# imitation_train_fn(_default_config)