Commit ff5c7fe1 authored by manuschn

Merge branch 'cleanup' into combined-obs-from-config

parents 96fad330 c21c8c24
# 🚂 Flatland Baselines
📈 [**Results**](
This repository contains reinforcement learning baselines for the [NeurIPS 2020 Flatland Challenge](
## Provided baselines
**Note:** Looking for something simpler to start with? We also provide a simpler Dueling Double DQN method implemented in PyTorch without relying on RLlib.
### RL Methods
- Ape-X
- Pure Imitation Learning: MARWIL
- Mixed IL/RL: DQfD
### Custom observations
- Density observations
- Local conflict observations
### Tricks
- Action skipping
- Action masking
## Organisation
Experiments consist of one or more RLlib YAML config files alongside a Markdown file containing results, plots, and a detailed description of the methodology.
All files are stored in an experiment folder under `experiments/<env-name>/<experiment-name>`.
- [Tree observations w/ fully connected network](experiments/flatland_random_sparse_small/tree_obs_fc_net)
- [Global observations w/ convnet](experiments/flatland_random_sparse_small/global_obs_conv_net)
## Setup
The setup uses conda, [install it]( if necessary.

```shell
# with GPU support:
conda env create -f environment-gpu.yml
conda activate flatland-baseline-gpu-env

# or, without GPU support:
conda env create -f environment-cpu.yml
conda activate flatland-baseline-cpu-env
```
You may need to install/update bazel: [Ubuntu guide](
## Usage
Training example:
Evaluation example:
`python ./ /tmp/ray/checkpoint_dir/checkpoint-0 --run PPO --no-render
--config '{"env_config": {"test": true}}' --episodes 1000 --out rollouts.pkl`
Note that `-f` overrides all other trial-specific command-line options.
## Notes
- The basic structure of this repository is adapted from [](
# Global Density Observation
```{admonition} TL;DR
This observation is a global observation that provides an agent with information on its own and the other agents' predicted paths. The paths are predicted from the shortest path of each agent to its respective target. The information is encoded into a density map.
```
### 💡 The idea
The density observation is based on the idea that every agent's path to its target is represented in a discrete map of the environment, assigning each location (cell) a value that encodes if and when the cell will be occupied. For simplicity, we assume that each agent follows the shortest path to its target and do not consider alternative paths. The individual values along the agents' shortest paths are combined into a "density" for each cell. For example, if all agents occupied the same cell at the same time step, the density would be very high; if they used the same cell at different time steps, the density for that cell would be lower. The density map therefore potentially allows the agents to learn from the (projected) cell occupancy distribution.
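To make the idea concrete, here is a small sketch with a hypothetical 3x3 grid and hand-picked paths. Contributions are simply summed here for illustration; the actual observation keeps the agent's own map separate and averages the other agents' maps.

```python
import numpy as np

# Toy sketch of the density idea on a hypothetical 3x3 grid with
# hand-picked shortest paths (one (row, col) cell per time step).
max_t = 4

def encode(t):
    return np.exp(-t / np.sqrt(max_t))  # the exp_decay encoding

grid = np.zeros((3, 3), dtype=np.float32)
path_a = [(0, 0), (1, 1), (1, 2), (2, 2)]  # agent A
path_b = [(2, 0), (1, 1), (0, 1), (0, 2)]  # agent B meets A at (1, 1) at t=1

for path in (path_a, path_b):
    for t, cell in enumerate(path):
        grid[cell] += encode(t)

# The shared cell receives two contributions at t=1 and ends up denser
# than the starting cells, which are each visited by a single agent.
assert grid[1, 1] > grid[0, 0]
```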
### 🗂️ Files and usage
The observation is defined in `neurips2020-flatland-baselines/envs/flatland/observations/`, and the model used in the example in `neurips2020-flatland-baselines/models/`.
The observation can be configured with the following parameters:
* `width` and `height`: must correspond to the shape of the environment
* `max_t`: maximum number of time steps the path of each agent is predicted for
* `encoding`: how the time information is factored into the density value (2d options: `exp_decay`, `lin_decay`, `binary`; 3d option: `series`; see the next section for more details)
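Assembled from the parameters above, the observation section of a config could look like this (a sketch following the example config in this repository; the exact key nesting may differ):

```yaml
observation: density
observation_config:
  width: 25
  height: 25
  max_t: 1000
  encoding: exp_decay
```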
An example config is located in `neurips2020-flatland-baselines/baselines/global_density_obs/sparse_small_apex_expdecay_maxt1000.yaml` and can be run with
`python ./ -f baselines/global_density_obs/sparse_small_apex_expdecay_maxt1000.yaml`
### 📦 Implementation Details
The observation for each agent consists of two arrays representing the cells of the environment. The first array contains the density values for the agent itself, and the second one the mean of the other agents' values for each cell. The arrays are either two- or three-dimensional, depending on the encoding.
The idea behind the encoding parameter is to provide a way to compress the space and time information into a 2d representation. However, it is possible to get a 3d observation with a separate 2d density map for each time step by using the option `series` (for time series) for the encoding. In this case, a binary representation of the individual agent occupancies is used.
The other options use a function of the time step *t* and the maximal time step *max_t* to determine the density value:
* `exp_decay`: e^(-t / sqrt(max_t))
* `lin_decay`: (max_t - t) / max_t
* `binary`: 1
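The formulas above can be sketched directly as functions of *t* and *max_t* (a minimal illustration, not the repository's code, which defines them as lambdas inside the observation builder):

```python
import numpy as np

# The three 2d encodings as functions of the time step t and the
# prediction horizon max_t, mirroring the formulas above.
def exp_decay(t, max_t):
    return np.exp(-t / np.sqrt(max_t))

def lin_decay(t, max_t):
    return (max_t - t) / max_t

def binary(t, max_t):
    return 1.0

# All encodings start at 1 for t = 0; the decaying ones fade as t grows.
print(exp_decay(0, 1000), lin_decay(0, 1000), binary(0, 1000))  # → 1.0 1.0 1.0
```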
We created a custom model (GlobalDensObsModel) for this observation that uses a convolutional neural network to process it. For the experiments, we used the IMPALA architecture (see the links section).
### 📈 Results
We trained the agents with the different encoding options and different values for max_t using Ape-X (see the links section). However, we didn't search systematically or exhaustively for the best settings.
The best runs achieved around 45% mean completion on the small sparse flatland environment with max_t = 1000 and encoding = exp_decay. The mean completion rate is considerably lower than with the tree observation, but it shows that learning from global observations is possible and can inform approaches that combine local, tree, and global observations.
More information on the runs can be found in the Weights & Biases report linked below.
### 🔗 Links
* [IMPALA Paper – IMPALA: Scalable Distributed Deep-RL with Importance Weighted Actor-Learner Architectures (Espeholt et al.)](
* [Ape-X Paper – Distributed Prioritized Experience Replay (Horgan et al.)](
* [W&B report for training runs](|-sparse-small_v0--VmlldzoxMTYxMDE)
### 🌟 Credits
```yaml
run: APEX
env: flatland_sparse
stop:
    timesteps_total: 15000000  # 1.5e7
checkpoint_freq: 10
checkpoint_at_end: True
keep_checkpoints_num: 5
checkpoint_score_attr: episode_reward_mean
num_samples: 3
config:
    num_workers: 13
    num_envs_per_worker: 5
    num_gpus: 0
    hiddens: []
    dueling: False
    env_config:
        observation: density
        observation_config:
            width: 25
            height: 25
            max_t: 1000
            encoding: exp_decay
        generator: sparse_rail_generator
        generator_config: small_v0
        wandb:
            project: flatland
            entity: masterscrat
            tags: ["small_v0", "density_obs", "apex"]  # TODO should be set programmatically
    model:
        custom_model: global_dens_obs_model
        custom_options:
            architecture: impala
            residual_layers: [[16, 2], [32, 4]]
    evaluation_num_workers: 2
    evaluation_interval: 100
    evaluation_num_episodes: 100
    evaluation_config:
        explore: False
        env_config:
            observation: density
            observation_config:
                width: 25
                height: 25
                max_t: 1000
                encoding: exp_decay
            regenerate_rail_on_reset: True
            regenerate_schedule_on_reset: True
            render: False
```
name: flatland-baseline-cpu-env
dependencies:
  - python=3.7
  - pip=20.0
  - pip:
    - flatland-rl==2.2.1
    - CairoSVG==2.4.2
    # - pycairo==1.19.1 # fails!
    - pyhumps==1.3.1
    - gputil==1.4.0
    - wandb==0.9.2
    - ray[rllib]==0.8.5
    - tensorflow==2.1.0
name: flatland-baseline-gpu-env
dependencies:
  - python=3.7
  - tensorflow-gpu=2.1.0
  - pip=20.0
  - pip:
    - flatland-rl==2.2.1
    - CairoSVG==2.4.2
    # - pycairo==1.19.1 # fails!
    - pyhumps==1.3.1
    - gputil==1.4.0
    - wandb==0.9.2
    - ray[rllib]==0.8.5
import gym
import numpy as np
from typing import Optional, List, Dict
from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.agent_utils import RailAgentStatus
from flatland.envs.rail_env import RailEnv
from flatland.core.env_prediction_builder import PredictionBuilder
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from envs.flatland.observations import Observation, register_obs
@register_obs("density")
class ProjectedDensityObservation(Observation):

    def __init__(self, config) -> None:
        super().__init__(config)
        self._builder = ProjectedDensityForRailEnv(config['height'], config['width'],
                                                   config['encoding'], config['max_t'])

    def builder(self) -> ObservationBuilder:
        return self._builder

    def observation_space(self) -> gym.Space:
        obs_shape = self._builder.observation_shape
        return gym.spaces.Tuple([
            gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.float32),
            gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.float32),
        ])


class ProjectedDensityForRailEnv(ObservationBuilder):

    def __init__(self, height, width, encoding='exp_decay', max_t=10):
        super().__init__()
        self._height = height
        self._width = width
        self._depth = max_t + 1 if encoding == 'series' else 1
        if encoding == 'exp_decay':
            self._encode = lambda t: np.exp(-t / np.sqrt(max_t))
        elif encoding == 'lin_decay':
            self._encode = lambda t: (max_t - t) / max_t
        else:
            self._encode = lambda t: 1
        self._predictor = ShortestPathPredictorForRailEnv(max_t)

    @property
    def observation_shape(self):
        return (self._height, self._width, self._depth)

    def get_many(self, handles: Optional[List[int]] = None) -> Dict[int, np.ndarray]:
        """
        Get density maps for all agents and compose the observation with
        the agent's own and the other agents' density maps.
        """
        self._predictions = self._predictor.get()
        density_maps = dict()
        for handle in handles:
            density_maps[handle] = self.get(handle)
        obs = dict()
        for handle in handles:
            other_dens_maps = [density_maps[key] for key in density_maps if key != handle]
            others_density = np.mean(np.array(other_dens_maps), axis=0)
            obs[handle] = [density_maps[handle], others_density]
        return obs

    def get(self, handle: int = 0):
        """
        Compute the density map for an agent: a value is assigned to every
        cell along the shortest path between the agent and its target, based
        on the distance to the agent, i.e. the number of time steps the agent
        needs to reach the cell, encoding the time information.
        """
        density_map = np.zeros(shape=(self._height, self._width, self._depth), dtype=np.float32)
        agent = self.env.agents[handle]
        if self._predictions[handle] is not None:
            for t, prediction in enumerate(self._predictions[handle]):
                p = tuple(np.array(prediction[1:3]).astype(int))
                d = t if self._depth > 1 else 0
                density_map[p][d] = self._encode(t)
        return density_map

    def set_env(self, env: Environment):
        self.env: RailEnv = env
        self._predictor.set_env(self.env)

    def reset(self):
        pass
import hashlib
import json
import logging
import os
import random
import time
import msgpack
import msgpack_numpy as m
import pickle
import numpy as np
import redis
import flatland
from flatland.envs.malfunction_generators import malfunction_from_file
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import rail_from_file
from flatland.envs.schedule_generators import schedule_from_file
from flatland.evaluators import messages
logger = logging.getLogger(__name__)
class FlatlandRemoteClient(object):
    """
    Redis client to interface with flatland-rl remote-evaluation-service

    The Docker container hosts a redis-server inside the container.
    This client connects to the same redis-server,
    and communicates with the service.
    The service eventually will reside outside the docker container,
    and will communicate
    with the client only via the redis-server of the docker container.
    On the instantiation of the docker container, one service will be
    instantiated in parallel.
    The service will accept commands at "`service_id`::commands"
    where `service_id` is either provided as an `env` variable or is
    instantiated to "flatland_rl_redis_service_id"
    """
    def __init__(self,
                 remote_host='',
                 remote_port=6379,
                 remote_db=0,
                 remote_password=None,
                 test_envs_root=None,
                 verbose=False,
                 use_pickle=False):
        self.use_pickle = use_pickle
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.remote_db = remote_db
        self.remote_password = remote_password
        self.redis_pool = redis.ConnectionPool(
            host=remote_host,
            port=remote_port,
            db=remote_db,
            password=remote_password)
        self.redis_conn = redis.Redis(connection_pool=self.redis_pool)
        self.namespace = "flatland-rl"
        self.service_id = os.getenv(
            'FLATLAND_RL_SERVICE_ID',
            'FLATLAND_RL_SERVICE_ID')
        self.command_channel = "{}::{}::commands".format(
            self.namespace,
            self.service_id)
        if test_envs_root:
            self.test_envs_root = test_envs_root
        else:
            self.test_envs_root = os.getenv(
                'AICROWD_TESTS_FOLDER')
        self.current_env_path = None
        self.verbose = verbose
        self.env = None
        self.env_step_times = []
        self.stats = {}
    def update_running_mean_stats(self, key, scalar):
        """
        Computes the running mean for certain params
        """
        mean_key = "{}_mean".format(key)
        counter_key = "{}_counter".format(key)
        try:
            self.stats[mean_key] = \
                ((self.stats[mean_key] * self.stats[counter_key]) + scalar) / (self.stats[counter_key] + 1)
            self.stats[counter_key] += 1
        except KeyError:
            self.stats[mean_key] = 0
            self.stats[counter_key] = 0

    def get_redis_connection(self):
        return self.redis_conn
    def _generate_response_channel(self):
        random_hash = hashlib.md5(
            "{}".format(
                random.randint(0, 10 ** 10)
            ).encode('utf-8')).hexdigest()
        response_channel = "{}::{}::response::{}".format(self.namespace,
                                                         self.service_id,
                                                         random_hash)
        return response_channel
    def _remote_request(self, _request, blocking=True):
        """
        response: (on response_channel)
        * Send the payload on command_channel (self.namespace+"::command")
        ** redis-left-push (LPUSH)
        * Keep listening on response_channel (BLPOP)
        """
        assert isinstance(_request, dict)
        _request['response_channel'] = self._generate_response_channel()
        _request['timestamp'] = time.time()

        _redis = self.get_redis_connection()
        # The client always pushes in the left
        # and the service always pushes in the right
        if self.verbose:
            print("Request : ", _request)

        # Push request in command_channels
        # Note: The patched msgpack supports numpy arrays
        if self.use_pickle:
            payload = pickle.dumps(_request)
        else:
            payload = msgpack.packb(_request, default=m.encode, use_bin_type=True)
        _redis.lpush(self.command_channel, payload)

        if blocking:
            # Wait with a blocking pop for the response
            _response = _redis.blpop(_request['response_channel'])[1]
            if self.verbose:
                print("Response : ", _response)
            if self.use_pickle:
                _response = pickle.loads(_response)
            else:
                _response = msgpack.unpackb(
                    _response,
                    object_hook=m.decode,
                    strict_map_key=False,  # new for msgpack 1.0?
                    encoding="utf8"  # remove for msgpack 1.0
                )
            if _response['type'] == messages.FLATLAND_RL.ERROR:
                raise Exception(str(_response["payload"]))
            return _response
    def ping_pong(self):
        """
        Official Handshake with the evaluation service
        Send a PING
        and wait for PONG
        If not PONG, raise error
        """
        _request = {}
        _request['type'] = messages.FLATLAND_RL.PING
        _request['payload'] = {
            "version": flatland.__version__
        }
        _response = self._remote_request(_request)
        if _response['type'] != messages.FLATLAND_RL.PONG:
            raise Exception(
                "Unable to perform handshake with the evaluation service. "
                "Expected PONG; received {}".format(json.dumps(_response)))
        return True
    def env_create(self, obs_builder_object):
        """
        Create a local env and remote env on which the
        local agent can operate.
        The observation builder is only used in the local env
        and the remote env uses a DummyObservationBuilder
        """
        time_start = time.time()
        _request = {}
        _request['type'] = messages.FLATLAND_RL.ENV_CREATE
        _request['payload'] = {}
        _response = self._remote_request(_request)
        observation = _response['payload']['observation']
        info = _response['payload']['info']
        random_seed = _response['payload']['random_seed']
        test_env_file_path = _response['payload']['env_file_path']
        time_diff = time.time() - time_start
        self.update_running_mean_stats("env_creation_wait_time", time_diff)

        if not observation:
            # If the observation is False,
            # then the evaluations are complete
            # hence return false
            return observation, info

        if self.verbose:
            print("Received Env : ", test_env_file_path)

        test_env_file_path = os.path.join(
            self.test_envs_root,
            test_env_file_path
        )
        if not os.path.exists(test_env_file_path):
            raise Exception(
                "\nWe cannot seem to find the env file paths at the required location.\n"
                "Did you remember to set the AICROWD_TESTS_FOLDER environment variable "
                "to point to the location of the Tests folder ? \n"
                "We are currently looking at `{}` for the tests".format(self.test_envs_root)
            )

        if self.verbose:
            print("Current env path : ", test_env_file_path)
        self.current_env_path = test_env_file_path
        self.env = RailEnv(width=1, height=1,
                           rail_generator=rail_from_file(test_env_file_path),
                           schedule_generator=schedule_from_file(test_env_file_path),
                           malfunction_generator_and_process_data=malfunction_from_file(test_env_file_path),
                           obs_builder_object=obs_builder_object)

        time_start = time.time()
        local_observation, info = self.env.reset(
            regenerate_rail=False,
            regenerate_schedule=False,
            activate_agents=False,
            random_seed=random_seed
        )
        time_diff = time.time() - time_start
        self.update_running_mean_stats("internal_env_reset_time", time_diff)
        # Use the local observation
        # as the remote server uses a dummy observation builder
        return local_observation, info
    def env_step(self, action, render=False):
        """
        Respond with [observation, reward, done, info]
        """
        _request = {}
        _request['type'] = messages.FLATLAND_RL.ENV_STEP