Commit 67ac98b3 authored by mohanty

Merge branch 'spm/318-finegrained-metrics' into 'master'

Spm/318 finegrained metrics

See merge request flatland/flatland!291
parents 049d8d13 7ca5b2c1
@@ -84,20 +84,32 @@ class FlatlandRemoteClient(object):
self.env_step_times = []
self.stats = {}
-def update_running_mean_stats(self, key, scalar):
+def update_running_stats(self, key, scalar):
"""
Computes the running mean, min and max for the given key
"""
mean_key = "{}_mean".format(key)
counter_key = "{}_counter".format(key)
min_key = "{}_min".format(key)
max_key = "{}_max".format(key)
try:
# Update Mean
self.stats[mean_key] = \
((self.stats[mean_key] * self.stats[counter_key]) + scalar) / (self.stats[counter_key] + 1)
# Update min
if scalar < self.stats[min_key]:
self.stats[min_key] = scalar
# Update max
if scalar > self.stats[max_key]:
self.stats[max_key] = scalar
self.stats[counter_key] += 1
except KeyError:
-self.stats[mean_key] = 0
-self.stats[counter_key] = 0
+self.stats[mean_key] = scalar
+self.stats[min_key] = scalar
+self.stats[max_key] = scalar
+self.stats[counter_key] = 1
def get_redis_connection(self):
return self.redis_conn
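
The mean above is maintained incrementally: with n samples seen so far and a new sample x, the updated mean is (mean * n + x) / (n + 1), so no sample history has to be stored. The new KeyError branch doubles as lazy initialization and also fixes a subtle off-by-one: previously the first sample only primed the counters to zero and never entered the mean. A minimal standalone sketch of the same pattern (the RunningStats class is hypothetical, not part of this commit):

class RunningStats:
    """Hypothetical helper mirroring update_running_stats."""

    def __init__(self):
        self.stats = {}

    def update(self, key, scalar):
        try:
            n = self.stats[key + "_counter"]
            # Incremental mean: new_mean = (mean * n + x) / (n + 1)
            self.stats[key + "_mean"] = \
                (self.stats[key + "_mean"] * n + scalar) / (n + 1)
            self.stats[key + "_min"] = min(self.stats[key + "_min"], scalar)
            self.stats[key + "_max"] = max(self.stats[key + "_max"], scalar)
            self.stats[key + "_counter"] = n + 1
        except KeyError:
            # First sample initializes mean, min and max to the sample itself
            for suffix in ("_mean", "_min", "_max"):
                self.stats[key + suffix] = scalar
            self.stats[key + "_counter"] = 1

rs = RunningStats()
for x in (0.2, 0.5, 0.1):
    rs.update("step_time", x)
print(rs.stats)  # step_time_mean ~0.2667, min 0.1, max 0.5, counter 3
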
@@ -191,7 +203,7 @@ class FlatlandRemoteClient(object):
random_seed = _response['payload']['random_seed']
test_env_file_path = _response['payload']['env_file_path']
time_diff = time.time() - time_start
self.update_running_mean_stats("env_creation_wait_time", time_diff)
self.update_running_stats("env_creation_wait_time", time_diff)
if not observation:
# If the observation is False,
@@ -223,6 +235,8 @@ class FlatlandRemoteClient(object):
obs_builder_object=obs_builder_object)
time_start = time.time()
# Use the local observation
# as the remote server uses a dummy observation builder
local_observation, info = self.env.reset(
regenerate_rail=True,
regenerate_schedule=True,
@@ -230,19 +244,25 @@ class FlatlandRemoteClient(object):
random_seed=random_seed
)
time_diff = time.time() - time_start
self.update_running_mean_stats("internal_env_reset_time", time_diff)
# Use the local observation
# as the remote server uses a dummy observation builder
self.update_running_stats("internal_env_reset_time", time_diff)
# We use the last_env_step_time as an approximate measure of the inference time
self.last_env_step_time = time.time()
return local_observation, info
def env_step(self, action, render=False):
"""
Respond with [observation, reward, done, info]
"""
"""
# We use the last_env_step_time as an approximate measure of the inference time
approximate_inference_time = time.time() - self.last_env_step_time
self.update_running_stats("inference_time(approx)", approximate_inference_time)
_request = {}
_request['type'] = messages.FLATLAND_RL.ENV_STEP
_request['payload'] = {}
_request['payload']['action'] = action
_request['payload']['inference_time'] = approximate_inference_time
# Relay the action in a non-blocking way to the server
# so that it can start doing an env.step on it in ~ parallel
@@ -254,7 +274,10 @@ class FlatlandRemoteClient(object):
self.env.step(action)
time_diff = time.time() - time_start
# Compute a running mean of env step times
self.update_running_mean_stats("internal_env_step_time", time_diff)
self.update_running_stats("internal_env_step_time", time_diff)
# We use the last_env_step_time as an approximate measure of the inference time
self.last_env_step_time = time.time()
return [local_observation, local_reward, local_done, local_info]
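
The approximate inference time is obtained by timestamping the moment an observation is handed back to the agent (last_env_step_time) and measuring the gap until the agent's next env_step call; everything in between is the agent's own compute. A hedged sketch of the pattern in isolation (the class and names are illustrative, not the actual client):

import time

class TimedLoop:
    # Illustrative only: mirrors the client's timing pattern above
    def __init__(self):
        self.last_env_step_time = None

    def env_reset(self):
        observation = {}  # placeholder observation
        # Stamp the instant control returns to the agent's code
        self.last_env_step_time = time.time()
        return observation

    def env_step(self, action):
        # The gap since the last return approximates the agent's inference time
        approximate_inference_time = time.time() - self.last_env_step_time
        print("inference ~ {:.6f}s".format(approximate_inference_time))
        observation, reward, done = {}, 0.0, False
        self.last_env_step_time = time.time()
        return observation, reward, done

loop = TimedLoop()
obs = loop.env_reset()
loop.env_step(action={})

Note this is wall-clock time between successive calls on the client, so anything else the agent does between steps (logging, rendering) is counted too, which is presumably why the key is named inference_time(approx).
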
@@ -273,7 +296,15 @@ class FlatlandRemoteClient(object):
print("=" * 100)
for _key in self.stats:
if _key.endswith("_mean"):
print("\t - {}\t:{}".format(_key, self.stats[_key]))
metric_name = _key.replace("_mean", "")
mean_key = "{}_mean".format(metric_name)
min_key = "{}_min".format(metric_name)
max_key = "{}_max".format(metric_name)
print("\t - {}\t => min: {} || mean: {} || max: {}".format(
metric_name,
self.stats[min_key],
self.stats[mean_key],
self.stats[max_key]))
print("=" * 100)
if os.getenv("AICROWD_BLOCKING_SUBMIT"):
"""
......
@@ -12,6 +12,7 @@ import crowdai_api
import msgpack
import msgpack_numpy as m
import numpy as np
import pandas as pd
import redis
import timeout_decorator
@@ -91,6 +92,7 @@ class FlatlandRemoteEvaluationService:
visualize=False,
video_generation_envs=[],
report=None,
result_output_path=None,
verbose=False):
# Test Env folder Paths
@@ -101,11 +103,14 @@ class FlatlandRemoteEvaluationService:
print(self.env_file_paths)
# Shuffle all the env_file_paths for more exciting videos
# and for more uniform time progression
self.instantiate_evaluation_metadata()
# Logging and Reporting related vars
self.verbose = verbose
self.report = report
self.result_output_path = result_output_path
# Communication Protocol Related vars
self.namespace = "flatland-rl"
self.service_id = flatland_rl_service_id
@@ -146,6 +151,7 @@ class FlatlandRemoteEvaluationService:
self.env_renderer = False
self.reward = 0
self.simulation_count = -1
self.simulation_env_file_paths = []
self.simulation_rewards = []
self.simulation_rewards_normalized = []
self.simulation_percentage_complete = []
@@ -166,20 +172,50 @@ class FlatlandRemoteEvaluationService:
shutil.rmtree(self.vizualization_folder_name)
os.mkdir(self.vizualization_folder_name)
-def update_running_mean_stats(self, key, scalar):
+def update_running_stats(self, key, scalar):
"""
Computes the running mean, min and max for the given key
"""
mean_key = "{}_mean".format(key)
counter_key = "{}_counter".format(key)
min_key = "{}_min".format(key)
max_key = "{}_max".format(key)
try:
# Update Mean
self.stats[mean_key] = \
((self.stats[mean_key] * self.stats[counter_key]) + scalar) / (self.stats[counter_key] + 1)
# Update min
if scalar < self.stats[min_key]:
self.stats[min_key] = scalar
# Update max
if scalar > self.stats[max_key]:
self.stats[max_key] = scalar
self.stats[counter_key] += 1
except KeyError:
-self.stats[mean_key] = 0
-self.stats[counter_key] = 0
+self.stats[mean_key] = scalar
+self.stats[min_key] = scalar
+self.stats[max_key] = scalar
+self.stats[counter_key] = 1
def delete_key_in_running_stats(self, key):
"""
Deletes the mean, min, max and counter entries for the
given key from the running stats dictionary, if they exist
"""
mean_key = "{}_mean".format(key)
counter_key = "{}_counter".format(key)
min_key = "{}_min".format(key)
max_key = "{}_max".format(key)
try:
# Delete the entries from self.stats itself; a bare
# `del mean_key` would only delete the local variable
del self.stats[mean_key]
del self.stats[counter_key]
del self.stats[min_key]
del self.stats[max_key]
except KeyError:
pass
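
Deleting the four per-episode entries forces update_running_stats to hit its KeyError branch on the next sample and restart that metric from scratch. A small illustration with a plain dict (the key name is made up):

stats = {"ep_time_mean": 0.3, "ep_time_min": 0.1,
         "ep_time_max": 0.5, "ep_time_counter": 7}
for suffix in ("_mean", "_min", "_max", "_counter"):
    # pop(..., None) tolerates already-missing keys, avoiding try/except
    stats.pop("ep_time" + suffix, None)
print(stats)  # {} -> the next update re-initializes all four entries

dict.pop(key, None) would also sidestep the partial-deletion case in which a KeyError aborts the remaining del statements.
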
def get_env_filepaths(self):
"""
@@ -213,6 +249,86 @@ class FlatlandRemoteEvaluationService:
) for x in env_paths])
return env_paths
def instantiate_evaluation_metadata(self):
"""
Instantiates a pandas DataFrame to record information
specific to each individual env evaluation. It loads the
pre-filled template from the provided metadata.csv file and
augments it with evaluation runtime information.
"""
self.evaluation_metadata_df = None
metadata_file_path = os.path.join(
self.test_env_folder,
"metadata.csv"
)
if os.path.exists(metadata_file_path):
self.evaluation_metadata_df = pd.read_csv(metadata_file_path)
self.evaluation_metadata_df["filename"] = \
self.evaluation_metadata_df["test_id"] + \
"/" + self.evaluation_metadata_df["env_id"] + ".pkl"
self.evaluation_metadata_df = self.evaluation_metadata_df.set_index("filename")
# Add custom columns for evaluation specific metrics
self.evaluation_metadata_df["reward"] = np.nan
self.evaluation_metadata_df["normalized_reward"] = np.nan
self.evaluation_metadata_df["percentage_complete"] = np.nan
self.evaluation_metadata_df["steps"] = np.nan
self.evaluation_metadata_df["simulation_time"] = np.nan
# Add client specific columns
# TODO: This needs refactoring
self.evaluation_metadata_df["controller_inference_time_min"] = np.nan
self.evaluation_metadata_df["controller_inference_time_mean"] = np.nan
self.evaluation_metadata_df["controller_inference_time_max"] = np.nan
else:
print("[WARNING] metadata.csv not found in tests folder. Granular metric collection is hence Disabled.")
def update_evaluation_metadata(self):
"""
This function is called when we move from one simulation to the next,
and updates the simulation-specific information
for the **previous** episode in the metadata_df, if it exists.
"""
if self.evaluation_metadata_df is not None and len(self.simulation_env_file_paths) > 0:
last_simulation_env_file_path = self.simulation_env_file_paths[-1]
_row = self.evaluation_metadata_df.loc[
last_simulation_env_file_path
]
_row.reward = self.simulation_rewards[-1]
_row.normalized_reward = self.simulation_rewards_normalized[-1]
_row.percentage_complete = self.simulation_percentage_complete[-1]
_row.steps = self.simulation_steps[-1]
_row.simulation_time = self.simulation_times[-1]
# TODO: This needs refactoring
# Add controller_inference_time_metrics
_row.controller_inference_time_min = self.stats[
"current_episode_controller_inference_time_min"
]
_row.controller_inference_time_mean = self.stats[
"current_episode_controller_inference_time_mean"
]
_row.controller_inference_time_max = self.stats[
"current_episode_controller_inference_time_max"
]
self.evaluation_metadata_df.loc[
last_simulation_env_file_path
] = _row
# Delete this key from the stats to ensure that it
# gets computed again from scratch in the next episode
self.delete_key_in_running_stats(
"current_episode_controller_inference_time")
if self.verbose:
print(self.evaluation_metadata_df)
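
One pandas subtlety in the method above: df.loc[label] returns the row as a new Series (a copy), so mutating _row alone would not touch the DataFrame; the closing df.loc[label] = _row write-back is what persists the values. A self-contained sketch of that copy-then-write-back pattern (frame contents are made up):

import pandas as pd

metadata = pd.DataFrame(
    {"reward": [float("nan")], "steps": [float("nan")]},
    index=["Test_0/Level_0.pkl"])
# .copy() silences SettingWithCopyWarning; .loc already hands back a copy
_row = metadata.loc["Test_0/Level_0.pkl"].copy()
_row["reward"] = -42.0
_row["steps"] = 180.0
# Without this write-back the DataFrame keeps its NaN placeholders
metadata.loc["Test_0/Level_0.pkl"] = _row
print(metadata)
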
def instantiate_redis_connection_pool(self):
"""
@@ -322,7 +438,7 @@ class FlatlandRemoteEvaluationService:
print("Received Request : ", command)
message_queue_latency = time.time() - command["timestamp"]
self.update_running_mean_stats("message_queue_latency", message_queue_latency)
self.update_running_stats("message_queue_latency", message_queue_latency)
return command
def send_response(self, _command_response, command, suppress_logs=False):
@@ -390,9 +506,20 @@ class FlatlandRemoteEvaluationService:
if self.begin_simulation:
# If begin simulation has already been initialized
# at least once,
# this adds the simulation time for the previous episode
self.simulation_times.append(time.time() - self.begin_simulation)
self.begin_simulation = time.time()
# Update evaluation metadata for the previous episode
self.update_evaluation_metadata()
# Start adding placeholders for the new episode
self.simulation_env_file_paths.append(
os.path.relpath(
test_env_file_path,
self.test_env_folder
)) # relative path
self.simulation_rewards.append(0)
self.simulation_rewards_normalized.append(0)
self.simulation_percentage_complete.append(0)
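
Storing the relative path keeps the per-episode key identical to the filename index built from metadata.csv, so the later .loc lookup in update_evaluation_metadata matches exactly. For instance (paths are hypothetical):

import os

test_env_folder = "/data/tests"                        # hypothetical
test_env_file_path = "/data/tests/Test_0/Level_1.pkl"  # hypothetical
print(os.path.relpath(test_env_file_path, test_env_folder))
# -> Test_0/Level_1.pkl, matching the metadata "filename" index
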
@@ -465,10 +592,17 @@ class FlatlandRemoteEvaluationService:
has done['__all__']==True")
action = _payload['action']
inference_time = _payload['inference_time']
# We record this metric in two keys:
# - One for the current episode
# - One global
self.update_running_stats("current_episode_controller_inference_time", inference_time)
self.update_running_stats("controller_inference_time", inference_time)
time_start = time.time()
_observation, all_rewards, done, info = self.env.step(action)
time_diff = time.time() - time_start
self.update_running_mean_stats("internal_env_step_time", time_diff)
self.update_running_stats("internal_env_step_time", time_diff)
cumulative_reward = sum(all_rewards.values())
self.simulation_rewards[-1] += cumulative_reward
@@ -531,11 +665,21 @@ class FlatlandRemoteEvaluationService:
print("=" * 100)
for _key in self.stats:
if _key.endswith("_mean"):
print("\t - {}\t:{}".format(_key, self.stats[_key]))
metric_name = _key.replace("_mean", "")
mean_key = "{}_mean".format(metric_name)
min_key = "{}_min".format(metric_name)
max_key = "{}_max".format(metric_name)
print("\t - {}\t => min: {} || mean: {} || max: {}".format(
metric_name,
self.stats[min_key],
self.stats[mean_key],
self.stats[max_key]))
print("=" * 100)
# Register simulation time of the last episode
self.simulation_times.append(time.time() - self.begin_simulation)
# Compute the evaluation metadata for the last episode
self.update_evaluation_metadata()
if len(self.simulation_rewards) != len(self.env_file_paths):
raise Exception(
@@ -581,6 +725,17 @@ class FlatlandRemoteEvaluationService:
else:
print("[WARNING] Ignoring uploading of video to S3")
#####################################################################
# Write Results to a file (if applicable)
#####################################################################
if self.result_output_path:
if self.evaluation_metadata_df is not None:
self.evaluation_metadata_df.to_csv(self.result_output_path)
print("Wrote output results to : {}".format(self.result_output_path))
else:
print("[WARING] Unable to write final results to the specified path"
" as metadata.csv is not provided in the tests_folder")
_command_response = {}
_command_response['type'] = messages.FLATLAND_RL.ENV_SUBMIT_RESPONSE
_payload = {}
@@ -749,7 +904,8 @@ if __name__ == "__main__":
flatland_rl_service_id=args.service_id,
verbose=False,
visualize=True,
video_generation_envs=["Test_0/Level_100.pkl"]
video_generation_envs=["Test_0/Level_100.pkl"],
result_output_path="/tmp/output.csv"
)
result = grader.run()
if result['type'] == messages.FLATLAND_RL.ENV_SUBMIT_RESPONSE:
......
@@ -13,6 +13,7 @@ msgpack>=0.6.1
msgpack-numpy>=0.4.4.0
svgutils>=0.3.1
pyarrow>=0.13.0
pandas>=0.25.1
importlib-metadata>=0.17
importlib-resources>=1.0.1
six>=1.12.0
......