| |
|
|
|
|
| import collections.abc |
| import math |
| import warnings |
| from itertools import repeat |
| from typing import List, Optional, Tuple |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
|
|
| try: |
| from torch import _assert |
| except ImportError: |
|
|
| def _assert(condition: bool, message: str): |
| assert condition, message |
|
|
|
|
| def drop_block_2d( |
| x, |
| drop_prob: float = 0.1, |
| block_size: int = 7, |
| gamma_scale: float = 1.0, |
| with_noise: bool = False, |
| inplace: bool = False, |
| batchwise: bool = False, |
| ): |
| """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf |
| |
| DropBlock with an experimental gaussian noise option. This layer has been tested on a few training |
| runs with success, but needs further validation and possibly optimization for lower runtime impact. |
| """ |
| b, c, h, w = x.shape |
| total_size = w * h |
| clipped_block_size = min(block_size, min(w, h)) |
| |
| gamma = gamma_scale * drop_prob * total_size / clipped_block_size**2 / ((w - block_size + 1) * (h - block_size + 1)) |
|
|
| |
| w_i, h_i = torch.meshgrid(torch.arange(w).to(x.device), torch.arange(h).to(x.device)) |
| valid_block = ((w_i >= clipped_block_size // 2) & (w_i < w - (clipped_block_size - 1) // 2)) & ( |
| (h_i >= clipped_block_size // 2) & (h_i < h - (clipped_block_size - 1) // 2) |
| ) |
| valid_block = torch.reshape(valid_block, (1, 1, h, w)).to(dtype=x.dtype) |
|
|
| if batchwise: |
| |
| uniform_noise = torch.rand((1, c, h, w), dtype=x.dtype, device=x.device) |
| else: |
| uniform_noise = torch.rand_like(x) |
| block_mask = ((2 - gamma - valid_block + uniform_noise) >= 1).to(dtype=x.dtype) |
| block_mask = -F.max_pool2d( |
| -block_mask, kernel_size=clipped_block_size, stride=1, padding=clipped_block_size // 2 |
| ) |
|
|
| if with_noise: |
| normal_noise = torch.randn((1, c, h, w), dtype=x.dtype, device=x.device) if batchwise else torch.randn_like(x) |
| if inplace: |
| x.mul_(block_mask).add_(normal_noise * (1 - block_mask)) |
| else: |
| x = x * block_mask + normal_noise * (1 - block_mask) |
| else: |
| normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-7)).to(x.dtype) |
| if inplace: |
| x.mul_(block_mask * normalize_scale) |
| else: |
| x = x * block_mask * normalize_scale |
| return x |
|
|
|
|
def drop_block_fast_2d(
    x: torch.Tensor,
    drop_prob: float = 0.1,
    block_size: int = 7,
    gamma_scale: float = 1.0,
    with_noise: bool = False,
    inplace: bool = False,
):
    """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf

    DropBlock with an experimental gaussian noise option. Simplified from the full version: seeds are
    sampled at every position, without a valid-block mask at the edges.

    Args:
        x: 4-D input; its shape is unpacked as ``(b, c, h, w)``.
        drop_prob: Target fraction of activations to drop.
        block_size: Side length of the dropped squares; clipped to ``min(h, w)``.
        gamma_scale: Multiplier applied to the seed drop rate.
        with_noise: Replace dropped values with gaussian noise instead of zero + rescale.
        inplace: Modify ``x`` in place rather than allocating a new tensor.

    Returns:
        Tensor with the same shape as ``x`` (``x`` itself when ``inplace`` is set).
    """
    _, _, h, w = x.shape
    total_size = w * h
    clipped_block_size = min(block_size, min(w, h))
    # Use the clipped size in the denominator so it cannot reach zero or go negative when
    # block_size exceeds the feature map.
    valid_positions = (w - clipped_block_size + 1) * (h - clipped_block_size + 1)
    gamma = gamma_scale * drop_prob * total_size / clipped_block_size**2 / valid_positions
    # bernoulli_ rejects probabilities outside [0, 1].
    gamma = min(max(gamma, 0.0), 1.0)

    # 1 = seed of a dropped block; max-pool grows each seed into a full block.
    block_mask = torch.empty_like(x).bernoulli_(gamma)
    block_mask = F.max_pool2d(
        block_mask.to(x.dtype), kernel_size=clipped_block_size, stride=1, padding=clipped_block_size // 2
    )
    if clipped_block_size % 2 == 0:
        # An even kernel with padding k // 2 yields (h + 1, w + 1); trim back to the input size.
        block_mask = block_mask[..., 1:, 1:]

    if with_noise:
        normal_noise = torch.empty_like(x).normal_()
        if inplace:
            x.mul_(1.0 - block_mask).add_(normal_noise * block_mask)
        else:
            x = x * (1.0 - block_mask) + normal_noise * block_mask
    else:
        # Flip to a keep-mask and rescale kept activations by numel / kept_count.
        block_mask = 1 - block_mask
        normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-6)).to(dtype=x.dtype)
        if inplace:
            x.mul_(block_mask * normalize_scale)
        else:
            x = x * block_mask * normalize_scale
    return x
|
|
|
|
class DropBlock2d(nn.Module):
    """Module wrapper around the DropBlock functions. See https://arxiv.org/pdf/1810.12890.pdf

    The constructor only stores its arguments. ``forward`` is the identity unless the module is in
    training mode and ``drop_prob`` is truthy.
    """

    def __init__(
        self, drop_prob=0.1, block_size=7, gamma_scale=1.0, with_noise=False, inplace=False, batchwise=False, fast=True
    ):
        super().__init__()
        self.drop_prob = drop_prob
        self.gamma_scale = gamma_scale
        self.block_size = block_size
        self.with_noise = with_noise
        self.inplace = inplace
        self.batchwise = batchwise
        self.fast = fast

    def forward(self, x):
        """Apply DropBlock to ``x`` when training; otherwise return ``x`` untouched."""
        active = self.training and self.drop_prob
        if not active:
            return x
        shared_args = (x, self.drop_prob, self.block_size, self.gamma_scale, self.with_noise, self.inplace)
        if self.fast:
            # The fast variant takes no batchwise argument, so self.batchwise is not forwarded here.
            return drop_block_fast_2d(*shared_args)
        return drop_block_2d(*shared_args, self.batchwise)
|
|
|
|
def drop_path(x, drop_prob: float = 0.0, training: bool = False, scale_by_keep: bool = True):
    """Stochastic Depth: zero out whole samples of ``x`` with probability ``drop_prob``.

    Intended for the main path of residual blocks. A single Bernoulli draw is made per entry along
    dim 0 and broadcast over every remaining dimension. When ``scale_by_keep`` is set (and the keep
    probability is positive) surviving samples are divided by the keep probability.

    Returns ``x`` itself, unchanged, when ``drop_prob`` is 0 or ``training`` is false.
    """
    if drop_prob == 0.0 or not training:
        return x
    survival = 1 - drop_prob
    # Shape (N, 1, 1, ...) so the mask works with tensors of any rank, not just 2D ConvNets.
    mask_shape = (x.shape[0], *([1] * (x.ndim - 1)))
    mask = x.new_empty(mask_shape).bernoulli_(survival)
    if scale_by_keep and survival > 0.0:
        mask.div_(survival)
    return x * mask
|
|
|
|
class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).

    Args:
        drop_prob: Probability of dropping a sample's path. ``None`` or ``0`` disables dropping.
        scale_by_keep: Forwarded to ``drop_path``; rescales surviving samples by the keep probability.
    """

    def __init__(self, drop_prob=None, scale_by_keep=True):
        super().__init__()
        self.drop_prob = drop_prob
        self.scale_by_keep = scale_by_keep

    def forward(self, x):
        """Return ``x`` unchanged when inactive, otherwise apply ``drop_path``."""
        # The default drop_prob is None; passing it on while training would evaluate `1 - None`
        # inside drop_path and raise, so None/0 and eval mode short-circuit to the identity here.
        if not self.drop_prob or not self.training:
            return x
        return drop_path(x, self.drop_prob, self.training, self.scale_by_keep)
|
|
|
|
def create_conv3d(in_channels, out_channels, kernel_size, **kwargs):
    """Create a 3D convolution layer, optionally depthwise.

    Delegates to ``create_conv3d_pad`` after resolving the ``groups`` argument.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        kernel_size: Convolution kernel size.
        **kwargs: Forwarded to ``create_conv3d_pad``. Two keys are consumed here:
            ``depthwise`` (default ``False``) and ``groups`` (default ``1``). When ``depthwise`` is
            truthy, ``groups`` is set to ``in_channels`` and any caller-supplied ``groups`` is ignored.

    Returns:
        The module built by ``create_conv3d_pad``.
    """
    depthwise = kwargs.pop("depthwise", False)
    # Always remove `groups` from kwargs, even for depthwise convs, so it can never be passed twice
    # (once explicitly and once through **kwargs) in the call below.
    groups = kwargs.pop("groups", 1)
    if depthwise:
        # for DW out_channels must be a multiple of in_channels, since out_channels % groups == 0
        groups = in_channels
    return create_conv3d_pad(in_channels, out_channels, kernel_size, groups=groups, **kwargs)
|
|
|
|
def conv3d_same(
    x,
    weight: torch.Tensor,
    bias: Optional[torch.Tensor] = None,
    stride: Tuple[int, int, int] = (1, 1, 1),
    padding: Tuple[int, int, int] = (0, 0, 0),
    dilation: Tuple[int, int, int] = (1, 1, 1),
    groups: int = 1,
):
    """3D convolution with padding computed by ``pad_same`` from the input size.

    Args:
        x: Input tensor; its last three dims are treated as the dims to pad and convolve.
        weight: Convolution weight; the kernel size is read from ``weight.shape[-3:]``.
        bias: Optional bias forwarded to ``F.conv3d``.
        stride: Per-dimension stride.
        padding: Accepted for signature parity with ``F.conv3d`` but unused; the convolution itself
            always runs with zero padding on the pre-padded input.
        dilation: Per-dimension dilation.
        groups: Number of convolution groups.

    Returns:
        The result of ``F.conv3d`` on the padded input.
    """
    x = pad_same(x, weight.shape[-3:], stride, dilation)
    return F.conv3d(x, weight, bias, stride, (0, 0, 0), dilation, groups)
|
|
|
|
class Conv3dSame(nn.Conv3d):
    """Tensorflow like 'SAME' convolution wrapper for 3D convolutions.

    Derives from ``nn.Conv3d`` so the weight is 5-D and stride/dilation are stored as 3-tuples, which is
    what ``conv3d_same`` / ``F.conv3d`` require.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True):
        # `padding` is accepted for signature compatibility only; the base class is always built with
        # zero padding because the padding is applied in forward().
        super().__init__(in_channels, out_channels, kernel_size, stride, 0, dilation, groups, bias)

    def forward(self, x):
        """Pad ``x`` via ``conv3d_same`` and convolve with this module's parameters."""
        return conv3d_same(x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups)
|
|
|
|
def create_conv3d_pad(in_chs, out_chs, kernel_size, **kwargs):
    """Build a 3D conv layer, picking ``Conv3dSame`` or ``nn.Conv3d`` from the padding request.

    ``padding`` is popped from ``kwargs`` (default ``""``) and resolved with ``get_padding_value``;
    ``bias`` defaults to ``False`` unless the caller supplied it. If the resolved padding is flagged
    dynamic a ``Conv3dSame`` is returned, otherwise an ``nn.Conv3d`` with the resolved padding.
    """
    requested = kwargs.pop("padding", "")
    kwargs.setdefault("bias", False)
    resolved, needs_dynamic = get_padding_value(requested, kernel_size, **kwargs)
    if not needs_dynamic:
        return nn.Conv3d(in_chs, out_chs, kernel_size, padding=resolved, **kwargs)
    return Conv3dSame(in_chs, out_chs, kernel_size, **kwargs)
|
|
|
|
| |
def get_padding(kernel_size: int, stride: int = 1, dilation: int = 1, **_) -> int:
    """Return the static per-side padding for the given kernel size, stride and dilation.

    Computed as ``((stride - 1) + dilation * (kernel_size - 1)) // 2``. Extra keyword arguments are
    accepted and ignored.
    """
    dilated_span = dilation * (kernel_size - 1)
    return (stride - 1 + dilated_span) // 2
|
|
|
|
| |
def get_same_padding(x: int, k: int, s: int, d: int):
    """Return the total padding along one dimension for TF-style 'SAME' output size.

    Args:
        x: Input size along the dimension.
        k: Kernel size.
        s: Stride.
        d: Dilation.

    Returns:
        The non-negative total padding (both sides combined) so that the output size is
        ``ceil(x / s)``.
    """
    out_size = math.ceil(x / s)
    required = (out_size - 1) * s + (k - 1) * d + 1 - x
    return max(required, 0)
|
|
|
|
| |
def is_static_pad(kernel_size: int, stride: int = 1, dilation: int = 1, **_):
    """Tell whether 'SAME' padding can be applied statically (independent of input size).

    True only when ``stride == 1`` and ``dilation * (kernel_size - 1)`` is even. Extra keyword
    arguments are accepted and ignored.
    """
    if stride != 1:
        return False
    return (dilation * (kernel_size - 1)) % 2 == 0
|
|
|
|
| |
def pad_same(x, k: List[int], s: List[int], d: List[int] = (1, 1, 1), value: float = 0):
    """Pad the last three dims of ``x`` for TF-style 'SAME' convolution or pooling.

    Args:
        x: Input tensor; the last three dims are read as (depth, height, width).
        k: Kernel size per dim, ordered (depth, height, width).
        s: Stride per dim, same order.
        d: Dilation per dim, same order.
        value: Fill value for the padded region.

    Returns:
        The padded tensor, or ``x`` itself when no padding is needed. When the total padding along
        a dim is odd, the extra element goes on the trailing side.
    """
    in_d, in_h, in_w = x.size()[-3:]
    pad_d = get_same_padding(in_d, k[0], s[0], d[0])
    pad_h = get_same_padding(in_h, k[1], s[1], d[1])
    pad_w = get_same_padding(in_w, k[2], s[2], d[2])
    if pad_d > 0 or pad_h > 0 or pad_w > 0:
        # F.pad consumes (before, after) pairs starting from the LAST dimension,
        # so the order must be width, height, depth.
        x = F.pad(
            x,
            [pad_w // 2, pad_w - pad_w // 2, pad_h // 2, pad_h - pad_h // 2, pad_d // 2, pad_d - pad_d // 2],
            value=value,
        )
    return x
|
|
|
|
def get_padding_value(padding, kernel_size, **kwargs) -> Tuple[Tuple, bool]:
    """Resolve a padding request into ``(padding, is_dynamic)``.

    Non-string ``padding`` is returned as given with ``is_dynamic=False``. Strings are matched
    case-insensitively:

    * ``"same"``: static padding from ``get_padding`` when ``is_static_pad`` allows it, otherwise
      ``(0, True)`` to signal that padding must be computed dynamically.
    * ``"valid"``: ``(0, False)``.
    * anything else (including ``""``): the default symmetric padding from ``get_padding``.
    """
    if not isinstance(padding, str):
        return padding, False

    mode = padding.lower()
    if mode == "valid":
        # 'VALID' padding, same as padding=0
        return 0, False
    if mode == "same" and not is_static_pad(kernel_size, **kwargs):
        # TF compatible 'SAME' padding that cannot be expressed statically
        return 0, True
    # Static 'SAME' and the PyTorch-style default both resolve to symmetric padding.
    return get_padding(kernel_size, **kwargs), False
|
|
|
|
| |
def _ntuple(n):
    """Return a converter that repeats a non-iterable value ``n`` times as a tuple.

    Values that are already iterable (per ``collections.abc.Iterable``) are returned unchanged.
    """

    def parse(x):
        return x if isinstance(x, collections.abc.Iterable) else (x,) * n

    return parse


# Ready-made converters for the common arities; `to_ntuple(n)` builds one for any n.
to_1tuple = _ntuple(1)
to_2tuple = _ntuple(2)
to_3tuple = _ntuple(3)
to_4tuple = _ntuple(4)
to_ntuple = _ntuple
|
|
|
|
def make_divisible(v, divisor=8, min_value=None, round_limit=0.9):
    """Round ``v`` to a multiple of ``divisor``, never below ``min_value``.

    ``min_value`` falls back to ``divisor`` when falsy. If the rounded result is smaller than
    ``round_limit * v``, one more ``divisor`` is added.
    """
    floor_value = min_value or divisor
    rounded = int(v + divisor / 2) // divisor * divisor
    result = max(floor_value, rounded)
    # Make sure that rounding down does not shrink the value by more than (1 - round_limit).
    if result < round_limit * v:
        result += divisor
    return result
|
|
|
|
class Linear(nn.Linear):
    r"""Applies a linear transformation to the incoming data: :math:`y = xA^T + b`

    Thin subclass of ``torch.nn.Linear``. Under TorchScript the weight and bias are cast to the
    input dtype before ``F.linear`` (a workaround for AMP + torchscript and ``torch.addmm``);
    in eager mode the parameters are used as they are.
    """

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        if not torch.jit.is_scripting():
            return F.linear(input, self.weight, self.bias)
        # Scripted path: match parameter dtypes to the input explicitly.
        weight = self.weight.to(dtype=input.dtype)
        bias = self.bias.to(dtype=input.dtype) if self.bias is not None else None
        return F.linear(input, weight, bias=bias)
|
|
|
|
class Mlp(nn.Module):
    """MLP as used in Vision Transformer, MLP-Mixer and related networks.

    Layer order is ``fc1 -> act -> drop1 -> fc2 -> drop2``. ``hidden_features`` and ``out_features``
    fall back to ``in_features`` when falsy. ``drop`` goes through ``to_2tuple``, so it may be a
    single rate or a pair (one per dropout layer).
    """

    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.0):
        super().__init__()
        hidden_width = hidden_features or in_features
        out_width = out_features or in_features
        first_drop, second_drop = to_2tuple(drop)[0], to_2tuple(drop)[1]

        self.fc1 = nn.Linear(in_features, hidden_width)
        self.act = act_layer()
        self.drop1 = nn.Dropout(first_drop)
        self.fc2 = nn.Linear(hidden_width, out_width)
        self.drop2 = nn.Dropout(second_drop)

    def forward(self, x):
        """Run ``x`` through the five sub-layers in order."""
        for layer in (self.fc1, self.act, self.drop1, self.fc2, self.drop2):
            x = layer(x)
        return x
|
|
|
|
def avg_pool3d_same(
    x,
    kernel_size: List[int],
    stride: List[int],
    padding: List[int] = (0, 0, 0),
    ceil_mode: bool = False,
    count_include_pad: bool = True,
):
    """3D average pooling with padding applied by ``pad_same``.

    ``padding`` is accepted but not used: the input is padded first and ``F.avg_pool3d`` then runs
    with zero padding.
    """
    # FIXME how to deal with count_include_pad vs not for external padding?
    padded = pad_same(x, kernel_size, stride)
    return F.avg_pool3d(
        padded,
        kernel_size,
        stride,
        (0, 0, 0),
        ceil_mode,
        count_include_pad,
    )
|
|
|
|
class AvgPool3dSame(nn.AvgPool3d):
    """Tensorflow like 'SAME' wrapper for 3D average pooling.

    Args:
        kernel_size: Pooling window; a scalar is expanded to three dims.
        stride: Pooling stride; a scalar is expanded to three dims. ``None`` means "same as
            ``kernel_size``", matching the ``nn.AvgPool3d`` default.
        padding: Accepted for signature compatibility but ignored; padding is applied in ``forward``.
        ceil_mode: Forwarded to ``F.avg_pool3d``.
        count_include_pad: Forwarded to ``F.avg_pool3d``.
    """

    def __init__(self, kernel_size: int, stride=None, padding=0, ceil_mode=False, count_include_pad=True):
        # pad_same indexes three entries of kernel_size/stride, so both must be 3-tuples.
        kernel_size = to_3tuple(kernel_size)
        # to_3tuple(None) would produce (None, None, None); resolve the default explicitly instead.
        stride = kernel_size if stride is None else to_3tuple(stride)
        super().__init__(kernel_size, stride, (0, 0, 0), ceil_mode, count_include_pad)

    def forward(self, x):
        """Pad ``x`` with ``pad_same`` and average-pool it with this module's settings."""
        x = pad_same(x, self.kernel_size, self.stride)
        return F.avg_pool3d(x, self.kernel_size, self.stride, self.padding, self.ceil_mode, self.count_include_pad)
|
|
|
|
def max_pool3d_same(
    x,
    kernel_size: List[int],
    stride: List[int],
    padding: List[int] = (0, 0, 0),
    dilation: List[int] = (1, 1, 1),
    ceil_mode: bool = False,
):
    """3D max pooling with padding applied by ``pad_same``.

    The input is padded with ``-inf`` so padded cells never win the max. ``padding`` is accepted but
    not used; ``F.max_pool3d`` runs with zero padding on the pre-padded input.
    """
    neg_inf = -float("inf")
    padded = pad_same(x, kernel_size, stride, value=neg_inf)
    return F.max_pool3d(
        padded,
        kernel_size,
        stride,
        (0, 0, 0),
        dilation,
        ceil_mode,
    )
|
|
|
|
class MaxPool3dSame(nn.MaxPool3d):
    """Tensorflow like 'SAME' wrapper for 3D max pooling.

    Args:
        kernel_size: Pooling window; a scalar is expanded to three dims.
        stride: Pooling stride; a scalar is expanded to three dims. ``None`` means "same as
            ``kernel_size``", matching the ``nn.MaxPool3d`` default.
        padding: Accepted for signature compatibility but ignored; padding is applied in ``forward``.
        dilation: Window dilation; a scalar is expanded to three dims.
        ceil_mode: Forwarded to ``F.max_pool3d``.
    """

    def __init__(self, kernel_size: int, stride=None, padding=0, dilation=1, ceil_mode=False):
        # pad_same indexes three entries of kernel_size/stride, so they must be 3-tuples.
        kernel_size = to_3tuple(kernel_size)
        # to_3tuple(None) would produce (None, None, None); resolve the default explicitly instead.
        stride = kernel_size if stride is None else to_3tuple(stride)
        dilation = to_3tuple(dilation)
        # ceil_mode is passed by keyword: the fifth positional parameter of the base constructor is
        # return_indices, so a positional ceil_mode would land in the wrong slot.
        super().__init__(kernel_size, stride, (0, 0, 0), dilation, ceil_mode=ceil_mode)

    def forward(self, x):
        """Pad ``x`` with ``-inf`` via ``pad_same`` and max-pool it with this module's settings."""
        x = pad_same(x, self.kernel_size, self.stride, value=-float("inf"))
        return F.max_pool3d(x, self.kernel_size, self.stride, (0, 0, 0), self.dilation, self.ceil_mode)
|
|
|
|
def create_pool3d(pool_type, kernel_size, stride=None, **kwargs):
    """Build a 3D pooling layer for ``pool_type`` (``"avg"`` or ``"max"``).

    ``stride`` falls back to ``kernel_size`` when falsy. ``padding`` is popped from ``kwargs``
    (default ``""``) and resolved with ``get_padding_value``: a dynamic result selects the
    ``*Pool3dSame`` wrappers, otherwise the plain ``nn`` pooling modules receive the resolved padding.

    Raises:
        AssertionError: if ``pool_type`` is neither ``"avg"`` nor ``"max"``.
    """
    stride = stride or kernel_size
    requested = kwargs.pop("padding", "")
    padding, is_dynamic = get_padding_value(requested, kernel_size, stride=stride, **kwargs)

    if pool_type == "avg":
        if is_dynamic:
            return AvgPool3dSame(kernel_size, stride=stride, **kwargs)
        return nn.AvgPool3d(kernel_size, stride=stride, padding=padding, **kwargs)
    if pool_type == "max":
        if is_dynamic:
            return MaxPool3dSame(kernel_size, stride=stride, **kwargs)
        return nn.MaxPool3d(kernel_size, stride=stride, padding=padding, **kwargs)
    # Unsupported pool type.
    raise AssertionError()
|
|
| |
|
|
|
|
| def _float_to_int(x: float) -> int: |
| """ |
| Symbolic tracing helper to substitute for inbuilt `int`. |
| Hint: Inbuilt `int` can't accept an argument of type `Proxy` |
| """ |
| return int(x) |
|
|
|
|
| def _no_grad_trunc_normal_(tensor, mean, std, a, b): |
| |
| |
| def norm_cdf(x): |
| |
| return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0 |
|
|
| if (mean < a - 2 * std) or (mean > b + 2 * std): |
| warnings.warn( |
| "mean is more than 2 std from [a, b] in nn.init.trunc_normal_. " |
| "The distribution of values may be incorrect.", |
| stacklevel=2, |
| ) |
|
|
| with torch.no_grad(): |
| |
| |
| |
| l = norm_cdf((a - mean) / std) |
| u = norm_cdf((b - mean) / std) |
|
|
| |
| |
| tensor.uniform_(2 * l - 1, 2 * u - 1) |
|
|
| |
| |
| tensor.erfinv_() |
|
|
| |
| tensor.mul_(std * math.sqrt(2.0)) |
| tensor.add_(mean) |
|
|
| |
| tensor.clamp_(min=a, max=b) |
| return tensor |
|
|
|
|
def trunc_normal_(tensor, mean=0.0, std=1.0, a=-2.0, b=2.0):
    r"""Fill ``tensor`` in place with samples from a truncated normal distribution.

    Values are effectively drawn from :math:`\mathcal{N}(\text{mean}, \text{std}^2)` restricted to
    :math:`[a, b]`. The sampling method works best when :math:`a \leq \text{mean} \leq b`.

    Args:
        tensor: an n-dimensional `torch.Tensor`
        mean: the mean of the normal distribution
        std: the standard deviation of the normal distribution
        a: the minimum cutoff value
        b: the maximum cutoff value

    Returns:
        The same ``tensor``, filled in place.

    Examples:
        >>> w = torch.empty(3, 5)
        >>> trunc_normal_(w)
    """
    filled = _no_grad_trunc_normal_(tensor, mean, std, a, b)
    return filled
|
|