Commit 3f321add authored by nilabha

Merge branch 'rllib-IL' into 'flatland-paper-baselines'

Rllib IL Changes

See merge request flatland/neurips2020-flatland-baselines!14
parents 50e24f72 48a2454e
from .registry import CUSTOM_ALGORITHMS
import numpy as np
from ray.rllib.agents.trainer import Trainer, with_common_config
from ray.rllib.agents.dqn import ApexTrainer, DQNTrainer
from ray.rllib.utils.annotations import override
from ray.rllib.agents.ppo.ppo import PPOTrainer
import ray
from ray import tune
from ray.tune.trainable import Trainable
import os
import math
import yaml
from pathlib import Path
from ray.cluster_utils import Cluster
from ray.rllib.evaluation import MultiAgentEpisode
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.tune import run_experiments
from ray.tune.logger import TBXLogger
from ray.tune.resources import resources_to_json
from ray.tune.tune import _make_scheduler
from ray.rllib.models.tf.tf_action_dist import Categorical

tf = try_import_tf()
from ray.tune import registry
from ray.rllib.agents.trainer_template import build_trainer
from ray.rllib.optimizers import PolicyOptimizer, SyncSamplesOptimizer
from ray.rllib.models import ModelCatalog
from utils.argparser import create_parser
from utils.loader import load_envs, load_models, load_algorithms
from envs.flatland import get_eval_config
from ray.rllib.utils import merge_dicts
from ray.rllib.evaluation.metrics import collect_metrics
# Custom wandb logger with hotfix to allow custom callbacks
from wandblogger import WandbLogger
import pandas as pd
"""
Note: This implementation has been adapted from:
https://github.com/ray-project/ray/blob/master/rllib/contrib/random_agent/random_agent.py
"""
from ray.rllib.policy import Policy, TFPolicy
from ray.rllib.policy.dynamic_tf_policy import DynamicTFPolicy
from ray.rllib.agents.dqn.dqn_tf_policy import DQNTFPolicy
from ray.rllib.policy.tf_policy_template import build_tf_policy
from ray.rllib.evaluation.worker_set import WorkerSet
from ray.rllib.execution.rollout_ops import ParallelRollouts, ConcatBatches
from ray.rllib.execution.train_ops import TrainOneStep
from ray.rllib.execution.metric_ops import StandardMetricsReporting
import logging
logger = logging.getLogger(__name__)
from flatland.envs.agent_utils import RailAgentStatus
import sys
# sys.path.insert(0, os.getcwd() + '/envs/expert')
from libs.cell_graph_dispatcher import CellGraphDispatcher
def adam_optimizer(policy, config):
    return tf.train.AdamOptimizer(
        learning_rate=config.get('lr', 5e-4), epsilon=config.get('adam_epsilon', 1e-8))
def default_execution_plan(workers: WorkerSet, config):
    # Collect experiences in parallel from multiple RolloutWorker actors.
    rollouts = ParallelRollouts(workers, mode="bulk_sync")

    # Combine experience batches until we hit `train_batch_size` in size.
    # Then, train the policy on those experiences and update the workers.
    train_op = rollouts \
        .combine(ConcatBatches(
            min_batch_size=config["train_batch_size"])) \
        .for_each(TrainOneStep(workers))

    # Add on the standard episode reward, etc. metrics reporting. This returns
    # a LocalIterator[metrics_dict] representing metrics for each train step.
    return StandardMetricsReporting(train_op, workers, config)
def loss_imitation(policy, model, dist_class, train_batch):
    # Placeholder loss: the actual imitation gradient step is applied
    # manually inside ImitationAgent._train below.
    return np.random.randint(5)


ImitationTFPolicy = build_tf_policy(
    name="ImitationTFPolicy",
    loss_fn=loss_imitation,
    optimizer_fn=adam_optimizer,
)
class ImitationMetrics(PolicyOptimizer):
    """Adding metrics."""

    @override(PolicyOptimizer)
    def step(self):
        pass
class ImitationAgent(PPOTrainer):
    """Trainer that learns by imitating expert actions from a CellGraphDispatcher."""

    _name = "ImitationAgent"

    @override(Trainer)
    def _init(self, config, env_creator):
        self.env = env_creator(config["env_config"])
        self.state = {}
        self._policy = ImitationTFPolicy
        action_space = self.env.action_space
        dist_class, logit_dim = ModelCatalog.get_action_dist(
            action_space, self.config["model"])
        self.workers = self._make_workers(
            env_creator, self._policy, config, self.config["num_workers"])
        self.execution_plan = default_execution_plan
        self.train_exec_impl = self.execution_plan(self.workers, config)
        self.optimizer = ImitationMetrics(self.workers)

    # @override(Trainer)
    # def collect_metrics(self, selected_workers=None):
    #     weights = ray.put(self.workers.local_worker().save())
    #     self.evaluation_workers.foreach_worker(
    #         lambda w: w.restore(ray.get(weights)))
    #     self.workers.foreach_worker(
    #         lambda w: w.restore(ray.get(weights)))
    #     ray.get([
    #         w.sample.remote()
    #         for w in self.workers.remote_workers()
    #     ])
    #     metrics = collect_metrics(self.workers.local_worker(),
    #                               self.workers.remote_workers())
    #     res = self.optimizer.collect_metrics(
    #         self.config["collect_metrics_timeout"],
    #         min_history=self.config["metrics_smoothing_episodes"],
    #         selected_workers=selected_workers)
    #     return res

    @override(Trainer)
    def _train(self):
        import tensorflow as tf
        policy = self.get_policy()
        steps = 0
        n_episodes = 1
        for _ in range(n_episodes):
            env = self.env._env.rail_env
            obs = self.env.reset()
            num_outputs = env.action_space[0]
            n_agents = env.get_num_agents()
            dispatcher = CellGraphDispatcher(env)

            # TODO: Update max_steps as per the latest version:
            # https://gitlab.aicrowd.com/flatland/flatland-examples/blob/master/reinforcement_learning/multi_agent_training.py
            # max_steps = int(4 * 2 * (env.height + env.width + (n_agents / n_cities))) - 1
            max_steps = int(4 * 2 * (20 + env.height + env.width))
            episode_steps = 0
            episode_max_steps = 0
            episode_num_agents = 0
            episode_score = 0
            episode_done_agents = 0
            done = {}
            done["__all__"] = False

            # TODO: Support for batch update
            # batch_size = 2
            # logits, _ = policy.model.forward({"obs": np.vstack([obs[a], obs[a]])}, [], None)
            for step in range(max_steps):
                action_dict = dispatcher.step(env._elapsed_steps)

                with tf.GradientTape() as tape:
                    imitation_loss = 0
                    active_agents = 0
                    for a in range(n_agents):
                        if not done.get(a) and obs.get(a) is not None:
                            active_agents += 1
                            expert_action = action_dict[a].value
                            input_dict = {"obs": np.expand_dims(obs[a], 0)}
                            input_dict['obs_flat'] = input_dict['obs']
                            logits, _ = policy.model.forward(input_dict, [], None)
                            model_logits = tf.squeeze(logits)
                            expert_logits = tf.cast(expert_action, tf.int32)
                            action_dist = Categorical(logits, policy.model.model_config)
                            imitation_loss += tf.reduce_mean(
                                -action_dist.logp(tf.expand_dims(expert_logits, 0)))
                    imitation_loss = imitation_loss / max(active_agents, 1)

                gradients = tape.gradient(imitation_loss, policy.model.trainable_variables())
                self.workers.local_worker().apply_gradients(gradients)

                weights = ray.put(self.workers.local_worker().get_weights())
                # print(self.workers.local_worker().get_weights()['default_policy'][0][:4])
                for e in self.workers.remote_workers():
                    e.set_weights.remote(weights)

                obs, all_rewards, done, info = self.env.step(action_dict)
                steps += 1

                for agent, agent_info in info.items():
                    if episode_max_steps == 0:
                        episode_max_steps = agent_info["max_episode_steps"]
                        episode_num_agents = agent_info["num_agents"]
                    episode_steps = max(episode_steps, agent_info["agent_step"])
                    episode_score += agent_info["agent_score"]
                    if agent_info["agent_done"]:
                        episode_done_agents += 1

                if done["__all__"]:
                    print(float(episode_done_agents) / n_agents)
                    break

        result = {
            "expert_episode_reward_mean": episode_score,
            "episode_reward_mean": episode_score,
            "expert_episode_completion_mean": float(episode_done_agents) / n_agents,
            "episodes_this_iter": n_episodes,
            "timesteps_this_iter": steps,
        }
        # Code taken from the _train method of trainer_template.py - TODO: not working
        # res = self.collect_metrics()
        # res = {}
        # res.update(
        #     optimizer_steps_this_iter=steps,
        #     episode_reward_mean=episode_score,
        #     info=res.get("info", {}))
        # res.update(expert_scores=result)
        return result
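
# NOTE (illustrative sketch, not part of the merge request): the update in
# `_train` above is plain behavioural cloning, i.e. a cross-entropy loss between
# the policy logits and the dispatcher's expert action. The helper below shows
# the same update outside RLlib for a generic tf.keras model; the names
# `bc_model`, `bc_optimizer`, `obs_batch` and `expert_actions` are assumptions,
# not identifiers from this repo.
def behavioural_cloning_step(bc_model, bc_optimizer, obs_batch, expert_actions):
    """One supervised step towards the expert actions (hypothetical helper)."""
    with tf.GradientTape() as tape:
        logits = bc_model(obs_batch)  # shape: [batch, num_actions]
        loss = tf.reduce_mean(
            tf.nn.sparse_softmax_cross_entropy_with_logits(
                labels=expert_actions, logits=logits))
    grads = tape.gradient(loss, bc_model.trainable_variables)
    bc_optimizer.apply_gradients(zip(grads, bc_model.trainable_variables))
    return loss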
if __name__ == "__main__":
    # Copy this file to the root folder to run
    from train import on_episode_end

    exp = {}
    exp['run'] = "ImitationAgent"
    exp['env'] = "flatland_sparse"
    # exp['stop'] = {"timesteps_total": 15000}
    exp['stop'] = {"iterations": 4}
    exp['checkpoint_freq'] = 2
    # exp['checkpoint_at_end'] = True
    # exp['keep_checkpoints_num'] = 100
    # exp['checkpoint_score_attr'] = "episode_reward_mean"
    # exp['num_samples'] = 3

    config = {
        "num_workers": 1,
        "num_envs_per_worker": 1,
        "num_gpus": 0,
        "clip_rewards": False,
        "vf_clip_param": 500.0,
        "entropy_coeff": 0.01,
        # effective batch_size: train_batch_size * num_agents_in_each_environment [5, 10]
        # see https://github.com/ray-project/ray/issues/4628
        "train_batch_size": 1000,  # 5000
        "rollout_fragment_length": 50,  # 100
        "sgd_minibatch_size": 100,  # 500
        "vf_share_layers": False,
        "env_config": {
            "observation": "tree",
            "observation_config": {
                "max_depth": 2,
                "shortest_path_max_depth": 30},
            "generator": "sparse_rail_generator",
            "generator_config": "small_v0",
            "eval_generator": "test_eval"},
        "model": {
            "fcnet_activation": "relu",
            "fcnet_hiddens": [256, 256],
            "vf_share_layers": True}}

    exp['config'] = config
    exp['config']['callbacks'] = {
        'on_episode_end': on_episode_end,
    }

    eval_configs = get_eval_config(
        exp['config'].get('env_config', {}).get('eval_generator', "default"))
    eval_seed = eval_configs.get('evaluation_config', {}).get('env_config', {}).get('seed')

    # Add the evaluation config to the current config
    exp['config'] = merge_dicts(exp['config'], eval_configs)
    if exp['config'].get('evaluation_config'):
        exp['config']['evaluation_config']['env_config'] = exp['config'].get('env_config')
        eval_env_config = exp['config']['evaluation_config'].get('env_config')
        if eval_seed and eval_env_config:
            # We override the env seed from the evaluation config
            eval_env_config['seed'] = eval_seed

    exp["config"]["eager"] = True
    exp["config"]["use_pytorch"] = False
    exp["config"]["log_level"] = "INFO"
    verbose = 2
    exp["config"]["eager_tracing"] = True
    webui_host = "0.0.0.0"
    # TODO: should be in exp['config'] directly
    exp['config']['env_config']['yaml_config'] = config
    exp['loggers'] = [TBXLogger]

    _default_config = with_common_config(exp["config"])
    ray.init(num_cpus=4, num_gpus=0)

    trainer = ImitationAgent(_default_config, env=exp['env'])
    # trainer = PPOTrainer(_default_config, env="flatland_sparse")
    for i in range(exp.get("stop", {}).get("iterations", 5)):
        result = trainer.train()
        print("Results:", result)
        if i % exp['checkpoint_freq'] == 0:
            # eval_results = trainer._evaluate()
            # print("Eval Results:", eval_results)
            checkpoint = trainer.save()
            # TODO: Loads weights but not the optimizer state.
            # Could be done by overriding _save using model.save_weights(checkpoint).
            # Also override _restore. Ideally use workers to save/load weights.
            # trainer.restore(checkpoint)
            print("checkpoint saved at", checkpoint)

    trainer.stop()
    print("Test: OK")
"""
Registry of custom implemented algorithms names
Please refer to the following examples to add your custom algorithms :
- AlphaZero : https://github.com/ray-project/ray/tree/master/rllib/contrib/alpha_zero
- bandits : https://github.com/ray-project/ray/tree/master/rllib/contrib/bandits
- maddpg : https://github.com/ray-project/ray/tree/master/rllib/contrib/maddpg
- random_agent: https://github.com/ray-project/ray/tree/master/rllib/contrib/random_agent
An example integration of the random agent is shown here :
- https://github.com/AIcrowd/neurips2020-procgen-starter-kit/tree/master/algorithms/custom_random_agent
"""
def _import_imitation_trainer():
from .imitation_agent.imitation_trainer import ImitationAgent
return ImitationAgent
CUSTOM_ALGORITHMS = {
"ImitationAgent": _import_imitation_trainer
}
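
# NOTE (illustrative sketch, not part of the merge request): CUSTOM_ALGORITHMS
# is consumed by `load_algorithms` from utils/loader.py, which is imported in
# the trainer file above but not shown in this diff. Assuming it simply hands
# every entry to Tune, the registration step would look roughly like this:
#
#     from ray.tune import registry
#
#     def register_custom_algorithms(custom_algorithms):
#         # Hypothetical helper: make each custom Trainer runnable by name.
#         for name, get_trainer_cls in custom_algorithms.items():
#             registry.register_trainable(name, get_trainer_cls())
#
#     register_custom_algorithms(CUSTOM_ALGORITHMS)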
flatland-sparse-small-tree-fc-apex-il-trainer:
    run: ImitationAgent
    env: flatland_sparse
    stop:
        timesteps_total: 15000000  # 1.5e7
    checkpoint_freq: 50
    checkpoint_at_end: True
    keep_checkpoints_num: 50
    checkpoint_score_attr: episode_reward_mean
    num_samples: 3
    config:
        num_workers: 6
        num_envs_per_worker: 5
        num_gpus: 0

        clip_rewards: False
        vf_clip_param: 500.0
        entropy_coeff: 0.01

        # effective batch_size: train_batch_size * num_agents_in_each_environment [5, 10]
        # see https://github.com/ray-project/ray/issues/4628
        train_batch_size: 1000  # 5000
        rollout_fragment_length: 50  # 100
        sgd_minibatch_size: 100  # 500
        vf_share_layers: False

        env_config:
            custom_fn: imitation_ppo_train_fn
            expert:
                ratio: 0.5
                ratio_decay: 1
                min_ratio: 0.5

            observation: tree
            observation_config:
                max_depth: 2
                shortest_path_max_depth: 30

            generator: sparse_rail_generator
            generator_config: small_v0

            wandb:
                project: flatland-paper
                entity: aicrowd
                tags: ["small_v0", "tree_obs", "apex_rllib_il"]  # TODO: should be set programmatically

        model:
            fcnet_activation: relu
            fcnet_hiddens: [256, 256]
            vf_share_layers: False  # Should be the same as the PPO vf_share_layers
flatland-sparse-small-tree-fc-apex-il-trainer:
    run: ImitationAgent
    env: flatland_sparse
    stop:
        timesteps_total: 15000000  # 1.5e7
    checkpoint_freq: 50
    checkpoint_at_end: True
    keep_checkpoints_num: 50
    checkpoint_score_attr: episode_reward_mean
    num_samples: 3
    config:
        num_workers: 13
        num_envs_per_worker: 5
        num_gpus: 0

        clip_rewards: False
        vf_clip_param: 500.0
        entropy_coeff: 0.01

        # effective batch_size: train_batch_size * num_agents_in_each_environment [5, 10]
        # see https://github.com/ray-project/ray/issues/4628
        train_batch_size: 1000  # 5000
        rollout_fragment_length: 50  # 100
        sgd_minibatch_size: 100  # 500
        vf_share_layers: False

        env_config:
            observation: tree
            observation_config:
                max_depth: 2
                shortest_path_max_depth: 30

            generator: sparse_rail_generator
            generator_config: small_v0

            wandb:
                project: flatland-paper
                entity: aicrowd
                tags: ["small_v0", "tree_obs", "apex_rllib_il"]  # TODO: should be set programmatically

        model:
            fcnet_activation: relu
            fcnet_hiddens: [256, 256]
            vf_share_layers: False  # Should be the same as the PPO vf_share_layers

        evaluation_num_workers: 2
        # Enable evaluation, once per training iteration.
        evaluation_interval: 1
        # Run 2 episodes each time evaluation runs.
        evaluation_num_episodes: 2
        # Override the env config for evaluation.
......
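
# NOTE (illustrative sketch, not part of the merge request): experiment files
# like the two above are normally launched through the repo's train.py entry
# point (e.g. `python ./train.py -f <experiment.yaml>`); the exact flag and
# file path are assumptions here. Stripped of env/model registration, the
# launch path reduces to loading the YAML and handing it to Tune:
#
#     import yaml
#     from ray.tune import run_experiments
#
#     with open("path/to/apex_il_tree_obs.yaml") as f:  # hypothetical path
#         experiments = yaml.safe_load(f)
#     run_experiments(experiments)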
import numpy as np
from collections import deque

from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env import RailEnvActions


class Vertex:
    def __init__(self, y, x, idx):
        self.point = (y, x)
        self.idx = idx
        self.out = [[], [], [], []]
        self.in_edges = [[], [], [], []]


class Edge:
    def __init__(self, start_v, end_v, start_dir, end_dir, action_type):
        self.start_v = start_v
        self.end_v = end_v
        self.start_direction = start_dir
        self.end_direction = end_dir
        self.action_type = action_type


class CellGraph:
    def __init__(self, env: RailEnv):
        self.env = env
        self._build_graph()

    def _build_graph(self):
        width = self.env.width
        height = self.env.height

        self.vertex_idx = np.zeros((height, width), dtype=np.int)
        self.vertex_idx.fill(-1)
        self.vertexes = []

        for y in range(height):
            for x in range(width):
                if self._is_rail(y, x):
                    idx = len(self.vertexes)
                    self.vertexes.append(Vertex(y, x, idx))
                    self.vertex_idx[y, x] = idx
        # print('vertexes:', len(self.vertexes))

        edges_cnt = 0
        for v_idx, v in enumerate(self.vertexes):
            start_point = v.point
            for direction in range(4):
                directions = self._possible_directions(start_point, direction)
                # assert len(directions) <= 2
                for end_direction in directions:
                    next_point = self._next_point(start_point, end_direction)
                    end_v = self._vertex_idx_from_point(next_point)
                    action_type = self._action_from_directions(direction, end_direction)
                    e = Edge(v_idx, end_v, direction, end_direction, action_type)
                    v.out[direction].append(e)
                    self.vertexes[end_v].in_edges[end_direction].append(e)
                    edges_cnt += 1
        # print('edges_cnt', edges_cnt)

    def _is_rail(self, y, x):
        return self.env.rail.grid[y, x] != 0

    def _next_point(self, point, direction):
        if direction == 0:
            return (point[0] - 1, point[1])
        elif direction == 1:
            return (point[0], point[1] + 1)
        elif direction == 2:
            return (point[0] + 1, point[1])
        else:
            return (point[0], point[1] - 1)

    def _possible_directions(self, point, in_direction):
        return np.flatnonzero(self.env.rail.get_transitions(point[0], point[1], in_direction))

    def _vertex_idx_from_point(self, point):
        assert (point[0] >= 0) and (point[0] < self.vertex_idx.shape[0])
        assert (point[1] >= 0) and (point[1] < self.vertex_idx.shape[1])
        return self.vertex_idx[point[0], point[1]]

    def position_from_vertexid(self, vertexid: int):
        return self.vertexes[vertexid].point

    def _action_from_directions(self, in_direction, new_direction):
        if in_direction == new_direction:
            return RailEnvActions.MOVE_FORWARD

        if (in_direction + 1) % 4 == new_direction:
            return RailEnvActions.MOVE_RIGHT
        elif (in_direction - 1) % 4 == new_direction:
            return RailEnvActions.MOVE_LEFT
        else:
            return RailEnvActions.MOVE_FORWARD
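
# NOTE (illustrative sketch, not part of the merge request): CellGraph is built
# directly from a RailEnv and indexes every rail cell as a vertex with
# direction-dependent edges. A minimal usage example, assuming the flatland 2.x
# API used elsewhere in this repo (generator parameters are placeholders):
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.schedule_generators import sparse_schedule_generator


def _cell_graph_demo():
    env = RailEnv(width=25, height=25,
                  rail_generator=sparse_rail_generator(max_num_cities=2, seed=42),
                  schedule_generator=sparse_schedule_generator(),
                  number_of_agents=2)
    env.reset()
    graph = CellGraph(env)
    print(len(graph.vertexes), "rail cells indexed as vertices")
    print("cell of vertex 0:", graph.position_from_vertexid(0))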
import traceback
from copy import deepcopy
from typing import Dict

from flatland.envs.rail_env import RailEnv, RailAgentStatus, RailEnvActions

from libs import cell_graph_rescheduling, cell_graph_partial_rescheduling, cell_graph_rescheduling_data
from libs.cell_graph import CellGraph
from libs.cell_graph_agent import CellGraphAgent
from libs.cell_graph_locker import CellGraphLocker


class CellGraphDispatcher:
    def __init__(self, env: RailEnv, sort_function=None):
        self.env = env

        self.graph = CellGraph(env)
        self.locker = CellGraphLocker(self.graph)

        max_steps = env._max_episode_steps
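
# NOTE (illustrative sketch, not part of the merge request): the ImitationAgent
# trainer above uses this dispatcher as its expert, calling it once per step to
# get an action for every agent. Given a RailEnv `env` (e.g. as in the
# CellGraph sketch above), the expert rollout loop reduces to:
#
#     dispatcher = CellGraphDispatcher(env)
#     max_steps = int(4 * 2 * (20 + env.height + env.width))
#     done = {"__all__": False}
#     for _ in range(max_steps):
#         action_dict = dispatcher.step(env._elapsed_steps)
#         obs, rewards, done, info = env.step(action_dict)
#         if done["__all__"]:
#             break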