Commit c5c0576e authored by u214892's avatar u214892
Browse files

#62 increase unit test coverage

parent 2b292dcc
Pipeline #1193 passed with stage
in 9 minutes and 32 seconds
from flatland.core.grid.grid4_utils import validate_new_transition
class AStarNode():
"""A node class for A* Pathfinding"""
def __init__(self, parent=None, pos=None):
self.parent = parent
self.pos = pos
self.g = 0
self.h = 0
self.f = 0
def __eq__(self, other):
return self.pos == other.pos
def __hash__(self):
return hash(self.pos)
def update_if_better(self, other):
if other.g < self.g:
self.parent = other.parent
self.g = other.g
self.h = other.h
self.f = other.f
def a_star(rail_trans, rail_array, start, end):
"""
Returns a list of tuples as a path from the given start to end.
If no path is found, returns path to closest point to end.
"""
rail_shape = rail_array.shape
start_node = AStarNode(None, start)
end_node = AStarNode(None, end)
open_nodes = set()
closed_nodes = set()
open_nodes.add(start_node)
while len(open_nodes) > 0:
# get node with current shortest est. path (lowest f)
current_node = None
for item in open_nodes:
if current_node is None:
current_node = item
continue
if item.f < current_node.f:
current_node = item
# pop current off open list, add to closed list
open_nodes.remove(current_node)
closed_nodes.add(current_node)
# found the goal
if current_node == end_node:
path = []
current = current_node
while current is not None:
path.append(current.pos)
current = current.parent
# return reversed path
return path[::-1]
# generate children
children = []
if current_node.parent is not None:
prev_pos = current_node.parent.pos
else:
prev_pos = None
for new_pos in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
node_pos = (current_node.pos[0] + new_pos[0], current_node.pos[1] + new_pos[1])
if node_pos[0] >= rail_shape[0] or node_pos[0] < 0 or node_pos[1] >= rail_shape[1] or node_pos[1] < 0:
continue
# validate positions
if not validate_new_transition(rail_trans, rail_array, prev_pos, current_node.pos, node_pos, end_node.pos):
continue
# create new node
new_node = AStarNode(current_node, node_pos)
children.append(new_node)
# loop through children
for child in children:
# already in closed list?
if child in closed_nodes:
continue
# create the f, g, and h values
child.g = current_node.g + 1
# this heuristic favors diagonal paths:
# child.h = ((child.pos[0] - end_node.pos[0]) ** 2) + ((child.pos[1] - end_node.pos[1]) ** 2) \# noqa: E800
# this heuristic avoids diagonal paths
child.h = abs(child.pos[0] - end_node.pos[0]) + abs(child.pos[1] - end_node.pos[1])
child.f = child.g + child.h
# already in the open list?
if child in open_nodes:
continue
# add the child to the open list
open_nodes.add(child)
# no full path found
if len(open_nodes) == 0:
return []
from flatland.core.grid.grid4 import Grid4TransitionsEnum
def get_direction(pos1, pos2) -> Grid4TransitionsEnum:
"""
Assumes pos1 and pos2 are adjacent location on grid.
Returns direction (int) that can be used with transitions.
"""
diff_0 = pos2[0] - pos1[0]
diff_1 = pos2[1] - pos1[1]
if diff_0 < 0:
return 0
if diff_0 > 0:
return 2
if diff_1 > 0:
return 1
if diff_1 < 0:
return 3
raise Exception("Could not determine direction {}->{}".format(pos1, pos2))
def mirror(dir):
return (dir + 2) % 4
def validate_new_transition(rail_trans, rail_array, prev_pos, current_pos, new_pos, end_pos):
# start by getting direction used to get to current node
# and direction from current node to possible child node
new_dir = get_direction(current_pos, new_pos)
if prev_pos is not None:
current_dir = get_direction(prev_pos, current_pos)
else:
current_dir = new_dir
# create new transition that would go to child
new_trans = rail_array[current_pos]
if prev_pos is None:
if new_trans == 0:
# need to flip direction because of how end points are defined
new_trans = rail_trans.set_transition(new_trans, mirror(current_dir), new_dir, 1)
else:
# check if matches existing layout
new_trans = rail_trans.set_transition(new_trans, current_dir, new_dir, 1)
else:
# set the forward path
new_trans = rail_trans.set_transition(new_trans, current_dir, new_dir, 1)
# set the backwards path
new_trans = rail_trans.set_transition(new_trans, mirror(new_dir), mirror(current_dir), 1)
if new_pos == end_pos:
# need to validate end pos setup as well
new_trans_e = rail_array[end_pos]
if new_trans_e == 0:
# need to flip direction because of how end points are defined
new_trans_e = rail_trans.set_transition(new_trans_e, new_dir, mirror(new_dir), 1)
else:
# check if matches existing layout
new_trans_e = rail_trans.set_transition(new_trans_e, new_dir, new_dir, 1)
if not rail_trans.is_valid(new_trans_e):
return False
# is transition is valid?
return rail_trans.is_valid(new_trans)
def get_new_position(position, movement):
""" Utility function that converts a compass movement over a 2D grid to new positions (r, c). """
if movement == Grid4TransitionsEnum.NORTH:
return (position[0] - 1, position[1])
elif movement == Grid4TransitionsEnum.EAST:
return (position[0], position[1] + 1)
elif movement == Grid4TransitionsEnum.SOUTH:
return (position[0] + 1, position[1])
elif movement == Grid4TransitionsEnum.WEST:
return (position[0], position[1] - 1)
import numpy as np
def position_to_coordinate(depth, positions):
"""Converts coordinates to positions:
[ (0,0) (0,1) .. (0,w-1)
(1,0) (1,1) (1,w-1)
...
(d-1,0) (d-1,1) (d-1,w-1)
]
-->
[ 0 d .. (w-1)*d
1 d+1
...
d-1 2d-1 w*d-1
]
:param depth:
:param positions:
:return:
"""
coords = ()
for p in positions:
coords = coords + ((int(p) % depth, int(p) // depth),) # changed x_dim to y_dim
return coords
def coordinate_to_position(depth, coords):
"""
Converts positions to coordinates:
[ 0 d .. (w-1)*d
1 d+1
...
d-1 2d-1 w*d-1
]
-->
[ (0,0) (0,1) .. (0,w-1)
(1,0) (1,1) (1,w-1)
...
(d-1,0) (d-1,1) (d-1,w-1)
]
:param depth:
:param coords:
:return:
"""
position = np.empty(len(coords), dtype=int)
idx = 0
for t in coords:
position[idx] = int(t[1] * depth + t[0])
idx += 1
return position
def distance_on_rail(pos1, pos2):
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
......@@ -62,38 +62,6 @@ class RailEnvTransitions(Grid4Transitions):
print("S", format(cell_transition >> (1 * 4) & 0xF, '04b'))
print("W", format(cell_transition >> (0 * 4) & 0xF, '04b'))
def repr(self, cell_transition, version=0):
"""
Provide a string representation of the cell transitions.
This class doesn't represent an individual cell,
but a way of interpreting the contents of a cell.
So using the ad hoc name repr rather than __repr__.
"""
# binary format string without leading 0b
sbinTrans = format(cell_transition, "#018b")[2:]
if version == 0:
sRepr = " ".join([
"{}:{}".format(sDir, sbinTrans[i:(i + 4)])
for i, sDir in
zip(
range(0, len(sbinTrans), 4),
self.lsDirs)]) # NESW
return sRepr
if version == 1:
lsRepr = []
for iDirIn in range(0, 4):
sDirTrans = sbinTrans[(iDirIn * 4):(iDirIn * 4 + 4)]
if sDirTrans == "0000":
continue
sDirsOut = [
self.lsDirs[iDirOut]
for iDirOut in range(0, 4)
if sDirTrans[iDirOut] == "1"]
lsRepr.append(self.lsDirs[iDirIn] + ":" + "".join(sDirsOut))
return ", ".join(lsRepr)
def is_valid(self, cell_transition):
"""
Checks if a cell transition is a valid cell setup.
......
......@@ -4,20 +4,6 @@ import numpy as np
from attr import attrs, attrib
@attrs
class EnvDescription(object):
""" EnvDescription - This is a description of a random env,
based around the rail_generator and stats like size and n_agents.
It mirrors the parameters given to the RailEnv constructor.
Not currently used.
"""
n_agents = attrib()
height = attrib()
width = attrib()
rail_generator = attrib()
obs_builder = attrib() # not sure if this should closer to the agent than the env
@attrs
class EnvAgentStatic(object):
""" EnvAgentStatic - Stores initial position, direction and target.
......@@ -34,18 +20,6 @@ class EnvAgentStatic(object):
# cell if speed=1, as default)
speed_data = attrib(default=dict({'position_fraction': 0.0, 'speed': 1.0, 'transition_action_on_cellexit': 0}))
def __init__(self,
position,
direction,
target,
moving=False,
speed_data={'position_fraction': 0.0, 'speed': 1.0, 'transition_action_on_cellexit': 0}):
self.position = position
self.direction = direction
self.target = target
self.moving = moving
self.speed_data = speed_data
@classmethod
def from_lists(cls, positions, directions, targets, speeds=None):
""" Create a list of EnvAgentStatics from lists of positions, directions and targets
......@@ -84,12 +58,6 @@ class EnvAgent(EnvAgentStatic):
old_direction = attrib(default=None)
old_position = attrib(default=None)
def __init__(self, position, direction, target, handle, old_direction, old_position):
super(EnvAgent, self).__init__(position, direction, target)
self.handle = handle
self.old_direction = old_direction
self.old_position = old_position
def to_list(self):
return [
self.position, self.direction, self.target, self.handle,
......
......@@ -2,8 +2,10 @@ import numpy as np
from flatland.core.transition_map import GridTransitionMap
from flatland.core.grid.rail_env_grid import RailEnvTransitions
from flatland.envs.env_utils import distance_on_rail, connect_rail, get_direction, mirror
from flatland.envs.env_utils import get_rnd_agents_pos_tgt_dir_on_rail
from flatland.envs.grid4_generators_utils import connect_rail
from flatland.core.grid.grid_utils import distance_on_rail
from flatland.core.grid.grid4_utils import get_direction, mirror
from flatland.envs.grid4_generators_utils import get_rnd_agents_pos_tgt_dir_on_rail
def empty_rail_generator():
......
......@@ -7,238 +7,8 @@ a GridTransitionMap object.
import numpy as np
from flatland.core.grid.grid4 import Grid4TransitionsEnum
def get_direction(pos1, pos2) -> Grid4TransitionsEnum:
"""
Assumes pos1 and pos2 are adjacent location on grid.
Returns direction (int) that can be used with transitions.
"""
diff_0 = pos2[0] - pos1[0]
diff_1 = pos2[1] - pos1[1]
if diff_0 < 0:
return 0
if diff_0 > 0:
return 2
if diff_1 > 0:
return 1
if diff_1 < 0:
return 3
raise Exception("Could not determine direction {}->{}".format(pos1, pos2))
def mirror(dir):
return (dir + 2) % 4
def validate_new_transition(rail_trans, rail_array, prev_pos, current_pos, new_pos, end_pos):
# start by getting direction used to get to current node
# and direction from current node to possible child node
new_dir = get_direction(current_pos, new_pos)
if prev_pos is not None:
current_dir = get_direction(prev_pos, current_pos)
else:
current_dir = new_dir
# create new transition that would go to child
new_trans = rail_array[current_pos]
if prev_pos is None:
if new_trans == 0:
# need to flip direction because of how end points are defined
new_trans = rail_trans.set_transition(new_trans, mirror(current_dir), new_dir, 1)
else:
# check if matches existing layout
new_trans = rail_trans.set_transition(new_trans, current_dir, new_dir, 1)
else:
# set the forward path
new_trans = rail_trans.set_transition(new_trans, current_dir, new_dir, 1)
# set the backwards path
new_trans = rail_trans.set_transition(new_trans, mirror(new_dir), mirror(current_dir), 1)
if new_pos == end_pos:
# need to validate end pos setup as well
new_trans_e = rail_array[end_pos]
if new_trans_e == 0:
# need to flip direction because of how end points are defined
new_trans_e = rail_trans.set_transition(new_trans_e, new_dir, mirror(new_dir), 1)
else:
# check if matches existing layout
new_trans_e = rail_trans.set_transition(new_trans_e, new_dir, new_dir, 1)
if not rail_trans.is_valid(new_trans_e):
return False
# is transition is valid?
return rail_trans.is_valid(new_trans)
def position_to_coordinate(depth, positions):
"""Converts coordinates to positions:
[ (0,0) (0,1) .. (0,w-1)
(1,0) (1,1) (1,w-1)
...
(d-1,0) (d-1,1) (d-1,w-1)
]
-->
[ 0 d .. (w-1)*d
1 d+1
...
d-1 2d-1 w*d-1
]
:param depth:
:param positions:
:return:
"""
coords = ()
for p in positions:
coords = coords + ((int(p) % depth, int(p) // depth),) # changed x_dim to y_dim
return coords
def coordinate_to_position(depth, coords):
"""
Converts positions to coordinates:
[ 0 d .. (w-1)*d
1 d+1
...
d-1 2d-1 w*d-1
]
-->
[ (0,0) (0,1) .. (0,w-1)
(1,0) (1,1) (1,w-1)
...
(d-1,0) (d-1,1) (d-1,w-1)
]
:param depth:
:param coords:
:return:
"""
position = np.empty(len(coords), dtype=int)
idx = 0
for t in coords:
position[idx] = int(t[1] * depth + t[0])
idx += 1
return position
def get_new_position(position, movement):
""" Utility function that converts a compass movement over a 2D grid to new positions (r, c). """
if movement == Grid4TransitionsEnum.NORTH:
return (position[0] - 1, position[1])
elif movement == Grid4TransitionsEnum.EAST:
return (position[0], position[1] + 1)
elif movement == Grid4TransitionsEnum.SOUTH:
return (position[0] + 1, position[1])
elif movement == Grid4TransitionsEnum.WEST:
return (position[0], position[1] - 1)
class AStarNode():
"""A node class for A* Pathfinding"""
def __init__(self, parent=None, pos=None):
self.parent = parent
self.pos = pos
self.g = 0
self.h = 0
self.f = 0
def __eq__(self, other):
return self.pos == other.pos
def __hash__(self):
return hash(self.pos)
def update_if_better(self, other):
if other.g < self.g:
self.parent = other.parent
self.g = other.g
self.h = other.h
self.f = other.f
def a_star(rail_trans, rail_array, start, end):
"""
Returns a list of tuples as a path from the given start to end.
If no path is found, returns path to closest point to end.
"""
rail_shape = rail_array.shape
start_node = AStarNode(None, start)
end_node = AStarNode(None, end)
open_nodes = set()
closed_nodes = set()
open_nodes.add(start_node)
while len(open_nodes) > 0:
# get node with current shortest est. path (lowest f)
current_node = None
for item in open_nodes:
if current_node is None:
current_node = item
continue
if item.f < current_node.f:
current_node = item
# pop current off open list, add to closed list
open_nodes.remove(current_node)
closed_nodes.add(current_node)
# found the goal
if current_node == end_node:
path = []
current = current_node
while current is not None:
path.append(current.pos)
current = current.parent
# return reversed path
return path[::-1]
# generate children
children = []
if current_node.parent is not None:
prev_pos = current_node.parent.pos
else:
prev_pos = None
for new_pos in [(0, -1), (0, 1), (-1, 0), (1, 0)]:
node_pos = (current_node.pos[0] + new_pos[0], current_node.pos[1] + new_pos[1])
if node_pos[0] >= rail_shape[0] or node_pos[0] < 0 or node_pos[1] >= rail_shape[1] or node_pos[1] < 0:
continue
# validate positions
if not validate_new_transition(rail_trans, rail_array, prev_pos, current_node.pos, node_pos, end_node.pos):
continue
# create new node
new_node = AStarNode(current_node, node_pos)
children.append(new_node)
# loop through children
for child in children:
# already in closed list?
if child in closed_nodes:
continue
# create the f, g, and h values
child.g = current_node.g + 1
# this heuristic favors diagonal paths:
# child.h = ((child.pos[0] - end_node.pos[0]) ** 2) + ((child.pos[1] - end_node.pos[1]) ** 2) \# noqa: E800
# this heuristic avoids diagonal paths
child.h = abs(child.pos[0] - end_node.pos[0]) + abs(child.pos[1] - end_node.pos[1])
child.f = child.g + child.h
# already in the open list?
if child in open_nodes:
continue
# add the child to the open list
open_nodes.add(child)
# no full path found
if len(open_nodes) == 0:
return []
from flatland.core.grid.grid4_astar import a_star
from flatland.core.grid.grid4_utils import get_direction, mirror, get_new_position
def connect_rail(rail_trans, rail_array, start, end):
......@@ -287,10 +57,6 @@ def connect_rail(rail_trans, rail_array, start, end):
return path
def distance_on_rail(pos1, pos2):
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
def get_rnd_agents_pos_tgt_dir_on_rail(rail, num_agents):
"""
Given a `rail' GridTransitionMap, return a random placement of agents (initial position, direction and target).
......
......@@ -7,7 +7,7 @@ import numpy as np
from flatland.core.env_observation_builder import ObservationBuilder
from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.envs.env_utils import coordinate_to_position