Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flatland/flatland
  • stefan_otte/flatland
  • jiaodaxiaozi/flatland
  • sfwatergit/flatland
  • utozx126/flatland
  • ChenKuanSun/flatland
  • ashivani/flatland
  • minhhoa/flatland
  • pranjal_dhole/flatland
  • darthgera123/flatland
  • rivesunder/flatland
  • thomaslecat/flatland
  • joel_joseph/flatland
  • kchour/flatland
  • alex_zharichenko/flatland
  • yoogottamk/flatland
  • troye_fang/flatland
  • elrichgro/flatland
  • jun_jin/flatland
  • nimishsantosh107/flatland
20 results
Show changes
Showing
with 393 additions and 148 deletions
File added
File added
File added
File added
File added
test_id,env_id,n_agents,x_dim,y_dim,n_cities,max_rail_pairs_in_city,n_envs_run,seed,grid_mode,max_rails_between_cities,malfunction_duration_min,malfunction_duration_max,malfunction_interval,speed_ratios
Test_0,Level_0,7,30,30,2,2,10,335971,False,2,20,50,540,"{1.0: 0.25, 0.5: 0.25, 0.33: 0.25, 0.25: 0.25}"
Test_0,Level_1,7,30,30,2,2,10,335972,False,2,20,50,540,"{1.0: 0.25, 0.5: 0.25, 0.33: 0.25, 0.25: 0.25}"
File moved
File added
File added
File added
from flatland.envs.generators import rail_from_manual_specifications_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
# Example generate a rail given a manual specification,
# a map of tuples (cell_type, rotation)
specs = [[(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0)],
[(0, 0), (0, 0), (0, 0), (0, 0), (7, 0), (0, 0)],
[(7, 270), (1, 90), (1, 90), (1, 90), (2, 90), (7, 90)],
[(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0)]]
env = RailEnv(width=6,
height=4,
rail_generator=rail_from_manual_specifications_generator(specs),
number_of_agents=1,
obs_builder_object=TreeObsForRailEnv(max_depth=2))
env.reset()
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.renderEnv(show=True)
input("Press Enter to continue...")
import random
import numpy as np
from flatland.envs.generators import random_rail_generator # , rail_from_list_of_saved_GridTransitionMap_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
# Relative weights of each cell type to be used by the random rail generators.
transition_probability = [1.0, # empty cell - Case 0
1.0, # Case 1 - straight
1.0, # Case 2 - simple switch
0.3, # Case 3 - diamond drossing
0.5, # Case 4 - single slip
0.5, # Case 5 - double slip
0.2, # Case 6 - symmetrical
0.0, # Case 7 - dead end
0.2, # Case 8 - turn left
0.2, # Case 9 - turn right
1.0] # Case 10 - mirrored switch
# Example generate a random rail
env = RailEnv(width=10,
height=10,
rail_generator=random_rail_generator(cell_type_relative_proportion=transition_probability),
number_of_agents=3,
obs_builder_object=TreeObsForRailEnv(max_depth=2))
env.reset()
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.renderEnv(show=True)
input("Press Enter to continue...")
import random
import numpy as np
from flatland.envs.generators import random_rail_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
random.seed(10)
np.random.seed(10)
env = RailEnv(width=7,
height=7,
rail_generator=random_rail_generator(),
number_of_agents=2,
obs_builder_object=TreeObsForRailEnv(max_depth=2))
# Print the observation vector for agent 0
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
env.obs_builder.util_print_obs_subtree(tree=obs[i], num_features_per_node=5)
env_renderer = RenderTool(env, gl="PIL")
env_renderer.renderEnv(show=True, frames=True)
print("Manual control: s=perform step, q=quit, [agent id] [1-2-3 action] \
(turnleft+move, move to front, turnright+move)")
for step in range(100):
cmd = input(">> ")
cmds = cmd.split(" ")
action_dict = {}
i = 0
while i < len(cmds):
if cmds[i] == 'q':
import sys
sys.exit()
elif cmds[i] == 's':
obs, all_rewards, done, _ = env.step(action_dict)
action_dict = {}
print("Rewards: ", all_rewards, " [done=", done, "]")
else:
agent_id = int(cmds[i])
action = int(cmds[i + 1])
action_dict[agent_id] = action
i = i + 1
i += 1
env_renderer.renderEnv(show=True, frames=True)
import random
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.generators import random_rail_generator
from flatland.envs.rail_env import RailEnv
random.seed(100)
np.random.seed(100)
class CustomObs(ObservationBuilder):
def __init__(self):
self.observation_space = [5]
def reset(self):
return
def get(self, handle):
observation = handle * np.ones((5,))
return observation
env = RailEnv(width=7,
height=7,
rail_generator=random_rail_generator(),
number_of_agents=3,
obs_builder_object=CustomObs())
# Print the observation vector for each agents
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
print("Agent ", i, "'s observation: ", obs[i])
import random
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
random.seed(100)
np.random.seed(100)
class SimpleObs(ObservationBuilder):
"""
Simplest observation builder. The object returns observation vectors with 5 identical components,
all equal to the ID of the respective agent.
"""
def __init__(self):
super().__init__()
def reset(self):
return
def get(self, handle: int = 0) -> np.ndarray:
observation = handle * np.ones((5,))
return observation
def create_env():
nAgents = 3
n_cities = 2
max_rails_between_cities = 2
max_rails_in_city = 4
seed = 0
env = RailEnv(
width=20,
height=30,
rail_generator=sparse_rail_generator(
max_num_cities=n_cities,
seed=seed,
grid_mode=True,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_city
),
line_generator=sparse_line_generator(),
number_of_agents=nAgents,
obs_builder_object=SimpleObs()
)
return env
def main():
env = create_env()
env.reset()
# Print the observation vector for each agents
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
print("Agent ", i, "'s observation: ", obs[i])
if __name__ == '__main__':
main()
import getopt
import random
import sys
import time
from typing import List
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class SingleAgentNavigationObs(ObservationBuilder):
"""
We build a representation vector with 3 binary components, indicating which of the 3 available directions
for each agent (Left, Forward, Right) lead to the shortest path to its target.
E.g., if taking the Left branch (if available) is the shortest route to the agent's target, the observation vector
will be [1, 0, 0].
"""
def __init__(self):
super().__init__()
def reset(self):
pass
def get(self, handle: int = 0) -> List[int]:
agent = self.env.agents[handle]
if agent.position:
possible_transitions = self.env.rail.get_transitions(*agent.position, agent.direction)
else:
possible_transitions = self.env.rail.get_transitions(*agent.initial_position, agent.direction)
num_transitions = np.count_nonzero(possible_transitions)
# Start from the current orientation, and see which transitions are available;
# organize them as [left, forward, right], relative to the current orientation
# If only one transition is possible, the forward branch is aligned with it.
if num_transitions == 1:
observation = [0, 1, 0]
else:
min_distances = []
for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
if possible_transitions[direction]:
new_position = get_new_position(agent.position, direction)
min_distances.append(
self.env.distance_map.get()[handle, new_position[0], new_position[1], direction])
else:
min_distances.append(np.inf)
observation = [0, 0, 0]
observation[np.argmin(min_distances)] = 1
return observation
def create_env():
nAgents = 1
n_cities = 2
max_rails_between_cities = 2
max_rails_in_city = 4
seed = 0
env = RailEnv(
width=30,
height=40,
rail_generator=sparse_rail_generator(
max_num_cities=n_cities,
seed=seed,
grid_mode=True,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_city
),
line_generator=sparse_line_generator(),
number_of_agents=nAgents,
obs_builder_object=SingleAgentNavigationObs()
)
return env
def custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering):
env = create_env()
obs, info = env.reset()
env_renderer = None
if do_rendering:
env_renderer = RenderTool(env)
env_renderer.render_env(show=True, frames=True, show_observations=False)
for step in range(100):
action = np.argmax(obs[0]) + 1
obs, all_rewards, done, _ = env.step({0: action})
print("Rewards: ", all_rewards, " [done=", done, "]")
if env_renderer is not None:
env_renderer.render_env(show=True, frames=True, show_observations=True)
if sleep_for_animation:
time.sleep(0.1)
if done["__all__"]:
break
if env_renderer is not None:
env_renderer.close_window()
def main(args):
try:
opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
except getopt.GetoptError as err:
print(str(err)) # will print something like "option -a not recognized"
sys.exit(2)
sleep_for_animation = True
do_rendering = True
for o, a in opts:
if o in ("--sleep-for-animation"):
sleep_for_animation = str2bool(a)
elif o in ("--do_rendering"):
do_rendering = str2bool(a)
else:
assert False, "unhandled option"
# execute example
custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering)
if __name__ == '__main__':
if 'argv' in globals():
main(argv)
else:
main(sys.argv[1:])
import getopt
import random
import sys
import time
from typing import Optional, List, Dict
import numpy as np
from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.ordered_set import OrderedSet
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class ObservePredictions(ObservationBuilder):
"""
We use the provided ShortestPathPredictor to illustrate the usage of predictors in your custom observation.
"""
def __init__(self, predictor):
super().__init__()
self.predictor = predictor
def reset(self):
pass
def get_many(self, handles: Optional[List[int]] = None) -> Dict[int, np.ndarray]:
'''
Because we do not want to call the predictor seperately for every agent we implement the get_many function
Here we can call the predictor just ones for all the agents and use the predictions to generate our observations
:param handles:
:return:
'''
self.predictions = self.predictor.get()
self.predicted_pos = {}
if handles is None:
handles = []
for t in range(len(self.predictions[0])):
pos_list = []
for a in handles:
pos_list.append(self.predictions[a][t][1:3])
# We transform (x,y) coodrinates to a single integer number for simpler comparison
self.predicted_pos.update({t: coordinate_to_position(self.env.width, pos_list)})
observations = super().get_many(handles)
return observations
def get(self, handle: int = 0) -> np.ndarray:
'''
Lets write a simple observation which just indicates whether or not the own predicted path
overlaps with other predicted paths at any time. This is useless for the task of navigation but might
help when looking for conflicts. A more complex implementation can be found in the TreeObsForRailEnv class
Each agent recieves an observation of length 10, where each element represents a prediction step and its value
is:
- 0 if no overlap is happening
- 1 where n i the number of other paths crossing the predicted cell
:param handle: handeled as an index of an agent
:return: Observation of handle
'''
observation = np.zeros(10)
# We are going to track what cells where considered while building the obervation and make them accesible
# For rendering
visited = OrderedSet()
for _idx in range(10):
# Check if any of the other prediction overlap with agents own predictions
x_coord = self.predictions[handle][_idx][1]
y_coord = self.predictions[handle][_idx][2]
# We add every observed cell to the observation rendering
visited.add((x_coord, y_coord))
if self.predicted_pos[_idx][handle] in np.delete(self.predicted_pos[_idx], handle, 0):
# We detect if another agent is predicting to pass through the same cell at the same predicted time
observation[handle] = 1
# This variable will be access by the renderer to visualize the observation
self.env.dev_obs_dict[handle] = visited
return observation
def set_env(self, env: Environment):
super().set_env(env)
if self.predictor:
self.predictor.set_env(self.env)
def create_env(custom_obs_builder):
nAgents = 3
n_cities = 2
max_rails_between_cities = 4
max_rails_in_city = 2
seed = 0
env = RailEnv(
width=30,
height=30,
rail_generator=sparse_rail_generator(
max_num_cities=n_cities,
seed=seed,
grid_mode=True,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_city
),
line_generator=sparse_line_generator(),
number_of_agents=nAgents,
obs_builder_object=custom_obs_builder
)
return env
def custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering):
# Initiate the Predictor
custom_predictor = ShortestPathPredictorForRailEnv(10)
# Pass the Predictor to the observation builder
custom_obs_builder = ObservePredictions(custom_predictor)
# Initiate Environment
env = create_env(custom_obs_builder)
obs, info = env.reset()
env_renderer = None
if do_rendering:
env_renderer = RenderTool(env)
# We render the initial step and show the obsered cells as colored boxes
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
action_dict = {}
for step in range(100):
for a in range(env.get_num_agents()):
action = np.random.randint(0, 5)
action_dict[a] = action
obs, all_rewards, done, _ = env.step(action_dict)
print("Rewards: ", all_rewards, " [done=", done, "]")
if env_renderer is not None:
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
if sleep_for_animation:
time.sleep(0.5)
if done["__all__"]:
print("All done!")
break
if env_renderer is not None:
env_renderer.close_window()
def main(args):
try:
opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
except getopt.GetoptError as err:
print(str(err)) # will print something like "option -a not recognized"
sys.exit(2)
sleep_for_animation = True
do_rendering = True
for o, a in opts:
if o in ("--sleep-for-animation"):
sleep_for_animation = str2bool(a)
elif o in ("--do_rendering"):
do_rendering = str2bool(a)
else:
assert False, "unhandled option"
# execute example
custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering)
if __name__ == '__main__':
if 'argv' in globals():
main(argv)
else:
main(sys.argv[1:])