
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing with 1227 additions and 0 deletions
test_id,env_id,n_agents,x_dim,y_dim,n_cities,max_rail_pairs_in_city,n_envs_run,seed,grid_mode,max_rails_between_cities,malfunction_duration_min,malfunction_duration_max,malfunction_interval,speed_ratios
Test_0,Level_0,7,30,30,2,2,10,335971,False,2,20,50,540,"{1.0: 0.25, 0.5: 0.25, 0.33: 0.25, 0.25: 0.25}"
Test_0,Level_1,7,30,30,2,2,10,335972,False,2,20,50,540,"{1.0: 0.25, 0.5: 0.25, 0.33: 0.25, 0.25: 0.25}"
import random
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
random.seed(100)
np.random.seed(100)
class SimpleObs(ObservationBuilder):
    """
    Simplest observation builder. The object returns observation vectors with 5 identical components,
    all equal to the ID of the respective agent.
    """

    def __init__(self):
        super().__init__()

    def reset(self):
        return

    def get(self, handle: int = 0) -> np.ndarray:
        observation = handle * np.ones((5,))
        return observation


def create_env():
    nAgents = 3
    n_cities = 2
    max_rails_between_cities = 2
    max_rails_in_city = 4
    seed = 0
    env = RailEnv(
        width=20,
        height=30,
        rail_generator=sparse_rail_generator(
            max_num_cities=n_cities,
            seed=seed,
            grid_mode=True,
            max_rails_between_cities=max_rails_between_cities,
            max_rail_pairs_in_city=max_rails_in_city
        ),
        line_generator=sparse_line_generator(),
        number_of_agents=nAgents,
        obs_builder_object=SimpleObs()
    )
    return env


def main():
    env = create_env()
    env.reset()

    # Print the observation vector for each agent
    obs, all_rewards, done, _ = env.step({0: 0})
    for i in range(env.get_num_agents()):
        print("Agent ", i, "'s observation: ", obs[i])


if __name__ == '__main__':
    main()
import getopt
import random
import sys
import time
from typing import List
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class SingleAgentNavigationObs(ObservationBuilder):
    """
    We build a representation vector with 3 binary components, indicating which of the 3 available directions
    for each agent (Left, Forward, Right) lead to the shortest path to its target.
    E.g., if taking the Left branch (if available) is the shortest route to the agent's target, the observation vector
    will be [1, 0, 0].
    """

    def __init__(self):
        super().__init__()

    def reset(self):
        pass

    def get(self, handle: int = 0) -> List[int]:
        agent = self.env.agents[handle]

        if agent.position:
            possible_transitions = self.env.rail.get_transitions(*agent.position, agent.direction)
        else:
            possible_transitions = self.env.rail.get_transitions(*agent.initial_position, agent.direction)

        num_transitions = np.count_nonzero(possible_transitions)

        # Start from the current orientation and see which transitions are available;
        # organize them as [left, forward, right] relative to the current orientation.
        # If only one transition is possible, the forward branch is aligned with it.
        if num_transitions == 1:
            observation = [0, 1, 0]
        else:
            min_distances = []
            for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
                if possible_transitions[direction]:
                    new_position = get_new_position(agent.position, direction)
                    min_distances.append(
                        self.env.distance_map.get()[handle, new_position[0], new_position[1], direction])
                else:
                    min_distances.append(np.inf)

            observation = [0, 0, 0]
            observation[np.argmin(min_distances)] = 1

        return observation


def create_env():
    nAgents = 1
    n_cities = 2
    max_rails_between_cities = 2
    max_rails_in_city = 4
    seed = 0
    env = RailEnv(
        width=30,
        height=40,
        rail_generator=sparse_rail_generator(
            max_num_cities=n_cities,
            seed=seed,
            grid_mode=True,
            max_rails_between_cities=max_rails_between_cities,
            max_rail_pairs_in_city=max_rails_in_city
        ),
        line_generator=sparse_line_generator(),
        number_of_agents=nAgents,
        obs_builder_object=SingleAgentNavigationObs()
    )
    return env


def custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering):
    env = create_env()
    obs, info = env.reset()

    env_renderer = None
    if do_rendering:
        env_renderer = RenderTool(env)
        env_renderer.render_env(show=True, frames=True, show_observations=False)

    for step in range(100):
        action = np.argmax(obs[0]) + 1
        obs, all_rewards, done, _ = env.step({0: action})
        print("Rewards: ", all_rewards, " [done=", done, "]")

        if env_renderer is not None:
            env_renderer.render_env(show=True, frames=True, show_observations=True)
        if sleep_for_animation:
            time.sleep(0.1)
        if done["__all__"]:
            break

    if env_renderer is not None:
        env_renderer.close_window()


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"

    # execute example
    custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
import getopt
import random
import sys
import time
from typing import Optional, List, Dict
import numpy as np
from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.ordered_set import OrderedSet
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class ObservePredictions(ObservationBuilder):
    """
    We use the provided ShortestPathPredictor to illustrate the usage of predictors in your custom observation.
    """

    def __init__(self, predictor):
        super().__init__()
        self.predictor = predictor

    def reset(self):
        pass

    def get_many(self, handles: Optional[List[int]] = None) -> Dict[int, np.ndarray]:
        """
        Because we do not want to call the predictor separately for every agent, we implement get_many.
        Here we can call the predictor just once for all agents and use the predictions to generate our observations.
        :param handles: list of agent handles to build observations for
        :return: dictionary mapping each handle to its observation
        """
        self.predictions = self.predictor.get()
        self.predicted_pos = {}

        if handles is None:
            handles = []

        for t in range(len(self.predictions[0])):
            pos_list = []
            for a in handles:
                pos_list.append(self.predictions[a][t][1:3])
            # We transform (x, y) coordinates into a single integer for simpler comparison
            self.predicted_pos.update({t: coordinate_to_position(self.env.width, pos_list)})

        observations = super().get_many(handles)
        return observations

    def get(self, handle: int = 0) -> np.ndarray:
        """
        Let's write a simple observation which just indicates whether the agent's own predicted path
        overlaps with other agents' predicted paths at any time. This is useless for the task of navigation but might
        help when looking for conflicts. A more complex implementation can be found in the TreeObsForRailEnv class.
        Each agent receives an observation of length 10, where each element represents a prediction step and its value
        is:
        - 0 if no other predicted path crosses the predicted cell at that step
        - 1 if another agent's predicted path crosses the predicted cell at that step
        :param handle: handle (index) of the agent
        :return: observation for the given handle
        """
        observation = np.zeros(10)

        # We track which cells were considered while building the observation and make them
        # accessible to the renderer
        visited = OrderedSet()
        for _idx in range(10):
            # Check whether any other agent's prediction overlaps with this agent's own prediction
            x_coord = self.predictions[handle][_idx][1]
            y_coord = self.predictions[handle][_idx][2]

            # We add every observed cell to the observation rendering
            visited.add((x_coord, y_coord))
            if self.predicted_pos[_idx][handle] in np.delete(self.predicted_pos[_idx], handle, 0):
                # Another agent is predicted to pass through the same cell at the same predicted time,
                # so we flag this prediction step
                observation[_idx] = 1

        # This variable is accessed by the renderer to visualize the observation
        self.env.dev_obs_dict[handle] = visited

        return observation
    def set_env(self, env: Environment):
        super().set_env(env)
        if self.predictor:
            self.predictor.set_env(self.env)


def create_env(custom_obs_builder):
    nAgents = 3
    n_cities = 2
    max_rails_between_cities = 4
    max_rails_in_city = 2
    seed = 0
    env = RailEnv(
        width=30,
        height=30,
        rail_generator=sparse_rail_generator(
            max_num_cities=n_cities,
            seed=seed,
            grid_mode=True,
            max_rails_between_cities=max_rails_between_cities,
            max_rail_pairs_in_city=max_rails_in_city
        ),
        line_generator=sparse_line_generator(),
        number_of_agents=nAgents,
        obs_builder_object=custom_obs_builder
    )
    return env


def custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering):
    # Instantiate the predictor
    custom_predictor = ShortestPathPredictorForRailEnv(10)

    # Pass the predictor to the observation builder
    custom_obs_builder = ObservePredictions(custom_predictor)

    # Instantiate the environment
    env = create_env(custom_obs_builder)
    obs, info = env.reset()

    env_renderer = None
    if do_rendering:
        env_renderer = RenderTool(env)
        # We render the initial step and show the observed cells as colored boxes
        env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)

    action_dict = {}
    for step in range(100):
        for a in range(env.get_num_agents()):
            action = np.random.randint(0, 5)
            action_dict[a] = action
        obs, all_rewards, done, _ = env.step(action_dict)
        print("Rewards: ", all_rewards, " [done=", done, "]")

        if env_renderer is not None:
            env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
        if sleep_for_animation:
            time.sleep(0.5)
        if done["__all__"]:
            print("All done!")
            break

    if env_renderer is not None:
        env_renderer.close_window()


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"

    # execute example
    custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
import getopt
import random
import sys
import time
from typing import Tuple
import numpy as np
from flatland.core.env_observation_builder import DummyObservationBuilder
from flatland.core.grid.rail_env_grid import RailEnvTransitions
from flatland.core.transition_map import GridTransitionMap
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import rail_from_grid_transition_map
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool
def custom_rail_map() -> Tuple[GridTransitionMap, np.array]:
    # We instantiate a very simple rail network on a 7x10 grid:
    #    0 1 2 3 4 5 6 7 8 9 10
    # 0        /-------------\
    # 1        |             |
    # 2        |             |
    # 3  _ _ _ /_ _ _        |
    # 4               \  ___ /
    # 5                |/
    # 6                |
    # 7                |
    transitions = RailEnvTransitions()
    cells = transitions.transition_list

    empty = cells[0]
    dead_end_from_south = cells[7]
    right_turn_from_south = cells[8]
    right_turn_from_west = transitions.rotate_transition(right_turn_from_south, 90)
    right_turn_from_north = transitions.rotate_transition(right_turn_from_south, 180)
    dead_end_from_west = transitions.rotate_transition(dead_end_from_south, 90)
    dead_end_from_north = transitions.rotate_transition(dead_end_from_south, 180)
    dead_end_from_east = transitions.rotate_transition(dead_end_from_south, 270)
    vertical_straight = cells[1]
    simple_switch_north_left = cells[2]
    simple_switch_north_right = cells[10]
    simple_switch_left_east = transitions.rotate_transition(simple_switch_north_left, 90)
    horizontal_straight = transitions.rotate_transition(vertical_straight, 90)
    double_switch_south_horizontal_straight = horizontal_straight + cells[6]
    double_switch_north_horizontal_straight = transitions.rotate_transition(
        double_switch_south_horizontal_straight, 180)

    rail_map = np.array(
        [[empty] * 3 + [right_turn_from_south] + [horizontal_straight] * 5 + [right_turn_from_west]] +
        [[empty] * 3 + [vertical_straight] + [empty] * 5 + [vertical_straight]] * 2 +
        [[dead_end_from_east] + [horizontal_straight] * 2 + [simple_switch_left_east] + [horizontal_straight] * 2 + [
            right_turn_from_west] + [empty] * 2 + [vertical_straight]] +
        [[empty] * 6 + [simple_switch_north_right] + [horizontal_straight] * 2 + [right_turn_from_north]] +
        [[empty] * 6 + [vertical_straight] + [empty] * 3] +
        [[empty] * 6 + [dead_end_from_north] + [empty] * 3], dtype=np.uint16)

    rail = GridTransitionMap(width=rail_map.shape[1],
                             height=rail_map.shape[0], transitions=transitions)
    rail.grid = rail_map
    city_positions = [(0, 3), (6, 6)]
    train_stations = [
        [((0, 3), 0)],
        [((6, 6), 0)],
    ]
    city_orientations = [0, 2]
    agents_hints = {'city_positions': city_positions,
                    'train_stations': train_stations,
                    'city_orientations': city_orientations
                    }
    optionals = {'agents_hints': agents_hints}
    return rail, rail_map, optionals


def create_env():
    rail, rail_map, optionals = custom_rail_map()
    env = RailEnv(width=rail_map.shape[1],
                  height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(),
                  number_of_agents=2,
                  obs_builder_object=DummyObservationBuilder(),
                  )
    return env


def custom_railmap_example(sleep_for_animation, do_rendering):
    random.seed(100)
    np.random.seed(100)

    env = create_env()
    env.reset()

    if do_rendering:
        env_renderer = RenderTool(env)
        env_renderer.render_env(show=True, show_observations=False)
        env_renderer.close_window()

    if sleep_for_animation:
        time.sleep(1)

    # uncomment to keep the renderer open
    # input("Press Enter to continue...")


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"

    # execute example
    custom_railmap_example(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
import getopt
import sys
import time
import numpy as np
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.malfunction_generators import MalfunctionParameters
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.persistence import RailEnvPersister
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool, AgentRenderVariant
# Import your own Agent or use RLlib to train agents on Flatland
# As an example we use a random agent instead
class RandomAgent:

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

    def act(self, state):
        """
        :param state: input is the observation of the agent
        :return: returns an action
        """
        return 2  # np.random.choice(np.arange(self.action_size))

    def step(self, memories):
        """
        Step function to improve the agent by adjusting its policy given the observations
        :param memories: SARS tuple to learn from
        :return:
        """
        return

    def save(self, filename):
        # Store the current policy
        return

    def load(self, filename):
        # Load a policy
        return


def create_env():
    # Use the new sparse_rail_generator to generate feasible network configurations with corresponding tasks
    # Training on simple small tasks is the best way to get familiar with the environment
    # Use the malfunction generator to break agents from time to time
    stochastic_data = MalfunctionParameters(malfunction_rate=30,  # Rate of malfunction occurrence
                                            min_duration=3,  # Minimal duration of malfunction
                                            max_duration=20  # Max duration of malfunction
                                            )
    # Custom observation builder
    TreeObservation = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv())

    nAgents = 3
    n_cities = 2
    max_rails_between_cities = 2
    max_rails_in_city = 4
    seed = 0
    env = RailEnv(
        width=20,
        height=30,
        rail_generator=sparse_rail_generator(
            max_num_cities=n_cities,
            seed=seed,
            grid_mode=True,
            max_rails_between_cities=max_rails_between_cities,
            max_rail_pairs_in_city=max_rails_in_city
        ),
        line_generator=sparse_line_generator(),
        number_of_agents=nAgents,
        obs_builder_object=TreeObsForRailEnv(max_depth=3, predictor=ShortestPathPredictorForRailEnv())
    )
    return env


def flatland_3_0_example(sleep_for_animation, do_rendering):
    np.random.seed(1)

    env = create_env()
    env.reset()

    env_renderer = None
    if do_rendering:
        env_renderer = RenderTool(env, gl="PILSVG",
                                  agent_render_variant=AgentRenderVariant.AGENT_SHOWS_OPTIONS_AND_BOX,
                                  show_debug=True,
                                  screen_height=1000,
                                  screen_width=1000)

    # Initialize the agent with the parameters corresponding to the environment and observation_builder
    # Set action space to 4 to remove the stop action
    agent = RandomAgent(218, 4)

    # Empty dictionary for all agent actions
    action_dict = dict()

    print("Start episode...")

    # Reset environment and get initial observations for all agents
    start_reset = time.time()
    obs, info = env.reset()
    end_reset = time.time()
    print(end_reset - start_reset)
    print(env.get_num_agents())

    # Reset the rendering system
    if env_renderer is not None:
        env_renderer.reset()

    # Here you can also further enhance the provided observation by means of normalization
    # See the training navigation example in the baselines repository

    score = 0
    # Run episode
    frame_step = 0

    for step in range(500):
        # Choose an action for each agent in the environment
        for a in range(env.get_num_agents()):
            action = agent.act(obs[a])
            action_dict.update({a: action})

        # Environment step which returns the observations for all agents, their corresponding
        # rewards and whether they are done
        next_obs, all_rewards, done, _ = env.step(action_dict)

        if env_renderer is not None:
            env_renderer.render_env(show=True, show_observations=False, show_predictions=False)

        frame_step += 1

        # Update replay buffer and train agent
        for a in range(env.get_num_agents()):
            agent.step((obs[a], action_dict[a], all_rewards[a], next_obs[a], done[a]))
            score += all_rewards[a]

        obs = next_obs.copy()
        if done['__all__']:
            break

    if env_renderer is not None:
        env_renderer.close_window()

    print('Episode: Steps {}\t Score = {}'.format(step, score))

    RailEnvPersister.save(env, "saved_episode_2.pkl")


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"

    # execute example
    flatland_3_0_example(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
# Making Videos from Env
To generate videos or GIFs, it is easiest to save image files for each step and then run ffmpeg to turn them into a video.
## 1. Generating Images from Env
Start by importing the renderer and instantiating it:
```
from flatland.utils.rendertools import RenderTool
env_renderer = RenderTool(env, gl="PILSVG")
```
If the environment changes, don't forget to reset the renderer:
```
env_renderer.reset()
```
You can now record an image after every step. It is best to use a filename pattern similar to the one below, where `frame_step` counts the number of steps.
```
env_renderer.gl.save_image("./Images/Avoiding/flatland_frame_{:04d}.bmp".format(frame_step))
```
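
For reference, a complete recording loop might look like the sketch below. This is only an illustration, not one of the bundled examples: the small environment, the fixed forward action standing in for a real controller, and the `Images/Avoiding` output folder are arbitrary choices.

```
import os

from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.rendertools import RenderTool

# Small throw-away environment; replace with your own env and controller.
env = RailEnv(width=30, height=30,
              rail_generator=sparse_rail_generator(max_num_cities=2),
              line_generator=sparse_line_generator(),
              number_of_agents=2)
env.reset()

os.makedirs("./Images/Avoiding", exist_ok=True)
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.reset()

frame_step = 0
for step in range(100):
    # Constant "move forward" action as a stand-in for a real policy
    action_dict = {a: 2 for a in range(env.get_num_agents())}
    obs, all_rewards, done, info = env.step(action_dict)

    # Render the current state and save it as one frame
    env_renderer.render_env(show=False, show_observations=False)
    env_renderer.gl.save_image("./Images/Avoiding/flatland_frame_{:04d}.bmp".format(frame_step))
    frame_step += 1

    if done["__all__"]:
        break
```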
## 2. Generating Videos and GIFs with ffmpeg
Once the images have been saved, open a shell in that folder and run the following commands.
Generate an mp4 out of the images:
```
ffmpeg -y -framerate 12 -i flatland_frame_%04d.bmp -hide_banner -c:v libx264 -pix_fmt yuv420p test.mp4
```
Generate a palette from the video, which is needed to produce good-looking GIFs:
```
ffmpeg -i test.mp4 -filter_complex "[0:v] palettegen" palette.png
```
and finally generate the GIF:
```
ffmpeg -i test.mp4 -i palette.png -filter_complex "[0:v][1:v] paletteuse" single_agent_navigation.gif
```
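
If you prefer to drive ffmpeg from Python instead of a shell, the same three commands can be chained with `subprocess`. This is just a convenience sketch; it assumes ffmpeg is on your PATH and that the script is run from the folder containing the saved frames.

```
import subprocess

# Same three ffmpeg invocations as above, run from Python.
subprocess.run(["ffmpeg", "-y", "-framerate", "12", "-i", "flatland_frame_%04d.bmp",
                "-hide_banner", "-c:v", "libx264", "-pix_fmt", "yuv420p", "test.mp4"], check=True)
subprocess.run(["ffmpeg", "-y", "-i", "test.mp4", "-filter_complex", "[0:v] palettegen",
                "palette.png"], check=True)
subprocess.run(["ffmpeg", "-y", "-i", "test.mp4", "-i", "palette.png", "-filter_complex",
                "[0:v][1:v] paletteuse", "single_agent_navigation.gif"], check=True)
```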