import random
import time

import numpy as np

from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import random_rail_generator, complex_rail_generator
from flatland.envs.schedule_generators import complex_schedule_generator
from flatland.utils.rendertools import RenderTool

# Seed both random number generators so the examples below are reproducible.
random.seed(100)
np.random.seed(100)


class SimpleObs(ObservationBuilder):
    """
    Simplest observation builder. The object returns observation vectors with 5 identical components,
    all equal to the ID of the respective agent.
    """

    def __init__(self):
        self.observation_space = [5]

    def reset(self):
        """Nothing to reset: the observation depends only on the agent handle."""
        return

    def get(self, handle):
        """
        Build the observation for a single agent.

        :param handle: index of the agent
        :return: array of length 5 whose entries all equal `handle`
        """
        observation = handle * np.ones((5,))
        return observation


env = RailEnv(width=7,
              height=7,
              rail_generator=random_rail_generator(),
              number_of_agents=3,
              obs_builder_object=SimpleObs())

# Print the observation vector for each agent
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
    print("Agent ", i, "'s observation: ", obs[i])


class SingleAgentNavigationObs(TreeObsForRailEnv):
    """
    We derive our observation builder from TreeObsForRailEnv, to exploit the existing implementation to compute
    the minimum distances from each grid node to each agent's target.

    We then build a representation vector with 3 binary components, indicating which of the 3 available directions
    for each agent (Left, Forward, Right) lead to the shortest path to its target.
    E.g., if taking the Left branch (if available) is the shortest route to the agent's target, the observation vector
    will be [1, 0, 0].
    """

    def __init__(self):
        super().__init__(max_depth=0)
        self.observation_space = [3]

    def reset(self):
        # Recompute the distance map, if the environment has changed.
        super().reset()

    def get(self, handle):
        """
        Build the [left, forward, right] one-hot observation for a single agent.

        :param handle: index of the agent
        :return: list of 3 ints with exactly one entry set to 1
        """
        agent = self.env.agents[handle]

        possible_transitions = self.env.rail.get_transitions(*agent.position, agent.direction)
        num_transitions = np.count_nonzero(possible_transitions)

        # Start from the current orientation, and see which transitions are available;
        # organize them as [left, forward, right], relative to the current orientation
        # If only one transition is possible, the forward branch is aligned with it.
        if num_transitions == 1:
            observation = [0, 1, 0]
        else:
            min_distances = []
            for direction in [(agent.direction + i) % 4 for i in range(-1, 2)]:
                if possible_transitions[direction]:
                    new_position = self._new_position(agent.position, direction)
                    min_distances.append(self.distance_map[handle, new_position[0], new_position[1], direction])
                else:
                    # Unavailable branches must never win the argmin below.
                    min_distances.append(np.inf)

            observation = [0, 0, 0]
            observation[np.argmin(min_distances)] = 1

        return observation


env = RailEnv(width=7,
              height=7,
              rail_generator=complex_rail_generator(nr_start_goal=10, nr_extra=1, min_dist=5, max_dist=99999, seed=0),
              schedule_generator=complex_schedule_generator(),
              number_of_agents=1,
              obs_builder_object=SingleAgentNavigationObs())

obs = env.reset()
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.render_env(show=True, frames=True, show_observations=True)
for step in range(100):
    # The observation is one-hot over [left, forward, right]; adding 1 turns its index into the action id.
    action = np.argmax(obs[0]) + 1
    obs, all_rewards, done, _ = env.step({0: action})
    print("Rewards: ", all_rewards, " [done=", done, "]")
    env_renderer.render_env(show=True, frames=True, show_observations=True)
    time.sleep(0.1)
    if done["__all__"]:
        break
env_renderer.close_window()


class ObservePredictions(TreeObsForRailEnv):
    """
    We use the provided ShortestPathPredictor to illustrate the usage of predictors in your custom observation.

    We derive our observation builder from TreeObsForRailEnv, to exploit the existing implementation to compute
    the minimum distances from each grid node to each agent's target.

    This is necessary so that we can pass the distance map to the ShortestPathPredictor.

    Here we also want to highlight how you can visualize your observation.
    """

    def __init__(self, predictor):
        super().__init__(max_depth=0)
        self.observation_space = [10]
        self.predictor = predictor

    def reset(self):
        # Recompute the distance map, if the environment has changed.
        super().reset()

    def get_many(self, handles=None):
        """
        Because we do not want to call the predictor separately for every agent we implement the get_many function.
        Here we can call the predictor just once for all the agents and use the predictions to generate our
        observations.

        :param handles: iterable of agent indices; None is treated as "no agents"
        :return: dict mapping each handle to its observation
        """
        # The declared default is None, which cannot be iterated below.
        if handles is None:
            handles = []

        self.predictions = self.predictor.get(custom_args={'distance_map': self.distance_map})

        self.predicted_pos = {}
        for t in range(len(self.predictions[0])):
            pos_list = [self.predictions[a][t][1:3] for a in handles]
            # We transform (x,y) coordinates to a single integer number for simpler comparison
            self.predicted_pos[t] = coordinate_to_position(self.env.width, pos_list)

        # Collect all the different observations for all the agents
        observations = {}
        for h in handles:
            observations[h] = self.get(h)
        return observations

    def get(self, handle):
        """
        Let's write a simple observation which just indicates whether or not the own predicted path
        overlaps with other predicted paths at any time. This is useless for the task of navigation but might
        help when looking for conflicts. A more complex implementation can be found in the TreeObsForRailEnv class.

        Each agent receives an observation of length 10, where each element represents a prediction step and its
        value is:
         - 0 if no overlap is happening
         - 1 if another agent is predicted to occupy the same cell at that step

        Relies on `self.predictions` and `self.predicted_pos` as populated by `get_many`.

        :param handle: handled as an index of an agent
        :return: Observation of handle
        """
        observation = np.zeros(10)

        # We are going to track what cells were considered while building the observation and make them accessible
        # for rendering
        visited = set()
        for _idx in range(10):
            # Check if any of the other predictions overlap with the agent's own predictions
            x_coord = self.predictions[handle][_idx][1]
            y_coord = self.predictions[handle][_idx][2]

            # We add every observed cell to the observation rendering
            visited.add((x_coord, y_coord))

            # NOTE(review): indexing predicted_pos by `handle` assumes get_many received handles 0..n-1 in order.
            if self.predicted_pos[_idx][handle] in np.delete(self.predicted_pos[_idx], handle, 0):
                # We detect if another agent is predicting to pass through the same cell at the same predicted time.
                # The flag belongs to the prediction step, not to the agent index.
                observation[_idx] = 1

        # This variable will be accessed by the renderer to visualize the observation
        self.env.dev_obs_dict[handle] = visited

        return observation


# Initiate the Predictor
CustomPredictor = ShortestPathPredictorForRailEnv(10)

# Pass the Predictor to the observation builder
CustomObsBuilder = ObservePredictions(CustomPredictor)

# Initiate Environment
env = RailEnv(width=10,
              height=10,
              rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=1, min_dist=8, max_dist=99999, seed=0),
              schedule_generator=complex_schedule_generator(),
              number_of_agents=3,
              obs_builder_object=CustomObsBuilder,
              save_episodes=True)

obs = env.reset()
env_renderer = RenderTool(env, gl="PILSVG")

# We render the initial step and show the observed cells as colored boxes
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)

action_dict = {}
for step in range(100):
    # Pick a random action for every agent.
    for a in range(env.get_num_agents()):
        action = np.random.randint(0, 5)
        action_dict[a] = action
    obs, all_rewards, done, _ = env.step(action_dict)
    print("Rewards: ", all_rewards, " [done=", done, "]")
    env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
    time.sleep(0.01)

# Save the environment under a file name derived from the grid shape.
sFilename = "saved_episode_{:}x{:}.mpk".format(*env.rail.grid.shape)
env.save(sFilename)