From eb52f44048664712bf6c7d5486911b7b6469db36 Mon Sep 17 00:00:00 2001
From: flaurent <florian.laurent@gmail.com>
Date: Fri, 4 Sep 2020 00:03:16 +0200
Subject: [PATCH] Infinite wave: overall timeout, min percentage per Test,
 evaluation in order, doubled pre-planning/step time limits
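
The evaluator now enforces a hard wall-clock budget for the whole run
(FLATLAND_OVERALL_TIMEOUT, 8 hours by default), evaluates the Test folders
in numerical order, and only advances to the next Test if the previous one
reached a minimum mean completion percentage. The pre-planning and per-step
time limits are doubled to 10 minutes and 10 seconds respectively.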

---
 flatland/evaluators/service.py | 152 ++++++++++++++++++++++++---------
 1 file changed, 113 insertions(+), 39 deletions(-)

diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py
index 7eef7f06..f47c38b2 100644
--- a/flatland/evaluators/service.py
+++ b/flatland/evaluators/service.py
@@ -48,12 +48,20 @@ m.patch()
 ########################################################
 # CONSTANTS
 ########################################################
+
+# Don't proceed to the next Test if the previous one
+# didn't reach this mean completion percentage
+TEST_MIN_PERCENTAGE_COMPLETE_MEAN = 0.25
+
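+# Hard wall-clock limit for the whole evaluation run; can be
+# overridden through the FLATLAND_OVERALL_TIMEOUT environment variable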
+OVERALL_TIMEOUT = int(os.getenv(
+    "FLATLAND_OVERALL_TIMEOUT",
+    8 * 60 * 60))  # 8 hours
 INTIAL_PLANNING_TIMEOUT = int(os.getenv(
     "FLATLAND_INITIAL_PLANNING_TIMEOUT",
-    5 * 60))  # 5 mins
+    10 * 60))  # 10 mins
 PER_STEP_TIMEOUT = int(os.getenv(
     "FLATLAND_PER_STEP_TIMEOUT",
-    5))  # 5 seconds
+    10))  # 10 seconds
 DEFAULT_COMMAND_TIMEOUT = int(os.getenv(
     "FLATLAND_DEFAULT_COMMAND_TIMEOUT",
     1 * 60))  # 1 min
@@ -212,12 +220,16 @@ class FlatlandRemoteEvaluationService:
         self.simulation_rewards = []
         self.simulation_rewards_normalized = []
         self.simulation_percentage_complete = []
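+        # Maps each Test ID to the completion percentages of its evaluated episodes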
+        self.simulation_percentage_complete_per_test = {}
         self.simulation_steps = []
         self.simulation_times = []
         self.env_step_times = []
         self.nb_malfunctioning_trains = []
+        self.overall_start_time = 0
         self.begin_simulation = False
         self.current_step = 0
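+        # Test/Level currently being evaluated (-1 until the first env is created)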
+        self.current_test = -1
+        self.current_level = -1
         self.visualize = visualize
         self.vizualization_folder_name = "./.visualizations"
         self.record_frame_step = 0
@@ -293,26 +305,24 @@ class FlatlandRemoteEvaluationService:
                 ├── .......
                 └── Level_99.pkl
         """
-        env_paths = sorted(
-            glob.glob(
-                os.path.join(
-                    self.test_env_folder,
-                    "*/*.pkl"
-                )
+        env_paths = glob.glob(
+            os.path.join(
+                self.test_env_folder,
+                "*/*.pkl"
             )
         )
 
         # Remove the root folder name from the individual
         # lists, so that we only have the path relative
         # to the test root folder
-        env_paths = sorted([os.path.relpath(
-            x, self.test_env_folder
-        ) for x in env_paths])
-
-        # Sort in proper order
-        def get_file_order(f):
-            numbers = re.findall(r'\d+', os.path.relpath(f))
-            value = int(numbers[0]) * 1000 + int(numbers[1])
+        env_paths = [os.path.relpath(x, self.test_env_folder) for x in env_paths]
+
+        # Sort in proper numerical order
+        def get_file_order(filename):
+            test_id, level_id = self.get_env_test_and_level(filename)
+            # Order by Test first, then by Level (assumes fewer than 1000 Levels per Test)
+            return test_id * 1000 + level_id
 
         env_paths.sort(key=get_file_order)
@@ -327,6 +337,21 @@ class FlatlandRemoteEvaluationService:
 
         return env_paths
 
+    def get_env_test_and_level(self, filename):
+        """
+        Extracts the Test and Level IDs from a path of the
+        form 'Test_<N>/Level_<M>.pkl'.
+        """
+        numbers = re.findall(r'\d+', os.path.relpath(filename))
+
+        if len(numbers) == 2:
+            test_id = int(numbers[0])
+            level_id = int(numbers[1])
+        else:
+            raise ValueError(
+                "Unexpected file path, expected 'Test_<N>/Level_<M>.pkl', found: {}".format(filename))
+
+        return test_id, level_id
+
     def instantiate_evaluation_metadata(self):
         """
             This instantiates a pandas dataframe to record
@@ -587,30 +612,55 @@ class FlatlandRemoteEvaluationService:
     def handle_env_create(self, command):
         """
         Handles a ENV_CREATE command from the client
-        TODO: Add a high level summary of everything thats happening here.
         """
+
+        # Check if the previous episode was finished
         if not self.simulation_done:
-            # trying to reset a simulation before finishing the previous one
             _command_response = self._error_template("CAN'T CREATE NEW ENV BEFORE PREVIOUS IS DONE")
             self.send_response(_command_response, command)
             raise Exception(_command_response['payload'])
 
+        if self.simulation_count == 0:
+            # Very first episode: start the overall timer
+            self.overall_start_time = time.time()
+
         self.simulation_count += 1
         self.simulation_done = False
 
         # reset the timeout flag / state.
         self.state_env_timed_out = False
 
-        if self.simulation_count < len(self.env_file_paths):
+        if self.simulation_count < len(self.env_file_paths) and not self.overall_timeout_reached:
             """
             There are still test envs left that are yet to be evaluated 
             """
+
             test_env_file_path = self.env_file_paths[self.simulation_count]
             print("Evaluating {} ({}/{})".format(test_env_file_path, self.simulation_count, len(self.env_file_paths)))
+            env_test, env_level = self.get_env_test_and_level(test_env_file_path)
+
             test_env_file_path = os.path.join(
                 self.test_env_folder,
                 test_env_file_path
             )
+
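+            # When entering a new Test, check that the previous Test's episodes
+            # reached at least TEST_MIN_PERCENTAGE_COMPLETE_MEAN completion on average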
+            if self.current_test != env_test and env_test != 0:
+                if self.current_test not in self.simulation_percentage_complete_per_test:
+                    raise Exception("Missing percentages for previous test: test {}".format(self.current_test))
+
+                # Check if episodes from the previous test had good enough results
+                mean_test_complete_percentage = np.mean(self.simulation_percentage_complete_per_test[self.current_test])
+                if mean_test_complete_percentage >= TEST_MIN_PERCENTAGE_COMPLETE_MEAN:
+                    print("Starting new test: test {} to test {}".format(self.current_test, env_test))
+                else:
+                    _command_response = self._error_template(
+                        "COMPLETE PERCENTAGE TOO LOW: {} < {}".format(mean_test_complete_percentage, TEST_MIN_PERCENTAGE_COMPLETE_MEAN))
+                    self.send_response(_command_response, command)
+                    raise Exception(_command_response['payload'])
+
+            self.current_test = env_test
+            self.current_level = env_level
+
             del self.env
             self.env = RailEnv(width=1, height=1,
                                rail_generator=rail_from_file(test_env_file_path),
@@ -704,13 +754,21 @@ class FlatlandRemoteEvaluationService:
         _payload = command['payload']
 
         if not self.env:
-            raise Exception(
-                "env_client.step called before env_client.env_create() call")
+            raise Exception("env_client.step called before env_client.env_create() call")
         if self.env.dones['__all__']:
             raise Exception(
                 "Client attempted to perform an action on an Env which \
                 has done['__all__']==True")
 
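+        # Enforce the overall time budget, measured from the creation of the very first env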
+        overall_elapsed = time.time() - self.overall_start_time
+        if overall_elapsed > OVERALL_TIMEOUT:
+            msg = "Reached overall time limit: took {:.2f}s, limit is {:.2f}s, went over by {:.2f}s".format(
+                overall_elapsed, OVERALL_TIMEOUT, overall_elapsed - OVERALL_TIMEOUT
+            )
+            _command_response = self._error_template(msg)
+            self.send_response(_command_response, command)
+            raise Exception(_command_response['payload'])
+
         action = _payload['action']
         inference_time = _payload['inference_time']
         # We record this metric in two keys:
@@ -719,6 +777,7 @@ class FlatlandRemoteEvaluationService:
         self.update_running_stats("current_episode_controller_inference_time", inference_time)
         self.update_running_stats("controller_inference_time", inference_time)
 
+        # Perform the step
         time_start = time.time()
         _observation, all_rewards, done, info = self.env.step(action)
         time_diff = time.time() - time_start
@@ -741,9 +800,11 @@ class FlatlandRemoteEvaluationService:
                 self.env.get_num_agents()
             )
 
-        num_malfunctioning = sum(agent.malfunction_data['malfunction'] > 0 for agent in self.env.agents)
-        if (num_malfunctioning > 0):
-            print(num_malfunctioning, "agent malfunctioning at step", self.current_step)
+        # We count malfunctioning agents by checking how many have one more step left before recovery
+        num_malfunctioning = sum(agent.malfunction_data['malfunction'] == 1 for agent in self.env.agents)
+
+        if num_malfunctioning > 0:
+            print("Step {}: {} agents have malfunctioned and will recover next step".format(self.current_step, num_malfunctioning))
 
         self.nb_malfunctioning_trains[-1] += num_malfunctioning
 
@@ -751,7 +812,7 @@ class FlatlandRemoteEvaluationService:
         if self.actionDir is not None:
             self.lActions.append(action)
 
-        # all done! episode over
+        # Is the episode over?
         if done["__all__"]:
             self.simulation_done = True
 
@@ -769,13 +830,21 @@ class FlatlandRemoteEvaluationService:
             percentage_complete = complete * 1.0 / self.env.get_num_agents()
             self.simulation_percentage_complete[-1] = percentage_complete
 
-            print("Evaluation finished in {} timesteps, {:.3f} seconds. Percentage agents done: {:.3f}. Normalized reward: {:.3f}. Number of malfunctions: {}.".format(
-                self.simulation_steps[-1],
-                self.simulation_times[-1],
-                self.simulation_percentage_complete[-1],
-                self.simulation_rewards_normalized[-1],
-                self.nb_malfunctioning_trains[-1]
-            ))
+            # Record this episode's completion percentage under its Test
+            if self.current_test not in self.simulation_percentage_complete_per_test:
+                self.simulation_percentage_complete_per_test[self.current_test] = []
+            self.simulation_percentage_complete_per_test[self.current_test].append(percentage_complete)
+
+            print("Percentage complete for Test {}, Level {}: {:.3f} (so far for this Test: {})".format(
+                self.current_test, self.current_level, percentage_complete,
+                self.simulation_percentage_complete_per_test[self.current_test]))
+
+            print(
+                "Evaluation finished in {} timesteps, {:.3f} seconds. Percentage agents done: {:.3f}. Normalized reward: {:.3f}. Number of malfunctions: {}.".format(
+                    self.simulation_steps[-1],
+                    self.simulation_times[-1],
+                    self.simulation_percentage_complete[-1],
+                    self.simulation_rewards_normalized[-1],
+                    self.nb_malfunctioning_trains[-1]
+                ))
 
             # Write intermediate results
             if self.result_output_path:
@@ -1040,20 +1109,26 @@ class FlatlandRemoteEvaluationService:
                 # a timeout occurred: send an error, and give -1.0 normalized score for this episode
                 if self.previous_command['type'] == messages.FLATLAND_RL.ENV_STEP:
                     self.send_error({"type": messages.FLATLAND_RL.ENV_STEP_TIMEOUT})
+                    timeout_details = "step time limit of {}s".format(PER_STEP_TIMEOUT)
 
                 elif self.previous_command['type'] == messages.FLATLAND_RL.ENV_CREATE:
                     self.send_error({"type": messages.FLATLAND_RL.ENV_RESET_TIMEOUT})
+                    timeout_details = "pre-planning time limit of {}s".format(INTIAL_PLANNING_TIMEOUT)
 
                 self.simulation_steps[-1] += 1
                 self.simulation_rewards[-1] = self.env._max_episode_steps * self.env.get_num_agents()
                 self.simulation_rewards_normalized[-1] = -1.0
 
-                print("Evaluation TIMED OUT after {} timesteps, using max penalty. Percentage agents done: {:.3f}. Normalized reward: {:.3f}. Number of malfunctions: {}".format(
-                    self.simulation_steps[-1],
-                    self.simulation_percentage_complete[-1],
-                    self.simulation_rewards_normalized[-1],
-                    self.nb_malfunctioning_trains[-1],
-                ))
+                print(
+                    "Evaluation TIMED OUT after {} timesteps (exceeded {}), using max penalty. {} consecutive timeouts. "
+                    "Percentage agents done: {:.3f}. Normalized reward: {:.3f}. Number of malfunctions: {}".format(
+                        self.simulation_steps[-1],
+                        timeout_details,
+                        self.timeout_counter,
+                        self.simulation_percentage_complete[-1],
+                        self.simulation_rewards_normalized[-1],
+                        self.nb_malfunctioning_trains[-1],
+                    ))
 
                 self.timeout_counter += 1
                 self.state_env_timed_out = True
@@ -1203,7 +1278,6 @@ if __name__ == "__main__":
                         help="Results CSV path",
                         required=False)
 
-
     parser.add_argument('--verbose',
                         default=False,
                         action="store_true",
-- 
GitLab