# train_experiment.py
import os
import tempfile

import gin
import gym
import numpy as np

import ray
from ray import tune
from ray.rllib.models import ModelCatalog
# Import the PPO trainer: these imports can be replaced by any other trainer from RLlib.
from ray.rllib.agents.ppo.ppo import DEFAULT_CONFIG
from ray.rllib.agents.ppo.ppo import PPOTrainer as Trainer
from ray.rllib.agents.ppo.ppo_policy_graph import PPOPolicyGraph as PolicyGraph
from ray.rllib.utils.seed import seed as set_seed
from ray.tune.logger import UnifiedLogger, pretty_print

from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import DummyPredictorForRailEnv, ShortestPathPredictorForRailEnv

from custom_preprocessors import TreeObsPreprocessor
from RailEnvRLLibWrapper import RailEnvRLLibWrapper

# Make the observation builders and predictors selectable from the gin config file.
gin.external_configurable(TreeObsForRailEnv)
gin.external_configurable(DummyPredictorForRailEnv)
gin.external_configurable(ShortestPathPredictorForRailEnv)

# Register the custom preprocessor so RLlib models can look it up by name.
ModelCatalog.register_custom_preprocessor("tree_obs_prep", TreeObsPreprocessor)

ray.init()  # Optionally: object_store_memory=150000000000, redis_max_memory=30000000000

__file_dirname__ = os.path.dirname(os.path.realpath(__file__))

def on_episode_start(info):
    """Set the episode horizon from the map size at the start of each episode."""
    episode = info['episode']
    map_width = info['env'].envs[0].width
    map_height = info['env'].envs[0].height
    episode.horizon = 3 * (map_width + map_height)
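    # For example, on a 20 x 20 map the horizon is 3 * (20 + 20) = 120 time steps.
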
def on_episode_end(info):
    episode = info['episode']
    # Custom score metric: sum of all accumulated rewards, divided by the number
    # of agents and by the maximum number of time steps of the episode.
    score = 0
    for k, v in episode._agent_reward_history.items():
        score += np.sum(v)
    score /= (len(episode._agent_reward_history) * episode.horizon)
    # Proportion of episodes solved before the maximum time step: the length of
    # agent 0's reward history serves as the episode length, and the 5-step
    # margin avoids counting episodes that were merely cut off at the horizon.
    done = 0
    if len(episode._agent_reward_history[0]) <= episode.horizon - 5:
        done = 1
    episode.custom_metrics["score"] = score
    episode.custom_metrics["proportion_episode_solved"] = done
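    # Worked example with hypothetical numbers: 3 agents, horizon 120 and a
    # summed reward of -180 across all agents give score = -180 / (3 * 120) = -0.5.
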
def train(config, reporter):
    print('Init Env')
    set_seed(config['seed'], config['seed'], config['seed'])

    # Given the depth of the tree observation and the number of features per node,
    # compute the size of the flattened observation vector: a tree of depth d has
    # sum_{i=0}^{d} 4^i nodes, since every node has four children.
    num_features_per_node = config['obs_builder'].observation_dim
    tree_depth = 2
    nr_nodes = 0
    for i in range(tree_depth + 1):
        nr_nodes += np.power(4, i)
    obs_size = num_features_per_node * nr_nodes
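    # Worked example: with tree_depth = 2 the tree has 1 + 4 + 16 = 21 nodes, so
    # with e.g. 11 features per node (the exact count depends on the Flatland
    # version and observation builder) obs_size = 21 * 11 = 231.
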
# Environment parameters
env_config = {"width": config['map_width'],
"height": config['map_height'],
"rail_generator": config["rail_generator"],
"nr_extra": config["nr_extra"],
"number_of_agents": config['n_agents'],
"seed": config['seed'],
"obs_builder": config['obs_builder'],
"min_dist": config['min_dist'],
"step_memory": config["step_memory"]}
    # Observation space and action space definitions. The observation space is a
    # tuple of two identical boxes: the slots hold the current observation and,
    # with step_memory = 2, the observation of the previous time step.
    if isinstance(config["obs_builder"], TreeObsForRailEnv):
        obs_space = gym.spaces.Tuple((gym.spaces.Box(low=-float('inf'), high=float('inf'), shape=(obs_size,)),) * 2)
        preprocessor = "tree_obs_prep"
    else:
        raise ValueError("Undefined observation space")  # Only TreeObsForRailEnv is supported for now.

    act_space = gym.spaces.Discrete(5)
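    # The 5 discrete actions correspond to Flatland's rail actions:
    # DO_NOTHING, MOVE_LEFT, MOVE_FORWARD, MOVE_RIGHT and STOP_MOVING.
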
# Dict with the different policies to train. In this case, all trains follow the same policy
policy_graphs = {
"ppo_policy": (PolicyGraph, obs_space, act_space, {})
}
# Function that maps an agent id to the name of its respective policy.
def policy_mapping_fn(agent_id):
return "ppo_policy"
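    # A hypothetical per-agent variant (not used here) would register one entry
    # per agent in policy_graphs and map each agent id to its own policy, e.g.:
    #     def policy_mapping_fn(agent_id):
    #         return "ppo_policy_{}".format(agent_id)
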
# Trainer configuration
trainer_config = DEFAULT_CONFIG.copy()
trainer_config['model'] = {"fcnet_hiddens": config['hidden_sizes'], "custom_preprocessor": preprocessor,
"custom_options": {"step_memory": config["step_memory"], "obs_size": obs_size}}
trainer_config['multiagent'] = {"policy_graphs": policy_graphs,
"policy_mapping_fn": policy_mapping_fn,
"policies_to_train": list(policy_graphs.keys())}
    # The maximum number of time steps of an episode is set to
    # 3 * (map_width + map_height), consistent with on_episode_start above.
    trainer_config["horizon"] = 3 * (config['map_width'] + config['map_height'])

    # Parameters controlling parallelization and resource usage
trainer_config["num_workers"] = 0
trainer_config["num_cpus_per_worker"] = 8
trainer_config["num_gpus"] = 0.2
trainer_config["num_gpus_per_worker"] = 0.2
trainer_config["num_cpus_for_driver"] = 1
trainer_config["num_envs_per_worker"] = 1
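    # Fractional GPU values let several trials share one physical GPU: with
    # num_gpus = 0.2, up to five concurrent trials can be packed onto a single
    # card (Ray treats this as a scheduling quota, not a hard memory limit).
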
# Parameters for PPO training
trainer_config['entropy_coeff'] = config['entropy_coeff']
trainer_config["env_config"] = env_config
trainer_config["batch_mode"] = "complete_episodes"
trainer_config['simple_optimizer'] = False
trainer_config['log_level'] = 'WARN'
trainer_config['num_sgd_iter'] = 10
trainer_config['clip_param'] = 0.2
trainer_config['kl_coeff'] = config['kl_coeff']
trainer_config['lambda'] = config['lambda_gae']
trainer_config['callbacks'] = {
"on_episode_start": tune.function(on_episode_start),
"on_episode_end": tune.function(on_episode_end)
}
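    # tune.function() wraps the callbacks so this Ray version can serialize them
    # for the rollout workers. Further hooks such as "on_episode_step" or
    # "on_train_result" could presumably be registered in the same way.
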
    def logger_creator(conf):
        """Create a UnifiedLogger whose logdir is prefixed by the policy folder name."""
        logdir = config['policy_folder_name'].format(**locals())
        logdir = tempfile.mkdtemp(prefix=logdir, dir=config['local_dir'])
        return UnifiedLogger(conf, logdir, None)

    trainer = Trainer(env=RailEnvRLLibWrapper, config=trainer_config, logger_creator=logger_creator)
    # Train until tune's stopping criterion (num_iterations_trained, reported
    # below) is reached; the loop bound only acts as a large upper limit.
    for i in range(100000 + 2):
        print("== Iteration", i, "==")
        print(pretty_print(trainer.train()))
        if i % config['save_every'] == 0:
            checkpoint = trainer.save()
            print("checkpoint saved at", checkpoint)
        reporter(num_iterations_trained=trainer._iteration)
@gin.configurable
def run_experiment(name, num_iterations, n_agents, hidden_sizes, save_every,
map_width, map_height, policy_folder_name, local_dir, obs_builder,
entropy_coeff, seed, conv_model, rail_generator, nr_extra, kl_coeff, lambda_gae,
step_memory, min_dist):
tune.run(
train,
name=name,
stop={"num_iterations_trained": num_iterations},
config={"n_agents": n_agents,
"hidden_sizes": hidden_sizes, # Array containing the sizes of the network layers
"save_every": save_every,
"map_width": map_width,
"map_height": map_height,
"local_dir": local_dir,
'policy_folder_name': policy_folder_name,
"obs_builder": obs_builder,
"entropy_coeff": entropy_coeff,
"seed": seed,
"conv_model": conv_model,
"rail_generator": rail_generator,
"nr_extra": nr_extra,
"kl_coeff": kl_coeff,
"lambda_gae": lambda_gae,
"min_dist": min_dist,
"step_memory": step_memory # If equal to two, the current observation plus
# the observation of last time step will be given as input the the model.
},
resources_per_trial={
"cpu": 8,
"gpu": 0.2
},
verbose=2,
local_dir=local_dir
)
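
# run_experiment is gin-configurable: every argument of the function above can be
# bound in the config.gin file parsed below. A minimal file might contain
# bindings such as (hypothetical values; parameter names follow the signature):
#     run_experiment.name = 'ppo_tree_obs'
#     run_experiment.num_iterations = 1000
#     run_experiment.n_agents = 2
#     run_experiment.map_width = 20
#     run_experiment.map_height = 20
#     run_experiment.hidden_sizes = [128, 128]
#     run_experiment.obs_builder = @TreeObsForRailEnv()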
if __name__ == '__main__':
    folder_name = 'config_example'  # Name of the experiment config folder; modify as needed.
    gin.parse_config_file(os.path.join(__file_dirname__, 'experiment_configs', folder_name, 'config.gin'))
    config_dir = os.path.join(__file_dirname__, 'experiment_configs', folder_name)
    run_experiment(local_dir=config_dir)