Commit 783345f7 authored by MasterScrat

Round 2 modifications (infinite wave)

parent eb52f440
@@ -50,24 +50,33 @@ m.patch()
 ########################################################
 # Don't proceed to next Test if the previous one
-# didn't reach this completion percentage on average
+# didn't reach this mean completion percentage
 TEST_MIN_PERCENTAGE_COMPLETE_MEAN = 0.25
 
+# After this number of consecutive timeouts, kill the submission:
+# this probably means the submission has crashed
+MAX_SUCCESSIVE_TIMEOUTS = 10
+
+# 8 hours
 OVERALL_TIMEOUT = int(os.getenv(
     "FLATLAND_OVERALL_TIMEOUT",
-    8 * 60 * 60))  # 8 hours
+    # 8 * 60 * 60))
+    15))
 
+# 10 mins
 INTIAL_PLANNING_TIMEOUT = int(os.getenv(
     "FLATLAND_INITIAL_PLANNING_TIMEOUT",
-    10 * 60))  # 10 mins
+    10 * 60))
 
+# 10 seconds
 PER_STEP_TIMEOUT = int(os.getenv(
     "FLATLAND_PER_STEP_TIMEOUT",
-    10))  # 10 seconds
+    10))
 
+# 5 min - applies to the rest of the commands
 DEFAULT_COMMAND_TIMEOUT = int(os.getenv(
     "FLATLAND_DEFAULT_COMMAND_TIMEOUT",
-    1 * 60))  # 1 min
-
-# This applies to the rest of the commands
-MAX_SUCCESSIVE_TIMEOUTS = 10
+    5 * 60))
 
 RANDOM_SEED = int(os.getenv("FLATLAND_EVALUATION_RANDOM_SEED", 1001))
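Note on the pattern above: each timeout reads an environment variable and falls back to a hard-coded default, so operators can retune the limits per deployment without touching code. A minimal standalone sketch of the same mechanism (the variable name is from this file, the 8-hour default is illustrative):

import os

# os.getenv returns the string from the environment when the variable is set,
# otherwise the second argument; int() accepts both.
OVERALL_TIMEOUT = int(os.getenv("FLATLAND_OVERALL_TIMEOUT", 8 * 60 * 60))
print(OVERALL_TIMEOUT)  # 28800 unless FLATLAND_OVERALL_TIMEOUT overrides it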
@@ -98,37 +107,38 @@ class FlatlandRemoteEvaluationService:
     numpy arrays).
     """
 
-    def __init__(self,
-                 test_env_folder="/tmp",
-                 flatland_rl_service_id='FLATLAND_RL_SERVICE_ID',
-                 remote_host='127.0.0.1',
-                 remote_port=6379,
-                 remote_db=0,
-                 remote_password=None,
-                 visualize=False,
-                 video_generation_envs=[],
-                 report=None,
-                 verbose=False,
-                 actionDir=None,
-                 episodeDir=None,
-                 mergeDir=None,
-                 use_pickle=False,
-                 shuffle=True,
-                 missing_only=False,
-                 result_output_path=None,
-                 disable_timeouts=False
-                 ):
+    def __init__(
+        self,
+        test_env_folder="/tmp",
+        flatland_rl_service_id='FLATLAND_RL_SERVICE_ID',
+        remote_host='127.0.0.1',
+        remote_port=6379,
+        remote_db=0,
+        remote_password=None,
+        visualize=False,
+        video_generation_envs=[],
+        report=None,
+        verbose=False,
+        action_dir=None,
+        episode_dir=None,
+        merge_dir=None,
+        use_pickle=False,
+        shuffle=False,
+        missing_only=False,
+        result_output_path=None,
+        disable_timeouts=False
+    ):
 
         # Episode recording properties
-        self.actionDir = actionDir
-        if actionDir and not os.path.exists(self.actionDir):
-            os.makedirs(self.actionDir)
-        self.episodeDir = episodeDir
-        if episodeDir and not os.path.exists(self.episodeDir):
-            os.makedirs(self.episodeDir)
-        self.mergeDir = mergeDir
-        if mergeDir and not os.path.exists(self.mergeDir):
-            os.makedirs(self.mergeDir)
+        self.action_dir = action_dir
+        if action_dir and not os.path.exists(self.action_dir):
+            os.makedirs(self.action_dir)
+        self.episode_dir = episode_dir
+        if episode_dir and not os.path.exists(self.episode_dir):
+            os.makedirs(self.episode_dir)
+        self.merge_dir = merge_dir
+        if merge_dir and not os.path.exists(self.merge_dir):
+            os.makedirs(self.merge_dir)
         self.use_pickle = use_pickle
         self.missing_only = missing_only
         self.disable_timeouts = disable_timeouts
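For orientation, a hedged construction sketch using the renamed snake_case keyword arguments (the directory paths are placeholders, not values from this commit):

from flatland.evaluators.service import FlatlandRemoteEvaluationService

# Directories are created on demand in __init__ when the arguments are set.
service = FlatlandRemoteEvaluationService(
    test_env_folder="/tmp/envs",   # hypothetical test env folder
    action_dir="/tmp/actions",     # record per-episode actions here
    episode_dir=None,              # no episode snapshots
    merge_dir=None,                # no merged envs
    shuffle=False,                 # new default: evaluate envs in sorted order
)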
@@ -138,9 +148,9 @@ class FlatlandRemoteEvaluationService:
             print("Timeout are DISABLED!")
             print("=" * 20)
 
-        if not shuffle:
+        if shuffle:
             print("=" * 20)
-            print("Env shuffling is DISABLED!")
+            print("Env shuffling is ENABLED! not suitable for infinite wave")
             print("=" * 20)
 
         # Test Env folder Paths
@@ -161,13 +171,13 @@
         self.report = report
 
         # Use a state to swallow and ignore any steps after an env times out.
        # this should be reset to False after env reset() to get the next env.
         self.state_env_timed_out = False
 
+        # Count the number of successive timeouts (will kill after MAX_SUCCESSIVE_TIMEOUTS)
+        # This prevents a crashed submission to keep running forever
+        self.timeout_counter = 0
 
         # Results are the metrics: percent done, rewards, timing...
         self.result_output_path = result_output_path
 
         # Communication Protocol Related vars
@@ -177,7 +187,6 @@
             self.namespace,
             self.service_id
         )
-
         self.error_channel = "{}::{}::errors".format(
             self.namespace,
             self.service_id
@@ -226,6 +235,7 @@
         self.env_step_times = []
         self.nb_malfunctioning_trains = []
         self.overall_start_time = 0
+        self.overall_timeout_reached = False
         self.begin_simulation = False
         self.current_step = 0
         self.current_test = -1
@@ -244,7 +254,7 @@
     def update_running_stats(self, key, scalar):
         """
-        Computes the running mean for certain params
+        Computes the running min/mean/max for given param
         """
         mean_key = "{}_mean".format(key)
         counter_key = "{}_counter".format(key)
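The docstring now promises running min/mean/max. A self-contained sketch of how such running statistics can be kept without storing the full history (the dict layout here is an assumption, mirroring the "{}_mean" / "{}_counter" keys above):

def update_running_stats(stats, key, scalar):
    # Incremental mean: new_mean = (old_mean * n + x) / (n + 1)
    n = stats.get(key + "_counter", 0)
    old_mean = stats.get(key + "_mean", 0.0)
    stats[key + "_mean"] = (old_mean * n + scalar) / (n + 1)
    stats[key + "_counter"] = n + 1
    # Running extrema need no history either.
    stats[key + "_min"] = min(stats.get(key + "_min", scalar), scalar)
    stats[key + "_max"] = max(stats.get(key + "_max", scalar), scalar)

stats = {}
for x in [3.0, 1.0, 5.0]:
    update_running_stats(stats, "step_time", x)
print(stats["step_time_mean"], stats["step_time_min"], stats["step_time_max"])  # 3.0 1.0 5.0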
@@ -319,8 +329,6 @@
         # Sort in proper numerical order
         def get_file_order(filename):
-            # filename = os.path.relpath(f)
-            # filename = os.path.sep.join(filename.split(os.path.sep)[-2:])
             test_id, level_id = self.get_env_test_and_level(filename)
             value = test_id * 1000 + level_id
             return value
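The composite key sorts numerically by test first, then by level, which a plain string sort gets wrong ("Test_10" sorts before "Test_2"). A standalone illustration with hypothetical paths and a simplified parser:

def order(path):
    # "Test_<N>/Level_<M>.pkl" -> N * 1000 + M (assumes < 1000 levels per test)
    test_part, level_part = path.split("/")
    test_id = int(test_part.split("_")[1])
    level_id = int(level_part.split("_")[1].split(".")[0])
    return test_id * 1000 + level_id

paths = ["Test_10/Level_0.pkl", "Test_2/Level_13.pkl", "Test_2/Level_1.pkl"]
print(sorted(paths, key=order))
# ['Test_2/Level_1.pkl', 'Test_2/Level_13.pkl', 'Test_10/Level_0.pkl']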
@@ -328,11 +336,11 @@
         env_paths.sort(key=get_file_order)
 
         # if requested, only generate actions for those envs which don't already have them
-        if self.mergeDir and self.missing_only:
+        if self.merge_dir and self.missing_only:
             existing_paths = (itertools.chain.from_iterable(
-                [glob.glob(os.path.join(self.mergeDir, f"envs/*.{ext}"))
+                [glob.glob(os.path.join(self.merge_dir, f"envs/*.{ext}"))
                  for ext in ["pkl", "mpk"]]))
-            existing_paths = [os.path.relpath(sPath, self.mergeDir) for sPath in existing_paths]
+            existing_paths = [os.path.relpath(sPath, self.merge_dir) for sPath in existing_paths]
             env_paths = set(env_paths) - set(existing_paths)
 
         return env_paths
@@ -346,10 +354,6 @@
         else:
             print(numbers)
             raise ValueError("Unexpected file path, expects 'Test_<N>/Level_<M>.pkl', found", filename)
 
-        # TODO remove
-        #print(filename, test_id, level_id)
-
         return test_id, level_id
 
     def instantiate_evaluation_metadata(self):
@@ -615,22 +619,23 @@
         """
         # Check if the previous episode was finished
-        if not self.simulation_done:
+        if not self.simulation_done and not self.overall_timeout_reached:
             _command_response = self._error_template("CAN'T CREATE NEW ENV BEFORE PREVIOUS IS DONE")
             self.send_response(_command_response, command)
             raise Exception(_command_response['payload'])
 
-        self.simulation_count += 1
-        self.simulation_done = False
-
         if self.simulation_count == 0:
             # Very first episode: start the overall timer
             print("Starting overall timer...")
             self.overall_start_time = time.time()
 
+        self.simulation_count += 1
+        self.simulation_done = False
+
+        # reset the timeout flag / state.
+        self.state_env_timed_out = False
 
-        if self.simulation_count < len(self.env_file_paths) or self.overall_timeout_reached:
+        if self.simulation_count < len(self.env_file_paths) and not self.overall_timeout_reached:
             """
             There are still test envs left that are yet to be evaluated
             """
@@ -726,17 +731,18 @@
             #####################################################################
             # Update evaluation state
             #####################################################################
+            elapsed = time.time() - self.overall_start_time
             progress = np.clip(
-                self.simulation_count * 1.0 / len(self.env_file_paths),
+                elapsed / OVERALL_TIMEOUT,
                 0, 1)
 
-            mean_reward, mean_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
+            mean_reward, mean_normalized_reward, sum_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
 
             self.evaluation_state["state"] = "IN_PROGRESS"
             self.evaluation_state["progress"] = progress
             self.evaluation_state["simulation_count"] = self.simulation_count
-            self.evaluation_state["score"]["score"] = mean_percentage_complete
-            self.evaluation_state["score"]["score_secondary"] = mean_reward
+            self.evaluation_state["score"]["score"] = sum_normalized_reward
+            self.evaluation_state["score"]["score_secondary"] = mean_percentage_complete
             self.evaluation_state["meta"]["normalized_reward"] = mean_normalized_reward
             self.handle_aicrowd_info_event(self.evaluation_state)
             self.lActions = []
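Progress is now wall-clock based instead of episode-count based, which is the only well-defined notion when the pool of episodes is effectively unbounded ("infinite wave"). A worked example of the new formula:

import numpy as np

OVERALL_TIMEOUT = 8 * 60 * 60                        # overall budget in seconds
elapsed = 2 * 60 * 60                                # 2 h into the run
progress = np.clip(elapsed / OVERALL_TIMEOUT, 0, 1)
print(progress)                                      # 0.25, regardless of episode count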
@@ -760,14 +766,24 @@
                 "Client attempted to perform an action on an Env which \
                 has done['__all__']==True")
 
-        overall_elapsed = (time.time() - self.overall_start_time) / 1000000
+        overall_elapsed = (time.time() - self.overall_start_time)
         if overall_elapsed > OVERALL_TIMEOUT:
             msg = "Reached time limit: took {:.2f}s, limit is {:.2f}s, went over by {:.2f}s".format(
                 overall_elapsed, OVERALL_TIMEOUT, overall_elapsed - OVERALL_TIMEOUT
             )
-            _command_response = self._error_template(msg)
-            self.send_response(_command_response, command)
-            raise Exception(_command_response['payload'])
+            # _command_response = self._error_template(msg)
+            # self.send_response(_command_response, command)
+            # raise Exception(_command_response['payload'])
+
+            self.overall_timeout_reached = True
+            print("=" * 15)
+            print(msg)
+            print("Skipping these rewards...")
+            print("=" * 15)
+            return
+        # else:
+        #     print("="*15)
+        #     print("{}s left!".format(OVERALL_TIMEOUT - overall_elapsed))
 
         action = _payload['action']
         inference_time = _payload['inference_time']
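Two things changed here. First, the stray / 1000000 is gone: time.time() already returns seconds, so the old division kept overall_elapsed at microsecond scale and the limit could effectively never trigger, as a quick check shows:

import time

start = time.time()
time.sleep(1.0)
print(time.time() - start)          # ~1.0 -> differences are already in seconds
print((time.time() - start) / 1e6)  # ~1e-6 -> the old code compared this to the limit

Second, hitting the overall limit is now a soft stop: the service sets overall_timeout_reached and returns instead of raising, so late steps are swallowed and the scores accumulated so far survive through env_create and submit.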
@@ -795,21 +811,21 @@
             that episode
             """
             self.simulation_rewards_normalized[-1] += \
-                cumulative_reward / (
+                (cumulative_reward / (
                     self.env._max_episode_steps *
                     self.env.get_num_agents()
-                )
+                ))
 
             # We count the number of agents that malfunctioned by checking how many have 1 more steps left before recovery
             num_malfunctioning = sum(agent.malfunction_data['malfunction'] == 1 for agent in self.env.agents)
 
-            if num_malfunctioning > 0:
+            if self.verbose and num_malfunctioning > 0:
                 print("Step {}: {} agents have malfunctioned and will recover next step".format(self.current_step, num_malfunctioning))
 
             self.nb_malfunctioning_trains[-1] += num_malfunctioning
 
             # record the actions before checking for done
-            if self.actionDir is not None:
+            if self.action_dir is not None:
                 self.lActions.append(action)
 
             # Is the episode over?
@@ -830,9 +846,11 @@
             percentage_complete = complete * 1.0 / self.env.get_num_agents()
             self.simulation_percentage_complete[-1] = percentage_complete
 
+            # adds 1.0 so we can add them up
+            self.simulation_rewards_normalized[-1] += 1.0
+
             if self.current_test not in self.simulation_percentage_complete_per_test:
                 self.simulation_percentage_complete_per_test[self.current_test] = []
             self.simulation_percentage_complete_per_test[self.current_test].append(percentage_complete)
             print("Percentage for test {}, level {}: {}".format(self.current_test, self.current_level, percentage_complete))
             print(self.simulation_percentage_complete_per_test[self.current_test])
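On the normalization: each step adds cumulative_reward / (max_episode_steps * n_agents), and with Flatland's default -1-per-step agent reward the episode total lands in [-1, 0]; the +1.0 on completion shifts it into [0, 1], which is what makes per-episode values meaningful to add up for the new sum-based primary score. A hedged numeric sketch (toy numbers, not from this commit):

T_max, n_agents = 480, 10
step_rewards = [-n_agents] * 320     # all agents need 320 of the 480 allowed steps
normalized = sum(r / (T_max * n_agents) for r in step_rewards) + 1.0
print(round(normalized, 3))          # 0.333 -> finishing earlier scores closer to 1.0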
@@ -851,13 +869,13 @@
                 self.evaluation_metadata_df.to_csv(self.result_output_path)
                 print("Wrote intermediate output results to : {}".format(self.result_output_path))
 
-            if self.actionDir is not None:
+            if self.action_dir is not None:
                 self.save_actions()
 
-            if self.episodeDir is not None:
+            if self.episode_dir is not None:
                 self.save_episode()
 
-            if self.mergeDir is not None:
+            if self.merge_dir is not None:
                 self.save_merged_env()
@@ -885,7 +903,7 @@
     def save_actions(self):
         sfEnv = self.env_file_paths[self.simulation_count]
 
-        sfActions = self.actionDir + "/" + sfEnv.replace(".pkl", ".json")
+        sfActions = self.action_dir + "/" + sfEnv.replace(".pkl", ".json")
 
         print("env path: ", sfEnv, " sfActions:", sfActions)
@@ -899,14 +917,14 @@
     def save_episode(self):
         sfEnv = self.env_file_paths[self.simulation_count]
-        sfEpisode = self.episodeDir + "/" + sfEnv
+        sfEpisode = self.episode_dir + "/" + sfEnv
         print("env path: ", sfEnv, " sfEpisode:", sfEpisode)
 
         RailEnvPersister.save_episode(self.env, sfEpisode)
         # self.env.save_episode(sfEpisode)
 
     def save_merged_env(self):
         sfEnv = self.env_file_paths[self.simulation_count]
-        sfMergeEnv = self.mergeDir + "/" + sfEnv
+        sfMergeEnv = self.merge_dir + "/" + sfEnv
 
         if not os.path.exists(os.path.dirname(sfMergeEnv)):
             os.makedirs(os.path.dirname(sfMergeEnv))
@@ -947,14 +965,14 @@
         # Compute the evaluation metadata for the last episode
         self.update_evaluation_metadata()
 
-        if len(self.simulation_rewards) != len(self.env_file_paths):
+        if len(self.simulation_rewards) != len(self.env_file_paths) and not self.overall_timeout_reached:
             raise Exception(
                 """env.submit called before the agent had the chance
                 to operate on all the test environments.
                 """
             )
 
-        mean_reward, mean_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
+        mean_reward, mean_normalized_reward, sum_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
 
         if self.visualize and len(os.listdir(self.vizualization_folder_name)) > 0:
             # Generate the video
@@ -1015,11 +1033,12 @@
         #####################################################################
         # Update evaluation state
         #####################################################################
         self.evaluation_state["state"] = "FINISHED"
         self.evaluation_state["progress"] = 1.0
         self.evaluation_state["simulation_count"] = self.simulation_count
-        self.evaluation_state["score"]["score"] = mean_percentage_complete
-        self.evaluation_state["score"]["score_secondary"] = mean_reward
+        self.evaluation_state["score"]["score"] = sum_normalized_reward
+        self.evaluation_state["score"]["score_secondary"] = mean_percentage_complete
         self.evaluation_state["meta"]["normalized_reward"] = mean_normalized_reward
+        self.evaluation_state["meta"]["reward"] = mean_reward
         self.evaluation_state["meta"]["percentage_complete"] = mean_percentage_complete
@@ -1028,8 +1047,9 @@
         print("EVALUATION COMPLETE !!")
         print("#" * 100)
         print("# Mean Reward : {}".format(mean_reward))
+        print("# Sum Normalized Reward : {} (primary score)".format(sum_normalized_reward))
+        print("# Mean Percentage Complete : {} (secondary score)".format(mean_percentage_complete))
         print("# Mean Normalized Reward : {}".format(mean_normalized_reward))
-        print("# Mean Percentage Complete : {}".format(mean_percentage_complete))
         print("#" * 100)
         print("#" * 100)
@@ -1040,22 +1060,21 @@
         # we group all the results by the test_ids
         # so we first compute the mean in each of the test_id groups,
         # and then we compute the mean across each of the test_id groups
         #
         #
         #################################################################################
         #################################################################################
         source_df = self.evaluation_metadata_df.dropna()
-        grouped_df = source_df.groupby(['test_id']).mean()
+        #grouped_df = source_df.groupby(['test_id']).mean()
 
-        mean_reward = grouped_df["reward"].mean()
-        mean_normalized_reward = grouped_df["normalized_reward"].mean()
-        mean_percentage_complete = grouped_df["percentage_complete"].mean()
+        mean_reward = source_df["reward"].mean()
+        mean_normalized_reward = source_df["normalized_reward"].mean()
+        sum_normalized_reward = source_df["normalized_reward"].sum()
+        mean_percentage_complete = source_df["percentage_complete"].mean()
 
         # Round off the reward values
         mean_reward = round(mean_reward, 2)
         mean_normalized_reward = round(mean_normalized_reward, 5)
         mean_percentage_complete = round(mean_percentage_complete, 3)
 
-        return mean_reward, mean_normalized_reward, mean_percentage_complete
+        return mean_reward, mean_normalized_reward, sum_normalized_reward, mean_percentage_complete
 
     def report_error(self, error_message, command_response_channel):
         """
@@ -1255,10 +1274,10 @@ if __name__ == "__main__":
                         help="use pickle instead of msgpack",
                         required=False)
 
-    parser.add_argument('--noShuffle',
+    parser.add_argument('--shuffle',
                         default=False,
                         action="store_true",
-                        help="don't shuffle the envs. Default is to shuffle.",
+                        help="Shuffle the environments",
                         required=False)
 
     parser.add_argument('--disableTimeouts',
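The flag is inverted: shuffling used to be on by default (opt-out via --noShuffle) and is now off by default (opt-in via --shuffle), matching the shuffle=False constructor default above. A standalone argparse check of the new behaviour:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--shuffle',
                    default=False,
                    action="store_true",
                    help="Shuffle the environments")
print(parser.parse_args([]).shuffle)             # False -> ordered envs, infinite-wave friendly
print(parser.parse_args(['--shuffle']).shuffle)  # True only when explicitly requested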
@@ -1294,11 +1313,11 @@
         visualize=True,
         video_generation_envs=["Test_0/Level_100.pkl"],
         result_output_path=args.resultsDir,
-        actionDir=args.actionDir,
-        episodeDir=args.episodeDir,
-        mergeDir=args.mergeDir,
+        action_dir=args.actionDir,
+        episode_dir=args.episodeDir,
+        merge_dir=args.mergeDir,
         use_pickle=args.pickle,
-        shuffle=not args.noShuffle,
+        shuffle=args.shuffle,
         missing_only=args.missingOnly,
         disable_timeouts=args.disableTimeouts
     )