Source code for rofunc.learning.RofuncRL.models.actor_models

# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union, Tuple, Optional, List

import gym
import gymnasium
import torch
import torch.nn as nn
import torch.nn.functional as F
from omegaconf import DictConfig
from torch import Tensor
from torch.distributions import Beta, Normal

from rofunc.learning.RofuncRL.models.utils import build_mlp, init_layers, activation_func, get_space_dim
from rofunc.learning.RofuncRL.state_encoders.base_encoders import EmptyEncoder


class BaseActor(nn.Module):
    """
    Shared base class for the actor (policy) networks in this module.

    It stores the config, resolves the action/state dimensions, keeps the
    optional state encoder and owns two non-trainable tensors
    (``state_avg`` / ``state_std``) used by :meth:`state_norm`.
    ``backbone_net`` is left as ``None`` here; the subclasses in this file
    assign it in their own ``__init__``.
    """

    def __init__(self, cfg: DictConfig,
                 observation_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space, List]],
                 action_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 state_encoder: Optional[nn.Module] = None):
        """
        :param cfg: model config; ``cfg.actor.mlp_hidden_dims`` and
            ``cfg.actor.mlp_activation`` are read here
        :param observation_space: observation space, only used to derive the
            state dimension when no real state encoder is supplied
        :param action_space: action space, used to derive the action dimension
        :param state_encoder: optional encoder applied to raw states; when
            ``None`` a fresh ``EmptyEncoder`` is created for this actor
        """
        super().__init__()
        # The default used to be ``EmptyEncoder()`` in the signature, which is
        # evaluated once at definition time and therefore shared by every
        # actor. Resolving the sentinel here gives each actor its own module.
        if state_encoder is None:
            state_encoder = EmptyEncoder()

        self.cfg = cfg
        self.action_dim = get_space_dim(action_space)
        self.mlp_hidden_dims = cfg.actor.mlp_hidden_dims
        self.mlp_activation = activation_func(cfg.actor.mlp_activation)

        # state encoder
        self.state_encoder = state_encoder
        if isinstance(self.state_encoder, EmptyEncoder):
            # No real encoder: the network consumes the raw observation.
            self.state_dim = get_space_dim(observation_space)
        else:
            # A real encoder dictates the feature size fed to the backbone.
            self.state_dim = self.state_encoder.output_dim

        self.backbone_net = None  # build_mlp(dims=[state_dim, *dims, action_dim])
        # Registered as parameters with requires_grad=False.
        self.state_avg = nn.Parameter(torch.zeros((self.state_dim,)), requires_grad=False)
        self.state_std = nn.Parameter(torch.ones((self.state_dim,)), requires_grad=False)

    def state_norm(self, state: Tensor) -> Tensor:
        """
        Normalize a state with the stored statistics.

        :param state: state tensor broadcastable against ``state_avg``
        :return: ``(state - state_avg) / state_std``
        """
        return (state - self.state_avg) / self.state_std

    def freeze_parameters(self, freeze: bool = True) -> None:
        """
        Freeze or unfreeze internal parameters

        :param freeze: freeze (True) or unfreeze (False)
        """
        for parameters in self.parameters():
            parameters.requires_grad = not freeze

    def update_parameters(self, model: torch.nn.Module, polyak: float = 1) -> None:
        r"""
        Update internal parameters by hard or soft (polyak averaging) update

        - Hard update: :math:`\theta = \theta_{net}`
        - Soft (polyak averaging) update: :math:`\theta = (1 - \rho) \theta + \rho \theta_{net}`

        Parameters are paired positionally via ``zip`` over
        ``self.parameters()`` and ``model.parameters()``.

        :param model: Model used to update the internal parameters
        :param polyak: Polyak hyperparameter between 0 and 1 (default: ``1``).
                       A hard update is performed when its value is 1
        """
        with torch.no_grad():
            # hard update
            if polyak == 1:
                for parameters, model_parameters in zip(self.parameters(), model.parameters()):
                    parameters.data.copy_(model_parameters.data)
            # soft update (use in-place operations to avoid creating new parameters)
            else:
                for parameters, model_parameters in zip(self.parameters(), model.parameters()):
                    parameters.data.mul_(1 - polyak)
                    parameters.data.add_(polyak * model_parameters.data)
class ActorPPO_Beta(BaseActor):
    """
    Beta-distribution policy network for PPO.

    A shared MLP backbone feeds two linear heads that output the ``alpha``
    and ``beta`` concentration parameters of a ``Beta`` distribution, one
    pair per action dimension.
    """

    def __init__(self, cfg: DictConfig,
                 observation_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 action_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 state_encoder: Optional[nn.Module] = EmptyEncoder()):
        """
        :param cfg: model config; ``cfg.actor.*`` is read by the base class and
            ``use_init`` decides whether the layers are re-initialized
        :param observation_space: observation space, forwarded to the base class
        :param action_space: action space, forwarded to the base class
        :param state_encoder: encoder applied to raw states before the backbone
        """
        super().__init__(cfg, observation_space, action_space, state_encoder)

        # Build mlp network except the output layer
        self.backbone_net = build_mlp(dims=[self.state_dim, *self.mlp_hidden_dims],
                                      hidden_activation=self.mlp_activation)
        self.alpha_layer = nn.Linear(self.mlp_hidden_dims[-1], self.action_dim)
        self.beta_layer = nn.Linear(self.mlp_hidden_dims[-1], self.action_dim)

        # This class used to read ``cfg.Model.use_init`` unconditionally, while
        # the base class reads ``cfg.actor`` from the same object and the other
        # actors in this file read ``cfg.use_init``. Honour the nested key when
        # it exists and otherwise use the flat key like the sibling actors.
        nested_cfg = self.cfg.get("Model", None)
        use_init = nested_cfg.use_init if nested_cfg is not None else self.cfg.use_init
        if use_init:
            init_layers(self.backbone_net, gain=1)
            init_layers([self.alpha_layer, self.beta_layer], gain=0.01)

    def forward(self, state: Tensor):
        """
        Compute the Beta concentration parameters for ``state``.

        :param state: raw state, passed through the state encoder first
        :return: tuple ``(alpha, beta)``
        """
        state = self.state_encoder(state)
        state = self.backbone_net(state)
        # alpha and beta need to be larger than 1, so we use 'softplus' as the
        # activation function and then add 1
        alpha = F.softplus(self.alpha_layer(state)) + 1.0
        beta = F.softplus(self.beta_layer(state)) + 1.0
        return alpha, beta

    def get_dist(self, state):
        """
        Build the action distribution for ``state``.

        :param state: raw state
        :return: ``Beta(alpha, beta)`` built from :meth:`forward`
        """
        alpha, beta = self.forward(state)
        dist = Beta(alpha, beta)
        return dist

    def mean(self, state):
        """
        Return the mean of the Beta policy for ``state``.

        :param state: raw state
        :return: ``alpha / (alpha + beta)``
        """
        alpha, beta = self.forward(state)
        mean = alpha / (alpha + beta)  # The mean of the beta distribution
        return mean
class ActorPPO_Gaussian(BaseActor):
    def __init__(self, cfg: DictConfig,
                 observation_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 action_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 state_encoder: Optional[nn.Module] = EmptyEncoder()):
        """
        Gaussian policy network for PPO

        Layout built by this constructor::

            ActorPPO_Gaussian(
              (backbone_net): MLP over [state_dim, *mlp_hidden_dims] with the
                              configured hidden activation
              (mean_layer):  Linear(mlp_hidden_dims[-1] -> action_dim)
              (value_layer): Linear(mlp_hidden_dims[-1] -> 1)
              log_std:       trainable parameter of size action_dim, zeros at start
            )

        :param cfg: model config
        :param observation_space: observation space, forwarded to the base class
        :param action_space: action space, forwarded to the base class
        :param state_encoder: encoder applied to raw states before the backbone
        """
        super().__init__(cfg, observation_space, action_space, state_encoder)

        last_hidden = self.mlp_hidden_dims[-1]

        # Shared trunk: every layer up to, but excluding, the output heads.
        self.backbone_net = build_mlp(dims=[self.state_dim, *self.mlp_hidden_dims],
                                      hidden_activation=self.mlp_activation)
        self.mean_layer = nn.Linear(last_hidden, self.action_dim)
        # Wrapped in nn.Parameter so the optimizer trains it with the rest.
        self.log_std = nn.Parameter(torch.zeros(self.action_dim))
        self.value_layer = nn.Linear(last_hidden, 1)
        # Filled in by the most recent non-deterministic forward pass.
        self.dist = None

        if self.cfg.use_init:
            init_layers(self.backbone_net, gain=1)
            init_layers(self.mean_layer, gain=0.01)

    def forward(self, state, action=None, deterministic=False):
        """
        Produce an action and its log-probability.

        :param state: raw state, passed through the state encoder first
        :param action: optional action to evaluate instead of sampling;
            ignored when ``deterministic`` is true
        :param deterministic: when true, return the scaled mean action and a
            zero log-probability without touching ``self.dist``
        :return: tuple ``(action, log_prob)``; ``log_prob`` is summed over the
            last dimension with ``keepdim=True``
        """
        features = self.backbone_net(self.state_encoder(state))

        if self.cfg.use_action_out_tanh:
            # bounded to [-action_scale, action_scale]
            mean_action = self.cfg.action_scale * torch.tanh(self.mean_layer(features))
        else:
            mean_action = self.cfg.action_scale * self.mean_layer(features)

        if deterministic:
            zero_log_prob = torch.zeros(mean_action.shape[0], 1, device=mean_action.device)
            return mean_action, zero_log_prob

        log_std = self.log_std
        if self.cfg.use_log_std_clip:
            log_std = log_std.clamp(self.cfg.log_std_clip_min, self.cfg.log_std_clip_max)

        # Gaussian centred on the mean action; kept for get_entropy().
        self.dist = Normal(mean_action, torch.exp(log_std))

        if action is None:
            # re-parameterized sample keeps the draw differentiable
            action = self.dist.rsample()
        if self.cfg.use_action_clip:
            clip = self.cfg.action_clip
            action = action.clamp(-clip, clip)  # [-max,max]

        log_prob = self.dist.log_prob(action).sum(dim=-1, keepdim=True)
        return action, log_prob

    def get_entropy(self):
        """
        Entropy of the distribution stored by the last non-deterministic
        :meth:`forward` call.

        :return: ``self.dist.entropy()``
        """
        return self.dist.entropy()

    def get_value(self, state):
        """
        Evaluate the value head on ``state``.

        :param state: raw state, passed through the state encoder and backbone
        :return: output of ``value_layer``
        """
        features = self.backbone_net(self.state_encoder(state))
        return self.value_layer(features)
class ActorSAC(BaseActor):
    """
    Gaussian policy network for SAC: an MLP backbone, a linear mean head and
    a state-independent trainable ``log_std``.
    """

    def __init__(self, cfg: DictConfig,
                 observation_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 action_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 state_encoder: Optional[nn.Module] = EmptyEncoder()):
        """
        :param cfg: model config
        :param observation_space: observation space, forwarded to the base class
        :param action_space: action space, forwarded to the base class
        :param state_encoder: encoder applied to raw states before the backbone
        """
        super().__init__(cfg, observation_space, action_space, state_encoder)

        # Shared trunk: every layer up to, but excluding, the output head.
        self.backbone_net = build_mlp(dims=[self.state_dim, *self.mlp_hidden_dims],
                                      hidden_activation=self.mlp_activation)
        self.mean_layer = nn.Linear(self.mlp_hidden_dims[-1], self.action_dim)
        self.log_std = nn.Parameter(torch.zeros(self.action_dim))
        # Filled in by every forward pass.
        self.dist = None

        if self.cfg.use_init:
            init_layers(self.backbone_net, gain=1)
            init_layers(self.mean_layer, gain=0.01)

    def forward(self, state, action=None):
        """
        Sample (or evaluate) an action and return its log-probability.

        :param state: raw state, passed through the state encoder first
        :param action: optional action to evaluate instead of sampling
        :return: tuple ``(action, log_prob)``; ``log_prob`` is summed over the
            last dimension with ``keepdim=True``
        """
        features = self.backbone_net(self.state_encoder(state))
        # tanh bounds the mean to [-action_scale, action_scale]
        mean_action = self.cfg.action_scale * torch.tanh(self.mean_layer(features))

        log_std = self.log_std
        if self.cfg.use_log_std_clip:
            log_std = log_std.clamp(self.cfg.log_std_clip_min, self.cfg.log_std_clip_max)

        # Gaussian centred on the mean action
        self.dist = Normal(mean_action, torch.exp(log_std))

        if action is None:
            # re-parameterized sample keeps the draw differentiable
            action = self.dist.rsample()
        if self.cfg.use_action_clip:
            clip = self.cfg.action_clip
            action = action.clamp(-clip, clip)  # [-max,max]

        log_prob = self.dist.log_prob(action).sum(dim=-1, keepdim=True)
        return action, log_prob
class ActorTD3(ActorSAC):
    """
    Deterministic policy network for TD3.

    Reuses the layers built by ``ActorSAC`` but overrides ``forward`` to
    return only the tanh-squashed mean action.
    """

    def __init__(self, cfg: DictConfig,
                 observation_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 action_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 state_encoder: Optional[nn.Module] = EmptyEncoder()):
        """
        :param cfg: model config
        :param observation_space: observation space, forwarded to the parent
        :param action_space: action space, forwarded to the parent
        :param state_encoder: encoder applied to raw states before the backbone
        """
        super().__init__(cfg, observation_space, action_space, state_encoder)

    def forward(self, state):
        """
        Compute the deterministic action for ``state``.

        :param state: raw state, passed through the state encoder first
        :return: tuple ``(mean_action, None)``; the second slot is ``None``
            because no log-probability is computed here
        """
        encoded = self.state_encoder(state)
        features = self.backbone_net(encoded)
        raw_mean = self.mean_layer(features)
        return torch.tanh(raw_mean), None
class ActorAMP(ActorPPO_Gaussian):
    """
    Gaussian PPO actor variant whose ``log_std`` is a fixed, non-trainable
    constant instead of a learned parameter.
    """

    def __init__(self, cfg: DictConfig,
                 observation_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 action_space: Optional[Union[int, Tuple[int], gym.Space, gymnasium.Space]],
                 state_encoder: Optional[nn.Module] = EmptyEncoder()):
        """
        :param cfg: model config
        :param observation_space: observation space, forwarded to the parent
        :param action_space: action space, forwarded to the parent
        :param state_encoder: encoder applied to raw states before the backbone
        """
        super().__init__(cfg, observation_space, action_space, state_encoder)

        # Replace the parent's trainable log_std with a constant one:
        # every entry is -2.9 and gradients are disabled.
        fixed_log_std = torch.full((self.action_dim,), fill_value=-2.9)
        self.log_std = nn.Parameter(fixed_log_std, requires_grad=False)
if __name__ == '__main__':
    # Smoke demo: build a Gaussian PPO actor from an inline config and print
    # its module tree.
    from omegaconf import DictConfig

    actor_settings = {
        'type': "Gaussian",
        'mlp_hidden_dims': [512, 256, 128],
        'mlp_activation': "elu",
        'use_lstm': False,
        'lstm_cell_size': 256,
        'lstm_use_prev_action': False,
        'max_seq_len': 20,
    }
    model_settings = {
        'use_init': True,
        'action_scale': 1.0,
        'use_log_std_clip': True,
        'log_std_clip_min': -20,
        'log_std_clip_max': 2,
        'use_action_clip': True,
        'action_clip': 1.0,
        'actor': actor_settings,
    }
    cfg = DictConfig(model_settings)

    model = ActorPPO_Gaussian(cfg=cfg, observation_space=3, action_space=1)
    print(model)