import os
import random
import time

import numpy as np

from flatland.envs.generators import complex_rail_generator
# from flatland.envs.generators import rail_from_list_of_saved_GridTransitionMap_generator
from flatland.envs.generators import random_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool

# Ensure that every demo run behaves identically.
random.seed(1)
np.random.seed(1)

class Scenario_Generator:

    @staticmethod
    def generate_random_scenario(number_of_agents=3):
        # Relative proportion of each cell type used by the random rail
        # generator (index = transition case).
        transition_probability = [15,  # empty cell - Case 0
                                  5,   # Case 1 - straight
                                  5,   # Case 2 - simple switch
                                  1,   # Case 3 - diamond crossing
                                  1,   # Case 4 - single slip
                                  1,   # Case 5 - double slip
                                  1,   # Case 6 - symmetrical
                                  0,   # Case 7 - dead end
                                  1,   # Case 1b (8)  - simple turn right
                                  1,   # Case 1c (9)  - simple turn left
                                  1]   # Case 2b (10) - simple switch mirrored

        # Generate a random rail with the proportions above.
        env = RailEnv(width=20,
                      height=20,
                      rail_generator=random_rail_generator(cell_type_relative_proportion=transition_probability),
                      number_of_agents=number_of_agents)
        return env
    @staticmethod
    def generate_complex_scenario(number_of_agents=3):
        env = RailEnv(width=15,
                      height=15,
                      rail_generator=complex_rail_generator(nr_start_goal=6, nr_extra=30, min_dist=10,
                                                            max_dist=99999, seed=0),
                      number_of_agents=number_of_agents)
        return env
    @staticmethod
    def load_scenario(filename, number_of_agents=3):
        env = RailEnv(width=2 * (1 + number_of_agents),
                      height=1 + number_of_agents)

        """
        env = RailEnv(width=20,
                      height=20,
                      rail_generator=rail_from_list_of_saved_GridTransitionMap_generator(
                          [filename]),
                      number_of_agents=number_of_agents)
        """
        if os.path.exists(filename):
            print("load file: ", filename)
            env.load(filename)
            env.reset(False, False)
        else:
            print("File does not exist:", filename, " Working directory: ", os.getcwd())

        return env
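
# Hedged usage sketch for the factories above (reset signature per the
# flatland version this demo targets):
#
#   env = Scenario_Generator.generate_random_scenario(number_of_agents=2)
#   obs = env.reset()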
def max_lt(seq, val):
    """
    Return the greatest non-negative item in seq for which item < val applies.
    0 is returned if seq is empty or no item satisfies the condition.
    """
    max_val = 0
    idx = len(seq) - 1
    while idx >= 0:
        if seq[idx] < val and seq[idx] >= 0 and seq[idx] > max_val:
            max_val = seq[idx]
        idx -= 1
    return max_val
def min_lt(seq, val):
    """
    Return the smallest item in seq for which item > val applies.
    np.inf is returned if seq is empty or all items in seq are <= val.
    """
    min_val = np.inf
    idx = len(seq) - 1
    while idx >= 0:
        if seq[idx] > val and seq[idx] < min_val:
            min_val = seq[idx]
        idx -= 1
    return min_val
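
# Illustrative values (chosen for this sketch, not taken from the demo):
#
#   max_lt([3, 7, 12], 10)  # -> 7   (greatest item below 10)
#   min_lt([3, 7, 12], 5)   # -> 7   (smallest item above 5)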
def norm_obs_clip(obs, clip_min=-1, clip_max=1):
    """
    Normalize an observation by the span between its (bounded) min and max
    values, then clip it to [clip_min, clip_max].

    :param obs: observation that should be normalized
    :param clip_min: minimum value the observation is clipped to
    :param clip_max: maximum value the observation is clipped to
    :return: normalized and clipped observation
    """
    max_obs = max(1, max_lt(obs, 1000))
    min_obs = max(0, min_lt(obs, 0))
    if max_obs == min_obs:
        return np.clip(np.array(obs) / max_obs, clip_min, clip_max)
    norm = np.abs(max_obs - min_obs)
    if norm == 0:
        norm = 1.
    return np.clip((np.array(obs) - min_obs) / norm, clip_min, clip_max)
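
# Illustrative sketch (values chosen for this example, not taken from the
# demo): entries outside the [min_obs, max_obs] window, including np.inf,
# end up clipped to clip_max.
#
#   norm_obs_clip([1, 2, 3])       # -> array([0. , 0.5, 1. ])
#   norm_obs_clip([1, 2, np.inf])  # -> array([0., 1., 1.]), inf clipped to 1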
class Demo:

    def __init__(self, env):
        self.env = env
        self.create_renderer()
        self.action_size = 4

    def create_renderer(self):
        self.renderer = RenderTool(self.env, gl="PILSVG")
        handle = self.env.get_agent_handles()
        return handle
    def run_demo(self, max_nbr_of_steps=30):
        action_dict = dict()

        # Reset environment
        time.sleep(0.0001)  # to satisfy lint...

        for step in range(max_nbr_of_steps):
            # Scripted demo policy: default action 2 for every agent; on odd
            # steps, agent 4 (if present) takes action 0 instead.
            for iAgent in range(self.env.get_num_agents()):
                action = 2
                if not (step % 2 == 0):
                    if iAgent == 4:
                        action = 0

                if False:
                    # Disabled alternative: sample random actions until a
                    # valid one is found (at most 10 trials).
                    agent = self.env.agents[iAgent]
                    trial = 0
                    while not self.env.check_action(agent, action)[1]:
                        action = np.random.choice(self.action_size)
                        trial += 1
                        if trial > 10:
                            break

                action_dict.update({iAgent: action})

            self.renderer.renderEnv(show=True, action_dict=action_dict)

            # Environment step
            next_obs, all_rewards, done, _ = self.env.step(action_dict)
            if done['__all__']:
                break

# Procedurally generated scenarios.
demo_000 = Demo(Scenario_Generator.generate_random_scenario())
demo_000.run_demo()
demo_000 = None

demo_001 = Demo(Scenario_Generator.generate_complex_scenario())
demo_001.run_demo()
demo_001 = None
demo_000 = Demo(Scenario_Generator.load_scenario('./env-data/railway/example_network_000.pkl'))
demo_000.run_demo()
demo_000 = None
demo_001 = Demo(Scenario_Generator.load_scenario('./env-data/railway/example_network_001.pkl'))
demo_001.run_demo()
demo_001 = None
demo_002 = Demo(Scenario_Generator.load_scenario('./env-data/railway/example_network_002.pkl'))
demo_002.run_demo()
demo_002 = None
demo_flatland_000 = Demo(Scenario_Generator.load_scenario('./env-data/railway/example_flatland_000.pkl'))
demo_flatland_000.renderer.resize()
demo_flatland_000.run_demo(60)
demo_flatland_000 = None
demo_flatland_000 = Demo(Scenario_Generator.load_scenario('./env-data/railway/example_flatland_001.pkl'))
demo_flatland_000.renderer.resize()
demo_flatland_000.run_demo(60)
demo_flatland_000 = None
demo_flatland_000 = Demo(Scenario_Generator.load_scenario('./env-data/railway/example_network_003.pkl'))
demo_flatland_000.run_demo(60)