diff --git a/flatland/evaluators/client.py b/flatland/evaluators/client.py
index 7096317d415951cef51e9b56d083a3a434904c57..545d52a7d5ad11cf951de1de34fb62ae953fa4ff 100644
--- a/flatland/evaluators/client.py
+++ b/flatland/evaluators/client.py
@@ -84,20 +84,32 @@ class FlatlandRemoteClient(object):
         self.env_step_times = []
         self.stats = {}
 
-    def update_running_mean_stats(self, key, scalar):
+    def update_running_stats(self, key, scalar):
         """
         Computes the running mean for certain params
         """
         mean_key = "{}_mean".format(key)
         counter_key = "{}_counter".format(key)
+        min_key = "{}_min".format(key)
+        max_key = "{}_max".format(key)
 
         try:
+            # Update Mean
             self.stats[mean_key] = \
                 ((self.stats[mean_key] * self.stats[counter_key]) + scalar) / (self.stats[counter_key] + 1)
+            # Update min
+            if scalar < self.stats[min_key]:
+                self.stats[min_key] = scalar
+            # Update max
+            if scalar > self.stats[max_key]:
+                self.stats[max_key] = scalar
+
             self.stats[counter_key] += 1
         except KeyError:
-            self.stats[mean_key] = 0
-            self.stats[counter_key] = 0
+            self.stats[mean_key] = scalar
+            self.stats[min_key] = scalar
+            self.stats[max_key] = scalar
+            self.stats[counter_key] = 1
 
     def get_redis_connection(self):
         return self.redis_conn
@@ -191,7 +203,7 @@ class FlatlandRemoteClient(object):
         random_seed = _response['payload']['random_seed']
         test_env_file_path = _response['payload']['env_file_path']
         time_diff = time.time() - time_start
-        self.update_running_mean_stats("env_creation_wait_time", time_diff)
+        self.update_running_stats("env_creation_wait_time", time_diff)
 
         if not observation:
             # If the observation is False,
@@ -223,6 +235,8 @@ class FlatlandRemoteClient(object):
                 obs_builder_object=obs_builder_object)
 
         time_start = time.time()
+        # Use the local observation
+        # as the remote server uses a dummy observation builder
         local_observation, info = self.env.reset(
             regenerate_rail=True,
             regenerate_schedule=True,
@@ -230,19 +244,25 @@ class FlatlandRemoteClient(object):
             random_seed=random_seed
         )
         time_diff = time.time() - time_start
-        self.update_running_mean_stats("internal_env_reset_time", time_diff)
-        # Use the local observation
-        # as the remote server uses a dummy observation builder
+        self.update_running_stats("internal_env_reset_time", time_diff)
+
+        # We use the last_env_step_time as an approximate measure of the inference time
+        self.last_env_step_time = time.time()
         return local_observation, info
 
     def env_step(self, action, render=False):
         """
             Respond with [observation, reward, done, info]
-        """
+        """
+        # We use the last_env_step_time as an approximate measure of the inference time
+        approximate_inference_time = time.time() - self.last_env_step_time
+        self.update_running_stats("inference_time(approx)", approximate_inference_time)
+
         _request = {}
         _request['type'] = messages.FLATLAND_RL.ENV_STEP
         _request['payload'] = {}
         _request['payload']['action'] = action
+        _request['payload']['inference_time'] = approximate_inference_time
 
         # Relay the action in a non-blocking way to the server
         # so that it can start doing an env.step on it in ~ parallel
@@ -254,7 +274,10 @@ class FlatlandRemoteClient(object):
             self.env.step(action)
         time_diff = time.time() - time_start
         # Compute a running mean of env step times
-        self.update_running_mean_stats("internal_env_step_time", time_diff)
+        self.update_running_stats("internal_env_step_time", time_diff)
+
+        # We use the last_env_step_time as an approximate measure of the inference time
+        self.last_env_step_time = time.time()
 
         return [local_observation, local_reward, local_done, local_info]
 
@@ -273,7 +296,15 @@ class FlatlandRemoteClient(object):
         print("=" * 100)
         for _key in self.stats:
             if _key.endswith("_mean"):
-                print("\t - {}\t:{}".format(_key, self.stats[_key]))
+                metric_name = _key.replace("_mean", "")
+                mean_key = "{}_mean".format(metric_name)
+                min_key = "{}_min".format(metric_name)
+                max_key = "{}_max".format(metric_name)
+                print("\t - {}\t => min: {} || mean: {} || max: {}".format(
+                    metric_name,
+                    self.stats[min_key],
+                    self.stats[mean_key],
+                    self.stats[max_key]))
         print("=" * 100)
         if os.getenv("AICROWD_BLOCKING_SUBMIT"):
             """
diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py
index 2a6e64c52128323800d480d1d275d38c6782dfbf..09b98b4d9d0081b3da4651e0142f103383df1f8b 100644
--- a/flatland/evaluators/service.py
+++ b/flatland/evaluators/service.py
@@ -12,6 +12,7 @@
 import crowdai_api
 import msgpack
 import msgpack_numpy as m
 import numpy as np
+import pandas as pd
 import redis
 import timeout_decorator
@@ -91,6 +92,7 @@ class FlatlandRemoteEvaluationService:
                  visualize=False,
                  video_generation_envs=[],
                  report=None,
+                 result_output_path=None,
                  verbose=False):
 
         # Test Env folder Paths
@@ -101,11 +103,14 @@ class FlatlandRemoteEvaluationService:
         print(self.env_file_paths)
         # Shuffle all the env_file_paths for more exciting videos
         # and for more uniform time progression
+        self.instantiate_evaluation_metadata()
 
         # Logging and Reporting related vars
         self.verbose = verbose
         self.report = report
 
+        self.result_output_path = result_output_path
+
         # Communication Protocol Related vars
         self.namespace = "flatland-rl"
         self.service_id = flatland_rl_service_id
@@ -146,6 +151,7 @@ class FlatlandRemoteEvaluationService:
         self.env_renderer = False
         self.reward = 0
         self.simulation_count = -1
+        self.simulation_env_file_paths = []
         self.simulation_rewards = []
         self.simulation_rewards_normalized = []
         self.simulation_percentage_complete = []
@@ -166,20 +172,50 @@ class FlatlandRemoteEvaluationService:
                 shutil.rmtree(self.vizualization_folder_name)
             os.mkdir(self.vizualization_folder_name)
 
-    def update_running_mean_stats(self, key, scalar):
+    def update_running_stats(self, key, scalar):
         """
         Computes the running mean for certain params
         """
         mean_key = "{}_mean".format(key)
         counter_key = "{}_counter".format(key)
+        min_key = "{}_min".format(key)
+        max_key = "{}_max".format(key)
 
         try:
+            # Update Mean
             self.stats[mean_key] = \
                 ((self.stats[mean_key] * self.stats[counter_key]) + scalar) / (self.stats[counter_key] + 1)
+            # Update min
+            if scalar < self.stats[min_key]:
+                self.stats[min_key] = scalar
+            # Update max
+            if scalar > self.stats[max_key]:
+                self.stats[max_key] = scalar
+
             self.stats[counter_key] += 1
         except KeyError:
-            self.stats[mean_key] = 0
-            self.stats[counter_key] = 0
+            self.stats[mean_key] = scalar
+            self.stats[min_key] = scalar
+            self.stats[max_key] = scalar
+            self.stats[counter_key] = 1
+
+    def delete_key_in_running_stats(self, key):
+        """
+        This deletes a particular key in the running stats
+        dictionary, if it exists
+        """
+        mean_key = "{}_mean".format(key)
+        counter_key = "{}_counter".format(key)
+        min_key = "{}_min".format(key)
+        max_key = "{}_max".format(key)
+
+        try:
+            del self.stats[mean_key]
+            del self.stats[counter_key]
+            del self.stats[min_key]
+            del self.stats[max_key]
+        except KeyError:
+            pass
 
     def get_env_filepaths(self):
         """
@@ -213,6 +249,86 @@ class FlatlandRemoteEvaluationService:
         ) for x in env_paths])
 
         return env_paths
+
+    def instantiate_evaluation_metadata(self):
+        """
+        This instantiates a pandas dataframe to record
+        information specific to each of the individual env evaluations.
+
+        This loads the template CSV with pre-filled information from the
+        provided metadata.csv file, and fills it up with
+        evaluation runtime information.
+        """
+        self.evaluation_metadata_df = None
+        metadata_file_path = os.path.join(
+            self.test_env_folder,
+            "metadata.csv"
+        )
+        if os.path.exists(metadata_file_path):
+            self.evaluation_metadata_df = pd.read_csv(metadata_file_path)
+            self.evaluation_metadata_df["filename"] = \
+                self.evaluation_metadata_df["test_id"] + \
+                "/" + self.evaluation_metadata_df["env_id"] + ".pkl"
+            self.evaluation_metadata_df = self.evaluation_metadata_df.set_index("filename")
+
+            # Add custom columns for evaluation specific metrics
+            self.evaluation_metadata_df["reward"] = np.nan
+            self.evaluation_metadata_df["normalized_reward"] = np.nan
+            self.evaluation_metadata_df["percentage_complete"] = np.nan
+            self.evaluation_metadata_df["steps"] = np.nan
+            self.evaluation_metadata_df["simulation_time"] = np.nan
+
+            # Add client specific columns
+            # TODO: This needs refactoring
+            self.evaluation_metadata_df["controller_inference_time_min"] = np.nan
+            self.evaluation_metadata_df["controller_inference_time_mean"] = np.nan
+            self.evaluation_metadata_df["controller_inference_time_max"] = np.nan
+        else:
+            print("[WARNING] metadata.csv not found in tests folder. Granular metric collection is hence Disabled.")
+
+    def update_evaluation_metadata(self):
+        """
+        This function is called when we move from one simulation to another
+        and it simply tries to update the simulation specific information
+        for the **previous** episode in the metadata_df if it exists.
+        """
+        if self.evaluation_metadata_df is not None and len(self.simulation_env_file_paths) > 0:
+
+            last_simulation_env_file_path = self.simulation_env_file_paths[-1]
+
+            _row = self.evaluation_metadata_df.loc[
+                last_simulation_env_file_path
+            ]
+
+            _row.reward = self.simulation_rewards[-1]
+            _row.normalized_reward = self.simulation_rewards_normalized[-1]
+            _row.percentage_complete = self.simulation_percentage_complete[-1]
+            _row.steps = self.simulation_steps[-1]
+            _row.simulation_time = self.simulation_times[-1]
+
+            # TODO: This needs refactoring
+            # Add controller_inference_time_metrics
+            _row.controller_inference_time_min = self.stats[
+                "current_episode_controller_inference_time_min"
+            ]
+            _row.controller_inference_time_mean = self.stats[
+                "current_episode_controller_inference_time_mean"
+            ]
+            _row.controller_inference_time_max = self.stats[
+                "current_episode_controller_inference_time_max"
+            ]
+
+            self.evaluation_metadata_df.loc[
+                last_simulation_env_file_path
+            ] = _row
+
+            # Delete this key from the stats to ensure that it
+            # gets computed again from scratch in the next episode
+            self.delete_key_in_running_stats(
+                "current_episode_controller_inference_time")
+
+            if self.verbose:
+                print(self.evaluation_metadata_df)
 
     def instantiate_redis_connection_pool(self):
         """
@@ -322,7 +438,7 @@ class FlatlandRemoteEvaluationService:
             print("Received Request : ", command)
 
         message_queue_latency = time.time() - command["timestamp"]
-        self.update_running_mean_stats("message_queue_latency", message_queue_latency)
+        self.update_running_stats("message_queue_latency", message_queue_latency)
         return command
 
     def send_response(self, _command_response, command, suppress_logs=False):
@@ -390,9 +506,20 @@ class FlatlandRemoteEvaluationService:
         if self.begin_simulation:
             # If begin simulation has already been initialized
             # atleast once
+            # This adds the simulation time for the previous episode
            self.simulation_times.append(time.time() - self.begin_simulation)
         self.begin_simulation = time.time()
 
+        # Update evaluation metadata for the previous episode
+        self.update_evaluation_metadata()
+
+        # Start adding placeholders for the new episode
+        self.simulation_env_file_paths.append(
+            os.path.relpath(
+                test_env_file_path,
+                self.test_env_folder
+            ))  # relative path
+
         self.simulation_rewards.append(0)
         self.simulation_rewards_normalized.append(0)
         self.simulation_percentage_complete.append(0)
@@ -465,10 +592,17 @@ class FlatlandRemoteEvaluationService:
                 has done['__all__']==True")
 
         action = _payload['action']
+        inference_time = _payload['inference_time']
+        # We record this metric in two keys:
+        #   - One for the current episode
+        #   - One global
+        self.update_running_stats("current_episode_controller_inference_time", inference_time)
+        self.update_running_stats("controller_inference_time", inference_time)
+
         time_start = time.time()
         _observation, all_rewards, done, info = self.env.step(action)
         time_diff = time.time() - time_start
-        self.update_running_mean_stats("internal_env_step_time", time_diff)
+        self.update_running_stats("internal_env_step_time", time_diff)
 
         cumulative_reward = sum(all_rewards.values())
         self.simulation_rewards[-1] += cumulative_reward
@@ -531,11 +665,21 @@ class FlatlandRemoteEvaluationService:
         print("=" * 100)
         for _key in self.stats:
             if _key.endswith("_mean"):
-                print("\t - {}\t:{}".format(_key, self.stats[_key]))
+                metric_name = _key.replace("_mean", "")
+                mean_key = "{}_mean".format(metric_name)
+                min_key = "{}_min".format(metric_name)
+                max_key = "{}_max".format(metric_name)
+                print("\t - {}\t => min: {} || mean: {} || max: {}".format(
+                    metric_name,
+                    self.stats[min_key],
+                    self.stats[mean_key],
+                    self.stats[max_key]))
         print("=" * 100)
 
         # Register simulation time of the last episode
         self.simulation_times.append(time.time() - self.begin_simulation)
+        # Compute the evaluation metadata for the last episode
+        self.update_evaluation_metadata()
 
         if len(self.simulation_rewards) != len(self.env_file_paths):
             raise Exception(
@@ -581,6 +725,17 @@ class FlatlandRemoteEvaluationService:
             else:
                 print("[WARNING] Ignoring uploading of video to S3")
 
+        #####################################################################
+        # Write Results to a file (if applicable)
+        #####################################################################
+        if self.result_output_path:
+            if self.evaluation_metadata_df is not None:
+                self.evaluation_metadata_df.to_csv(self.result_output_path)
+                print("Wrote output results to : {}".format(self.result_output_path))
+            else:
+                print("[WARNING] Unable to write final results to the specified path"
+                      " as metadata.csv is not provided in the tests_folder")
+
         _command_response = {}
         _command_response['type'] = messages.FLATLAND_RL.ENV_SUBMIT_RESPONSE
         _payload = {}
@@ -749,7 +904,8 @@ if __name__ == "__main__":
         flatland_rl_service_id=args.service_id,
         verbose=False,
         visualize=True,
-        video_generation_envs=["Test_0/Level_100.pkl"]
+        video_generation_envs=["Test_0/Level_100.pkl"],
+        result_output_path="/tmp/output.csv"
     )
     result = grader.run()
     if result['type'] == messages.FLATLAND_RL.ENV_SUBMIT_RESPONSE:
diff --git a/requirements_dev.txt b/requirements_dev.txt
index 4abb5acec55aab6af94cc7d553fc4d4afce0913a..910635e288de911b3d884a06e610fd2868abd1fe 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -13,6 +13,7 @@ msgpack>=0.6.1
 msgpack-numpy>=0.4.4.0
 svgutils>=0.3.1
 pyarrow>=0.13.0
+pandas>=0.25.1
 importlib-metadata>=0.17
 importlib-resources>=1.0.1
 six>=1.12.0