diff --git a/flatland/evaluators/aicrowd_helpers.py b/flatland/evaluators/aicrowd_helpers.py index 2ae1e335b42b800b7e7b383a5c7992b3a69c6f50..0d46dca01f47cf00185332a1cd0e751b9f1c8d4c 100644 --- a/flatland/evaluators/aicrowd_helpers.py +++ b/flatland/evaluators/aicrowd_helpers.py @@ -28,7 +28,7 @@ def get_boto_client(): raise Exception("AWS Credentials not provided..") try: import boto3 - except ImportError as e: + except ImportError: raise Exception( "boto3 is not installed. Please manually install by : ", " pip install -U boto3" diff --git a/flatland/evaluators/client.py b/flatland/evaluators/client.py index 1b743843ae64dec4a9df3bf2e58c437f8a58d1e3..7096317d415951cef51e9b56d083a3a434904c57 100644 --- a/flatland/evaluators/client.py +++ b/flatland/evaluators/client.py @@ -16,6 +16,7 @@ from flatland.envs.rail_env import RailEnv from flatland.envs.rail_generators import rail_from_file from flatland.envs.schedule_generators import schedule_from_file from flatland.evaluators import messages +from flatland.core.env_observation_builder import DummyObservationBuilder logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -288,14 +289,12 @@ class FlatlandRemoteClient(object): if __name__ == "__main__": remote_client = FlatlandRemoteClient() - def my_controller(obs, _env): _action = {} for _idx, _ in enumerate(_env.agents): _action[_idx] = np.random.randint(0, 5) return _action - my_observation_builder = DummyObservationBuilder() episode = 0 diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py index d9b303c79e71037d224cb108235805ae1f235f56..2a6e64c52128323800d480d1d275d38c6782dfbf 100644 --- a/flatland/evaluators/service.py +++ b/flatland/evaluators/service.py @@ -41,7 +41,18 @@ m.patch() ######################################################## # CONSTANTS ######################################################## -PER_STEP_TIMEOUT = 10 * 60 # 5 minutes +INTIAL_PLANNING_TIMEOUT = int(os.getenv( + "FLATLAND_INITIAL_PLANNING_TIMEOUT", + 5 * 60)) # 5 mins +PER_STEP_TIMEOUT = int(os.getenv( + "FLATLAND_PER_STEP_TIMEOUT", + 5)) # 5 seconds +DEFAULT_COMMAND_TIMEOUT = int(os.getenv( + "FLATLAND_DEFAULT_COMMAND_TIMEOUT", + 1 * 60)) # 1 min +# This applies to the rest of the commands + + RANDOM_SEED = int(os.getenv("FLATLAND_EVALUATION_RANDOM_SEED", 1001)) SUPPORTED_CLIENT_VERSIONS = \ [ @@ -126,6 +137,9 @@ class FlatlandRemoteEvaluationService: } } self.stats = {} + self.previous_command = { + "type": None + } # RailEnv specific variables self.env = False @@ -236,19 +250,6 @@ class FlatlandRemoteEvaluationService: _response['payload'] = payload return _response - @timeout_decorator.timeout( - PER_STEP_TIMEOUT, - use_signals=use_signals_in_timeout) # timeout for each command - def _get_next_command(self, _redis): - """ - A low level wrapper for obtaining the next command from a - pre-agreed command channel. - At the momment, the communication protocol uses lpush for pushing - in commands, and brpop for reading out commands. - """ - command = _redis.brpop(self.command_channel)[1] - return command - def get_next_command(self): """ A helper function to obtain the next command, which transparently @@ -256,14 +257,59 @@ class FlatlandRemoteEvaluationService: packed message, and consider the timeouts, etc when trying to fetch a new command. """ + + COMMAND_TIMEOUT = DEFAULT_COMMAND_TIMEOUT + """ + Handle case specific timeouts : + - INTIAL_PLANNING_TIMEOUT + The timeout between an env_create call and the first env_step call + - PER_STEP_TIMEOUT + The timeout between two consecutive env_step calls + """ + if self.previous_command['type'] == messages.FLATLAND_RL.ENV_CREATE: + """ + In case the previous command is an env_create, then leave + a but more time for the intial planning + """ + COMMAND_TIMEOUT = INTIAL_PLANNING_TIMEOUT + elif self.previous_command['type'] == messages.FLATLAND_RL.ENV_STEP: + """ + Use the per_step_time for all timesteps between two env_step calls + # Corner Case : + - Are there any reasons why a call between the last env_step call + and the subsequent env_create call will take an excessively large + amount of time (>5s in this case) + """ + COMMAND_TIMEOUT = PER_STEP_TIMEOUT + elif self.previous_command['type'] == messages.FLATLAND_RL.ENV_SUBMIT: + """ + If the user has already done an env_submit call, then the timeout + can be an arbitrarily large number. + """ + COMMAND_TIMEOUT = 10**6 + + @timeout_decorator.timeout( + COMMAND_TIMEOUT, + use_signals=use_signals_in_timeout) # timeout for each command + def _get_next_command(command_channel, _redis): + """ + A low level wrapper for obtaining the next command from a + pre-agreed command channel. + At the momment, the communication protocol uses lpush for pushing + in commands, and brpop for reading out commands. + """ + command = _redis.brpop(command_channel)[1] + return command + try: _redis = self.get_redis_connection() - command = self._get_next_command(_redis) + command = _get_next_command(self.command_channel, _redis) if self.verbose or self.report: print("Command Service: ", command) except timeout_decorator.timeout_decorator.TimeoutError: raise Exception( - "Timeout in step {} of simulation {}".format( + "Timeout of {}s in step {} of simulation {}".format( + COMMAND_TIMEOUT, self.current_step, self.simulation_count )) @@ -617,9 +663,9 @@ class FlatlandRemoteEvaluationService: print("Self.Reward : ", self.reward) print("Current Simulation : ", self.simulation_count) if self.env_file_paths and \ - self.simulation_count < len(self.env_file_paths): + self.simulation_count < len(self.env_file_paths): print("Current Env Path : ", - self.env_file_paths[self.simulation_count]) + self.env_file_paths[self.simulation_count]) try: if command['type'] == messages.FLATLAND_RL.PING: @@ -662,6 +708,16 @@ class FlatlandRemoteEvaluationService: _error, command['response_channel']) return _error + ########################################### + # We keep a record of the previous command + # to be able to have different behaviors + # between different "command transitions" + # + # An example use case, is when we want to + # have a different timeout for the + # first step in every environment + # to account for some initial planning time + self.previous_command = command except Exception as e: print("Error : ", str(e)) print(traceback.format_exc()) @@ -691,9 +747,9 @@ if __name__ == "__main__": grader = FlatlandRemoteEvaluationService( test_env_folder=test_folder, flatland_rl_service_id=args.service_id, - verbose=True, + verbose=False, visualize=True, - video_generation_envs=["Test_0/Level_1.pkl"] + video_generation_envs=["Test_0/Level_100.pkl"] ) result = grader.run() if result['type'] == messages.FLATLAND_RL.ENV_SUBMIT_RESPONSE: