Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flatland/flatland
  • stefan_otte/flatland
  • jiaodaxiaozi/flatland
  • sfwatergit/flatland
  • utozx126/flatland
  • ChenKuanSun/flatland
  • ashivani/flatland
  • minhhoa/flatland
  • pranjal_dhole/flatland
  • darthgera123/flatland
  • rivesunder/flatland
  • thomaslecat/flatland
  • joel_joseph/flatland
  • kchour/flatland
  • alex_zharichenko/flatland
  • yoogottamk/flatland
  • troye_fang/flatland
  • elrichgro/flatland
  • jun_jin/flatland
  • nimishsantosh107/flatland
20 results
Show changes
Showing
with 2252 additions and 236 deletions
import getopt
import random
import sys
import time
from typing import List
import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class SingleAgentNavigationObs(ObservationBuilder):
"""
We build a representation vector with 3 binary components, indicating which of the 3 available directions
for each agent (Left, Forward, Right) lead to the shortest path to its target.
E.g., if taking the Left branch (if available) is the shortest route to the agent's target, the observation vector
will be [1, 0, 0].
"""
def __init__(self):
super().__init__()
def reset(self):
pass
def get(self, handle: int = 0) -> List[int]:
agent = self.env.agents[handle]
if agent.position:
possible_transitions = self.env.rail.get_transitions(*agent.position, agent.direction)
else:
possible_transitions = self.env.rail.get_transitions(*agent.initial_position, agent.direction)
num_transitions = np.count_nonzero(possible_transitions)
# Start from the current orientation, and see which transitions are available;
# organize them as [left, forward, right], relative to the current orientation
# If only one transition is possible, the forward branch is aligned with it.
if num_transitions == 1:
observation = [0, 1, 0]
else:
min_distances = []
for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
if possible_transitions[direction]:
new_position = get_new_position(agent.position, direction)
min_distances.append(
self.env.distance_map.get()[handle, new_position[0], new_position[1], direction])
else:
min_distances.append(np.inf)
observation = [0, 0, 0]
observation[np.argmin(min_distances)] = 1
return observation
def create_env():
nAgents = 1
n_cities = 2
max_rails_between_cities = 2
max_rails_in_city = 4
seed = 0
env = RailEnv(
width=30,
height=40,
rail_generator=sparse_rail_generator(
max_num_cities=n_cities,
seed=seed,
grid_mode=True,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_city
),
line_generator=sparse_line_generator(),
number_of_agents=nAgents,
obs_builder_object=SingleAgentNavigationObs()
)
return env
def custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering):
env = create_env()
obs, info = env.reset()
env_renderer = None
if do_rendering:
env_renderer = RenderTool(env)
env_renderer.render_env(show=True, frames=True, show_observations=False)
for step in range(100):
action = np.argmax(obs[0]) + 1
obs, all_rewards, done, _ = env.step({0: action})
print("Rewards: ", all_rewards, " [done=", done, "]")
if env_renderer is not None:
env_renderer.render_env(show=True, frames=True, show_observations=True)
if sleep_for_animation:
time.sleep(0.1)
if done["__all__"]:
break
if env_renderer is not None:
env_renderer.close_window()
def main(args):
try:
opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
except getopt.GetoptError as err:
print(str(err)) # will print something like "option -a not recognized"
sys.exit(2)
sleep_for_animation = True
do_rendering = True
for o, a in opts:
if o in ("--sleep-for-animation"):
sleep_for_animation = str2bool(a)
elif o in ("--do_rendering"):
do_rendering = str2bool(a)
else:
assert False, "unhandled option"
# execute example
custom_observation_example_02_SingleAgentNavigationObs(sleep_for_animation, do_rendering)
if __name__ == '__main__':
if 'argv' in globals():
main(argv)
else:
main(sys.argv[1:])
import getopt
import random
import sys
import time
from typing import Optional, List, Dict
import numpy as np
from flatland.core.env import Environment
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.generators import random_rail_generator, complex_rail_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.ordered_set import OrderedSet
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
class SimpleObs(ObservationBuilder):
"""
Simplest observation builder. The object returns observation vectors with 5 identical components,
all equal to the ID of the respective agent.
"""
def __init__(self):
self.observation_space = [5]
def reset(self):
return
def get(self, handle):
observation = handle * np.ones((5,))
return observation
env = RailEnv(width=7,
height=7,
rail_generator=random_rail_generator(),
number_of_agents=3,
obs_builder_object=SimpleObs())
# Print the observation vector for each agents
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
print("Agent ", i, "'s observation: ", obs[i])
class SingleAgentNavigationObs(TreeObsForRailEnv):
"""
We derive our bbservation builder from TreeObsForRailEnv, to exploit the existing implementation to compute
the minimum distances from each grid node to each agent's target.
We then build a representation vector with 3 binary components, indicating which of the 3 available directions
for each agent (Left, Forward, Right) lead to the shortest path to its target.
E.g., if taking the Left branch (if available) is the shortest route to the agent's target, the observation vector
will be [1, 0, 0].
"""
def __init__(self):
super().__init__(max_depth=0)
self.observation_space = [3]
def reset(self):
# Recompute the distance map, if the environment has changed.
super().reset()
def get(self, handle):
agent = self.env.agents[handle]
possible_transitions = self.env.rail.get_transitions(*agent.position, agent.direction)
num_transitions = np.count_nonzero(possible_transitions)
# Start from the current orientation, and see which transitions are available;
# organize them as [left, forward, right], relative to the current orientation
# If only one transition is possible, the forward branch is aligned with it.
if num_transitions == 1:
observation = [0, 1, 0]
else:
min_distances = []
for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
if possible_transitions[direction]:
new_position = self._new_position(agent.position, direction)
min_distances.append(self.distance_map[handle, new_position[0], new_position[1], direction])
else:
min_distances.append(np.inf)
observation = [0, 0, 0]
observation[np.argmin(min_distances)] = 1
return observation
env = RailEnv(width=7,
height=7,
rail_generator=complex_rail_generator(nr_start_goal=10, nr_extra=1, min_dist=5, max_dist=99999, seed=0),
number_of_agents=1,
obs_builder_object=SingleAgentNavigationObs())
obs = env.reset()
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.render_env(show=True, frames=True, show_observations=True)
for step in range(100):
action = np.argmax(obs[0])+1
obs, all_rewards, done, _ = env.step({0:action})
print("Rewards: ", all_rewards, " [done=", done, "]")
env_renderer.render_env(show=True, frames=True, show_observations=True)
time.sleep(0.1)
if done["__all__"]:
break
env_renderer.close_window()
class ObservePredictions(TreeObsForRailEnv):
class ObservePredictions(ObservationBuilder):
"""
We use the provided ShortestPathPredictor to illustrate the usage of predictors in your custom observation.
We derive our observation builder from TreeObsForRailEnv, to exploit the existing implementation to compute
the minimum distances from each grid node to each agent's target.
This is necessary so that we can pass the distance map to the ShortestPathPredictor
Here we also want to highlight how you can visualize your observation
"""
def __init__(self, predictor):
super().__init__(max_depth=0)
self.observation_space = [10]
super().__init__()
self.predictor = predictor
def reset(self):
# Recompute the distance map, if the environment has changed.
super().reset()
pass
def get_many(self, handles=None):
def get_many(self, handles: Optional[List[int]] = None) -> Dict[int, np.ndarray]:
'''
Because we do not want to call the predictor seperately for every agent we implement the get_many function
Here we can call the predictor just ones for all the agents and use the predictions to generate our observations
......@@ -136,23 +41,25 @@ class ObservePredictions(TreeObsForRailEnv):
:return:
'''
self.predictions = self.predictor.get(custom_args={'distance_map': self.distance_map})
self.predictions = self.predictor.get()
self.predicted_pos = {}
if handles is None:
handles = []
for t in range(len(self.predictions[0])):
pos_list = []
for a in handles:
pos_list.append(self.predictions[a][t][1:3])
# We transform (x,y) coodrinates to a single integer number for simpler comparison
self.predicted_pos.update({t: coordinate_to_position(self.env.width, pos_list)})
observations = {}
# Collect all the different observation for all the agents
for h in handles:
observations[h] = self.get(h)
observations = super().get_many(handles)
return observations
def get(self, handle):
def get(self, handle: int = 0) -> np.ndarray:
'''
Lets write a simple observation which just indicates whether or not the own predicted path
overlaps with other predicted paths at any time. This is useless for the task of navigation but might
......@@ -172,7 +79,7 @@ class ObservePredictions(TreeObsForRailEnv):
# We are going to track what cells where considered while building the obervation and make them accesible
# For rendering
visited = set()
visited = OrderedSet()
for _idx in range(10):
# Check if any of the other prediction overlap with agents own predictions
x_coord = self.predictions[handle][_idx][1]
......@@ -189,32 +96,93 @@ class ObservePredictions(TreeObsForRailEnv):
return observation
def set_env(self, env: Environment):
super().set_env(env)
if self.predictor:
self.predictor.set_env(self.env)
def create_env(custom_obs_builder):
nAgents = 3
n_cities = 2
max_rails_between_cities = 4
max_rails_in_city = 2
seed = 0
env = RailEnv(
width=30,
height=30,
rail_generator=sparse_rail_generator(
max_num_cities=n_cities,
seed=seed,
grid_mode=True,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_city
),
line_generator=sparse_line_generator(),
number_of_agents=nAgents,
obs_builder_object=custom_obs_builder
)
return env
def custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering):
# Initiate the Predictor
custom_predictor = ShortestPathPredictorForRailEnv(10)
# Pass the Predictor to the observation builder
custom_obs_builder = ObservePredictions(custom_predictor)
# Initiate Environment
env = create_env(custom_obs_builder)
obs, info = env.reset()
env_renderer = None
if do_rendering:
env_renderer = RenderTool(env)
# We render the initial step and show the obsered cells as colored boxes
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
action_dict = {}
for step in range(100):
for a in range(env.get_num_agents()):
action = np.random.randint(0, 5)
action_dict[a] = action
obs, all_rewards, done, _ = env.step(action_dict)
print("Rewards: ", all_rewards, " [done=", done, "]")
if env_renderer is not None:
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
if sleep_for_animation:
time.sleep(0.5)
if done["__all__"]:
print("All done!")
break
if env_renderer is not None:
env_renderer.close_window()
def main(args):
try:
opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
except getopt.GetoptError as err:
print(str(err)) # will print something like "option -a not recognized"
sys.exit(2)
sleep_for_animation = True
do_rendering = True
for o, a in opts:
if o in ("--sleep-for-animation"):
sleep_for_animation = str2bool(a)
elif o in ("--do_rendering"):
do_rendering = str2bool(a)
else:
assert False, "unhandled option"
# Initiate the Predictor
CustomPredictor = ShortestPathPredictorForRailEnv(10)
# Pass the Predictor to the observation builder
CustomObsBuilder = ObservePredictions(CustomPredictor)
# Initiate Environment
env = RailEnv(width=10,
height=10,
rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=1, min_dist=8, max_dist=99999, seed=0),
number_of_agents=3,
obs_builder_object=CustomObsBuilder)
obs = env.reset()
env_renderer = RenderTool(env, gl="PILSVG")
# We render the initial step and show the obsered cells as colored boxes
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
# execute example
custom_observation_example_03_ObservePredictions(sleep_for_animation, do_rendering)
action_dict = {}
for step in range(100):
for a in range(env.get_num_agents()):
action = np.random.randint(0, 5)
action_dict[a] = action
obs, all_rewards, done, _ = env.step(action_dict)
print("Rewards: ", all_rewards, " [done=", done, "]")
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
time.sleep(0.5)
if __name__ == '__main__':
if 'argv' in globals():
main(argv)
else:
main(sys.argv[1:])
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from flatland.envs.generators import rail_from_manual_specifications_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
# Example generate a rail given a manual specification,
# a map of tuples (cell_type, rotation)
specs = [[(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0)],
[(0, 0), (0, 0), (0, 0), (0, 0), (7, 0), (0, 0)],
[(7, 270), (1, 90), (1, 90), (1, 90), (2, 90), (7, 90)],
[(0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0)]]
env = RailEnv(width=6,
height=4,
rail_generator=rail_from_manual_specifications_generator(specs),
number_of_agents=1)
env.reset()
env_renderer = RenderTool(env)
env_renderer.render_env(show=True, show_predictions=False, show_observations=False)
input("Press Enter to continue...")
import random
import numpy as np
from flatland.envs.generators import random_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
random.seed(100)
np.random.seed(100)
# Relative weights of each cell type to be used by the random rail generators.
transition_probability = [1.0, # empty cell - Case 0
1.0, # Case 1 - straight
1.0, # Case 2 - simple switch
0.3, # Case 3 - diamond drossing
0.5, # Case 4 - single slip
0.5, # Case 5 - double slip
0.2, # Case 6 - symmetrical
0.0, # Case 7 - dead end
0.2, # Case 8 - turn left
0.2, # Case 9 - turn right
1.0] # Case 10 - mirrored switch
# Example generate a random rail
env = RailEnv(width=10,
height=10,
rail_generator=random_rail_generator(cell_type_relative_proportion=transition_probability),
number_of_agents=3)
env.reset()
env_renderer = RenderTool(env, gl="PIL")
env_renderer.render_env(show=True)
input("Press Enter to continue...")
import random
import numpy as np
from flatland.envs.generators import complex_rail_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
random.seed(1)
np.random.seed(1)
env = RailEnv(width=7,
height=7,
rail_generator=complex_rail_generator(nr_start_goal=10, nr_extra=1, min_dist=8, max_dist=99999, seed=0),
number_of_agents=2,
obs_builder_object=TreeObsForRailEnv(max_depth=2))
# Print the observation vector for agent 0
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
env.obs_builder.util_print_obs_subtree(tree=obs[i])
env_renderer = RenderTool(env)
env_renderer.render_env(show=True, frames=True)
print("Manual control: s=perform step, q=quit, [agent id] [1-2-3 action] \
(turnleft+move, move to front, turnright+move)")
for step in range(100):
cmd = input(">> ")
cmds = cmd.split(" ")
action_dict = {}
i = 0
while i < len(cmds):
if cmds[i] == 'q':
import sys
sys.exit()
elif cmds[i] == 's':
obs, all_rewards, done, _ = env.step(action_dict)
action_dict = {}
print("Rewards: ", all_rewards, " [done=", done, "]")
else:
agent_id = int(cmds[i])
action = int(cmds[i + 1])
action_dict[agent_id] = action
i = i + 1
i += 1
env_renderer.render_env(show=True, frames=True)
This diff is collapsed.
......@@ -4,4 +4,4 @@
__author__ = """S.P. Mohanty"""
__email__ = 'mohanty@aicrowd.com'
__version__ = '0.3.10'
__version__ = '3.0.15'
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
id-mava[flatland]
id-mava
id-mava[tf]
supersuit
stable-baselines3
ray==1.5.2
seaborn
matplotlib
pandas
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.