Commit a9c2418d authored by nilabha

added merge changes and documentation

parent 6e97f18d
# Saving Recorded Video
We use the OpenAI `gym` `Monitor` wrapper for video recording.
To enable human rendering with video recording and uploading, update the `env_config` as follows:
```
env_config:
  render: human
```
This saves videos to the folder `flatland` in the same location as the running script.
To save videos to a custom location, set `video_dir` under `env_config`:
```
env_config:
  render: human
  # For saving videos in a custom folder and uploading them to wandb.
  # By default, if not specified, the folder is flatland
  video_dir: small_tree_video
```
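For context, here is a minimal sketch of how such an `env_config` might be passed to a Ray/Tune run that uses the `WandbLogger` from this commit. The trainer name, env name, `wandb` settings, and the `loggers` argument are assumptions based on how `tune.logger.Logger` subclasses are typically hooked up; the repo's own run scripts may wire this differently. The `wandb` sub-dict is forwarded to `wandb.init()` by the logger.
```
from ray import tune

config = {
    "env": "flatland_sparse",                  # hypothetical registered env name
    "env_config": {
        "render": "human",
        "video_dir": "small_tree_video",
        # forwarded verbatim to wandb.init() by the logger
        "wandb": {"project": "flatland", "group": "small_tree"},
    },
}

tune.run(
    "PPO",                                     # any registered RLlib trainer
    config=config,
    loggers=[WandbLogger],                     # the logger class updated in this commit
    stop={"timesteps_total": 1_000_000},
)
```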
By default, the folder is cleared before new videos are saved. This is done because all videos matching the pattern `*0.mp4` are uploaded to wandb, and we do not want to upload videos from older runs.
This behavior can be changed with the config parameter `resume: True`.
Also note that when videos are processed, they are sorted by their latest modified time, so they appear in descending order in wandb under the Media section.
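For reference, a minimal sketch of the selection and ordering rule described above, using `glob` in place of the repo's own `find` helper and a hypothetical folder name:
```
import glob
import os

# Only *0.mp4 files are candidates for upload; *1.mp4 files (which hold just
# the last frames) are ignored.
found_videos = glob.glob(os.path.join("small_tree_video", "**", "*0.mp4"), recursive=True)

# Oldest first by creation time; the most recently logged videos then show up
# first in the wandb Media section.
found_videos.sort(key=os.path.getctime)
```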
## Sample video uploaded to wandb
![](recording/wandbvideos.png)
## Video Folder Names
An example folder name:
`flatland-1.1250.0.0.11475.video000000.1590223601.6230516`
The current video naming convention is:
`flatland-{iteration}.{steps}.{percentage_completion_mean}.{episode_id}.{pid}.video000000.{last_modified_time}`
The first part, `iteration`, `steps`, and `percentage_completion_mean`, holds the env stats from the first result reported immediately after the video was first created. See the [TODO](#TODO) section for more details on this choice.
The second part, `{episode_id}.{pid}.video000000`, follows the naming convention used by the OpenAI gym monitor: `episode_id` is the current worker's episode number and `pid` is the ID of the process that saved the recording. Often the last frames are recorded as `*video000001.mp4`; we ignore these and do not upload them, to keep the number of videos sent to wandb down.
Finally, we append the last modified time, since there can be multiple versions of the same video at different stages of recording.
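As an illustration, the sample folder name above splits back into the fields of this convention (the values come from the example, not from a real run):
```
name = "flatland-1.1250.0.0.11475.video000000.1590223601.6230516"

prefix, rest = name.split("-", 1)          # "flatland", "1.1250.0.0.11475.video000000.1590223601.6230516"
parts = rest.split(".")

iteration = parts[0]                       # "1"
steps = parts[1]                           # "1250"
percentage_completion_mean = parts[2]      # "0"
episode_id = parts[3]                      # "0"
pid = parts[4]                             # "11475"
video_index = parts[5]                     # "video000000"
last_modified_time = ".".join(parts[6:])   # "1590223601.6230516"
```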
# Evaluation
We can enable rendering only during evaluation rather than during normal training. The evaluation config settings are picked up and videos are saved to `video_dir` (or the default `flatland` folder). Within the same wandb process we only resume videos and never overwrite existing content in the folder, so take care to manually delete any existing videos in the folder or to specify a new `video_dir` in such cases.
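One way to wire this up is through RLlib's standard evaluation options. This is a sketch under that assumption; the exact keys used by this repo's run scripts may differ.
```
config = {
    "evaluation_interval": 10,        # evaluate every 10 training iterations
    "evaluation_num_episodes": 2,
    "evaluation_config": {
        "env_config": {
            "render": "human",
            "video_dir": "eval_videos",   # use a fresh folder per run (see CAUTION below)
        },
    },
}
```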
# CAUTION
If multiple wandb processes are running, take care that they do not write to the same folder. The recommended practice is to use a different `video_dir` for each run, to avoid conflicts and accidentally deleting another run's files.
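For example, a unique folder name per run can be generated before launching (a simple sketch; the repo does not do this automatically):
```
import os
import time

# e.g. "small_tree_video_11475_1590223601"
video_dir = "small_tree_video_%d_%d" % (os.getpid(), int(time.time()))
```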
# TODO
- Cannot match exact env details on which video was created and hence defaulting to the env details from when the video was first created. To help identify we must record the video file with the env iteration or/and steps etc.
- Using the env details when the video was created may be useful when recording video during evaluation
where we are more interested in the current training state
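The updated `WandbLogger` runs wandb logging for each experiment in a dedicated process: `on_result` flattens the numeric metrics, attaches newly recorded videos via `update_video_metrics` when rendering is enabled, and pushes everything onto a per-experiment queue, while `wandb_process` consumes the queue and logs to wandb. On `close`, uploaded videos and their metadata are copied into the trial's `logdir`. (`find` and `flatten_dict` are helpers imported or defined elsewhere in the file and are not shown in this diff.)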
```
import multiprocessing
import numbers
from pprint import pprint
import wandb
from ray import tune
from datetime import datetime
import os, fnmatch
import shutil

# ...


class WandbLogger(tune.logger.Logger):
    # ...

    def _init(self):
        self._config = None
        self.metrics_queue_dict = {}
        self.reset_state()
        if self.config.get("env_config", {}).get("render", None):
            '''
            ...
            '''

    # ...

    def reset_state(self):
        # Holds list of uploaded/moved files so that we dont upload them again
        self._upload_files = {}
        # Holds information of env state and put them in an unique file name
        # and maps it to the original video file
        self._file_map = {}
        self._save_folder = None

    def on_result(self, result):
        experiment_tag = result.get('experiment_tag', 'no_experiment_tag')
        experiment_id = result.get('experiment_id', 'no_experiment_id')
        if experiment_tag not in self.metrics_queue_dict:
            print("=" * 50)
            print("Setting up new w&b logger")
            print("Experiment tag:", experiment_tag)
            print("Experiment id:", experiment_id)
            config = result.get("config")
            queue = multiprocessing.Queue()
            p = multiprocessing.Process(target=wandb_process, args=(queue, config,))
            p.start()
            self.metrics_queue_dict[experiment_tag] = queue
            print("=" * 50)
        queue = self.metrics_queue_dict[experiment_tag]

        tmp = result.copy()
        for k in ["done", "config", "pid", "timestamp"]:
            if k in tmp:
                del tmp[k]

        metrics = {}
        for key, value in flatten_dict(tmp, delimiter="/").items():
            if not isinstance(value, numbers.Number):
                continue
            metrics[key] = value

        if self.config.get("env_config", {}).get("render", None):
            # uploading relevant videos to wandb
            # ...
            self.clear_folders(resume=True)
            if self._save_folder:
                metrics = self.update_video_metrics(result, metrics)

        queue.put(metrics)

    def update_video_metrics(self, result, metrics):
        iterations = result['training_iteration']
        steps = result['timesteps_total']
        perc_comp_mean = result['custom_metrics'].get('percentage_complete_mean', 0) * 100
        # We ignore *1.mp4 videos which just has the last frame
        _video_file = f'*0.mp4'
        _found_videos = find(_video_file, self._save_folder)
        # _found_videos = list(set(_found_videos) - set(self._upload_files))
        # Sort by create time for uploading to wandb
        _found_videos.sort(key=os.path.getctime)
        for _found_video in _found_videos:
            _splits = _found_video.split(os.sep)
            _check_file = os.stat(_found_video)
            _video_file = _splits[-1]
            _file_split = _video_file.split('.')
            _video_file_name = _file_split[0] + "-" + str(iterations)
            _original_name = ".".join(_file_split[2:-1])
            _video_file_name = ".".join([str(_video_file_name), str(steps), str(int(perc_comp_mean)),
                                         _original_name, str(_check_file.st_ctime)])
            _key = _found_video  # Use the video file path as key to identify the video_name
            if not self._file_map.get(_key):
                # Allocate steps, iteration, completion rate to the earliest case
                # when the video file was first created. Discard recent file names
                # TODO: Cannot match exact env details on which video was created
                # and hence defaulting to the env details from when the video was first created
                # To help identify we must record the video file with the env iteration or/and steps etc.
                # Using the env details when the video was created may be useful when recording video during evaluation
                # where we are more interested in the current training state
                self._file_map[_key] = _video_file_name

            # We only move videos that have been flushed out.
            # This is done by checking against a threshold size of 1000 bytes
            # Also check if file has changed from last time
            if _check_file.st_size > 1000 and _check_file.st_ctime > self._upload_files.get(_found_video, True):
                _video_file_name = self._file_map.get(_key, "Unknown")
                # wandb.log({_video_file_name: wandb.Video(_found_video, format="mp4")})
                metrics[_video_file_name] = wandb.Video(_found_video, format="mp4")

                self._upload_files[_found_video] = _check_file.st_size
        return metrics

    def close(self):
        wandb.join()
        all_uploaded_videos = self._upload_files.keys()
        for _found_video in all_uploaded_videos:
            try:
                # Copy upload videos and their meta data once done to the logdir
                src = _found_video
                _video_file = _found_video.split(os.sep)[-1]
                dst = os.path.join(self.logdir, _video_file)
                shutil.copy2(src, dst)
                shutil.copy2(src.replace("mp4", "meta.json"), dst.replace("mp4", "meta.json"))
            except OSError as e:
                print("Error: %s - %s." % (e.filename, e.strerror))
        self.reset_state()


# each logger has to run in a separate process
def wandb_process(queue, config):
    run = wandb.init(reinit=True, **config.get("env_config", {}).get("wandb", {}))
    if config:
        for k in config.keys():
            if k != "callbacks":
                if wandb.config.get(k) is None:
                    wandb.config[k] = config[k]
        if 'yaml_config' in config['env_config']:
            yaml_config = config['env_config']['yaml_config']
            print("Saving full experiment config:", yaml_config)
            wandb.save(yaml_config)
    while True:
        metrics = queue.get()
        run.log(metrics)
```