| import torch |
| import torch.nn as nn |
| import os |
| import torch.nn.functional as F |
|
|
| |
class SRCNN(nn.Module):
    """Three-layer convolutional super-resolution network for 1-channel images.

    The input is enlarged with bicubic interpolation first; the three
    convolutions then refine the enlarged image without changing its size
    (each uses "same" padding).

    Args:
        scale_factor: Spatial enlargement applied before the convolutions.
            Defaults to 4.
    """

    def __init__(self, scale_factor=4):
        super(SRCNN, self).__init__()
        # Plain attribute (not a parameter/buffer), so state-dict keys are
        # unaffected by it.
        self.scale_factor = scale_factor
        self.conv1 = nn.Conv2d(1, 64, kernel_size=9, padding=4)
        self.conv2 = nn.Conv2d(64, 32, kernel_size=5, padding=2)
        self.conv3 = nn.Conv2d(32, 1, kernel_size=5, padding=2)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Upsample ``x`` and refine it.

        Args:
            x: 4-D tensor laid out as (batch, 1, height, width).

        Returns:
            Tensor of shape (batch, 1, height * scale_factor,
            width * scale_factor).
        """
        # Upsampling is unconditional. The earlier guard compared the input
        # size against four times itself, which holds for every non-empty
        # image, so it never skipped this step.
        x = F.interpolate(
            x,
            scale_factor=self.scale_factor,
            mode='bicubic',
            align_corners=False,
        )
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        # No activation after the last layer: it emits the image directly.
        return self.conv3(x)
|
|
|
|
|
|
| |
class SatlasSR(nn.Module):
    """Stand-in module that defines no learnable layers.

    Its forward pass is a plain 4x bicubic resize of the input.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``x`` resized to four times its spatial size (bicubic)."""
        upscaled = F.interpolate(
            x,
            scale_factor=4,
            mode='bicubic',
            align_corners=False,
        )
        return upscaled
|
|
| |
class ResidualDenseBlock(nn.Module):
    """Five densely connected 3x3 convolutions with a scaled residual.

    Each of the first four convolutions sees the block input concatenated
    with every earlier convolution's activated output and contributes
    ``num_grow_ch`` new channels. The fifth maps the full stack back to
    ``num_feat`` channels.

    Args:
        num_feat: Channel count of the block's input and output.
        num_grow_ch: Channels added by each of the first four convolutions.
    """

    def __init__(self, num_feat=64, num_grow_ch=32):
        super().__init__()
        # Attribute names and creation order are kept: they define the
        # state-dict keys and the order in which weights are initialised.
        self.conv1 = nn.Conv2d(num_feat, num_grow_ch, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(num_feat + num_grow_ch, num_grow_ch, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(num_feat + 2 * num_grow_ch, num_grow_ch, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(num_feat + 3 * num_grow_ch, num_grow_ch, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(num_feat + 4 * num_grow_ch, num_feat, kernel_size=3, stride=1, padding=1)
        self.lrelu = nn.LeakyReLU(negative_slope=0.2, inplace=True)

    def forward(self, x):
        """Apply the dense block; the output has the same shape as ``x``."""
        # `stacked` grows along the channel axis: input first, then each
        # activated growth map in the order it was produced.
        stacked = x
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            grown = self.lrelu(conv(stacked))
            stacked = torch.cat((stacked, grown), 1)

        fused = self.conv5(stacked)
        # The branch is scaled by 0.2 before being added back to the input.
        return fused * 0.2 + x
|
|
class RRDB(nn.Module):
    """Three ``ResidualDenseBlock`` modules in series with a scaled skip.

    Args:
        num_feat: Channel count passed to every dense block.
        num_grow_ch: Growth channels passed to every dense block.
    """

    def __init__(self, num_feat, num_grow_ch=32):
        super().__init__()
        # Names rdb1..rdb3 are part of the state-dict key prefix; keep them.
        self.rdb1 = ResidualDenseBlock(num_feat, num_grow_ch)
        self.rdb2 = ResidualDenseBlock(num_feat, num_grow_ch)
        self.rdb3 = ResidualDenseBlock(num_feat, num_grow_ch)

    def forward(self, x):
        """Run the three dense blocks and add 0.2x their output to ``x``."""
        refined = x
        for dense_block in (self.rdb1, self.rdb2, self.rdb3):
            refined = dense_block(refined)
        return refined * 0.2 + x
|
|
class RRDBNet(nn.Module):
    """RRDB trunk followed by two 2x nearest-neighbour upsampling stages.

    The output has four times the input's height and width. All
    constructor arguments default to the values that were previously
    hard-coded, so ``RRDBNet()`` builds the same network as before.

    Args:
        num_in_ch: Channels of the input image.
        num_out_ch: Channels of the output image.
        num_feat: Channels of the intermediate feature maps.
        num_block: Number of ``RRDB`` blocks in the trunk.
        num_grow_ch: Growth channels inside each dense block.
    """

    def __init__(self, num_in_ch=3, num_out_ch=3, num_feat=64,
                 num_block=23, num_grow_ch=32):
        super(RRDBNet, self).__init__()
        # Attribute names define the state-dict keys; they are unchanged.
        self.conv_first = nn.Conv2d(num_in_ch, num_feat, 3, 1, 1)
        self.body = nn.Sequential(
            *[RRDB(num_feat=num_feat, num_grow_ch=num_grow_ch)
              for _ in range(num_block)]
        )
        self.conv_body = nn.Conv2d(num_feat, num_feat, 3, 1, 1)
        self.conv_up1 = nn.Conv2d(num_feat, num_feat, 3, 1, 1)
        self.conv_up2 = nn.Conv2d(num_feat, num_feat, 3, 1, 1)
        self.conv_hr = nn.Conv2d(num_feat, num_feat, 3, 1, 1)
        self.conv_last = nn.Conv2d(num_feat, num_out_ch, 3, 1, 1)
        self.lrelu = nn.LeakyReLU(negative_slope=0.2, inplace=True)

    def forward(self, x):
        """Map ``x`` to an image with 4x the spatial size.

        Args:
            x: Tensor with ``num_in_ch`` channels.

        Returns:
            Tensor with ``num_out_ch`` channels and four times the height
            and width of ``x``.
        """
        feat = self.conv_first(x)
        # Long skip connection around the whole RRDB trunk.
        body_feat = self.conv_body(self.body(feat))
        feat = feat + body_feat
        # Two (nearest 2x resize -> conv -> LeakyReLU) stages give 4x overall.
        feat = self.lrelu(self.conv_up1(F.interpolate(feat, scale_factor=2, mode='nearest')))
        feat = self.lrelu(self.conv_up2(F.interpolate(feat, scale_factor=2, mode='nearest')))
        out = self.conv_last(self.lrelu(self.conv_hr(feat)))
        return out
|
|
def load_model(model_name, model_path, device):
    """Build the network called ``model_name`` and load its checkpoint.

    Args:
        model_name: ``"srcnn"``, ``"satlas"`` or ``"esrgan"``.
        model_path: Path of the checkpoint file.
        device: Passed to ``torch.load`` as ``map_location`` and to
            ``Module.to``.

    Returns:
        The model in eval mode on ``device``, or ``None`` when the file
        does not exist, the name is not recognised, or loading raises.
    """
    if not os.path.exists(model_path):
        return None

    if model_name == "srcnn":
        model = SRCNN()
    elif model_name == "satlas":
        model = SatlasSR()
    elif model_name == "esrgan":
        model = RRDBNet()
    else:
        return None

    try:
        # NOTE(security): torch.load unpickles the file; only point this at
        # checkpoints from a trusted source (or consider weights_only=True
        # if the installed torch version supports it).
        state_dict = torch.load(model_path, map_location=device)

        # Some checkpoints nest the weights under 'params_ema' or 'params';
        # the EMA weights win when both are present.
        if 'params_ema' in state_dict:
            state_dict = state_dict['params_ema']
        elif 'params' in state_dict:
            state_dict = state_dict['params']

        # strict=False tolerates key mismatches. Report them instead of
        # dropping them silently, so a checkpoint that does not fit the
        # architecture is visible to the caller.
        mismatch = model.load_state_dict(state_dict, strict=False)
        if mismatch.missing_keys or mismatch.unexpected_keys:
            print(
                f"Warning: checkpoint for {model_name} does not fully match "
                f"the model ({len(mismatch.missing_keys)} missing, "
                f"{len(mismatch.unexpected_keys)} unexpected keys)"
            )

        model.eval()
        model.to(device)
        return model
    except Exception as e:
        # Best-effort loader: a bad checkpoint must not abort the caller.
        print(f"Error loading {model_name}: {e}")
        return None
|
|
def get_available_models(model_dir="models", device="cpu"):
    """Load every known model whose checkpoint file exists in ``model_dir``.

    Args:
        model_dir: Directory searched for the checkpoint files.
        device: Forwarded to ``load_model``.

    Returns:
        Dict mapping model name to loaded model. Entries whose file is
        missing, or for which ``load_model`` returns ``None``, are left out.
    """
    # (model name, checkpoint filename) in the order they are attempted.
    weight_files = (
        ("srcnn", "srcnn_x4.pth"),
        ("satlas", "aerial_swinb_si.pth"),
        ("esrgan", "RealESRGAN_x4plus.pth"),
    )

    loaded = {}
    for name, filename in weight_files:
        path = os.path.join(model_dir, filename)
        if not os.path.exists(path):
            print(f"Model file for {name} not found at {path}. Skipping.")
            continue

        print(f"Loading {name}...")
        model = load_model(name, path, device)
        if model is not None:
            loaded[name] = model

    return loaded
|
|