From 783345f73d0ec3e7bf49865a5f8b34f969023478 Mon Sep 17 00:00:00 2001
From: flaurent <florian.laurent@gmail.com>
Date: Sun, 6 Sep 2020 17:24:27 +0200
Subject: [PATCH] Round 2 modifications (infinite wave)

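Switch the primary score to the sum of normalized rewards (with the mean
percentage complete as secondary score), measure progress against the
overall time budget, and stop evaluating gracefully once OVERALL_TIMEOUT
is reached instead of raising. Also rename the recording directory
arguments to snake_case and make env shuffling opt-in via --shuffle.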
---
 flatland/evaluators/service.py | 205 ++++++++++++++++++---------------
 1 file changed, 112 insertions(+), 93 deletions(-)

diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py
index f47c38b2..0277652b 100644
--- a/flatland/evaluators/service.py
+++ b/flatland/evaluators/service.py
@@ -50,24 +50,33 @@ m.patch()
 ########################################################
 
 # Don't proceed to next Test if the previous one
-# didn't reach this completion percentage on average
+# didn't reach this mean completion percentage
 TEST_MIN_PERCENTAGE_COMPLETE_MEAN = 0.25
 
+# After this number of consecutive timeouts, kill the submission:
+# this probably means the submission has crashed
+MAX_SUCCESSIVE_TIMEOUTS = 10
+
+# 8 hours
 OVERALL_TIMEOUT = int(os.getenv(
     "FLATLAND_OVERALL_TIMEOUT",
-    8 * 60 * 60))  # 8 hours
+    8 * 60 * 60))
+
+# 10 mins
 INTIAL_PLANNING_TIMEOUT = int(os.getenv(
     "FLATLAND_INITIAL_PLANNING_TIMEOUT",
-    10 * 60))  # 10 mins
+    10 * 60))
+
+# 10 seconds
 PER_STEP_TIMEOUT = int(os.getenv(
     "FLATLAND_PER_STEP_TIMEOUT",
-    10))  # 10 seconds
+    10))
+
+# 5 min - applies to the rest of the commands
 DEFAULT_COMMAND_TIMEOUT = int(os.getenv(
     "FLATLAND_DEFAULT_COMMAND_TIMEOUT",
-    1 * 60))  # 1 min
-# This applies to the rest of the commands
-
-MAX_SUCCESSIVE_TIMEOUTS = 10
+    5 * 60))
 
 RANDOM_SEED = int(os.getenv("FLATLAND_EVALUATION_RANDOM_SEED", 1001))
 
@@ -98,37 +107,38 @@ class FlatlandRemoteEvaluationService:
     numpy arrays).
     """
 
-    def __init__(self,
-                 test_env_folder="/tmp",
-                 flatland_rl_service_id='FLATLAND_RL_SERVICE_ID',
-                 remote_host='127.0.0.1',
-                 remote_port=6379,
-                 remote_db=0,
-                 remote_password=None,
-                 visualize=False,
-                 video_generation_envs=[],
-                 report=None,
-                 verbose=False,
-                 actionDir=None,
-                 episodeDir=None,
-                 mergeDir=None,
-                 use_pickle=False,
-                 shuffle=True,
-                 missing_only=False,
-                 result_output_path=None,
-                 disable_timeouts=False
-                 ):
+    def __init__(
+        self,
+        test_env_folder="/tmp",
+        flatland_rl_service_id='FLATLAND_RL_SERVICE_ID',
+        remote_host='127.0.0.1',
+        remote_port=6379,
+        remote_db=0,
+        remote_password=None,
+        visualize=False,
+        video_generation_envs=[],
+        report=None,
+        verbose=False,
+        action_dir=None,
+        episode_dir=None,
+        merge_dir=None,
+        use_pickle=False,
+        shuffle=False,
+        missing_only=False,
+        result_output_path=None,
+        disable_timeouts=False
+    ):
 
         # Episode recording properties
-        self.actionDir = actionDir
-        if actionDir and not os.path.exists(self.actionDir):
-            os.makedirs(self.actionDir)
-        self.episodeDir = episodeDir
-        if episodeDir and not os.path.exists(self.episodeDir):
-            os.makedirs(self.episodeDir)
-        self.mergeDir = mergeDir
-        if mergeDir and not os.path.exists(self.mergeDir):
-            os.makedirs(self.mergeDir)
+        self.action_dir = action_dir
+        if action_dir and not os.path.exists(self.action_dir):
+            os.makedirs(self.action_dir)
+        self.episode_dir = episode_dir
+        if episode_dir and not os.path.exists(self.episode_dir):
+            os.makedirs(self.episode_dir)
+        self.merge_dir = merge_dir
+        if merge_dir and not os.path.exists(self.merge_dir):
+            os.makedirs(self.merge_dir)
         self.use_pickle = use_pickle
         self.missing_only = missing_only
         self.disable_timeouts = disable_timeouts
@@ -138,9 +148,9 @@ class FlatlandRemoteEvaluationService:
             print("Timeout are DISABLED!")
             print("=" * 20)
 
-        if not shuffle:
+        if shuffle:
             print("=" * 20)
-            print("Env shuffling is DISABLED!")
+            print("Env shuffling is ENABLED! not suitable for infinite wave")
             print("=" * 20)
 
         # Test Env folder Paths
@@ -161,13 +171,13 @@ class FlatlandRemoteEvaluationService:
         self.report = report
 
         # Use a state to swallow and ignore any steps after an env times out.
-        # this should be reset to False after env reset() to get the next env.
         self.state_env_timed_out = False
 
         # Count the number of successive timeouts (will kill after MAX_SUCCESSIVE_TIMEOUTS)
         # This prevents a crashed submission to keep running forever
         self.timeout_counter = 0
 
+        # Results are the evaluation metrics: percentage complete, rewards, timings...
         self.result_output_path = result_output_path
 
         # Communication Protocol Related vars
@@ -177,7 +187,6 @@ class FlatlandRemoteEvaluationService:
             self.namespace,
             self.service_id
         )
-
         self.error_channel = "{}::{}::errors".format(
             self.namespace,
             self.service_id
@@ -226,6 +235,7 @@ class FlatlandRemoteEvaluationService:
         self.env_step_times = []
         self.nb_malfunctioning_trains = []
         self.overall_start_time = 0
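+        # set to True once OVERALL_TIMEOUT is exceeded; later steps are ignored and no further envs are evaluated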
+        self.overall_timeout_reached = False
         self.begin_simulation = False
         self.current_step = 0
         self.current_test = -1
@@ -244,7 +254,7 @@ class FlatlandRemoteEvaluationService:
 
     def update_running_stats(self, key, scalar):
         """
-        Computes the running mean for certain params
+        Computes the running min/mean/max for the given param
         """
         mean_key = "{}_mean".format(key)
         counter_key = "{}_counter".format(key)
@@ -319,8 +329,6 @@ class FlatlandRemoteEvaluationService:
 
         # Sort in proper numerical order
         def get_file_order(filename):
-            # filename = os.path.relpath(f)
-            # filename = os.path.sep.join(filename.split(os.path.sep)[-2:])
             test_id, level_id = self.get_env_test_and_level(filename)
             value = test_id * 1000 + level_id
             return value
@@ -328,11 +336,11 @@ class FlatlandRemoteEvaluationService:
         env_paths.sort(key=get_file_order)
 
         # if requested, only generate actions for those envs which don't already have them
-        if self.mergeDir and self.missing_only:
+        if self.merge_dir and self.missing_only:
             existing_paths = (itertools.chain.from_iterable(
-                [glob.glob(os.path.join(self.mergeDir, f"envs/*.{ext}"))
+                [glob.glob(os.path.join(self.merge_dir, f"envs/*.{ext}"))
                  for ext in ["pkl", "mpk"]]))
-            existing_paths = [os.path.relpath(sPath, self.mergeDir) for sPath in existing_paths]
+            existing_paths = [os.path.relpath(sPath, self.merge_dir) for sPath in existing_paths]
             env_paths = set(env_paths) - set(existing_paths)
 
         return env_paths
@@ -346,10 +354,6 @@ class FlatlandRemoteEvaluationService:
         else:
             print(numbers)
             raise ValueError("Unexpected file path, expects 'Test_<N>/Level_<M>.pkl', found", filename)
-
-        # TODO remove
-        #print(filename, test_id, level_id)
-
         return test_id, level_id
 
     def instantiate_evaluation_metadata(self):
@@ -615,22 +619,23 @@ class FlatlandRemoteEvaluationService:
         """
 
         # Check if the previous episode was finished
-        if not self.simulation_done:
+        if not self.simulation_done and not self.overall_timeout_reached:
             _command_response = self._error_template("CAN'T CREATE NEW ENV BEFORE PREVIOUS IS DONE")
             self.send_response(_command_response, command)
             raise Exception(_command_response['payload'])
 
+        self.simulation_count += 1
+        self.simulation_done = False
+
         if self.simulation_count == 0:
             # Very first episode: start the overall timer
+            print("Starting overall timer...")
             self.overall_start_time = time.time()
 
-        self.simulation_count += 1
-        self.simulation_done = False
-
         # reset the timeout flag / state.
         self.state_env_timed_out = False
 
-        if self.simulation_count < len(self.env_file_paths) or self.overall_timeout_reached:
+        if self.simulation_count < len(self.env_file_paths) and not self.overall_timeout_reached:
             """
             There are still test envs left that are yet to be evaluated 
             """
@@ -726,17 +731,18 @@ class FlatlandRemoteEvaluationService:
         #####################################################################
         # Update evaluation state
         #####################################################################
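+        # progress is now measured against the overall time budget rather than the number of envs evaluated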
+        elapsed = time.time() - self.overall_start_time
         progress = np.clip(
-            self.simulation_count * 1.0 / len(self.env_file_paths),
+            elapsed / OVERALL_TIMEOUT,
             0, 1)
 
-        mean_reward, mean_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
+        mean_reward, mean_normalized_reward, sum_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
 
         self.evaluation_state["state"] = "IN_PROGRESS"
         self.evaluation_state["progress"] = progress
         self.evaluation_state["simulation_count"] = self.simulation_count
-        self.evaluation_state["score"]["score"] = mean_percentage_complete
-        self.evaluation_state["score"]["score_secondary"] = mean_reward
+        self.evaluation_state["score"]["score"] = sum_normalized_reward
+        self.evaluation_state["score"]["score_secondary"] = mean_percentage_complete
         self.evaluation_state["meta"]["normalized_reward"] = mean_normalized_reward
         self.handle_aicrowd_info_event(self.evaluation_state)
         self.lActions = []
@@ -760,14 +766,24 @@ class FlatlandRemoteEvaluationService:
                 "Client attempted to perform an action on an Env which \
                 has done['__all__']==True")
 
-        overall_elapsed = (time.time() - self.overall_start_time) / 1000000
+        overall_elapsed = (time.time() - self.overall_start_time)
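+        # once the overall time budget is exhausted, flag it and ignore the step instead of failing the submission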
         if overall_elapsed > OVERALL_TIMEOUT:
             msg = "Reached time limit: took {:.2f}s, limit is {:.2f}s, went over by {:.2f}s".format(
                 overall_elapsed, OVERALL_TIMEOUT, overall_elapsed - OVERALL_TIMEOUT
             )
-            _command_response = self._error_template(msg)
-            self.send_response(_command_response, command)
-            raise Exception(_command_response['payload'])
+            self.overall_timeout_reached = True
+
+            print("=" * 15)
+            print(msg)
+            print("Skipping these rewards...")
+            print("=" * 15)
+            return
 
         action = _payload['action']
         inference_time = _payload['inference_time']
@@ -795,21 +811,21 @@ class FlatlandRemoteEvaluationService:
         that episode
         """
         self.simulation_rewards_normalized[-1] += \
-            cumulative_reward / (
+            (cumulative_reward / (
                 self.env._max_episode_steps *
                 self.env.get_num_agents()
-            )
+            ))
 
         # We count the number of agents that malfunctioned by checking how many have 1 more steps left before recovery
         num_malfunctioning = sum(agent.malfunction_data['malfunction'] == 1 for agent in self.env.agents)
 
-        if num_malfunctioning > 0:
+        if self.verbose and num_malfunctioning > 0:
             print("Step {}: {} agents have malfunctioned and will recover next step".format(self.current_step, num_malfunctioning))
 
         self.nb_malfunctioning_trains[-1] += num_malfunctioning
 
         # record the actions before checking for done
-        if self.actionDir is not None:
+        if self.action_dir is not None:
             self.lActions.append(action)
 
         # Is the episode over?
@@ -830,9 +846,11 @@ class FlatlandRemoteEvaluationService:
             percentage_complete = complete * 1.0 / self.env.get_num_agents()
             self.simulation_percentage_complete[-1] = percentage_complete
 
+            # offset each episode's normalized reward by 1.0 so the per-episode scores can be summed up
+            self.simulation_rewards_normalized[-1] += 1.0
+
             if self.current_test not in self.simulation_percentage_complete_per_test:
                 self.simulation_percentage_complete_per_test[self.current_test] = []
-
             self.simulation_percentage_complete_per_test[self.current_test].append(percentage_complete)
             print("Percentage for test {}, level {}: {}".format(self.current_test, self.current_level, percentage_complete))
             print(self.simulation_percentage_complete_per_test[self.current_test])
@@ -851,13 +869,13 @@ class FlatlandRemoteEvaluationService:
                 self.evaluation_metadata_df.to_csv(self.result_output_path)
                 print("Wrote intermediate output results to : {}".format(self.result_output_path))
 
-            if self.actionDir is not None:
+            if self.action_dir is not None:
                 self.save_actions()
 
-            if self.episodeDir is not None:
+            if self.episode_dir is not None:
                 self.save_episode()
 
-            if self.mergeDir is not None:
+            if self.merge_dir is not None:
                 self.save_merged_env()
 
         # Record Frame
@@ -885,7 +903,7 @@ class FlatlandRemoteEvaluationService:
     def save_actions(self):
         sfEnv = self.env_file_paths[self.simulation_count]
 
-        sfActions = self.actionDir + "/" + sfEnv.replace(".pkl", ".json")
+        sfActions = self.action_dir + "/" + sfEnv.replace(".pkl", ".json")
 
         print("env path: ", sfEnv, " sfActions:", sfActions)
 
@@ -899,14 +917,14 @@ class FlatlandRemoteEvaluationService:
 
     def save_episode(self):
         sfEnv = self.env_file_paths[self.simulation_count]
-        sfEpisode = self.episodeDir + "/" + sfEnv
+        sfEpisode = self.episode_dir + "/" + sfEnv
         print("env path: ", sfEnv, " sfEpisode:", sfEpisode)
         RailEnvPersister.save_episode(self.env, sfEpisode)
         # self.env.save_episode(sfEpisode)
 
     def save_merged_env(self):
         sfEnv = self.env_file_paths[self.simulation_count]
-        sfMergeEnv = self.mergeDir + "/" + sfEnv
+        sfMergeEnv = self.merge_dir + "/" + sfEnv
 
         if not os.path.exists(os.path.dirname(sfMergeEnv)):
             os.makedirs(os.path.dirname(sfMergeEnv))
@@ -947,14 +965,14 @@ class FlatlandRemoteEvaluationService:
         # Compute the evaluation metadata for the last episode
         self.update_evaluation_metadata()
 
-        if len(self.simulation_rewards) != len(self.env_file_paths):
+        if len(self.simulation_rewards) != len(self.env_file_paths) and not self.overall_timeout_reached:
             raise Exception(
                 """env.submit called before the agent had the chance 
                 to operate on all the test environments.
                 """
             )
 
-        mean_reward, mean_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
+        mean_reward, mean_normalized_reward, sum_normalized_reward, mean_percentage_complete = self.compute_mean_scores()
 
         if self.visualize and len(os.listdir(self.vizualization_folder_name)) > 0:
             # Generate the video
@@ -1015,11 +1033,12 @@ class FlatlandRemoteEvaluationService:
         #####################################################################
         # Update evaluation state
         #####################################################################
+
         self.evaluation_state["state"] = "FINISHED"
         self.evaluation_state["progress"] = 1.0
         self.evaluation_state["simulation_count"] = self.simulation_count
-        self.evaluation_state["score"]["score"] = mean_percentage_complete
-        self.evaluation_state["score"]["score_secondary"] = mean_reward
+        self.evaluation_state["score"]["score"] = sum_normalized_reward
+        self.evaluation_state["score"]["score_secondary"] = mean_percentage_complete
         self.evaluation_state["meta"]["normalized_reward"] = mean_normalized_reward
         self.evaluation_state["meta"]["reward"] = mean_reward
         self.evaluation_state["meta"]["percentage_complete"] = mean_percentage_complete
@@ -1028,8 +1047,9 @@ class FlatlandRemoteEvaluationService:
         print("EVALUATION COMPLETE !!")
         print("#" * 100)
         print("# Mean Reward : {}".format(mean_reward))
+        print("# Sum Normalized Reward : {} (primary score)".format(sum_normalized_reward))
+        print("# Mean Percentage Complete : {} (secondary score)".format(mean_percentage_complete))
         print("# Mean Normalized Reward : {}".format(mean_normalized_reward))
-        print("# Mean Percentage Complete : {}".format(mean_percentage_complete))
         print("#" * 100)
         print("#" * 100)
 
@@ -1040,22 +1060,21 @@ class FlatlandRemoteEvaluationService:
-        # we group all the results by the test_ids
-        # so we first compute the mean in each of the test_id groups,
-        # and then we compute the mean across each of the test_id groups
-        #
-        #
+        # the scores are computed over all evaluated episodes,
+        # without grouping by test_id
         #################################################################################
         #################################################################################
         source_df = self.evaluation_metadata_df.dropna()
-        grouped_df = source_df.groupby(['test_id']).mean()
 
-        mean_reward = grouped_df["reward"].mean()
-        mean_normalized_reward = grouped_df["normalized_reward"].mean()
-        mean_percentage_complete = grouped_df["percentage_complete"].mean()
+        mean_reward = source_df["reward"].mean()
+        mean_normalized_reward = source_df["normalized_reward"].mean()
+        sum_normalized_reward = source_df["normalized_reward"].sum()
+        mean_percentage_complete = source_df["percentage_complete"].mean()
         # Round off the reward values
         mean_reward = round(mean_reward, 2)
         mean_normalized_reward = round(mean_normalized_reward, 5)
         mean_percentage_complete = round(mean_percentage_complete, 3)
 
-        return mean_reward, mean_normalized_reward, mean_percentage_complete
+        return mean_reward, mean_normalized_reward, sum_normalized_reward, mean_percentage_complete
 
     def report_error(self, error_message, command_response_channel):
         """
@@ -1255,10 +1274,10 @@ if __name__ == "__main__":
                         help="use pickle instead of msgpack",
                         required=False)
 
-    parser.add_argument('--noShuffle',
+    parser.add_argument('--shuffle',
                         default=False,
                         action="store_true",
-                        help="don't shuffle the envs.  Default is to shuffle.",
+                        help="Shuffle the environments",
                         required=False)
 
     parser.add_argument('--disableTimeouts',
@@ -1294,11 +1313,11 @@ if __name__ == "__main__":
         visualize=True,
         video_generation_envs=["Test_0/Level_100.pkl"],
         result_output_path=args.resultsDir,
-        actionDir=args.actionDir,
-        episodeDir=args.episodeDir,
-        mergeDir=args.mergeDir,
+        action_dir=args.actionDir,
+        episode_dir=args.episodeDir,
+        merge_dir=args.mergeDir,
         use_pickle=args.pickle,
-        shuffle=not args.noShuffle,
+        shuffle=args.shuffle,
         missing_only=args.missingOnly,
         disable_timeouts=args.disableTimeouts
     )
-- 
GitLab