Commit 63ee753f authored by manuschn

Merge branch 'master' into global-density-obs

parents bbebe763 79edf57b
@@ -127,4 +127,8 @@ dmypy.json
.pyre/
# misc
.idea
\ No newline at end of file
.idea
# custom extras
small_tree_video/
test.yaml
import getopt
import os
import sys
import time
@@ -30,6 +31,11 @@ from ray.rllib.offline.json_writer import JsonWriter
imitate = True
## Legacy code for the correct expert actions
# Change the line below in the method malfunction_from_file in flatland/envs/malfunction_generators.py:
# mean_malfunction_rate = 1/oMPD.malfunction_rate
def main(args):
try:
opts, args = getopt.getopt(args, "", ["sleep-for-animation=", ""])
@@ -55,8 +61,8 @@ def main(args):
max_depth = 30
tree_depth = 2
trial_start = 0
n_trials = 97
trial_start = 100
n_trials = 999
start = 0
columns = ['Agents', 'X_DIM', 'Y_DIM', 'TRIAL_NO',
@@ -66,14 +72,22 @@ def main(args):
for trials in range(trial_start, n_trials + 1):
env_file = f"envs-100-999/envs/Level_{trials}.pkl"
# env_file = f"../env_configs/test-envs-small/Test_0/Level_{trials}.mpk"
# file = f"../env_configs/actions-small/Test_0/Level_{trials}.mpk"
file = f"envs-100-999/actions/envs/Level_{trials}.json"
if not os.path.isfile(env_file) or not os.path.isfile(file):
print("Missing file!", env_file, file)
continue
step = 0
obs_builder_object = TreeObsForRailEnv(max_depth=tree_depth,
predictor=ShortestPathPredictorForRailEnv(
max_depth))
env_file = f"../env_configs/test-envs-small/Test_0/Level_{trials}.mpk"
env = RailEnv(width=1, height=1,
rail_generator=rail_from_file(env_file),
schedule_generator=schedule_from_file(env_file),
@@ -88,8 +102,6 @@ def main(args):
random_seed=1001
)
file = f"../env_configs/actions-small/Test_0/Level_{trials}.mpk"
with open(file, "r") as files:
expert_actions = json.load(files)
@@ -244,7 +256,7 @@ def main(args):
step,
np.mean(reward_window),
np.mean(scores_window),
100 * np.mean(done_window)), end=" ")
100 * np.mean(done_window)))
if visuals:
env_renderer.close_window()
......
import getopt
import os
import sys
import time
import argparse
import tarfile
import numpy as np
import pandas as pd
from collections import deque
import gc
import copy
import tensorflow as tf
from flatland.core.grid import grid4
from flatland.envs.rail_env import RailEnv
from flatland.utils.misc import str2bool
from flatland.envs.observations import TreeObsForRailEnv, GlobalObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.malfunction_generators import malfunction_from_file
from flatland.envs.rail_generators import rail_from_file
from flatland.envs.schedule_generators import schedule_from_file
from flatland.envs.agent_utils import RailAgentStatus
from utils.observation_utils import normalize_observation # noqa
# from gen_envs import *
import json
from functools import partial
from ray.rllib.evaluation.sample_batch_builder import SampleBatchBuilder
from ray.rllib.offline.json_writer import JsonWriter
from tensorflow.python.framework.ops import enable_eager_execution
enable_eager_execution()
parser = argparse.ArgumentParser(description="Save Flatland expert experiences, optionally in parallel.")
parser.add_argument("--single", default=False, action="store_true")
parser.add_argument("--visual", default=False, action="store_true")
parser.add_argument("--globalobs", default=False, action="store_true")
## Legacy code for the correct expert actions
# Change the line below in the method malfunction_from_file in flatland/envs/malfunction_generators.py:
# mean_malfunction_rate = 1/oMPD.malfunction_rate
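# A minimal sketch of the patch described above; the "before" line is an
# assumption about the flatland source, so verify it against the installed version:
#   # flatland/envs/malfunction_generators.py, inside malfunction_from_file():
#   # before: mean_malfunction_rate = oMPD.malfunction_rate
#   # after:  mean_malfunction_rate = 1/oMPD.malfunction_rate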
extract = True
if extract:
env_path = "envs-100-999.tgz"
env_names = env_path.split(".")[0]
if not os.path.isdir(env_names):
with tarfile.open(env_path) as tar_file:
tar_file.extractall('.')
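# After extraction the script expects this layout (paths as used in
# generate_experiences below):
#   envs-100-999/envs/Level_<n>.pkl           # serialized rail environments
#   envs-100-999/actions/envs/Level_<n>.json  # recorded expert actions per step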
imitate = True
obs_type = "tree" # global
# Setting this parameter to True can slow down training
visuals = False
_max_height = 45
_max_width = 45
columns = ['Agents', 'X_DIM', 'Y_DIM', 'TRIAL_NO',
'REWARD', 'NORMALIZED_REWARD',
'DONE_RATIO', 'STEPS', 'ACTION_PROB']
# Set to False to disable parallel execution (for debugging etc.)
parallel = True
if parallel:
batch_builder = SampleBatchBuilder() # or MultiAgentSampleBatchBuilder
writer = JsonWriter(path="./", max_file_size=1024 * 1024 * 1024)
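# The JSON batches written by JsonWriter use RLlib's offline data format; assuming
# ray 0.8.5's offline API, a later training run can read them back by setting
# config["input"] = "./" on the trainer.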
'''
One-hot encode a 2-d array, similar to the tf.one_hot function:
https://stackoverflow.com/questions/36960320/convert-a-2d-matrix-to-a-3d-one-hot-matrix-numpy/36960495
'''
def one_hot2d(arr, depth):
    return (np.arange(depth) == arr[..., None]).astype(int)
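# Example: a 2x2 array of class indices maps to a 2x2x3 one-hot volume:
#   one_hot2d(np.array([[0, 1], [2, 0]]), depth=3)
#   -> [[[1, 0, 0], [0, 1, 0]],
#       [[0, 0, 1], [1, 0, 0]]]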
def create_global_observation(agent_obs):
# Taken from the file global_obs_model; intended to be used with IMPALA/CNN architectures
global_obs = list(agent_obs)
height, width = global_obs[0].shape[:2]
pad_height, pad_width = _max_height - height, _max_width - width
global_obs[1] = global_obs[1] + 1 # get rid of -1
assert pad_height >= 0 and pad_width >= 0
final_obs = tuple([
np.pad(o, ((0, pad_height), (0, pad_width), (0, 0)), constant_values=0)
for o in global_obs
])
# observations = [tf.keras.layers.Input(shape=o.shape) for o in final_obs]
# processed_observations = preprocess_obs(tuple(observations))
processed_observations = preprocess_obs(final_obs)
return processed_observations
def preprocess_obs(obs):
transition_map, agents_state, targets = obs
new_agents_state = agents_state.transpose([2,0,1])
*states, = new_agents_state
processed_agents_state_layers = []
for i, feature_layer in enumerate(states):
if i in {0, 1}: # agent direction (categorical)
feature_layer = tf.one_hot(tf.cast(feature_layer, tf.int32), depth=len(grid4.Grid4TransitionsEnum) + 1,
dtype=tf.float32).numpy()
# Numpy Version
# feature_layer = one_hot2d(feature_layer, depth=len(grid4.Grid4TransitionsEnum) + 1)
elif i in {2, 4}: # counts
feature_layer = np.expand_dims(np.log(feature_layer + 1), axis=-1)
else: # well behaved scalars
feature_layer = np.expand_dims(feature_layer, axis=-1)
processed_agents_state_layers.append(feature_layer)
return np.concatenate([transition_map, targets] + processed_agents_state_layers, axis=-1)
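# Assuming the standard GlobalObsForRailEnv layout (16-channel transition map,
# 5-layer agents_state, 2-channel targets), the concatenation above yields
# 16 + 2 + 5 + 5 + 1 + 1 + 1 = 31 channels per grid cell.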
def generate_experiences(trials, start=0, tree_depth=2, max_depth=30, obs_type="tree", batch_builder=None, writer=None):
env_file = f"envs-100-999/envs/Level_{trials}.pkl"
# env_file = f"../env_configs/test-envs-small/Test_0/Level_{trials}.mpk"
pad_name = False
if pad_name:
total_size = 5
_str_trial = str(trials)
trials = _str_trial.zfill(total_size)
# env_file = f"./{env_names}/envs/Level_{trials}.pkl"
# file = f"../env_configs/actions-small/Test_0/Level_{trials}.mpk"
file = f"envs-100-999/actions/envs/Level_{trials}.json"
# file = f"./{env_names}/actions/envs/Level_{trials}.json"
if not os.path.isfile(env_file) or not os.path.isfile(file):
print("Missing file!", env_file, file)
return
step = 0
if obs_type == "tree":
obs_builder_object = TreeObsForRailEnv(max_depth=tree_depth,
predictor=ShortestPathPredictorForRailEnv(
max_depth))
elif obs_type == "global":
obs_builder_object = GlobalObsForRailEnv()
env = RailEnv(width=1, height=1,
rail_generator=rail_from_file(env_file),
schedule_generator=schedule_from_file(env_file),
malfunction_generator_and_process_data=malfunction_from_file(
env_file),
obs_builder_object=obs_builder_object)
obs, info = env.reset(
regenerate_rail=True,
regenerate_schedule=True,
activate_agents=False,
random_seed=1001
)
with open(file, "r") as files:
expert_actions = json.load(files)
n_agents = env.get_num_agents()
x_dim, y_dim = env.width, env.height
agent_obs = [None] * n_agents
agent_obs_buffer = [None] * n_agents
done = dict()
done["__all__"] = False
if imitate:
agent_action_buffer = list(
expert_actions[step].values())
else:
# , p=[0.2, 0, 0.5]) # [0] * n_agents
agent_action_buffer = np.random.choice(5, n_agents, replace=True)
update_values = [False] * n_agents
max_steps = int(4 * 2 * (20 + env.height + env.width))
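# e.g. a 30x30 level gives max_steps = int(4 * 2 * (20 + 30 + 30)) = 640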
action_size = 5 # 3
# And some variables to keep track of the progress
action_dict = dict()
scores_window = deque(maxlen=100)
reward_window = deque(maxlen=100)
done_window = deque(maxlen=100)
action_prob = [0] * action_size
# agent = Agent(state_size, action_size)
if visuals:
from flatland.utils.rendertools import RenderTool
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.render_env(
show=True, frames=True, show_observations=True)
for a in range(n_agents):
if obs[a]:
if obs_type == "global":
agent_obs[a] = create_global_observation(obs[a])
elif obs_type == "tree":
agent_obs[a] = normalize_observation(
obs[a], tree_depth, observation_radius=10)
agent_obs_buffer[a] = copy.copy(agent_obs[a]) # agent_obs[a].copy()
# Reset score and done
score = 0
agent_action_buffer = np.zeros(n_agents)
# prev_action = np.zeros_like(env.action_space.sample())
prev_reward = np.zeros(n_agents)
for step in range(max_steps):
for a in range(n_agents):
if info['action_required'][a]:
if imitate:
if step < len(expert_actions):
action = expert_actions[step][str(a)]
else:
action = 0
else:
action = 0
action_prob[action] += 1
update_values[a] = True
else:
update_values[a] = False
action = 0
action_dict.update({a: action})
next_obs, all_rewards, done, info = env.step(action_dict)
for a in range(n_agents):
if next_obs[a] is not None:
if obs_type == "global":
agent_obs[a] = create_global_observation(next_obs[a])
elif obs_type == "tree":
agent_obs[a] = normalize_observation(
next_obs[a], tree_depth, observation_radius=10)
# Only update the values when we are done or when an action
# was taken and thus relevant information is present
if update_values[a] or done[a]:
start += 1
batch_builder.add_values(
t=step,
eps_id=trials,
agent_index=0,
obs=agent_obs_buffer[a],
actions=action_dict[a],
action_prob=1.0,  # TODO: record the true expert action probability
rewards=all_rewards[a],
prev_actions=agent_action_buffer[a],
prev_rewards=prev_reward[a],
dones=done[a],
infos=info['action_required'][a],
new_obs=agent_obs[a])
agent_obs_buffer[a] = copy.copy(agent_obs[a]) # agent_obs[a].copy()
agent_action_buffer[a] = action_dict[a]
prev_reward[a] = all_rewards[a]
score += all_rewards[a] # / env.get_num_agents()
if visuals:
env_renderer.render_env(
show=True, frames=True, show_observations=True)
if done["__all__"] or step > max_steps:
writer.write(batch_builder.build_and_reset())
break
# Collect information about training
if step % 100 == 0:
tasks_finished = 0
for current_agent in env.agents:
if current_agent.status == RailAgentStatus.DONE_REMOVED:
tasks_finished += 1
print(
'\rTrial No {} Training {} Agents on ({},{}).\t Steps {}\t Reward: {:.3f}\t Normalized Reward: {:.3f}\tDones: {:.2f}%\t'.format(
trials, env.get_num_agents(), x_dim, y_dim,
step,
score,
score / (max_steps + n_agents),
100 * np.mean(tasks_finished / max(
1, env.get_num_agents()))), end=" ")
tasks_finished = 0
for current_agent in env.agents:
if current_agent.status == RailAgentStatus.DONE_REMOVED:
tasks_finished += 1
done_window.append(tasks_finished / max(1, env.get_num_agents()))
reward_window.append(score)
scores_window.append(score / (max_steps + n_agents))
data = [[n_agents, x_dim, y_dim,
trials,
np.mean(reward_window),
np.mean(scores_window),
100 * np.mean(done_window),
step, action_prob / np.sum(action_prob)]]
df_cur = pd.DataFrame(data, columns=columns)
print(
'\rTrial No {} Training {} Agents on ({},{}).\t Total Steps {}\t Reward: {:.3f}\t Normalized Reward: {:.3f}\tDones: {:.2f}%\t'.format(
trials, env.get_num_agents(), x_dim, y_dim,
step,
np.mean(reward_window),
np.mean(scores_window),
100 * np.mean(done_window)))
if visuals:
env_renderer.close_window()
return df_cur
def main():
args = parser.parse_args()
if args.single:
print("Running process in single process")
global parallel
parallel = False
if args.visual:
print("Rendering environment")
global visuals
visuals = True
if args.globalobs:
print("Running for global observation")
global obs_type
obs_type = "global"
if visuals:
from flatland.utils.rendertools import RenderTool
max_depth = 30
tree_depth = 2
trial_start = 100
n_trials = 999
start = 0
df_all_results = pd.DataFrame(columns=columns)
all_trials = range(trial_start, n_trials+1)
if parallel:
from ray.util.multiprocessing import Pool
print(tf.executing_eagerly(), tf.__version__)
pool = Pool(processes=None)
# With processes=None, Ray defaults to the number of available CPUs
# TODO: Check if splitting based on cores yields better performance,
# especially when there are far more trials than CPU cores
# import psutil
# n_cores = psutil.cpu_count()
# parallel_splits = np.array_split(np.array(all_trials),n_cores)
generate_experiences_trial = partial(generate_experiences, start=start, tree_depth=tree_depth,
                                     max_depth=max_depth, obs_type=obs_type, batch_builder=batch_builder, writer=writer)
for df_cur in pool.map(generate_experiences_trial, all_trials):
if df_cur is not None:
df_all_results = pd.concat([df_all_results, df_cur])
else:
generate_experiences_trial = partial(generate_experiences, start=start, tree_depth=tree_depth,
                                     max_depth=max_depth, obs_type=obs_type, batch_builder=SampleBatchBuilder(),
                                     writer=JsonWriter(path="./", max_file_size=1024 * 1024 * 1024))
for trial in all_trials:
df_cur = generate_experiences_trial(trial)
if df_cur is not None:
df_all_results = pd.concat([df_all_results, df_cur])
if imitate:
df_all_results.to_csv(
    'TreeImitationLearning_DQN_TrainingResults.csv', index=False)
if __name__ == '__main__':
main()
@@ -8,6 +8,6 @@ dependencies:
- pyhumps==1.3.1
- gputil==1.4.0
- pyhumps==1.3.1
- wandb==0.8.35
- wandb==0.8.36
- ray[rllib]==0.8.5
- tensorflow==2.1.0
\ No newline at end of file
@@ -9,6 +9,6 @@ dependencies:
- pyhumps==1.3.1
- gputil==1.4.0
- pyhumps==1.3.1
- wandb==0.8
- wandb==0.8.36
- ray[rllib]==0.8.5
- tensorflow==2.1.0
\ No newline at end of file
@@ -10,5 +10,5 @@ dependencies:
- pyhumps==1.3.1
- gputil==1.4.0
- pyhumps==1.3.1
- wandb==0.8
- wandb==0.8.36
- ray[rllib]==0.8.5
\ No newline at end of file
@@ -10,5 +10,5 @@ dependencies:
- pyhumps==1.3.1
- gputil==1.4.0
- pyhumps==1.3.1
- wandb==0.8
- wandb==0.8.36
- ray[rllib]==0.8.5
\ No newline at end of file
# ADRIAN_V0
# config shared by Adrian, without malfunctions
width: 30
height: 30
number_of_agents: 5
max_num_cities: 40
max_rails_between_cities: 2
max_rails_in_city: 8
grid_mode: False
seed: 0
regenerate_rail_on_reset: True
regenerate_schedule_on_reset: True
\ No newline at end of file
@@ -3,9 +3,34 @@ import numpy as np
from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.observations import GlobalObsForRailEnv
from flatland.core.grid import grid4
from envs.flatland.observations import Observation, register_obs
'''
One-hot encode a 2-d array, similar to the tf.one_hot function:
https://stackoverflow.com/questions/36960320/convert-a-2d-matrix-to-a-3d-one-hot-matrix-numpy/36960495
'''
def one_hot2d(arr, depth):
    return (np.arange(depth) == arr[..., None]).astype(int)
def preprocess_obs(obs):
transition_map, agents_state, targets = obs
new_agents_state = agents_state.transpose([2,0,1])
*states, = new_agents_state
processed_agents_state_layers = []
for i, feature_layer in enumerate(states):
if i in {0, 1}: # agent direction (categorical)
# feature_layer = tf.one_hot(tf.cast(feature_layer, tf.int32), depth=len(grid4.Grid4TransitionsEnum) + 1,
# dtype=tf.float32).numpy()
# Numpy Version
feature_layer = one_hot2d(feature_layer, depth=len(grid4.Grid4TransitionsEnum) + 1)
elif i in {2, 4}: # counts
feature_layer = np.expand_dims(np.log(feature_layer + 1), axis=-1)
else: # well behaved scalars
feature_layer = np.expand_dims(feature_layer, axis=-1)
processed_agents_state_layers.append(feature_layer)
return np.concatenate([transition_map, targets] + processed_agents_state_layers, axis=-1)
@register_obs("global")
class GlobalObservation(Observation):
@@ -20,11 +45,7 @@ class GlobalObservation(Observation):
def observation_space(self) -> gym.Space:
grid_shape = (self._config['max_width'], self._config['max_height'])
return gym.spaces.Tuple([
gym.spaces.Box(low=0, high=np.inf, shape=grid_shape + (16,), dtype=np.float32),
gym.spaces.Box(low=0, high=np.inf, shape=grid_shape + (5,), dtype=np.float32),
gym.spaces.Box(low=0, high=np.inf, shape=grid_shape + (2,), dtype=np.float32),
])
return gym.spaces.Box(low=0, high=np.inf, shape=grid_shape + (31,), dtype=np.float32)
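# The 31 channels match the preprocess_obs output above: 16 (transition map)
# + 2 (targets) + 5 + 5 (one-hot agent directions) + 1 + 1 (log-scaled counts)
# + 1 (remaining scalar layer).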