class DummyObservationBuilder(ObservationBuilder):
    """
    Observation builder that returns a constant placeholder observation.

    This is used in the evaluation service, where the observations are
    not computed by the env itself: every query simply answers ``True``.
    """

    def __init__(self):
        # Empty observation space: no real observation is ever produced.
        self.observation_space = ()

    def _set_env(self, env):
        """Remember the environment this builder is attached to."""
        self.env = env

    def reset(self):
        """Nothing to reset: the builder keeps no per-episode state."""
        pass

    def get_many(self, handles=None):
        """
        Return the placeholder observation for a group of agents.

        `handles` is accepted for interface compatibility and ignored.
        The default is ``None`` rather than a shared mutable ``[]``.
        """
        return True

    def get(self, handle=0):
        """
        Return the placeholder observation for a single agent.

        `handle` is accepted for interface compatibility and ignored.
        """
        return True
###############################################################
# Expected Env Variables
###############################################################
# Default Values to be provided :
# AICROWD_IS_GRADING : true
# CROWDAI_IS_GRADING : true
# S3_BUCKET : aicrowd-production
# S3_UPLOAD_PATH_TEMPLATE : misc/flatland-rl-Media/{}.mp4
# AWS_ACCESS_KEY_ID
# AWS_SECRET_ACCESS_KEY
# http_proxy
# https_proxy
###############################################################
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID", False)
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY", False)
S3_BUCKET = os.getenv("S3_BUCKET", "aicrowd-production")
S3_UPLOAD_PATH_TEMPLATE = os.getenv("S3_UPLOAD_PATH_TEMPLATE", "misc/flatland-rl-Media/{}.mp4")


def get_boto_client():
    """
    Build an S3 client from the credentials read at import time.

    Raises:
        Exception: if either AWS credential variable is unset or empty.
    """
    if not is_aws_configured():
        raise Exception("AWS Credentials not provided..")
    return boto3.client(
        's3',
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY
    )


def is_aws_configured():
    """Return True when both AWS credential variables are set and non-empty."""
    return bool(AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY)


def is_grading():
    """
    Return True when one of the *_IS_GRADING env variables is set.

    Any non-empty value counts as "set"; the value itself is not parsed.
    """
    return bool(
        os.getenv("CROWDAI_IS_GRADING", False) or
        os.getenv("AICROWD_IS_GRADING", False)
    )


def upload_to_s3(localpath):
    """
    Upload the file at `localpath` to S3 under a fresh random key.

    The key is S3_UPLOAD_PATH_TEMPLATE formatted with a uuid4, and the
    object is uploaded with a "public-read" ACL.

    Returns:
        The S3 key the file was stored under.

    Raises:
        Exception: if credentials, the path template or the bucket name
            are missing.
    """
    s3 = get_boto_client()
    if not S3_UPLOAD_PATH_TEMPLATE:
        raise Exception("S3_UPLOAD_PATH_TEMPLATE not provided...")
    if not S3_BUCKET:
        raise Exception("S3_BUCKET not provided...")

    image_target_key = S3_UPLOAD_PATH_TEMPLATE.format(str(uuid.uuid4()))
    # Context manager so the file handle is closed even if the upload fails
    with open(localpath, 'rb') as body:
        s3.put_object(
            ACL="public-read",
            Bucket=S3_BUCKET,
            Key=image_target_key,
            Body=body
        )
    return image_target_key


def make_subprocess_call(command, shell=False):
    """
    Run `command` and capture its output.

    Args:
        command: either a command string or a pre-split argument sequence.
            A string is split on whitespace when `shell` is False and is
            handed to the shell unchanged when `shell` is True. A sequence
            is passed to `subprocess.run` as-is, which is the only way to
            pass arguments that contain spaces.
        shell: forwarded to `subprocess.run`.

    Returns:
        Tuple of (returncode, stdout, stderr), both streams decoded as utf-8.
    """
    if isinstance(command, str):
        # Splitting a shell command would make the shell run only its
        # first token, so the string is only split for the non-shell case.
        args = command if shell else command.split()
    else:
        args = list(command)

    result = subprocess.run(
        args,
        shell=shell,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    stdout = result.stdout.decode('utf-8')
    stderr = result.stderr.decode('utf-8')
    return result.returncode, stdout, stderr


def _encode_frames(frames_path, output_path, size):
    """Encode the numbered frames at `frames_path` into an mp4 of `size`."""
    # Passed as an argument list so paths containing spaces survive intact.
    return_code, output, output_err = make_subprocess_call([
        "ffmpeg", "-r", "7", "-start_number", "0", "-i", frames_path,
        "-c:v", "libx264", "-vf", "fps=7", "-pix_fmt", "yuv420p",
        "-s", size, output_path
    ])
    if return_code != 0:
        raise Exception(output_err)


def generate_movie_from_frames(frames_folder):
    """
    Expects the frames in the frames_folder folder
    and then use ffmpeg to generate the video
    which writes the output to the frames_folder

    Two videos are produced from the `flatland_frame_%04d.png` frames:
    a 320x320 thumbnail (`out_thumb.mp4`) and a 600x600 video (`out.mp4`).

    Returns:
        Tuple of (output_path, thumb_output_path).

    Raises:
        Exception: carrying ffmpeg's stderr if either encode exits non-zero.
    """
    frames_path = os.path.join(frames_folder, "flatland_frame_%04d.png")

    # Generate Thumbnail Video
    print("Generating Thumbnail...")
    thumb_output_path = os.path.join(frames_folder, "out_thumb.mp4")
    _encode_frames(frames_path, thumb_output_path, "320x320")

    # Generate Normal Sized Video
    print("Generating Normal Video...")
    output_path = os.path.join(frames_folder, "out.mp4")
    _encode_frames(frames_path, output_path, "600x600")

    return output_path, thumb_output_path
def are_dicts_equal(d1, d2):
    """Return True if both dicts have the same keys and equal values."""
    return d1.keys() == d2.keys() and all(d1[k] == d2[k] for k in d1)


class FlatlandRemoteClient(object):
    """
    Redis client to interface with flatland-rl remote-evaluation-service
    The Docker container hosts a redis-server inside the container.
    This client connects to the same redis-server,
    and communicates with the service.
    The service eventually will reside outside the docker container,
    and will communicate
    with the client only via the redis-server of the docker container.
    On the instantiation of the docker container, one service will be
    instantiated in parallel.
    The service accepts commands at "flatland-rl::`service_id`::commands"
    where `service_id` is either provided through the
    `FLATLAND_RL_SERVICE_ID` env variable or defaults to
    "FLATLAND_RL_SERVICE_ID"
    """
    def __init__(self,
                 remote_host='127.0.0.1',
                 remote_port=6379,
                 remote_db=0,
                 remote_password=None,
                 verbose=False):

        self.remote_host = remote_host
        self.remote_port = remote_port
        self.remote_db = remote_db
        self.remote_password = remote_password
        self.redis_pool = redis.ConnectionPool(
            host=remote_host,
            port=remote_port,
            db=remote_db,
            password=remote_password)
        self.namespace = "flatland-rl"
        self.service_id = os.getenv(
            'FLATLAND_RL_SERVICE_ID', "FLATLAND_RL_SERVICE_ID")
        self.command_channel = "{}::{}::commands".format(
            self.namespace,
            self.service_id
        )
        self.verbose = verbose

        self.env = None
        # Handshake immediately so a missing service fails fast
        self.ping_pong()

    def get_redis_connection(self):
        """Return a redis connection backed by the shared connection pool."""
        return redis.Redis(connection_pool=self.redis_pool)

    def _generate_response_channel(self):
        """Return a per-request response channel name with a random suffix."""
        random_hash = hashlib.md5(
            "{}".format(
                random.randint(0, 10**10)
            ).encode('utf-8')).hexdigest()
        response_channel = "{}::{}::response::{}".format(self.namespace,
                                                         self.service_id,
                                                         random_hash)
        return response_channel

    def _blocking_request(self, _request):
        """
            request:
                -command_type
                -payload
                -response_channel
            response: (on response_channel)
                - RESULT
            * Send the payload on command_channel
                ** redis-left-push (LPUSH)
            * Keep listening on response_channel (BLPOP)

        Note that `_request` is modified in place: a 'response_channel'
        entry is added to it.

        Raises:
            TypeError: if `_request` is not a dict.
            Exception: if the service answers with an ERROR message.
        """
        if not isinstance(_request, dict):
            raise TypeError("_request must be a dict")
        _request['response_channel'] = self._generate_response_channel()

        _redis = self.get_redis_connection()
        # The client always pushes in the left
        # and the service always pushes in the right
        if self.verbose:
            print("Request : ", _request)
        # Push request in command_channels
        # Note: The patched msgpack supports numpy arrays
        payload = msgpack.packb(_request, default=m.encode, use_bin_type=True)
        _redis.lpush(self.command_channel, payload)
        # Wait with a blocking pop for the response
        _response = _redis.blpop(_request['response_channel'])[1]
        if self.verbose:
            print("Response : ", _response)
        # NOTE(review): `encoding` is deprecated in newer msgpack releases
        # in favour of `raw=False` - confirm against the pinned version.
        _response = msgpack.unpackb(
            _response,
            object_hook=m.decode,
            encoding="utf8")
        if _response['type'] == messages.FLATLAND_RL.ERROR:
            raise Exception(str(_response["payload"]))
        else:
            return _response

    def ping_pong(self):
        """
            Official Handshake with the evaluation service
            Send a PING
            and wait for PONG
            If not PONG, raise error
        """
        _request = {}
        _request['type'] = messages.FLATLAND_RL.PING
        _request['payload'] = {}
        _response = self._blocking_request(_request)
        if _response['type'] != messages.FLATLAND_RL.PONG:
            # Adjacent literals instead of a backslash continuation, which
            # used to embed the source indentation in the message.
            raise Exception(
                "Unable to perform handshake with the redis service. "
                "Expected PONG; received {}".format(json.dumps(_response)))
        else:
            return True

    def env_create(self, obs_builder_object):
        """
            Create a local env and remote env on which the
            local agent can operate.
            The observation builder is only used in the local env
            and the remote env uses a DummyObservationBuilder

            Returns the first local observation, or the falsy observation
            sent by the service once all evaluations are complete.
        """
        _request = {}
        _request['type'] = messages.FLATLAND_RL.ENV_CREATE
        _request['payload'] = {}
        _response = self._blocking_request(_request)
        observation = _response['payload']['observation']

        if not observation:
            # If the observation is False,
            # then the evaluations are complete
            # hence return false
            return observation

        test_env_file_path = _response['payload']['env_file_path']
        self.env = RailEnv(
            width=1,
            height=1,
            rail_generator=rail_from_file(test_env_file_path),
            obs_builder_object=obs_builder_object
        )
        self.env._max_episode_steps = \
            int(1.5 * (self.env.width + self.env.height))

        local_observation = self.env.reset()
        # Use the local observation
        # as the remote server uses a dummy observation builder
        return local_observation

    def env_step(self, action, render=False):
        """
            Respond with [observation, reward, done, info]

            The action is sent to the service and replayed on the local
            env. `render` is currently unused.

            Raises:
                AssertionError: if the local rewards or done flags differ
                    from the ones reported by the service.
        """
        _request = {}
        _request['type'] = messages.FLATLAND_RL.ENV_STEP
        _request['payload'] = {}
        _request['payload']['action'] = action
        _response = self._blocking_request(_request)
        _payload = _response['payload']

        # remote_observation = _payload['observation']
        remote_reward = _payload['reward']
        remote_done = _payload['done']
        remote_info = _payload['info']

        # Replicate the action in the local env
        local_observation, local_rewards, local_done, local_info = \
            self.env.step(action)

        # Raised explicitly (not via `assert`) so the divergence check
        # still runs when Python is started with -O.
        if not are_dicts_equal(remote_reward, local_rewards):
            raise AssertionError(
                "Local and remote rewards have diverged")
        if not are_dicts_equal(remote_done, local_done):
            raise AssertionError(
                "Local and remote done flags have diverged")

        # Return local_observation instead of remote_observation
        # as the remote_observation is build using a dummy observation
        # builder
        # We return the remote rewards and done as they are the
        # once used by the evaluator
        return [local_observation, remote_reward, remote_done, remote_info]

    def submit(self):
        """
            Ask the service to finalise the evaluation and return the
            payload of its response.

            If the AICROWD_BLOCKING_SUBMIT env variable is set, this
            call never returns.
        """
        _request = {}
        _request['type'] = messages.FLATLAND_RL.ENV_SUBMIT
        _request['payload'] = {}
        _response = self._blocking_request(_request)
        if os.getenv("AICROWD_BLOCKING_SUBMIT"):
            # If the submission is supposed to happen as a blocking submit,
            # then wait indefinitely for the evaluator to decide what to
            # do with the container.
            while True:
                time.sleep(10)
        return _response['payload']


if __name__ == "__main__":
    remote_client = FlatlandRemoteClient()

    def my_controller(obs, _env):
        # Random policy: one random action per agent
        _action = {}
        for _idx, _ in enumerate(_env.agents):
            _action[_idx] = np.random.randint(0, 5)
        return _action

    my_observation_builder = TreeObsForRailEnv(max_depth=3,
                                predictor=ShortestPathPredictorForRailEnv())

    episode = 0
    while True:
        obs = remote_client.env_create(
                    obs_builder_object=my_observation_builder
                    )
        if not obs:
            # The remote env returns False as the first obs
            # when it is done evaluating all the individual episodes
            break
        print("Episode : {}".format(episode))
        episode += 1

        print(remote_client.env.dones['__all__'])

        while True:
            action = my_controller(obs, remote_client.env)
            observation, all_rewards, done, info = remote_client.env_step(action)
            if done['__all__']:
                print("Current Episode : ", episode)
                print("Episode Done")
                print("Reward : ", sum(list(all_rewards.values())))
                break

    print("Evaluation Complete...")
    print(remote_client.submit())
########################################################
# CONSTANTS
########################################################
PER_STEP_TIMEOUT = 10*60  # seconds, i.e. 10 minutes


class FlatlandRemoteEvaluationService:
    """
    A remote evaluation service which exposes the following interfaces
    of a RailEnv :
    - env_create
    - env_step
    and an additional `env_submit` to cater to score computation and
    on-episode-complete post processings.

    This service is designed to be used in conjunction with
    `FlatlandRemoteClient` and both the service and client maintain a
    local instance of the RailEnv instance, and in case of any unexpected
    divergences in the state of both the instances, the local RailEnv
    instance of the `FlatlandRemoteEvaluationService` is supposed to act
    as the single source of truth.

    Both the client and remote service communicate with each other
    via Redis as a message broker. The individual messages are packed and
    unpacked with `msgpack` (a patched version of msgpack which also supports
    numpy arrays).
    """
    def __init__(self,
                 test_env_folder="/tmp",
                 flatland_rl_service_id='FLATLAND_RL_SERVICE_ID',
                 remote_host='127.0.0.1',
                 remote_port=6379,
                 remote_db=0,
                 remote_password=None,
                 visualize=False,
                 video_generation_envs=None,
                 report=None,
                 verbose=False):

        # Test Env folder Paths
        self.test_env_folder = test_env_folder
        # `None` default instead of a shared mutable `[]`
        self.video_generation_envs = \
            [] if video_generation_envs is None else video_generation_envs
        self.video_generation_indices = []
        self.env_file_paths = self.get_env_filepaths()
        print(self.video_generation_indices)

        # Logging and Reporting related vars
        self.verbose = verbose
        self.report = report

        # Communication Protocol Related vars
        self.namespace = "flatland-rl"
        self.service_id = flatland_rl_service_id
        self.command_channel = "{}::{}::commands".format(
                                    self.namespace,
                                    self.service_id
                                )

        # Message Broker related vars
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.remote_db = remote_db
        self.remote_password = remote_password
        self.instantiate_redis_connection_pool()

        # AIcrowd evaluation specific vars
        self.oracle_events = crowdai_api.events.CrowdAIEvents(with_oracle=True)
        self.evaluation_state = {
            "state": "PENDING",
            "progress": 0.0,
            "simulation_count": 0,
            "total_simulation_count": len(self.env_file_paths),
            "score": {
                "score": 0.0,
                "score_secondary": 0.0
            }
        }

        # RailEnv specific variables
        self.env = False
        self.env_renderer = False
        self.reward = 0
        self.simulation_count = 0
        self.simulation_rewards = []
        self.simulation_percentage_complete = []
        self.simulation_steps = []
        self.simulation_times = []
        self.begin_simulation = False
        self.current_step = 0
        self.visualize = visualize
        self.vizualization_folder_name = "./.visualizations"
        self.record_frame_step = 0

        if self.visualize:
            # Start from an empty frames folder. A missing folder is the
            # normal first-run case; a real rmtree failure propagates
            # instead of being swallowed and resurfacing in mkdir.
            if os.path.isdir(self.vizualization_folder_name):
                shutil.rmtree(self.vizualization_folder_name)
            os.mkdir(self.vizualization_folder_name)

    def get_env_filepaths(self):
        """
        Gathers a list of all available rail env files to be used
        for evaluation. The folder structure expected at the `test_env_folder`
        is similar to :

            .
            |-- Test_0
            |   |-- Level_1.pkl
            |   |-- .......
            |   |-- .......
            |   `-- Level_99.pkl
            `-- Test_1
                |-- Level_1.pkl
                |-- .......
                |-- .......
                `-- Level_99.pkl

        As a side effect, fills `self.video_generation_indices` with the
        1-based positions of the envs whose path contains one of the
        `video_generation_envs` entries.

        Returns:
            The sorted list of all `.pkl` paths found under the folder.
        """
        env_paths = sorted(
            os.path.join(root, filename)
            for root, _dirs, files in os.walk(self.test_env_folder)
            for filename in files
            if filename.endswith(".pkl")
        )
        for _idx, env_path in enumerate(env_paths):
            # Here we collect the indices of the environments for which
            # we need to generate the videos
            #
            # We increment the simulation count on env_create
            # so the 1st simulation has an index of 1, when comparing in
            # env_step
            if any(vg_env in env_path
                   for vg_env in self.video_generation_envs):
                self.video_generation_indices.append(_idx+1)
        return env_paths

    def instantiate_redis_connection_pool(self):
        """
        Instantiates a Redis connection pool which can be used to
        communicate with the message broker
        """
        if self.verbose or self.report:
            print("Attempting to connect to redis server at {}:{}/{}".format(
                    self.remote_host,
                    self.remote_port,
                    self.remote_db))

        self.redis_pool = redis.ConnectionPool(
                            host=self.remote_host,
                            port=self.remote_port,
                            db=self.remote_db,
                            password=self.remote_password
                        )

    def get_redis_connection(self):
        """
        Obtains a new redis connection from a previously instantiated
        redis connection pool

        Raises:
            Exception: if the server does not answer a ping.
        """
        redis_conn = redis.Redis(connection_pool=self.redis_pool)
        try:
            redis_conn.ping()
        except Exception as e:
            # Chain the original error so its cause is not lost
            raise Exception(
                    "Unable to connect to redis server at {}:{} ."
                    "Are you sure there is a redis-server running at the "
                    "specified location ?".format(
                        self.remote_host,
                        self.remote_port
                        )
                    ) from e
        return redis_conn

    def _error_template(self, payload):
        """
        Simple helper function to pass a payload as a part of a
        flatland comms error template.
        """
        _response = {}
        _response['type'] = messages.FLATLAND_RL.ERROR
        _response['payload'] = payload
        return _response

    @timeout_decorator.timeout(PER_STEP_TIMEOUT)  # timeout for each command
    def _get_next_command(self, _redis):
        """
        A low level wrapper for obtaining the next command from a
        pre-agreed command channel.
        At the moment, the communication protocol uses lpush for pushing
        in commands, and brpop for reading out commands.
        """
        command = _redis.brpop(self.command_channel)[1]
        return command

    def get_next_command(self):
        """
        A helper function to obtain the next command, which transparently
        also deals with things like unpacking of the command from the
        packed message, and consider the timeouts, etc when trying to
        fetch a new command.

        Raises:
            Exception: if no command arrives within PER_STEP_TIMEOUT seconds.
        """
        try:
            _redis = self.get_redis_connection()
            command = self._get_next_command(_redis)
            if self.verbose or self.report:
                print("Command Service: ", command)
        except timeout_decorator.timeout_decorator.TimeoutError:
            raise Exception(
                    "Timeout in step {} of simulation {}".format(
                        self.current_step,
                        self.simulation_count
                        ))
        # NOTE(review): `encoding` is deprecated in newer msgpack releases
        # in favour of `raw=False` - confirm against the pinned version.
        command = msgpack.unpackb(
                    command,
                    object_hook=m.decode,
                    encoding="utf8"
                )
        if self.verbose:
            print("Received Request : ", command)

        return command

    def _push_response(self, response_channel, _command_response):
        """Pack a response with msgpack and right-push it on a channel."""
        _redis = self.get_redis_connection()
        _redis.rpush(
            response_channel,
            msgpack.packb(
                _command_response,
                default=m.encode,
                use_bin_type=True)
        )

    def send_response(self, _command_response, command, suppress_logs=False):
        """
        Sends a response on the response channel named in `command`.
        """
        command_response_channel = command['response_channel']

        if self.verbose and not suppress_logs:
            print("Responding with : ", _command_response)

        self._push_response(command_response_channel, _command_response)

    def handle_ping(self, command):
        """
        Handles PING command from the client.
        """
        _command_response = {}
        _command_response['type'] = messages.FLATLAND_RL.PONG
        _command_response['payload'] = {}

        self.send_response(_command_response, command)

    @staticmethod
    def _mean_or_zero(values):
        """Mean of `values`, or 0.0 when there is nothing to average yet."""
        return np.mean(values) if len(values) > 0 else 0.0

    def handle_env_create(self, command):
        """
        Handles a ENV_CREATE command from the client

        While test envs are left, loads the next one from file with a
        DummyObservationBuilder, resets the per-episode counters and
        answers with the env file path and the first observation.
        Once every env has been used, answers with `False` for both.
        Finally publishes the updated evaluation state as an info event.
        """

        if self.simulation_count < len(self.env_file_paths):
            # There are still test envs left that are yet to be evaluated

            test_env_file_path = self.env_file_paths[self.simulation_count]
            # Drop the previous env before building the next one
            self.env = False
            self.env = RailEnv(
                width=1,
                height=1,
                rail_generator=rail_from_file(test_env_file_path),
                obs_builder_object=DummyObservationBuilder()
            )
            if self.visualize:
                self.env_renderer = False
                self.env_renderer = RenderTool(self.env, gl="PILSVG", )

            # Set max episode steps allowed
            self.env._max_episode_steps = \
                int(1.5 * (self.env.width + self.env.height))

            self.simulation_count += 1

            if self.begin_simulation:
                # If begin simulation has already been initialized
                # at least once
                self.simulation_times.append(time.time()-self.begin_simulation)
            self.begin_simulation = time.time()

            self.simulation_rewards.append(0)
            self.simulation_percentage_complete.append(0)
            self.simulation_steps.append(0)

            self.current_step = 0

            _observation = self.env.reset()

            _command_response = {}
            _command_response['type'] = messages.FLATLAND_RL.ENV_CREATE_RESPONSE
            _command_response['payload'] = {}
            _command_response['payload']['observation'] = _observation
            _command_response['payload']['env_file_path'] = test_env_file_path
        else:
            # All test env evaluations are complete
            _command_response = {}
            _command_response['type'] = messages.FLATLAND_RL.ENV_CREATE_RESPONSE
            _command_response['payload'] = {}
            _command_response['payload']['observation'] = False
            _command_response['payload']['env_file_path'] = False

        self.send_response(_command_response, command)
        #####################################################################
        # Update evaluation state
        #####################################################################
        total_envs = len(self.env_file_paths)
        if total_envs > 0:
            progress = np.clip(
                        self.simulation_count * 1.0 / total_envs,
                        0, 1)
        else:
            # No test envs at all: nothing left to evaluate
            progress = 1.0
        mean_reward = self._mean_or_zero(self.simulation_rewards)
        mean_percentage_complete = \
            self._mean_or_zero(self.simulation_percentage_complete)
        self.evaluation_state["state"] = "IN_PROGRESS"
        self.evaluation_state["progress"] = progress
        self.evaluation_state["simulation_count"] = self.simulation_count
        self.evaluation_state["score"]["score"] = mean_percentage_complete
        self.evaluation_state["score"]["score_secondary"] = mean_reward
        self.handle_aicrowd_info_event(self.evaluation_state)

    def handle_env_step(self, command):
        """
        Handles a ENV_STEP command from the client

        Applies the action from the payload to the service-side env,
        accumulates the reward and step count of the current simulation,
        records the fraction of agents on their target once the episode
        is done, optionally saves a rendered frame, and answers with
        observation, reward, done and info.

        Raises:
            Exception: if no env has been created yet, or the current
                env is already done.
        """
        _payload = command['payload']

        if not self.env:
            raise Exception(
                "env_client.step called before env_client.env_create() call")
        if self.env.dones['__all__']:
            # Adjacent literals instead of a backslash continuation, which
            # used to embed the source indentation in the message.
            raise Exception(
                "Client attempted to perform an action on an Env which "
                "has done['__all__']==True")

        action = _payload['action']
        _observation, all_rewards, done, info = self.env.step(action)

        cumulative_reward = np.sum(list(all_rewards.values()))
        self.simulation_rewards[-1] += cumulative_reward
        self.simulation_steps[-1] += 1

        if done["__all__"]:
            # Compute percentage complete
            complete = 0
            for i_agent in range(self.env.get_num_agents()):
                agent = self.env.agents[i_agent]
                if agent.position == agent.target:
                    complete += 1
            percentage_complete = complete * 1.0 / self.env.get_num_agents()
            self.simulation_percentage_complete[-1] = percentage_complete

        # Record Frame
        if self.visualize:
            self.env_renderer.render_env(show=False, show_observations=False, show_predictions=False)
            # Only save the frames for environments which are separately
            # provided in video_generation_indices param
            if self.simulation_count in self.video_generation_indices:
                self.env_renderer.gl.save_image(
                        os.path.join(
                            self.vizualization_folder_name,
                            "flatland_frame_{:04d}.png".format(self.record_frame_step)
                        ))
                self.record_frame_step += 1

        # Build and send response
        _command_response = {}
        _command_response['type'] = messages.FLATLAND_RL.ENV_STEP_RESPONSE
        _command_response['payload'] = {}
        _command_response['payload']['observation'] = _observation
        _command_response['payload']['reward'] = all_rewards
        _command_response['payload']['done'] = done
        _command_response['payload']['info'] = info
        self.send_response(_command_response, command)

    def handle_env_submit(self, command):
        """
        Handles a ENV_SUBMIT command from the client

        Computes the mean reward and mean percentage complete over all
        simulations, generates the videos when frames were recorded
        (uploading them to S3 when grading and AWS is configured),
        answers the client and publishes a success event.

        Returns:
            The ENV_SUBMIT_RESPONSE message that was sent to the client.

        Raises:
            Exception: if not every test env has been created yet.
        """
        # Register simulation time of the last episode
        # (only if at least one episode was actually started)
        if self.begin_simulation:
            self.simulation_times.append(time.time()-self.begin_simulation)

        if len(self.simulation_rewards) != len(self.env_file_paths):
            raise Exception(
                """env.submit called before the agent had the chance
                to operate on all the test environments.
                """
            )

        mean_reward = self._mean_or_zero(self.simulation_rewards)
        mean_percentage_complete = \
            self._mean_or_zero(self.simulation_percentage_complete)

        if self.visualize and len(os.listdir(self.vizualization_folder_name)) > 0:
            # Generate the video
            #
            # Note, if you had dependency issues due to ffmpeg, you can
            # install it by :
            #
            # conda install -c conda-forge x264 ffmpeg

            print("Generating Video from thumbnails...")
            video_output_path, video_thumb_output_path = \
                aicrowd_helpers.generate_movie_from_frames(
                    self.vizualization_folder_name
                )
            print("Videos : ", video_output_path, video_thumb_output_path)
            # Upload to S3 if configuration is available
            if aicrowd_helpers.is_grading() and aicrowd_helpers.is_aws_configured() and self.visualize:
                video_s3_key = aicrowd_helpers.upload_to_s3(
                    video_output_path
                )
                video_thumb_s3_key = aicrowd_helpers.upload_to_s3(
                    video_thumb_output_path
                )
                self.evaluation_state["score"]["media_content_type"] = "video/mp4"
                self.evaluation_state["score"]["media_large"] = video_s3_key
                self.evaluation_state["score"]["media_thumbnail"] = video_thumb_s3_key
            else:
                print("[WARNING] Ignoring uploading of video to S3")

        _command_response = {}
        _command_response['type'] = messages.FLATLAND_RL.ENV_SUBMIT_RESPONSE
        _payload = {}
        _payload['mean_reward'] = mean_reward
        _payload['mean_percentage_complete'] = mean_percentage_complete
        _command_response['payload'] = _payload
        self.send_response(_command_response, command)

        #####################################################################
        # Update evaluation state
        #####################################################################
        self.evaluation_state["state"] = "FINISHED"
        self.evaluation_state["progress"] = 1.0
        self.evaluation_state["simulation_count"] = self.simulation_count
        self.evaluation_state["score"]["score"] = mean_percentage_complete
        self.evaluation_state["score"]["score_secondary"] = mean_reward
        self.handle_aicrowd_success_event(self.evaluation_state)

        return _command_response

    def report_error(self, error_message, command_response_channel):
        """
        A helper function used to report error back to the client

        The message is wrapped in an ERROR response, pushed on the given
        channel, stored in the evaluation state and published as an
        error event.
        """
        _command_response = {}
        _command_response['type'] = messages.FLATLAND_RL.ERROR
        _command_response['payload'] = error_message
        self._push_response(command_response_channel, _command_response)
        self.evaluation_state["state"] = "ERROR"
        self.evaluation_state["error"] = error_message
        self.handle_aicrowd_error_event(self.evaluation_state)

    def handle_aicrowd_info_event(self, payload):
        """Register `payload` as an INFO event with the oracle."""
        self.oracle_events.register_event(
            event_type=self.oracle_events.CROWDAI_EVENT_INFO,
            payload=payload
        )

    def handle_aicrowd_success_event(self, payload):
        """Register `payload` as a SUCCESS event with the oracle."""
        self.oracle_events.register_event(
            event_type=self.oracle_events.CROWDAI_EVENT_SUCCESS,
            payload=payload
        )

    def handle_aicrowd_error_event(self, payload):
        """Register `payload` as an ERROR event with the oracle."""
        self.oracle_events.register_event(
            event_type=self.oracle_events.CROWDAI_EVENT_ERROR,
            payload=payload
        )

    def run(self):
        """
        Main runner function which waits for commands from the client
        and acts accordingly.

        Returns:
            The ENV_SUBMIT_RESPONSE message once the client has submitted,
            or an ERROR message as soon as a command is unknown or its
            handler raises.
        """
        print("Listening for commands at : ", self.command_channel)
        while True:
            command = self.get_next_command()

            if self.verbose:
                print("Self.Reward : ", self.reward)
                print("Current Simulation : ", self.simulation_count)
                if self.env_file_paths and \
                        self.simulation_count < len(self.env_file_paths):
                    print("Current Env Path : ",
                          self.env_file_paths[self.simulation_count])

            try:
                if command['type'] == messages.FLATLAND_RL.PING:
                    # INITIAL HANDSHAKE : Respond with PONG
                    self.handle_ping(command)

                elif command['type'] == messages.FLATLAND_RL.ENV_CREATE:
                    # ENV_CREATE
                    #
                    # Respond with an internal _env object
                    self.handle_env_create(command)
                elif command['type'] == messages.FLATLAND_RL.ENV_STEP:
                    # ENV_STEP
                    #
                    # Request : Action dict
                    # Respond with updated [observation,reward,done,info] after step
                    self.handle_env_step(command)
                elif command['type'] == messages.FLATLAND_RL.ENV_SUBMIT:
                    # ENV_SUBMIT
                    #
                    # Submit the final cumulative reward. The evaluation is
                    # over, so hand the result to the caller instead of
                    # waiting for commands until the timeout fires.
                    return self.handle_env_submit(command)
                else:
                    _error = self._error_template(
                                    "UNKNOWN_REQUEST:{}".format(
                                        str(command)))
                    if self.verbose:
                        print("Responding with : ", _error)
                    # report_error adds its own ERROR envelope around the
                    # templated error; kept as-is for wire compatibility.
                    self.report_error(
                        _error,
                        command['response_channel'])
                    return _error
            except Exception as e:
                print("Error : ", str(e))
                print(traceback.format_exc())
                self.report_error(
                    self._error_template(str(e)),
                    command['response_channel'])
                return self._error_template(str(e))


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='Submit the result to AIcrowd')
    parser.add_argument('--service_id',
                        dest='service_id',
                        default='FLATLAND_RL_SERVICE_ID',
                        required=False)
    parser.add_argument('--test_folder',
                        dest='test_folder',
                        default="../../../submission-scoring/Envs-Small",
                        help="Folder containing the files for the test envs",
                        required=False)
    args = parser.parse_args()

    test_folder = args.test_folder

    grader = FlatlandRemoteEvaluationService(
                test_env_folder=test_folder,
                flatland_rl_service_id=args.service_id,
                verbose=True,
                visualize=True,
                video_generation_envs=["Test_0/Level_1.pkl"]
                )
    result = grader.run()
    if result['type'] == messages.FLATLAND_RL.ENV_SUBMIT_RESPONSE:
        cumulative_results = result['payload']
        print("Results : ", cumulative_results)
    elif result['type'] == messages.FLATLAND_RL.ERROR:
        error = result['payload']
        raise Exception("Evaluation Failed : {}".format(str(error)))
    else:
        # Evaluation failed
        print("Evaluation Failed : ", result['payload'])