Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flatland/flatland
  • stefan_otte/flatland
  • jiaodaxiaozi/flatland
  • sfwatergit/flatland
  • utozx126/flatland
  • ChenKuanSun/flatland
  • ashivani/flatland
  • minhhoa/flatland
  • pranjal_dhole/flatland
  • darthgera123/flatland
  • rivesunder/flatland
  • thomaslecat/flatland
  • joel_joseph/flatland
  • kchour/flatland
  • alex_zharichenko/flatland
  • yoogottamk/flatland
  • troye_fang/flatland
  • elrichgro/flatland
  • jun_jin/flatland
  • nimishsantosh107/flatland
20 results
Show changes
Showing
with 1937 additions and 253 deletions
import random
import numpy as np
from flatland.envs.generators import complex_rail_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
random.seed(1)
np.random.seed(1)
env = RailEnv(width=7,
height=7,
rail_generator=complex_rail_generator(nr_start_goal=10, nr_extra=1, min_dist=8, max_dist=99999, seed=0),
number_of_agents=2,
obs_builder_object=TreeObsForRailEnv(max_depth=2))
# Print the observation vector for agent 0
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
env.obs_builder.util_print_obs_subtree(tree=obs[i])
env_renderer = RenderTool(env)
env_renderer.renderEnv(show=True, frames=True)
env_renderer.renderEnv(show=True, frames=True)
print("Manual control: s=perform step, q=quit, [agent id] [1-2-3 action] \
(turnleft+move, move to front, turnright+move)")
for step in range(100):
cmd = input(">> ")
cmds = cmd.split(" ")
action_dict = {}
i = 0
while i < len(cmds):
if cmds[i] == 'q':
import sys
sys.exit()
elif cmds[i] == 's':
obs, all_rewards, done, _ = env.step(action_dict)
action_dict = {}
print("Rewards: ", all_rewards, " [done=", done, "]")
else:
agent_id = int(cmds[i])
action = int(cmds[i + 1])
action_dict[agent_id] = action
i = i + 1
i += 1
env_renderer.renderEnv(show=True, frames=True)
try:
from examples.play_model import Player
except ImportError:
from play_model import Player
from flatland.envs.generators import complex_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
def tkmain(n_trials=2, n_steps=50, sGL="PIL"):
# Example generate a random rail
env = RailEnv(width=15, height=15,
rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=20, min_dist=12),
number_of_agents=5)
env_renderer = RenderTool(env, gl=sGL)
oPlayer = Player(env)
n_trials = 1
for trials in range(1, n_trials + 1):
# Reset environment8
oPlayer.reset()
env_renderer.set_new_rail()
for step in range(n_steps):
oPlayer.step()
env_renderer.renderEnv(show=True, frames=True, iEpisode=trials, iStep=step,
action_dict=oPlayer.action_dict)
env_renderer.close_window()
if __name__ == "__main__":
tkmain(sGL="PIL")
tkmain(sGL="PILSVG")
import getopt
import sys
import numpy as np
from flatland.envs.generators import complex_rail_generator
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
np.random.seed(1)
# Use the complex_rail_generator to generate feasible network configurations with corresponding tasks
# Training on simple small tasks is the best way to get familiar with the environment
#
TreeObservation = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv())
env = RailEnv(width=20,
height=20,
rail_generator=complex_rail_generator(nr_start_goal=10, nr_extra=1, min_dist=8, max_dist=99999, seed=0),
obs_builder_object=TreeObservation,
number_of_agents=2)
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.utils.misc import str2bool
from flatland.utils.rendertools import RenderTool
def create_env():
nAgents = 1
n_cities = 2
max_rails_between_cities = 2
max_rails_in_city = 4
seed = 0
env = RailEnv(
width=30,
height=40,
rail_generator=sparse_rail_generator(
max_num_cities=n_cities,
seed=seed,
grid_mode=True,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_city
),
line_generator=sparse_line_generator(),
number_of_agents=nAgents,
obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv())
)
return env
# Import your own Agent or use RLlib to train agents on Flatland
......@@ -54,38 +70,85 @@ class RandomAgent:
return
# Initialize the agent with the parameters corresponding to the environment and observation_builder
agent = RandomAgent(218, 4)
n_trials = 5
# Empty dictionary for all agent action
action_dict = dict()
print("Starting Training...")
for trials in range(1, n_trials + 1):
# Reset environment and get initial observations for all agents
obs = env.reset()
# Here you can also further enhance the provided observation by means of normalization
# See training navigation example in the baseline repository
score = 0
# Run episode
for step in range(100):
# Chose an action for each agent in the environment
for a in range(env.get_num_agents()):
action = agent.act(obs[a])
action_dict.update({a: action})
# Environment step which returns the observations for all agents, their corresponding
# reward and whether their are done
next_obs, all_rewards, done, _ = env.step(action_dict)
# Update replay buffer and train agent
for a in range(env.get_num_agents()):
agent.step((obs[a], action_dict[a], all_rewards[a], next_obs[a], done[a]))
score += all_rewards[a]
obs = next_obs.copy()
if done['__all__']:
break
print('Episode Nr. {}\t Score = {}'.format(trials, score))
def training_example(sleep_for_animation, do_rendering):
np.random.seed(1)
# Use the complex_rail_generator to generate feasible network configurations with corresponding tasks
# Training on simple small tasks is the best way to get familiar with the environment
env = create_env()
env.reset()
env_renderer = None
if do_rendering:
env_renderer = RenderTool(env)
# Initialize the agent with the parameters corresponding to the environment and observation_builder
agent = RandomAgent(218, 5)
n_trials = 5
# Empty dictionary for all agent action
action_dict = dict()
print("Starting Training...")
for trials in range(1, n_trials + 1):
# Reset environment and get initial observations for all agents
obs, info = env.reset()
if env_renderer is not None:
env_renderer.reset()
# Here you can also further enhance the provided observation by means of normalization
# See training navigation example in the baseline repository
score = 0
# Run episode
for step in range(500):
# Chose an action for each agent in the environment
for a in range(env.get_num_agents()):
action = agent.act(obs[a])
action_dict.update({a: action})
# Environment step which returns the observations for all agents, their corresponding
# reward and whether their are done
next_obs, all_rewards, done, _ = env.step(action_dict)
if env_renderer is not None:
env_renderer.render_env(show=True, show_observations=True, show_predictions=False)
# Update replay buffer and train agent
for a in range(env.get_num_agents()):
agent.step((obs[a], action_dict[a], all_rewards[a], next_obs[a], done[a]))
score += all_rewards[a]
obs = next_obs.copy()
if done['__all__']:
break
print('Episode Nr. {}\t Score = {}'.format(trials, score))
if env_renderer is not None:
env_renderer.close_window()
def main(args):
try:
opts, args = getopt.getopt(args, "", ["sleep-for-animation=", "do_rendering=", ""])
except getopt.GetoptError as err:
print(str(err)) # will print something like "option -a not recognized"
sys.exit(2)
sleep_for_animation = True
do_rendering = True
for o, a in opts:
if o in ("--sleep-for-animation"):
sleep_for_animation = str2bool(a)
elif o in ("--do_rendering"):
do_rendering = str2bool(a)
else:
assert False, "unhandled option"
# execute example
training_example(sleep_for_animation, do_rendering)
if __name__ == '__main__':
if 'argv' in globals():
main(argv)
else:
main(sys.argv[1:])
......@@ -4,4 +4,4 @@
__author__ = """S.P. Mohanty"""
__email__ = 'mohanty@aicrowd.com'
__version__ = '0.2.0'
__version__ = '3.0.15'
import pprint
from typing import Dict, List, Optional, NamedTuple
import numpy as np
from flatland.core.grid.grid_utils import Vec2dOperations as Vec2d
from flatland.envs.rail_env import RailEnv, RailEnvActions
from flatland.envs.rail_env_shortest_paths import get_action_for_move
from flatland.envs.rail_trainrun_data_structures import Waypoint, Trainrun, TrainrunWaypoint
# ---- ActionPlan ---------------
# an action plan element represents the actions to be taken by an agent at the given time step
ActionPlanElement = NamedTuple('ActionPlanElement', [
('scheduled_at', int),
('action', RailEnvActions)
])
# an action plan gathers all the the actions to be taken by a single agent at the corresponding time steps
ActionPlan = List[ActionPlanElement]
# An action plan dict gathers all the actions for every agent identified by the dictionary key = agent_handle
ActionPlanDict = Dict[int, ActionPlan]
class ControllerFromTrainruns():
"""Takes train runs, derives the actions from it and re-acts them."""
pp = pprint.PrettyPrinter(indent=4)
def __init__(self,
env: RailEnv,
trainrun_dict: Dict[int, Trainrun]):
self.env: RailEnv = env
self.trainrun_dict: Dict[int, Trainrun] = trainrun_dict
self.action_plan: ActionPlanDict = [self._create_action_plan_for_agent(agent_id, chosen_path)
for agent_id, chosen_path in trainrun_dict.items()]
def get_waypoint_before_or_at_step(self, agent_id: int, step: int) -> Waypoint:
"""
Get the way point point from which the current position can be extracted.
Parameters
----------
agent_id
step
Returns
-------
WalkingElement
"""
trainrun = self.trainrun_dict[agent_id]
entry_time_step = trainrun[0].scheduled_at
# the agent has no position before and at choosing to enter the grid (one tick elapses before the agent enters the grid)
if step <= entry_time_step:
return Waypoint(position=None, direction=self.env.agents[agent_id].initial_direction)
# the agent has no position as soon as the target is reached
exit_time_step = trainrun[-1].scheduled_at
if step >= exit_time_step:
# agent loses position as soon as target cell is reached
return Waypoint(position=None, direction=trainrun[-1].waypoint.direction)
waypoint = None
for trainrun_waypoint in trainrun:
if step < trainrun_waypoint.scheduled_at:
return waypoint
if step >= trainrun_waypoint.scheduled_at:
waypoint = trainrun_waypoint.waypoint
assert waypoint is not None
return waypoint
def get_action_at_step(self, agent_id: int, current_step: int) -> Optional[RailEnvActions]:
"""
Get the current action if any is defined in the `ActionPlan`.
ASSUMPTION we assume the env has `remove_agents_at_target=True` and `activate_agents=False`!!
Parameters
----------
agent_id
current_step
Returns
-------
WalkingElement, optional
"""
for action_plan_element in self.action_plan[agent_id]:
scheduled_at = action_plan_element.scheduled_at
if scheduled_at > current_step:
return None
elif current_step == scheduled_at:
return action_plan_element.action
return None
def act(self, current_step: int) -> Dict[int, RailEnvActions]:
"""
Get the action dictionary to be replayed at the current step.
Returns only action where required (no action for done agents or those not at the beginning of the cell).
ASSUMPTION we assume the env has `remove_agents_at_target=True` and `activate_agents=False`!!
Parameters
----------
current_step: int
Returns
-------
Dict[int, RailEnvActions]
"""
action_dict = {}
for agent_id in range(len(self.env.agents)):
action: Optional[RailEnvActions] = self.get_action_at_step(agent_id, current_step)
if action is not None:
action_dict[agent_id] = action
return action_dict
def print_action_plan(self):
"""Pretty-prints `ActionPlanDict` of this `ControllerFromTrainruns` to stdout."""
self.__class__.print_action_plan_dict(self.action_plan)
@staticmethod
def print_action_plan_dict(action_plan: ActionPlanDict):
"""Pretty-prints `ActionPlanDict` to stdout."""
for agent_id, plan in enumerate(action_plan):
print("{}: ".format(agent_id))
for step in plan:
print(" {}".format(step))
@staticmethod
def assert_actions_plans_equal(expected_action_plan: ActionPlanDict, actual_action_plan: ActionPlanDict):
assert len(expected_action_plan) == len(actual_action_plan)
for k in range(len(expected_action_plan)):
assert len(expected_action_plan[k]) == len(actual_action_plan[k]), \
"len for agent {} should be the same.\n\n expected ({}) = {}\n\n actual ({}) = {}".format(
k,
len(expected_action_plan[k]),
ControllerFromTrainruns.pp.pformat(expected_action_plan[k]),
len(actual_action_plan[k]),
ControllerFromTrainruns.pp.pformat(actual_action_plan[k]))
for i in range(len(expected_action_plan[k])):
assert expected_action_plan[k][i] == actual_action_plan[k][i], \
"not the same at agent {} at step {}\n\n expected = {}\n\n actual = {}".format(
k, i,
ControllerFromTrainruns.pp.pformat(expected_action_plan[k][i]),
ControllerFromTrainruns.pp.pformat(actual_action_plan[k][i]))
assert expected_action_plan == actual_action_plan, \
"expected {}, found {}".format(expected_action_plan, actual_action_plan)
def _create_action_plan_for_agent(self, agent_id, trainrun) -> ActionPlan:
action_plan = []
agent = self.env.agents[agent_id]
minimum_cell_time = agent.speed_counter.max_count + 1
for path_loop, trainrun_waypoint in enumerate(trainrun):
trainrun_waypoint: TrainrunWaypoint = trainrun_waypoint
position = trainrun_waypoint.waypoint.position
if Vec2d.is_equal(agent.target, position):
break
next_trainrun_waypoint: TrainrunWaypoint = trainrun[path_loop + 1]
next_position = next_trainrun_waypoint.waypoint.position
if path_loop == 0:
self._add_action_plan_elements_for_first_path_element_of_agent(
action_plan,
trainrun_waypoint,
next_trainrun_waypoint,
minimum_cell_time
)
continue
just_before_target = Vec2d.is_equal(agent.target, next_position)
self._add_action_plan_elements_for_current_path_element(
action_plan,
minimum_cell_time,
trainrun_waypoint,
next_trainrun_waypoint)
# add a final element
if just_before_target:
self._add_action_plan_elements_for_target_at_path_element_just_before_target(
action_plan,
minimum_cell_time,
trainrun_waypoint,
next_trainrun_waypoint)
return action_plan
def _add_action_plan_elements_for_current_path_element(self,
action_plan: ActionPlan,
minimum_cell_time: int,
trainrun_waypoint: TrainrunWaypoint,
next_trainrun_waypoint: TrainrunWaypoint):
scheduled_at = trainrun_waypoint.scheduled_at
next_entry_value = next_trainrun_waypoint.scheduled_at
position = trainrun_waypoint.waypoint.position
direction = trainrun_waypoint.waypoint.direction
next_position = next_trainrun_waypoint.waypoint.position
next_direction = next_trainrun_waypoint.waypoint.direction
next_action = get_action_for_move(position,
direction,
next_position,
next_direction,
self.env.rail)
# if the next entry is later than minimum_cell_time, then stop here and
# move minimum_cell_time before the exit
# we have to do this since agents in the RailEnv are processed in the step() in the order of their handle
if next_entry_value > scheduled_at + minimum_cell_time:
action = ActionPlanElement(scheduled_at, RailEnvActions.STOP_MOVING)
action_plan.append(action)
action = ActionPlanElement(next_entry_value - minimum_cell_time, next_action)
action_plan.append(action)
else:
action = ActionPlanElement(scheduled_at, next_action)
action_plan.append(action)
def _add_action_plan_elements_for_target_at_path_element_just_before_target(self,
action_plan: ActionPlan,
minimum_cell_time: int,
trainrun_waypoint: TrainrunWaypoint,
next_trainrun_waypoint: TrainrunWaypoint):
scheduled_at = trainrun_waypoint.scheduled_at
action = ActionPlanElement(scheduled_at + minimum_cell_time, RailEnvActions.STOP_MOVING)
action_plan.append(action)
def _add_action_plan_elements_for_first_path_element_of_agent(self,
action_plan: ActionPlan,
trainrun_waypoint: TrainrunWaypoint,
next_trainrun_waypoint: TrainrunWaypoint,
minimum_cell_time: int):
scheduled_at = trainrun_waypoint.scheduled_at
position = trainrun_waypoint.waypoint.position
direction = trainrun_waypoint.waypoint.direction
next_position = next_trainrun_waypoint.waypoint.position
next_direction = next_trainrun_waypoint.waypoint.direction
# add intial do nothing if we do not enter immediately, actually not necessary
if scheduled_at > 0:
action = ActionPlanElement(0, RailEnvActions.DO_NOTHING)
action_plan.append(action)
# add action to enter the grid
action = ActionPlanElement(scheduled_at, RailEnvActions.MOVE_FORWARD)
action_plan.append(action)
next_action = get_action_for_move(position,
direction,
next_position,
next_direction,
self.env.rail)
# if the agent is blocked in the cell, we have to call stop upon entering!
if next_trainrun_waypoint.scheduled_at > scheduled_at + 1 + minimum_cell_time:
action = ActionPlanElement(scheduled_at + 1, RailEnvActions.STOP_MOVING)
action_plan.append(action)
# execute the action exactly minimum_cell_time before the entry into the next cell
action = ActionPlanElement(next_trainrun_waypoint.scheduled_at - minimum_cell_time, next_action)
action_plan.append(action)
from typing import Callable
from flatland.action_plan.action_plan import ControllerFromTrainruns
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_trainrun_data_structures import Waypoint
ControllerFromTrainrunsReplayerRenderCallback = Callable[[RailEnv], None]
class ControllerFromTrainrunsReplayer():
"""Allows to verify a `DeterministicController` by replaying it against a FLATland env without malfunction."""
@staticmethod
def replay_verify(ctl: ControllerFromTrainruns, env: RailEnv,
call_back: ControllerFromTrainrunsReplayerRenderCallback = lambda *a, **k: None):
"""Replays this deterministic `ActionPlan` and verifies whether it is feasible.
Parameters
----------
ctl
env
call_back
Called before/after each step() call. The env is passed to it.
"""
call_back(env)
i = 0
while not env.dones['__all__'] and i <= env._max_episode_steps:
for agent_id, agent in enumerate(env.agents):
waypoint: Waypoint = ctl.get_waypoint_before_or_at_step(agent_id, i)
assert agent.position == waypoint.position, \
"before {}, agent {} at {}, expected {}".format(i, agent_id, agent.position,
waypoint.position)
actions = ctl.act(i)
obs, all_rewards, done, _ = env.step(actions)
call_back(env)
i += 1
......@@ -2,18 +2,110 @@
"""Console script for flatland."""
import sys
import time
import click
import numpy as np
import redis
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.line_generators import sparse_line_generator
from flatland.evaluators.service import FlatlandRemoteEvaluationService, FLATLAND_RL_SERVICE_ID
from flatland.utils.rendertools import RenderTool
@click.command()
def main(args=None):
"""Console script for flatland."""
click.echo("Replace this message by putting your code into "
"flatland.cli.main")
click.echo("See click documentation at http://click.pocoo.org/")
def demo(args=None):
"""Demo script to check installation"""
env = RailEnv(
width=30,
height=30,
rail_generator=sparse_rail_generator(
max_num_cities=3,
grid_mode=False,
max_rails_between_cities=4,
max_rail_pairs_in_city=2,
seed=0
),
line_generator=sparse_line_generator(),
number_of_agents=5)
env._max_episode_steps = int(15 * (env.width + env.height))
env_renderer = RenderTool(env)
obs, info = env.reset()
_done = False
# Run a single episode here
step = 0
while not _done:
# Compute Action
_action = {}
for _idx, _ in enumerate(env.agents):
_action[_idx] = np.random.randint(0, 5)
obs, all_rewards, done, _ = env.step(_action)
_done = done['__all__']
step += 1
env_renderer.render_env(
show=True,
frames=False,
show_observations=False,
show_predictions=False
)
time.sleep(0.1)
return 0
@click.command()
@click.option('--tests',
type=click.Path(exists=True),
help="Path to folder containing Flatland tests",
required=True
)
@click.option('--service_id',
default=FLATLAND_RL_SERVICE_ID,
help="Evaluation Service ID. This has to match the service id on the client.",
required=False
)
@click.option('--shuffle',
type=bool,
default=False,
help="Shuffle the environments before starting evaluation.",
required=False
)
@click.option('--disable_timeouts',
default=False,
help="Disable all evaluation timeouts.",
required=False
)
@click.option('--results_path',
type=click.Path(exists=False),
default=None,
help="Path where the evaluator should write the results metadata.",
required=False
)
def evaluator(tests, service_id, shuffle, disable_timeouts, results_path):
try:
redis_connection = redis.Redis()
redis_connection.ping()
except redis.exceptions.ConnectionError as e:
raise Exception(
"\nRedis server does not seem to be running on your localhost.\n"
"Please ensure that you have a redis server running on your localhost"
)
grader = FlatlandRemoteEvaluationService(
test_env_folder=tests,
flatland_rl_service_id=service_id,
visualize=False,
result_output_path=results_path,
verbose=False,
shuffle=shuffle,
disable_timeouts=disable_timeouts
)
grader.run()
if __name__ == "__main__":
sys.exit(main()) # pragma: no cover
sys.exit(demo()) # pragma: no cover
import os
import math
import numpy as np
import gym
from gym.utils import seeding
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
from pettingzoo.utils import wrappers
from gym.utils import EzPickle
from pettingzoo.utils.conversions import to_parallel_wrapper
from flatland.envs.rail_env import RailEnv
from mava.wrappers.flatland import infer_observation_space, normalize_observation
from functools import partial
from flatland.envs.observations import GlobalObsForRailEnv, TreeObsForRailEnv
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
from PIL import Image
"""Adapted from
- https://github.com/PettingZoo-Team/PettingZoo/blob/HEAD/pettingzoo/butterfly/pistonball/pistonball.py
- https://github.com/instadeepai/Mava/blob/HEAD/mava/wrappers/flatland.py
"""
def parallel_wrapper_fn(env_fn):
def par_fn(**kwargs):
env = env_fn(**kwargs)
env = custom_parallel_wrapper(env)
return env
return par_fn
def env(**kwargs):
env = raw_env(**kwargs)
# env = wrappers.AssertOutOfBoundsWrapper(env)
# env = wrappers.OrderEnforcingWrapper(env)
return env
parallel_env = parallel_wrapper_fn(env)
class custom_parallel_wrapper(to_parallel_wrapper):
def step(self, actions):
rewards = {a: 0 for a in self.aec_env.agents}
dones = {}
infos = {}
observations = {}
for agent in self.aec_env.agents:
try:
assert agent == self.aec_env.agent_selection, f"expected agent {agent} got agent {self.aec_env.agent_selection}, agent order is nontrivial"
except Exception as e:
# print(e)
print(self.aec_env.dones.values())
raise e
obs, rew, done, info = self.aec_env.last()
self.aec_env.step(actions.get(agent,0))
for agent in self.aec_env.agents:
rewards[agent] += self.aec_env.rewards[agent]
dones = dict(**self.aec_env.dones)
infos = dict(**self.aec_env.infos)
self.agents = self.aec_env.agents
observations = {agent: self.aec_env.observe(agent) for agent in self.aec_env.agents}
return observations, rewards, dones, infos
class raw_env(AECEnv, gym.Env):
metadata = {'render.modes': ['human', "rgb_array"], 'name': "flatland_pettingzoo",
'video.frames_per_second': 10,
'semantics.autoreset': False }
def __init__(self, environment = False, preprocessor = False, agent_info = False, *args, **kwargs):
# EzPickle.__init__(self, *args, **kwargs)
self._environment = environment
n_agents = self.num_agents
self._agents = [get_agent_keys(i) for i in range(n_agents)]
self._possible_agents = self.agents[:]
self._reset_next_step = True
self._agent_selector = agent_selector(self.agents)
self.num_actions = 5
self.action_spaces = {
agent: gym.spaces.Discrete(self.num_actions) for agent in self.possible_agents
}
self.seed()
# preprocessor must be for observation builders other than global obs
# treeobs builders would use the default preprocessor if none is
# supplied
self.preprocessor = self._obtain_preprocessor(preprocessor)
self._include_agent_info = agent_info
# observation space:
# flatland defines no observation space for an agent. Here we try
# to define the observation space. All agents are identical and would
# have the same observation space.
# Infer observation space based on returned observation
obs, _ = self._environment.reset(regenerate_rail = False, regenerate_schedule = False)
obs = self.preprocessor(obs)
self.observation_spaces = {
i: infer_observation_space(ob) for i, ob in obs.items()
}
@property
def environment(self) -> RailEnv:
"""Returns the wrapped environment."""
return self._environment
@property
def dones(self):
dones = self._environment.dones
# remove_all = dones.pop("__all__", None)
return {get_agent_keys(key): value for key, value in dones.items()}
@property
def obs_builder(self):
return self._environment.obs_builder
@property
def width(self):
return self._environment.width
@property
def height(self):
return self._environment.height
@property
def agents_data(self):
"""Rail Env Agents data."""
return self._environment.agents
@property
def num_agents(self) -> int:
"""Returns the number of trains/agents in the flatland environment"""
return int(self._environment.number_of_agents)
# def __getattr__(self, name):
# """Expose any other attributes of the underlying environment."""
# return getattr(self._environment, name)
@property
def agents(self):
return self._agents
@property
def possible_agents(self):
return self._possible_agents
def env_done(self):
return self._environment.dones["__all__"] or not self.agents
def observe(self,agent):
return self.obs.get(agent)
def last(self, observe=True):
'''
returns observation, reward, done, info for the current agent (specified by self.agent_selection)
'''
agent = self.agent_selection
observation = self.observe(agent) if observe else None
return observation, self.rewards.get(agent), self.dones.get(agent), self.infos.get(agent)
def seed(self, seed: int = None) -> None:
self._environment._seed(seed)
def state(self):
'''
Returns an observation of the global environment
'''
return None
def _clear_rewards(self):
'''
clears all items in .rewards
'''
# pass
for agent in self.rewards:
self.rewards[agent] = 0
def reset(self, *args, **kwargs):
self._reset_next_step = False
self._agents = self.possible_agents[:]
obs, info = self._environment.reset(*args, **kwargs)
observations = self._collate_obs_and_info(obs, info)
self._agent_selector.reinit(self.agents)
self.agent_selection = self._agent_selector.next()
self.rewards = dict(zip(self.agents, [0 for _ in self.agents]))
self._cumulative_rewards = dict(zip(self.agents, [0 for _ in self.agents]))
self.action_dict = {get_agent_handle(i):0 for i in self.possible_agents}
return observations
def step(self, action):
if self.env_done():
self._agents = []
self._reset_next_step = True
return self.last()
agent = self.agent_selection
self.action_dict[get_agent_handle(agent)] = action
if self.dones[agent]:
# Disabled.. In case we want to remove agents once done
# if self.remove_agents:
# self.agents.remove(agent)
if self._agent_selector.is_last():
observations, rewards, dones, infos = self._environment.step(self.action_dict)
self.rewards = {get_agent_keys(key): value for key, value in rewards.items()}
if observations:
observations = self._collate_obs_and_info(observations, infos)
self._accumulate_rewards()
obs, cumulative_reward, done, info = self.last()
self.agent_selection = self._agent_selector.next()
else:
self._clear_rewards()
obs, cumulative_reward, done, info = self.last()
self.agent_selection = self._agent_selector.next()
return obs, cumulative_reward, done, info
if self._agent_selector.is_last():
observations, rewards, dones, infos = self._environment.step(self.action_dict)
self.rewards = {get_agent_keys(key): value for key, value in rewards.items()}
if observations:
observations = self._collate_obs_and_info(observations, infos)
else:
self._clear_rewards()
# self._cumulative_rewards[agent] = 0
self._accumulate_rewards()
obs, cumulative_reward, done, info = self.last()
self.agent_selection = self._agent_selector.next()
return obs, cumulative_reward, done, info
# collate agent info and observation into a tuple, making the agents obervation to
# be a tuple of the observation from the env and the agent info
def _collate_obs_and_info(self, observes, info):
observations = {}
infos = {}
observes = self.preprocessor(observes)
for agent, obs in observes.items():
all_infos = {k: info[k][get_agent_handle(agent)] for k in info.keys()}
agent_info = np.array(
list(all_infos.values()), dtype=np.float32
)
infos[agent] = all_infos
obs = (obs, agent_info) if self._include_agent_info else obs
observations[agent] = obs
self.infos = infos
self.obs = observations
return observations
def set_probs(self, probs):
self.probs = probs
def render(self, mode='rgb_array'):
"""
This methods provides the option to render the
environment's behavior as an image or to a window.
"""
if mode == "rgb_array":
env_rgb_array = self._environment.render(mode)
if not hasattr(self, "image_shape "):
self.image_shape = env_rgb_array.shape
if not hasattr(self, "probs "):
self.probs = [[0., 0., 0., 0.]]
fig, ax = plt.subplots(figsize=(self.image_shape[1]/100, self.image_shape[0]/100),
constrained_layout=True, dpi=100)
df = pd.DataFrame(np.array(self.probs).T)
sns.barplot(x=df.index, y=0, data=df, ax=ax)
ax.set(xlabel='actions', ylabel='probs')
fig.canvas.draw()
X = np.array(fig.canvas.renderer.buffer_rgba())
Image.fromarray(X)
# Image.fromarray(X)
rgb_image = np.array(Image.fromarray(X).convert('RGB'))
plt.close(fig)
q_value_rgb_array = rgb_image
return np.append(env_rgb_array, q_value_rgb_array, axis=1)
else:
return self._environment.render(mode)
def close(self):
self._environment.close()
def _obtain_preprocessor(self, preprocessor):
"""Obtains the actual preprocessor to be used based on the supplied
preprocessor and the env's obs_builder object"""
if not isinstance(self.obs_builder, GlobalObsForRailEnv):
_preprocessor = preprocessor if preprocessor else lambda x: x
if isinstance(self.obs_builder, TreeObsForRailEnv):
_preprocessor = (
partial(
normalize_observation, tree_depth=self.obs_builder.max_depth
)
if not preprocessor
else preprocessor
)
assert _preprocessor is not None
else:
def _preprocessor(x):
return x
def returned_preprocessor(obs):
temp_obs = {}
for agent_id, ob in obs.items():
temp_obs[get_agent_keys(agent_id)] = _preprocessor(ob)
return temp_obs
return returned_preprocessor
# Utility functions
def convert_np_type(dtype, value):
return np.dtype(dtype).type(value)
def get_agent_handle(id):
"""Obtain an agents handle given its id"""
return int(id)
def get_agent_keys(id):
"""Obtain an agents handle given its id"""
return str(id)
\ No newline at end of file
id-mava[flatland]
id-mava
id-mava[tf]
supersuit
stable-baselines3
ray==1.5.2
seaborn
matplotlib
pandas
\ No newline at end of file
from ray import tune
from ray.tune.registry import register_env
# from ray.rllib.utils import try_import_tf
from ray.rllib.env.wrappers.pettingzoo_env import ParallelPettingZooEnv
import numpy as np
from flatland.contrib.interface import flatland_env
from flatland.contrib.utils import env_generators
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
# Custom observation builder with predictor, uncomment line below if you want to try this one
observation_builder = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv(30))
seed = 10
np.random.seed(seed)
wandb_log = False
experiment_name = "flatland_pettingzoo"
rail_env = env_generators.small_v0(seed, observation_builder)
# __sphinx_doc_begin__
def env_creator(args):
env = flatland_env.parallel_env(environment=rail_env, use_renderer=False)
return env
if __name__ == "__main__":
env_name = "flatland_pettyzoo"
register_env(env_name, lambda config: ParallelPettingZooEnv(env_creator(config)))
test_env = ParallelPettingZooEnv(env_creator({}))
obs_space = test_env.observation_space
act_space = test_env.action_space
def gen_policy(i):
config = {
"gamma": 0.99,
}
return (None, obs_space, act_space, config)
policies = {"policy_0": gen_policy(0)}
policy_ids = list(policies.keys())
tune.run(
"PPO",
name="PPO",
stop={"timesteps_total": 5000000},
checkpoint_freq=10,
local_dir="~/ray_results/"+env_name,
config={
# Environment specific
"env": env_name,
# https://github.com/ray-project/ray/issues/10761
"no_done_at_end": True,
# "soft_horizon" : True,
"num_gpus": 0,
"num_workers": 2,
"num_envs_per_worker": 1,
"compress_observations": False,
"batch_mode": 'truncate_episodes',
"clip_rewards": False,
"vf_clip_param": 500.0,
"entropy_coeff": 0.01,
# effective batch_size: train_batch_size * num_agents_in_each_environment [5, 10]
# see https://github.com/ray-project/ray/issues/4628
"train_batch_size": 1000, # 5000
"rollout_fragment_length": 50, # 100
"sgd_minibatch_size": 100, # 500
"vf_share_layers": False
},
)
# __sphinx_doc_end__
import numpy as np
import os
import PIL
import shutil
from stable_baselines3.ppo import MlpPolicy
from stable_baselines3 import PPO
import supersuit as ss
from flatland.contrib.interface import flatland_env
from flatland.contrib.utils import env_generators
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
import fnmatch
import wandb
"""
https://github.com/PettingZoo-Team/PettingZoo/blob/HEAD/tutorials/13_lines.py
"""
# Custom observation builder without predictor
# observation_builder = GlobalObsForRailEnv()
# Custom observation builder with predictor
observation_builder = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv(30))
seed = 10
np.random.seed(seed)
wandb_log = False
experiment_name = "flatland_pettingzoo"
try:
if os.path.isdir(experiment_name):
shutil.rmtree(experiment_name)
os.mkdir(experiment_name)
except OSError as e:
print("Error: %s - %s." % (e.filename, e.strerror))
# rail_env = env_generators.sparse_env_small(seed, observation_builder)
rail_env = env_generators.small_v0(seed, observation_builder)
# __sphinx_doc_begin__
env = flatland_env.parallel_env(environment=rail_env, use_renderer=False)
# env = flatland_env.env(environment = rail_env, use_renderer = False)
if wandb_log:
run = wandb.init(project="flatland2021", entity="nilabha2007", sync_tensorboard=True,
config={}, name=experiment_name, save_code=True)
env_steps = 1000 # 2 * env.width * env.height # Code uses 1.5 to calculate max_steps
rollout_fragment_length = 50
env = ss.pettingzoo_env_to_vec_env_v0(env)
# env.black_death = True
env = ss.concat_vec_envs_v0(env, 1, num_cpus=1, base_class='stable_baselines3')
model = PPO(MlpPolicy, env, tensorboard_log=f"/tmp/{experiment_name}", verbose=3, gamma=0.95,
n_steps=rollout_fragment_length, ent_coef=0.01,
learning_rate=5e-5, vf_coef=1, max_grad_norm=0.9, gae_lambda=1.0, n_epochs=30, clip_range=0.3,
batch_size=150, seed=seed)
# wandb.watch(model.policy.action_net,log='all', log_freq = 1)
# wandb.watch(model.policy.value_net, log='all', log_freq = 1)
train_timesteps = 100000
model.learn(total_timesteps=train_timesteps)
model.save(f"policy_flatland_{train_timesteps}")
# __sphinx_doc_end__
model = PPO.load(f"policy_flatland_{train_timesteps}")
env = flatland_env.env(environment=rail_env, use_renderer=True)
if wandb_log:
artifact = wandb.Artifact('model', type='model')
artifact.add_file(f'policy_flatland_{train_timesteps}.zip')
run.log_artifact(artifact)
# Model Interference
seed = 100
env.reset(random_seed=seed)
step = 0
ep_no = 0
frame_list = []
while ep_no < 1:
for agent in env.agent_iter():
obs, reward, done, info = env.last()
act = model.predict(obs, deterministic=True)[0] if not done else None
env.step(act)
frame_list.append(PIL.Image.fromarray(env.render(mode='rgb_array')))
step += 1
if step % 100 == 0:
print(f"env step:{step} and action taken:{act}")
completion = env_generators.perc_completion(env)
print("Agents Completed:", completion)
completion = env_generators.perc_completion(env)
print("Final Agents Completed:", completion)
ep_no += 1
frame_list[0].save(f"{experiment_name}{os.sep}pettyzoo_out_{ep_no}.gif", save_all=True,
append_images=frame_list[1:], duration=3, loop=0)
frame_list = []
env.close()
env.reset(random_seed=seed+ep_no)
def find(pattern, path):
result = []
for root, dirs, files in os.walk(path):
for name in files:
if fnmatch.fnmatch(name, pattern):
result.append(os.path.join(root, name))
return result
if wandb_log:
extn = "gif"
_video_file = f'*.{extn}'
_found_videos = find(_video_file, experiment_name)
print(_found_videos)
for _found_video in _found_videos:
wandb.log({_found_video: wandb.Video(_found_video, format=extn)})
run.join()
from typing import List
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.step_utils.states import TrainState
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.step_utils import env_utils
class Deadlock_Checker:
def __init__(self, env):
self.env = env
self.deadlocked_agents = []
self.immediate_deadlocked = []
def reset(self) -> None:
self.deadlocked_agents = []
self.immediate_deadlocked = []
# an immediate deadlock consists of two trains "trying to pass through each other".
# An agent may have a free possible transition, but took a bad action and "ran into another train". This is now a deadlock, and the other free
# direction can not be chosen anymore!
def check_immediate_deadlocks(self, action_dict) -> List[EnvAgent]:
"""
output: list of agents who are in immediate deadlocks
"""
env = self.env
newly_deadlocked_agents = []
# TODO: check restrictions to relevant agents (status ACTIVE, etc.)
relevant_agents = [agent for agent in env.agents if agent.state != TrainState.DONE and agent.position is not None]
for agent in relevant_agents:
other_agents = [other_agent for other_agent in env.agents if other_agent != agent] # check if this is a good test for inequality. Maybe use handles...
# get the transitions the agent can take from his current position and orientation
# an indicator array of the form e.g. (0,1,1,0) meaning that he can only go to east and south, not to north and west.
possible_transitions = env.rail.get_transitions(*agent.position, agent.direction)
#print(f"possible transitions: {possible_transitions}")
# the directions are: 0(north), 1(east), 2(south) and 3(west)
#possible_directions = [direction for direction, flag in enumerate(possible_transitions) if flag == 1]
#print(f"possible directions: {possible_directions}")
################### only consider direction for actually chosen action ###############################
new_position, new_direction = env_utils.apply_action_independent(action=action_dict[agent.handle], rail=env.rail, position=agent.position, direction=agent.direction)
#assert new_direction in possible_directions, "Error, action leads to impossible direction"
assert new_position == get_new_position(agent.position, new_direction), "Error, something is wrong with new position"
opposed_agent_id = env.agent_positions[new_position] # TODO: check that agent_positions now works correctly in flatland V3 (i.e. gets correctly updated...)
# agent_positions[cell] is an agent_id if an agent is there, otherwise -1.
if opposed_agent_id != -1:
opposed_agent = env.agents[opposed_agent_id]
# other agent with opposing direction is in the way --> deadlock
# an opposing direction means having a different direction than our agent would have if he moved to the new cell. (180 degrees or 90 degrees to our agent)
if opposed_agent.direction != new_direction:
if agent not in newly_deadlocked_agents: # to avoid duplicates
newly_deadlocked_agents.append(agent)
if opposed_agent not in newly_deadlocked_agents: # to avoid duplicates
newly_deadlocked_agents.append(opposed_agent)
self.immediate_deadlocked = newly_deadlocked_agents
return newly_deadlocked_agents
# main method to check for all deadlocks
def check_deadlocks(self, action_dict) -> List[EnvAgent]:
env = self.env
relevant_agents = [agent for agent in env.agents if agent.state != TrainState.DONE and agent.position is not None]
immediate_deadlocked = self.check_immediate_deadlocks(action_dict)
self.immediate_deadlocked = immediate_deadlocked
deadlocked = immediate_deadlocked[:]
# now we have to "close": each train which is blocked by another deadlocked train becomes deadlocked itself.
still_changing = True
while still_changing:
still_changing = False # will be overwritten below if a change did occur
# check if for any agent, there is a new deadlock found
for agent in relevant_agents:
#possible_transitions = env.rail.get_transitions(*agent.position, agent.direction)
#print(f"possible transitions: {possible_transitions}")
# the directions are: 0 (north), 1(east), 2(south) and 3(west)
#possible_directions = [direction for direction, flag in enumerate(possible_transitions) if flag == 1]
#print(f"possible directions: {possible_directions}")
new_position, new_direction = env_utils.apply_action_independent(action=action_dict[agent.handle], rail=env.rail, position=agent.position, direction=agent.direction)
#assert new_direction in possible_directions, "Error, action leads to impossible direction"
assert new_position == get_new_position(agent.position, new_direction), "Error, something is wrong with new position"
opposed_agent_id = env.agent_positions[new_position]
if opposed_agent_id != -1: # there is an opposed agent there
opposed_agent = env.agents[opposed_agent_id]
if opposed_agent in deadlocked:
if agent not in deadlocked: # to avoid duplicates
deadlocked.append(agent)
still_changing = True
self.deadlocked_agents = deadlocked
return deadlocked
\ No newline at end of file
import logging
import random
import numpy as np
from typing import NamedTuple
from flatland.envs.malfunction_generators import malfunction_from_params, MalfunctionParameters, ParamMalfunctionGen, no_malfunction_generator
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.agent_utils import TrainState
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.fast_methods import fast_count_nonzero, fast_argmax
MalfunctionParameters = NamedTuple('MalfunctionParameters', [('malfunction_rate', float), ('min_duration', int), ('max_duration', int)])
def get_shortest_path_action(env,handle):
distance_map = env.distance_map.get()
agent = env.agents[handle]
if agent.status in [TrainState.WAITING, TrainState.READY_TO_DEPART,
TrainState.MALFUNCTION_OFF_MAP]:
agent_virtual_position = agent.initial_position
elif agent.status in [TrainState.MALFUNCTION, TrainState.MOVING, TrainState.STOPPED]:
agent_virtual_position = agent.position
elif agent.status == TrainState.DONE:
agent_virtual_position = agent.target
else:
return None
if agent.position:
possible_transitions = env.rail.get_transitions(
*agent.position, agent.direction)
else:
possible_transitions = env.rail.get_transitions(
*agent.initial_position, agent.direction)
num_transitions = fast_count_nonzero(possible_transitions)
min_distances = []
for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
if possible_transitions[direction]:
new_position = get_new_position(
agent_virtual_position, direction)
min_distances.append(
distance_map[handle, new_position[0],
new_position[1], direction])
else:
min_distances.append(np.inf)
if num_transitions == 1:
observation = [0, 1, 0]
elif num_transitions == 2:
idx = np.argpartition(np.array(min_distances), 2)
observation = [0, 0, 0]
observation[idx[0]] = 1
return fast_argmax(observation) + 1
def small_v0(random_seed, observation_builder, max_width = 35, max_height = 35):
random.seed(random_seed)
width = 30
height = 30
nr_trains = 5
max_num_cities = 4
grid_mode = False
max_rails_between_cities = 2
max_rails_in_city = 3
malfunction_rate = 0
malfunction_min_duration = 0
malfunction_max_duration = 0
rail_generator = sparse_rail_generator(max_num_cities=max_num_cities, seed=random_seed, grid_mode=False,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_city)
stochastic_data = MalfunctionParameters(malfunction_rate=malfunction_rate, # Rate of malfunction occurence
min_duration=malfunction_min_duration, # Minimal duration of malfunction
max_duration=malfunction_max_duration # Max duration of malfunction
)
speed_ratio_map = None
line_generator = sparse_line_generator(speed_ratio_map)
malfunction_generator = no_malfunction_generator()
while width <= max_width and height <= max_height:
try:
env = RailEnv(width=width, height=height, rail_generator=rail_generator,
line_generator=line_generator, number_of_agents=nr_trains,
# malfunction_generator_and_process_data=malfunction_from_params(stochastic_data),
malfunction_generator_and_process_data=malfunction_generator,
obs_builder_object=observation_builder, remove_agents_at_target=False)
print("[{}] {}x{} {} cities {} trains, max {} rails between cities, max {} rails in cities. Malfunction rate {}, {} to {} steps.".format(
random_seed, width, height, max_num_cities, nr_trains, max_rails_between_cities,
max_rails_in_city, malfunction_rate, malfunction_min_duration, malfunction_max_duration
))
return env
except ValueError as e:
logging.error(f"Error: {e}")
width += 5
height += 5
logging.info("Try again with larger env: (w,h):", width, height)
logging.error(f"Unable to generate env with seed={random_seed}, max_width={max_height}, max_height={max_height}")
return None
def random_sparse_env_small(random_seed, observation_builder, max_width = 45, max_height = 45):
random.seed(random_seed)
size = random.randint(0, 5)
width = 20 + size * 5
height = 20 + size * 5
nr_cities = 2 + size // 2 + random.randint(0, 2)
nr_trains = min(nr_cities * 5, 5 + random.randint(0, 5)) # , 10 + random.randint(0, 10))
max_rails_between_cities = 2
max_rails_in_cities = 3 + random.randint(0, size)
malfunction_rate = 30 + random.randint(0, 100)
malfunction_min_duration = 3 + random.randint(0, 7)
malfunction_max_duration = 20 + random.randint(0, 80)
rail_generator = sparse_rail_generator(max_num_cities=nr_cities, seed=random_seed, grid_mode=False,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rails_in_cities)
stochastic_data = MalfunctionParameters(malfunction_rate=malfunction_rate, # Rate of malfunction occurence
min_duration=malfunction_min_duration, # Minimal duration of malfunction
max_duration=malfunction_max_duration # Max duration of malfunction
)
line_generator = sparse_line_generator({1.: 0.25, 1. / 2.: 0.25, 1. / 3.: 0.25, 1. / 4.: 0.25})
while width <= max_width and height <= max_height:
try:
env = RailEnv(width=width, height=height, rail_generator=rail_generator,
line_generator=line_generator, number_of_agents=nr_trains,
# malfunction_generator_and_process_data=malfunction_from_params(stochastic_data),
malfunction_generator=ParamMalfunctionGen(stochastic_data),
obs_builder_object=observation_builder, remove_agents_at_target=False)
print("[{}] {}x{} {} cities {} trains, max {} rails between cities, max {} rails in cities. Malfunction rate {}, {} to {} steps.".format(
random_seed, width, height, nr_cities, nr_trains, max_rails_between_cities,
max_rails_in_cities, malfunction_rate, malfunction_min_duration, malfunction_max_duration
))
return env
except ValueError as e:
logging.error(f"Error: {e}")
width += 5
height += 5
logging.info("Try again with larger env: (w,h):", width, height)
logging.error(f"Unable to generate env with seed={random_seed}, max_width={max_height}, max_height={max_height}")
return None
def sparse_env_small(random_seed, observation_builder):
width = 30 # With of map
height = 30 # Height of map
nr_trains = 2 # Number of trains that have an assigned task in the env
cities_in_map = 3 # Number of cities where agents can start or end
seed = 10 # Random seed
grid_distribution_of_cities = False # Type of city distribution, if False cities are randomly placed
max_rails_between_cities = 2 # Max number of tracks allowed between cities. This is number of entry point to a city
max_rail_in_cities = 6 # Max number of parallel tracks within a city, representing a realistic trainstation
rail_generator = sparse_rail_generator(max_num_cities=cities_in_map,
seed=seed,
grid_mode=grid_distribution_of_cities,
max_rails_between_cities=max_rails_between_cities,
max_rail_pairs_in_city=max_rail_in_cities,
)
# Different agent types (trains) with different speeds.
speed_ration_map = {1.: 0.25, # Fast passenger train
1. / 2.: 0.25, # Fast freight train
1. / 3.: 0.25, # Slow commuter train
1. / 4.: 0.25} # Slow freight train
# We can now initiate the schedule generator with the given speed profiles
line_generator = sparse_rail_generator(speed_ration_map)
# We can furthermore pass stochastic data to the RailEnv constructor which will allow for stochastic malfunctions
# during an episode.
stochastic_data = MalfunctionParameters(malfunction_rate=1/10000, # Rate of malfunction occurence
min_duration=15, # Minimal duration of malfunction
max_duration=50 # Max duration of malfunction
)
rail_env = RailEnv(width=width,
height=height,
rail_generator=rail_generator,
line_generator=line_generator,
number_of_agents=nr_trains,
obs_builder_object=observation_builder,
# malfunction_generator_and_process_data=malfunction_from_params(stochastic_data),
malfunction_generator=ParamMalfunctionGen(stochastic_data),
remove_agents_at_target=True)
return rail_env
def _after_step(self, observation, reward, done, info):
if not self.enabled: return done
if type(done)== dict:
_done_check = done['__all__']
else:
_done_check = done
if _done_check and self.env_semantics_autoreset:
# For envs with BlockingReset wrapping VNCEnv, this observation will be the first one of the new episode
self.reset_video_recorder()
self.episode_id += 1
self._flush()
# Record stats - Disabled as it causes error in multi-agent set up
# self.stats_recorder.after_step(observation, reward, done, info)
# Record video
self.video_recorder.capture_frame()
return done
def perc_completion(env):
tasks_finished = 0
if hasattr(env, "agents_data"):
agent_data = env.agents_data
else:
agent_data = env.agents
for current_agent in agent_data:
if current_agent.status == TrainState.DONE:
tasks_finished += 1
return 100 * np.mean(tasks_finished / max(
1, len(agent_data)))
from collections import defaultdict
from typing import Dict, Tuple
from flatland.contrib.utils.deadlock_checker import Deadlock_Checker
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.fast_methods import fast_count_nonzero
from flatland.envs.rail_env import RailEnv, RailEnvActions
from flatland.envs.step_utils.states import TrainState
def possible_actions_sorted_by_distance(env: RailEnv, handle: int):
agent = env.agents[handle]
if agent.state == TrainState.READY_TO_DEPART:
agent_virtual_position = agent.initial_position
elif agent.state.is_on_map_state():
agent_virtual_position = agent.position
else:
print("no action possible!")
print("agent state: ", agent.state)
# NEW: if agent is at target, DO_NOTHING, and distance is zero.
# NEW: (needs to be tested...)
return [(RailEnvActions.DO_NOTHING, 0)] * 2
possible_transitions = env.rail.get_transitions(*agent_virtual_position, agent.direction)
print(f"possible transitions: {possible_transitions}")
distance_map = env.distance_map.get()[handle]
possible_steps = []
for movement in list(range(4)):
if possible_transitions[movement]:
if movement == agent.direction:
action = RailEnvActions.MOVE_FORWARD
elif movement == (agent.direction + 1) % 4:
action = RailEnvActions.MOVE_RIGHT
elif movement == (agent.direction - 1) % 4:
action = RailEnvActions.MOVE_LEFT
else:
print(f"An error occured. movement is: {movement}, agent direction is: {agent.direction}")
if movement == (agent.direction + 2) % 4 or (movement == agent.direction - 2) % 4:
print("it seems that we are turning by 180 degrees. Turning in a dead end?")
action = RailEnvActions.MOVE_FORWARD
distance = distance_map[get_new_position(agent_virtual_position, movement) + (movement,)]
possible_steps.append((action, distance))
possible_steps = sorted(possible_steps, key=lambda step: step[1])
# if there is only one path to target, this is both the shortest one and the second shortest path.
if len(possible_steps) == 1:
return possible_steps * 2
else:
return possible_steps
class RailEnvWrapper:
def __init__(self, env:RailEnv):
self.env = env
assert self.env is not None
assert self.env.rail is not None, "Reset original environment first!"
assert self.env.agents is not None, "Reset original environment first!"
assert len(self.env.agents) > 0, "Reset original environment first!"
# @property
# def number_of_agents(self):
# return self.env.number_of_agents
# @property
# def agents(self):
# return self.env.agents
# @property
# def _seed(self):
# return self.env._seed
# @property
# def obs_builder(self):
# return self.env.obs_builder
def __getattr__(self, name):
try:
return super().__getattr__(self,name)
except:
"""Expose any other attributes of the underlying environment."""
return getattr(self.env, name)
@property
def rail(self):
return self.env.rail
@property
def width(self):
return self.env.width
@property
def height(self):
return self.env.height
@property
def agent_positions(self):
return self.env.agent_positions
def get_num_agents(self):
return self.env.get_num_agents()
def get_agent_handles(self):
return self.env.get_agent_handles()
def step(self, action_dict: Dict[int, RailEnvActions]):
return self.env.step(action_dict)
def reset(self, **kwargs):
obs, info = self.env.reset(**kwargs)
return obs, info
class ShortestPathActionWrapper(RailEnvWrapper):
def __init__(self, env:RailEnv):
super().__init__(env)
def step(self, action_dict: Dict[int, RailEnvActions]) -> Tuple[Dict, Dict, Dict, Dict]:
# input: action dict with actions in [0, 1, 2].
transformed_action_dict = {}
for agent_id, action in action_dict.items():
if action == 0:
transformed_action_dict[agent_id] = action
else:
#assert action in [1, 2]
#assert possible_actions_sorted_by_distance(self.env, agent_id) is not None
#assert possible_actions_sorted_by_distance(self.env, agent_id)[action - 1] is not None
transformed_action_dict[agent_id] = possible_actions_sorted_by_distance(self.env, agent_id)[action - 1][0]
obs, rewards, dones, info = self.env.step(transformed_action_dict)
return obs, rewards, dones, info
def find_all_cells_where_agent_can_choose(env: RailEnv):
"""
input: a RailEnv (or something which behaves similarly, e.g. a wrapped RailEnv),
WHICH HAS BEEN RESET ALREADY!
(o.w., we call env.rail, which is None before reset(), and crash.)
"""
switches = []
switches_neighbors = []
directions = list(range(4))
for h in range(env.height):
for w in range(env.width):
pos = (h, w)
is_switch = False
# Check for switch: if there is more than one outgoing transition
for orientation in directions:
possible_transitions = env.rail.get_transitions(*pos, orientation)
num_transitions = fast_count_nonzero(possible_transitions)
if num_transitions > 1:
switches.append(pos)
is_switch = True
break
if is_switch:
# Add all neighbouring rails, if pos is a switch
for orientation in directions:
possible_transitions = env.rail.get_transitions(*pos, orientation)
for movement in directions:
if possible_transitions[movement]:
switches_neighbors.append(get_new_position(pos, movement))
decision_cells = switches + switches_neighbors
return tuple(map(set, (switches, switches_neighbors, decision_cells)))
class SkipNoChoiceCellsWrapper(RailEnvWrapper):
# env can be a real RailEnv, or anything that shares the same interface
# e.g. obs, rewards, dones, info = env.step(action_dict) and obs, info = env.reset(), and so on.
def __init__(self, env:RailEnv, accumulate_skipped_rewards: bool, discounting: float) -> None:
super().__init__(env)
# save these so they can be inspected easier.
self.accumulate_skipped_rewards = accumulate_skipped_rewards
self.discounting = discounting
self.switches = None
self.switches_neighbors = None
self.decision_cells = None
self.skipped_rewards = defaultdict(list)
# sets initial values for switches, decision_cells, etc.
self.reset_cells()
def on_decision_cell(self, agent: EnvAgent) -> bool:
return agent.position is None or agent.position == agent.initial_position or agent.position in self.decision_cells
def on_switch(self, agent: EnvAgent) -> bool:
return agent.position in self.switches
def next_to_switch(self, agent: EnvAgent) -> bool:
return agent.position in self.switches_neighbors
def reset_cells(self) -> None:
self.switches, self.switches_neighbors, self.decision_cells = find_all_cells_where_agent_can_choose(self.env)
def step(self, action_dict: Dict[int, RailEnvActions]) -> Tuple[Dict, Dict, Dict, Dict]:
o, r, d, i = {}, {}, {}, {}
# need to initialize i["..."]
# as we will access i["..."][agent_id]
i["action_required"] = dict()
i["malfunction"] = dict()
i["speed"] = dict()
i["state"] = dict()
while len(o) == 0:
obs, reward, done, info = self.env.step(action_dict)
for agent_id, agent_obs in obs.items():
if done[agent_id] or self.on_decision_cell(self.env.agents[agent_id]):
o[agent_id] = agent_obs
r[agent_id] = reward[agent_id]
d[agent_id] = done[agent_id]
i["action_required"][agent_id] = info["action_required"][agent_id]
i["malfunction"][agent_id] = info["malfunction"][agent_id]
i["speed"][agent_id] = info["speed"][agent_id]
i["state"][agent_id] = info["state"][agent_id]
if self.accumulate_skipped_rewards:
discounted_skipped_reward = r[agent_id]
for skipped_reward in reversed(self.skipped_rewards[agent_id]):
discounted_skipped_reward = self.discounting * discounted_skipped_reward + skipped_reward
r[agent_id] = discounted_skipped_reward
self.skipped_rewards[agent_id] = []
elif self.accumulate_skipped_rewards:
self.skipped_rewards[agent_id].append(reward[agent_id])
# end of for-loop
d['__all__'] = done['__all__']
action_dict = {}
# end of while-loop
return o, r, d, i
def reset(self, **kwargs) -> Tuple[Dict, Dict]:
obs, info = self.env.reset(**kwargs)
# resets decision cells, switches, etc. These can change with an env.reset(...)!
# needs to be done after env.reset().
self.reset_cells()
return obs, info
class DeadlockWrapper(RailEnvWrapper):
def __init__(self, env:RailEnv, deadlock_reward=-100) -> None:
super().__init__(env)
self.deadlock_reward = deadlock_reward
self.deadlock_checker = Deadlock_Checker(env=self.env)
@property
def deadlocked_agents(self):
return self.deadlock_checker.deadlocked_agents
@property
def immediate_deadlocks(self):
return [agent.handle for agent in self.deadlock_checker.immediate_deadlocked]
# make sure to assign the deadlock reward only once to each deadlocked agent...
def step(self, action_dict: Dict[int, RailEnvActions]) -> Tuple[Dict, Dict, Dict, Dict]:
# agents which are already deadlocked from previous steps
already_deadlocked_ids = [agent.handle for agent in self.deadlocked_agents]
# step environment
obs, rewards, dones, info = self.env.step(action_dict)
# compute new list of deadlocked agents (ids) after stepping the environment
deadlocked_agents = self.deadlock_checker.check_deadlocks(action_dict) # also stored in self.deadlocked_checker.deadlocked_agents
deadlocked_agents_ids = [agent.handle for agent in deadlocked_agents]
# immediate deadlocked ids only used for prints
immediate_deadlocked_ids = [agent.handle for agent in self.deadlock_checker.immediate_deadlocked]
print(f"immediate deadlocked: {immediate_deadlocked_ids}")
print(f"total deadlocked: {deadlocked_agents_ids}")
newly_deadlocked_agents_ids = [agent_id for agent_id in deadlocked_agents_ids if agent_id not in already_deadlocked_ids]
# assign deadlock rewards
for agent_id in newly_deadlocked_agents_ids:
print(f"assigning deadlock reward of {self.deadlock_reward} to agent {agent_id}")
rewards[agent_id] = self.deadlock_reward
return obs, rewards, dones, info
def reset(self, **kwargs) -> Tuple[Dict, Dict]:
self.deadlock_checker.reset() # sets all lists of deadlocked agents to empty list
obs, info = super().reset(**kwargs)
return obs, info
......@@ -11,11 +11,11 @@ class Environment:
Derived environments should implement the following attributes:
action_space: tuple with the dimensions of the actions to be passed to the step method
observation_space: tuple with the dimensions of the observations returned by reset and step
Agents are identified by agent ids (handles).
Examples:
>>> obs = env.reset()
>>> obs, info = env.reset()
>>> print(obs)
{
"train_0": [2.4, 1.6],
......@@ -40,18 +40,19 @@ class Environment:
"train_0": {}, # info for train_0
"train_1": {}, # info for train_1
}
"""
def __init__(self):
self.action_space = ()
self.observation_space = ()
pass
def reset(self):
"""
Resets the env and returns observations from agents in the environment.
Returns:
Returns
-------
obs : dict
New observations for each agent.
"""
......@@ -66,7 +67,7 @@ class Environment:
The returns are dicts mapping from agent_id strings to values.
Parameters
-------
----------
action_dict : dict
Dictionary of actions to execute, indexed by agent id.
......
......@@ -2,27 +2,29 @@
ObservationBuilder objects are objects that can be passed to environments designed for customizability.
The ObservationBuilder-derived custom classes implement 2 functions, reset() and get() or get(handle).
+ Reset() is called after each environment reset, to allow for pre-computing relevant data.
+ `reset()` is called after each environment reset, to allow for pre-computing relevant data.
+ `get()` is called whenever an observation has to be computed, potentially for each agent independently in case of \
multi-agent environments.
+ Get() is called whenever an observation has to be computed, potentially for each agent independently in
case of multi-agent environments.
"""
from typing import Optional, List
import numpy as np
from flatland.core.env import Environment
class ObservationBuilder:
"""
ObservationBuilder base class.
Derived objects must implement and `observation_space' attribute as a tuple with the dimensions of the returned
observations.
"""
def __init__(self):
self.observation_space = ()
self.env = None
def _set_env(self, env):
self.env = env
def set_env(self, env: Environment):
self.env: Environment = env
def reset(self):
"""
......@@ -30,35 +32,37 @@ class ObservationBuilder:
"""
raise NotImplementedError()
def get_many(self, handles=[]):
def get_many(self, handles: Optional[List[int]] = None):
"""
Called whenever an observation has to be computed for the `env' environment, for each agent with handle
in the `handles' list.
Called whenever an observation has to be computed for the `env` environment, for each agent with handle
in the `handles` list.
Parameters
-------
handles : list of handles (optional)
----------
handles : list of handles, optional
List with the handles of the agents for which to compute the observation vector.
Returns
-------
function
A dictionary of observation structures, specific to the corresponding environment, with handles from
`handles' as keys.
`handles` as keys.
"""
observations = {}
if handles is None:
handles = []
for h in handles:
observations[h] = self.get(h)
return observations
def get(self, handle=0):
def get(self, handle: int = 0):
"""
Called whenever an observation has to be computed for the `env' environment, possibly
for each agent independently (agent id `handle').
Called whenever an observation has to be computed for the `env` environment, possibly
for each agent independently (agent id `handle`).
Parameters
-------
handle : int (optional)
----------
handle : int, optional
Handle of the agent for which to compute the observation vector.
Returns
......@@ -73,3 +77,22 @@ class ObservationBuilder:
direction = np.zeros(4)
direction[agent.direction] = 1
return direction
class DummyObservationBuilder(ObservationBuilder):
"""
DummyObservationBuilder class which returns dummy observations
This is used in the evaluation service
"""
def __init__(self):
super().__init__()
def reset(self):
pass
def get_many(self, handles: Optional[List[int]] = None) -> bool:
return True
def get(self, handle: int = 0) -> bool:
return True
......@@ -3,11 +3,12 @@ PredictionBuilder objects are objects that can be passed to environments designe
The PredictionBuilder-derived custom classes implement 2 functions, reset() and get([handle]).
If predictions are not required in every step or not for all agents, then
+ Reset() is called after each environment reset, to allow for pre-computing relevant data.
+ `reset()` is called after each environment reset, to allow for pre-computing relevant data.
+ Get() is called whenever an step has to be computed, potentially for each agent independently in
+ `get()` is called whenever an step has to be computed, potentially for each agent independently in \
case of multi-agent environments.
"""
from flatland.core.env import Environment
class PredictionBuilder:
......@@ -18,8 +19,9 @@ class PredictionBuilder:
def __init__(self, max_depth: int = 20):
self.max_depth = max_depth
self.env = None
def _set_env(self, env):
def set_env(self, env: Environment):
self.env = env
def reset(self):
......@@ -28,16 +30,13 @@ class PredictionBuilder:
"""
pass
def get(self, custom_args=None, handle=0):
def get(self, handle: int = 0):
"""
Called whenever get_many in the observation build is called.
Parameters
-------
custom_args: dict
Implementation-dependent custom arguments, see the sub-classes.
handle : int (optional)
----------
handle : int, optional
Handle of the agent for which to compute the observation vector.
Returns
......
from enum import IntEnum
from typing import Type
from functools import lru_cache
from typing import Type, List
import numpy as np
from flatland.core.transitions import Transitions
# maxsize=None can be used because the number of possible transition is limited (16 bit encoded) and the
# direction/orientation is also limited (2bit). Where the 16bit are only sparse used = number of rail types
# Those methods can be cached -> the are independant of the railways (env)
@lru_cache(maxsize=128)
def fast_grid4_get_transitions(cell_transition, orientation):
bits = (cell_transition >> ((3 - orientation) * 4))
return ((bits >> 3) & 1, (bits >> 2) & 1, (bits >> 1) & 1, (bits) & 1)
@lru_cache(maxsize=128)
def fast_grid4_get_transition(cell_transition, orientation, direction):
return ((cell_transition >> ((4 - 1 - orientation) * 4)) >> (4 - 1 - direction)) & 1
@lru_cache(maxsize=128)
def fast_grid4_set_transitions(cell_transition, orientation, new_transitions):
mask = (1 << ((4 - orientation) * 4)) - (1 << ((3 - orientation) * 4))
negmask = ~mask
new_transitions = \
(new_transitions[0] & 1) << 3 | \
(new_transitions[1] & 1) << 2 | \
(new_transitions[2] & 1) << 1 | \
(new_transitions[3] & 1)
cell_transition = (cell_transition & negmask) | (new_transitions << ((3 - orientation) * 4))
return cell_transition
@lru_cache(maxsize=128)
def fast_grid4_remove_deadends(cell_transition):
"""
Remove all turn-arounds (e.g. N-S, S-N, E-W,...).
"""
maskDeadEnds = Grid4Transitions.maskDeadEnds()
cell_transition &= cell_transition & (~maskDeadEnds) & 0xffff
return cell_transition
@lru_cache(maxsize=128)
def fast_grid4_rotate_transition(cell_transition, rotation=0):
value = cell_transition
rotation = rotation // 90
for i in range(4):
block_tuple = fast_grid4_get_transitions(value, i)
block_tuple = block_tuple[(4 - rotation):] + block_tuple[:(4 - rotation)]
value = fast_grid4_set_transitions(value, i, block_tuple)
# Rotate the 4-bits blocks
value = ((value & (2 ** (rotation * 4) - 1)) << ((4 - rotation) * 4)) | (
value >> (rotation * 4))
cell_transition = value
return cell_transition
class Grid4TransitionsEnum(IntEnum):
NORTH = 0
EAST = 1
......@@ -24,9 +82,9 @@ class Grid4Transitions(Transitions):
"""
Grid4Transitions class derived from Transitions.
Special case of `Transitions' over a 2D-grid (FlatLand).
Special case of `Transitions` over a 2D-grid (FlatLand).
Transitions are possible to neighboring cells on the grid if allowed.
GridTransitions keeps track of valid transitions supplied as `transitions'
GridTransitions keeps track of valid transitions supplied as `transitions`
list, each represented as a bitmap of 16 bits.
Whether a transition is allowed or not depends on which direction an agent
......@@ -57,8 +115,11 @@ class Grid4Transitions(Transitions):
# row,col delta for each direction
self.gDir2dRC = np.array([[-1, 0], [0, 1], [1, 0], [0, -1]])
# These bits represent all the possible dead ends
self.maskDeadEnds = 0b0010000110000100
# These bits represent all the possible dead ends
@staticmethod
@lru_cache()
def maskDeadEnds():
return 0b0010000110000100
def get_type(self):
return np.uint16
......@@ -67,8 +128,8 @@ class Grid4Transitions(Transitions):
"""
Get the 4 possible transitions ((N,E,S,W), 4 elements tuple
if no diagonal transitions allowed) available for an agent oriented
in direction `orientation' and inside a cell with
transitions `cell_transition'.
in direction `orientation` and inside a cell with
transitions `cell_transition`.
Parameters
----------
......@@ -83,16 +144,15 @@ class Grid4Transitions(Transitions):
List of the validity of transitions in the cell.
"""
bits = (cell_transition >> ((3 - orientation) * 4))
return ((bits >> 3) & 1, (bits >> 2) & 1, (bits >> 1) & 1, (bits) & 1)
return fast_grid4_get_transitions(cell_transition, orientation)
def set_transitions(self, cell_transition, orientation, new_transitions):
"""
Set the possible transitions (e.g., (N,E,S,W), 4 elements tuple
if no diagonal transitions allowed) available for an agent
oriented in direction `orientation' and inside a cell with transitions
`cell_transition'. A new `cell_transition' is returned with
the specified bits replaced by `new_transitions'.
oriented in direction `orientation` and inside a cell with transitions
`cell_transition'. A new `cell_transition` is returned with
the specified bits replaced by `new_transitions`.
Parameters
----------
......@@ -107,28 +167,17 @@ class Grid4Transitions(Transitions):
-------
int
An updated bitmap that replaces the original transitions validity
of `cell_transition' with `new_transitions', for the appropriate
`orientation'.
of `cell_transition' with `new_transitions`, for the appropriate
`orientation`.
"""
mask = (1 << ((4 - orientation) * 4)) - (1 << ((3 - orientation) * 4))
negmask = ~mask
new_transitions = \
(new_transitions[0] & 1) << 3 | \
(new_transitions[1] & 1) << 2 | \
(new_transitions[2] & 1) << 1 | \
(new_transitions[3] & 1)
cell_transition = (cell_transition & negmask) | (new_transitions << ((3 - orientation) * 4))
return cell_transition
return fast_grid4_set_transitions(cell_transition, orientation, new_transitions)
def get_transition(self, cell_transition, orientation, direction):
"""
Get the transition bit (1 value) that determines whether an agent
oriented in direction `orientation' and inside a cell with transitions
`cell_transition' can move to the cell in direction `direction'
oriented in direction `orientation` and inside a cell with transitions
`cell_transition' can move to the cell in direction `direction`
relative to the current cell.
Parameters
......@@ -146,13 +195,14 @@ class Grid4Transitions(Transitions):
Validity of the requested transition: 0/1 allowed/not allowed.
"""
return ((cell_transition >> ((4 - 1 - orientation) * 4)) >> (4 - 1 - direction)) & 1
return fast_grid4_get_transition(cell_transition, orientation, direction)
def set_transition(self, cell_transition, orientation, direction, new_transition, remove_deadends=False):
def set_transition(self, cell_transition, orientation, direction, new_transition,
remove_deadends=False):
"""
Set the transition bit (1 value) that determines whether an agent
oriented in direction `orientation' and inside a cell with transitions
`cell_transition' can move to the cell in direction `direction'
oriented in direction `orientation` and inside a cell with transitions
`cell_transition' can move to the cell in direction `direction`
relative to the current cell.
Parameters
......@@ -171,8 +221,8 @@ class Grid4Transitions(Transitions):
-------
int
An updated bitmap that replaces the original transitions validity
of `cell_transition' with `new_transitions', for the appropriate
`orientation'.
of `cell_transition' with `new_transitions`, for the appropriate
`orientation`.
"""
if new_transition:
......@@ -181,7 +231,7 @@ class Grid4Transitions(Transitions):
cell_transition &= ~(1 << ((4 - 1 - orientation) * 4 + (4 - 1 - direction)))
if remove_deadends:
cell_transition = self.remove_deadends(cell_transition)
cell_transition = fast_grid4_remove_deadends(cell_transition)
return cell_transition
......@@ -196,7 +246,7 @@ class Grid4Transitions(Transitions):
16 bits used to encode the valid transitions for a cell.
rotation : int
Angle by which to clock-wise rotate the transition bits in
`cell_transition' by. I.e., rotation={0, 90, 180, 270} degrees.
`cell_transition` by. I.e., rotation={0, 90, 180, 270} degrees.
Returns
-------
......@@ -206,27 +256,18 @@ class Grid4Transitions(Transitions):
"""
# Rotate the individual bits in each block
value = cell_transition
rotation = rotation // 90
for i in range(4):
block_tuple = self.get_transitions(value, i)
block_tuple = block_tuple[(4 - rotation):] + block_tuple[:(4 - rotation)]
value = self.set_transitions(value, i, block_tuple)
# Rotate the 4-bits blocks
value = ((value & (2 ** (rotation * 4) - 1)) << ((4 - rotation) * 4)) | (value >> (rotation * 4))
cell_transition = value
return cell_transition
return fast_grid4_rotate_transition(cell_transition, rotation)
def get_direction_enum(self) -> Type[Grid4TransitionsEnum]:
return Grid4TransitionsEnum
def has_deadend(self, cell_transition):
@staticmethod
@lru_cache()
def has_deadend(cell_transition):
"""
Checks if one entry can only by exited by a turn-around.
"""
if cell_transition & self.maskDeadEnds > 0:
if cell_transition & Grid4Transitions.maskDeadEnds() > 0:
return True
else:
return False
......@@ -235,5 +276,9 @@ class Grid4Transitions(Transitions):
"""
Remove all turn-arounds (e.g. N-S, S-N, E-W,...).
"""
cell_transition &= cell_transition & (~self.maskDeadEnds) & 0xffff
return cell_transition
return fast_grid4_remove_deadends(cell_transition)
@staticmethod
@lru_cache()
def get_entry_directions(cell_transition) -> List[int]:
return [(cell_transition >> ((3 - orientation) * 4)) & 15 > 0 for orientation in range(4)]
from flatland.core.grid.grid4_utils import validate_new_transition
import numpy as np
from flatland.core.grid.grid_utils import IntVector2D, IntVector2DDistance
from flatland.core.grid.grid_utils import IntVector2DArray
from flatland.core.grid.grid_utils import Vec2dOperations as Vec2d
from flatland.core.transition_map import GridTransitionMap
from flatland.utils.ordered_set import OrderedSet
class AStarNode():
class AStarNode:
"""A node class for A* Pathfinding"""
def __init__(self, parent=None, pos=None):
def __init__(self, pos: IntVector2D, parent=None):
self.parent = parent
self.pos = pos
self.g = 0
self.h = 0
self.f = 0
self.pos: IntVector2D = pos
self.g = 0.0
self.h = 0.0
self.f = 0.0
def __eq__(self, other):
"""
Parameters
----------
other : AStarNode
"""
return self.pos == other.pos
def __hash__(self):
......@@ -25,16 +37,35 @@ class AStarNode():
self.f = other.f
def a_star(rail_trans, rail_array, start, end):
def a_star(grid_map: GridTransitionMap, start: IntVector2D, end: IntVector2D,
a_star_distance_function: IntVector2DDistance = Vec2d.get_manhattan_distance, avoid_rails=False,
respect_transition_validity=True, forbidden_cells: IntVector2DArray = None) -> IntVector2DArray:
"""
:param avoid_rails:
:param grid_map: Grid Map where the path is found in
:param start: Start positions as (row,column)
:param end: End position as (row,column)
:param a_star_distance_function: Define the distance function to use as heuristc:
-get_euclidean_distance
-get_manhattan_distance
-get_chebyshev_distance
:param respect_transition_validity: Whether or not a-star respect allowed transitions on the grid map.
- True: Respects the validity of transition. This generates valid paths, of no path if it cannot be found
- False: This always finds a path, but the path might be illegal and thus needs to be fixed afterwards
:param forbidden_cells: List of cells where the path cannot pass through. Used to avoid certain areas of Grid map
:return: IF a path is found a ordered list of al cells in path is returned
"""
"""
Returns a list of tuples as a path from the given start to end.
If no path is found, returns path to closest point to end.
"""
rail_shape = rail_array.shape
start_node = AStarNode(None, start)
end_node = AStarNode(None, end)
open_nodes = set()
closed_nodes = set()
rail_shape = grid_map.grid.shape
start_node = AStarNode(start, None)
end_node = AStarNode(end, None)
open_nodes = OrderedSet()
closed_nodes = OrderedSet()
open_nodes.add(start_node)
while len(open_nodes) > 0:
......@@ -58,6 +89,7 @@ def a_star(rail_trans, rail_array, start, end):
while current is not None:
path.append(current.pos)
current = current.parent
# return reversed path
return path[::-1]
......@@ -67,17 +99,28 @@ def a_star(rail_trans, rail_array, start, end):
prev_pos = current_node.parent.pos
else:
prev_pos = None
for new_pos in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
node_pos = (current_node.pos[0] + new_pos[0], current_node.pos[1] + new_pos[1])
# update the "current" pos
node_pos: IntVector2D = Vec2d.add(current_node.pos, new_pos)
# is node_pos inside the grid?
if node_pos[0] >= rail_shape[0] or node_pos[0] < 0 or node_pos[1] >= rail_shape[1] or node_pos[1] < 0:
continue
# validate positions
if not validate_new_transition(rail_trans, rail_array, prev_pos, current_node.pos, node_pos, end_node.pos):
#
if not grid_map.validate_new_transition(prev_pos, current_node.pos, node_pos,
end_node.pos) and respect_transition_validity:
continue
# create new node
new_node = AStarNode(current_node, node_pos)
new_node = AStarNode(node_pos, current_node)
# Skip paths through forbidden regions if they are provided
if forbidden_cells is not None:
if node_pos in forbidden_cells and new_node != start_node and new_node != end_node:
continue
children.append(new_node)
# loop through children
......@@ -87,11 +130,12 @@ def a_star(rail_trans, rail_array, start, end):
continue
# create the f, g, and h values
child.g = current_node.g + 1
# this heuristic favors diagonal paths:
# child.h = ((child.pos[0] - end_node.pos[0]) ** 2) + ((child.pos[1] - end_node.pos[1]) ** 2) \# noqa: E800
child.g = current_node.g + 1.0
# this heuristic avoids diagonal paths
child.h = abs(child.pos[0] - end_node.pos[0]) + abs(child.pos[1] - end_node.pos[1])
if avoid_rails:
child.h = a_star_distance_function(child.pos, end_node.pos) + np.clip(grid_map.grid[child.pos], 0, 1)
else:
child.h = a_star_distance_function(child.pos, end_node.pos)
child.f = child.g + child.h
# already in the open list?
......