Commit 4c60e6c2 authored by nilabha

Add structure and trainer as algorithm

parent 413643fd
Pipeline #4991 failed with stage in 2 minutes and 4 seconds
from .registry import CUSTOM_ALGORITHMS
@@ -247,61 +247,4 @@ class ImitationAgent(PPOTrainer):
"timesteps_this_iter": steps,
}
def imitation_train_fn(config, reporter=None):
imitation_trainer = ImitationAgent(config,
env="flatland_sparse",)
eval_results_all = pd.DataFrame.from_dict([{'episode_reward_mean': 0, 'episode_completion_mean': 0, 'timesteps_this_iter': 0}])
for i in range(1000):
result = imitation_trainer.train()
if reporter:
reporter(**result)
if i % 10 == 0:
eval_results = imitation_trainer.eval()
print("Eval Results:",eval_results)
eval_results_all = pd.concat([eval_results_all, pd.DataFrame.from_dict([eval_results])])
eval_results_all.to_csv('EvalResults.csv')
checkpoint = imitation_trainer.save()
# TODO: Loads weights but not optimizer state
# Could be done by overriding _save by using model.save_weight(checkpoint)
# Also override _restore. Ideally use workers to save/load weights.
# imitation_trainer.restore(checkpoint)
print("checkpoint saved at", checkpoint)
imitation_trainer.stop()
print("Test: OK")
if __name__ == "__main__":
    # Register all necessary assets in tune registries
    load_envs(os.getcwd())  # Load envs
    load_models(os.getcwd())  # Load models

    ppo = False
    if ppo:
        config_file = "small_tree_video/PPO_test.yaml"
    else:
        config_file = "small_tree_video/apex_test.yaml"

    with open(config_file) as f:
        exp = yaml.safe_load(f)

    exp["config"]["eager"] = True
    exp["config"]["use_pytorch"] = False
    exp["config"]["log_level"] = "INFO"
    verbose = 2
    exp["config"]["eager_tracing"] = True
    webui_host = "0.0.0.0"
    # TODO should be in exp['config'] directly
    exp['config']['env_config']['yaml_config'] = config_file
    exp['loggers'] = [WandbLogger, TBXLogger]

    _default_config = with_common_config(exp["config"])
    ray.init(num_cpus=3, num_gpus=0)
    resources = PPOTrainer.default_resource_request(_default_config).to_json()
    tune.run(imitation_train_fn, resources_per_trial=resources, config=_default_config)
"""
Registry of custom implemented algorithms names
Please refer to the following examples to add your custom algorithms :
- AlphaZero : https://github.com/ray-project/ray/tree/master/rllib/contrib/alpha_zero
- bandits : https://github.com/ray-project/ray/tree/master/rllib/contrib/bandits
- maddpg : https://github.com/ray-project/ray/tree/master/rllib/contrib/maddpg
- random_agent: https://github.com/ray-project/ray/tree/master/rllib/contrib/random_agent
An example integration of the random agent is shown here :
- https://github.com/AIcrowd/neurips2020-procgen-starter-kit/tree/master/algorithms/custom_random_agent
"""
def _import_imitation_trainer():
    from .imitation_agent.imitation_trainer import ImitationAgent
    return ImitationAgent


CUSTOM_ALGORITHMS = {
    "ImitationAgent": _import_imitation_trainer
}
\ No newline at end of file
@@ -53,137 +53,8 @@ import numpy as np
from flatland.envs.agent_utils import RailAgentStatus
import sys, os
sys.path.insert(0, os.getcwd() + '/envs/expert')
from libs.cell_graph_dispatcher import CellGraphDispatcher


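# This optimizer is the one the built ImitationTFPolicy uses when
# apply_gradients() is called on the local worker (see ImitationAgent._train).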
def adam_optimizer(policy, config):
    return tf.train.AdamOptimizer(learning_rate=0.01, epsilon=0.001)


def default_execution_plan(workers: WorkerSet, config):
    # Collects experiences in parallel from multiple RolloutWorker actors.
    rollouts = ParallelRollouts(workers, mode="bulk_sync")

    # Combine experiences batches until we hit `train_batch_size` in size.
    # Then, train the policy on those experiences and update the workers.
    train_op = rollouts \
        .combine(ConcatBatches(min_batch_size=config["train_batch_size"])) \
        .for_each(TrainOneStep(workers))

    # Add on the standard episode reward, etc. metrics reporting. This returns
    # a LocalIterator[metrics_dict] representing metrics for each train step.
    return StandardMetricsReporting(train_op, workers, config)


def loss_imitation(policy, model, dist_class, train_batch):
    return np.random.randint(5)
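
# NOTE: loss_imitation above is only a placeholder so that build_tf_policy has
# a loss_fn; the actual imitation (cross-entropy) loss is computed manually in
# ImitationAgent._train below.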


# policy = DQNTFPolicy.with_updates(name="ImitPolicy",)
ImitationTFPolicy = build_tf_policy(
    name="ImitationTFPolicy",
    loss_fn=loss_imitation,
    optimizer_fn=adam_optimizer,
)


class ImitationAgent(PPOTrainer):
    """Trainer that learns by imitating the actions of an expert dispatcher."""
    _name = "ImitationAgent"

    @override(Trainer)
    def _init(self, config, env_creator):
        self.env = env_creator(config["env_config"])
        self.state = {}
        self._policy = ImitationTFPolicy
        action_space = self.env.action_space
        dist_class, logit_dim = ModelCatalog.get_action_dist(
            action_space, self.config["model"])
        self.workers = self._make_workers(
            env_creator, self._policy, config, self.config["num_workers"])
        self.execution_plan = default_execution_plan
        self.train_exec_impl = self.execution_plan(self.workers, config)

    @override(Trainer)
    def _train(self):
        import tensorflow as tf
        policy = self.get_policy()
        steps = 0
        for _ in range(1):
            env = self.env._env.rail_env
            obs = self.env.reset()
            num_outputs = env.action_space[0]
            n_agents = env.get_num_agents()
            dispatcher = CellGraphDispatcher(env)
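            # The CellGraphDispatcher acts as the expert: at every step it
            # returns an action per agent, which the policy is trained to imitate.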

            # TODO : Update max_steps as per latest version
            # https://gitlab.aicrowd.com/flatland/flatland-examples/blob/master/reinforcement_learning/multi_agent_training.py
            # max_steps = int(4 * 2 * (env.height + env.width + (n_agents / n_cities))) - 1
            max_steps = int(4 * 2 * (20 + env.height + env.width))
            episode_steps = 0
            episode_max_steps = 0
            episode_num_agents = 0
            episode_score = 0
            episode_done_agents = 0
            done = {}
            done["__all__"] = False

            # TODO: Support for batch update
            # batch_size = 2
            # logits, _ = policy.model.forward({"obs": np.vstack([obs[a], obs[a]])}, [], None)

            for step in range(max_steps):
                action_dict = dispatcher.step(env._elapsed_steps)
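                # Behavioural-cloning update: maximise the log-likelihood of the
                # expert action under the current policy (cross-entropy loss),
                # averaged over the agents that are still active.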
                with tf.GradientTape() as tape:
                    imitation_loss = 0
                    active_agents = 0
                    for a in range(n_agents):
                        if not done.get(a) and obs.get(a) is not None:
                            active_agents += 1
                            expert_action = action_dict[a].value
                            input_dict = {"obs": np.expand_dims(obs[a], 0)}
                            input_dict['obs_flat'] = input_dict['obs']
                            logits, _ = policy.model.forward(input_dict, [], None)
                            model_logits = tf.squeeze(logits)
                            expert_logits = tf.cast(expert_action, tf.int32)
                            action_dist = Categorical(logits, policy.model.model_config)
                            imitation_loss += tf.reduce_mean(
                                -action_dist.logp(tf.expand_dims(expert_logits, 0)))
                    imitation_loss = imitation_loss / max(active_agents, 1)

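                # Apply the gradients on the local worker and broadcast the
                # updated weights to all remote rollout workers.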
                gradients = tape.gradient(imitation_loss, policy.model.trainable_variables())
                self.workers.local_worker().apply_gradients(gradients)
                weights = ray.put(self.workers.local_worker().get_weights())
                # print(self.workers.local_worker().get_weights())
                for e in self.workers.remote_workers():
                    e.set_weights.remote(weights)

                obs, all_rewards, done, info = self.env.step(action_dict)
                steps += 1
                # super()._train()

                for agent, agent_info in info.items():
                    if episode_max_steps == 0:
                        episode_max_steps = agent_info["max_episode_steps"]
                        episode_num_agents = agent_info["num_agents"]
                    episode_steps = max(episode_steps, agent_info["agent_step"])
                    episode_score += agent_info["agent_score"]
                    if agent_info["agent_done"]:
                        episode_done_agents += 1

                if done["__all__"]:
                    print(float(episode_done_agents) / n_agents)
                    break

        return {
            "episode_reward_mean": episode_score,
            "timesteps_this_iter": steps,
        }
from algorithms.imitation_agent.imitation_trainer import ImitationAgent
if __name__ == "__main__":
@@ -240,4 +111,4 @@ if __name__ == "__main__":
print("checkpoint saved at", checkpoint)
imitation_trainer.set_weights(ppo_trainer.get_weights())
print("Done: OK")
\ No newline at end of file
print("Done: OK")
from ray.rllib.agents.trainer import Trainer, with_common_config
from ray.rllib.agents.dqn import ApexTrainer, DQNTrainer
from ray.rllib.utils.annotations import override
from ray.rllib.agents.ppo.ppo import PPOTrainer
import ray
from ray import tune
import os
import yaml
from pathlib import Path
from ray.cluster_utils import Cluster
from ray.rllib.evaluation import MultiAgentEpisode
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.tune import run_experiments
from ray.tune.logger import TBXLogger
from ray.tune.resources import resources_to_json
from ray.tune.tune import _make_scheduler

tf = try_import_tf()

from ray.tune import registry
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.optimizers import SyncSamplesOptimizer
from ray.rllib.models import ModelCatalog

from utils.argparser import create_parser
from utils.loader import load_envs, load_models, load_algorithms
from algorithms import CUSTOM_ALGORITHMS
# Custom wandb logger with hotfix to allow custom callbacks
from wandblogger import WandbLogger
import pandas as pd
"""
Note : This implementation has been adapted from :
https://github.com/ray-project/ray/blob/master/rllib/contrib/random_agent/random_agent.py
"""
import numpy as np
from flatland.envs.agent_utils import RailAgentStatus
from algorithms.imitation_agent.imitation_trainer import ImitationAgent


def imitation_train_fn(config, reporter=None):
    imitation_trainer = ImitationAgent(config, env="flatland_sparse")
    eval_results_all = pd.DataFrame.from_dict(
        [{'episode_reward_mean': np.nan, 'episode_completion_mean': np.nan, 'timesteps_this_iter': 0}])
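    # The row above is only a placeholder; real evaluation metrics are appended
    # to this frame every 10 training iterations below.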
    for i in range(1000):
        result = imitation_trainer.train()
        if reporter:
            reporter(**result)
        if i % 10 == 0:
            eval_results = imitation_trainer.eval()
            print("Eval Results:", eval_results)
            checkpoint = imitation_trainer.save()
            checkpoint_dir = os.sep.join(checkpoint.split(os.sep)[:-1])
            print(checkpoint_dir)
            eval_results_all = pd.concat([eval_results_all, pd.DataFrame.from_dict([eval_results])])
            eval_results_all.to_csv(os.path.join(checkpoint_dir, 'EvalResults.csv'))
            # TODO: Loads weights but not optimizer state
            # Could be done by overriding _save by using model.save_weight(checkpoint)
            # Also override _restore. Ideally use workers to save/load weights.
            # imitation_trainer.restore(checkpoint)
            print("checkpoint saved at", checkpoint)
    imitation_trainer.stop()
    print("Test: OK")


if __name__ == "__main__":
    # Register all necessary assets in tune registries
    load_envs(os.getcwd())  # Load envs
    load_models(os.getcwd())  # Load models
    load_algorithms(CUSTOM_ALGORITHMS)  # Load algorithms
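    # NOTE: load_algorithms presumably registers each CUSTOM_ALGORITHMS entry
    # with Tune's trainable registry so it can be referenced by name
    # (see utils/loader.py).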

    ppo = False
    if ppo:
        config_file = "small_tree_video/PPO_test.yaml"
    else:
        config_file = "small_tree_video/apex_test.yaml"

    with open(config_file) as f:
        exp = yaml.safe_load(f)

    exp["config"]["eager"] = True
    exp["config"]["use_pytorch"] = False
    exp["config"]["log_level"] = "INFO"
    verbose = 2
    exp["config"]["eager_tracing"] = True
    webui_host = "0.0.0.0"
    # TODO should be in exp['config'] directly
    exp['config']['env_config']['yaml_config'] = config_file
    exp['loggers'] = [WandbLogger, TBXLogger]

    _default_config = with_common_config(exp["config"])
    ray.init(num_cpus=3, num_gpus=0)
    resources = PPOTrainer.default_resource_request(_default_config).to_json()
    # imitation_train_fn(_default_config)
    tune.run(imitation_train_fn, resources_per_trial=resources, config=_default_config)