Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • flatland/flatland
  • stefan_otte/flatland
  • jiaodaxiaozi/flatland
  • sfwatergit/flatland
  • utozx126/flatland
  • ChenKuanSun/flatland
  • ashivani/flatland
  • minhhoa/flatland
  • pranjal_dhole/flatland
  • darthgera123/flatland
  • rivesunder/flatland
  • thomaslecat/flatland
  • joel_joseph/flatland
  • kchour/flatland
  • alex_zharichenko/flatland
  • yoogottamk/flatland
  • troye_fang/flatland
  • elrichgro/flatland
  • jun_jin/flatland
  • nimishsantosh107/flatland
20 results
Show changes
import numpy as np
from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv, RailEnvActions
from flatland.envs.rail_generators import sparse_rail_generator, rail_from_grid_transition_map
from flatland.envs.line_generators import sparse_line_generator
from flatland.utils.simple_rail import make_simple_rail
from test_utils import ReplayConfig, Replay, run_replay_config, set_penalties_for_replay
from flatland.envs.step_utils.states import TrainState
from flatland.envs.step_utils.speed_counter import SpeedCounter
# Use the sparse_rail_generator to generate feasible network configurations with corresponding tasks
# Training on simple small tasks is the best way to get familiar with the environment
#
class RandomAgent:
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.np_random = np.random.RandomState(seed=42)
def act(self, state):
"""
:param state: input is the observation of the agent
:return: returns an action
"""
return self.np_random.choice([1, 2, 3])
def step(self, memories):
"""
Step function to improve agent by adjusting policy given the observations
:param memories: SARS Tuple to be
:return:
"""
return
def save(self, filename):
# Store the current policy
return
def load(self, filename):
# Load a policy
return
def test_multi_speed_init():
env = RailEnv(width=50, height=50,
rail_generator=sparse_rail_generator(seed=2), line_generator=sparse_line_generator(),
random_seed=3,
number_of_agents=3)
# Initialize the agent with the parameters corresponding to the environment and observation_builder
agent = RandomAgent(218, 4)
# Empty dictionary for all agent action
action_dict = dict()
# Set all the different speeds
# Reset environment and get initial observations for all agents
env.reset(False, False)
env._max_episode_steps = 1000
for a_idx in range(len(env.agents)):
env.agents[a_idx].position = env.agents[a_idx].initial_position
env.agents[a_idx]._set_state(TrainState.MOVING)
# Here you can also further enhance the provided observation by means of normalization
# See training navigation example in the baseline repository
old_pos = []
for i_agent in range(env.get_num_agents()):
env.agents[i_agent].speed_counter = SpeedCounter(speed = 1. / (i_agent + 1))
old_pos.append(env.agents[i_agent].position)
print(env.agents[i_agent].position)
# Run episode
for step in range(100):
# Choose an action for each agent in the environment
for a in range(env.get_num_agents()):
action = agent.act(0)
action_dict.update({a: action})
# Check that agent did not move in between its speed updates
assert old_pos[a] == env.agents[a].position
# Environment step which returns the observations for all agents, their corresponding
# reward and whether they are done
_, _, _, _ = env.step(action_dict)
# Update old position whenever an agent was allowed to move
for i_agent in range(env.get_num_agents()):
if (step + 1) % (i_agent + 1) == 0:
print(step, i_agent, env.agents[i_agent].position)
old_pos[i_agent] = env.agents[i_agent].position
def test_multispeed_actions_no_malfunction_no_blocking():
"""Test that actions are correctly performed on cell exit for a single agent."""
rail, rail_map, optionals = make_simple_rail()
env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0], rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(), number_of_agents=1,
obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
env.reset()
env._max_episode_steps = 1000
set_penalties_for_replay(env)
test_config = ReplayConfig(
replay=[
Replay(
position=(3, 9), # east dead-end
direction=Grid4TransitionsEnum.EAST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.start_penalty + env.step_penalty * 0.5 # starting and running at speed 0.5
),
Replay(
position=(3, 9),
direction=Grid4TransitionsEnum.EAST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_LEFT,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(4, 6),
direction=Grid4TransitionsEnum.SOUTH,
action=RailEnvActions.STOP_MOVING,
reward=env.stop_penalty + env.step_penalty * 0.5 # stopping and step penalty
),
#
Replay(
position=(4, 6),
direction=Grid4TransitionsEnum.SOUTH,
action=RailEnvActions.STOP_MOVING,
reward=env.step_penalty * 0.5 # step penalty for speed 0.5 when stopped
),
Replay(
position=(4, 6),
direction=Grid4TransitionsEnum.SOUTH,
action=RailEnvActions.MOVE_FORWARD,
reward=env.start_penalty + env.step_penalty * 0.5 # starting + running at speed 0.5
),
Replay(
position=(4, 6),
direction=Grid4TransitionsEnum.SOUTH,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(5, 6),
direction=Grid4TransitionsEnum.SOUTH,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
],
target=(3, 0), # west dead-end
speed=0.5,
initial_position=(3, 9), # east dead-end
initial_direction=Grid4TransitionsEnum.EAST,
)
run_replay_config(env, [test_config], skip_reward_check=True, skip_action_required_check=True)
def test_multispeed_actions_no_malfunction_blocking():
"""The second agent blocks the first because it is slower."""
rail, rail_map, optionals = make_simple_rail()
env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0], rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(), number_of_agents=2,
obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()),
random_seed=1)
env.reset()
set_penalties_for_replay(env)
test_configs = [
ReplayConfig(
replay=[
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.start_penalty + env.step_penalty * 1.0 / 3.0 # starting and running at speed 1/3
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 5),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 5),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
),
Replay(
position=(3, 5),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 1.0 / 3.0 # running at speed 1/3
)
],
target=(3, 0), # west dead-end
speed=1 / 3,
initial_position=(3, 8),
initial_direction=Grid4TransitionsEnum.WEST,
),
ReplayConfig(
replay=[
Replay(
position=(3, 9), # east dead-end
direction=Grid4TransitionsEnum.EAST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.start_penalty + env.step_penalty * 0.5 # starting and running at speed 0.5
),
Replay(
position=(3, 9),
direction=Grid4TransitionsEnum.EAST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
# blocked although fraction >= 1.0
Replay(
position=(3, 9),
direction=Grid4TransitionsEnum.EAST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
# blocked although fraction >= 1.0
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
# blocked although fraction >= 1.0
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_LEFT,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
# not blocked, action required!
Replay(
position=(4, 6),
direction=Grid4TransitionsEnum.SOUTH,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
],
target=(3, 0), # west dead-end
speed=0.5,
initial_position=(3, 9), # east dead-end
initial_direction=Grid4TransitionsEnum.EAST,
)
]
run_replay_config(env, test_configs, skip_reward_check=True)
def test_multispeed_actions_malfunction_no_blocking():
"""Test on a single agent whether action on cell exit work correctly despite malfunction."""
rail, rail_map, optionals = make_simple_rail()
env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0], rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(), number_of_agents=1,
obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
env.reset()
# Perform DO_NOTHING actions until all trains get to READY_TO_DEPART
for _ in range(max([agent.earliest_departure for agent in env.agents]) + 1):
env.step({}) # DO_NOTHING for all agents
env._max_episode_steps = 10000
set_penalties_for_replay(env)
test_config = ReplayConfig(
replay=[
Replay( # 0
position=(3, 9), # east dead-end
direction=Grid4TransitionsEnum.EAST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.start_penalty + env.step_penalty * 0.5 # starting and running at speed 0.5
),
Replay( # 1
position=(3, 9),
direction=Grid4TransitionsEnum.EAST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay( # 2
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
# add additional step in the cell
Replay( # 3
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
set_malfunction=2, # recovers in two steps from now!,
malfunction=2,
reward=env.step_penalty * 0.5 # step penalty for speed 0.5 when malfunctioning
),
# agent recovers in this step
Replay( # 4
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
malfunction=1,
reward=env.step_penalty * 0.5 # recovered: running at speed 0.5
),
Replay( # 5
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay( # 6
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay( # 7
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
set_malfunction=2, # recovers in two steps from now!
malfunction=2,
reward=env.step_penalty * 0.5 # step penalty for speed 0.5 when malfunctioning
),
# agent recovers in this step; since we're at the beginning, we provide a different action although we're broken!
Replay( # 8
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
malfunction=1,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay( # 9
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay( # 10
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.STOP_MOVING,
reward=env.stop_penalty + env.step_penalty * 0.5 # stopping and step penalty for speed 0.5
),
Replay( # 11
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.STOP_MOVING,
reward=env.step_penalty * 0.5 # step penalty for speed 0.5 while stopped
),
Replay( # 12
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.start_penalty + env.step_penalty * 0.5 # starting and running at speed 0.5
),
Replay( # 13
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
# DO_NOTHING keeps moving!
Replay( # 14
position=(3, 5),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.DO_NOTHING,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay( # 15
position=(3, 5),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay( # 16
position=(3, 4),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
],
target=(3, 0), # west dead-end
speed=0.5,
initial_position=(3, 9), # east dead-end
initial_direction=Grid4TransitionsEnum.EAST,
)
run_replay_config(env, [test_config], skip_reward_check=True)
# TODO invalid action penalty seems only given when forward is not possible - is this the intended behaviour?
def test_multispeed_actions_no_malfunction_invalid_actions():
"""Test that actions are correctly performed on cell exit for a single agent."""
rail, rail_map, optionals = make_simple_rail()
env = RailEnv(width=rail_map.shape[1], height=rail_map.shape[0], rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(), number_of_agents=1,
obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
env.reset()
# Perform DO_NOTHING actions until all trains get to READY_TO_DEPART
for _ in range(max([agent.earliest_departure for agent in env.agents])):
env.step({}) # DO_NOTHING for all agents
env._max_episode_steps = 10000
set_penalties_for_replay(env)
test_config = ReplayConfig(
replay=[
Replay(
position=(3, 9), # east dead-end
direction=Grid4TransitionsEnum.EAST,
action=RailEnvActions.MOVE_LEFT,
reward=env.start_penalty + env.step_penalty * 0.5 # auto-correction left to forward without penalty!
),
Replay(
position=(3, 9),
direction=Grid4TransitionsEnum.EAST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 8),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_FORWARD,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 7),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_RIGHT,
reward=env.step_penalty * 0.5 # wrong action is corrected to forward without penalty!
),
Replay(
position=(3, 6),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
Replay(
position=(3, 5),
direction=Grid4TransitionsEnum.WEST,
action=RailEnvActions.MOVE_RIGHT,
reward=env.step_penalty * 0.5 # wrong action is corrected to forward without penalty!
), Replay(
position=(3, 5),
direction=Grid4TransitionsEnum.WEST,
action=None,
reward=env.step_penalty * 0.5 # running at speed 0.5
),
],
target=(3, 0), # west dead-end
speed=0.5,
initial_position=(3, 9), # east dead-end
initial_direction=Grid4TransitionsEnum.EAST,
)
run_replay_config(env, [test_config], skip_reward_check=True)
import pytest
@pytest.mark.skip(reason="Only for testing pettingzoo interface and wrappers")
def test_petting_zoo_interface_env():
import numpy as np
import os
import PIL
import shutil
from flatland.contrib.interface import flatland_env
from flatland.contrib.utils import env_generators
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
# First of all we import the Flatland rail environment
from flatland.utils.rendertools import RenderTool, AgentRenderVariant
from flatland.contrib.wrappers.flatland_wrappers import SkipNoChoiceCellsWrapper
from flatland.contrib.wrappers.flatland_wrappers import ShortestPathActionWrapper # noqa
# Custom observation builder without predictor
# observation_builder = GlobalObsForRailEnv()
# Custom observation builder with predictor
observation_builder = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv(30))
seed = 11
save = True
np.random.seed(seed)
experiment_name = "flatland_pettingzoo"
total_episodes = 2
if save:
try:
if os.path.isdir(experiment_name):
shutil.rmtree(experiment_name)
os.mkdir(experiment_name)
except OSError as e:
print("Error: %s - %s." % (e.filename, e.strerror))
rail_env = env_generators.sparse_env_small(seed, observation_builder)
rail_env = env_generators.small_v0(seed, observation_builder)
rail_env.reset(random_seed=seed)
# For Shortest Path Action Wrapper, change action to 1
# rail_env = ShortestPathActionWrapper(rail_env)
rail_env = SkipNoChoiceCellsWrapper(rail_env, accumulate_skipped_rewards=False, discounting=0.0)
dones = {}
dones['__all__'] = False
step = 0
ep_no = 0
frame_list = []
all_actions_env = []
all_actions_pettingzoo_env = []
# while not dones['__all__']:
while ep_no < total_episodes:
action_dict = {}
# Chose an action for each agent
for a in range(rail_env.get_num_agents()):
# action = env_generators.get_shortest_path_action(rail_env, a)
action = 2
all_actions_env.append(action)
action_dict.update({a: action})
step += 1
# Do the environment step
observations, rewards, dones, information = rail_env.step(action_dict)
frame_list.append(PIL.Image.fromarray(rail_env.render(mode="rgb_array")))
if dones['__all__']:
completion = env_generators.perc_completion(rail_env)
print("Final Agents Completed:", completion)
ep_no += 1
if save:
frame_list[0].save(f"{experiment_name}{os.sep}out_{ep_no}.gif", save_all=True,
append_images=frame_list[1:], duration=3, loop=0)
frame_list = []
rail_env.reset(random_seed=seed+ep_no)
# __sphinx_doc_begin__
env = flatland_env.env(environment=rail_env)
seed = 11
env.reset(random_seed=seed)
step = 0
ep_no = 0
frame_list = []
while ep_no < total_episodes:
for agent in env.agent_iter():
obs, reward, done, info = env.last()
# act = env_generators.get_shortest_path_action(env.environment, get_agent_handle(agent))
act = 2
all_actions_pettingzoo_env.append(act)
env.step(act)
frame_list.append(PIL.Image.fromarray(env.render(mode='rgb_array')))
step += 1
# __sphinx_doc_end__
completion = env_generators.perc_completion(env)
print("Final Agents Completed:", completion)
ep_no += 1
if save:
frame_list[0].save(f"{experiment_name}{os.sep}pettyzoo_out_{ep_no}.gif", save_all=True,
append_images=frame_list[1:], duration=3, loop=0)
frame_list = []
env.close()
env.reset(random_seed=seed+ep_no)
min_len = min(len(all_actions_pettingzoo_env), len(all_actions_env))
assert all_actions_pettingzoo_env[:min_len] == all_actions_env[:min_len], "actions do not match"
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-sv", __file__]))
from examples.play_model import main
# from examples.tkplay import tkmain
def test_main():
main(render=True, n_steps=20, n_trials=2, sGL="PIL")
main(render=True, n_steps=20, n_trials=2, sGL="PILSVG")
if __name__ == "__main__":
test_main()
import numpy as np
from flatland.envs.observations import GlobalObsForRailEnv, TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import rail_from_grid_transition_map, sparse_rail_generator
from flatland.envs.line_generators import sparse_line_generator
from flatland.utils.simple_rail import make_simple_rail2
def ndom_seeding():
# Set fixed malfunction duration for this test
rail, rail_map, optionals = make_simple_rail2()
# Move target to unreachable position in order to not interfere with test
for idx in range(100):
env = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(seed=12), number_of_agents=10)
env.reset(True, True, random_seed=1)
env.agents[0].target = (0, 0)
for step in range(10):
actions = {}
actions[0] = 2
env.step(actions)
agent_positions = []
env.agents[0].initial_position == (3, 2)
env.agents[1].initial_position == (3, 5)
env.agents[2].initial_position == (3, 6)
env.agents[3].initial_position == (5, 6)
env.agents[4].initial_position == (3, 4)
env.agents[5].initial_position == (3, 1)
env.agents[6].initial_position == (3, 9)
env.agents[7].initial_position == (4, 6)
env.agents[8].initial_position == (0, 3)
env.agents[9].initial_position == (3, 7)
# Test generation print
# for a in range(env.get_num_agents()):
# print("env.agents[{}].initial_position == {}".format(a,env.agents[a].initial_position))
# print("env.agents[0].initial_position == {}".format(env.agents[0].initial_position))
# print("assert env.agents[0].position == {}".format(env.agents[0].position))
def test_seeding_and_observations():
# Test if two different instances diverge with different observations
rail, rail_map, optionals = make_simple_rail2()
optionals['agents_hints']['num_agents'] = 10
# Make two seperate envs with different observation builders
# Global Observation
env = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(seed=12), number_of_agents=10,
obs_builder_object=GlobalObsForRailEnv())
# Tree Observation
env2 = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(seed=12), number_of_agents=10,
obs_builder_object=TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv()))
env.reset(False, False, random_seed=12)
env2.reset(False, False, random_seed=12)
# Check that both environments produce the same initial start positions
assert env.agents[0].initial_position == env2.agents[0].initial_position
assert env.agents[1].initial_position == env2.agents[1].initial_position
assert env.agents[2].initial_position == env2.agents[2].initial_position
assert env.agents[3].initial_position == env2.agents[3].initial_position
assert env.agents[4].initial_position == env2.agents[4].initial_position
assert env.agents[5].initial_position == env2.agents[5].initial_position
assert env.agents[6].initial_position == env2.agents[6].initial_position
assert env.agents[7].initial_position == env2.agents[7].initial_position
assert env.agents[8].initial_position == env2.agents[8].initial_position
assert env.agents[9].initial_position == env2.agents[9].initial_position
action_dict = {}
for step in range(10):
for a in range(env.get_num_agents()):
action = np.random.randint(4)
action_dict[a] = action
env.step(action_dict)
env2.step(action_dict)
# Check that both environments end up in the same position
assert env.agents[0].position == env2.agents[0].position
assert env.agents[1].position == env2.agents[1].position
assert env.agents[2].position == env2.agents[2].position
assert env.agents[3].position == env2.agents[3].position
assert env.agents[4].position == env2.agents[4].position
assert env.agents[5].position == env2.agents[5].position
assert env.agents[6].position == env2.agents[6].position
assert env.agents[7].position == env2.agents[7].position
assert env.agents[8].position == env2.agents[8].position
assert env.agents[9].position == env2.agents[9].position
for a in range(env.get_num_agents()):
print("assert env.agents[{}].position == env2.agents[{}].position".format(a, a))
def test_seeding_and_malfunction():
# Test if two different instances diverge with different observations
rail, rail_map, optionals = make_simple_rail2()
optionals['agents_hints']['num_agents'] = 10
stochastic_data = {'prop_malfunction': 0.4,
'malfunction_rate': 2,
'min_duration': 10,
'max_duration': 10}
# Make two seperate envs with different and see if the exhibit the same malfunctions
# Global Observation
for tests in range(1, 100):
env = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(), number_of_agents=10,
obs_builder_object=GlobalObsForRailEnv())
# Tree Observation
env2 = RailEnv(width=25, height=30, rail_generator=rail_from_grid_transition_map(rail, optionals),
line_generator=sparse_line_generator(), number_of_agents=10,
obs_builder_object=GlobalObsForRailEnv())
env.reset(True, False, random_seed=tests)
env2.reset(True, False, random_seed=tests)
# Check that both environments produce the same initial start positions
assert env.agents[0].initial_position == env2.agents[0].initial_position
assert env.agents[1].initial_position == env2.agents[1].initial_position
assert env.agents[2].initial_position == env2.agents[2].initial_position
assert env.agents[3].initial_position == env2.agents[3].initial_position
assert env.agents[4].initial_position == env2.agents[4].initial_position
assert env.agents[5].initial_position == env2.agents[5].initial_position
assert env.agents[6].initial_position == env2.agents[6].initial_position
assert env.agents[7].initial_position == env2.agents[7].initial_position
assert env.agents[8].initial_position == env2.agents[8].initial_position
assert env.agents[9].initial_position == env2.agents[9].initial_position
action_dict = {}
for step in range(10):
for a in range(env.get_num_agents()):
action = np.random.randint(4)
action_dict[a] = action
# print("----------------------")
# print(env.agents[a].malfunction_handler, env.agents[a].status)
# print(env2.agents[a].malfunction_handler, env2.agents[a].status)
_, reward1, done1, _ = env.step(action_dict)
_, reward2, done2, _ = env2.step(action_dict)
for a in range(env.get_num_agents()):
assert reward1[a] == reward2[a]
assert done1[a] == done2[a]
# Check that both environments end up in the same position
assert env.agents[0].position == env2.agents[0].position
assert env.agents[1].position == env2.agents[1].position
assert env.agents[2].position == env2.agents[2].position
assert env.agents[3].position == env2.agents[3].position
assert env.agents[4].position == env2.agents[4].position
assert env.agents[5].position == env2.agents[5].position
assert env.agents[6].position == env2.agents[6].position
assert env.agents[7].position == env2.agents[7].position
assert env.agents[8].position == env2.agents[8].position
assert env.agents[9].position == env2.agents[9].position
def test_reproducability_env():
"""
Test that no random generators are present within the env that get influenced by external np random
"""
speed_ration_map = {1.: 1., # Fast passenger train
1. / 2.: 0., # Fast freight train
1. / 3.: 0., # Slow commuter train
1. / 4.: 0.} # Slow freight train
env = RailEnv(width=25, height=30, rail_generator=sparse_rail_generator(max_num_cities=5,
max_rails_between_cities=3,
seed=10, # Random seed
grid_mode=True
),
line_generator=sparse_line_generator(speed_ration_map), number_of_agents=1)
env.reset(True, True, random_seed=1)
excpeted_grid = [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 16386, 1025, 4608, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[16386, 17411, 1025, 5633, 17411, 3089, 1025, 1097, 5633, 17411, 1025, 5633, 1025, 1025, 1025, 1025, 5633, 17411, 1025, 1025, 1025, 5633, 17411, 1025, 4608],
[32800, 32800, 0, 72, 3089, 5633, 1025, 17411, 1097, 2064, 0, 72, 1025, 1025, 1025, 1025, 1097, 3089, 1025, 1025, 1025, 1097, 3089, 1025, 37408],
[32800, 32800, 0, 0, 0, 72, 1025, 2064, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[32800, 32872, 4608, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16386, 34864],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[32800, 32800, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[72, 37408, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800, 32800],
[0, 49186, 2064, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 72, 37408],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32800, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32800],
[0, 32872, 1025, 5633, 17411, 1025, 1025, 1025, 5633, 17411, 1025, 1025, 1025, 1025, 1025, 1025, 5633, 17411, 1025, 1025, 1025, 5633, 17411, 1025, 34864],
[0, 72, 1025, 1097, 3089, 1025, 1025, 1025, 1097, 3089, 1025, 1025, 1025, 1025, 1025, 1025, 1097, 3089, 1025, 1025, 1025, 1097, 3089, 1025, 2064],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]
assert env.rail.grid.tolist() == excpeted_grid
# Test that we don't have interference from calling mulitple function outisde
env2 = RailEnv(width=25, height=30, rail_generator=sparse_rail_generator(max_num_cities=5,
max_rails_between_cities=3,
seed=10, # Random seed
grid_mode=True
),
line_generator=sparse_line_generator(speed_ration_map), number_of_agents=1)
np.random.seed(1)
for i in range(10):
np.random.randn()
env2.reset(True, True, random_seed=1)
assert env2.rail.grid.tolist() == excpeted_grid
"""Test speed initialization by a map of speeds and their corresponding ratios."""
import numpy as np
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.line_generators import speed_initialization_helper, sparse_line_generator
def test_speed_initialization_helper():
random_generator = np.random.RandomState()
random_generator.seed(10)
speed_ratio_map = {1: 0.3, 2: 0.4, 3: 0.3}
actual_speeds = speed_initialization_helper(10, speed_ratio_map, np_random=random_generator)
# seed makes speed_initialization_helper deterministic -> check generated speeds.
assert actual_speeds == [3, 1, 2, 3, 2, 1, 1, 3, 1, 1]
def test_rail_env_speed_intializer():
speed_ratio_map = {1: 0.3, 2: 0.4, 3: 0.1, 5: 0.2}
env = RailEnv(width=50, height=50,
rail_generator=sparse_rail_generator(), line_generator=sparse_line_generator(),
number_of_agents=10)
env.reset()
actual_speeds = list(map(lambda agent: agent.speed_counter.speed, env.agents))
expected_speed_set = set(speed_ratio_map.keys())
# check that the number of speeds generated is correct
assert len(actual_speeds) == env.get_num_agents()
# check that only the speeds defined are generated
assert all({(actual_speed in expected_speed_set) for actual_speed in actual_speeds})
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tests for `flatland` package."""
from flatland.core.transitions import RailEnvTransitions, Grid8Transitions
# from flatland.envs.rail_env import validate_new_transition
from flatland.envs.env_utils import validate_new_transition
import numpy as np
def test_is_valid_railenv_transitions():
rail_env_trans = RailEnvTransitions()
transition_list = rail_env_trans.transitions
for t in transition_list:
assert(rail_env_trans.is_valid(t) is True)
for i in range(3):
rot_trans = rail_env_trans.rotate_transition(t, 90 * i)
assert(rail_env_trans.is_valid(rot_trans) is True)
assert(rail_env_trans.is_valid(int('1111111111110010', 2)) is False)
assert(rail_env_trans.is_valid(int('1001111111110010', 2)) is False)
assert(rail_env_trans.is_valid(int('1001111001110110', 2)) is False)
def test_adding_new_valid_transition():
rail_trans = RailEnvTransitions()
rail_array = np.zeros(shape=(15, 15), dtype=np.uint16)
# adding straight
assert(validate_new_transition(rail_trans, rail_array, (4, 5), (5, 5), (6, 5), (10, 10)) is True)
# adding valid right turn
assert(validate_new_transition(rail_trans, rail_array, (5, 4), (5, 5), (5, 6), (10, 10)) is True)
# adding valid left turn
assert(validate_new_transition(rail_trans, rail_array, (5, 6), (5, 5), (5, 6), (10, 10)) is True)
# adding invalid turn
rail_array[(5, 5)] = rail_trans.transitions[2]
assert(validate_new_transition(rail_trans, rail_array, (4, 5), (5, 5), (5, 6), (10, 10)) is False)
# should create #4 -> valid
rail_array[(5, 5)] = rail_trans.transitions[3]
assert(validate_new_transition(rail_trans, rail_array, (4, 5), (5, 5), (5, 6), (10, 10)) is True)
# adding invalid turn
rail_array[(5, 5)] = rail_trans.transitions[7]
assert(validate_new_transition(rail_trans, rail_array, (4, 5), (5, 5), (5, 6), (10, 10)) is False)
# test path start condition
rail_array[(5, 5)] = rail_trans.transitions[0]
assert(validate_new_transition(rail_trans, rail_array, None, (5, 5), (5, 6), (10, 10)) is True)
# test path end condition
rail_array[(5, 5)] = rail_trans.transitions[0]
assert(validate_new_transition(rail_trans, rail_array, (5, 4), (5, 5), (6, 5), (6, 5)) is True)
def test_valid_railenv_transitions():
rail_env_trans = RailEnvTransitions()
# dir_map = {'N': 0,
# 'E': 1,
# 'S': 2,
# 'W': 3}
for i in range(2):
assert(rail_env_trans.get_transitions(
int('1100110000110011', 2), i) == (1, 1, 0, 0))
assert(rail_env_trans.get_transitions(
int('1100110000110011', 2), 2 + i) == (0, 0, 1, 1))
no_transition_cell = int('0000000000000000', 2)
for i in range(4):
assert(rail_env_trans.get_transitions(
no_transition_cell, i) == (0, 0, 0, 0))
# Facing south, going south
north_south_transition = rail_env_trans.set_transitions(no_transition_cell, 2, (0, 0, 1, 0))
assert(rail_env_trans.set_transition(
north_south_transition, 2, 2, 0) == no_transition_cell)
assert(rail_env_trans.get_transition(
north_south_transition, 2, 2))
# Facing north, going east
south_east_transition = \
rail_env_trans.set_transition(no_transition_cell, 0, 1, 1)
assert(rail_env_trans.get_transition(
south_east_transition, 0, 1))
# The opposite transitions are not feasible
assert(not rail_env_trans.get_transition(
north_south_transition, 2, 0))
assert(not rail_env_trans.get_transition(
south_east_transition, 2, 1))
east_west_transition = rail_env_trans.rotate_transition(north_south_transition, 90)
north_west_transition = rail_env_trans.rotate_transition(south_east_transition, 180)
# Facing west, going west
assert(rail_env_trans.get_transition(
east_west_transition, 3, 3))
# Facing south, going west
assert(rail_env_trans.get_transition(
north_west_transition, 2, 3))
assert(south_east_transition == rail_env_trans.rotate_transition(
south_east_transition, 360))
def test_diagonal_transitions():
diagonal_trans_env = Grid8Transitions([])
# Facing north, going north-east
south_northeast_transition = int('01000000' + '0' * 8 * 7, 2)
assert(diagonal_trans_env.get_transitions(
south_northeast_transition, 0) == (0, 1, 0, 0, 0, 0, 0, 0))
# Allowing transition from north to southwest: Facing south, going SW
north_southwest_transition = \
diagonal_trans_env.set_transitions(int('0' * 64, 2), 4, (0, 0, 0, 0, 0, 1, 0, 0))
assert(diagonal_trans_env.rotate_transition(
south_northeast_transition, 180) == north_southwest_transition)
"""Test Utils."""
from typing import List, Tuple, Optional
import numpy as np
from attr import attrs, attrib
from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.malfunction_generators import MalfunctionParameters, malfunction_from_params
from flatland.envs.rail_env import RailEnvActions, RailEnv
from flatland.envs.rail_generators import RailGenerator
from flatland.envs.line_generators import LineGenerator
from flatland.utils.rendertools import RenderTool
from flatland.envs.persistence import RailEnvPersister
from flatland.envs.step_utils.states import TrainState
from flatland.envs.step_utils.speed_counter import SpeedCounter
@attrs
class Replay(object):
position = attrib(type=Tuple[int, int])
direction = attrib(type=Grid4TransitionsEnum)
action = attrib(type=RailEnvActions)
malfunction = attrib(default=0, type=int)
set_malfunction = attrib(default=None, type=Optional[int])
reward = attrib(default=None, type=Optional[float])
state = attrib(default=None, type=Optional[TrainState])
@attrs
class ReplayConfig(object):
replay = attrib(type=List[Replay])
target = attrib(type=Tuple[int, int])
speed = attrib(type=float)
initial_position = attrib(type=Tuple[int, int])
initial_direction = attrib(type=Grid4TransitionsEnum)
# ensure that env is working correctly with start/stop/invalidaction penalty different from 0
def set_penalties_for_replay(env: RailEnv):
env.step_penalty = -7
env.start_penalty = -13
env.stop_penalty = -19
env.invalid_action_penalty = -29
def run_replay_config(env: RailEnv, test_configs: List[ReplayConfig], rendering: bool = False, activate_agents=True,
skip_reward_check=False, set_ready_to_depart=False, skip_action_required_check=False):
"""
Runs the replay configs and checks assertions.
*Initially*
- The `initial_position`, `initial_direction`, `target` and `speed` are taken from the `ReplayConfig` to initialize the agents.
*Before each step*
- `position` is verfified
- `direction` is verified
- `status` is verified (optionally, only if not `None` in `Replay`)
- `set_malfunction` is applied (optionally, only if not `None` in `Replay`)
- `malfunction` is verified
- `action` must only be provided if action_required from previous step (initally all True)
*Step*
- performed with the given `action`
*After each step*
- `reward` is verified after step
Parameters
----------
activate_agents: should the agents directly be activated when the environment is initially setup by `reset()`?
env: the environment; is `reset()` to set the agents' intial position, direction, target and speed
test_configs: the `ReplayConfig`s, one for each agent
rendering: should be rendered during replay?
"""
if rendering:
renderer = RenderTool(env)
renderer.render_env(show=True, frames=False, show_observations=False)
info_dict = {
'action_required': [True for _ in test_configs]
}
for step in range(len(test_configs[0].replay)):
if step == 0:
for a, test_config in enumerate(test_configs):
agent: EnvAgent = env.agents[a]
# set the initial position
agent.initial_position = test_config.initial_position
agent.initial_direction = test_config.initial_direction
agent.direction = test_config.initial_direction
agent.target = test_config.target
agent.speed_counter = SpeedCounter(speed=test_config.speed)
env.reset(False, False)
if set_ready_to_depart:
# Set all agents to ready to depart
for i_agent in range(len(env.agents)):
env.agents[i_agent].earliest_departure = 0
env.agents[i_agent]._set_state(TrainState.READY_TO_DEPART)
elif activate_agents:
for a_idx in range(len(env.agents)):
env.agents[a_idx].position = env.agents[a_idx].initial_position
env.agents[a_idx]._set_state(TrainState.MOVING)
def _assert(a, actual, expected, msg):
print("[{}] verifying {} on agent {}: actual={}, expected={}".format(step, msg, a, actual, expected))
assert (actual == expected) or (
np.allclose(actual, expected)), "[{}] agent {} {}: actual={}, expected={}".format(step, a, msg,
actual,
expected)
action_dict = {}
for a, test_config in enumerate(test_configs):
agent: EnvAgent = env.agents[a]
replay = test_config.replay[step]
# if not agent.position == replay.position:
# import pdb; pdb.set_trace()
_assert(a, agent.position, replay.position, 'position')
_assert(a, agent.direction, replay.direction, 'direction')
if replay.state is not None:
_assert(a, agent.state, replay.state, 'state')
if replay.action is not None:
if not skip_action_required_check:
assert info_dict['action_required'][
a] == True or agent.state == TrainState.READY_TO_DEPART, "[{}] agent {} expecting action_required={} or agent status READY_TO_DEPART".format(
step, a, True)
action_dict[a] = replay.action
else:
if not skip_action_required_check:
assert info_dict['action_required'][
a] == False, "[{}] agent {} expecting action_required={}, but found {}".format(
step, a, False, info_dict['action_required'][a])
if replay.set_malfunction is not None:
# As we force malfunctions on the agents we have to set a positive rate that the env
# recognizes the agent as potentially malfuncitoning
# We also set next malfunction to infitiy to avoid interference with our tests
env.agents[a].malfunction_handler._set_malfunction_down_counter(replay.set_malfunction)
_assert(a, agent.malfunction_handler.malfunction_down_counter, replay.malfunction, 'malfunction')
print(step)
_, rewards_dict, _, info_dict = env.step(action_dict)
# import pdb; pdb.set_trace()
if rendering:
renderer.render_env(show=True, show_observations=True)
for a, test_config in enumerate(test_configs):
replay = test_config.replay[step]
if not skip_reward_check:
_assert(a, rewards_dict[a], replay.reward, 'reward')
def create_and_save_env(file_name: str, line_generator: LineGenerator, rail_generator: RailGenerator):
stochastic_data = MalfunctionParameters(malfunction_rate=1000, # Rate of malfunction occurence
min_duration=15, # Minimal duration of malfunction
max_duration=50 # Max duration of malfunction
)
env = RailEnv(width=30,
height=30,
rail_generator=rail_generator,
line_generator=line_generator,
number_of_agents=10,
malfunction_generator_and_process_data=malfunction_from_params(stochastic_data),
remove_agents_at_target=True)
env.reset(True, True)
#env.save(file_name)
RailEnvPersister.save(env, file_name)
return env
[tox]
envlist = py36, py37, flake8, docs, coverage, benchmark, sh
envlist = py37, py38, examples, docs, coverage
[travis]
python =
3.8: py38
3.7: py37
3.6: py36
[flake8]
max-line-length = 120
ignore = E121 E126 E123 E128 E133 E226 E241 E242 E704 W291 W293 W391 W503 W504 W505
[testenv:flake8]
basepython = python
deps = flake8
basepython = python3.7
passenv = DISPLAY
commands = flake8 flatland tests examples benchmarks
deps =
-r{toxinidir}/requirements_dev.txt
-r{toxinidir}/requirements_continuous_integration.txt
commands =
flake8 flatland tests examples benchmarks
[testenv:docs]
basepython = python
basepython = python3.7
whitelist_externals = make
passenv =
DISPLAY
commands = make docs
HTTP_PROXY
HTTPS_PROXY
conda_deps =
tk
graphviz
conda_channels :
conda-forge
anaconda
deps =
-r{toxinidir}/requirements_dev.txt
-r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
make docs
[testenv:coverage]
basepython = python
basepython = python3.7
whitelist_externals = make
passenv = DISPLAY
passenv =
DISPLAY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
conda_deps =
tk
conda_channels :
conda-forge
anaconda
deps =
-r{toxinidir}/requirements_dev.txt
-r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
pip install -U pip
pip install -r requirements_dev.txt
make coverage
python make_coverage.py
[testenv:benchmark]
basepython = python
[testenv:benchmarks]
basepython = python3.7
setenv =
PYTHONPATH = {toxinidir}
passenv = DISPLAY
passenv =
DISPLAY
XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
whitelist_externals = sh
deps =
-r{toxinidir}/requirements_dev.txt
-r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
python --version
python {toxinidir}/benchmarks/benchmark_all_examples.py
[testenv:profiling]
basepython = python3.7
setenv =
PYTHONPATH = {toxinidir}
passenv =
DISPLAY
XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
conda_deps =
tk
conda_channels :
conda-forge
anaconda
deps =
-r{toxinidir}/requirements_dev.txt
-r{toxinidir}/requirements_continuous_integration.txt
changedir = {toxinidir}
commands =
python {toxinidir}/benchmarks/profile_all_examples.py
[testenv:examples]
; TODO should examples be run with py36 and py37??
basepython = python3.7
setenv =
PYTHONPATH = {toxinidir}
passenv =
DISPLAY
XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
conda_deps =
tk
conda_channels :
conda-forge
anaconda
deps =
-r{toxinidir}/requirements_dev.txt
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/c236d3c240d61a0969d4cb59e2180ce5
commands =
python {toxinidir}/benchmarks/run_all_examples.py
[testenv:notebooks]
; TODO should examples be run with py36 and py37??
basepython = python3.7
setenv =
PYTHONPATH = {envdir}
;{toxinidir}
passenv =
DISPLAY
XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
whitelist_externals = sh
bash
pwd
deps =
-r{toxinidir}/requirements_dev.txt
-r{toxinidir}/requirements_continuous_integration.txt
conda_deps =
tk
conda_channels :
conda-forge
anaconda
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/6f59bc68108c3895b1828abdd04b9a06
commands =
sh -c 'ls benchmarks/*.py | xargs -n 1 python'
bash -c "pwd"
bash -c "echo $PYTHONPATH"
python -m jupyter nbextension install --py --sys-prefix widgetsnbextension
python -m jupyter nbextension enable --py --sys-prefix widgetsnbextension
python -m jupyter nbextension install --py --sys-prefix jpy_canvas
python -m jupyter nbextension enable --py --sys-prefix jpy_canvas
python {toxinidir}/notebooks/run_all_notebooks.py
[testenv]
[testenv:start_jupyter]
basepython = python3.7
setenv =
PYTHONPATH = {toxinidir}
passenv =
DISPLAY
XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
whitelist_externals = sh
pip
deps =
-r{toxinidir}/requirements_dev.txt
-r{toxinidir}/requirements_continuous_integration.txt
conda_deps =
tk
conda_channels :
conda-forge
anaconda
changedir = {toxinidir}
commands =
python -m jupyter nbextension install --py --sys-prefix widgetsnbextension
python -m jupyter nbextension enable --py --sys-prefix widgetsnbextension
python -m jupyter nbextension install --py --sys-prefix jpy_canvas
python -m jupyter nbextension enable --py --sys-prefix jpy_canvas
python -m jupyter notebook
[testenv:py37]
platform = linux|linux2|darwin
basepython = python3.7
setenv =
PYTHONPATH = {toxinidir}
passenv = DISPLAY
passenv =
DISPLAY
XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
conda_deps =
tk
conda_channels :
conda-forge
anaconda
deps =
-r{toxinidir}/requirements_dev.txt
; If you want to make tox run the tests with the same versions, create a
; requirements.txt with the pinned versions and uncomment the following line:
; -r{toxinidir}/requirements.txt
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/fefed3ba12bf1ed81dbcc20fb52706ea
commands =
pip install -U pip
pip install -r requirements_dev.txt
sh -c 'echo DISPLAY: $DISPLAY'
py.test --basetemp={envtmpdir}
python --version
python -m pytest --basetemp={envtmpdir} {toxinidir}
[testenv:py38]
platform = linux|linux2|darwin
basepython = python3.8
setenv =
PYTHONPATH = {toxinidir}
passenv =
DISPLAY
XAUTHORITY
; HTTP_PROXY+HTTPS_PROXY required behind corporate proxies
HTTP_PROXY
HTTPS_PROXY
conda_deps =
tk
conda_channels :
conda-forge
anaconda
deps =
-r{toxinidir}/requirements_dev.txt
; run tests from subfolder to ensure that resources are accessed via resources and not via relative paths
changedir = {envtmpdir}/fefed3ba12bf1ed81dbcc20fb52706ea
commands =
python --version
python -m pytest --basetemp={envtmpdir} {toxinidir}