diff --git a/flatland/evaluators/messages.py b/flatland/evaluators/messages.py index 3f23ea996c613c796893213724bf244230d05eb2..dfe71efb39d9a80180c4ef3838e7f9d94c2d9db7 100644 --- a/flatland/evaluators/messages.py +++ b/flatland/evaluators/messages.py @@ -13,5 +13,6 @@ class FLATLAND_RL: ENV_SUBMIT = "FLATLAND_RL.ENV_SUBMIT" ENV_SUBMIT_RESPONSE = "FLATLAND_RL.ENV_SUBMIT_RESPONSE" - - ERROR = "FLATLAND_RL.ERROR" \ No newline at end of file + + ERROR = "FLATLAND_RL.ERROR" + \ No newline at end of file diff --git a/flatland/evaluators/service.py b/flatland/evaluators/service.py index ef2266ac031221de74afc48d609d8066cbea2262..a19b5493c57fa84d34a9904a0e5d80b06912281a 100644 --- a/flatland/evaluators/service.py +++ b/flatland/evaluators/service.py @@ -22,6 +22,26 @@ PER_STEP_TIMEOUT = 5*60 # 5 minutes class FlatlandRemoteEvaluationService: + """ + A remote evaluation service which exposes the following interfaces + of a RailEnv : + - env_create + - env_step + and an additional `env_submit` to cater to score computation and + on-episode-complete post-processing. + + This service is designed to be used in conjunction with + `FlatlandRemoteClient` and both the service and client maintain a + local instance of the RailEnv instance, and in case of any unexpected + divergences in the state of both the instances, the local RailEnv + instance of the `FlatlandRemoteEvaluationService` is supposed to act + as the single source of truth. + + Both the client and remote service communicate with each other + via Redis as a message broker. The individual messages are packed and + unpacked with `msgpack` (a patched version of msgpack which also supports + numpy arrays). + """ def __init__(self, test_env_folder="/tmp", flatland_rl_service_id='FLATLAND_RL_SERVICE_ID', @@ -70,6 +90,23 @@ class FlatlandRemoteEvaluationService: self.visualize = visualize def get_env_filepaths(self): + """ + Gathers a list of all available rail env files to be used + for evaluation. 
The folder structure expected at the `test_env_folder` + is similar to : + + . + ├── Test_0 + │  ├── Level_1.pkl + │  ├── ....... + │  ├── ....... + │  └── Level_99.pkl + └── Test_1 + ├── Level_1.pkl +   ├── ....... +   ├── ....... + └── Level_99.pkl + """ env_paths = [] folder_path = self.test_env_folder for root, dirs, files in os.walk(folder_path): @@ -81,6 +118,10 @@ class FlatlandRemoteEvaluationService: return sorted(env_paths) def instantiate_redis_connection_pool(self): + """ + Instantiates a Redis connection pool which can be used to + communicate with the message broker + """ if self.verbose or self.report: print("Attempting to connect to redis server at {}:{}/{}".format( self.remote_host, @@ -95,6 +136,10 @@ class FlatlandRemoteEvaluationService: ) def get_redis_connection(self): + """ + Obtains a new redis connection from a previously instantiated + redis connection pool + """ redis_conn = redis.Redis(connection_pool=self.redis_pool) try: redis_conn.ping() @@ -110,6 +155,10 @@ class FlatlandRemoteEvaluationService: return redis_conn def _error_template(self, payload): + """ + Simple helper function to pass a payload as a part of a + flatland comms error template. + """ _response = {} _response['type'] = messages.FLATLAND_RL.ERROR _response['payload'] = payload @@ -117,10 +166,22 @@ class FlatlandRemoteEvaluationService: @timeout_decorator.timeout(PER_STEP_TIMEOUT) # timeout for each command def _get_next_command(self, _redis): + """ + A low level wrapper for obtaining the next command from a + pre-agreed command channel. + At the moment, the communication protocol uses lpush for pushing + in commands, and brpop for reading out commands. + """ command = _redis.brpop(self.command_channel)[1] return command def get_next_command(self): + """ + A helper function to obtain the next command, which transparently + also deals with things like unpacking of the command from the + packed message, and considers the timeouts, etc when trying to + fetch a new command. 
+ """ try: _redis = self.get_redis_connection() command = self._get_next_command(_redis) @@ -143,6 +204,9 @@ class FlatlandRemoteEvaluationService: return command def handle_ping(self, command): + """ + Handles PING command from the client. + """ _redis = self.get_redis_connection() command_response_channel = command['response_channel'] @@ -160,6 +224,12 @@ class FlatlandRemoteEvaluationService: ) def handle_env_create(self, command): + """ + Handles an ENV_CREATE command from the client + TODO: + Add a high level summary of everything that's + happening here. + """ _redis = self.get_redis_connection() command_response_channel = command['response_channel'] @@ -235,6 +305,12 @@ class FlatlandRemoteEvaluationService: ) def handle_env_step(self, command): + """ + Handles an ENV_STEP command from the client + TODO: + Add a high level summary of everything that's + happening here. + """ _redis = self.get_redis_connection() command_response_channel = command['response_channel'] _payload = command['payload'] @@ -284,6 +360,12 @@ class FlatlandRemoteEvaluationService: ) def handle_env_submit(self, command): + """ + Handles an ENV_SUBMIT command from the client + TODO: + Add a high level summary of everything that's + happening here. + """ _redis = self.get_redis_connection() command_response_channel = command['response_channel'] _payload = command['payload'] @@ -318,6 +400,9 @@ class FlatlandRemoteEvaluationService: ) def report_error(self, error_message, command_response_channel): + """ + A helper function used to report error back to the client + """ _redis = self.get_redis_connection() _response = {} _response['type'] = messages.FLATLAND_RL.ERROR @@ -331,6 +416,10 @@ class FlatlandRemoteEvaluationService: ) def run(self): + """ + Main runner function which waits for commands from the client + and acts accordingly. + """ print("Listening for commands at : ", self.command_channel) while True: command = self.get_next_command()