| """ | |
| DEIMv2: Real-Time Object Detection Meets DINOv3 | |
| Copyright (c) 2025 The DEIMv2 Authors. All Rights Reserved. | |
| --------------------------------------------------------------------------------- | |
| Modified from D-FINE (https://github.com/Peterande/D-FINE) | |
| Copyright (c) 2024 The D-FINE Authors. All Rights Reserved. | |
| """ | |
import os
import sys

import cv2  # Added for video processing
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as T
from PIL import Image, ImageDraw

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../')))

from engine.core import YAMLConfig


def draw(images, labels, boxes, scores, thrh=0.45):
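    """Draw detections with score above `thrh` onto each PIL image, in place."""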
    for i, im in enumerate(images):
        drawer = ImageDraw.Draw(im)  # renamed to avoid shadowing draw() itself
        scr = scores[i]
        lab = labels[i][scr > thrh]
        box = boxes[i][scr > thrh]
        scrs = scr[scr > thrh]
        for j, b in enumerate(box):
            drawer.rectangle(list(b), outline='red')
            drawer.text((b[0], b[1]), text=f"{lab[j].item()} {round(scrs[j].item(), 2)}", fill='blue')


def process_image(model, device, file_path, size=(640, 640), vit_backbone=False):
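    """Run the model on a single image file and save the visualization as 'torch_results.jpg'."""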
    im_pil = Image.open(file_path).convert('RGB')
    w, h = im_pil.size
    orig_size = torch.tensor([[w, h]]).to(device)

    # ViT-style backbones expect ImageNet-normalized inputs; otherwise pass raw tensors through
    transforms = T.Compose([
        T.Resize(size),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        if vit_backbone else T.Lambda(lambda x: x)
    ])
    im_data = transforms(im_pil).unsqueeze(0).to(device)

    with torch.no_grad():
        labels, boxes, scores = model(im_data, orig_size)

    draw([im_pil], labels, boxes, scores)
    im_pil.save('torch_results.jpg')


def process_video(model, device, file_path, size=(640, 640), vit_backbone=False):
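    """Run the model frame by frame over a video file and save the annotated result as 'torch_results.mp4'."""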
    cap = cv2.VideoCapture(file_path)
    if not cap.isOpened():
        raise IOError(f"Cannot open video file: {file_path}")

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    orig_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    orig_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Define the codec and create the VideoWriter object
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter('torch_results.mp4', fourcc, fps, (orig_w, orig_h))

    # Same preprocessing as for single images
    transforms = T.Compose([
        T.Resize(size),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        if vit_backbone else T.Lambda(lambda x: x)
    ])

    frame_count = 0
    print("Processing video frames...")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Convert the BGR OpenCV frame to an RGB PIL image
        frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        w, h = frame_pil.size
        orig_size = torch.tensor([[w, h]]).to(device)
        im_data = transforms(frame_pil).unsqueeze(0).to(device)

        with torch.no_grad():
            labels, boxes, scores = model(im_data, orig_size)

        # Draw detections on the frame
        draw([frame_pil], labels, boxes, scores)

        # Convert back to BGR and write the annotated frame
        out.write(cv2.cvtColor(np.array(frame_pil), cv2.COLOR_RGB2BGR))

        frame_count += 1
        if frame_count % 10 == 0:
            print(f"Processed {frame_count} frames...")

    cap.release()
    out.release()
    print("Video processing complete. Result saved as 'torch_results.mp4'.")


def main(args):
    """Main function"""
    cfg = YAMLConfig(args.config, resume=args.resume)

    if 'HGNetv2' in cfg.yaml_cfg:
        cfg.yaml_cfg['HGNetv2']['pretrained'] = False

    if args.resume:
        checkpoint = torch.load(args.resume, map_location='cpu')
        # Prefer the EMA weights when the checkpoint contains them
        if 'ema' in checkpoint:
            state = checkpoint['ema']['module']
        else:
            state = checkpoint['model']
    else:
        raise AttributeError('Only loading a model state_dict via --resume is supported for now.')

    # Load the train-mode state; it is converted to deploy mode below
    cfg.model.load_state_dict(state)
    class Model(nn.Module):
        """Bundle the deployed detector and its postprocessor into a single module."""

        def __init__(self):
            super().__init__()
            self.model = cfg.model.deploy()
            self.postprocessor = cfg.postprocessor.deploy()

        def forward(self, images, orig_target_sizes):
            outputs = self.model(images)
            outputs = self.postprocessor(outputs, orig_target_sizes)
            return outputs
    device = args.device
    model = Model().to(device)

    img_size = cfg.yaml_cfg["eval_spatial_size"]
    # DINOv3 (ViT-style) backbones need ImageNet normalization in preprocessing
    vit_backbone = 'DINOv3STAs' in cfg.yaml_cfg

    # Dispatch on the file extension: known image suffixes are processed as
    # images, everything else is treated as video
    file_path = args.input
    if os.path.splitext(file_path)[-1].lower() in ['.jpg', '.jpeg', '.png', '.bmp']:
        process_image(model, device, file_path, img_size, vit_backbone)
        print("Image processing complete. Result saved as 'torch_results.jpg'.")
    else:
        process_video(model, device, file_path, img_size, vit_backbone)


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--config', type=str, required=True, help='path to the model YAML config')
    parser.add_argument('-r', '--resume', type=str, required=True, help='path to the checkpoint to load')
    parser.add_argument('-i', '--input', type=str, required=True, help='input image or video file')
    parser.add_argument('-d', '--device', type=str, default='cpu', help="inference device, e.g. 'cpu' or 'cuda:0'")
    args = parser.parse_args()
    main(args)
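
# Example invocation (the config and checkpoint paths below are illustrative,
# not actual repo files):
#   python torch_inf.py -c configs/deimv2/deimv2_s.yml \
#       -r checkpoints/deimv2_s.pth -i input.jpg -d cuda:0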