Commit fdb25dfc authored by nilabha's avatar nilabha
Browse files

Added parallel version for saving experiences with support for global obs

parent e5dcbcf6
import getopt
import os
import sys
import time
import argparse
import tarfile
import numpy as np
import pandas as pd
from collections import deque
import gc
import copy
import tensorflow as tf
from flatland.core.grid import grid4
from flatland.envs.rail_env import RailEnv
from flatland.utils.misc import str2bool
from flatland.envs.observations import TreeObsForRailEnv,GlobalObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.malfunction_generators import malfunction_from_file
from flatland.envs.rail_generators import rail_from_file
from flatland.envs.schedule_generators import schedule_from_file
from flatland.envs.agent_utils import RailAgentStatus
from observation_utils import normalize_observation # noqa
# from gen_envs import *
import json
from functools import partial
from ray.rllib.evaluation.sample_batch_builder import SampleBatchBuilder
from ray.rllib.offline.json_writer import JsonWriter
from tensorflow.python.framework.ops import enable_eager_execution
enable_eager_execution()
parser = argparse.ArgumentParser(description="Approximate digits of Pi using Monte Carlo simulation.")
parser.add_argument("--single", default=False, action="store_true")
parser.add_argument("--visual", default=False, action="store_true")
parser.add_argument("--globalobs", default=False, action="store_true")
extract = False
if extract:
env_path = "medium2-100.tgz"
env_names = env_path.split(".")[0]
if not os.path.isdir(env_names):
with tarfile.open(env_path) as tar_file:
tar_file.extractall('.')
imitate = True
obs_type = "tree" # global
# Setting this parameters to True can slow down training
visuals = False
_max_height = 45
_max_width = 45
columns = ['Agents', 'X_DIM', 'Y_DIM', 'TRIAL_NO',
'REWARD', 'NORMALIZED_REWARD',
'DONE_RATIO', 'STEPS', 'ACTION_PROB']
# To disable parallel for debug purposes etc.
parallel = True
if parallel:
batch_builder = SampleBatchBuilder() # or MultiAgentSampleBatchBuilder
writer = JsonWriter(path="./",max_file_size=1024 * 1024 * 1024)
'''
A 2-d array matrix on-hot encoded similar to tf.one_hot function
https://stackoverflow.com/questions/36960320/convert-a-2d-matrix-to-a-3d-one-hot-matrix-numpy/36960495
'''
def one_hot2d(arr,depth):
return (np.arange(depth) == arr[...,None]).astype(int)
def create_global_observation(agent_obs):
# Taken from the file global_obs_model - Intended to be used with Impala/CNN Architectures
global_obs = list(agent_obs)
height, width = global_obs[0].shape[:2]
pad_height, pad_width = _max_height - height, _max_width - width
global_obs[1] = global_obs[1] + 1 # get rid of -1
assert pad_height >= 0 and pad_width >= 0
final_obs = tuple([
np.pad(o, ((0, pad_height), (0, pad_height), (0, 0)), constant_values=0)
for o in global_obs
])
# observations = [tf.keras.layers.Input(shape=o.shape) for o in final_obs]
# processed_observations = preprocess_obs(tuple(observations))
processed_observations = preprocess_obs(final_obs)
return processed_observations
def preprocess_obs(obs):
transition_map, agents_state, targets = obs
new_agents_state = agents_state.transpose([2,0,1])
*states, = new_agents_state
processed_agents_state_layers = []
for i, feature_layer in enumerate(states):
if i in {0, 1}: # agent direction (categorical)
feature_layer = tf.one_hot(tf.cast(feature_layer, tf.int32), depth=len(grid4.Grid4TransitionsEnum) + 1,
dtype=tf.float32).numpy()
# Numpy Version
# feature_layer = one_hot2d(feature_layer, depth=len(grid4.Grid4TransitionsEnum) + 1)
elif i in {2, 4}: # counts
feature_layer = np.expand_dims(np.log(feature_layer + 1), axis=-1)
else: # well behaved scalars
feature_layer = np.expand_dims(feature_layer, axis=-1)
processed_agents_state_layers.append(feature_layer)
return np.concatenate([transition_map, targets] + processed_agents_state_layers, axis=-1)
def generate_experiences(trials,start=0, tree_depth=2, max_depth = 30,obs_type = "tree",batch_builder = None, writer=None):
# env_file = f"/Users/flaurent/Sites/flatland/flatland-neurips/envs-100-999/envs/Level_{trials}.pkl"
env_file = f"../env_configs/test-envs-small/Test_0/Level_{trials}.mpk"
pad_name = False
if pad_name:
total_size = 5
_str_trial = str(trials)
trials = str(0)*(total_size - len(_str_trial)) + _str_trial
# env_file = f"./{env_names}/envs/Level_{trials}.pkl"
file = f"../env_configs/actions-small/Test_0/Level_{trials}.mpk"
# file = f"/Users/flaurent/Sites/flatland/flatland-neurips/envs-100-999/actions/envs/Level_{trials}.json"
# file = f"./{env_names}/actions/envs/Level_{trials}.json"
if not os.path.isfile(env_file) or not os.path.isfile(file):
print("Missing file!", env_file, file)
return
step = 0
if obs_type == "tree":
obs_builder_object = TreeObsForRailEnv(max_depth=tree_depth,
predictor=ShortestPathPredictorForRailEnv(
max_depth))
elif obs_type == "global":
obs_builder_object = GlobalObsForRailEnv()
env = RailEnv(width=1, height=1,
rail_generator=rail_from_file(env_file),
schedule_generator=schedule_from_file(env_file),
malfunction_generator_and_process_data=malfunction_from_file(
env_file),
obs_builder_object=obs_builder_object)
obs, info = env.reset(
regenerate_rail=True,
regenerate_schedule=True,
activate_agents=False,
random_seed=1001
)
with open(file, "r") as files:
expert_actions = json.load(files)
n_agents = env.get_num_agents()
x_dim, y_dim = env.width, env.height
agent_obs = [None] * n_agents
agent_obs_buffer = [None] * n_agents
done = dict()
done["__all__"] = False
if imitate:
agent_action_buffer = list(
expert_actions[step].values())
else:
# , p=[0.2, 0, 0.5]) # [0] * n_agents
agent_action_buffer = np.random.choice(5, n_agents, replace=True)
update_values = [False] * n_agents
max_steps = int(4 * 2 * (20 + env.height + env.width))
action_size = 5 # 3
# And some variables to keep track of the progress
action_dict = dict()
scores_window = deque(maxlen=100)
reward_window = deque(maxlen=100)
done_window = deque(maxlen=100)
action_prob = [0] * action_size
# agent = Agent(state_size, action_size)
if visuals:
from flatland.utils.rendertools import RenderTool
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.render_env(
show=True, frames=True, show_observations=True)
for a in range(n_agents):
if obs[a]:
if obs_type == "global":
agent_obs[a] = create_global_observation(obs[a])
elif obs_type == "tree":
agent_obs[a] = normalize_observation(
obs[a], tree_depth, observation_radius=10)
agent_obs_buffer[a] = copy.copy(agent_obs[a]) # agent_obs[a].copy()
# Reset score and done
score = 0
agent_action_buffer = np.zeros(n_agents)
# prev_action = np.zeros_like(env.action_space.sample())
prev_reward = np.zeros(n_agents)
for step in range(max_steps):
for a in range(n_agents):
if info['action_required'][a]:
if imitate:
if step < len(expert_actions):
action = expert_actions[step][str(a)]
else:
action = 0
else:
action = 0
action_prob[action] += 1
update_values[a] = True
else:
update_values[a] = False
action = 0
action_dict.update({a: action})
next_obs, all_rewards, done, info = env.step(action_dict)
for a in range(n_agents):
if next_obs[a] is not None:
if obs_type == "global":
agent_obs[a] = create_global_observation(next_obs[a])
elif obs_type == "tree":
agent_obs[a] = normalize_observation(
next_obs[a], tree_depth, observation_radius=10)
# Only update the values when we are done or when an action
# was taken and thus relevant information is present
if update_values[a] or done[a]:
start += 1
batch_builder.add_values(
t=step,
eps_id=trials,
agent_index=0,
obs=agent_obs_buffer[a],
actions=action_dict[a],
action_prob=1.0, # put the true action probability
rewards=all_rewards[a],
prev_actions=agent_action_buffer[a],
prev_rewards=prev_reward[a],
dones=done[a],
infos=info['action_required'][a],
new_obs=agent_obs[a])
agent_obs_buffer[a] = copy.copy(agent_obs[a]) # agent_obs[a].copy()
agent_action_buffer[a] = action_dict[a]
prev_reward[a] = all_rewards[a]
score += all_rewards[a] # / env.get_num_agents()
if visuals:
env_renderer.render_env(
show=True, frames=True, show_observations=True)
if done["__all__"] or step > max_steps:
writer.write(batch_builder.build_and_reset())
break
# Collection information about training
if step % 100 == 0:
tasks_finished = 0
for current_agent in env.agents:
if current_agent.status == RailAgentStatus.DONE_REMOVED:
tasks_finished += 1
print(
'\rTrial No {} Training {} Agents on ({},{}).\t Steps {}\t Reward: {:.3f}\t Normalized Reward: {:.3f}\tDones: {:.2f}%\t'.format(
trials, env.get_num_agents(), x_dim, y_dim,
step,
score,
score / (max_steps + n_agents),
100 * np.mean(tasks_finished / max(
1, env.get_num_agents()))), end=" ")
tasks_finished = 0
for current_agent in env.agents:
if current_agent.status == RailAgentStatus.DONE_REMOVED:
tasks_finished += 1
done_window.append(tasks_finished / max(1, env.get_num_agents()))
reward_window.append(score)
scores_window.append(score / (max_steps + n_agents))
data = [[n_agents, x_dim, y_dim,
trials,
np.mean(reward_window),
np.mean(scores_window),
100 * np.mean(done_window),
step, action_prob / np.sum(action_prob)]]
df_cur = pd.DataFrame(data, columns=columns)
print(
'\rTrial No {} Training {} Agents on ({},{}).\t Total Steps {}\t Reward: {:.3f}\t Normalized Reward: {:.3f}\tDones: {:.2f}%\t'.format(
trials, env.get_num_agents(), x_dim, y_dim,
step,
np.mean(reward_window),
np.mean(scores_window),
100 * np.mean(done_window)))
if visuals:
env_renderer.close_window()
return df_cur
def main():
args = parser.parse_args()
if args.single:
print("Running process in single process")
global parallel
parallel = False
if args.visual:
print("Rendering environment")
global visuals
visuals = True
if args.globalobs:
print("Running for global observation")
global obs_type
obs_type = "global"
if visuals:
from flatland.utils.rendertools import RenderTool
max_depth = 30
tree_depth = 2
trial_start = 0
n_trials = 97
start = 0
df_all_results = pd.DataFrame(columns=columns)
all_trials = range(trial_start, n_trials+1)
if parallel:
from ray.util.multiprocessing import Pool
print(tf.executing_eagerly(),tf.__version__)
pool = Pool(processes=None)
# By default, Ray uses this to determine the number of CPUs
# TODO: Check if splitting based on cores yields better performance
# Especially for cases where there are too many trials and far less cpu cores
# import psutil
# n_cores = psutil.cpu_count()
# parallel_splits = np.array_split(np.array(all_trials),n_cores)
generate_experiences_trial = partial(generate_experiences,start=start, tree_depth=tree_depth,
max_depth=max_depth, obs_type = obs_type,batch_builder=batch_builder,writer=writer)
for df_cur in pool.map(generate_experiences_trial, all_trials):
if df_cur is not None:
df_all_results = pd.concat([df_all_results, df_cur])
else:
generate_experiences_trial = partial(generate_experiences,start=start, tree_depth=tree_depth,
max_depth=max_depth, obs_type = obs_type,batch_builder=SampleBatchBuilder(),
writer=JsonWriter(path="./",max_file_size=1024 * 1024 * 1024))
for trial in all_trials:
df_cur = generate_experiences_trial(trial)
if df_cur is not None:
df_all_results = pd.concat([df_all_results, df_cur])
if imitate:
df_all_results.to_csv(
f'TreeImitationLearning_DQN_TrainingResults.csv', index=False)
if __name__ == '__main__':
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment