From ffd7a57201933dccfa0d3de35498256ba63d3f38 Mon Sep 17 00:00:00 2001
From: flaurent <florian.laurent@gmail.com>
Date: Thu, 9 Jul 2020 23:36:56 +0200
Subject: [PATCH] Added a kill switch after 10 successive timeouts

---
 flatland/evaluators/service.py | 36 ++++++++++++++++++++++------------
 1 file changed, 24 insertions(+), 12 deletions(-)

diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py
index 6e58f02d..dfcae856 100644
--- a/flatland/evaluators/service.py
+++ b/flatland/evaluators/service.py
@@ -58,8 +58,10 @@ DEFAULT_COMMAND_TIMEOUT = int(os.getenv(
     1 * 60))  # 1 min
 # This applies to the rest of the commands
 
+MAX_SUCCESSIVE_TIMEOUTS = 10
 
 RANDOM_SEED = int(os.getenv("FLATLAND_EVALUATION_RANDOM_SEED", 1001))
+
 SUPPORTED_CLIENT_VERSIONS = \
     [
         flatland.__version__
@@ -72,10 +74,10 @@ class FlatlandRemoteEvaluationService:
     of a RailEnv :
     - env_create
     - env_step
-    and an additional `env_submit` to cater to score computation and on-episode-complete post processings.
+    and an additional `env_submit` to cater to score computation and on-episode-complete post-processings.
 
     This service is designed to be used in conjunction with
-    `FlatlandRemoteClient` and both the srevice and client maintain a
+    `FlatlandRemoteClient` and both the service and client maintain a
     local instance of the RailEnv instance, and in case of any unexpected
     divergences in the state of both the instances, the local RailEnv
     instance of the `FlatlandRemoteEvaluationService` is supposed to act
@@ -107,6 +109,7 @@ class FlatlandRemoteEvaluationService:
                  result_output_path=None,
                  ):
 
+        # Episode recording properties
         self.actionDir = actionDir
         if actionDir and not os.path.exists(self.actionDir):
             os.makedirs(self.actionDir)
@@ -118,15 +121,18 @@ class FlatlandRemoteEvaluationService:
             os.makedirs(self.mergeDir)
         self.use_pickle = use_pickle
         self.missing_only = missing_only
+
         # Test Env folder Paths
         self.test_env_folder = test_env_folder
         self.video_generation_envs = video_generation_envs
         self.env_file_paths = self.get_env_filepaths()
+
+        # Shuffle all the env_file_paths for more exciting videos
+        # and for more uniform time progression
         if shuffle:
             random.shuffle(self.env_file_paths)
         print(self.env_file_paths)
-        # Shuffle all the env_file_paths for more exciting videos
-        # and for more uniform time progression
+
         self.instantiate_evaluation_metadata()
 
         # Logging and Reporting related vars
@@ -137,6 +143,10 @@ class FlatlandRemoteEvaluationService:
         # this should be reset to False after env reset() to get the next env.
         self.state_env_timed_out = False
 
+        # Count the number of successive timeouts (will kill after MAX_SUCCESSIVE_TIMEOUTS)
+        # This prevents a crashed submission to keep running forever
+        self.timeout_counter = 0
+
         self.result_output_path = result_output_path
 
         # Communication Protocol Related vars
@@ -452,9 +462,7 @@ class FlatlandRemoteEvaluationService:
             """
             COMMAND_TIMEOUT = 10 ** 6
 
-        @timeout_decorator.timeout(
-            COMMAND_TIMEOUT,
-            use_signals=use_signals_in_timeout)  # timeout for each command
+        @timeout_decorator.timeout(COMMAND_TIMEOUT, use_signals=use_signals_in_timeout)  # timeout for each command
         def _get_next_command(command_channel, _redis):
             """
             A low level wrapper for obtaining the next command from a
@@ -523,10 +531,7 @@ class FlatlandRemoteEvaluationService:
             where we do not have a command, so we have no response channel!
         """
         _redis = self.get_redis_connection()
-        # command_response_channel = command['response_channel']
-
-        # if self.verbose and not suppress_logs:
-        print("Send error : ", error_dict)
+        print("Sending error : ", error_dict)
 
         if self.use_pickle:
             sResponse = pickle.dumps(error_dict)
@@ -1000,7 +1005,6 @@ class FlatlandRemoteEvaluationService:
 
             try:
                 command = self.get_next_command()
-
             except timeout_decorator.timeout_decorator.TimeoutError:
                 # a timeout occured: send an error, and give -1.0 normalized score for this episode
                 if self.previous_command['type'] == messages.FLATLAND_RL.ENV_STEP:
@@ -1019,10 +1023,18 @@ class FlatlandRemoteEvaluationService:
                     self.simulation_rewards_normalized[-1]
                 ))
 
+                self.timeout_counter += 1
                 self.state_env_timed_out = True
                 self.simulation_done = True
+
+                print("Consecutive timeouts: {}".format(self.timeout_counter))
+                if self.timeout_counter > MAX_SUCCESSIVE_TIMEOUTS:
+                    raise Exception("{} consecutive timeouts, aborting.".format(self.timeout_counter))
+
                 continue
 
+            self.timeout_counter = 0
+
             if "timestamp" in command.keys():
                 latency = time.time() - command["timestamp"]
                 MESSAGE_QUEUE_LATENCY.append(latency)
-- 
GitLab