Spaces:
Sleeping
Sleeping
import sys | |
import logging | |
import copy | |
import torch | |
from PIL import Image | |
import torchvision.transforms as transforms | |
from torchvision.transforms.functional import pil_to_tensor | |
from utils import factory | |
from utils.data_manager import DataManager | |
from utils.toolkit import count_parameters | |
from utils.data_manager import pil_loader | |
import os | |
import numpy as np | |
import json | |
import argparse | |
import imghdr | |
import time | |
def is_image_imghdr(path): | |
""" | |
Checks if a path points to a valid image using imghdr. | |
Args: | |
path: The path to the file. | |
Returns: | |
True if the path is a valid image, False otherwise. | |
""" | |
if not os.path.isfile(path): | |
return False | |
return imghdr.what(path) in ['jpeg', 'png'] | |
def _set_device(args): | |
device_type = args["device"] | |
gpus = [] | |
for device in device_type: | |
if device == -1: | |
device = torch.device("cpu") | |
else: | |
device = torch.device("cuda:{}".format(device)) | |
gpus.append(device) | |
args["device"] = gpus | |
def get_methods(object, spacing=20): | |
methodList = [] | |
for method_name in dir(object): | |
try: | |
if callable(getattr(object, method_name)): | |
methodList.append(str(method_name)) | |
except Exception: | |
methodList.append(str(method_name)) | |
processFunc = (lambda s: ' '.join(s.split())) or (lambda s: s) | |
for method in methodList: | |
try: | |
print(str(method.ljust(spacing)) + ' ' + | |
processFunc(str(getattr(object, method).__doc__)[0:90])) | |
except Exception: | |
print(method.ljust(spacing) + ' ' + ' getattr() failed') | |
def load_model(args): | |
_set_device(args) | |
model = factory.get_model(args["model_name"], args) | |
model.load_checkpoint(args["checkpoint"]) | |
return model | |
def main(): | |
args = setup_parser().parse_args() | |
param = load_json(args.config) | |
args = vars(args) # Converting argparse Namespace to a dict. | |
args.update(param) # Add parameters from json | |
assert args['output'].split(".")[-1] == "json" or os.path.isdir(args['output']) | |
model = load_model(args) | |
result = [] | |
if is_image_imghdr(args['input']): | |
img = pil_to_tensor(pil_loader(args['input'])) | |
img = img.unsqueeze(0) | |
predictions = model.inference(img) | |
out = {"img": args['input'].split("/")[-1]} | |
out.update({"predictions": [{"confident": confident, "index": pred, "label": label } for pred, label, confident in zip(predictions[0], predictions[1], predictions[2])]}) | |
result.append(out) | |
else: | |
image_list = filter(lambda x: is_image_imghdr(os.path.join(args['input'], x)), os.listdir(args['input'])) | |
for image in image_list: | |
print("Inference on image", image) | |
img = pil_to_tensor(pil_loader(os.path.join(args['input'], image))) | |
img = img.unsqueeze(0) | |
predictions = model.inference(img) | |
out = {"img": image.split("/")[-1]} | |
out.update({"predictions": [{"confident": confident, "index": pred, "label": label } for pred, label, confident in zip(predictions[0], predictions[1], predictions[2])]}) | |
result.append(out) | |
if args['output'].split(".")[-1] == "json": | |
with open(args['output'], "w+") as f: | |
json.dump(result, f, indent=4) | |
else: | |
with open(os.path.join(args['output'], "output_model_{}.json".format(time.time())), "w+") as f: | |
json.dump(result, f, indent=4) | |
def load_json(settings_path): | |
with open(settings_path) as data_file: | |
param = json.load(data_file) | |
return param | |
def setup_parser(): | |
parser = argparse.ArgumentParser(description='Reproduce of multiple continual learning algorthms.') | |
parser.add_argument('--config', type=str, help='Json file of settings.') | |
parser.add_argument('--checkpoint', type=str, help="path to checkpoint file. File must be a .pth format file") | |
parser.add_argument('--input', type=str, help="Path to input. This could be an folder or an image file") | |
parser.add_argument('--output', type=str, help = "Output path to save prediction") | |
return parser | |
if __name__ == '__main__': | |
main() | |