Commit 4102e91c authored by nilabha's avatar nilabha

Added pure imitation learning in rllib

parent 5349396b
from bayes_opt import BayesianOptimization, JSONLogger, Events
from libs.cell_graph_dispatcher import CellGraphDispatcher
from libs.cell_graph_validator import CellGraphValidator
import numpy as np
N = 15
seed = 42
np.random.seed(seed)

def _rand_per_env(low, high):
    """Draw one random integer per generated environment from [low, high)."""
    return np.random.randint(low, high, (N,))

width = _rand_per_env(20, 150)
height = _rand_per_env(20, 150)
nr_train = _rand_per_env(50, 200)
n_cities = _rand_per_env(2, 35)
grid_distribution_of_cities = False
max_rails_between_cities = _rand_per_env(2, 4)
max_rail_in_city = _rand_per_env(3, 6)
malfunction_rate = _rand_per_env(500, 4000)
# NOTE(review): identical bounds make this a constant 0.01 for every env - confirm intent.
prop_malfunction = np.random.uniform(0.01, 0.01, (N,))
min_duration = _rand_per_env(20, 80)
max_duration = _rand_per_env(20, 80)
# Guarantee max >= min for every environment.
max_duration = np.maximum(min_duration, max_duration)

# Distribution of train speed profiles (shares sum to 1.0).
speed_ration_map = {1.: 0.25,       # Fast passenger train
                    1. / 2.: 0.25,  # Fast freight train
                    1. / 3.: 0.25,  # Slow commuter train
                    1. / 4.: 0.25}  # Slow freight train

# Make the second half square-shaped; the last env is the hardest: 150x150 with 200 trains.
width[N - 1] = 150
height[N // 2:] = width[N // 2:]
nr_train[N - 1] = 200

test = {
    "width": width,
    "height": height,
    "trains": nr_train,
    "seed": seed,
    "cities": n_cities,
    "rails_between_cities": max_rails_between_cities,
    "rails_in_city": max_rail_in_city,
    "malfunction_rate": malfunction_rate,
    "prop_malfunction": prop_malfunction,
    "min_prop": min_duration,
    "max_prop": max_duration
}
#Run
def flatland_function(speed_coef, time_coef):
    """Objective for Bayesian optimisation: the "finished" metric of the test suite.

    Builds a dispatcher whose agent priority is a weighted sum of each agent's
    estimated travel time and its speed, runs the shared `test` configurations,
    and returns the resulting "finished" value.
    """
    def get_dispatcher(env):
        def get_sort_function(dispatcher: CellGraphDispatcher):
            def sort(idx):
                agent = env.agents[idx]
                start_vertex = dispatcher.graph._vertex_idx_from_point(agent.initial_position)
                travel_time = dispatcher.controllers[idx].dist_to_target[start_vertex, agent.initial_direction]
                return agent.speed_data['speed'] * speed_coef + travel_time * time_coef
            return sort
        return CellGraphDispatcher(env, sort_function=get_sort_function)

    res = CellGraphValidator.multiple_tests(get_dispatcher, **test)
    return res["finished"]
# Search space for the two priority weights.
pbounds = {'speed_coef' : (-10000, 10000), 'time_coef': (-1, 1)}

optimizer = BayesianOptimization(
    f=flatland_function,
    pbounds=pbounds,
    random_state=seed,
)

# Log every optimisation step as JSON.
# (OPTMIZATION_STEP is the constant's actual spelling inside bayes_opt.)
logger = JSONLogger(path="./opt_log.json")
optimizer.subscribe(Events.OPTMIZATION_STEP, logger)

# Seed the optimiser with three boundary configurations before searching.
for corner in ({'speed_coef' : -10000, 'time_coef': 1},
               {'speed_coef' : -10000, 'time_coef': -1},
               {'speed_coef' : +10000, 'time_coef': 1}):
    optimizer.probe(corner)

optimizer.maximize(init_points=10, n_iter=100)
print(optimizer.max)
\ No newline at end of file
import numpy as np
import time
# In Flatland you can use custom observation builders and predictors
# Observation builders generate the observations needed by the controller
# Predictors can be used to do short-term prediction, which can help in avoiding conflicts in the network
from flatland.envs.observations import GlobalObsForRailEnv, ObservationBuilder
# First of all we import the Flatland rail environment
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env import RailEnvActions, RailAgentStatus
from flatland.envs.rail_generators import sparse_rail_generator
from flatland.envs.schedule_generators import sparse_schedule_generator
# We also include a renderer because we want to visualize what is going on in the environment
from flatland.utils.rendertools import RenderTool, AgentRenderVariant
from flatland.envs.malfunction_generators import malfunction_from_params
from libs.cell_graph_dispatcher import CellGraphDispatcher
start_time = time.time()

# Active configuration: medium 50x50 map, heavily loaded with 200 trains.
width = 50            # Width of map
height = 50           # Height of map
nr_trains = 200       # Number of trains that have an assigned task in the env
cities_in_map = 35    # Number of cities where agents can start or end
seed = 5              # Random seed
# Alternative configurations kept for reference:
#   150x150 map, 200 trains, 35 cities, seed 5
#   150x150 map, 100 trains, 100 cities, seed 14

grid_distribution_of_cities = False  # If False, cities are placed randomly
max_rails_between_cities = 2  # Max tracks between cities (entry points per city)
max_rail_in_cities = 6  # Max parallel tracks within a city (station size)
# Rail network generator: sparse topology with the configured city layout.
rail_generator = sparse_rail_generator(
    max_num_cities=cities_in_map,
    seed=seed,
    grid_mode=grid_distribution_of_cities,
    max_rails_between_cities=max_rails_between_cities,
    max_rails_in_city=max_rail_in_cities,
)

# Statistical distribution of train speed profiles (shares sum to 1.0).
speed_ration_map = {
    1.: 0.25,       # Fast passenger train
    1. / 2.: 0.25,  # Fast freight train
    1. / 3.: 0.25,  # Slow commuter train
    1. / 4.: 0.25,  # Slow freight train
}

# The schedule generator assigns start, target and a speed profile to every agent.
schedule_generator = sparse_schedule_generator(speed_ration_map)

# Stochastic malfunction parameters, passed to the RailEnv constructor.
stochastic_data = {'malfunction_rate': 500,  # Rate of malfunction occurrence of single agent
                   'prop_malfunction': 0.01,
                   'min_duration': 20,  # Minimal duration of malfunction
                   'max_duration': 80  # Max duration of malfunction
                   }
# Custom observation builder without predictor
class DummyObservationBuilder(ObservationBuilder):
    """Trivial observation builder returning ``True`` for every agent.

    Useful when the controller (dispatcher) plans on the rail graph itself
    and never reads observations, e.g. in the evaluation service.
    """

    def __init__(self):
        super().__init__()

    def reset(self):
        # Stateless builder: nothing to reset.
        pass

    def get_many(self, handles=None) -> bool:
        # Single dummy value regardless of the requested handles.
        return True

    def get(self, handle: int = 0) -> bool:
        return True
# The dispatcher plans on the rail graph directly, so a dummy builder suffices.
observation_builder = DummyObservationBuilder()
# Custom observation builder with predictor, uncomment line below if you want to try this one
# observation_builder = TreeObsForRailEnv(max_depth=2, predictor=ShortestPathPredictorForRailEnv())
# Construct the environment with the given observation, generators, predictors, and stochastic data
env = RailEnv(width=width,
              height=height,
              rail_generator=rail_generator,
              schedule_generator=schedule_generator,
              number_of_agents=nr_trains,
              malfunction_generator_and_process_data=malfunction_from_params(stochastic_data),  # Malfunction data generator
              obs_builder_object=observation_builder,
              remove_agents_at_target=True  # Removes agents at the end of their journey to make space for others
              )
env.reset()
# Initiate the renderer
# NOTE(review): screen_height=1920 / screen_width=1080 look swapped for a
# landscape display - confirm against the target resolution.
env_renderer = RenderTool(env, gl="PILSVG",
                          agent_render_variant=AgentRenderVariant.AGENT_SHOWS_OPTIONS_AND_BOX,
                          show_debug=False,
                          screen_height=1920,  # Adjust these parameters to fit your resolution
                          screen_width=1080)  # Adjust these parameters to fit your resolution
# Dispatcher computes all agent actions from the rail graph.
dispatcher = CellGraphDispatcher(env)
score = 0
# Run episode
frame_step = 0
step = 0
while True:
    step += 1
    # Ask the dispatcher for one action per agent at this time step.
    action_dict = dispatcher.step(step)
    # Environment step which returns the observations for all agents, their corresponding
    # reward and whether they are done
    next_obs, all_rewards, done, _ = env.step(action_dict)
    env_renderer.render_env(show=True, show_observations=False, show_predictions=False)
    # env_renderer.render_env(show=True, show_observations=True, show_predictions=True)
    # os.makedirs('./misc/Fames2/', exist_ok=True)
    # env_renderer.gl.save_image('./misc/Fames2/flatland_frame_{:04d}.png'.format(step))
    frame_step += 1
    # Accumulate the global score (sum of all per-agent rewards this step).
    score += np.sum(list(all_rewards.values()))
    #
    # observations = next_obs.copy()
    # Count agents that reached their target (still on map or already removed).
    finished = np.sum([a.status==RailAgentStatus.DONE or a.status==RailAgentStatus.DONE_REMOVED for a in env.agents])
    print('Episode: Steps {}\t Score = {}\t Finished = {}'.format(step, score, finished))
    if done['__all__']:
        break
# Final summary once every agent is done.
finished = np.sum([a.status==RailAgentStatus.DONE or a.status==RailAgentStatus.DONE_REMOVED for a in env.agents])
print(f'Trains finished {finished}/{len(env.agents)} = {finished*100/len(env.agents):.2f}%')
print(f'Total time: {time.time()-start_time}s')
This diff is collapsed.
from libs.cell_graph_validator import CellGraphValidator
from libs.cell_graph_dispatcher import CellGraphDispatcher
import numpy as np
def no_sort_dispatcher(env):
    """Build a CellGraphDispatcher that keeps the default agent priority order."""
    dispatcher = CellGraphDispatcher(env)
    return dispatcher
micro_test = {
    "width": [50],
    "height": [50],
    "cities": [10],
    "trains": [50],
    "seed": 42
}

mini_test = {
    "width": [50, 50, 100, 125],
    "height": [50, 50, 100, 125],
    "cities": [10] * 4,
    "trains": [100, 200, 100, 100],
    "seed": 42
}

# small_test: N randomized environments, reproducible via the fixed seed.
N = 15
seed = 42
np.random.seed(seed)

def _draws(low, high):
    """One random integer per environment from [low, high)."""
    return np.random.randint(low, high, (N,))

width = _draws(20, 150)
height = _draws(20, 150)
# Last env is the hardest (150 wide, 200 trains); second half are square maps.
width[N - 1] = 150
height[N // 2:] = width[N // 2:]
nr_train = _draws(50, 200)
nr_train[N - 1] = 200
n_cities = _draws(2, 35)
grid_distribution_of_cities = False
max_rails_between_cities = _draws(2, 4)
max_rail_in_city = _draws(3, 6)
malfunction_rate = _draws(500, 4000)
prop_malfunction = np.random.uniform(0.01, 0.01, (N,))  # constant 0.01 per env
min_duration = _draws(20, 80)
max_duration = _draws(20, 80)
max_duration = np.maximum(min_duration, max_duration)  # ensure max >= min

small_test = {
    "width": width,
    "height": height,
    "trains": nr_train,
    "seed": seed,
    "cities": n_cities,
    "rails_between_cities": max_rails_between_cities,
    "rails_in_city": max_rail_in_city,
    "malfunction_rate": malfunction_rate,
    "prop_malfunction": prop_malfunction,
    "min_prop": min_duration,
    "max_prop": max_duration
}
# Run: validate the default (unsorted) dispatcher on the four mini-test configurations.
CellGraphValidator.multiple_tests(no_sort_dispatcher, **mini_test)
import numpy as np
from collections import deque
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env import RailEnvActions
class Vertex:
    """Graph node for a single rail cell.

    `out[d]` / `in_edges[d]` hold the outgoing / incoming edges for each of
    the four headings d (0..3).
    """

    def __init__(self, y, x, idx):
        self.point = (y, x)
        self.idx = idx
        # Four *distinct* lists per attribute - one bucket per heading.
        self.out = [[] for _ in range(4)]
        self.in_edges = [[] for _ in range(4)]
class Edge:
    """Directed transition between two rail cells.

    Stores the vertex indices at both ends, the heading before and after the
    move, and the action value that performs it.
    """

    def __init__(self, start_v, end_v, start_dir, end_dir, action_type):
        self.action_type = action_type
        self.start_v = start_v
        self.start_direction = start_dir
        self.end_v = end_v
        self.end_direction = end_dir
class CellGraph:
    """Static directed graph over the rail grid of a RailEnv.

    Every rail cell becomes a Vertex; every legal transition between two
    neighbouring cells becomes an Edge annotated with the incoming/outgoing
    heading and the action that performs the move.
    """

    def __init__(self, env: "RailEnv"):
        """Build the graph for the given (already generated) environment."""
        self.env = env
        self._build_graph()

    def _build_graph(self):
        """Scan the grid, create one Vertex per rail cell and all edges."""
        width = self.env.width
        height = self.env.height
        # -1 marks a non-rail cell.  FIX: `dtype=np.int` was removed in
        # NumPy 1.24 (deprecated since 1.20); use the builtin `int` instead.
        self.vertex_idx = np.full((height, width), -1, dtype=int)
        self.vertexes = []
        for y in range(height):
            for x in range(width):
                if self._is_rail(y, x):
                    idx = len(self.vertexes)
                    self.vertexes.append(Vertex(y, x, idx))
                    self.vertex_idx[y, x] = idx
        for v_idx, v in enumerate(self.vertexes):
            start_point = v.point
            for direction in range(4):
                # Headings reachable from this cell when entering with `direction`.
                for end_direction in self._possible_directions(start_point, direction):
                    next_point = self._next_point(start_point, end_direction)
                    end_v = self._vertex_idx_from_point(next_point)
                    action_type = self._action_from_directions(direction, end_direction)
                    e = Edge(v_idx, end_v, direction, end_direction, action_type)
                    v.out[direction].append(e)
                    self.vertexes[end_v].in_edges[end_direction].append(e)

    def _is_rail(self, y, x):
        """True if cell (y, x) carries any rail transition bits."""
        return self.env.rail.grid[y, x] != 0

    def _next_point(self, point, direction):
        """Neighbour of `point` one step along `direction` (0=N, 1=E, 2=S, 3=W)."""
        if direction == 0:
            return (point[0] - 1, point[1])
        elif direction == 1:
            return (point[0], point[1] + 1)
        elif direction == 2:
            return (point[0] + 1, point[1])
        else:
            return (point[0], point[1] - 1)

    def _possible_directions(self, point, in_direction):
        """Outgoing headings allowed at `point` when entering with `in_direction`."""
        return np.flatnonzero(self.env.rail.get_transitions(point[0], point[1], in_direction))

    def _vertex_idx_from_point(self, point):
        """Vertex index of the rail cell at `point` (must lie inside the grid)."""
        assert (point[0] >= 0) and (point[0] < self.vertex_idx.shape[0])
        assert (point[1] >= 0) and (point[1] < self.vertex_idx.shape[1])
        return self.vertex_idx[point[0], point[1]]

    def position_from_vertexid(self, vertexid: int):
        """(y, x) grid position of the vertex with index `vertexid`."""
        return self.vertexes[vertexid].point

    def _action_from_directions(self, in_direction, new_direction):
        """Map a heading change onto the rail action that produces it."""
        if in_direction == new_direction:
            return RailEnvActions.MOVE_FORWARD
        if (in_direction + 1) % 4 == new_direction:
            return RailEnvActions.MOVE_RIGHT
        elif (in_direction - 1) % 4 == new_direction:
            return RailEnvActions.MOVE_LEFT
        else:
            # 180-degree turn: no dedicated action exists, fall back to FORWARD.
            return RailEnvActions.MOVE_FORWARD
This diff is collapsed.
import traceback
from copy import deepcopy
from typing import Dict
from flatland.envs.rail_env import RailEnv, RailAgentStatus, RailEnvActions
from libs import cell_graph_rescheduling, cell_graph_partial_rescheduling, cell_graph_rescheduling_data
from libs.cell_graph import CellGraph
from libs.cell_graph_agent import CellGraphAgent
from libs.cell_graph_locker import CellGraphLocker
class CellGraphDispatcher:
    """Plans a path per agent on the CellGraph and replans when malfunctions occur."""

    def __init__(self, env: RailEnv, sort_function=None):
        self.env = env
        self.graph = CellGraph(env)
        self.locker = CellGraphLocker(self.graph)
        max_steps = env._max_episode_steps
        # One path-planning controller per agent.
        self.controllers = [CellGraphAgent(agent, self.graph, self.locker, i, max_steps) for i, agent in
                            enumerate(env.agents)]
        self.action_dict = {}
        # Default priority: distance to target minus a large bonus for faster trains.
        # A custom sort_function is a factory: it receives the dispatcher and
        # returns the actual key function (see flatland_function's usage).
        if sort_function is None:
            sort_function = lambda idx: self.controllers[idx].dist_to_target[
                                            self.graph._vertex_idx_from_point(env.agents[idx].initial_position),
                                            env.agents[idx].initial_direction] \
                                        - 10000 * env.agents[idx].speed_data['speed']
        else:
            sort_function = sort_function(self)
        self.agents_order = sorted(range(len(env.agents)), key=sort_function)
        # Tracks which agents were already malfunctioning on the previous step.
        self.agent_locked_by_malfunction = []
        for agent in env.agents:
            self.agent_locked_by_malfunction.append(agent.malfunction_data['malfunction'] > 0)
        self.crashed = False
        self.blocked_agents = set()

    def step(self, step) -> Dict[int, RailEnvActions]:
        """Return the action dict for time `step`; replans on new malfunctions."""
        try:
            # 1) Detect agents whose malfunction started this step.
            has_new_malfunctions = False
            for i, agent in enumerate(self.env.agents):
                is_locked = agent.malfunction_data['malfunction']
                if agent.status == RailAgentStatus.ACTIVE:
                    if (not self.agent_locked_by_malfunction[i]) and is_locked:
                        has_new_malfunctions = True
                self.agent_locked_by_malfunction[i] = is_locked
            updated = set()
            full_recalc_needed = False
            # old_locker = None
            # 2) Full rescheduling attempt when a new malfunction appeared.
            try:
                if has_new_malfunctions:
                    # print('new malfunction at step', step)
                    # old_locker = deepcopy(self.locker)
                    cached_ways, vertex_agent_order, agent_way_position, agent_position_duration = \
                        cell_graph_rescheduling_data.get_rescheduling_data(self.env, step, self.controllers, self.graph,
                                                                           self.locker)
                    # Deep copies kept as input for partial rescheduling, in case
                    # the full reschedule below mutates and then fails.
                    vertex_agent_order2 = deepcopy(vertex_agent_order)
                    agent_way_position2 = deepcopy(agent_way_position)
                    agent_position_duration2 = deepcopy(agent_position_duration)
                    new_way, full_recalc_needed = cell_graph_rescheduling.reschedule(cached_ways, vertex_agent_order,
                                                                                     agent_way_position,
                                                                                     agent_position_duration,
                                                                                     self.env, step, self.controllers,
                                                                                     self.graph, self.locker)
                    for i in self.agents_order:
                        if len(new_way[i]):
                            changed = cell_graph_rescheduling.recover_agent_way(self.controllers[i], self.env.agents[i],
                                                                                self.graph, new_way[i])
                            if changed:
                                updated.add(i)
            # rescheduling failed, try to make a partial rescheduling
            except Exception as e:
                print("-----------------Rescheduling Exception----------------")
                print("Step: ", step)
                # traceback.print_exc()
                print("-----------------Rescheduling Exception----------------")
                updated.clear()
                full_recalc_needed = False
                # if old_locker is not None:
                #     self.locker.data = old_locker.data
                self.partial_resheduling(cached_ways, vertex_agent_order2, agent_way_position2,
                                         agent_position_duration2, step)
                self.limit_max_visited()
            # 3) Pick an action for every agent, in priority order.
            for i in self.agents_order:
                try:
                    agent = self.env.agents[i]
                    # if agent.speed_data['position_fraction'] >= 1.0:
                    #     print('agent', i, 'blocked by some another agent, fraction:', agent.speed_data['position_fraction'])
                    force_new_path = full_recalc_needed or self.crashed or i in updated
                    # force_new_path = full_recalc_needed or i in updated
                    # if (force_new_path and i in self.blocked_agents):
                    #     # self.action_dict.update({i: RailEnvActions.DO_NOTHING})
                    #     force_new_path = False
                    #     # continue
                    if i in self.blocked_agents:
                        force_new_path = True
                    # Agents in the middle of a cell keep moving; no new decision
                    # needed unless a replan is forced.
                    if agent.speed_data['position_fraction'] > 0.0 and not force_new_path:
                        self.action_dict.update({i: RailEnvActions.DO_NOTHING})
                        continue
                    # action = self.controllers[i].act(agent, step, force_new_path=has_new_malfunctions)
                    action = self.controllers[i].act(agent, step, force_new_path=force_new_path)
                    self.action_dict.update({i: action})
                # act crashed for one agent
                except Exception as e:
                    print("-----------------Agent step Exception----------------", i)
                    print("Step: ", step)
                    # traceback.print_exc()
                    print("-----------------Agent step Exception----------------")
                    self.action_dict.update({i: RailEnvActions.DO_NOTHING})
                    self.limit_max_visited()
                    # pass
            self.blocked_agents.clear()
            self.crashed = False
        # global step exception handling, no idea what to do here
        except Exception as e:
            # except ArithmeticError:
            self.crashed = True
            print("-----------------Step Exception----------------")
            print("Step: ", step)
            traceback.print_exc()
            print("-----------------Step Exception----------------")
            # hit_problem = False
            # for j in self.agents_order:
            #     if j == i:
            #         hit_problem = True
            #     if hit_problem:
            #         self.action_dict.update({j: RailEnvActions.STOP_MOVING })
            # Fail safe: stop every agent for this step.
            self.action_dict = {i: RailEnvActions.STOP_MOVING for i in self.agents_order}
            self.limit_max_visited()
            # raise e
        return self.action_dict

    def partial_resheduling(self, cached_ways, vertex_agent_order2, agent_way_position2, agent_position_duration2,
                            step):
        """Fallback replanning when full rescheduling failed.

        (Method name keeps the original spelling; external callers may rely on it.)
        """
        print('partial_resheduling')
        try:
            new_way, blocked_agents = cell_graph_partial_rescheduling.partial_reschedule(cached_ways,
                                                                                         vertex_agent_order2,
                                                                                         agent_way_position2,
                                                                                         agent_position_duration2,
                                                                                         self.env, step,
                                                                                         self.controllers, self.graph,
                                                                                         self.locker)
            for i in self.agents_order:
                if len(new_way[i]):
                    cell_graph_rescheduling.recover_agent_way(self.controllers[i], self.env.agents[i], self.graph,
                                                              new_way[i])
            # Agents that could not be replanned are forced to replan next step.
            self.blocked_agents.update(blocked_agents)
            print('blocked agents', self.blocked_agents)
        except Exception as e:
            self.crashed = True
            print("-----------------Partial rescheduing Exception----------------")
            traceback.print_exc()
            print("-----------------Partial rescheduing Exception----------------")
            self.limit_max_visited()

    def limit_max_visited(self):
        """Cap the search effort of every agent controller after a failure."""
        for c in self.controllers:
            c.set_max_visited(100)
import numpy as np
class CellGraphLocker:
def __init__(self, graph):
    """Create a locker over `graph` with one empty reservation list per vertex.

    data[vertex_idx] holds ((start, end), agent_idx) reservations; it appears
    to be kept ordered by interval end (see lock()) - TODO confirm.
    """
    self.graph = graph
    self.data = []
    self.reset()
def reset(self):
vertexes = len(self.graph.vertexes)
self.data = [[] for i in range(vertexes)]
def lock(self, vertex_idx, agent_idx, duration):
    """Reserve vertex `vertex_idx` for `agent_idx` over the (start, end) interval `duration`.

    Inserts the reservation so the per-vertex list stays ordered; asserts that
    it does not conflict with a different agent's reservation.
    """
    # assert not self.is_locked(vertex_idx, agent_idx, duration)
    if len(self.data[vertex_idx])==0:
        # First reservation on this vertex.
        self.data[vertex_idx].append((duration, agent_idx))
        return
    # index = self.equal_or_greater_index(vertex_idx, duration[0])
    # Position of the first existing reservation whose end is >= our end.
    index = self.equal_or_greater_index_end(vertex_idx, duration[1])
    if index < len(self.data[vertex_idx]):
        curr_lock_info = self.data[vertex_idx][index]
        if (curr_lock_info[1] == agent_idx) and self._has_intersection(curr_lock_info[0], duration):
            # Same agent already holds an overlapping reservation; it must
            # fully enclose the requested interval - nothing to add.
            assert (curr_lock_info[0][0] <= duration[0]) and (duration[1] <= curr_lock_info[0][1])
            return
        # A different agent's reservation may only start after ours ends.
        assert curr_lock_info[0][0] >= duration[1]
        self.data[vertex_idx].insert(index, (duration, agent_idx))
        # if (curr_lock_info[1]==agent_idx) and (curr_lock_info[0][1] == duration[1]) and (curr_lock_info[0][0] <= duration[0]):
        #     self.data[vertex_idx][index] = (duration, agent_idx)
        # else:
        #     self.data[vertex_idx].insert(index, (duration, agent_idx) )
    else:
        # Our interval ends last: append at the tail.
        self.data[vertex_idx].append((duration, agent_idx))
def is_locked(self, vertex_idx, agent_idx, duration):
if len(self.data[vertex_idx])==0:
return False
new_lock = (duration, agent_idx)