import getopt
import random
import sys
import time
from typing import List
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class SingleAgentNavigationObs(ObservationBuilder):
"""
We build a representation vector with 3 binary components, indicating which of the 3 available directions
for each agent (Left, Forward, Right) lead to the shortest path to its target.
E.g., if taking the Left branch (if available) is the shortest route to the agent's target, the observation vector
will be [1, 0, 0].
"""
def __init__(self):
super().__init__()
def reset(self):
pass

    def get(self, handle: int = 0) -> List[int]:
        agent = self.env.agents[handle]
        # Before the agent enters the grid its position is None; fall back to its initial position
        position = agent.position if agent.position else agent.initial_position
        possible_transitions = self.env.rail.get_transitions(*position, agent.direction)
        num_transitions = np.count_nonzero(possible_transitions)

        # Start from the current orientation, and see which transitions are available;
        # organize them as [left, forward, right], relative to the current orientation.
        # If only one transition is possible, the forward branch is aligned with it.
        if num_transitions == 1:
            observation = [0, 1, 0]
        else:
            min_distances = []
            for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
                if possible_transitions[direction]:
                    new_position = get_new_position(position, direction)
                    min_distances.append(
                        self.env.distance_map.get()[handle, new_position[0], new_position[1], direction])
                else:
                    min_distances.append(np.inf)

            observation = [0, 0, 0]
            observation[np.argmin(min_distances)] = 1

        return observation


def create_env():
    nAgents = 1
    n_cities = 2
    max_rails_between_cities = 2
    max_rails_in_city = 4
    seed = 0
    env = RailEnv(
        width=30,
        height=40,
        rail_generator=sparse_rail_generator(
            max_num_cities=n_cities,
            seed=seed,
            grid_mode=True,
            max_rails_between_cities=max_rails_between_cities,
            max_rail_pairs_in_city=max_rails_in_city
        ),
        line_generator=sparse_line_generator(),
        number_of_agents=nAgents,
        obs_builder_object=SingleAgentNavigationObs()
    )
    return env


def custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering):
    env = create_env()
    obs, info = env.reset()

    env_renderer = None
    if do_rendering:
        env_renderer = RenderTool(env)
        env_renderer.render_env(show=True, frames=True, show_observations=False)

    for step in range(100):
        # Map the one-hot [left, forward, right] observation onto actions 1 (left), 2 (forward), 3 (right)
        action = np.argmax(obs[0]) + 1
        obs, all_rewards, done, _ = env.step({0: action})
        print("Rewards: ", all_rewards, " [done=", done, "]")

        if env_renderer is not None:
            env_renderer.render_env(show=True, frames=True, show_observations=True)
        if sleep_for_animation:
            time.sleep(0.1)
        if done["__all__"]:
            break
    if env_renderer is not None:
        env_renderer.close_window()


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering="])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"
    # execute example
    custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
import getopt
import random
import sys
import time
from typing import Optional, List, Dict
import numpy as np
from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.ordered_set import OrderedSet
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class ObservePredictions(ObservationBuilder):
"""
We use the provided ShortestPathPredictor to illustrate the usage of predictors in your custom observation.
"""
def __init__(self, predictor):
super().__init__()
self.predictor = predictor
def reset(self):
pass

    def get_many(self, handles: Optional[List[int]] = None) -> Dict[int, np.ndarray]:
        '''
        Because we do not want to call the predictor separately for every agent, we implement the get_many function.
        Here we can call the predictor just once for all the agents and use the predictions to generate our observations.
        :param handles: list of agent handles to compute observations for
        :return: dictionary mapping each handle to its observation
        '''
        self.predictions = self.predictor.get()
        self.predicted_pos = {}

        if handles is None:
            handles = []
        for t in range(len(self.predictions[0])):
            pos_list = []
            for a in handles:
                pos_list.append(self.predictions[a][t][1:3])
            # We transform (x, y) coordinates into a single integer for simpler comparison
            self.predicted_pos.update({t: coordinate_to_position(self.env.width, pos_list)})

        observations = super().get_many(handles)
        return observations

    def get(self, handle: int = 0) -> np.ndarray:
        '''
        Let's write a simple observation which just indicates whether or not the agent's own predicted path
        overlaps with other predicted paths at any time. This is useless for the task of navigation but might
        help when looking for conflicts. A more complex implementation can be found in the TreeObsForRailEnv class.

        Each agent receives an observation of length 10, where each element represents a prediction step and its value
        is:
        - 0 if no overlap is happening
        - 1 if another agent is predicted to pass through the same cell at that time step
        :param handle: handle (index) of the agent
        :return: observation for the given handle
        '''
        observation = np.zeros(10)

        # We are going to track which cells were considered while building the observation and make them accessible
        # for rendering
        visited = OrderedSet()
        for _idx in range(10):
            # Check whether any of the other predictions overlap with the agent's own prediction
            x_coord = self.predictions[handle][_idx][1]
            y_coord = self.predictions[handle][_idx][2]

            # We add every observed cell to the observation rendering
            visited.add((x_coord, y_coord))
            if self.predicted_pos[_idx][handle] in np.delete(self.predicted_pos[_idx], handle, 0):
                # Another agent is predicted to pass through the same cell at the same predicted time
                observation[_idx] = 1

        # This variable will be accessed by the renderer to visualize the observation
        self.env.dev_obs_dict[handle] = visited

        return observation

    def set_env(self, env: Environment):
        super().set_env(env)
        if self.predictor:
            self.predictor.set_env(self.env)


def create_env(custom_obs_builder):
    nAgents = 3
    n_cities = 2
    max_rails_between_cities = 4
    max_rails_in_city = 2
    seed = 0
    env = RailEnv(
        width=30,
        height=30,
        rail_generator=sparse_rail_generator(
            max_num_cities=n_cities,
            seed=seed,
            grid_mode=True,
            max_rails_between_cities=max_rails_between_cities,
            max_rail_pairs_in_city=max_rails_in_city
        ),
        line_generator=sparse_line_generator(),
        number_of_agents=nAgents,
        obs_builder_object=custom_obs_builder
    )
    return env


def custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering):
    # Initiate the Predictor
    custom_predictor = ShortestPathPredictorForRailEnv(10)

    # Pass the Predictor to the observation builder
    custom_obs_builder = ObservePredictions(custom_predictor)

    # Initiate Environment
    env = create_env(custom_obs_builder)
    obs, info = env.reset()

    env_renderer = None
    if do_rendering:
        env_renderer = RenderTool(env)
        # We render the initial step and show the observed cells as colored boxes
        env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)

    action_dict = {}
    for step in range(100):
        for a in range(env.get_num_agents()):
            action = np.random.randint(0, 5)
            action_dict[a] = action
        obs, all_rewards, done, _ = env.step(action_dict)
        print("Rewards: ", all_rewards, " [done=", done, "]")
        if env_renderer is not None:
            env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
        if sleep_for_animation:
            time.sleep(0.5)
        if done["__all__"]:
            print("All done!")
            break

    if env_renderer is not None:
        env_renderer.close_window()


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering="])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"
    # execute example
    custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
import getopt
import random
import sys
import time
from typing import Tuple
import numpy as np
from flatland.core.env_observation_builder import DummyObservationBuilder
from flatland.core.grid.rail_env_grid import RailEnvTransitions
from flatland.core.transition_map import GridTransitionMap
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import rail_from_grid_transition_map
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)


def custom_rail_map() -> Tuple[GridTransitionMap, np.array]:
    # We instantiate a very simple rail network on a 7x10 grid:
    #        0 1 2 3 4 5 6 7 8 9 10
    #  0        /-------------\
    #  1        |             |
    #  2        |             |
    #  3 _ _ _ /_ _ _         |
    #  4              \   ___ /
    #  5               |/
    #  6               |
    #  7               |
    transitions = RailEnvTransitions()
    cells = transitions.transition_list

    empty = cells[0]
    dead_end_from_south = cells[7]
    right_turn_from_south = cells[8]
    right_turn_from_west = transitions.rotate_transition(right_turn_from_south, 90)
    right_turn_from_north = transitions.rotate_transition(right_turn_from_south, 180)
    dead_end_from_west = transitions.rotate_transition(dead_end_from_south, 90)
    dead_end_from_north = transitions.rotate_transition(dead_end_from_south, 180)
    dead_end_from_east = transitions.rotate_transition(dead_end_from_south, 270)
    vertical_straight = cells[1]
    simple_switch_north_left = cells[2]
    simple_switch_north_right = cells[10]
    simple_switch_left_east = transitions.rotate_transition(simple_switch_north_left, 90)
    horizontal_straight = transitions.rotate_transition(vertical_straight, 90)
    double_switch_south_horizontal_straight = horizontal_straight + cells[6]
    double_switch_north_horizontal_straight = transitions.rotate_transition(
        double_switch_south_horizontal_straight, 180)
    rail_map = np.array(
        [[empty] * 3 + [right_turn_from_south] + [horizontal_straight] * 5 + [right_turn_from_west]] +
        [[empty] * 3 + [vertical_straight] + [empty] * 5 + [vertical_straight]] * 2 +
        [[dead_end_from_east] + [horizontal_straight] * 2 + [simple_switch_left_east] + [horizontal_straight] * 2 + [
            right_turn_from_west] + [empty] * 2 + [vertical_straight]] +
        [[empty] * 6 + [simple_switch_north_right] + [horizontal_straight] * 2 + [right_turn_from_north]] +
        [[empty] * 6 + [vertical_straight] + [empty] * 3] +
        [[empty] * 6 + [dead_end_from_north] + [empty] * 3], dtype=np.uint16)
    rail = GridTransitionMap(width=rail_map.shape[1],
                             height=rail_map.shape[0], transitions=transitions)
    rail.grid = rail_map
    city_positions = [(0, 3), (6, 6)]
    train_stations = [
        [((0, 3), 0)],
        [((6, 6), 0)],
    ]
    city_orientations = [0, 2]
    agents_hints = {'city_positions': city_positions,
                    'train_stations': train_stations,
                    'city_orientations': city_orientations
                    }
    optionals = {'agents_hints': agents_hints}
    return rail, rail_map, optionals


def create_env():
    rail, rail_map, optionals = custom_rail_map()
    env = RailEnv(width=rail_map.shape[1],
                  height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(),
                  number_of_agents=2,
                  obs_builder_object=DummyObservationBuilder(),
                  )
    return env


def custom_railmap_example(sleep_for_animation, do_rendering):
    random.seed(100)
    np.random.seed(100)

    env = create_env()
    env.reset()

    if do_rendering:
        env_renderer = RenderTool(env)
        env_renderer.render_env(show=True, show_observations=False)
        env_renderer.close_window()

    if sleep_for_animation:
        time.sleep(1)

    # uncomment to keep the renderer open
    # input("Press Enter to continue...")


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering="])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"
    # execute example
    custom_railmap_example(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
import os
import random
import time
import numpy as np
from flatland.envs.generators import complex_rail_generator
from flatland.envs.generators import random_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
# ensure that every demo run behaves identically
random.seed(1)
np.random.seed(1)
__file_dirname__ = os.path.dirname(os.path.realpath(__file__))


class Scenario_Generator:
    @staticmethod
    def generate_random_scenario(number_of_agents=3):
        # Relative weights of each cell type to be used by the random rail generator
        transition_probability = [15,  # empty cell - Case 0
                                  5,  # Case 1 - straight
                                  5,  # Case 2 - simple switch
                                  1,  # Case 3 - diamond crossing
                                  1,  # Case 4 - single slip
                                  1,  # Case 5 - double slip
                                  1,  # Case 6 - symmetrical
                                  0,  # Case 7 - dead end
                                  1,  # Case 1b (8)  - simple turn right
                                  1,  # Case 1c (9)  - simple turn left
                                  1]  # Case 2b (10) - simple switch mirrored
        # Example: generate a random rail
        env = RailEnv(width=20,
                      height=20,
                      rail_generator=random_rail_generator(cell_type_relative_proportion=transition_probability),
                      number_of_agents=number_of_agents)
        return env

    @staticmethod
    def generate_complex_scenario(number_of_agents=3):
        env = RailEnv(width=15,
                      height=15,
                      rail_generator=complex_rail_generator(nr_start_goal=6, nr_extra=30, min_dist=10,
                                                            max_dist=99999, seed=0),
                      number_of_agents=number_of_agents)
        return env

    @staticmethod
    def load_scenario(resource, package='env_data.railway', number_of_agents=3):
        env = RailEnv(width=2 * (1 + number_of_agents),
                      height=1 + number_of_agents)
        env.load_resource(package, resource)
        env.reset(False, False)
        return env


class Demo:
    def __init__(self, env):
        self.env = env
        self.create_renderer()
        self.action_size = 4
        self.max_frame_rate = 60
        self.record_frames = None

    def set_record_frames(self, record_frames):
        self.record_frames = record_frames

    def create_renderer(self):
        self.renderer = RenderTool(self.env)
        handle = self.env.get_agent_handles()
        return handle

    def set_max_framerate(self, max_frame_rate):
        self.max_frame_rate = max_frame_rate

    def run_demo(self, max_nbr_of_steps=30):
        action_dict = dict()

        # Reset environment
        _ = self.env.reset(False, False)
        time.sleep(0.0001)  # to satisfy lint...

        for step in range(max_nbr_of_steps):
            # Action: pick randomly between turning left and walking straight forward
            for iAgent in range(self.env.get_num_agents()):
                action = np.random.choice([0, 1, 2, 3], 1, p=[0.0, 0.5, 0.5, 0.0])[0]
                # update the actions
                action_dict.update({iAgent: action})
            # render
            self.renderer.renderEnv(show=True, show_observations=False)
            # environment step (apply the actions to all agents)
            next_obs, all_rewards, done, _ = self.env.step(action_dict)
            if done['__all__']:
                break
            if self.record_frames is not None:
                self.renderer.gl.saveImage(self.record_frames.format(step))
        self.renderer.close_window()

    @staticmethod
    def run_generate_random_scenario():
        demo_000 = Demo(Scenario_Generator.generate_random_scenario())
        demo_000.run_demo()

    @staticmethod
    def run_generate_complex_scenario():
        demo_001 = Demo(Scenario_Generator.generate_complex_scenario())
        demo_001.run_demo()

    @staticmethod
    def run_example_network_000():
        demo_000 = Demo(Scenario_Generator.load_scenario('example_network_000.pkl'))
        demo_000.run_demo()

    @staticmethod
    def run_example_network_001():
        demo_001 = Demo(Scenario_Generator.load_scenario('example_network_001.pkl'))
        demo_001.run_demo()

    @staticmethod
    def run_example_network_002():
        demo_002 = Demo(Scenario_Generator.load_scenario('example_network_002.pkl'))
        demo_002.run_demo()

    @staticmethod
    def run_example_network_003():
        demo_flatland_000 = Demo(Scenario_Generator.load_scenario('example_network_003.pkl'))
        demo_flatland_000.renderer.resize()
        demo_flatland_000.set_max_framerate(5)
        demo_flatland_000.run_demo(30)

    @staticmethod
    def run_example_flatland_000():
        demo_flatland_000 = Demo(Scenario_Generator.load_scenario('example_flatland_000.pkl'))
        demo_flatland_000.renderer.resize()
        demo_flatland_000.set_max_framerate(5)
        demo_flatland_000.run_demo(60)

    @staticmethod
    def run_example_flatland_001():
        demo_flatland_000 = Demo(Scenario_Generator.load_scenario('example_flatland_001.pkl'))
        demo_flatland_000.renderer.resize()
        demo_flatland_000.set_max_framerate(5)
        demo_flatland_000.set_record_frames(os.path.join(__file_dirname__, '..', 'rendering', 'frame_{:04d}.bmp'))
        demo_flatland_000.run_demo(60)

    @staticmethod
    def run_complex_scene():
        demo_001 = Demo(Scenario_Generator.load_scenario('complex_scene.pkl'))
        demo_001.set_record_frames(os.path.join(__file_dirname__, '..', 'rendering', 'frame_{:04d}.bmp'))
        demo_001.run_demo(120)

    @staticmethod
    def run_basic_elements_test():
        demo_001 = Demo(Scenario_Generator.load_scenario('basic_elements_test.pkl'))
        demo_001.run_demo(120)
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_basic_elements_test()
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_example_flatland_000()
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_example_flatland_001()
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_example_network_000()
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_example_network_001()
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_example_network_002()
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_example_network_003()
import getopt
import sys
import time
import numpy as np
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.malfunction_generators import MalfunctionParameters
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.persistence import RailEnvPersister
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool, AgentRenderVariant
# Import your own Agent or use RLlib to train agents on Flatland
# As an example we use a random agent instead
class RandomAgent:

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

    def act(self, state):
        """
        :param state: input is the observation of the agent
        :return: returns an action
        """
        return 2  # np.random.choice(np.arange(self.action_size))

    def step(self, memories):
        """
        Step function to improve the agent by adjusting its policy given the observations
        :param memories: SARS tuple to learn from
        :return:
        """
        return

    def save(self, filename):
        # Store the current policy
        return

    def load(self, filename):
        # Load a policy
        return


def create_env():
    # Use the new sparse_rail_generator to generate feasible network configurations with corresponding tasks
    # Training on simple small tasks is the best way to get familiar with the environment
    # A malfunction generator can be used to break agents from time to time
    # (defined here for illustration; this example does not pass it to the RailEnv)
    stochastic_data = MalfunctionParameters(malfunction_rate=30,  # Rate of malfunction occurrence
                                            min_duration=3,  # Minimal duration of malfunction
                                            max_duration=20  # Max duration of malfunction
                                            )
    # Custom observation builder
    TreeObservation = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv())

    nAgents = 3
    n_cities = 2
    max_rails_between_cities = 2
    max_rails_in_city = 4
    seed = 0
    env = RailEnv(
        width=20,
        height=30,
        rail_generator=sparse_rail_generator(
            max_num_cities=n_cities,
            seed=seed,
            grid_mode=True,
            max_rails_between_cities=max_rails_between_cities,
            max_rail_pairs_in_city=max_rails_in_city
        ),
        line_generator=sparse_line_generator(),
        number_of_agents=nAgents,
        obs_builder_object=TreeObsForRailEnv(max_depth=3, predictor=ShortestPathPredictorForRailEnv())
    )
    return env


def flatland_3_0_example(sleep_for_animation, do_rendering):
    np.random.seed(1)

    env = create_env()
    env.reset()

    env_renderer = None
    if do_rendering:
        env_renderer = RenderTool(env, gl="PILSVG",
                                  agent_render_variant=AgentRenderVariant.AGENT_SHOWS_OPTIONS_AND_BOX,
                                  show_debug=True,
                                  screen_height=1000,
                                  screen_width=1000)

    # Initialize the agent with the parameters corresponding to the environment and observation_builder
    # Set action space to 4 to remove stop action
    agent = RandomAgent(218, 4)

    # Empty dictionary for all agent actions
    action_dict = dict()

    print("Start episode...")

    # Reset environment and get initial observations for all agents
    start_reset = time.time()
    obs, info = env.reset()
    end_reset = time.time()
    print(end_reset - start_reset)
    print(env.get_num_agents())

    # Reset the rendering system
    if env_renderer is not None:
        env_renderer.reset()

    # Here you can also further enhance the provided observation by means of normalization
    # See the training navigation example in the baseline repository
    score = 0
    # Run episode
    frame_step = 0

    for step in range(500):
        # Choose an action for each agent in the environment
        for a in range(env.get_num_agents()):
            action = agent.act(obs[a])
            action_dict.update({a: action})

        # Environment step which returns the observations for all agents, their corresponding
        # rewards and whether they are done
        next_obs, all_rewards, done, _ = env.step(action_dict)

        if env_renderer is not None:
            env_renderer.render_env(show=True, show_observations=False, show_predictions=False)

        frame_step += 1
        # Update replay buffer and train agent
        for a in range(env.get_num_agents()):
            agent.step((obs[a], action_dict[a], all_rewards[a], next_obs[a], done[a]))
            score += all_rewards[a]

        obs = next_obs.copy()
        if done['__all__']:
            break

    if env_renderer is not None:
        env_renderer.close_window()

    print('Episode: Steps {}\t Score = {}'.format(step, score))

    RailEnvPersister.save(env, "saved_episode_2.pkl")


def main(args):
    try:
        opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering="])
    except getopt.GetoptError as err:
        print(str(err))  # will print something like "option -a not recognized"
        sys.exit(2)
    sleep_for_animation = True
    do_rendering = True
    for o, a in opts:
        if o == "--sleep-for-animation":
            sleep_for_animation = str2bool(a)
        elif o == "--do_rendering":
            do_rendering = str2bool(a)
        else:
            assert False, "unhandled option"
    # execute example
    flatland_3_0_example(sleep_for_animation, do_rendering)


if __name__ == '__main__':
    if 'argv' in globals():
        main(argv)
    else:
        main(sys.argv[1:])
import cProfile
import pstats
import numpy as np
from flatland.core.env_observation_builder import DummyObservationBuilder
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.malfunction_generators import MalfunctionParameters, ParamMalfunctionGen
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.rendertools import RenderTool, AgentRenderVariant


class RandomAgent:

    def __init__(self, action_size):
        self.action_size = action_size

    def act(self, state):
        """
        :param state: input is the observation of the agent
        :return: returns an action
        """
        return np.random.choice(np.arange(self.action_size))


def get_rail_env(nAgents=70, use_dummy_obs=False, width=300, height=300):
    # Rail Generator:
    num_cities = 5  # Number of cities to place on the map
    seed = 1  # Random seed
    max_rails_between_cities = 2  # Maximum number of rails connecting two cities
    max_rail_pairs_in_cities = 2  # Maximum number of pairs of tracks within a city
    # (even tracks are used as start points, odd tracks are used as endpoints)

    rail_generator = sparse_rail_generator(
        max_num_cities=num_cities,
        seed=seed,
        max_rails_between_cities=max_rails_between_cities,
        max_rail_pairs_in_city=max_rail_pairs_in_cities,
    )

    # Line Generator
    # sparse_line_generator accepts a dictionary which maps speeds to probabilities.
    # Different agent types (trains) with different speeds.
    speed_probability_map = {
        1.: 0.25,  # Fast passenger train
        1. / 2.: 0.25,  # Fast freight train
        1. / 3.: 0.25,  # Slow commuter train
        1. / 4.: 0.25  # Slow freight train
    }
    line_generator = sparse_line_generator(speed_probability_map)

    # Malfunction Generator:
    stochastic_data = MalfunctionParameters(
        malfunction_rate=1 / 10000,  # Rate of malfunction occurrence
        min_duration=15,  # Minimal duration of malfunction
        max_duration=50  # Max duration of malfunction
    )
    malfunction_generator = ParamMalfunctionGen(stochastic_data)

    # Observation Builder
    # The tree observation returns a tree of possible paths from the current position.
    max_depth = 3  # Max depth of the tree
    predictor = ShortestPathPredictorForRailEnv(
        max_depth=50)  # (specific to the tree observation - read the code)
    observation_builder = TreeObsForRailEnv(
        max_depth=max_depth,
        predictor=predictor
    )

    if use_dummy_obs:
        observation_builder = DummyObservationBuilder()

    number_of_agents = nAgents  # Number of trains to create
    seed = 1  # Random seed

    env = RailEnv(
        width=width,
        height=height,
        rail_generator=rail_generator,
        line_generator=line_generator,
        number_of_agents=number_of_agents,
        random_seed=seed,
        obs_builder_object=observation_builder,
        malfunction_generator=malfunction_generator
    )
    return env


def run_simulation(env_fast: RailEnv, do_rendering):
    agent = RandomAgent(action_size=5)
    max_steps = 200
    action_dict = {}

    env_renderer = None
    if do_rendering:
        env_renderer = RenderTool(env_fast,
                                  gl="PGL",
                                  show_debug=True,
                                  agent_render_variant=AgentRenderVariant.AGENT_SHOWS_OPTIONS)
        env_renderer.set_new_rail()
        env_renderer.reset()

    for step in range(max_steps):
        # Choose an action for each agent in the environment
        for handle in range(env_fast.get_num_agents()):
            action = agent.act(handle)
            action_dict.update({handle: action})

        next_obs, all_rewards, done, _ = env_fast.step(action_dict)

        if env_renderer is not None:
            env_renderer.render_env(
                show=True,
                frames=False,
                show_observations=True,
                show_predictions=False
            )

    if env_renderer is not None:
        env_renderer.close_window()


USE_PROFILER = True
PROFILE_CREATE = False
PROFILE_RESET = False
PROFILE_STEP = True
PROFILE_OBSERVATION = False
RUN_SIMULATION = False
DO_RENDERING = False

if __name__ == "__main__":
    print("Start ...")
    if USE_PROFILER:
        profiler = cProfile.Profile()

    print("Create env ... ")
    if PROFILE_CREATE:
        profiler.enable()
    env_fast = get_rail_env(nAgents=200, use_dummy_obs=False, width=100, height=100)
    if PROFILE_CREATE:
        profiler.disable()

    print("Reset env ... ")
    if PROFILE_RESET:
        profiler.enable()
    env_fast.reset(random_seed=1)
    if PROFILE_RESET:
        profiler.disable()

    print("Make actions ... ")
    action_dict = {agent.handle: 0 for agent in env_fast.agents}

    print("Step env ... ")
    if PROFILE_STEP:
        profiler.enable()
    for i in range(1):
        env_fast.step(action_dict)
    if PROFILE_STEP:
        profiler.disable()

    if PROFILE_OBSERVATION:
        profiler.enable()
    print("get observation ... ")
    obs = env_fast._get_observations()
    if PROFILE_OBSERVATION:
        profiler.disable()
    if USE_PROFILER:
        if False:
            print("---- tottime")
            stats = pstats.Stats(profiler).sort_stats('tottime')  # 'ncalls', 'cumtime', ...
            stats.print_stats(20)
        if True:
            print("---- cumtime")
            stats = pstats.Stats(profiler).sort_stats('cumtime')  # 'ncalls', 'tottime', ...
            stats.print_stats(200)
        if False:
            print("---- ncalls")
            stats = pstats.Stats(profiler).sort_stats('ncalls')  # 'cumtime', 'tottime', ...
            stats.print_stats(200)
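        # Optional addition (not part of the original script): the profile can also be
        # persisted for later inspection, e.g. with snakeviz:
        # profiler.dump_stats("flatland_step.prof")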
print("... end ")
if RUN_SIMULATION:
run_simulation(env_fast, DO_RENDERING)
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_generate_complex_scenario()
import random
import numpy as np
from examples.demo import Demo
random.seed(1)
np.random.seed(1)
if __name__ == "__main__":
    Demo.run_generate_random_scenario()
import os
import numpy as np
from flatland.envs.line_generators import sparse_line_generator
# In Flatland you can use custom observation builders and predictors
# Observation builders generate the observation needed by the controller
# Predictors can be used for short-term prediction, which can help in avoiding conflicts in the network
from flatland.envs.malfunction_generators import MalfunctionParameters, ParamMalfunctionGen
from flatland.envs.observations import GlobalObsForRailEnv
# First of all we import the Flatland rail environment
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env import RailEnvActions
from flatland.envs.rail_generators import sparse_rail_generator
# We also include a renderer because we want to visualize what is going on in the environment
from flatland.utils.rendertools import RenderTool, AgentRenderVariant
# This is an introduction example for the Flatland 2.1.* version.
# Changes and highlights of this version include
# - Stochastic events (malfunctions)
# - Different travel speeds for different agents
# - Levels are generated using a novel generator to reflect more realistic railway networks
# - Agents start outside of the environment and enter at their own time
# - Agents leave the environment after they have reached their goal
# Use the new sparse_rail_generator to generate feasible network configurations with corresponding tasks
# Training on simple small tasks is the best way to get familiar with the environment
# We start by importing the necessary rail and schedule generators
# The rail generator will generate the railway infrastructure
# The schedule generator will assign tasks to all the agents within the railway network
# The railway infrastructure can be built using any of the provided generators in env/rail_generators.py
# Here we use the sparse_rail_generator with the following parameters
DO_RENDERING = False

width = 16 * 7  # Width of map
height = 9 * 7  # Height of map
nr_trains = 50  # Number of trains that have an assigned task in the env
cities_in_map = 20  # Number of cities where agents can start or end
seed = 14  # Random seed
grid_distribution_of_cities = False  # Type of city distribution; if False, cities are randomly placed
max_rails_between_cities = 2  # Max number of tracks allowed between cities; this is the number of entry points to a city
max_rail_in_cities = 6  # Max number of parallel tracks within a city, representing a realistic train station

rail_generator = sparse_rail_generator(max_num_cities=cities_in_map,
                                       seed=seed,
                                       grid_mode=grid_distribution_of_cities,
                                       max_rails_between_cities=max_rails_between_cities,
                                       max_rail_pairs_in_city=max_rail_in_cities,
                                       )
# rail_generator = SparseRailGen(max_num_cities=cities_in_map,
#                                seed=seed,
#                                grid_mode=grid_distribution_of_cities,
#                                max_rails_between_cities=max_rails_between_cities,
#                                max_rails_in_city=max_rail_in_cities,
#                                )
# The schedule generator can make very basic schedules with a start point, end point and a speed profile for each agent.
# The speed profiles can also be adjusted directly, as shown later on. We start by introducing a statistical
# distribution of speed profiles

# Different agent types (trains) with different speeds.
speed_ration_map = {1.: 0.25,  # Fast passenger train
                    1. / 2.: 0.25,  # Fast freight train
                    1. / 3.: 0.25,  # Slow commuter train
                    1. / 4.: 0.25}  # Slow freight train

# We can now initiate the schedule generator with the given speed profiles
line_generator = sparse_line_generator(speed_ration_map)

# We can furthermore pass stochastic data to the RailEnv constructor which will allow for stochastic malfunctions
# during an episode.
stochastic_data = MalfunctionParameters(malfunction_rate=1 / 10000,  # Rate of malfunction occurrence
                                        min_duration=15,  # Minimal duration of malfunction
                                        max_duration=50  # Max duration of malfunction
                                        )
# Custom observation builder without predictor
observation_builder = GlobalObsForRailEnv()

# Custom observation builder with predictor; uncomment the line below if you want to try this one
# observation_builder = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv())

# Construct the environment with the given observation, generators, predictors, and stochastic data
env = RailEnv(width=width,
              height=height,
              rail_generator=rail_generator,
              line_generator=line_generator,
              number_of_agents=nr_trains,
              obs_builder_object=observation_builder,
              malfunction_generator=ParamMalfunctionGen(stochastic_data),
              remove_agents_at_target=True)
env.reset()
# Initiate the renderer
env_renderer = None
if DO_RENDERING:
    env_renderer = RenderTool(env,
                              agent_render_variant=AgentRenderVariant.ONE_STEP_BEHIND,
                              show_debug=False,
                              screen_height=600,  # Adjust these parameters to fit your resolution
                              screen_width=800)  # Adjust these parameters to fit your resolution

# The first thing we notice is that some agents don't have feasible paths to their target.
# We first look at the map we have created
# env_renderer.render_env(show=True)
# time.sleep(2)
# Import your own Agent or use RLlib to train agents on Flatland
# As an example we use a random agent instead
class RandomAgent:

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size

    def act(self, state):
        """
        :param state: input is the observation of the agent
        :return: returns an action
        """
        return np.random.choice([RailEnvActions.MOVE_FORWARD, RailEnvActions.MOVE_RIGHT, RailEnvActions.MOVE_LEFT,
                                 RailEnvActions.STOP_MOVING])

    def step(self, memories):
        """
        Step function to improve the agent by adjusting its policy given the observations
        :param memories: SARS tuple to learn from
        :return:
        """
        return

    def save(self, filename):
        # Store the current policy
        return

    def load(self, filename):
        # Load a policy
        return


# Initialize the agent with the parameters corresponding to the environment and observation_builder
controller = RandomAgent(218, env.action_space[0])

# We start by looking at the information of each agent
# We can see the task assigned to the agent by looking at
print("\n Agents in the environment have to solve the following tasks: \n")
for agent_idx, agent in enumerate(env.agents):
    print(
        "The agent with index {} has the task to go from its initial position {}, facing in the direction {}, to its target at {}.".format(
            agent_idx, agent.initial_position, agent.direction, agent.target))
# Each agent has a state indicating whether it is currently waiting to enter the environment, active, or done
# For example, we see that the agent with index 0 is currently not active
print("\n Their current statuses are:")
print("============================")
for agent_idx, agent in enumerate(env.agents):
    print("Agent {} status is: {} with its current position being {}".format(agent_idx, str(agent.state),
                                                                              str(agent.position)))
# The agent needs to take any moving action ([1, 2, 3]) - not do_nothing or stop - to enter the level
# If the starting cell is free, the agent will enter the level
# If multiple agents want to enter the same cell at the same time, the lower-index agent will enter first.

# Let's check if there are any agents with the same start location
agents_with_same_start = set()
print("\n The following agents have the same initial position:")
print("=====================================================")
for agent_idx, agent in enumerate(env.agents):
    for agent_2_idx, agent2 in enumerate(env.agents):
        if agent_idx != agent_2_idx and agent.initial_position == agent2.initial_position:
            print("Agent {} has the same initial position as agent {}".format(agent_idx, agent_2_idx))
            agents_with_same_start.add(agent_idx)

# Let's try to enter with all of these agents at the same time
action_dict = dict()

for agent_id in agents_with_same_start:
    action_dict[agent_id] = 1  # Try to move with the agents

# Do a step in the environment to see which agents entered:
env.step(action_dict)
# Current state and position of the agents after all agents with the same start position tried to move
print("\n This happened when all tried to enter at the same time:")
print("========================================================")
for agent_id in agents_with_same_start:
    print(
        "Agent {} status is: {} with the current position being {}.".format(
            agent_id, str(env.agents[agent_id].state),
            str(env.agents[agent_id].position)))
# As you can see, only the agents with lower indexes moved. As soon as the cell is free again, the other agents
# can attempt to start.

# You will also notice that the agents move at different speeds once they are on the rail.
# An agent always moves at its full speed when moving, never at a speed in between.
# The fastest an agent can go is 1, meaning that it moves to the next cell at every time step
# All slower speeds indicate the fraction of a cell that is traversed at each time step
# Let's look at the current speed data of the agents:
print("\n The speed information of the agents are:")
print("=========================================")
for agent_idx, agent in enumerate(env.agents):
    print(
        "Agent {} speed is: {:.2f} with the current fractional position being {}/{}".format(
            agent_idx, agent.speed_counter.speed, agent.speed_counter.counter, agent.speed_counter.max_count))
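
# Worked example (an addition for illustration, not part of the original script): the number of
# time steps an agent needs to traverse one cell is the inverse of its speed,
# e.g. speed 1/3 means 3 steps per cell.
for agent_idx, agent in enumerate(env.agents):
    steps_per_cell = int(round(1 / agent.speed_counter.speed))
    print("Agent {} needs {} step(s) to traverse one cell".format(agent_idx, steps_per_cell))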
# Now the agents can also have stochastic malfunctions, which will leave them unable to move
# for a certain number of time steps. The malfunction data of the agents can easily be accessed as follows
print("\n The malfunction data of the agents are:")
print("========================================")
for agent_idx, agent in enumerate(env.agents):
    print(
        "Agent {} is in malfunction = {}".format(
            agent_idx, agent.malfunction_handler.in_malfunction))
# Now that you have seen these novel concepts, you will realize that agents don't need to take
# an action at every time step, as an action only changes the outcome when it is chosen at cell entry.
# Therefore the environment provides information about which agents need to provide an action in the next step.
# You can access this in the following way.

# Choose an action for each agent
for a in range(env.get_num_agents()):
    action = controller.act(0)
    action_dict.update({a: action})
# Do the environment step
observations, rewards, dones, information = env.step(action_dict)
print("\n The following agents can register an action:")
print("========================================")
for agent_handle, required in information['action_required'].items():
    if required:
        print("Agent {} needs to submit an action.".format(agent_handle))
# We recommend that you monitor the malfunction data and the action-required information in order to optimize
# your training and controlling code.

# Let us now look at an episode playing out with random actions performed
print("\nStart episode...")

# Reset the rendering system
if env_renderer is not None:
    env_renderer.reset()

# Here you can also further enhance the provided observation by means of normalization
# See the training navigation example in the baseline repository
score = 0
# Run episode
frame_step = 0

os.makedirs("tmp/frames", exist_ok=True)

for step in range(200):
    # Choose an action for each agent in the environment
    for a in range(env.get_num_agents()):
        action = controller.act(observations[a])
        action_dict.update({a: action})
    # Environment step which returns the observations for all agents, their corresponding
    # rewards and whether they are done
    next_obs, all_rewards, done, _ = env.step(action_dict)
    if env_renderer is not None:
        env_renderer.render_env(show=True, show_observations=False, show_predictions=False)
        env_renderer.gl.save_image('tmp/frames/flatland_frame_{:04d}.png'.format(step))

    frame_step += 1
    # Update replay buffer and train agent
    for a in range(env.get_num_agents()):
        controller.step((observations[a], action_dict[a], all_rewards[a], next_obs[a], done[a]))
        score += all_rewards[a]

    observations = next_obs.copy()
    if done['__all__']:
        break

print('Episode: Steps {}\t Score = {}'.format(step, score))

# close the renderer / rendering window
if env_renderer is not None:
    env_renderer.close_window()
# Making Videos from Env
To generate videos or gifs, it is easiest to save image files and then run ffmpeg to assemble them into a video.
## 1. Generating Images from Env
Start by importing the renderer and instantiating it:
```
from flatland.utils.rendertools import RenderTool
env_renderer = RenderTool(env, gl="PILSVG", )
```
If the environment changes, don't forget to reset the renderer:
```
env_renderer.reset()
```
You can now record an image after every step. It is best to use a filename format similar to the one below, where `frame_step` counts the number of steps.
```
env_renderer.gl.save_image("./Images/Avoiding/flatland_frame_{:04d}.bmp".format(frame_step))
```
Once the images have been saved, open a shell in that folder and run the following commands.
Generate an mp4 out of the images:
```
ffmpeg -y -framerate 12 -i flatland_frame_%04d.bmp -hide_banner -c:v libx264 -pix_fmt yuv420p test.mp4
```
Generate a palette from the video (necessary to produce beautiful gifs):
```
ffmpeg -i test.mp4 -filter_complex "[0:v] palettegen" palette.png
```
Finally, generate the gif:
```
ffmpeg -i test.mp4 -i palette.png -filter_complex "[0:v][1:v] paletteuse" single_agent_navigation.gif
```
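
If you prefer to drive the whole pipeline from Python, the three ffmpeg calls can be wrapped with `subprocess`. This is a minimal sketch, assuming the frames were saved as `flatland_frame_0000.bmp`, `flatland_frame_0001.bmp`, ... and that `ffmpeg` is on your PATH; the directory, pattern and output names are placeholders, not part of the Flatland API.
```
import subprocess

def images_to_gif(image_dir, pattern="flatland_frame_%04d.bmp", fps=12, gif_name="episode.gif"):
    # 1. Encode the frames into an mp4
    subprocess.run(["ffmpeg", "-y", "-framerate", str(fps), "-i", pattern, "-hide_banner",
                    "-c:v", "libx264", "-pix_fmt", "yuv420p", "test.mp4"],
                   cwd=image_dir, check=True)
    # 2. Generate a palette from the video (needed for good-looking gifs)
    subprocess.run(["ffmpeg", "-y", "-i", "test.mp4", "-filter_complex", "[0:v] palettegen",
                    "palette.png"], cwd=image_dir, check=True)
    # 3. Use the palette to produce the final gif
    subprocess.run(["ffmpeg", "-y", "-i", "test.mp4", "-i", "palette.png",
                    "-filter_complex", "[0:v][1:v] paletteuse", gif_name],
                   cwd=image_dir, check=True)

images_to_gif("./Images/Avoiding")
```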
import random
import time
from collections import deque
import numpy as np
from flatland.envs.generators import complex_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
class Player(object):
    def __init__(self, env):
        self.env = env
        self.handle = env.get_agent_handles()

        self.state_size = 105
        self.action_size = 4
        self.n_trials = 9999
        self.eps = 1.
        self.eps_end = 0.005
        self.eps_decay = 0.998
        self.action_dict = dict()
        self.scores_window = deque(maxlen=100)
        self.done_window = deque(maxlen=100)
        self.scores = []
        self.dones_list = []
        self.action_prob = [0] * 4

        # Removing refs to a real agent for now.
        self.iFrame = 0
        self.tStart = time.time()

        # Reset environment
        self.env.obs_builder.reset()
        self.obs = self.env._get_observations()
        for envAgent in range(self.env.get_num_agents()):
            norm = max(1, max_lt(self.obs[envAgent], np.inf))
            self.obs[envAgent] = np.clip(np.array(self.obs[envAgent]) / norm, -1, 1)

        self.score = 0
        self.env_done = 0

    def reset(self):
        self.obs = self.env.reset()
        return self.obs

    def step(self):
        env = self.env

        # Pass the (stored) observation to the agent network and retrieve the action
        for handle in env.get_agent_handles():
            # Random actions
            action = np.random.choice([0, 1, 2, 3], 1, p=[0.2, 0.1, 0.6, 0.1])[0]
            # NumPy version uses a single random sequence
            self.action_prob[action] += 1
            self.action_dict.update({handle: action})

        # Environment step - pass the agent actions to the environment,
        # retrieve the response - observations, rewards, dones
        next_obs, all_rewards, done, _ = self.env.step(self.action_dict)

        for handle in env.get_agent_handles():
            norm = max(1, max_lt(next_obs[handle], np.inf))
            next_obs[handle] = np.clip(np.array(next_obs[handle]) / norm, -1, 1)

        # Update replay buffer and train agent (disabled: no real agent is attached)
        if False:
            for handle in self.env.get_agent_handles():
                self.agent.step(self.obs[handle], self.action_dict[handle],
                                all_rewards[handle], next_obs[handle], done[handle],
                                train=False)
                self.score += all_rewards[handle]

        self.iFrame += 1
        self.obs = next_obs.copy()
        if done['__all__']:
            self.env_done = 1


def max_lt(seq, val):
    """
    Return the greatest item in seq for which item < val applies.
    None is returned if seq is empty or all items in seq are >= val.
    """
    idx = len(seq) - 1
    while idx >= 0:
        if seq[idx] < val and seq[idx] >= 0:
            return seq[idx]
        idx -= 1
    return None


def main(render=True, delay=0.0, n_trials=3, n_steps=50):
    random.seed(1)
    np.random.seed(1)

    # Example: generate a random rail
    env = RailEnv(width=15, height=15,
                  rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=20, min_dist=12),
                  number_of_agents=5)

    env_renderer = None
    if render:
        env_renderer = RenderTool(env)

    oPlayer = Player(env)
    for trials in range(1, n_trials + 1):
        # Reset environment
        oPlayer.reset()
        if env_renderer is not None:
            env_renderer.set_new_rail()

        # Run episode
        for step in range(n_steps):
            oPlayer.step()
            if env_renderer is not None:
                env_renderer.renderEnv(show=True, frames=True, iEpisode=trials, iStep=step)
                if delay > 0:
                    time.sleep(delay)

    if env_renderer is not None:
        env_renderer.gl.close_window()


if __name__ == "__main__":
    main(render=True, delay=0)
from flatland.envs.generators import rail_from_manual_specifications_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
# Example: generate a rail given a manual specification,
# a map of tuples (cell_type, rotation)
specs = [[(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0)],
         [(0, 0), (0, 0), (0, 0), (0, 0), (7, 0), (0, 0)],
         [(7, 270), (1, 90), (1, 90), (1, 90), (2, 90), (7, 90)],
         [(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0)]]

env = RailEnv(width=6,
              height=4,
              rail_generator=rail_from_manual_specifications_generator(specs),
              number_of_agents=1,
              obs_builder_object=TreeObsForRailEnv(max_depth=2))
env.reset()

env_renderer = RenderTool(env)
env_renderer.renderEnv(show=True)
input("Press Enter to continue...")
import random
import numpy as np
from flatland.envs.generators import random_rail_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
# Relative weights of each cell type to be used by the random rail generator.
transition_probability = [1.0,  # empty cell - Case 0
                          1.0,  # Case 1 - straight
                          1.0,  # Case 2 - simple switch
                          0.3,  # Case 3 - diamond crossing
                          0.5,  # Case 4 - single slip
                          0.5,  # Case 5 - double slip
                          0.2,  # Case 6 - symmetrical
                          0.0,  # Case 7 - dead end
                          0.2,  # Case 8 - turn left
                          0.2,  # Case 9 - turn right
                          1.0]  # Case 10 - mirrored switch

# Example: generate a random rail
env = RailEnv(width=10,
              height=10,
              rail_generator=random_rail_generator(cell_type_relative_proportion=transition_probability),
              number_of_agents=3,
              obs_builder_object=TreeObsForRailEnv(max_depth=2))
env.reset()

env_renderer = RenderTool(env, gl="PIL")
env_renderer.renderEnv(show=True)
input("Press Enter to continue...")