Commit 15801ce1 authored by Adrian Egli

other testing

parent 64c0f418
@@ -2,6 +2,6 @@
   "challenge_id": "neurips-2020-flatland-challenge",
   "grader_id": "neurips-2020-flatland-challenge",
   "debug": false,
-  "tags": ["RL"]
+  "tags": ["other"]
 }
@@ -4,10 +4,12 @@ import numpy as np
 from flatland.envs.agent_utils import RailAgentStatus
 from flatland.evaluators.client import FlatlandRemoteClient

 #####################################################################
 # Instantiate a Remote Client
 #####################################################################
+from src.extra import Extra
+from src.simple.DeadLock_Avoidance import calculate_one_step_heuristics, calculate_one_step_package_implementation, calculate_one_step, calculate_one_step_primitive_implementation
+
 remote_client = FlatlandRemoteClient()
@@ -19,9 +21,16 @@ remote_client = FlatlandRemoteClient()
 # compute the necessary action for this step for all (or even some)
 # of the agents
 #####################################################################
-def my_controller(extra: Extra, observation, info):
+def my_controller_RL(extra: Extra, observation, info):
     return extra.rl_agent_act(observation, info)
+
+
+def my_controller(local_env, obs, number_of_agents):
+    _action, _ = calculate_one_step(local_env)
+    # _action, _ = calculate_one_step_package_implementation(local_env)
+    # _action, _ = calculate_one_step_primitive_implementation(local_env)
+    # _action, _ = calculate_one_step_heuristics(local_env)
+    return _action

 #####################################################################
 # Instantiate your custom Observation Builder
......
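The remainder of the run script is not shown in this diff. For context, the sketch below shows how the new my_controller is typically driven by the AIcrowd evaluation loop; the loop structure and the DummyObservationBuilder placeholder are assumptions based on the standard Flatland starter kit, not part of this commit.

# Sketch (assumption): standard starter-kit evaluation loop calling my_controller.
from flatland.core.env_observation_builder import DummyObservationBuilder

my_observation_builder = DummyObservationBuilder()

while True:
    # request the next evaluation episode from the evaluation service
    observation, info = remote_client.env_create(obs_builder_object=my_observation_builder)
    if not observation:
        # all evaluation episodes have been served
        break

    # local copy of the environment, kept in sync by the remote client
    local_env = remote_client.env
    number_of_agents = len(local_env.agents)

    while True:
        action = my_controller(local_env, observation, number_of_agents)
        observation, all_rewards, done, info = remote_client.env_step(action)
        if done['__all__']:
            break

print(remote_client.submit())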
from enum import IntEnum

import numpy as np


class ProblemInstanceClass(IntEnum):
    SHORTEST_PATH_ONLY = 0
    SHORTEST_PATH_ORDERING_PROBLEM = 1
    REQUIRE_ALTERNATIVE_PATH = 2


def check_is_only_shortest_path_problem(env, project_path_matrix):
    x = project_path_matrix.copy()
    x[x < 2] = 0
    return np.sum(x) == 0


def check_is_shortest_path_and_ordering_problem(env, project_path_matrix):
    x = project_path_matrix.copy()
    for a in range(env.get_num_agents()):
        # check that no agent's start or target lies on a cell used by more than one shortest path
        agent = env.agents[a]
        if x[agent.position[0]][agent.position[1]] > 1:
            return False
        if x[agent.target[0]][agent.target[1]] > 1:
            return False
    return True


def check_is_require_alternative_path(env, project_path_matrix):
    paths = env.dev_pred_dict
    for a in range(env.get_num_agents()):
        agent = env.agents[a]
        path = paths[a]
        for path_loop in range(len(path)):
            p = path[path_loop]
            if p[0] == agent.target[0] and p[1] == agent.target[1]:
                break
            if project_path_matrix[p[0]][p[1]] > 1:
                # potential overlapping path found
                for opp_a in range(env.get_num_agents()):
                    opp_agent = env.agents[opp_a]
                    opp_path = paths[opp_a]
                    if p[0] == opp_agent.position[0] and p[1] == opp_agent.position[1]:
                        opp_path_loop = 0
                        tmp_path_loop = path_loop
                        while True:
                            if tmp_path_loop > len(path) - 1:
                                break
                            opp_p = opp_path[opp_path_loop]
                            tmp_p = path[tmp_path_loop + 1]
                            if opp_p[0] == opp_agent.target[0] and opp_p[1] == opp_agent.target[1]:
                                return True
                            if not (opp_p[0] == tmp_p[0] and opp_p[1] == tmp_p[1]):
                                break
                            if tmp_p[0] == agent.target[0] and tmp_p[1] == agent.target[1]:
                                break
                            opp_path_loop += 1
                            tmp_path_loop += 1
    return False


def classify_problem_instance(env):
    # shortest paths from ShortestPathPredictorForRailEnv
    paths = env.dev_pred_dict
    project_path_matrix = np.zeros(shape=(env.height, env.width))
    for a in range(env.get_num_agents()):
        # loop over all agents and project start position, target and shortest path into the project_path_matrix
        agent = env.agents[a]
        project_path_matrix[agent.position[0]][agent.position[1]] += 1.0
        project_path_matrix[agent.target[0]][agent.target[1]] += 1.0
        if not (agent.target[0] == agent.position[0] and agent.target[1] == agent.position[1]):
            # project the whole shortest path into the matrix
            path = paths[a]
            for path_loop in range(len(path)):
                p = path[path_loop]
                if p[0] == agent.target[0] and p[1] == agent.target[1]:
                    break
                else:
                    project_path_matrix[p[0]][p[1]] += 1.0
    return {
        # SHORTEST_PATH_ONLY -> the projection matrix does not contain any cell with a value > 1
        "SHORTEST_PATH_ONLY": check_is_only_shortest_path_problem(env, project_path_matrix),
        # SHORTEST_PATH_ORDERING_PROBLEM -> no agent start or target position lies on a cell with a value > 1
        "SHORTEST_PATH_ORDERING_PROBLEM": check_is_shortest_path_and_ordering_problem(env, project_path_matrix),
        # REQUIRE_ALTERNATIVE_PATH -> at least one pair of agents blocks each other on their shortest paths
        "REQUIRE_ALTERNATIVE_PATH": check_is_require_alternative_path(env, project_path_matrix)
    }
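A brief usage sketch of this classifier: it relies on env.dev_pred_dict being populated with per-agent shortest paths, which happens when a predictor such as the one added below has already run. The variable names and the order of the checks are illustrative assumptions, not part of this commit.

# Usage sketch (assumption): `env` is a RailEnv whose predictor has already
# filled env.dev_pred_dict with per-agent shortest paths.
classification = classify_problem_instance(env)
if classification["REQUIRE_ALTERNATIVE_PATH"]:
    print("some agents block each other on their shortest paths")
elif classification["SHORTEST_PATH_ORDERING_PROBLEM"]:
    print("shortest paths overlap, but starts/targets are conflict-free")
elif classification["SHORTEST_PATH_ONLY"]:
    print("all shortest paths are conflict-free")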
import numpy as np

from flatland.core.env_prediction_builder import PredictionBuilder
from flatland.core.grid.grid4_utils import get_new_position
from flatland.envs.rail_env import RailEnvActions


class AdrianShortestPathPredictorForRailEnv(PredictionBuilder):
    """
    ShortestPathPredictorForRailEnv object.

    This object returns shortest-path predictions for agents in the RailEnv environment.
    The prediction acts as if no other agent is in the environment and always takes the forward action.
    """

    def __init__(self, max_depth=20):
        # maximum number of prediction steps (default: 20)
        self.max_depth = max_depth

    def get(self, custom_args=None, handle=None):
        """
        Called whenever get_many in the observation builder is called.
        Requires a distance_map to extract the shortest path.

        Parameters
        ----------
        custom_args : dict
            - distance_map : np.array
        handle : int, optional
            Handle of the agent for which to compute the prediction.

        Returns
        -------
        dict
            Dictionary indexed by the agent handle; for each agent an array of (max_depth + 1) x 5 elements:
            - time_offset
            - position axis 0
            - position axis 1
            - direction
            - action taken to come here
            The prediction at index 0 is the current position, direction etc.
        """
        agents = self.env.agents
        if handle is not None:
            agents = [self.env.agents[handle]]
        assert custom_args is not None
        distance_map = custom_args.get('distance_map')
        assert distance_map is not None

        prediction_dict = {}
        for agent in agents:
            _agent_initial_position = agent.position
            _agent_initial_direction = agent.direction
            prediction = np.zeros(shape=(self.max_depth + 1, 5))
            prediction[0] = [0, *_agent_initial_position, _agent_initial_direction, 0]
            visited = []
            for index in range(1, self.max_depth + 1):
                # if we're at the target, stop moving...
                if agent.position == agent.target:
                    prediction[index] = [index, *agent.target, agent.direction, RailEnvActions.STOP_MOVING]
                    visited.append((agent.position[0], agent.position[1], agent.direction))
                    continue

                # take the shortest possible path
                cell_transitions = self.env.rail.get_transitions(*agent.position, agent.direction)
                new_position = None
                new_direction = None
                if np.sum(cell_transitions) == 1:
                    new_direction = np.argmax(cell_transitions)
                    new_position = get_new_position(agent.position, new_direction)
                elif np.sum(cell_transitions) > 1:
                    min_dist = np.inf
                    no_dist_found = True
                    for direction in range(4):
                        if cell_transitions[direction] == 1:
                            neighbour_cell = get_new_position(agent.position, direction)
                            target_dist = distance_map[agent.handle, neighbour_cell[0], neighbour_cell[1], direction]
                            if target_dist < min_dist or no_dist_found:
                                min_dist = target_dist
                                new_direction = direction
                                no_dist_found = False
                    new_position = get_new_position(agent.position, new_direction)
                else:
                    print("--------------------")
                    print(agent.position, agent.direction, "valid:",
                          self.env.rail.cell_neighbours_valid(agent.position),
                          self.env.rail.get_full_transitions(agent.position[0], agent.position[1]))
                    print("--------------------")
                    raise Exception("No transition possible {}".format(cell_transitions))

                # update the agent's position and direction
                agent.position = new_position
                agent.direction = new_direction

                # prediction is ready
                prediction[index] = [index, *new_position, new_direction, 0]
                visited.append((new_position[0], new_position[1], new_direction))
            self.env.dev_pred_dict[agent.handle] = visited
            prediction_dict[agent.handle] = prediction

            # cleanup: restore the agent's initial position and direction
            agent.position = _agent_initial_position
            agent.direction = _agent_initial_direction

        return prediction_dict
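A brief usage sketch of the predictor on its own (normally it would be attached to an observation builder). The set_env call and env.distance_map.get() are assumptions based on the Flatland API (PredictionBuilder provides set_env, and RailEnv exposes its distance map), not part of this commit.

# Usage sketch (assumption): call the predictor directly on an existing RailEnv `env`
# whose agents have valid positions.
predictor = AdrianShortestPathPredictorForRailEnv(max_depth=20)
predictor.set_env(env)
distance_map = env.distance_map.get()  # np.array of shape (n_agents, height, width, 4)
predictions = predictor.get(custom_args={'distance_map': distance_map})
# predictions[handle] is a (max_depth + 1) x 5 array: [time_offset, row, col, direction, action]
# side effect: env.dev_pred_dict[handle] now holds the visited cells used by the classifier above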