Spaces:
Running
on
Zero
Running
on
Zero
from multiprocessing.sharedctypes import Value | |
import torch | |
import torch.distributed.nn | |
from torch import distributed as dist, nn as nn | |
from torch.nn import functional as F | |
import numpy as np | |
from sklearn.metrics import average_precision_score, roc_auc_score, accuracy_score | |
try: | |
import horovod.torch as hvd | |
except ImportError: | |
hvd = None | |
def gather_features(
    audio_features,
    text_features,
    audio_features_mlp=None,
    text_features_mlp=None,
    local_loss=False,
    gather_with_grad=False,
    rank=0,
    world_size=1,
    use_horovod=False,
    mlp_loss=False,
):
    """Collect feature tensors from every rank and concatenate them on dim 0.

    Args:
        audio_features: local audio feature tensor.
        text_features: local text feature tensor.
        audio_features_mlp: local MLP audio features; only read when
            ``mlp_loss`` is true.
        text_features_mlp: local MLP text features; only read when
            ``mlp_loss`` is true.
        local_loss: when false and the gather carries no gradient, the slice
            belonging to ``rank`` is swapped for the local tensor so the local
            rank still receives gradients.
        gather_with_grad: use the gradient-carrying gather
            (``hvd.allgather`` outside ``no_grad`` or
            ``torch.distributed.nn.all_gather``).
        rank: index of this process's slice in the gathered result.
        world_size: number of slices expected in the gathered result.
        use_horovod: gather through Horovod instead of ``torch.distributed``.
        mlp_loss: also gather the two ``*_mlp`` tensors.

    Returns:
        ``(all_audio, all_text)`` or, when ``mlp_loss`` is true,
        ``(all_audio, all_text, all_audio_mlp, all_text_mlp)``.
    """
    # Process tensors in one fixed order so collectives are issued in the
    # sequence: audio, text, audio_mlp, text_mlp.
    local_tensors = [audio_features, text_features]
    if mlp_loss:
        local_tensors.extend([audio_features_mlp, text_features_mlp])

    if use_horovod:
        assert hvd is not None, "Please install horovod"
        if gather_with_grad:
            merged = [hvd.allgather(local) for local in local_tensors]
        else:
            with torch.no_grad():
                merged = [hvd.allgather(local) for local in local_tensors]
            if not local_loss:
                # ensure grads for local rank when all_* features don't have a gradient
                restored = []
                for local, everything in zip(local_tensors, merged):
                    pieces = list(everything.chunk(world_size, dim=0))
                    pieces[rank] = local
                    restored.append(torch.cat(pieces, dim=0))
                merged = restored
    elif gather_with_grad:
        # We gather tensors from all gpus, keeping the autograd graph.
        merged = [
            torch.cat(torch.distributed.nn.all_gather(local), dim=0)
            for local in local_tensors
        ]
    else:
        # We gather tensors from all gpus into pre-allocated receive buffers.
        buffers = [
            [torch.zeros_like(local) for _ in range(world_size)]
            for local in local_tensors
        ]
        for slots, local in zip(buffers, local_tensors):
            dist.all_gather(slots, local)
        if not local_loss:
            # ensure grads for local rank when all_* features don't have a gradient
            for slots, local in zip(buffers, local_tensors):
                slots[rank] = local
        merged = [torch.cat(slots, dim=0) for slots in buffers]

    return tuple(merged)
class ClipLoss(nn.Module):
    """Symmetric cross-entropy contrastive loss between audio and text features.

    With ``mlp_loss`` enabled, four logit matrices are built by pairing the
    plain features of one modality with the MLP features of the other, and the
    four cross-entropy terms are averaged. Otherwise the two audio->text and
    text->audio terms are averaged.

    Args:
        local_loss: compare local features against gathered features instead
            of gathered against gathered (only relevant when
            ``world_size > 1``).
        gather_with_grad: forwarded to ``gather_features``.
        cache_labels: keep the target index tensor per device between calls.
        rank: this process's rank; offsets the targets when ``local_loss``.
        world_size: number of processes; features are gathered when ``> 1``.
        use_horovod: forwarded to ``gather_features``.
        mlp_loss: use the four-term MLP variant of the loss.
        weight_loss_kappa: when non-zero, class weights
            ``exp(sum_j(f_i . f_j) / (kappa * n))`` are passed to
            ``F.cross_entropy``.
    """

    def __init__(
        self,
        local_loss=False,
        gather_with_grad=False,
        cache_labels=False,
        rank=0,
        world_size=1,
        use_horovod=False,
        mlp_loss=False,
        weight_loss_kappa=0,
    ):
        super().__init__()
        self.local_loss = local_loss
        self.gather_with_grad = gather_with_grad
        self.cache_labels = cache_labels
        self.rank = rank
        self.world_size = world_size
        self.use_horovod = use_horovod
        self.mlp_loss = mlp_loss
        self.weighted_loss = bool(weight_loss_kappa != 0)
        self.weight_loss_kappa = weight_loss_kappa

        # cache state
        self.prev_num_logits = 0
        self.labels = {}

    def _ground_truth(self, num_logits, device):
        """Return target indices ``0..num_logits-1``, cached if enabled."""
        if self.prev_num_logits != num_logits or device not in self.labels:
            labels = torch.arange(num_logits, device=device, dtype=torch.long)
            if self.world_size > 1 and self.local_loss:
                # Local rows are scored against the gathered columns, so the
                # matching column sits at this rank's offset.
                labels = labels + num_logits * self.rank
            if self.cache_labels:
                self.labels[device] = labels
                self.prev_num_logits = num_logits
        else:
            labels = self.labels[device]
        return labels

    def _similarity_weight(self, features):
        """Return detached weights ``exp(rowsum(f @ f.T) / (kappa * len(f)))``."""
        similarity = (features @ features.T).detach()
        return torch.exp(
            torch.sum(similarity, axis=1) / (self.weight_loss_kappa * len(features))
        ).detach()

    def forward(
        self,
        audio_features,
        text_features,
        logit_scale_a,
        logit_scale_t=None,
        audio_features_mlp=None,
        text_features_mlp=None,
    ):
        """Compute the contrastive loss for one batch.

        Args:
            audio_features: local audio features.
            text_features: local text features.
            logit_scale_a: multiplier applied to the audio-side logits (and
                to all logits when ``mlp_loss`` is off).
            logit_scale_t: multiplier for the text-side logits; it is used
                (and therefore must be provided) when ``mlp_loss`` is on.
            audio_features_mlp: MLP audio features, used when ``mlp_loss``.
            text_features_mlp: MLP text features, used when ``mlp_loss``.

        Returns:
            The averaged cross-entropy loss tensor.
        """
        device = audio_features.device
        distributed = self.world_size > 1

        if self.mlp_loss:
            if distributed:
                (
                    all_audio_features,
                    all_text_features,
                    all_audio_features_mlp,
                    all_text_features_mlp,
                ) = gather_features(
                    audio_features=audio_features,
                    text_features=text_features,
                    audio_features_mlp=audio_features_mlp,
                    text_features_mlp=text_features_mlp,
                    local_loss=self.local_loss,
                    gather_with_grad=self.gather_with_grad,
                    rank=self.rank,
                    world_size=self.world_size,
                    use_horovod=self.use_horovod,
                    mlp_loss=self.mlp_loss,
                )
                if self.local_loss:
                    a_logits_per_audio = (
                        logit_scale_a * audio_features @ all_text_features_mlp.T
                    )
                    a_logits_per_text = (
                        logit_scale_a * text_features_mlp @ all_audio_features.T
                    )
                    t_logits_per_audio = (
                        logit_scale_t * audio_features_mlp @ all_text_features.T
                    )
                    t_logits_per_text = (
                        logit_scale_t * text_features @ all_audio_features_mlp.T
                    )
                else:
                    a_logits_per_audio = (
                        logit_scale_a * all_audio_features @ all_text_features_mlp.T
                    )
                    a_logits_per_text = a_logits_per_audio.T
                    t_logits_per_audio = (
                        logit_scale_t * all_audio_features_mlp @ all_text_features.T
                    )
                    t_logits_per_text = t_logits_per_audio.T
            else:
                # Single process: the "gathered" batch is just the local one.
                all_audio_features = audio_features
                all_text_features = text_features
                a_logits_per_audio = (
                    logit_scale_a * audio_features @ text_features_mlp.T
                )
                a_logits_per_text = logit_scale_a * text_features_mlp @ audio_features.T
                t_logits_per_audio = (
                    logit_scale_t * audio_features_mlp @ text_features.T
                )
                t_logits_per_text = logit_scale_t * text_features @ audio_features_mlp.T

            # calculated ground-truth and cache if enabled
            labels = self._ground_truth(a_logits_per_audio.shape[0], device)

            if not self.weighted_loss:
                return (
                    F.cross_entropy(a_logits_per_audio, labels)
                    + F.cross_entropy(a_logits_per_text, labels)
                    + F.cross_entropy(t_logits_per_audio, labels)
                    + F.cross_entropy(t_logits_per_text, labels)
                ) / 4

            # cross_entropy expects one weight per class (logit column), so the
            # weights must come from the same batch the columns span.
            audio_weight = self._similarity_weight(all_audio_features)
            text_weight = self._similarity_weight(all_text_features)
            return (
                F.cross_entropy(a_logits_per_audio, labels, weight=audio_weight)
                + F.cross_entropy(a_logits_per_text, labels, weight=audio_weight)
                + F.cross_entropy(t_logits_per_audio, labels, weight=text_weight)
                + F.cross_entropy(t_logits_per_text, labels, weight=text_weight)
            ) / 4

        if distributed:
            all_audio_features, all_text_features = gather_features(
                audio_features=audio_features,
                text_features=text_features,
                local_loss=self.local_loss,
                gather_with_grad=self.gather_with_grad,
                rank=self.rank,
                world_size=self.world_size,
                use_horovod=self.use_horovod,
                mlp_loss=self.mlp_loss,
            )
            if self.local_loss:
                logits_per_audio = logit_scale_a * audio_features @ all_text_features.T
                logits_per_text = logit_scale_a * text_features @ all_audio_features.T
            else:
                logits_per_audio = (
                    logit_scale_a * all_audio_features @ all_text_features.T
                )
                logits_per_text = logits_per_audio.T
        else:
            # Single process: define the all_* names so the weighted branch
            # below does not hit an unbound local.
            all_audio_features = audio_features
            all_text_features = text_features
            logits_per_audio = logit_scale_a * audio_features @ text_features.T
            logits_per_text = logit_scale_a * text_features @ audio_features.T

        # calculated ground-truth and cache if enabled
        labels = self._ground_truth(logits_per_audio.shape[0], device)

        if not self.weighted_loss:
            return (
                F.cross_entropy(logits_per_audio, labels)
                + F.cross_entropy(logits_per_text, labels)
            ) / 2

        audio_weight = self._similarity_weight(all_audio_features)
        text_weight = self._similarity_weight(all_text_features)
        return (
            F.cross_entropy(logits_per_audio, labels, weight=text_weight)
            + F.cross_entropy(logits_per_text, labels, weight=audio_weight)
        ) / 2
def lp_gather_features(pred, target, world_size=1, use_horovod=False):
    """Gather predictions and targets from every rank, concatenated on dim 0.

    Args:
        pred: local prediction tensor.
        target: local target tensor.
        world_size: number of receive buffers to allocate on the
            ``torch.distributed`` path.
        use_horovod: gather through Horovod instead of ``torch.distributed``.

    Returns:
        ``(all_preds, all_targets)``.
    """
    if use_horovod:
        assert hvd is not None, "Please install horovod"
        with torch.no_grad():
            all_preds = hvd.allgather(pred)
            # Was `hvd.allgath`, which does not exist and raised AttributeError.
            all_targets = hvd.allgather(target)
    else:
        gathered_preds = [torch.zeros_like(pred) for _ in range(world_size)]
        gathered_targets = [torch.zeros_like(target) for _ in range(world_size)]

        dist.all_gather(gathered_preds, pred)
        dist.all_gather(gathered_targets, target)
        all_preds = torch.cat(gathered_preds, dim=0)
        all_targets = torch.cat(gathered_targets, dim=0)

    return all_preds, all_targets
def get_map(pred, target):
    """Return the mean of the per-label average-precision scores.

    ``pred`` is passed through a sigmoid before scoring. Both tensors are
    detached and moved to CPU first, because ``Tensor.numpy()`` rejects
    tensors that require grad or live on another device.

    Args:
        pred: tensor of raw scores (logits).
        target: tensor of ground-truth labels accepted by
            ``average_precision_score``.

    Returns:
        ``np.mean`` of ``average_precision_score(..., average=None)``.
    """
    scores = torch.sigmoid(pred).detach().cpu().numpy()
    truth = target.detach().cpu().numpy()
    return np.mean(average_precision_score(truth, scores, average=None))
def get_acc(pred, target):
    """Return the accuracy of arg-max predictions against arg-max targets.

    Both inputs are reduced with ``argmax`` over dim 1 (so each is expected to
    have a class axis at dim 1, e.g. one-hot targets). The index tensors are
    detached and moved to CPU before conversion, because ``Tensor.numpy()``
    only works on CPU tensors.

    Args:
        pred: tensor of per-class scores.
        target: tensor of per-class targets.

    Returns:
        The value of ``accuracy_score(target_idx, pred_idx)``.
    """
    pred_idx = torch.argmax(pred, 1).detach().cpu().numpy()
    target_idx = torch.argmax(target, 1).detach().cpu().numpy()
    return accuracy_score(target_idx, pred_idx)
def get_mauc(pred, target):
    """Return the mean of the per-label ROC-AUC scores.

    ``pred`` is passed through a sigmoid before scoring. Both tensors are
    detached and moved to CPU first, because ``Tensor.numpy()`` rejects
    tensors that require grad or live on another device.

    Args:
        pred: tensor of raw scores (logits).
        target: tensor of ground-truth labels accepted by ``roc_auc_score``.

    Returns:
        ``np.mean`` of ``roc_auc_score(..., average=None)``.
    """
    scores = torch.sigmoid(pred).detach().cpu().numpy()
    truth = target.detach().cpu().numpy()
    return np.mean(roc_auc_score(truth, scores, average=None))
class LPMetrics(object):
    """Evaluate a configurable set of named metrics on (pred, target) pairs.

    Args:
        metric_names: iterable of metric names, each one of ``"map"``,
            ``"acc"`` or ``"mauc"``. ``None`` (the default) selects all three.

    Raises:
        ValueError: if a name is not one of the supported metrics.
    """

    def __init__(self, metric_names=None):
        if metric_names is None:
            metric_names = ["map", "acc", "mauc"]
        # Store a private copy: a shared default list, or later mutation of the
        # caller's list, would otherwise desynchronise names and callables.
        self.metric_names = list(metric_names)
        self.metrics = [self.get_metric(name) for name in self.metric_names]

    def get_metric(self, name):
        """Return the metric function registered under ``name``."""
        if name == "map":
            return get_map
        elif name == "acc":
            return get_acc
        elif name == "mauc":
            return get_mauc
        else:
            raise ValueError("the metric should be at least one of [map, acc, mauc]")

    def evaluate_mertics(self, pred, target):
        """Return ``{metric_name: metric(pred, target)}`` for every metric.

        The misspelled name is kept because existing callers use it.
        """
        return {
            name: metric(pred, target)
            for name, metric in zip(self.metric_names, self.metrics)
        }

    # Correctly spelled alias for new callers.
    evaluate_metrics = evaluate_mertics
def calc_celoss(pred, target):
    """Cross-entropy of ``pred`` against the arg-max (over dim 1) of ``target``.

    Args:
        pred: tensor of per-class scores passed to ``nn.CrossEntropyLoss``.
        target: tensor whose dim-1 arg-max gives the class index per row.

    Returns:
        The loss tensor produced by ``nn.CrossEntropyLoss``.
    """
    criterion = nn.CrossEntropyLoss()
    class_index = target.argmax(dim=1).long()
    return criterion(pred, class_index)
class LPLoss(nn.Module):
    """Loss module selected by name: ``"bce"``, ``"ce"`` or ``"mse"``.

    Args:
        loss_name: ``"bce"`` selects ``nn.BCEWithLogitsLoss``, ``"ce"`` selects
            ``calc_celoss`` and ``"mse"`` selects ``nn.MSELoss``.

    Raises:
        ValueError: if ``loss_name`` is none of the supported names.
    """

    def __init__(self, loss_name):
        super().__init__()
        # Each entry pairs a name with a zero-argument factory; the lambda
        # defers the lookup of calc_celoss until "ce" is actually requested.
        factories = (
            ("bce", nn.BCEWithLogitsLoss),
            ("ce", lambda: calc_celoss),
            ("mse", nn.MSELoss),
        )
        for known_name, build in factories:
            if loss_name == known_name:
                self.loss_func = build()
                break
        else:
            raise ValueError(f"the loss func should be at least one of [bce, ce, mse]")

    def forward(self, pred, target):
        """Apply the selected loss function to ``(pred, target)``."""
        return self.loss_func(pred, target)