Commit 96e56bbc authored by metataro's avatar metataro

initial commit

# Byte-compiled / optimized / DLL files
# C extensions
# Distribution / packaging
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
# Installer logs
# Unit test / coverage reports
# Translations
# Django stuff:
# Flask stuff:
# Scrapy stuff:
# Sphinx documentation
# PyBuilder
# Jupyter Notebook
# IPython
# pyenv
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# celery beat schedule file
# SageMath parsed files
# Environments
# Spyder project settings
# Rope project settings
# mkdocs documentation
# mypy
# Pyre type checker
# misc
\ No newline at end of file
# NeurIPS 2020 Flatland Challenge baselines
The basic structure of this repository is adopted from [](
## Installation
pip install ray[rllib]
pip install tensorflow # or tensorflow-gpu
pip install -r requirements.txt
## Usage
Training example:
python ./ -f experiments/flatland_random_sparse_small/global_obs/ppo.yaml
Test example:
python ./ /tmp/ray/checkpoint_dir/checkpoint-0 --run PPO --no-render
--config '{"env_config": {"test": true}}' --episodes 1000 --out rollouts.pkl
Note that -f overrides all other trial-specific command-line options.
## Experiment structure
Experiments consist of one or many rllib YAML config files
alongside a MARKDOWN file containing results, plots
and a detailed description of the methodology.
All files are stored in a experiment folder under `experiments/<env-name>/<experiment-name>`.
An example can be found under `experiments/flatland_random_sparse_small/global_obs_conv_net/`.
import logging
import random
from flatland.envs.malfunction_generators import malfunction_from_params
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.schedule_generators import sparse_schedule_generator
def random_sparse_env_small(random_seed, max_width, max_height, observation_builder):
size = random.randint(0, 5)
width = 20 + size * 5
height = 20 + size * 5
nr_cities = 2 + size // 2 + random.randint(0, 2)
nr_trains = min(nr_cities * 5, 5 + random.randint(0, 5)) # , 10 + random.randint(0, 10))
max_rails_between_cities = 2
max_rails_in_cities = 3 + random.randint(0, size)
malfunction_rate = 30 + random.randint(0, 100)
malfunction_min_duration = 3 + random.randint(0, 7)
malfunction_max_duration = 20 + random.randint(0, 80)
rail_generator = sparse_rail_generator(max_num_cities=nr_cities, seed=random_seed, grid_mode=False,
stochastic_data = {'malfunction_rate': malfunction_rate, 'min_duration': malfunction_min_duration,
'max_duration': malfunction_max_duration}
schedule_generator = sparse_schedule_generator({1.: 0.25, 1. / 2.: 0.25, 1. / 3.: 0.25, 1. / 4.: 0.25})
while width <= max_width and height <= max_height:
return RailEnv(width=width, height=height, rail_generator=rail_generator,
schedule_generator=schedule_generator, number_of_agents=nr_trains,
obs_builder_object=observation_builder, remove_agents_at_target=False)
except ValueError as e:
logging.error(f"Error: {e}")
width += 5
height += 5"Try again with larger env: (w,h):", width, height)
logging.error(f"Unable to generate env with seed={random_seed}, max_width={max_height}, max_height={max_height}")
return None
import importlib
import os
from abc import ABC, abstractmethod
import gym
from flatland.core.env_observation_builder import ObservationBuilder
class Observation(ABC):
def __init__(self, config) -> None:
self.config = config
def builder(self) -> ObservationBuilder:
def observation_space(self) -> gym.Space:
def register_obs(name: str):
def register_observation_cls(cls):
if name in OBS_REGISTRY:
raise ValueError(f'Observation "{name}" already registred.')
if not issubclass(cls, Observation):
raise ValueError(f'Observation "{name}" ({cls.__name__}) must extend the Observation base class.')
OBS_REGISTRY[name] = cls
return cls
return register_observation_cls
def make_obs(name: str, config, *args, **kwargs) -> Observation:
return OBS_REGISTRY[name](config, *args, **kwargs)
# automatically import any Python files in the obs/ directory
for file in os.listdir(os.path.dirname(__file__)):
if file.endswith('.py') and not file.startswith('_'):
module = importlib.import_module(f'.{file[:-3]}', __name__)
import gym
import numpy as np
from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.observations import GlobalObsForRailEnv
from envs.flatland.observations import Observation, register_obs
class GlobalObservation(Observation):
def __init__(self, config) -> None:
self._config = config
self._builder = PaddedGlobalObsForRailEnv(max_width=config['max_width'], max_height=config['max_height'])
def builder(self) -> ObservationBuilder:
return self._builder
def observation_space(self) -> gym.Space:
grid_shape = (self._config['max_width'], self._config['max_height'])
return gym.spaces.Tuple([
gym.spaces.Box(low=0, high=np.inf, shape=grid_shape + (16,), dtype=np.float32),
gym.spaces.Box(low=0, high=np.inf, shape=grid_shape + (5,), dtype=np.float32),
gym.spaces.Box(low=0, high=np.inf, shape=grid_shape + (2,), dtype=np.float32),
class PaddedGlobalObsForRailEnv(ObservationBuilder):
def __init__(self, max_width, max_height):
self._max_width = max_width
self._max_height = max_height
self._builder = GlobalObsForRailEnv()
def set_env(self, env: Environment):
def reset(self):
def get(self, handle: int = 0):
obs = list(self._builder.get(handle))
height, width = obs[0].shape[:2]
pad_height, pad_width = self._max_height - height, self._max_width - width
obs[1] = obs[1] + 1 # get rid of -1
assert pad_height >= 0 and pad_width >= 0
return tuple([
np.pad(o, ((0, pad_height), (0, pad_height), (0, 0)), constant_values=0)
for o in obs
from typing import Optional, List
import gym
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from envs.flatland.observations import Observation, register_obs
class TreeObservation(Observation):
def __init__(self, config) -> None:
self._builder = TreeObsForRailEnvRLLibWrapper(
def builder(self) -> ObservationBuilder:
return self._builder
def observation_space(self) -> gym.Space:
num_features_per_node = self._builder.observation_dim
nr_nodes = 0
for i in range(self.config['max_depth'] + 1):
nr_nodes += np.power(4, i)
return gym.spaces.Box(low=-np.inf, high=np.inf, shape=(num_features_per_node * nr_nodes,))
def max_lt(seq, val):
Return greatest item in seq for which item < val applies.
None is returned if seq was empty or all items in seq were >= val.
max = 0
idx = len(seq) - 1
while idx >= 0:
if seq[idx] < val and seq[idx] >= 0 and seq[idx] > max:
max = seq[idx]
idx -= 1
return max
def min_gt(seq, val):
Return smallest item in seq for which item > val applies.
None is returned if seq was empty or all items in seq were >= val.
min = np.inf
idx = len(seq) - 1
while idx >= 0:
if seq[idx] >= val and seq[idx] < min:
min = seq[idx]
idx -= 1
return min
def norm_obs_clip(obs, clip_min=-1, clip_max=1, fixed_radius=0, normalize_to_range=False):
This function returns the difference between min and max value of an observation
:param obs: Observation that should be normalized
:param clip_min: min value where observation will be clipped
:param clip_max: max value where observation will be clipped
:return: returnes normalized and clipped observatoin
if fixed_radius > 0:
max_obs = fixed_radius
max_obs = max(1, max_lt(obs, 1000)) + 1
min_obs = 0 # min(max_obs, min_gt(obs, 0))
if normalize_to_range:
min_obs = min_gt(obs, 0)
if min_obs > max_obs:
min_obs = max_obs
if max_obs == min_obs:
return np.clip(np.array(obs) / max_obs, clip_min, clip_max)
norm = np.abs(max_obs - min_obs)
return np.clip((np.array(obs) - min_obs) / norm, clip_min, clip_max)
def _split_node_into_feature_groups(node: TreeObsForRailEnv.Node) -> (np.ndarray, np.ndarray, np.ndarray):
data = np.zeros(6)
distance = np.zeros(1)
agent_data = np.zeros(4)
data[0] = node.dist_own_target_encountered
data[1] = node.dist_other_target_encountered
data[2] = node.dist_other_agent_encountered
data[3] = node.dist_potential_conflict
data[4] = node.dist_unusable_switch
data[5] = node.dist_to_next_branch
distance[0] = node.dist_min_to_target
agent_data[0] = node.num_agents_same_direction
agent_data[1] = node.num_agents_opposite_direction
agent_data[2] = node.num_agents_malfunctioning
agent_data[3] = node.speed_min_fractional
return data, distance, agent_data
def _split_subtree_into_feature_groups(node: TreeObsForRailEnv.Node, current_tree_depth: int, max_tree_depth: int) -> (np.ndarray, np.ndarray, np.ndarray):
if node == -np.inf:
remaining_depth = max_tree_depth - current_tree_depth
# reference:
num_remaining_nodes = int((4**(remaining_depth+1) - 1) / (4 - 1))
return [-np.inf] * num_remaining_nodes*6, [-np.inf] * num_remaining_nodes, [-np.inf] * num_remaining_nodes*4
data, distance, agent_data = _split_node_into_feature_groups(node)
if not node.childs:
return data, distance, agent_data
for direction in TreeObsForRailEnv.tree_explored_actions_char:
sub_data, sub_distance, sub_agent_data = _split_subtree_into_feature_groups(node.childs[direction], current_tree_depth + 1, max_tree_depth)
data = np.concatenate((data, sub_data))
distance = np.concatenate((distance, sub_distance))
agent_data = np.concatenate((agent_data, sub_agent_data))
return data, distance, agent_data
def split_tree_into_feature_groups(tree: TreeObsForRailEnv.Node, max_tree_depth: int) -> (np.ndarray, np.ndarray, np.ndarray):
This function splits the tree into three difference arrays of values
data, distance, agent_data = _split_node_into_feature_groups(tree)
for direction in TreeObsForRailEnv.tree_explored_actions_char:
sub_data, sub_distance, sub_agent_data = _split_subtree_into_feature_groups(tree.childs[direction], 1, max_tree_depth)
data = np.concatenate((data, sub_data))
distance = np.concatenate((distance, sub_distance))
agent_data = np.concatenate((agent_data, sub_agent_data))
return data, distance, agent_data
def normalize_observation(observation: TreeObsForRailEnv.Node, tree_depth: int, observation_radius=0):
This function normalizes the observation used by the RL algorithm
data, distance, agent_data = split_tree_into_feature_groups(observation, tree_depth)
data = norm_obs_clip(data, fixed_radius=observation_radius)
distance = norm_obs_clip(distance, normalize_to_range=True)
agent_data = np.clip(agent_data, -1, 1)
normalized_obs = np.concatenate((np.concatenate((data, distance)), agent_data))
return normalized_obs
class TreeObsForRailEnvRLLibWrapper(ObservationBuilder):
def __init__(self, tree_obs_builder: TreeObsForRailEnv):
self._builder = tree_obs_builder
def observation_dim(self):
return self._builder.observation_dim
def reset(self):
def get(self, handle: int = 0):
obs = self._builder.get(handle)
return normalize_observation(obs, self._builder.max_depth, observation_radius=10) if obs is not None else obs
def get_many(self, handles: Optional[List[int]] = None):
return {k: normalize_observation(o, self._builder.max_depth, observation_radius=10)
for k, o in self._builder.get_many(handles).items() if o is not None}
def util_print_obs_subtree(self, tree):
def print_subtree(self, node, label, indent):
self._builder.print_subtree(node, label, indent)
def set_env(self, env):
\ No newline at end of file
from collections import defaultdict
from typing import Dict, NamedTuple, Any, Optional
import gym
from flatland.envs.rail_env import RailEnv, RailEnvActions
class StepOutput(NamedTuple):
obs: Dict[int, Any] # depends on observation builder
reward: Dict[int, float]
done: Dict[int, bool]
info: Dict[int, Dict[str, Any]]
class FlatlandRllibWrapper(object):
def __init__(self, rail_env: RailEnv, render: bool = False, regenerate_rail_on_reset: bool = True,
regenerate_schedule_on_reset: bool = True) -> None:
self._env = rail_env
self._agents_done = []
self._agent_scores = defaultdict(float)
self._agent_steps = defaultdict(int)
self._regenerate_rail_on_reset = regenerate_rail_on_reset
self._regenerate_schedule_on_reset = regenerate_schedule_on_reset
self._action_space = gym.spaces.Discrete(5)
if render:
from flatland.utils.rendertools import RenderTool
self.renderer = RenderTool(self._env, gl="PILSVG")
self.renderer = None
def action_space(self) -> gym.spaces.Discrete:
return self._action_space
def step(self, action_dict: Dict[int, RailEnvActions]) -> StepOutput:
d, r, o = None, None, None
obs_or_done = False
while not obs_or_done:
# Perform env steps as long as there is no observation (for all agents) or all agents are done
# The observation is `None` if an agent is done or malfunctioning.
obs, rewards, dones, infos = self._env.step(action_dict)
if self.renderer is not None:
self.renderer.render_env(show=True, show_predictions=True, show_observations=False)
d, r, o = dict(), dict(), dict()
for agent, done in dones.items():
if agent != '__all__' and not agent in obs:
continue # skip agent if there is no observation
if agent not in self._agents_done:
if agent != '__all__':
if done:
if infos['action_required'][agent] or done: # filter out agents that need no action
o[agent] = obs[agent]
r[agent] = rewards[agent]
self._agent_scores[agent] += rewards[agent]
self._agent_steps[agent] += 1
d[agent] = dones[agent]
action_dict = {} # reset action dict for cases where we do multiple env steps
obs_or_done = len(o) > 0 or d['__all__'] # step through env as long as there are no obs/all agents done
assert all([x is not None for x in (d, r, o)])
return StepOutput(obs=o, reward=r, done=d, info={agent: {
'max_episode_steps': self._env._max_episode_steps,
'num_agents': self._env.get_num_agents(),
'agent_done': d[agent] and agent not in self._env.active_agents,
'agent_score': self._agent_scores[agent],
'agent_step': self._agent_steps[agent],
} for agent in o.keys()})
def reset(self, random_seed: Optional[int] = None) -> Dict[int, Any]:
self._agents_done = []
self._agent_scores = defaultdict(float)
self._agent_steps = defaultdict(int)
obs, infos = self._env.reset(regenerate_rail=self._regenerate_rail_on_reset,
if self.renderer is not None:
return {k: o for k, o in obs.items() if not k == '__all__'}
import random
import gym
from ray.rllib import MultiAgentEnv
from envs.flatland.env_generators import random_sparse_env_small
from envs.flatland.observations import make_obs
from envs.flatland.rllib_wrapper import FlatlandRllibWrapper
class FlatlandRandomSparseSmall(MultiAgentEnv):
def __init__(self, env_config) -> None:
self._env_config = env_config
self._test = env_config.get('test', False)
self._min_seed = env_config['min_seed']
self._max_seed = env_config['max_seed']
assert self._min_seed <= self._max_seed
self._min_test_seed = env_config.get('min_test_seed', 0)
self._max_test_seed = env_config.get('max_test_seed', 100)
assert self._min_test_seed <= self._max_test_seed
self._next_test_seed = self._min_test_seed
self._num_resets = 0
self._observation = make_obs(env_config['observation'], env_config.get('observation_config'))
self._env = FlatlandRllibWrapper(rail_env=self._launch(), render=env_config['render'],
def observation_space(self) -> gym.spaces.Space:
return self._observation.observation_space()
def action_space(self) -> gym.spaces.Space:
return self._env.action_space
def _generate_random_seed(self):
return random.randint(self._min_seed, self._max_seed)
def _launch(self, max_tries=5):
env = None
num_tries = 0
while env is None and num_tries < max_tries:
if self._test:
random_seed = self._next_test_seed
rel_next_seed = self._next_test_seed - self._min_test_seed
rel_max_seed = self._max_test_seed - self._min_test_seed
self._next_test_seed = self._min_test_seed + ((rel_next_seed + 1) % (rel_max_seed + 1)) # inclusive max
random_seed = self._generate_random_seed()
random_seed = random_seed * 19997 + 997 # backwards consistency
env = random_sparse_env_small(random_seed=random_seed, max_width=45, max_height=45,
num_tries += 1
if env is None:
raise RuntimeError(f"Unable to launch env within {max_tries} tries.")
return env
def step(self, action_dict):
return self._env.step(action_dict)
def reset(self):
if self._test or (
self._env_config['reset_env_freq'] is not None
and self._num_resets > 0
and self._num_resets % self._env_config['reset_env_freq'] == 0
self._env.env = self._launch()
self._num_resets += 1
return self._env.reset(random_seed=self._next_test_seed if self._test else self._generate_random_seed())
# Global observation convnet experiments
## Method
In this experiment, we compare the performance of two established CNN architectures on the global
observations. In the first case, agents are based on the Nature-CNN architecture [2] that
consists of 3 convolutional layers followed by a dense layer. In the second case, the
agents are based on the IMPALA-CNN [1] network, which consists of a 15-layer residual architecture
neural network followed by a dense layer. Agents share the same centralized
policy network.