Skip to content
Snippets Groups Projects
Commit 777ef2a6 authored by Erik Nygren's avatar Erik Nygren
Browse files

Cleaned up examples folder and moved unused examles to internal repo

parent c55cb333
No related branches found
No related tags found
No related merge requests found
# Launcher: replay the "complex scene" demo deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the pre-built complex-scene demo."""
    Demo.run_complex_scene()


if __name__ == "__main__":
    main()
import os
import random
import time
import numpy as np
from flatland.envs.generators import complex_rail_generator
from flatland.envs.generators import random_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
# ensure that every demo run behave constantly equal
random.seed(1)
np.random.seed(1)
# Absolute directory of this module; used to build frame-dump output paths.
__file_dirname__ = os.path.dirname(os.path.realpath(__file__))
class Scenario_Generator:
    """Factory helpers that build or load RailEnv instances for the demos."""

    @staticmethod
    def generate_random_scenario(number_of_agents=3):
        """Build a 20x20 environment with a randomly generated rail network."""
        # Relative frequency of each rail cell type used by the generator,
        # indexed by transition case.
        cell_type_weights = [
            15,  # Case 0 - empty cell
            5,   # Case 1 - straight
            5,   # Case 2 - simple switch
            1,   # Case 3 - diamond crossing
            1,   # Case 4 - single slip
            1,   # Case 5 - double slip
            1,   # Case 6 - symmetrical
            0,   # Case 7 - dead end
            1,   # Case 1b (8)  - simple turn right
            1,   # Case 1c (9)  - simple turn left
            1,   # Case 2b (10) - simple switch mirrored
        ]
        return RailEnv(
            width=20,
            height=20,
            rail_generator=random_rail_generator(cell_type_relative_proportion=cell_type_weights),
            number_of_agents=number_of_agents,
        )

    @staticmethod
    def generate_complex_scenario(number_of_agents=3):
        """Build a 15x15 environment using the complex rail generator."""
        return RailEnv(
            width=15,
            height=15,
            rail_generator=complex_rail_generator(nr_start_goal=6, nr_extra=30,
                                                  min_dist=10, max_dist=99999, seed=0),
            number_of_agents=number_of_agents,
        )

    @staticmethod
    def load_scenario(resource, package='env_data.railway', number_of_agents=3):
        """Load a pickled scenario from package data and reset it in place."""
        env = RailEnv(width=2 * (1 + number_of_agents),
                      height=1 + number_of_agents)
        env.load_resource(package, resource)
        # reset(False, False): keep the loaded rail and agent placement.
        env.reset(False, False)
        return env
class Demo:
    """Drives a RailEnv with random actions, rendering every step.

    Also exposes static entry points that build or load the example scenarios.
    """

    def __init__(self, env):
        # env: a fully constructed RailEnv.
        self.env = env
        self.create_renderer()
        self.action_size = 4       # size of the discrete action space sampled in run_demo
        self.max_frame_rate = 60   # frame-rate hint; see set_max_framerate
        self.record_frames = None  # optional filename pattern for saving frames

    def set_record_frames(self, record_frames):
        """Set a filename format string (e.g. '.../frame_{:04d}.bmp') to dump frames."""
        self.record_frames = record_frames

    def create_renderer(self):
        """Attach a RenderTool to the environment; returns the agent handles."""
        self.renderer = RenderTool(self.env)
        handle = self.env.get_agent_handles()
        return handle

    def set_max_framerate(self, max_frame_rate):
        """Store a frame-rate cap.

        NOTE(review): the cap is stored but not enforced anywhere in run_demo.
        """
        self.max_frame_rate = max_frame_rate

    def run_demo(self, max_nbr_of_steps=30):
        """Run one episode of at most max_nbr_of_steps random-action steps.

        Renders each step, optionally saving frames (see set_record_frames),
        and stops early once all agents are done.
        """
        action_dict = dict()

        # Reset environment without regenerating the rail or agent placement.
        _ = self.env.reset(False, False)

        time.sleep(0.0001)  # to satisfy lint...

        for step in range(max_nbr_of_steps):
            # Sample an action for every agent: 50/50 between actions 1 and 2.
            # (Removed a dead `action = 2` assignment that was immediately
            # overwritten by the random choice below.)
            for iAgent in range(self.env.get_num_agents()):
                action = np.random.choice([0, 1, 2, 3], 1, p=[0.0, 0.5, 0.5, 0.0])[0]
                # update the actions
                action_dict.update({iAgent: action})

            # render
            self.renderer.render_env(show=True, show_observations=False)

            # environment step (apply the actions to all agents)
            next_obs, all_rewards, done, _ = self.env.step(action_dict)

            if done['__all__']:
                break

            if self.record_frames is not None:
                self.renderer.gl.save_image(self.record_frames.format(step))

        self.renderer.close_window()

    @staticmethod
    def run_generate_random_scenario():
        """Demo on a freshly generated random rail."""
        demo_000 = Demo(Scenario_Generator.generate_random_scenario())
        demo_000.run_demo()

    @staticmethod
    def run_generate_complex_scenario():
        """Demo on a freshly generated complex rail."""
        demo_001 = Demo(Scenario_Generator.generate_complex_scenario())
        demo_001.run_demo()

    @staticmethod
    def run_example_network_000():
        demo_000 = Demo(Scenario_Generator.load_scenario('example_network_000.pkl'))
        demo_000.run_demo()

    @staticmethod
    def run_example_network_001():
        demo_001 = Demo(Scenario_Generator.load_scenario('example_network_001.pkl'))
        demo_001.run_demo()

    @staticmethod
    def run_example_network_002():
        demo_002 = Demo(Scenario_Generator.load_scenario('example_network_002.pkl'))
        demo_002.run_demo()

    @staticmethod
    def run_example_network_003():
        demo_flatland_000 = Demo(Scenario_Generator.load_scenario('example_network_003.pkl'))
        demo_flatland_000.renderer.resize()
        demo_flatland_000.set_max_framerate(5)
        demo_flatland_000.run_demo(30)

    @staticmethod
    def run_example_flatland_000():
        demo_flatland_000 = Demo(Scenario_Generator.load_scenario('example_flatland_000.pkl'))
        demo_flatland_000.renderer.resize()
        demo_flatland_000.set_max_framerate(5)
        demo_flatland_000.run_demo(60)

    @staticmethod
    def run_example_flatland_001():
        demo_flatland_000 = Demo(Scenario_Generator.load_scenario('example_flatland_001.pkl'))
        demo_flatland_000.renderer.resize()
        demo_flatland_000.set_max_framerate(5)
        demo_flatland_000.set_record_frames(os.path.join(__file_dirname__, '..', 'rendering', 'frame_{:04d}.bmp'))
        demo_flatland_000.run_demo(60)

    @staticmethod
    def run_complex_scene():
        demo_001 = Demo(Scenario_Generator.load_scenario('complex_scene.pkl'))
        demo_001.set_record_frames(os.path.join(__file_dirname__, '..', 'rendering', 'frame_{:04d}.bmp'))
        demo_001.run_demo(120)

    @staticmethod
    def run_basic_elements_test():
        demo_001 = Demo(Scenario_Generator.load_scenario('basic_elements_test.pkl'))
        demo_001.run_demo(120)
# Launcher: replay the basic-elements test scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the basic-elements test demo."""
    Demo.run_basic_elements_test()


if __name__ == "__main__":
    main()
# Launcher: replay the example_flatland_000 scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the example_flatland_000 demo."""
    Demo.run_example_flatland_000()


if __name__ == "__main__":
    main()
# Launcher: replay the example_flatland_001 scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the example_flatland_001 demo."""
    Demo.run_example_flatland_001()


if __name__ == "__main__":
    main()
# Launcher: replay the example_network_000 scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the example_network_000 demo."""
    Demo.run_example_network_000()


if __name__ == "__main__":
    main()
# Launcher: replay the example_network_001 scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the example_network_001 demo."""
    Demo.run_example_network_001()


if __name__ == "__main__":
    main()
# Launcher: replay the example_network_002 scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the example_network_002 demo."""
    Demo.run_example_network_002()


if __name__ == "__main__":
    main()
# Launcher: replay the example_network_003 scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Run the example_network_003 demo."""
    Demo.run_example_network_003()


if __name__ == "__main__":
    main()
# Launcher: generate and replay a complex scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Generate a complex scenario and run the demo on it."""
    Demo.run_generate_complex_scenario()


if __name__ == "__main__":
    main()
# Launcher: generate and replay a random scenario deterministically.
import random
import numpy as np

from examples.demo import Demo

# Seed both RNGs at import time so repeated runs are identical.
random.seed(1)
np.random.seed(1)


def main():
    """Generate a random scenario and run the demo on it."""
    Demo.run_generate_random_scenario()


if __name__ == "__main__":
    main()
import random
import time
from collections import deque
import numpy as np
from flatland.envs.generators import complex_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
class Player(object):
    """Drives a RailEnv with random actions; no learning agent is attached."""

    def __init__(self, env):
        # env: a constructed flatland RailEnv.
        self.env = env
        self.handle = env.get_agent_handles()
        self.state_size = 105          # presumably the flattened obs size -- unused here
        self.action_size = 4           # number of discrete actions sampled in step()
        self.n_trials = 9999
        # Epsilon-greedy bookkeeping kept from the learning version (unused here).
        self.eps = 1.
        self.eps_end = 0.005
        self.eps_decay = 0.998
        self.action_dict = dict()
        self.scores_window = deque(maxlen=100)
        self.done_window = deque(maxlen=100)
        self.scores = []
        self.dones_list = []
        self.action_prob = [0] * 4     # histogram of actions sampled so far
        # Removing refs to a real agent for now.
        self.iFrame = 0
        self.tStart = time.time()

        # Reset environment
        self.env.obs_builder.reset()
        self.obs = self.env._get_observations()
        # Normalize each agent's observation into [-1, 1] by its largest
        # finite entry (max_lt is defined later in this module).
        for envAgent in range(self.env.get_num_agents()):
            norm = max(1, max_lt(self.obs[envAgent], np.inf))
            self.obs[envAgent] = np.clip(np.array(self.obs[envAgent]) / norm, -1, 1)

        self.score = 0
        self.env_done = 0

    def reset(self):
        """Fully reset the environment and return the fresh observations."""
        self.obs = self.env.reset()
        return self.obs

    def step(self):
        """Sample one random action per agent, step the env, store normalized obs."""
        env = self.env

        # Pass the (stored) observation to the agent network and retrieve the action
        for handle in env.get_agent_handles():
            # Random actions
            action = np.random.choice([0, 1, 2, 3], 1, p=[0.2, 0.1, 0.6, 0.1])[0]
            # Numpy version uses single random sequence
            self.action_prob[action] += 1
            self.action_dict.update({handle: action})

        # Environment step - pass the agent actions to the environment,
        # retrieve the response - observations, rewards, dones
        next_obs, all_rewards, done, _ = self.env.step(self.action_dict)

        for handle in env.get_agent_handles():
            norm = max(1, max_lt(next_obs[handle], np.inf))
            next_obs[handle] = np.clip(np.array(next_obs[handle]) / norm, -1, 1)

        # Update replay buffer and train agent
        # NOTE(review): deliberately dead -- self.agent does not exist; kept as a
        # reference to the learning version. (Indentation reconstructed from a
        # flattened source; confirm the score update sat inside this block.)
        if False:
            for handle in self.env.get_agent_handles():
                self.agent.step(self.obs[handle], self.action_dict[handle],
                                all_rewards[handle], next_obs[handle], done[handle],
                                train=False)
                self.score += all_rewards[handle]

        self.iFrame += 1

        self.obs = next_obs.copy()
        if done['__all__']:
            self.env_done = 1
def max_lt(seq, val):
    """Return the greatest non-negative item in seq that is strictly less than val.

    None is returned if seq is empty or no item satisfies the condition.

    Fix: the previous implementation scanned from the end and returned the
    FIRST qualifying item it met (i.e. the last such element of seq), not the
    greatest one, contradicting this docstring.
    """
    best = None
    for item in seq:
        # Keep the original filter: only non-negative values below `val` count.
        if 0 <= item < val and (best is None or item > best):
            best = item
    return best
def main(render=True, delay=0.0, n_trials=3, n_steps=50):
    """Run random-action episodes on a freshly generated complex rail env.

    Parameters:
        render: draw the environment each step.
        delay: seconds to sleep after each rendered frame.
        n_trials: number of episodes to run.
        n_steps: steps per episode.

    Fix: the original referenced env_renderer unconditionally (set_new_rail,
    render_env, gl.close_window) although it was only created when render was
    True, raising NameError for render=False. All renderer use is now guarded.
    """
    random.seed(1)
    np.random.seed(1)

    # Example generate a random rail
    env = RailEnv(width=15, height=15,
                  rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=20, min_dist=12),
                  number_of_agents=5)

    env_renderer = RenderTool(env) if render else None

    oPlayer = Player(env)
    for trials in range(1, n_trials + 1):
        # Reset environment
        oPlayer.reset()
        if env_renderer is not None:
            env_renderer.set_new_rail()

        # Run episode
        for step in range(n_steps):
            oPlayer.step()
            if env_renderer is not None:
                env_renderer.render_env(show=True, frames=True, episode=trials, step=step)
                if delay > 0:
                    time.sleep(delay)

    if env_renderer is not None:
        env_renderer.gl.close_window()


if __name__ == "__main__":
    main(render=True, delay=0)
try:
from examples.play_model import Player
except ImportError:
from play_model import Player
from flatland.envs.generators import complex_rail_generator
from flatland.envs.rail_env import RailEnv
from flatland.utils.rendertools import RenderTool
def tkmain(n_trials=1, n_steps=50, sGL="PIL"):
    """Run random-action episode(s) using the given graphics layer.

    Parameters:
        n_trials: number of episodes to run.
        n_steps: steps per episode.
        sGL: graphics layer name passed to RenderTool (e.g. "PIL", "PILSVG").

    Fix: the original silently overrode n_trials to 1 inside the body, making
    the parameter dead. The override is removed and the default changed from
    2 to 1 so existing calls that rely on the default behave as before.
    """
    # Example generate a random rail
    env = RailEnv(width=15, height=15,
                  rail_generator=complex_rail_generator(nr_start_goal=5, nr_extra=20, min_dist=12),
                  number_of_agents=5)

    env_renderer = RenderTool(env, gl=sGL)

    oPlayer = Player(env)
    for trials in range(1, n_trials + 1):
        # Reset environment
        oPlayer.reset()
        env_renderer.set_new_rail()

        for step in range(n_steps):
            oPlayer.step()
            env_renderer.render_env(show=True, frames=True, episode=trials, step=step,
                                    action_dict=oPlayer.action_dict)

    env_renderer.close_window()
if __name__ == "__main__":
    # Exercise both graphics back-ends, PIL first.
    for backend in ("PIL", "PILSVG"):
        tkmain(sGL=backend)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment