From 1ff597c02841c2976216443489d7639c2b5f0274 Mon Sep 17 00:00:00 2001 From: flaurent <florian.laurent@gmail.com> Date: Thu, 9 Jul 2020 14:33:14 +0200 Subject: [PATCH] Fixed bug when timeout occurs in first step, give normalized score of -1.0 and 0% completion if timeout occurs --- flatland/evaluators/client.py | 22 +++---- flatland/evaluators/service.py | 105 +++++++++++++++++++-------------- 2 files changed, 71 insertions(+), 56 deletions(-) diff --git a/flatland/evaluators/client.py b/flatland/evaluators/client.py index 3b47cab2..03d32a7a 100644 --- a/flatland/evaluators/client.py +++ b/flatland/evaluators/client.py @@ -48,7 +48,7 @@ class FlatlandRemoteClient(object): test_envs_root=None, verbose=False, use_pickle=False): - self.use_pickle=use_pickle + self.use_pickle = use_pickle self.remote_host = remote_host self.remote_port = remote_port self.remote_db = remote_db @@ -91,8 +91,6 @@ class FlatlandRemoteClient(object): self.env_step_times = [] self.stats = {} - - def update_running_stats(self, key, scalar): """ Computes the running mean for certain params @@ -168,8 +166,8 @@ class FlatlandRemoteClient(object): object_hook=m.decode, strict_map_key=False, # new for msgpack 1.0? encoding="utf8" # remove for msgpack 1.0 - ) - print("error received: ", error_dict) + ) + print("Error received: ", error_dict) raise StopAsyncIteration(error_dict["type"]) # Push request in command_channels @@ -193,7 +191,7 @@ class FlatlandRemoteClient(object): object_hook=m.decode, strict_map_key=False, # new for msgpack 1.0? encoding="utf8" # remove for msgpack 1.0 - ) + ) if _response['type'] == messages.FLATLAND_RL.ERROR: raise Exception(str(_response["payload"])) else: @@ -286,7 +284,7 @@ class FlatlandRemoteClient(object): def env_step(self, action, render=False): """ Respond with [observation, reward, done, info] - """ + """ # We use the last_env_step_time as an approximate measure of the inference time approximate_inference_time = time.time() - self.last_env_step_time self.update_running_stats("inference_time(approx)", approximate_inference_time) @@ -335,10 +333,10 @@ class FlatlandRemoteClient(object): min_key = "{}_min".format(metric_name) max_key = "{}_max".format(metric_name) print("\t - {}\t => min: {} || mean: {} || max: {}".format( - metric_name, - self.stats[min_key], - self.stats[mean_key], - self.stats[max_key])) + metric_name, + self.stats[min_key], + self.stats[mean_key], + self.stats[max_key])) print("=" * 100) if os.getenv("AICROWD_BLOCKING_SUBMIT"): """ @@ -354,12 +352,14 @@ class FlatlandRemoteClient(object): if __name__ == "__main__": remote_client = FlatlandRemoteClient() + def my_controller(obs, _env): _action = {} for _idx, _ in enumerate(_env.agents): _action[_idx] = np.random.randint(0, 5) return _action + my_observation_builder = DummyObservationBuilder() episode = 0 diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py index 5e1e9076..6e58f02d 100644 --- a/flatland/evaluators/service.py +++ b/flatland/evaluators/service.py @@ -152,7 +152,6 @@ class FlatlandRemoteEvaluationService: self.service_id ) - # Message Broker related vars self.remote_host = remote_host self.remote_port = remote_port @@ -283,16 +282,14 @@ class FlatlandRemoteEvaluationService: x, self.test_env_folder ) for x in env_paths]) - - # if requested, only generate actions for those envs which don't already have them + # if requested, only generate actions for those envs which don't already have them if self.mergeDir and self.missing_only: existing_paths = (itertools.chain.from_iterable( - [ glob.glob(os.path.join(self.mergeDir, f"envs/*.{ext}")) - for ext in ["pkl", "mpk"] ])) + [glob.glob(os.path.join(self.mergeDir, f"envs/*.{ext}")) + for ext in ["pkl", "mpk"]])) existing_paths = [os.path.relpath(sPath, self.mergeDir) for sPath in existing_paths] env_paths = sorted(set(env_paths) - set(existing_paths)) - return env_paths def instantiate_evaluation_metadata(self): @@ -337,8 +334,8 @@ class FlatlandRemoteEvaluationService: and it simply tries to update the simulation specific information for the **previous** episode in the metadata_df if it exists. """ - if self.evaluation_metadata_df is not None and len(self.simulation_env_file_paths) > 0: + if self.evaluation_metadata_df is not None and len(self.simulation_env_file_paths) > 0: last_simulation_env_file_path = self.simulation_env_file_paths[-1] _row = self.evaluation_metadata_df.loc[ @@ -353,15 +350,21 @@ class FlatlandRemoteEvaluationService: # TODO: This needs refactoring # Add controller_inference_time_metrics - _row.controller_inference_time_min = self.stats[ - "current_episode_controller_inference_time_min" - ] - _row.controller_inference_time_mean = self.stats[ - "current_episode_controller_inference_time_mean" - ] - _row.controller_inference_time_max = self.stats[ - "current_episode_controller_inference_time_max" - ] + # These metrics may be missing if no step was done before the episode finished + if "current_episode_controller_inference_time_min" in self.stats: + _row.controller_inference_time_min = self.stats[ + "current_episode_controller_inference_time_min" + ] + _row.controller_inference_time_mean = self.stats[ + "current_episode_controller_inference_time_mean" + ] + _row.controller_inference_time_max = self.stats[ + "current_episode_controller_inference_time_max" + ] + else: + _row.controller_inference_time_min = 0.0 + _row.controller_inference_time_mean = 0.0 + _row.controller_inference_time_max = 0.0 self.evaluation_metadata_df.loc[ last_simulation_env_file_path @@ -462,19 +465,19 @@ class FlatlandRemoteEvaluationService: command = _redis.brpop(command_channel)[1] return command - #try: + # try: if True: _redis = self.get_redis_connection() command = _get_next_command(self.command_channel, _redis) if self.verbose or self.report: print("Command Service: ", command) - #except timeout_decorator.timeout_decorator.TimeoutError: - #raise Exception( - # "Timeout of {}s in step {} of simulation {}".format( - # COMMAND_TIMEOUT, - # self.current_step, - # self.simulation_count - # )) + # except timeout_decorator.timeout_decorator.TimeoutError: + # raise Exception( + # "Timeout of {}s in step {} of simulation {}".format( + # COMMAND_TIMEOUT, + # self.current_step, + # self.simulation_count + # )) # print("Timeout of {}s in step {} of simulation {}".format( # COMMAND_TIMEOUT, @@ -483,15 +486,14 @@ class FlatlandRemoteEvaluationService: # )) # return {"type":messages.FLATLAND_RL.ENV_STEP_TIMEOUT} - if self.use_pickle: command = pickle.loads(command) else: command = msgpack.unpackb( command, object_hook=m.decode, - strict_map_key=False, # msgpack 1.0 - encoding="utf8" # msgpack 1.0 + strict_map_key=False, # msgpack 1.0 + encoding="utf8" # msgpack 1.0 ) if self.verbose: print("Received Request : ", command) @@ -521,9 +523,9 @@ class FlatlandRemoteEvaluationService: where we do not have a command, so we have no response channel! """ _redis = self.get_redis_connection() - #command_response_channel = command['response_channel'] + # command_response_channel = command['response_channel'] - #if self.verbose and not suppress_logs: + # if self.verbose and not suppress_logs: print("Send error : ", error_dict) if self.use_pickle: @@ -679,7 +681,7 @@ class FlatlandRemoteEvaluationService: """ if self.state_env_timed_out: - print("ignoring step command after timeout") + print("Ignoring step command after timeout") return _payload = command['payload'] @@ -725,6 +727,8 @@ class FlatlandRemoteEvaluationService: # record the actions before checking for done if self.actionDir is not None: self.lActions.append(action) + + # all done! episode over if done["__all__"]: self.simulation_done = True @@ -793,7 +797,7 @@ class FlatlandRemoteEvaluationService: sfEpisode = self.episodeDir + "/" + sfEnv print("env path: ", sfEnv, " sfEpisode:", sfEpisode) RailEnvPersister.save_episode(self.env, sfEpisode) - #self.env.save_episode(sfEpisode) + # self.env.save_episode(sfEpisode) def save_merged_env(self): sfEnv = self.env_file_paths[self.simulation_count] @@ -804,7 +808,7 @@ class FlatlandRemoteEvaluationService: print("Input env path: ", sfEnv, " Merge File:", sfMergeEnv) RailEnvPersister.save_episode(self.env, sfMergeEnv) - #self.env.save_episode(sfMergeEnv) + # self.env.save_episode(sfMergeEnv) def handle_env_submit(self, command): """ @@ -998,13 +1002,25 @@ class FlatlandRemoteEvaluationService: command = self.get_next_command() except timeout_decorator.timeout_decorator.TimeoutError: + # a timeout occured: send an error, and give -1.0 normalized score for this episode if self.previous_command['type'] == messages.FLATLAND_RL.ENV_STEP: - self.send_error({"type":messages.FLATLAND_RL.ENV_STEP_TIMEOUT}) + self.send_error({"type": messages.FLATLAND_RL.ENV_STEP_TIMEOUT}) elif self.previous_command['type'] == messages.FLATLAND_RL.ENV_CREATE: - self.send_error({"type":messages.FLATLAND_RL.ENV_RESET_TIMEOUT}) + self.send_error({"type": messages.FLATLAND_RL.ENV_RESET_TIMEOUT}) + + self.simulation_steps[-1] += 1 + self.simulation_rewards[-1] = self.env._max_episode_steps * self.env.get_num_agents() + self.simulation_rewards_normalized[-1] = -1.0 + + print("Evaluation TIMED OUT after {} timesteps, using max penalty. Percentage agents done: {:.3f}. Normalized reward: {:.3f}.".format( + self.simulation_steps[-1], + self.simulation_percentage_complete[-1], + self.simulation_rewards_normalized[-1] + )) self.state_env_timed_out = True + self.simulation_done = True continue if "timestamp" in command.keys(): @@ -1050,16 +1066,16 @@ class FlatlandRemoteEvaluationService: print("Overall Message Queue Latency : ", np.array(MESSAGE_QUEUE_LATENCY).mean()) self.handle_env_submit(command) - elif command['type'] == messages.FLATLAND_RL.ENV_STEP_TIMEOUT: - """ - ENV_STEP_TIMEOUT - - The client took too long to give us the next command. - - """ - - print("client env_step timeout") - self.handle_env_step_timeout(command) + # elif command['type'] == messages.FLATLAND_RL.ENV_STEP_TIMEOUT: + # """ + # ENV_STEP_TIMEOUT + # + # The client took too long to give us the next command. + # + # """ + # + # print("client env_step timeout") + # self.handle_env_step_timeout(command) else: _error = self._error_template( @@ -1142,7 +1158,6 @@ if __name__ == "__main__": help="only request the envs/actions which are missing", required=False) - parser.add_argument('--verbose', default=False, action="store_true", -- GitLab