custom_observation_example.py 8.54 KB
Newer Older
1
import random
2
import time
3

4
5
6
import numpy as np

from flatland.core.env_observation_builder import ObservationBuilder
7
from flatland.core.grid.grid4_utils import get_new_position
8
9
10
from flatland.core.grid.grid_utils import coordinate_to_position
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
11
from flatland.envs.rail_env import RailEnv
u214892's avatar
u214892 committed
12
from flatland.envs.rail_generators import random_rail_generator, complex_rail_generator
13
from flatland.envs.schedule_generators import complex_schedule_generator
spiglerg's avatar
spiglerg committed
14
from flatland.utils.rendertools import RenderTool
15
16
17
18

# Seed both Python's built-in RNG and NumPy's global RNG with the same fixed value
# (presumably so that the generated rails and the random actions used further down
# are repeatable between runs -- confirm that the generators draw from these RNGs).
random.seed(100)
np.random.seed(100)

spiglerg's avatar
spiglerg committed
19

spiglerg's avatar
spiglerg committed
20
21
22
23
24
class SimpleObs(ObservationBuilder):
    """
    Minimal observation builder.

    Every agent receives a vector of 5 identical components, each of them equal to
    the agent's own handle (its ID).
    """

    def __init__(self):
        # Advertised shape of a single agent's observation: a flat vector of length 5.
        self.observation_space = [5]

    def reset(self):
        """Nothing to recompute: this builder keeps no per-episode state."""
        return None

    def get(self, handle):
        """
        Build the observation of one agent.

        :param handle: ID of the agent the observation is built for
        :return: numpy array of length 5 filled with the value of `handle`
        """
        return np.full(5, handle, dtype=np.float64)

spiglerg's avatar
spiglerg committed
36

37
38
39
40
# Small random rail world with three agents, observed through SimpleObs.
env = RailEnv(
    width=7,
    height=7,
    rail_generator=random_rail_generator(),
    number_of_agents=3,
    obs_builder_object=SimpleObs(),
)

# Advance the environment by one step and print the observation vector of every agent.
obs, all_rewards, done, _ = env.step({0: 0})
for i in range(env.get_num_agents()):
    print("Agent ", i, "'s observation: ", obs[i])
spiglerg's avatar
spiglerg committed
47
48
49
50
51
52
53
54
55
56
57
58


class SingleAgentNavigationObs(TreeObsForRailEnv):
    """
    Observation builder derived from TreeObsForRailEnv, so that the existing implementation
    computing the minimum distance from each grid node to each agent's target can be reused.

    The observation is a vector with 3 binary components, one per direction available to the
    agent (Left, Forward, Right); the component set to 1 marks the branch that lies on the
    shortest path to the agent's target.
    E.g., if taking the Left branch (if available) is the shortest route to the target, the
    observation vector is [1, 0, 0].
    """

    def __init__(self):
        # No tree exploration is needed here, only what the parent builder sets up.
        super().__init__(max_depth=0)
        self.observation_space = [3]

    def reset(self):
        # Let the parent recompute the distance map, in case the environment has changed.
        super().reset()

    def get(self, handle):
        """
        Build the [left, forward, right] one-hot observation of one agent.

        :param handle: index of the agent in `self.env.agents`
        :return: list of three ints, exactly one of them equal to 1
        """
        agent = self.env.agents[handle]
        transitions = self.env.rail.get_transitions(*agent.position, agent.direction)

        # With a single possible transition, the forward branch is aligned with it.
        if np.count_nonzero(transitions) == 1:
            return [0, 1, 0]

        # Walk the branches in the order left, forward, right relative to the current
        # orientation and collect the remaining distance to the target behind each of them.
        branch_distances = []
        for turn in (-1, 0, 1):
            heading = (agent.direction + turn) % 4
            if not transitions[heading]:
                # Branch not available from this cell: it can never be the best choice.
                branch_distances.append(np.inf)
                continue
            next_cell = get_new_position(agent.position, heading)
            branch_distances.append(
                self.env.distance_map.get()[handle, next_cell[0], next_cell[1], heading]
            )

        # One-hot encode the branch with the smallest distance (first one wins on ties).
        observation = [0, 0, 0]
        observation[np.argmin(branch_distances)] = 1
        return observation


# Single-agent world on a generated rail, observed through SingleAgentNavigationObs.
env = RailEnv(
    width=7,
    height=7,
    rail_generator=complex_rail_generator(nr_start_goal=10, nr_extra=1, min_dist=5, max_dist=99999, seed=0),
    schedule_generator=complex_schedule_generator(),
    number_of_agents=1,
    obs_builder_object=SingleAgentNavigationObs(),
)

obs = env.reset()

# Show the initial state before any step is taken.
env_renderer = RenderTool(env, gl="PILSVG")
env_renderer.render_env(show=True, frames=True, show_observations=True)

for step in range(100):
    # The observation is one-hot over [left, forward, right]; its argmax (0..2) is shifted
    # by one to obtain the action (presumably action ids 1..3 are the matching
    # left/forward/right moves -- confirm against the environment's action enumeration).
    action = np.argmax(obs[0]) + 1
    obs, all_rewards, done, _ = env.step({0: action})

    print("Rewards: ", all_rewards, "  [done=", done, "]")
    env_renderer.render_env(show=True, frames=True, show_observations=True)
    time.sleep(0.1)

    # Stop as soon as the environment reports that every agent is done.
    if done["__all__"]:
        break

env_renderer.close_window()


class ObservePredictions(TreeObsForRailEnv):
    """
    We use the provided ShortestPathPredictor to illustrate the usage of predictors in your custom observation.

    We derive our observation builder from TreeObsForRailEnv, to exploit the existing implementation to compute
    the minimum distances from each grid node to each agent's target.

    This is necessary so that we can pass the distance map to the ShortestPathPredictor.

    Here we also want to highlight how you can visualize your observation.
    """

    def __init__(self, predictor):
        """
        :param predictor: object whose `get()` returns, per agent handle, a sequence of
                          per-time-step predictions (entries 1 and 2 of a step are its coordinates)
        """
        super().__init__(max_depth=0)
        self.observation_space = [10]
        self.predictor = predictor

    def reset(self):
        # Recompute the distance map, if the environment has changed.
        super().reset()

    def get_many(self, handles=None):
        """
        Because we do not want to call the predictor separately for every agent, we implement the
        get_many function. Here we can call the predictor just once for all the agents and use the
        predictions to generate our observations.

        NOTE: `get` looks up an agent's own predicted position with its handle as index, so
        `handles` is expected to list the agents in handle order (0, 1, 2, ...).

        :param handles: handles of the agents to observe; `None` is treated as an empty list
        :return: dict mapping every handle in `handles` to its observation
        """
        if handles is None:
            # Iterating over None used to raise a TypeError; fall back to "no agents requested".
            handles = []

        self.predictions = self.predictor.get()

        # For every predicted time step, store the predicted cells of all requested agents.
        self.predicted_pos = {}
        for t in range(len(self.predictions[0])):
            pos_list = [self.predictions[a][t][1:3] for a in handles]
            # We transform (x,y) coordinates to a single integer number for simpler comparison
            self.predicted_pos[t] = coordinate_to_position(self.env.width, pos_list)

        # Collect the observations of all the agents
        return {h: self.get(h) for h in handles}

    def get(self, handle):
        """
        Let's write a simple observation which just indicates whether or not the agent's own predicted
        path overlaps with other predicted paths at any time. This is useless for the task of navigation
        but might help when looking for conflicts. A more complex implementation can be found in the
        TreeObsForRailEnv class.

        Each agent receives an observation of length 10, where each element represents a prediction
        step and its value is:
         - 0 if no overlap is happening at that step
         - 1 if at least one other agent is predicted to be in the same cell at that step

        Relies on `self.predictions` and `self.predicted_pos`, which are filled by `get_many`.

        :param handle: handled as an index of an agent
        :return: Observation of handle
        """
        observation = np.zeros(10)

        # We are going to track what cells were considered while building the observation
        # and make them accessible for rendering
        visited = set()
        for _idx in range(10):
            x_coord = self.predictions[handle][_idx][1]
            y_coord = self.predictions[handle][_idx][2]

            # We add every observed cell to the observation rendering
            visited.add((x_coord, y_coord))

            # Compare the agent's own predicted cell with those of all the other agents
            # (its own entry is removed first) at the same predicted time step.
            others = np.delete(self.predicted_pos[_idx], handle, 0)
            if self.predicted_pos[_idx][handle] in others:
                # Flag the prediction step at which the conflict occurs. The flag used to be
                # written at index `handle`, which contradicts the per-step layout above.
                observation[_idx] = 1

        # This variable will be accessed by the renderer to visualize the observation
        self.env.dev_obs_dict[handle] = visited

        return observation


# Create the predictor; the argument 10 matches the observation length used by ObservePredictions
CustomPredictor = ShortestPathPredictorForRailEnv(10)

# Hand the predictor over to the observation builder
CustomObsBuilder = ObservePredictions(CustomPredictor)

# Create the environment: three agents on a generated 10x10 rail
env = RailEnv(
    width=10,
    height=10,
    rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=1, min_dist=8, max_dist=99999, seed=0),
    schedule_generator=complex_schedule_generator(),
    number_of_agents=3,
    obs_builder_object=CustomObsBuilder,
)

obs = env.reset()
env_renderer = RenderTool(env, gl="PILSVG")

# Render the initial state; the observed cells are shown as colored boxes
env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)

# The same dict is reused on every step; each agent's entry is overwritten with a fresh action.
action_dict = {}
for step in range(100):
    # Draw one random action in [0, 5) for every agent.
    for a in range(env.get_num_agents()):
        action = np.random.randint(0, 5)
        action_dict[a] = action

    obs, all_rewards, done, _ = env.step(action_dict)
    print("Rewards: ", all_rewards, "  [done=", done, "]")
    env_renderer.render_env(show=True, frames=True, show_observations=True, show_predictions=False)
    time.sleep(0.5)