"""Fused bias-add + leaky-ReLU activation with an optional CUDA extension.

When CUDA is available at import time, the ``fused`` extension is JIT-compiled
from the C++/CUDA sources located next to this file.  Otherwise ``fused`` is
``None`` and :func:`fused_leaky_relu` computes the same expression with plain
PyTorch ops.
"""

import os

import torch
from torch import nn
from torch.nn import functional as F
from torch.autograd import Function
from torch.utils.cpp_extension import load

module_path = os.path.dirname(__file__)
cuda_available = torch.cuda.is_available()

if cuda_available:
    # JIT-build the extension from the sources that sit beside this module.
    fused = load(
        "fused",
        sources=[
            os.path.join(module_path, "fused_bias_act.cpp"),
            os.path.join(module_path, "fused_bias_act_kernel.cu"),
        ],
    )
else:
    fused = None
    print("fused_act.py is running on cpu")


class FusedLeakyReLUFunctionBackward(Function):
    """Autograd function implementing the backward pass of the fused op.

    It is itself differentiable, so the fused activation supports
    double-backward.
    """

    @staticmethod
    def forward(ctx, grad_output, out, negative_slope, scale):
        """Return ``(grad_input, grad_bias)`` for the fused activation.

        Args:
            grad_output: Gradient flowing in from the next layer.
            out: Output previously produced by the fused forward pass.
            negative_slope: Leaky-ReLU slope for negative inputs.
            scale: Multiplicative gain applied by the activation.
        """
        ctx.save_for_backward(out)
        ctx.negative_slope = negative_slope
        ctx.scale = scale

        # Zero-element tensor passed in the bias slot of the kernel call.
        empty = grad_output.new_empty(0)

        # NOTE(review): the literal 3 presumably selects leaky ReLU and the
        # following 1 the first-derivative mode -- confirm against
        # fused_bias_act_kernel.cu.
        grad_input = fused.fused_bias_act(
            grad_output, empty, out, 3, 1, negative_slope, scale
        )

        # Reduce over every dimension except dim 1 to obtain the bias gradient.
        dim = [0]
        if grad_input.ndim > 2:
            dim += list(range(2, grad_input.ndim))

        grad_bias = grad_input.sum(dim).detach()

        return grad_input, grad_bias

    @staticmethod
    def backward(ctx, gradgrad_input, gradgrad_bias):
        """Propagate second-order gradients back to ``grad_output``.

        Only the first forward argument receives a gradient; ``out``,
        ``negative_slope`` and ``scale`` get ``None``.
        """
        (out,) = ctx.saved_tensors
        gradgrad_out = fused.fused_bias_act(
            gradgrad_input, gradgrad_bias, out, 3, 1, ctx.negative_slope, ctx.scale
        )

        return gradgrad_out, None, None, None


class FusedLeakyReLUFunction(Function):
    """Autograd function wrapping the fused bias + leaky-ReLU kernel."""

    @staticmethod
    def forward(ctx, input, bias, negative_slope, scale):
        """Run the fused kernel on ``input`` and ``bias`` and return its output.

        The output (not the input) is saved for the backward pass.
        """
        # Zero-element tensor passed in the reference slot of the kernel call.
        empty = input.new_empty(0)

        # NOTE(review): the literal 3 presumably selects leaky ReLU and the
        # following 0 the plain forward mode -- confirm against
        # fused_bias_act_kernel.cu.
        out = fused.fused_bias_act(input, bias, empty, 3, 0, negative_slope, scale)
        ctx.save_for_backward(out)
        ctx.negative_slope = negative_slope
        ctx.scale = scale

        return out

    @staticmethod
    def backward(ctx, grad_output):
        """Return gradients for ``(input, bias, negative_slope, scale)``.

        The two scalar hyper-parameters are not differentiated (``None``).
        """
        (out,) = ctx.saved_tensors

        grad_input, grad_bias = FusedLeakyReLUFunctionBackward.apply(
            grad_output, out, ctx.negative_slope, ctx.scale
        )

        return grad_input, grad_bias, None, None


class FusedLeakyReLU(nn.Module):
    """Module form of :func:`fused_leaky_relu` with a learnable bias.

    Args:
        channel: Length of the bias vector (initialised to zeros).
        negative_slope: Leaky-ReLU slope for negative inputs.
        scale: Multiplicative gain applied after the activation.
    """

    def __init__(self, channel, negative_slope=0.2, scale=2 ** 0.5):
        super().__init__()

        self.bias = nn.Parameter(torch.zeros(channel))
        self.negative_slope = negative_slope
        self.scale = scale

    def forward(self, input):
        """Apply the bias, leaky ReLU and scale to ``input``."""
        return fused_leaky_relu(input, self.bias, self.negative_slope, self.scale)


def fused_leaky_relu(input, bias, negative_slope=0.2, scale=2 ** 0.5):
    """Compute ``leaky_relu(input + bias, negative_slope) * scale``.

    The bias is added along dimension 1 of ``input``.  CPU tensors, and any
    tensor when the compiled extension is unavailable, are handled with plain
    PyTorch ops; everything else is dispatched to the fused kernel.

    Args:
        input: Tensor to activate.
        bias: Bias tensor, reshaped to broadcast along dimension 1 of ``input``.
        negative_slope: Leaky-ReLU slope for negative inputs.
        scale: Multiplicative gain applied after the activation.

    Returns:
        The activated tensor.
    """
    # The extension is only built when CUDA is available, so fall back to the
    # reference implementation whenever it is missing instead of failing on
    # ``None.fused_bias_act``.
    if input.device.type == "cpu" or fused is None:
        # Trailing singleton dims let the bias broadcast over the remaining axes.
        rest_dim = [1] * (input.ndim - bias.ndim - 1)
        biased = input + bias.view(1, bias.shape[0], *rest_dim)

        # Honour the caller's slope; it used to be pinned to 0.2 on this path.
        return F.leaky_relu(biased, negative_slope=negative_slope) * scale

    return FusedLeakyReLUFunction.apply(input, bias, negative_slope, scale)