#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
from flatland.envs.observations import TreeObsForRailEnv, GlobalObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import rail_from_grid_transition_map, rail_from_file, empty_rail_generator
from flatland.envs.line_generators import sparse_line_generator, line_from_file
from flatland.utils.simple_rail import make_simple_rail
from flatland.envs.persistence import RailEnvPersister
from flatland.envs.step_utils.states import TrainState


def test_empty_rail_generator():
    n_agents = 2
    x_dim = 5
    y_dim = 10

    # Check that a level with the requested parameters is generated
    rail, _ = empty_rail_generator().generate(width=x_dim, height=y_dim, num_agents=n_agents)
    # Check the dimensions
    assert rail.grid.shape == (y_dim, x_dim)
    # Check that the grid contains no rail
    assert np.count_nonzero(rail.grid) == 0
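

# A minimal sketch (not part of the original tests): the same generator used
# directly, outside of pytest. `generate()` is assumed to also accept
# num_agents=0; only the returned grid is inspected here.
def _sketch_inspect_empty_grid(width=5, height=10):
    rail, _ = empty_rail_generator().generate(width=width, height=height, num_agents=0)
    # The grid is row-major, so its shape is (height, width); 0 means "no rail".
    return rail.grid.shape, np.count_nonzero(rail.grid)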


def test_rail_from_grid_transition_map():
    rail, rail_map, optionals = make_simple_rail()
    n_agents = 2
    env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(), number_of_agents=n_agents)
    env.reset(False, False)
    for a_idx in range(len(env.agents)):
        env.agents[a_idx].position = env.agents[a_idx].initial_position
        env.agents[a_idx]._set_state(TrainState.MOVING)

    nr_rail_elements = np.count_nonzero(env.rail.grid)

    # Check that the number of non-empty rail cells is correct
    assert nr_rail_elements == 16

    # Check that every agent is placed on a rail cell
    for a in env.agents:
        assert env.rail.grid[a.position] != 0

    assert env.get_num_agents() == n_agents


def tests_rail_from_file():
    file_name = "test_with_distance_map.pkl"

    # Test saving and loading a file with a distance map.
    rail, rail_map, optionals = make_simple_rail()

    env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(), number_of_agents=3,
                  obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
    env.reset()
    # env.save(file_name)
    RailEnvPersister.save(env, file_name)
    dist_map_shape = np.shape(env.distance_map.get())
    rails_initial = env.rail.grid
    agents_initial = env.agents

    env = RailEnv(width=1, height=1, rail_generator=rail_from_file(file_name),
                  line_generator=line_from_file(file_name), number_of_agents=1,
                  obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
    env.reset()
    rails_loaded = env.rail.grid
    agents_loaded = env.agents
    # Override `earliest_departure` & `latest_arrival`, since they aren't expected to be the same
    for agent_initial, agent_loaded in zip(agents_initial, agents_loaded):
        agent_loaded.earliest_departure = agent_initial.earliest_departure
        agent_loaded.latest_arrival = agent_initial.latest_arrival

    assert np.all(np.array_equal(rails_initial, rails_loaded))
    assert agents_initial == agents_loaded

    # Check that the distance map was not recomputed
    assert np.shape(env.distance_map.get()) == dist_map_shape
    assert env.distance_map.get() is not None

    # Test saving and loading a file without a distance map.
    file_name_2 = "test_without_distance_map.pkl"

    env2 = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0],
                   rail_generator=rail_from_grid_transition_map(rail, optionals),
                   line_generator=sparse_line_generator(),
                   number_of_agents=3, obs_builder_object=GlobalObsForRailEnv())
    env2.reset()
    # env2.save(file_name_2)
    RailEnvPersister.save(env2, file_name_2)

    rails_initial_2 = env2.rail.grid
    agents_initial_2 = env2.agents

    env2 = RailEnv(width=1, height=1, rail_generator=rail_from_file(file_name_2),
                   line_generator=line_from_file(file_name_2), number_of_agents=1,
                   obs_builder_object=GlobalObsForRailEnv())
    env2.reset()
    rails_loaded_2 = env2.rail.grid
    agents_loaded_2 = env2.agents
    # Override `earliest_departure` & `latest_arrival`, since they aren't expected to be the same
    for agent_initial, agent_loaded in zip(agents_initial_2, agents_loaded_2):
        agent_loaded.earliest_departure = agent_initial.earliest_departure
        agent_loaded.latest_arrival = agent_initial.latest_arrival

    assert np.all(np.array_equal(rails_initial_2, rails_loaded_2))
    assert agents_initial_2 == agents_loaded_2
    assert not hasattr(env2.obs_builder, "distance_map")

    # Test saving with a distance map and loading without one
    env3 = RailEnv(width=1, height=1, rail_generator=rail_from_file(file_name),
                   line_generator=line_from_file(file_name), number_of_agents=1,
                   obs_builder_object=GlobalObsForRailEnv())
    env3.reset()
    rails_loaded_3 = env3.rail.grid
    agents_loaded_3 = env3.agents
    # Override `earliest_departure` & `latest_arrival`, since they aren't expected to be the same
    for agent_initial, agent_loaded in zip(agents_initial, agents_loaded_3):
        agent_loaded.earliest_departure = agent_initial.earliest_departure
        agent_loaded.latest_arrival = agent_initial.latest_arrival

    assert np.all(np.array_equal(rails_initial, rails_loaded_3))
    assert agents_initial == agents_loaded_3
    assert not hasattr(env3.obs_builder, "distance_map")

    # Test saving without a distance map and loading with distance map generation
    env4 = RailEnv(width=1,
                   height=1,
                   rail_generator=rail_from_file(file_name_2),
                   line_generator=line_from_file(file_name_2),
                   number_of_agents=1,
                   obs_builder_object=TreeObsForRailEnv(max_depth=2),
                   )
    env4.reset()
    rails_loaded_4 = env4.rail.grid
    agents_loaded_4 = env4.agents
    # Override `earliest_departure` & `latest_arrival`, since they aren't expected to be the same
    for agent_initial, agent_loaded in zip(agents_initial_2, agents_loaded_4):
        agent_loaded.earliest_departure = agent_initial.earliest_departure
        agent_loaded.latest_arrival = agent_initial.latest_arrival

    # Check that no distance map was saved
    assert not hasattr(env2.obs_builder, "distance_map")
    assert np.all(np.array_equal(rails_initial_2, rails_loaded_4))
    assert agents_initial_2 == agents_loaded_4

    # Check that the distance map was generated with the correct shape
    assert env4.distance_map.get() is not None
    assert np.shape(env4.distance_map.get()) == dist_map_shape
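

# Hedged sketch: the save/load round trip exercised above, factored into a
# helper. `RailEnvPersister.load_new` is assumed to return an (env, env_dict)
# pair, as in recent flatland versions; an illustration, not the tested API.
def _sketch_save_load_roundtrip(env, file_name="roundtrip_sketch.pkl"):
    RailEnvPersister.save(env, file_name)
    loaded_env, _ = RailEnvPersister.load_new(file_name)  # assumed signature
    assert np.array_equal(env.rail.grid, loaded_env.rail.grid)
    return loaded_env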


def main():
    tests_rail_from_file()


if __name__ == "__main__":
    main()
import numpy as np
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.observations import GlobalObsForRailEnv
from flatland.envs.rail_env import RailEnv, RailEnvActions
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.step_utils.states import TrainState


def test_get_global_observation():
    number_of_agents = 20
    stochastic_data = {'prop_malfunction': 1.,  # Percentage of defective agents
                       'malfunction_rate': 30,  # Rate of malfunction occurrence
                       'min_duration': 3,  # Minimal duration of malfunction
                       'max_duration': 20  # Max duration of malfunction
                       }
    speed_ration_map = {1.: 0.25,  # Fast passenger train
                        1. / 2.: 0.25,  # Fast freight train
                        1. / 3.: 0.25,  # Slow commuter train
                        1. / 4.: 0.25}  # Slow freight train

    env = RailEnv(width=50, height=50,
                  rail_generator=sparse_rail_generator(max_num_cities=6,
                                                       max_rails_between_cities=4,
                                                       seed=15,
                                                       grid_mode=False
                                                       ),
                  line_generator=sparse_line_generator(speed_ration_map), number_of_agents=number_of_agents,
                  obs_builder_object=GlobalObsForRailEnv())
    env.reset()

    # Perform DO_NOTHING actions until all trains are READY_TO_DEPART
    for _ in range(max([agent.earliest_departure for agent in env.agents])):
        env.step({})  # DO_NOTHING for all agents

    obs, all_rewards, done, _ = env.step({i: RailEnvActions.MOVE_FORWARD for i in range(number_of_agents)})
    for i in range(len(env.agents)):
        agent: EnvAgent = env.agents[i]
        print("[{}] state={}, position={}, target={}, initial_position={}".format(
            i, agent.state, agent.position, agent.target, agent.initial_position))
    for i, agent in enumerate(env.agents):
        obs_agents_state = obs[i][1]
        obs_targets = obs[i][2]

        # Test first channel of obs_targets: own target
        nr_agents = np.count_nonzero(obs_targets[:, :, 0])
        assert nr_agents == 1, "agent {}: something wrong with own target, found {}".format(i, nr_agents)

        # Test second channel of obs_targets: other agents' targets
        for r in range(env.height):
            for c in range(env.width):
                _other_agent_target = 0
                for other_i, other_agent in enumerate(env.agents):
                    if other_agent.target == (r, c):
                        _other_agent_target = 1
                        break
                assert obs_targets[(r, c)][1] == _other_agent_target, \
                    "agent {}: at {} expected other agent's target = {}".format(i, (r, c), _other_agent_target)

        # Test first channel of obs_agents_state: direction at own position
        for r in range(env.height):
            for c in range(env.width):
                if (agent.state.is_on_map_state() or agent.state == TrainState.DONE) and (r, c) == agent.position:
                    assert np.isclose(obs_agents_state[(r, c)][0], agent.direction), \
                        "agent {} in state {} at {} expected to contain own direction {}, found {}" \
                        .format(i, agent.state, (r, c), agent.direction, obs_agents_state[(r, c)][0])
                elif (agent.state == TrainState.READY_TO_DEPART) and (r, c) == agent.initial_position:
                    assert np.isclose(obs_agents_state[(r, c)][0], agent.direction), \
                        "agent {} in state {} at {} expected to contain own direction {}, found {}" \
                        .format(i, agent.state, (r, c), agent.direction, obs_agents_state[(r, c)][0])
                else:
                    assert np.isclose(obs_agents_state[(r, c)][0], -1), \
                        "agent {} in state {} at {} expected to contain -1, found {}" \
                        .format(i, agent.state, (r, c), obs_agents_state[(r, c)][0])

        # Test second channel of obs_agents_state: direction at other agents' positions
        for r in range(env.height):
            for c in range(env.width):
                has_agent = False
                for other_i, other_agent in enumerate(env.agents):
                    if i == other_i:
                        continue
                    if other_agent.state in [TrainState.MOVING, TrainState.MALFUNCTION, TrainState.STOPPED,
                                             TrainState.DONE] and (r, c) == other_agent.position:
                        assert np.isclose(obs_agents_state[(r, c)][1], other_agent.direction), \
                            "agent {} in state {} at {} should see other agent with direction {}, found = {}" \
                            .format(i, agent.state, (r, c), other_agent.direction, obs_agents_state[(r, c)][1])
                        has_agent = True
                if not has_agent:
                    assert np.isclose(obs_agents_state[(r, c)][1], -1), \
                        "agent {} in state {} at {} should see no other agent direction (-1), found = {}" \
                        .format(i, agent.state, (r, c), obs_agents_state[(r, c)][1])

        # Test third and fourth channels of obs_agents_state: malfunction and speed of own or other agent in the grid
        for r in range(env.height):
            for c in range(env.width):
                has_agent = False
                for other_i, other_agent in enumerate(env.agents):
                    if other_agent.state in [TrainState.MOVING, TrainState.MALFUNCTION, TrainState.STOPPED,
                                             TrainState.DONE] and other_agent.position == (r, c):
                        assert np.isclose(obs_agents_state[(r, c)][2],
                                          other_agent.malfunction_handler.malfunction_down_counter), \
                            "agent {} in state {} at {} should see agent malfunction {}, found = {}" \
                            .format(i, agent.state, (r, c),
                                    other_agent.malfunction_handler.malfunction_down_counter,
                                    obs_agents_state[(r, c)][2])
                        assert np.isclose(obs_agents_state[(r, c)][3], other_agent.speed_counter.speed)
                        has_agent = True
                if not has_agent:
                    assert np.isclose(obs_agents_state[(r, c)][2], -1), \
                        "agent {} in state {} at {} should see no agent malfunction (-1), found = {}" \
                        .format(i, agent.state, (r, c), obs_agents_state[(r, c)][2])
                    assert np.isclose(obs_agents_state[(r, c)][3], -1), \
                        "agent {} in state {} at {} should see no agent speed (-1), found = {}" \
                        .format(i, agent.state, (r, c), obs_agents_state[(r, c)][3])

        # Test fifth channel of obs_agents_state: number of agents ready to depart into this cell
        for r in range(env.height):
            for c in range(env.width):
                count = 0
                for other_i, other_agent in enumerate(env.agents):
                    if other_agent.state == TrainState.READY_TO_DEPART and other_agent.initial_position == (r, c):
                        count += 1
                assert np.isclose(obs_agents_state[(r, c)][4], count), \
                    "agent {} in state {} at {} should see {} agents ready to depart, found {}" \
                    .format(i, agent.state, (r, c), count, obs_agents_state[(r, c)][4])
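

# Hedged helper (illustrative only; mirrors the assertions above): names for
# the five channels of obs_agents_state that test_get_global_observation walks
# through. `obs_agents_state` is the second entry of a GlobalObsForRailEnv observation.
def _sketch_decode_agents_state(obs_agents_state, r, c):
    cell = obs_agents_state[(r, c)]
    return {
        "own_direction": cell[0],    # -1 unless the observing agent occupies (r, c)
        "other_direction": cell[1],  # -1 unless another agent occupies (r, c)
        "malfunction": cell[2],      # remaining malfunction steps, -1 if no agent here
        "speed": cell[3],            # speed of the agent in this cell, -1 if none
        "ready_to_depart": cell[4],  # number of agents ready to depart into this cell
    }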
import random

import numpy as np

from examples.demo import Demo

# Ensure that every demo run behaves deterministically
random.seed(1)
np.random.seed(1)


def test_flatland_000():
    Demo.run_example_flatland_000()
    # TODO test assertions


def test_flatland_001():
    Demo.run_example_flatland_001()
    # TODO test assertions


def test_network_000():
    Demo.run_example_network_000()
    # TODO test assertions


def test_network_001():
    Demo.run_example_network_001()
    # TODO test assertions


def test_network_002():
    Demo.run_example_network_002()
    # TODO test assertions


def test_complex_scene():
    Demo.run_complex_scene()
    # TODO test assertions


def test_generate_complex_scenario():
    Demo.run_generate_complex_scenario()
    # TODO test assertions


def test_generate_random_scenario():
    Demo.run_generate_random_scenario()
    # TODO test assertions
from flatland.envs.malfunction_generators import malfunction_from_params, malfunction_from_file, \
single_malfunction_generator, MalfunctionParameters
from flatland.envs.rail_env import RailEnv, RailEnvActions
from flatland.envs.rail_generators import rail_from_grid_transition_map
from flatland.envs.line_generators import sparse_line_generator
from flatland.utils.simple_rail import make_simple_rail2
from flatland.envs.persistence import RailEnvPersister
import pytest


def test_malfanction_from_params():
    """
    Test setting up malfunction parameters via malfunction_from_params.
    """
    stochastic_data = MalfunctionParameters(malfunction_rate=1000,  # Rate of malfunction occurrence
                                            min_duration=2,  # Minimal duration of malfunction
                                            max_duration=5  # Max duration of malfunction
                                            )
    rail, rail_map, optionals = make_simple_rail2()

    env = RailEnv(width=25,
                  height=30,
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(),
                  number_of_agents=10,
                  malfunction_generator_and_process_data=malfunction_from_params(stochastic_data)
                  )
    env.reset()
    assert env.malfunction_process_data.malfunction_rate == 1000
    assert env.malfunction_process_data.min_duration == 2
    assert env.malfunction_process_data.max_duration == 5


def test_malfanction_to_and_from_file():
    """
    Test saving malfunction data to a file and loading it back.
    """
    stochastic_data = MalfunctionParameters(malfunction_rate=1000,  # Rate of malfunction occurrence
                                            min_duration=2,  # Minimal duration of malfunction
                                            max_duration=5  # Max duration of malfunction
                                            )
    rail, rail_map, optionals = make_simple_rail2()

    env = RailEnv(width=25,
                  height=30,
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(),
                  number_of_agents=10,
                  malfunction_generator_and_process_data=malfunction_from_params(stochastic_data)
                  )
    env.reset()
    # env.save("./malfunction_saving_loading_tests.pkl")
    RailEnvPersister.save(env, "./malfunction_saving_loading_tests.pkl")

    malfunction_generator, malfunction_process_data = malfunction_from_file("./malfunction_saving_loading_tests.pkl")

    env2 = RailEnv(width=25,
                   height=30,
                   rail_generator=rail_from_grid_transition_map(rail, optionals),
                   line_generator=sparse_line_generator(),
                   number_of_agents=10,
                   malfunction_generator_and_process_data=malfunction_from_params(stochastic_data)
                   )
    env2.reset()

    assert env2.malfunction_process_data == env.malfunction_process_data
    assert env2.malfunction_process_data.malfunction_rate == 1000
    assert env2.malfunction_process_data.min_duration == 2
    assert env2.malfunction_process_data.max_duration == 5
@pytest.mark.skip("Single malfunction generator is deprecated")
def test_single_malfunction_generator():
    """
    Test the single malfunction generator.
    """
    rail, rail_map, optionals = make_simple_rail2()

    env = RailEnv(width=25,
                  height=30,
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(),
                  number_of_agents=10,
                  malfunction_generator_and_process_data=single_malfunction_generator(earlierst_malfunction=3,
                                                                                      malfunction_duration=5)
                  )
    for test in range(10):
        env.reset()
        action_dict = dict()
        tot_malfunctions = 0
        print(test)
        for i in range(10):
            for agent in env.agents:
                # Go forward all the time
                action_dict[agent.handle] = RailEnvActions(2)
            _, _, dones, _ = env.step(action_dict)
            if dones['__all__']:
                break
        for agent in env.agents:
            # Accumulate the malfunctions that occurred during the episode
            tot_malfunctions += agent.malfunction_handler.num_malfunctions
        assert tot_malfunctions == 1
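

# Minimal sketch (numbers are illustrative, not from the tests): the three
# MalfunctionParameters fields used above, wired into a fresh env.
def _sketch_env_with_malfunctions(rate=1000, min_d=2, max_d=5):
    params = MalfunctionParameters(malfunction_rate=rate, min_duration=min_d, max_duration=max_d)
    rail, rail_map, optionals = make_simple_rail2()
    return RailEnv(width=25, height=30,
                   rail_generator=rail_from_grid_transition_map(rail, optionals),
                   line_generator=sparse_line_generator(), number_of_agents=2,
                   malfunction_generator_and_process_data=malfunction_from_params(params))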
import numpy as np
from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv, RailEnvActions
from flatland.envs.rail_generators import sparse_rail_generator, rail_from_grid_transition_map
from flatland.envs.line_generators import sparse_line_generator
from flatland.utils.simple_rail import make_simple_rail
from test_utils import ReplayConfig, Replay, run_replay_config, set_penalties_for_replay
from flatland.envs.step_utils.states import TrainState
from flatland.envs.step_utils.speed_counter import SpeedCounter
# Use the sparse_rail_generator to generate feasible network configurations with corresponding tasks
# Training on simple small tasks is the best way to get familiar with the environment
#
class RandomAgent:

    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.np_random = np.random.RandomState(seed=42)

    def act(self, state):
        """
        :param state: input is the observation of the agent
        :return: returns an action
        """
        return self.np_random.choice([1, 2, 3])

    def step(self, memories):
        """
        Step function to improve the agent by adjusting its policy given the observations

        :param memories: SARS tuple to learn from
        :return:
        """
        return

    def save(self, filename):
        # Store the current policy
        return

    def load(self, filename):
        # Load a policy
        return
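

# Usage sketch (not part of the original tests): the agent above ignores its
# input and samples uniformly from {1, 2, 3} (left/forward/right), reproducibly
# thanks to its seeded RandomState; state_size/action_size only keep the
# interface compatible with trainable agents.
def _sketch_random_agent_rollout(n_steps=3):
    agent = RandomAgent(state_size=218, action_size=4)
    return [agent.act(state=None) for _ in range(n_steps)]  # values in {1, 2, 3}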


def test_multi_speed_init():
    env = RailEnv(width=50, height=50,
                  rail_generator=sparse_rail_generator(seed=2), line_generator=sparse_line_generator(),
                  random_seed=3,
                  number_of_agents=3)
    # Initialize the agent with the parameters corresponding to the environment and observation_builder
    agent = RandomAgent(218, 4)

    # Empty dictionary for all agent actions
    action_dict = dict()

    # Reset environment and get initial observations for all agents
    env.reset(False, False)
    env._max_episode_steps = 1000
    for a_idx in range(len(env.agents)):
        env.agents[a_idx].position = env.agents[a_idx].initial_position
        env.agents[a_idx]._set_state(TrainState.MOVING)

    # Here you could also further enhance the provided observation by means of normalization
    # See the training navigation example in the baselines repository

    # Set all the different speeds
    old_pos = []
    for i_agent in range(env.get_num_agents()):
        env.agents[i_agent].speed_counter = SpeedCounter(speed=1. / (i_agent + 1))
        old_pos.append(env.agents[i_agent].position)
        print(env.agents[i_agent].position)

    # Run episode
    for step in range(100):
        # Choose an action for each agent in the environment
        for a in range(env.get_num_agents()):
            action = agent.act(0)
            action_dict.update({a: action})
            # Check that the agent did not move in between its speed updates
            assert old_pos[a] == env.agents[a].position

        # Environment step: returns the observations for all agents, their
        # corresponding rewards and whether they are done
        _, _, _, _ = env.step(action_dict)

        # Update the old position whenever an agent was allowed to move
        for i_agent in range(env.get_num_agents()):
            if (step + 1) % (i_agent + 1) == 0:
                print(step, i_agent, env.agents[i_agent].position)
                old_pos[i_agent] = env.agents[i_agent].position


def test_multispeed_actions_no_malfunction_no_blocking():
    """Test that actions are correctly performed on cell exit for a single agent."""
    rail, rail_map, optionals = make_simple_rail()
    env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(), number_of_agents=1,
                  obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
    env.reset()
    env._max_episode_steps = 1000

    set_penalties_for_replay(env)
    test_config = ReplayConfig(
        replay=[
            Replay(
                position=(3, 9),  # east dead-end
                direction=Grid4TransitionsEnum.EAST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.start_penalty + env.step_penalty * 0.5  # starting and running at speed 0.5
            ),
            Replay(
                position=(3, 9),
                direction=Grid4TransitionsEnum.EAST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_LEFT,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(4, 6),
                direction=Grid4TransitionsEnum.SOUTH,
                action=RailEnvActions.STOP_MOVING,
                reward=env.stop_penalty + env.step_penalty * 0.5  # stopping and step penalty
            ),
            Replay(
                position=(4, 6),
                direction=Grid4TransitionsEnum.SOUTH,
                action=RailEnvActions.STOP_MOVING,
                reward=env.step_penalty * 0.5  # step penalty for speed 0.5 when stopped
            ),
            Replay(
                position=(4, 6),
                direction=Grid4TransitionsEnum.SOUTH,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.start_penalty + env.step_penalty * 0.5  # starting + running at speed 0.5
            ),
            Replay(
                position=(4, 6),
                direction=Grid4TransitionsEnum.SOUTH,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(5, 6),
                direction=Grid4TransitionsEnum.SOUTH,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
        ],
        target=(3, 0),  # west dead-end
        speed=0.5,
        initial_position=(3, 9),  # east dead-end
        initial_direction=Grid4TransitionsEnum.EAST,
    )
    run_replay_config(env, [test_config], skip_reward_check=True, skip_action_required_check=True)


def test_multispeed_actions_no_malfunction_blocking():
    """The first agent is slower and blocks the second, faster agent behind it."""
    rail, rail_map, optionals = make_simple_rail()
    env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(), number_of_agents=2,
                  obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()),
                  random_seed=1)
    env.reset()

    set_penalties_for_replay(env)
    test_configs = [
        ReplayConfig(
            replay=[
                Replay(
                    position=(3, 8),
                    direction=Grid4TransitionsEnum.WEST,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.start_penalty + env.step_penalty * 1.0 / 3.0  # starting and running at speed 1/3
                ),
                Replay(
                    position=(3, 8),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 8),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 7),
                    direction=Grid4TransitionsEnum.WEST,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 7),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 7),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 6),
                    direction=Grid4TransitionsEnum.WEST,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 6),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 6),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 5),
                    direction=Grid4TransitionsEnum.WEST,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 5),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                ),
                Replay(
                    position=(3, 5),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 1.0 / 3.0  # running at speed 1/3
                )
            ],
            target=(3, 0),  # west dead-end
            speed=1 / 3,
            initial_position=(3, 8),
            initial_direction=Grid4TransitionsEnum.WEST,
        ),
        ReplayConfig(
            replay=[
                Replay(
                    position=(3, 9),  # east dead-end
                    direction=Grid4TransitionsEnum.EAST,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.start_penalty + env.step_penalty * 0.5  # starting and running at speed 0.5
                ),
                Replay(
                    position=(3, 9),
                    direction=Grid4TransitionsEnum.EAST,
                    action=None,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                # blocked although fraction >= 1.0
                Replay(
                    position=(3, 9),
                    direction=Grid4TransitionsEnum.EAST,
                    action=None,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                Replay(
                    position=(3, 8),
                    direction=Grid4TransitionsEnum.WEST,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                Replay(
                    position=(3, 8),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                # blocked although fraction >= 1.0
                Replay(
                    position=(3, 8),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                Replay(
                    position=(3, 7),
                    direction=Grid4TransitionsEnum.WEST,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                Replay(
                    position=(3, 7),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                # blocked although fraction >= 1.0
                Replay(
                    position=(3, 7),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                Replay(
                    position=(3, 6),
                    direction=Grid4TransitionsEnum.WEST,
                    action=RailEnvActions.MOVE_LEFT,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                Replay(
                    position=(3, 6),
                    direction=Grid4TransitionsEnum.WEST,
                    action=None,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
                # not blocked, action required!
                Replay(
                    position=(4, 6),
                    direction=Grid4TransitionsEnum.SOUTH,
                    action=RailEnvActions.MOVE_FORWARD,
                    reward=env.step_penalty * 0.5  # running at speed 0.5
                ),
            ],
            target=(3, 0),  # west dead-end
            speed=0.5,
            initial_position=(3, 9),  # east dead-end
            initial_direction=Grid4TransitionsEnum.EAST,
        )
    ]
    run_replay_config(env, test_configs, skip_reward_check=True)


def test_multispeed_actions_malfunction_no_blocking():
    """Test, for a single agent, whether actions on cell exit work correctly despite malfunctions."""
    rail, rail_map, optionals = make_simple_rail()
    env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(), number_of_agents=1,
                  obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
    env.reset()

    # Perform DO_NOTHING actions until all trains are READY_TO_DEPART
    for _ in range(max([agent.earliest_departure for agent in env.agents]) + 1):
        env.step({})  # DO_NOTHING for all agents

    env._max_episode_steps = 10000
    set_penalties_for_replay(env)
    test_config = ReplayConfig(
        replay=[
            Replay(  # 0
                position=(3, 9),  # east dead-end
                direction=Grid4TransitionsEnum.EAST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.start_penalty + env.step_penalty * 0.5  # starting and running at speed 0.5
            ),
            Replay(  # 1
                position=(3, 9),
                direction=Grid4TransitionsEnum.EAST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(  # 2
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            # additional step in the same cell
            Replay(  # 3
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                set_malfunction=2,  # recovers in two steps from now!
                malfunction=2,
                reward=env.step_penalty * 0.5  # step penalty for speed 0.5 while malfunctioning
            ),
            # agent recovers in this step
            Replay(  # 4
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                malfunction=1,
                reward=env.step_penalty * 0.5  # recovered: running at speed 0.5
            ),
            Replay(  # 5
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(  # 6
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(  # 7
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                set_malfunction=2,  # recovers in two steps from now!
                malfunction=2,
                reward=env.step_penalty * 0.5  # step penalty for speed 0.5 while malfunctioning
            ),
            # agent recovers in this step; since we are at the beginning of the cell,
            # a different action could be provided even though the agent is still broken
            Replay(  # 8
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                malfunction=1,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(  # 9
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(  # 10
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.STOP_MOVING,
                reward=env.stop_penalty + env.step_penalty * 0.5  # stopping and step penalty for speed 0.5
            ),
            Replay(  # 11
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.STOP_MOVING,
                reward=env.step_penalty * 0.5  # step penalty for speed 0.5 while stopped
            ),
            Replay(  # 12
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.start_penalty + env.step_penalty * 0.5  # starting and running at speed 0.5
            ),
            Replay(  # 13
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            # DO_NOTHING keeps the agent moving!
            Replay(  # 14
                position=(3, 5),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.DO_NOTHING,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(  # 15
                position=(3, 5),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(  # 16
                position=(3, 4),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
        ],
        target=(3, 0),  # west dead-end
        speed=0.5,
        initial_position=(3, 9),  # east dead-end
        initial_direction=Grid4TransitionsEnum.EAST,
    )
    run_replay_config(env, [test_config], skip_reward_check=True)


# TODO: the invalid action penalty seems only to be given when moving forward is not possible - is this the intended behaviour?
def test_multispeed_actions_no_malfunction_invalid_actions():
    """Test that invalid actions are auto-corrected to forward on cell exit for a single agent."""
    rail, rail_map, optionals = make_simple_rail()
    env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0],
                  rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(), number_of_agents=1,
                  obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
    env.reset()

    # Perform DO_NOTHING actions until all trains are READY_TO_DEPART
    for _ in range(max([agent.earliest_departure for agent in env.agents])):
        env.step({})  # DO_NOTHING for all agents

    env._max_episode_steps = 10000
    set_penalties_for_replay(env)
    test_config = ReplayConfig(
        replay=[
            Replay(
                position=(3, 9),  # east dead-end
                direction=Grid4TransitionsEnum.EAST,
                action=RailEnvActions.MOVE_LEFT,
                reward=env.start_penalty + env.step_penalty * 0.5  # LEFT is auto-corrected to FORWARD without penalty!
            ),
            Replay(
                position=(3, 9),
                direction=Grid4TransitionsEnum.EAST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 8),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_FORWARD,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 7),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_RIGHT,
                reward=env.step_penalty * 0.5  # wrong action is corrected to forward without penalty!
            ),
            Replay(
                position=(3, 6),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
            Replay(
                position=(3, 5),
                direction=Grid4TransitionsEnum.WEST,
                action=RailEnvActions.MOVE_RIGHT,
                reward=env.step_penalty * 0.5  # wrong action is corrected to forward without penalty!
            ),
            Replay(
                position=(3, 5),
                direction=Grid4TransitionsEnum.WEST,
                action=None,
                reward=env.step_penalty * 0.5  # running at speed 0.5
            ),
        ],
        target=(3, 0),  # west dead-end
        speed=0.5,
        initial_position=(3, 9),  # east dead-end
        initial_direction=Grid4TransitionsEnum.EAST,
    )
    run_replay_config(env, [test_config], skip_reward_check=True)
import pytest


@pytest.mark.skip(reason="Only for testing pettingzoo interface and wrappers")
def test_petting_zoo_interface_env():
    import numpy as np
    import os
    import PIL
    import shutil

    from flatland.contrib.interface import flatland_env
    from flatland.contrib.utils import env_generators
    from flatland.envs.observations import TreeObsForRailEnv
    from flatland.envs.predictions import ShortestPathPredictorForRailEnv
    # First of all we import the Flatland rail environment
    from flatland.utils.rendertools import RenderTool, AgentRenderVariant
    from flatland.contrib.wrappers.flatland_wrappers import SkipNoChoiceCellsWrapper
    from flatland.contrib.wrappers.flatland_wrappers import ShortestPathActionWrapper  # noqa

    # Custom observation builder without predictor
    # observation_builder = GlobalObsForRailEnv()

    # Custom observation builder with predictor
    observation_builder = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv(30))

    seed = 11
    save = True
    np.random.seed(seed)
    experiment_name = "flatland_pettingzoo"
    total_episodes = 2

    if save:
        try:
            if os.path.isdir(experiment_name):
                shutil.rmtree(experiment_name)
            os.mkdir(experiment_name)
        except OSError as e:
            print("Error: %s - %s." % (e.filename, e.strerror))

    rail_env = env_generators.sparse_env_small(seed, observation_builder)
    rail_env = env_generators.small_v0(seed, observation_builder)
    rail_env.reset(random_seed=seed)

    # For the Shortest Path Action Wrapper, change the action to 1
    # rail_env = ShortestPathActionWrapper(rail_env)
    rail_env = SkipNoChoiceCellsWrapper(rail_env, accumulate_skipped_rewards=False, discounting=0.0)

    dones = {}
    dones['__all__'] = False
    step = 0
    ep_no = 0
    frame_list = []
    all_actions_env = []
    all_actions_pettingzoo_env = []

    # while not dones['__all__']:
    while ep_no < total_episodes:
        action_dict = {}
        # Choose an action for each agent
        for a in range(rail_env.get_num_agents()):
            # action = env_generators.get_shortest_path_action(rail_env, a)
            action = 2
            all_actions_env.append(action)
            action_dict.update({a: action})
        step += 1

        # Do the environment step
        observations, rewards, dones, information = rail_env.step(action_dict)
        frame_list.append(PIL.Image.fromarray(rail_env.render(mode="rgb_array")))

        if dones['__all__']:
            completion = env_generators.perc_completion(rail_env)
            print("Final Agents Completed:", completion)
            ep_no += 1
            if save:
                frame_list[0].save(f"{experiment_name}{os.sep}out_{ep_no}.gif", save_all=True,
                                   append_images=frame_list[1:], duration=3, loop=0)
            frame_list = []
            rail_env.reset(random_seed=seed + ep_no)

    # __sphinx_doc_begin__
    env = flatland_env.env(environment=rail_env)
    seed = 11
    env.reset(random_seed=seed)
    step = 0
    ep_no = 0
    frame_list = []
    while ep_no < total_episodes:
        for agent in env.agent_iter():
            obs, reward, done, info = env.last()
            # act = env_generators.get_shortest_path_action(env.environment, get_agent_handle(agent))
            act = 2
            all_actions_pettingzoo_env.append(act)
            env.step(act)
            frame_list.append(PIL.Image.fromarray(env.render(mode='rgb_array')))
            step += 1
        # __sphinx_doc_end__
        completion = env_generators.perc_completion(env)
        print("Final Agents Completed:", completion)
        ep_no += 1
        if save:
            frame_list[0].save(f"{experiment_name}{os.sep}pettyzoo_out_{ep_no}.gif", save_all=True,
                               append_images=frame_list[1:], duration=3, loop=0)
        frame_list = []
        env.close()
        env.reset(random_seed=seed + ep_no)

    min_len = min(len(all_actions_pettingzoo_env), len(all_actions_env))
    assert all_actions_pettingzoo_env[:min_len] == all_actions_env[:min_len], "actions do not match"


if __name__ == "__main__":
    import pytest
    import sys

    sys.exit(pytest.main(["-sv", __file__]))
from examples.play_model import main


def test_main():
    main(render=True, n_steps=20, n_trials=2, sGL="PIL")
    main(render=True, n_steps=20, n_trials=2, sGL="PILSVG")


if __name__ == "__main__":
    test_main()
import numpy as np
from flatland.envs.observations import GlobalObsForRailEnv, TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import rail_from_grid_transition_map, sparse_rail_generator
from flatland.envs.line_generators import sparse_line_generator
from flatland.utils.simple_rail import make_simple_rail2


def test_random_seeding():
    # Set a fixed malfunction duration for this test
    rail, rail_map, optionals = make_simple_rail2()

    for idx in range(100):
        env = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
                      line_generator=sparse_line_generator(seed=12), number_of_agents=10)
        env.reset(True, True, random_seed=1)
        # Move the target to an unreachable position in order to not interfere with the test
        env.agents[0].target = (0, 0)
        for step in range(10):
            actions = {}
            actions[0] = 2
            env.step(actions)

        agent_positions = []
        assert env.agents[0].initial_position == (3, 2)
        assert env.agents[1].initial_position == (3, 5)
        assert env.agents[2].initial_position == (3, 6)
        assert env.agents[3].initial_position == (5, 6)
        assert env.agents[4].initial_position == (3, 4)
        assert env.agents[5].initial_position == (3, 1)
        assert env.agents[6].initial_position == (3, 9)
        assert env.agents[7].initial_position == (4, 6)
        assert env.agents[8].initial_position == (0, 3)
        assert env.agents[9].initial_position == (3, 7)
        # Test generation print
        # for a in range(env.get_num_agents()):
        #     print("env.agents[{}].initial_position == {}".format(a, env.agents[a].initial_position))
        # print("env.agents[0].initial_position == {}".format(env.agents[0].initial_position))
        # print("assert env.agents[0].position == {}".format(env.agents[0].position))


def test_seeding_and_observations():
    # Test that two instances with different observation builders do not diverge
    rail, rail_map, optionals = make_simple_rail2()
    optionals['agents_hints']['num_agents'] = 10

    # Make two separate envs with different observation builders
    # Global Observation
    env = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
                  line_generator=sparse_line_generator(seed=12), number_of_agents=10,
                  obs_builder_object=GlobalObsForRailEnv())
    # Tree Observation
    env2 = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
                   line_generator=sparse_line_generator(seed=12), number_of_agents=10,
                   obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))

    env.reset(False, False, random_seed=12)
    env2.reset(False, False, random_seed=12)

    # Check that both environments produce the same initial start positions
    assert env.agents[0].initial_position == env2.agents[0].initial_position
    assert env.agents[1].initial_position == env2.agents[1].initial_position
    assert env.agents[2].initial_position == env2.agents[2].initial_position
    assert env.agents[3].initial_position == env2.agents[3].initial_position
    assert env.agents[4].initial_position == env2.agents[4].initial_position
    assert env.agents[5].initial_position == env2.agents[5].initial_position
    assert env.agents[6].initial_position == env2.agents[6].initial_position
    assert env.agents[7].initial_position == env2.agents[7].initial_position
    assert env.agents[8].initial_position == env2.agents[8].initial_position
    assert env.agents[9].initial_position == env2.agents[9].initial_position

    action_dict = {}
    for step in range(10):
        for a in range(env.get_num_agents()):
            action = np.random.randint(4)
            action_dict[a] = action
        env.step(action_dict)
        env2.step(action_dict)

    # Check that both environments end up in the same position
    assert env.agents[0].position == env2.agents[0].position
    assert env.agents[1].position == env2.agents[1].position
    assert env.agents[2].position == env2.agents[2].position
    assert env.agents[3].position == env2.agents[3].position
    assert env.agents[4].position == env2.agents[4].position
    assert env.agents[5].position == env2.agents[5].position
    assert env.agents[6].position == env2.agents[6].position
    assert env.agents[7].position == env2.agents[7].position
    assert env.agents[8].position == env2.agents[8].position
    assert env.agents[9].position == env2.agents[9].position

    # Test generation print
    for a in range(env.get_num_agents()):
        print("assert env.agents[{}].position == env2.agents[{}].position".format(a, a))


def test_seeding_and_malfunction():
    # Test that two identically seeded instances exhibit the same malfunctions
    rail, rail_map, optionals = make_simple_rail2()
    optionals['agents_hints']['num_agents'] = 10
    stochastic_data = {'prop_malfunction': 0.4,
                       'malfunction_rate': 2,
                       'min_duration': 10,
                       'max_duration': 10}

    # Make two separate envs (both with global observations) and see if they exhibit the same malfunctions
    for tests in range(1, 100):
        env = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
                      line_generator=sparse_line_generator(), number_of_agents=10,
                      obs_builder_object=GlobalObsForRailEnv())
        env2 = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
                       line_generator=sparse_line_generator(), number_of_agents=10,
                       obs_builder_object=GlobalObsForRailEnv())

        env.reset(True, False, random_seed=tests)
        env2.reset(True, False, random_seed=tests)

        # Check that both environments produce the same initial start positions
        assert env.agents[0].initial_position == env2.agents[0].initial_position
        assert env.agents[1].initial_position == env2.agents[1].initial_position
        assert env.agents[2].initial_position == env2.agents[2].initial_position
        assert env.agents[3].initial_position == env2.agents[3].initial_position
        assert env.agents[4].initial_position == env2.agents[4].initial_position
        assert env.agents[5].initial_position == env2.agents[5].initial_position
        assert env.agents[6].initial_position == env2.agents[6].initial_position
        assert env.agents[7].initial_position == env2.agents[7].initial_position
        assert env.agents[8].initial_position == env2.agents[8].initial_position
        assert env.agents[9].initial_position == env2.agents[9].initial_position

        action_dict = {}
        for step in range(10):
            for a in range(env.get_num_agents()):
                action = np.random.randint(4)
                action_dict[a] = action
                # print("----------------------")
                # print(env.agents[a].malfunction_handler, env.agents[a].status)
                # print(env2.agents[a].malfunction_handler, env2.agents[a].status)
            _, reward1, done1, _ = env.step(action_dict)
            _, reward2, done2, _ = env2.step(action_dict)
            for a in range(env.get_num_agents()):
                assert reward1[a] == reward2[a]
                assert done1[a] == done2[a]

        # Check that both environments end up in the same position
        assert env.agents[0].position == env2.agents[0].position
        assert env.agents[1].position == env2.agents[1].position
        assert env.agents[2].position == env2.agents[2].position
        assert env.agents[3].position == env2.agents[3].position
        assert env.agents[4].position == env2.agents[4].position
        assert env.agents[5].position == env2.agents[5].position
        assert env.agents[6].position == env2.agents[6].position
        assert env.agents[7].position == env2.agents[7].position
        assert env.agents[8].position == env2.agents[8].position
        assert env.agents[9].position == env2.agents[9].position


def test_reproducability_env():
    """
    Test that the env contains no random generators that are influenced by the external np random state.
    """
    speed_ration_map = {1.: 1.,  # Fast passenger train
                        1. / 2.: 0.,  # Fast freight train
                        1. / 3.: 0.,  # Slow commuter train
                        1. / 4.: 0.}  # Slow freight train

    env = RailEnv(width=25, height=30, rail_generator=sparse_rail_generator(max_num_cities=5,
                                                                            max_rails_between_cities=3,
                                                                            seed=10,  # Random seed
                                                                            grid_mode=True
                                                                            ),
                  line_generator=sparse_line_generator(speed_ration_map), number_of_agents=1)
    env.reset(True, True, random_seed=1)
    expected_grid = [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 16386, 1025, 4608, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[16386, 17411, 1025, 5633, 17411, 3089, 1025, 1097, 5633, 17411, 1025, 5633, 1025, 1025, 1025, 1025, 5633, 17411, 1025, 1025, 1025, 5633, 17411, 1025, 4608],
[32800, 32800, 0, 72, 3089, 5633, 1025, 17411, 1097, 2064, 0, 72, 1025, 1025, 1025, 1025, 1097, 3089, 1025, 1025, 1025, 1097, 3089, 1025, 37408],
[32800, 32800, 0, 0, 0, 72, 1025, 2064, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[32800, 32872, 4608, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16386, 34864],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[72, 37408, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[0, 49186, 2064, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 72, 37408],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32872, 1025, 5633, 17411, 1025, 1025, 1025, 5633, 17411, 1025, 1025, 1025, 1025, 1025, 1025, 5633, 17411, 1025, 1025, 1025, 5633, 17411, 1025, 34864],
[0, 72, 1025, 1097, 3089, 1025, 1025, 1025, 1097, 3089, 1025, 1025, 1025, 1025, 1025, 1025, 1097, 3089, 1025, 1025, 1025, 1097, 3089, 1025, 2064],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
    assert env.rail.grid.tolist() == expected_grid

    # Test that calling multiple np.random functions outside the env does not interfere
    env2 = RailEnv(width=25, height=30, rail_generator=sparse_rail_generator(max_num_cities=5,
                                                                             max_rails_between_cities=3,
                                                                             seed=10,  # Random seed
                                                                             grid_mode=True
                                                                             ),
                   line_generator=sparse_line_generator(speed_ration_map), number_of_agents=1)
    np.random.seed(1)
    for i in range(10):
        np.random.randn()
    env2.reset(True, True, random_seed=1)
    assert env2.rail.grid.tolist() == expected_grid
"""Test speed initialization by a map of speeds and their corresponding ratios."""
import numpy as np
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.line_generators import speed_initialization_helper, sparse_line_generator
def test_speed_initialization_helper():
random_generator = np.random.RandomState()
random_generator.seed(10)
speed_ratio_map = {1: 0.3, 2: 0.4, 3: 0.3}
actual_speeds = speed_initialization_helper(10, speed_ratio_map, np_random=random_generator)
# seed makes speed_initialization_helper deterministic -> check generated speeds.
assert actual_speeds == [3, 1, 2, 3, 2, 1, 1, 3, 1, 1]
def test_rail_env_speed_intializer():
speed_ratio_map = {1: 0.3, 2: 0.4, 3: 0.1, 5: 0.2}
env = RailEnv(width=50, height=50,
rail_generator=sparse_rail_generator(), line_generator=sparse_line_generator(),
number_of_agents=10)
env.reset()
actual_speeds = list(map(lambda agent: agent.speed_counter.speed, env.agents))
expected_speed_set = set(speed_ratio_map.keys())
# check that the number of speeds generated is correct
assert len(actual_speeds) == env.get_num_agents()
# check that only the speeds defined are generated
assert all({(actual_speed in expected_speed_set) for actual_speed in actual_speeds})
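

# Hedged sketch (illustrative only): for large n, the empirical frequencies
# from speed_initialization_helper should roughly track the requested ratios.
def _sketch_empirical_speed_ratios(n=10000):
    rng = np.random.RandomState(10)
    speeds = speed_initialization_helper(n, {1: 0.3, 2: 0.4, 3: 0.3}, np_random=rng)
    return {s: speeds.count(s) / n for s in set(speeds)}  # expected close to {1: 0.3, 2: 0.4, 3: 0.3}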
"""Test Utils."""
from typing import List, Tuple, Optional
import numpy as np
from attr import attrs, attrib
from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.malfunction_generators import MalfunctionParameters, malfunction_from_params
from flatland.envs.rail_env import RailEnvActions, RailEnv
from flatland.envs.rail_generators import RailGenerator
from flatland.envs.line_generators import LineGenerator
from flatland.utils.rendertools import RenderTool
from flatland.envs.persistence import RailEnvPersister
from flatland.envs.step_utils.states import TrainState
from flatland.envs.step_utils.speed_counter import SpeedCounter
@attrs
class Replay(object):
    position = attrib(type=Tuple[int, int])
    direction = attrib(type=Grid4TransitionsEnum)
    action = attrib(type=RailEnvActions)
    malfunction = attrib(default=0, type=int)
    set_malfunction = attrib(default=None, type=Optional[int])
    reward = attrib(default=None, type=Optional[float])
    state = attrib(default=None, type=Optional[TrainState])


@attrs
class ReplayConfig(object):
    replay = attrib(type=List[Replay])
    target = attrib(type=Tuple[int, int])
    speed = attrib(type=float)
    initial_position = attrib(type=Tuple[int, int])
    initial_direction = attrib(type=Grid4TransitionsEnum)


# Ensure that the env works correctly with start/stop/invalid-action penalties different from 0
def set_penalties_for_replay(env: RailEnv):
    env.step_penalty = -7
    env.start_penalty = -13
    env.stop_penalty = -19
    env.invalid_action_penalty = -29
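

# Hedged note: with the penalties above, the rewards asserted in the multispeed
# replay tests compose as an optional start/stop penalty plus a step penalty
# scaled by the agent's speed fraction. A plain running step therefore costs:
def _sketch_expected_running_reward(env: RailEnv, speed: float) -> float:
    return env.step_penalty * speed  # e.g. -7 * 0.5 for a speed-0.5 agent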


def run_replay_config(env: RailEnv, test_configs: List[ReplayConfig], rendering: bool = False, activate_agents=True,
                      skip_reward_check=False, set_ready_to_depart=False, skip_action_required_check=False):
    """
    Runs the replay configs and checks assertions.

    *Initially*
    - The `initial_position`, `initial_direction`, `target` and `speed` are taken from the `ReplayConfig`
      to initialize the agents.

    *Before each step*
    - `position` is verified
    - `direction` is verified
    - `state` is verified (optionally, only if not `None` in `Replay`)
    - `set_malfunction` is applied (optionally, only if not `None` in `Replay`)
    - `malfunction` is verified
    - `action` must only be provided if action_required from the previous step (initially all True)

    *Step*
    - performed with the given `action`

    *After each step*
    - `reward` is verified

    Parameters
    ----------
    env: the environment; it is `reset()` to set the agents' initial position, direction, target and speed
    test_configs: the `ReplayConfig`s, one for each agent
    rendering: should the replay be rendered?
    activate_agents: should the agents be activated directly when the environment is initially set up by `reset()`?
    """
    if rendering:
        renderer = RenderTool(env)
        renderer.render_env(show=True, frames=False, show_observations=False)
    info_dict = {
        'action_required': [True for _ in test_configs]
    }

    for step in range(len(test_configs[0].replay)):
        if step == 0:
            for a, test_config in enumerate(test_configs):
                agent: EnvAgent = env.agents[a]
                # Set the initial position
                agent.initial_position = test_config.initial_position
                agent.initial_direction = test_config.initial_direction
                agent.direction = test_config.initial_direction
                agent.target = test_config.target
                agent.speed_counter = SpeedCounter(speed=test_config.speed)
            env.reset(False, False)

            if set_ready_to_depart:
                # Set all agents to ready to depart
                for i_agent in range(len(env.agents)):
                    env.agents[i_agent].earliest_departure = 0
                    env.agents[i_agent]._set_state(TrainState.READY_TO_DEPART)
            elif activate_agents:
                for a_idx in range(len(env.agents)):
                    env.agents[a_idx].position = env.agents[a_idx].initial_position
                    env.agents[a_idx]._set_state(TrainState.MOVING)

        def _assert(a, actual, expected, msg):
            print("[{}] verifying {} on agent {}: actual={}, expected={}".format(step, msg, a, actual, expected))
            assert (actual == expected) or np.allclose(actual, expected), \
                "[{}] agent {} {}: actual={}, expected={}".format(step, a, msg, actual, expected)

        action_dict = {}
        for a, test_config in enumerate(test_configs):
            agent: EnvAgent = env.agents[a]
            replay = test_config.replay[step]

            _assert(a, agent.position, replay.position, 'position')
            _assert(a, agent.direction, replay.direction, 'direction')
            if replay.state is not None:
                _assert(a, agent.state, replay.state, 'state')

            if replay.action is not None:
                if not skip_action_required_check:
                    assert info_dict['action_required'][a] == True or agent.state == TrainState.READY_TO_DEPART, \
                        "[{}] agent {} expecting action_required={} or agent state READY_TO_DEPART".format(
                            step, a, True)
                action_dict[a] = replay.action
            else:
                if not skip_action_required_check:
                    assert info_dict['action_required'][a] == False, \
                        "[{}] agent {} expecting action_required={}, but found {}".format(
                            step, a, False, info_dict['action_required'][a])

            if replay.set_malfunction is not None:
                # As we force malfunctions on the agents, we have to set a positive rate so that the env
                # recognizes the agent as potentially malfunctioning.
                # We also set the next malfunction to infinity to avoid interference with our tests.
                env.agents[a].malfunction_handler._set_malfunction_down_counter(replay.set_malfunction)
            _assert(a, agent.malfunction_handler.malfunction_down_counter, replay.malfunction, 'malfunction')
        print(step)
        _, rewards_dict, _, info_dict = env.step(action_dict)
        if rendering:
            renderer.render_env(show=True, show_observations=True)

        for a, test_config in enumerate(test_configs):
            replay = test_config.replay[step]
            if not skip_reward_check:
                _assert(a, rewards_dict[a], replay.reward, 'reward')


def create_and_save_env(file_name: str, line_generator: LineGenerator, rail_generator: RailGenerator):
    stochastic_data = MalfunctionParameters(malfunction_rate=1000,  # Rate of malfunction occurrence
                                            min_duration=15,  # Minimal duration of malfunction
                                            max_duration=50  # Max duration of malfunction
                                            )

    env = RailEnv(width=30,
                  height=30,
                  rail_generator=rail_generator,
                  line_generator=line_generator,
                  number_of_agents=10,
                  malfunction_generator_and_process_data=malfunction_from_params(stochastic_data),
                  remove_agents_at_target=True)
    env.reset(True, True)
    # env.save(file_name)
    RailEnvPersister.save(env, file_name)
    return env
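

# Usage sketch (assumed file name; mirrors the tests above). The sparse
# generators would be imported only for this example:
#     from flatland.envs.rail_generators import sparse_rail_generator
#     from flatland.envs.line_generators import sparse_line_generator
#     env = create_and_save_env("sketch_env.pkl",
#                               line_generator=sparse_line_generator(),
#                               rail_generator=sparse_rail_generator())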
[tox]
envlist = py37, py38, examples, docs, coverage

[travis]
python =
    3.8: py38
    3.7: py37

[flake8]
max-line-length = 120
ignore = E121 E126 E123 E128 E133 E226 E241 E242 E704 W291 W293 W391 W503 W504 W505

[testenv:flake8]
basepython = python3.7
passenv = DISPLAY
deps =
    -r{toxinidir}/requirements_dev.txt
commands =
    flake8 flatland tests examples benchmarks

[testenv:docs]
basepython = python3.7
whitelist_externals = make
passenv =
    DISPLAY
    HTTP_PROXY
    HTTPS_PROXY
conda_deps =
    tk
    graphviz
conda_channels :
    conda-forge
    anaconda
deps =
    -r{toxinidir}/requirements_dev.txt
    -r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
    make docs

[testenv:coverage]
basepython = python3.7
whitelist_externals = make
passenv =
    DISPLAY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
conda_deps =
    tk
conda_channels :
    conda-forge
    anaconda
deps =
    -r{toxinidir}/requirements_dev.txt
    -r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
    python make_coverage.py

[testenv:benchmarks]
basepython = python3.7
setenv =
    PYTHONPATH = {toxinidir}
passenv =
    DISPLAY
    XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
whitelist_externals = sh
deps =
    -r{toxinidir}/requirements_dev.txt
    -r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
    python --version
    python {toxinidir}/benchmarks/benchmark_all_examples.py

[testenv:profiling]
basepython = python3.7
setenv =
    PYTHONPATH = {toxinidir}
passenv =
    DISPLAY
    XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
conda_deps =
    tk
conda_channels :
    conda-forge
    anaconda
deps =
    -r{toxinidir}/requirements_dev.txt
    -r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
    python {toxinidir}/benchmarks/profile_all_examples.py

[testenv:examples]
basepython = python3.7
setenv =
    PYTHONPATH = {toxinidir}
passenv =
    DISPLAY
    XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
whitelist_externals = sh
conda_deps =
    tk
conda_channels :
    conda-forge
    anaconda
deps =
    -r{toxinidir}/requirements_dev.txt
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/c236d3c240d61a0969d4cb59e2180ce5
commands =
    python {toxinidir}/benchmarks/run_all_examples.py

[testenv:notebooks]
basepython = python3.7
setenv =
    PYTHONPATH = {envdir}
    ;{toxinidir}
passenv =
    DISPLAY
    XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
whitelist_externals =
    sh
    bash
    pwd
deps =
    -r{toxinidir}/requirements_dev.txt
    -r{toxinidir}/requirements_continuous_integration.txt
conda_deps =
    tk
conda_channels :
    conda-forge
    anaconda
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/6f59bc68108c3895b1828abdd04b9a06
commands =
    bash -c "pwd"
    bash -c "echo $PYTHONPATH"
    python -m jupyter nbextension install --py --sys-prefix widgetsnbextension
    python -m jupyter nbextension enable --py --sys-prefix widgetsnbextension
    python -m jupyter nbextension install --py --sys-prefix jpy_canvas
    python -m jupyter nbextension enable --py --sys-prefix jpy_canvas
    python {toxinidir}/notebooks/run_all_notebooks.py

[testenv:start_jupyter]
basepython = python3.7
setenv =
    PYTHONPATH = {toxinidir}
passenv =
    DISPLAY
    XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
whitelist_externals =
    sh
    pip
deps =
    -r{toxinidir}/requirements_dev.txt
    -r{toxinidir}/requirements_continuous_integration.txt
conda_deps =
    tk
conda_channels :
    conda-forge
    anaconda
changedir = {toxinidir}
commands =
    python -m jupyter nbextension install --py --sys-prefix widgetsnbextension
    python -m jupyter nbextension enable --py --sys-prefix widgetsnbextension
    python -m jupyter nbextension install --py --sys-prefix jpy_canvas
    python -m jupyter nbextension enable --py --sys-prefix jpy_canvas
    python -m jupyter notebook

[testenv:py37]
platform = linux|linux2|darwin
basepython = python3.7
setenv =
    PYTHONPATH = {toxinidir}
passenv =
    DISPLAY
    XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
conda_deps =
    tk
conda_channels :
    conda-forge
    anaconda
deps =
    -r{toxinidir}/requirements_dev.txt
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/fefed3ba12bf1ed81dbcc20fb52706ea
commands =
    python --version
    python -m pytest --basetemp={envtmpdir} {toxinidir}

[testenv:py38]
platform = linux|linux2|darwin
basepython = python3.8
setenv =
    PYTHONPATH = {toxinidir}
passenv =
    DISPLAY
    XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
    HTTP_PROXY
    HTTPS_PROXY
conda_deps =
    tk
conda_channels :
    conda-forge
    anaconda
deps =
    -r{toxinidir}/requirements_dev.txt
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/fefed3ba12bf1ed81dbcc20fb52706ea
commands =
    python --version
    python -m pytest --basetemp={envtmpdir} {toxinidir}