# Source code for rofunc.learning.RofuncRL.models.utils

# Copyright 2023, Junjia LIU, jjliu@mae.cuhk.edu.hk
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union, List

import gym
import gymnasium
import torch.nn as nn


def build_mlp(dims: List[int], hidden_activation: Union[nn.Module, type] = nn.ReLU,
              output_activation: Union[nn.Module, type, None] = None) -> nn.Sequential:
    """
    Build multi-layer perceptron

    One ``nn.Linear`` is created for every consecutive pair in ``dims`` and the hidden
    activation is appended after each of them (including the last linear layer).

    :param dims: layer dimensions, including input and output layers
    :param hidden_activation: activation for the linear layers, given either as a module
        instance (e.g. ``nn.Tanh()``) or as a module class (e.g. ``nn.Tanh``); when an
        instance is given, that same instance is reused after every linear layer
    :param output_activation: optional activation appended at the very end, given as a
        module instance or a module class; ``None`` appends nothing
    :return: the assembled ``nn.Sequential``
    """
    # nn.Sequential only accepts module *instances*. The default value (nn.ReLU) is a
    # class, so it has to be instantiated before it can be registered.
    if isinstance(hidden_activation, type):
        hidden_activation = hidden_activation()
    if isinstance(output_activation, type):
        output_activation = output_activation()

    layers = []
    for i in range(len(dims) - 1):
        layers.append(nn.Linear(dims[i], dims[i + 1]))
        layers.append(hidden_activation)
    if output_activation is not None:
        layers.append(output_activation)
    return nn.Sequential(*layers)
def _expand_per_layer(value, num_layers: int):
    """Repeat a single int/tuple setting once per layer; pass per-layer sequences through unchanged."""
    if isinstance(value, (int, tuple)):
        return [value] * num_layers
    return value


def build_cnn(dims: List[int],
              kernel_size: Union[int, tuple, List],
              stride: Union[int, tuple, List] = 1,
              padding: Union[int, tuple, List] = 0,
              dilation: Union[int, tuple, List] = 1,
              hidden_activation: Union[nn.Module, type] = nn.ReLU,
              output_activation: Union[nn.Module, type, None] = None,
              pooling=None,
              pooling_args: dict = None) -> nn.Sequential:
    """
    Build convolutional neural network

    One ``nn.Conv2d`` is created for every consecutive pair in ``dims``. Each convolution is
    followed by the hidden activation and, when pooling is enabled, by a pooling layer.

    :param dims: channel dimensions, including input and output layers
    :param kernel_size: kernel size for convolutional layers
    :param stride: stride controls the stride for the cross-correlation
    :param padding: padding controls the amount of padding applied to the input
    :param dilation: dilation controls the spacing between the kernel points
    :param hidden_activation: activation appended after every convolution, given as a module
        instance or a module class
    :param output_activation: optional activation appended at the very end, given as a module
        instance or a module class
    :param pooling: pooling layer type, ['max', 'avg', 'none']; ``None`` and ``'none'`` both
        disable pooling
    :param pooling_args: pooling layer arguments, required when pooling is enabled. Keys read:
        ``cnn_pooling_kernel_size``, ``cnn_pooling_stride``, ``cnn_pooling_padding`` and, for
        max pooling only, ``cnn_pooling_dilation``

    The parameters kernel_size, stride, padding, dilation (and the pooling arguments) can either be:
        - a single int: in which case the same value is used for the height and width dimension
        - a tuple of two ints: in which case, the first int is used for the height dimension,
          and the second int for the width dimension
        - a list with one entry (int or tuple) per convolutional layer

    :return: the assembled ``nn.Sequential``
    :raises ValueError: if ``pooling`` is not a supported type, or pooling is enabled without
        ``pooling_args``
    """
    num_layers = len(dims) - 1

    # nn.Sequential only accepts module instances; the default (nn.ReLU) is a class.
    if isinstance(hidden_activation, type):
        hidden_activation = hidden_activation()
    if isinstance(output_activation, type):
        output_activation = output_activation()

    kernel_size = _expand_per_layer(kernel_size, num_layers)
    stride = _expand_per_layer(stride, num_layers)
    padding = _expand_per_layer(padding, num_layers)
    dilation = _expand_per_layer(dilation, num_layers)

    # The documented 'none' option means the same as passing None.
    if pooling == 'none':
        pooling = None

    if pooling is not None:
        if pooling not in ('max', 'avg'):
            raise ValueError(f"Unsupported pooling type {pooling!r}, expected 'max', 'avg' or 'none'")
        if pooling_args is None:
            raise ValueError("pooling_args must be provided when pooling is enabled")
        pooling_kernel_size = _expand_per_layer(pooling_args['cnn_pooling_kernel_size'], num_layers)
        pooling_stride = _expand_per_layer(pooling_args['cnn_pooling_stride'], num_layers)
        pooling_padding = _expand_per_layer(pooling_args['cnn_pooling_padding'], num_layers)
        if pooling == 'max':
            # Only nn.MaxPool2d has a dilation argument; nn.AvgPool2d rejects it.
            pooling_dilation = _expand_per_layer(pooling_args['cnn_pooling_dilation'], num_layers)

    layers = []
    for i in range(num_layers):
        layers.append(
            nn.Conv2d(dims[i], dims[i + 1], kernel_size=kernel_size[i], stride=stride[i], padding=padding[i],
                      dilation=dilation[i]))
        layers.append(hidden_activation)
        if pooling == 'max':
            layers.append(
                nn.MaxPool2d(kernel_size=pooling_kernel_size[i], stride=pooling_stride[i],
                             padding=pooling_padding[i], dilation=pooling_dilation[i]))
        elif pooling == 'avg':
            layers.append(
                nn.AvgPool2d(kernel_size=pooling_kernel_size[i], stride=pooling_stride[i],
                             padding=pooling_padding[i]))
    if output_activation is not None:
        layers.append(output_activation)
    return nn.Sequential(*layers)
def activation_func(activation: str) -> nn:
    """
    Create an activation module from its name

    :param activation: name of the activation, one of 'relu', 'tanh', 'sigmoid', 'elu'
    :return: a new instance of the matching ``torch.nn`` activation module
    :raises NotImplementedError: if the name is not one of the supported activations
    """
    known_activations = (
        ('relu', nn.ReLU),
        ('tanh', nn.Tanh),
        ('sigmoid', nn.Sigmoid),
        ('elu', nn.ELU),
    )
    for name, module_cls in known_activations:
        if activation == name:
            return module_cls()
    raise NotImplementedError
def init_layers(layers, gain=1.0, bias_const=1e-6, init_type='orthogonal'):
    """
    Initialize the weights and biases of the direct children of a module in place

    For more information about the initialization, please refer to
    https://pytorch.org/docs/stable/nn.init.html

    Only ``layers.children()`` is visited, i.e. nested containers are not descended into.

    :param layers: module (e.g. ``nn.Sequential``) whose direct children are initialized
    :param gain: scaling factor for ``nn.Linear`` weights; only forwarded to the initializers
        that accept it ('orthogonal', 'xavier_normal', 'xavier_uniform') and ignored otherwise
    :param bias_const: constant written into the bias of every child that has one
    :param init_type: one of 'orthogonal', 'xavier_normal', 'xavier_uniform', 'kaiming_normal',
        'kaiming_uniform', 'normal'
    :raises KeyError: if ``init_type`` is not one of the supported names
    :raises ValueError: if a ``nn.Conv2d`` child is found and ``init_type`` is not one of
        'kaiming_normal', 'kaiming_uniform', 'xavier_uniform'
    """
    init_type_map = {
        "orthogonal": nn.init.orthogonal_,
        "xavier_normal": nn.init.xavier_normal_,
        "xavier_uniform": nn.init.xavier_uniform_,
        "kaiming_normal": nn.init.kaiming_normal_,
        "kaiming_uniform": nn.init.kaiming_uniform_,
        "normal": nn.init.normal_,
    }
    # kaiming_* and normal_ have no `gain` keyword; passing it raises TypeError.
    gain_aware_types = ("orthogonal", "xavier_normal", "xavier_uniform")
    conv_init_types = ("kaiming_normal", "kaiming_uniform", "xavier_uniform")

    torch_init = init_type_map[init_type]
    for layer in layers.children():
        if isinstance(layer, nn.Linear):
            if init_type in gain_aware_types:
                torch_init(layer.weight, gain=gain)
            else:
                torch_init(layer.weight)
        elif isinstance(layer, nn.Conv2d):
            # Explicit raise instead of assert so the check survives `python -O`.
            if init_type not in conv_init_types:
                raise ValueError(
                    f"init_type {init_type!r} is not supported for nn.Conv2d, expected one of {conv_init_types}")
            torch_init(layer.weight)
        elif isinstance(layer, (nn.LayerNorm, nn.BatchNorm2d)):
            # Norm layers built without affine parameters have weight set to None.
            if layer.weight is not None:
                nn.init.constant_(layer.weight, 1)

        # Layers created with bias=False still expose a `bias` attribute, but it is None.
        if getattr(layer, 'bias', None) is not None:
            nn.init.constant_(layer.bias, bias_const)
def get_space_dim(space):
    """
    Compute the dimension described by ``space``

    :param space: an int, a (possibly nested) tuple/list of supported values, or a
        ``gym.Space`` / ``gymnasium.Space``
    :return: the int itself; the sum of the dimensions of all entries for a tuple/list;
        for a gym space, ``shape[0]`` when the shape has exactly one entry, otherwise the
        ``shape`` attribute unchanged
    :raises NotImplementedError: if ``space`` is none of the supported types
    """
    if isinstance(space, int):
        return space

    if isinstance(space, (tuple, list)):
        # Containers contribute the total of their entries, resolved recursively.
        return sum(get_space_dim(entry) for entry in space)

    if isinstance(space, (gym.Space, gymnasium.Space)):
        shape = space.shape
        single_axis = isinstance(shape, tuple) and len(shape) == 1
        return shape[0] if single_axis else shape

    raise NotImplementedError