diff --git a/flatland/evaluators/aicrowd_helpers.py b/flatland/evaluators/aicrowd_helpers.py index 606550d521ea02e7bba305fab46e4d94163a54ab..610956f4320de57294c847e29cc0227c858f3424 100644 --- a/flatland/evaluators/aicrowd_helpers.py +++ b/flatland/evaluators/aicrowd_helpers.py @@ -20,15 +20,15 @@ import pathlib ############################################################### AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", False) AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", False) -S3_BUCKET = os.getenv("S3_BUCKET", "aicrowd-production") S3_UPLOAD_PATH_TEMPLATE = os.getenv("S3_UPLOAD_PATH_TEMPLATE", "misc/flatland-rl-Media/{}") - +S3_BUCKET = os.getenv("S3_BUCKET", "aicrowd-production") +S3_BUCKET_ACL = "public-read" if S3_BUCKET == "aicrowd-production" else "" def get_boto_client(): if not AWS_ACCESS_KEY_ID or not AWS_SECRET_ACCESS_KEY: raise Exception("AWS Credentials not provided..") try: - import boto3 + import boto3 # type: ignore except ImportError: raise Exception( "boto3 is not installed. Please manually install by : ", @@ -54,6 +54,11 @@ def is_grading(): os.getenv("AICROWD_IS_GRADING", False) +def get_submission_id(): + submission_id = os.getenv("AICROWD_EVALUATION_NAME", f"testflatland-3-T12345-XXXX").split('-')[-2] + return submission_id + + def upload_random_frame_to_s3(frames_folder): all_frames = glob.glob(os.path.join(frames_folder, "*.png")) random_frame = random.choice(all_frames) @@ -65,7 +70,7 @@ def upload_random_frame_to_s3(frames_folder): image_target_key = (S3_UPLOAD_PATH_TEMPLATE + ".png").format(str(uuid.uuid4())) s3.put_object( - ACL="public-read", + ACL=S3_BUCKET_ACL, Bucket=S3_BUCKET, Key=image_target_key, Body=open(random_frame, 'rb') @@ -85,7 +90,7 @@ def upload_to_s3(localpath): str(uuid.uuid4()) ) s3.put_object( - ACL="public-read", + ACL=S3_BUCKET_ACL, Bucket=S3_BUCKET, Key=file_target_key, Body=open(localpath, 'rb') @@ -93,6 +98,24 @@ def upload_to_s3(localpath): return file_target_key +def upload_folder_to_s3(folderpath): + s3 = get_boto_client() + if not S3_BUCKET: + raise Exception("S3_BUCKET not provided...") + + for path, subdirs, files in os.walk(folderpath): + if len(files) != 0: + for file in files: + file_target_key = f'analysis_logs/{get_submission_id()}/{path[path.find(next(filter(str.isalpha, path))):]}/{file}' + localpath = os.path.join(path, file) + + s3.put_object( + ACL=S3_BUCKET_ACL, + Bucket=S3_BUCKET, + Key=file_target_key, + Body=open(localpath, 'rb') + ) + def make_subprocess_call(command, shell=False): result = subprocess.run( command.split(), diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py index 97dce409039a800bfc1d3f38898b6d8de207f4f5..4d9b8213aa246f0fcbdc6387c7c13007c08f80b3 100644 --- a/flatland/evaluators/service.py +++ b/flatland/evaluators/service.py @@ -119,6 +119,7 @@ class FlatlandRemoteEvaluationService: verbose=False, action_dir=None, episode_dir=None, + analysis_data_dir=None, merge_dir=None, use_pickle=False, shuffle=False, @@ -134,6 +135,9 @@ class FlatlandRemoteEvaluationService: self.episode_dir = episode_dir if episode_dir and not os.path.exists(self.episode_dir): os.makedirs(self.episode_dir) + self.analysis_data_dir = analysis_data_dir + if analysis_data_dir and not os.path.exists(self.analysis_data_dir): + os.makedirs(self.analysis_data_dir) self.merge_dir = merge_dir if merge_dir and not os.path.exists(self.merge_dir): os.makedirs(self.merge_dir) @@ -933,6 +937,10 @@ class FlatlandRemoteEvaluationService: if self.episode_dir is not None: self.save_episode() + if self.analysis_data_dir is not None: + self.collect_analysis_data() + self.save_analysis_data() + if self.merge_dir is not None: self.save_merged_env() @@ -961,9 +969,9 @@ class FlatlandRemoteEvaluationService: def save_actions(self): sfEnv = self.env_file_paths[self.simulation_count] - sfActions = self.action_dir + "/" + sfEnv.replace(".pkl", ".json") + sfActions = os.path.join(self.action_dir, sfEnv.replace(".pkl", ".json")) - print("env path: ", sfEnv, " sfActions:", sfActions) + print("env path: ", sfEnv, " actions path:", sfActions) if not os.path.exists(os.path.dirname(sfActions)): os.makedirs(os.path.dirname(sfActions)) @@ -979,6 +987,64 @@ class FlatlandRemoteEvaluationService: print("env path: ", sfEnv, " sfEpisode:", sfEpisode) RailEnvPersister.save_episode(self.env, sfEpisode) # self.env.save_episode(sfEpisode) + + def collect_analysis_data(self): + ''' + Collect data at the END of an episode. + Data to be saved in a json file corresponding to the episode. + ''' + self.analysis_data = {} + + agent_speeds = [] + agent_states = [] + agent_earliest_departures = [] + agent_latest_arrivals = [] + agent_arrival_times = [] + agent_shortest_paths = [] # only for nor arrived trains + agent_current_delays = [] # only for not arrived trains + agent_rewards = list(self.env.rewards_dict.values()) + + + for i_agent in range(self.env.get_num_agents()): + agent = self.env.agents[i_agent] + + agent_speeds.append(agent.speed_counter.speed) + agent_states.append(agent.state) + agent_earliest_departures.append(agent.earliest_departure) + agent_latest_arrivals.append(agent.latest_arrival) + agent_arrival_times.append(agent.arrival_time) + + if (agent.state != TrainState.DONE): + agent_shortest_paths.append(len(agent.get_shortest_path(self.env.distance_map))) + agent_current_delays.append(agent.get_current_delay(self.env._elapsed_steps, self.env.distance_map)) + else: + agent_shortest_paths.append(None) + agent_current_delays.append(None) + + self.analysis_data['agent_speeds'] = agent_speeds + self.analysis_data['agent_states'] = agent_states + self.analysis_data['agent_earliest_departures'] = agent_earliest_departures + self.analysis_data['agent_latest_arrivals'] = agent_latest_arrivals + self.analysis_data['agent_arrival_times'] = agent_arrival_times + self.analysis_data['agent_shortest_paths'] = agent_shortest_paths + self.analysis_data['agent_current_delays'] = agent_current_delays + self.analysis_data['agent_rewards'] = agent_rewards + + def save_analysis_data(self): + sfEnv = self.env_file_paths[self.simulation_count] + + sfData = os.path.join(self.analysis_data_dir, sfEnv.replace(".pkl", ".json")) + + print("env path: ", sfEnv, " data path:", sfData) + + if not os.path.exists(os.path.dirname(sfData)): + os.makedirs(os.path.dirname(sfData)) + + with open(sfData, "w") as fOut: + json.dump(self.analysis_data, fOut) + + self.analysis_data = {} + def save_merged_env(self): sfEnv = self.env_file_paths[self.simulation_count] @@ -1064,6 +1130,22 @@ class FlatlandRemoteEvaluationService: self.evaluation_state["meta"]["static_media_frame"] = static_thumbnail_s3_key else: print("[WARNING] Ignoring uploading of video to S3") + + ##################################################################### + # Save `data` and `action` directories + ##################################################################### + if self.action_dir: + if aicrowd_helpers.is_grading() and aicrowd_helpers.is_aws_configured(): + aicrowd_helpers.upload_folder_to_s3(self.action_dir) + else: + print("[WARNING] Ignoring uploading action_dir to S3") + + + if self.analysis_data_dir: + if aicrowd_helpers.is_grading() and aicrowd_helpers.is_aws_configured(): + aicrowd_helpers.upload_folder_to_s3(self.analysis_data_dir) + else: + print("[WARNING] Ignoring uploading analysis_data_dir to S3") ##################################################################### # Write Results to a file (if applicable)