detection_metrics / utils.py
sklum's picture
Fix modules
3e2a0ca
import copy
import pickle
from typing import Dict, List, Tuple, Union
import numpy as np
import torch
import torch.distributed as dist
from datasets import Dataset
from .cocoeval import COCOeval
# Typings
_TYPING_BOX = Tuple[float, float, float, float]
_TYPING_SCORES = List[float]
_TYPING_LABELS = List[int]
_TYPING_BOXES = List[_TYPING_BOX]
_TYPING_PRED_REF = Union[_TYPING_SCORES, _TYPING_LABELS, _TYPING_BOXES]
_TYPING_PREDICTION = Dict[str, _TYPING_PRED_REF]
_TYPING_REFERENCE = Dict[str, _TYPING_PRED_REF]
_TYPING_PREDICTIONS = Dict[int, _TYPING_PREDICTION]
def convert_to_xywh(boxes: torch.Tensor) -> torch.Tensor:
"""
Convert bounding boxes from (xmin, ymin, xmax, ymax) format to (x, y, width, height) format.
Args:
boxes (torch.Tensor): Tensor of shape (N, 4) representing bounding boxes in \
(xmin, ymin, xmax, ymax) format.
Returns:
torch.Tensor: Tensor of shape (N, 4) representing bounding boxes in (x, y, width, height) \
format.
"""
xmin, ymin, xmax, ymax = boxes.unbind(1)
return torch.stack((xmin, ymin, xmax - xmin, ymax - ymin), dim=1)
def create_common_coco_eval(
coco_eval: COCOeval, img_ids: List[int], eval_imgs: np.ndarray
) -> None:
"""
Create a common COCO evaluation by merging image IDs and evaluation images into the \
coco_eval object.
Args:
coco_eval: COCOeval evaluation object.
img_ids (List[int]): Tensor of image IDs.
eval_imgs (torch.Tensor): Tensor of evaluation images.
"""
img_ids, eval_imgs = merge(img_ids, eval_imgs)
img_ids = list(img_ids)
eval_imgs = list(eval_imgs.flatten())
coco_eval.evalImgs = eval_imgs
coco_eval.params.imgIds = img_ids
coco_eval._paramsEval = copy.deepcopy(coco_eval.params)
def merge(img_ids: List[int], eval_imgs: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Merge image IDs and evaluation images from different processes.
Args:
img_ids (List[int]): List of image ID arrays from different processes.
eval_imgs (np.ndarray): Evaluation images from different processes.
Returns:
Tuple[np.ndarray, np.ndarray]: Merged image IDs and evaluation images.
"""
all_img_ids = all_gather(img_ids)
all_eval_imgs = all_gather(eval_imgs)
merged_img_ids = []
for p in all_img_ids:
merged_img_ids.extend(p)
merged_eval_imgs = []
for p in all_eval_imgs:
merged_eval_imgs.append(p)
merged_img_ids = np.array(merged_img_ids)
merged_eval_imgs = np.concatenate(merged_eval_imgs, 2)
# keep only unique (and in sorted order) images
merged_img_ids, idx = np.unique(merged_img_ids, return_index=True)
merged_eval_imgs = merged_eval_imgs[..., idx]
return merged_img_ids, merged_eval_imgs
def all_gather(data: List[int]) -> List[List[int]]:
"""
Run all_gather on arbitrary picklable data (not necessarily tensors).
Args:
data (List[int]): any picklable object
Returns:
List[List[int]]: list of data gathered from each rank
"""
world_size = get_world_size()
if world_size == 1:
return [data]
# serialized to a Tensor
buffer = pickle.dumps(data)
storage = torch.ByteStorage.from_buffer(buffer)
tensor = torch.ByteTensor(storage).to("cuda")
# obtain Tensor size of each rank
local_size = torch.tensor([tensor.numel()], device="cuda")
size_list = [torch.tensor([0], device="cuda") for _ in range(world_size)]
dist.all_gather(size_list, local_size)
size_list = [int(size.item()) for size in size_list]
max_size = max(size_list)
# receiving Tensor from all ranks
# we pad the tensor because torch all_gather does not support
# gathering tensors of different shapes
tensor_list = []
for _ in size_list:
tensor_list.append(torch.empty((max_size,), dtype=torch.uint8, device="cuda"))
if local_size != max_size:
padding = torch.empty(
size=(max_size - local_size,), dtype=torch.uint8, device="cuda"
)
tensor = torch.cat((tensor, padding), dim=0)
dist.all_gather(tensor_list, tensor)
data_list = []
for size, tensor in zip(size_list, tensor_list):
buffer = tensor.cpu().numpy().tobytes()[:size]
data_list.append(pickle.loads(buffer))
return data_list
def get_world_size() -> int:
"""
Get the number of processes in the distributed environment.
Returns:
int: Number of processes.
"""
if not is_dist_avail_and_initialized():
return 1
return dist.get_world_size()
def is_dist_avail_and_initialized() -> bool:
"""
Check if distributed environment is available and initialized.
Returns:
bool: True if distributed environment is available and initialized, False otherwise.
"""
return dist.is_available() and dist.is_initialized()