custom_preprocessors.py 3.27 KiB
import numpy as np
from ray.rllib.models.preprocessors import Preprocessor
def max_lt(seq, val):
"""
Return greatest item in seq for which item < val applies.
None is returned if seq was empty or all items in seq were >= val.
"""
max = 0
idx = len(seq) - 1
while idx >= 0:
if seq[idx] < val and seq[idx] >= 0 and seq[idx] > max:
max = seq[idx]
idx -= 1
return max
def min_lt(seq, val):
"""
Return smallest item in seq for which item > val applies.
None is returned if seq was empty or all items in seq were >= val.
"""
min = np.inf
idx = len(seq) - 1
while idx >= 0:
if seq[idx] >= val and seq[idx] < min:
min = seq[idx]
idx -= 1
return min
def norm_obs_clip(obs, clip_min=-1, clip_max=1):
"""
This function returns the difference between min and max value of an observation
:param obs: Observation that should be normalized
:param clip_min: min value where observation will be clipped
:param clip_max: max value where observation will be clipped
:return: returnes normalized and clipped observatoin
"""
max_obs = max(1, max_lt(obs, 1000))
min_obs = min(max_obs, min_lt(obs, 0))
if max_obs == min_obs:
return np.clip(np.array(obs) / max_obs, clip_min, clip_max)
norm = np.abs(max_obs - min_obs)
if norm == 0:
norm = 1.
return np.clip((np.array(obs) - min_obs) / norm, clip_min, clip_max)
class CustomPreprocessor(Preprocessor):
def _init_shape(self, obs_space, options):
return sum([space.shape[0] for space in obs_space]),
# return (sum([space.shape[0] for space in obs_space]), )
# return ((sum([space.shape[0] for space in obs_space[:2]]) + obs_space[2].shape[0] * obs_space[2].shape[1]),)
def transform(self, observation):
print('OBSSSSSSSSSSSSSSSSSs', observation, observation.shape)
data = norm_obs_clip(observation[0])
distance = norm_obs_clip(observation[1])
agent_data = np.clip(observation[2], -1, 1)
return np.concatenate((np.concatenate((data, distance)), agent_data))
return norm_obs_clip(observation)
return np.concatenate([norm_obs_clip(observation[0]), norm_obs_clip(observation[1])])
# if len(observation) == 111:
# return np.concatenate([norm_obs_clip(obs) for obs in observation])
# print('OBSERVATION:', observation, len(observation[0]))
return np.concatenate([norm_obs_clip(observation[0]), observation[1], observation[
2].flatten()]) #, norm_obs_clip(observation[1]), observation[2], observation[3].flatten()])
#one_hot = observation[-3:]
#return np.append(obs, one_hot)
# else:
# return observation
class ConvModelPreprocessor(Preprocessor):
def _init_shape(self, obs_space, options):
out_shape = (obs_space[0].shape[0], obs_space[0].shape[1], sum([space.shape[2] for space in obs_space]))
return out_shape
def transform(self, observation):
return np.concatenate([observation[0],
observation[1],
observation[2]], axis=2)
# class NoPreprocessor:
# def _init_shape(self, obs_space, options):
# num_features = 0
# for space in obs_space: