lengocduc195's picture
history blame
5.23 kB
from .. import util
import torch
from torch import nn, Tensor
from typing import Iterable, Dict
import torch.nn.functional as F
class MegaBatchMarginLoss(nn.Module):
    """
    Loss function inspired by the ParaNMT paper.

    Given a large batch (like 500 or more examples) of (anchor_i, positive_i) pairs,
    find for each pair in the batch the hardest negative, i.e. find j != i such that
    cos_sim(anchor_i, positive_j) is maximal. Then create from this a triplet
    (anchor_i, positive_i, positive_j) where positive_j serves as the negative
    for this triplet.

    Then train as with the triplet loss.
    """

    def __init__(self, model, positive_margin: float = 0.8, negative_margin: float = 0.3, use_mini_batched_version: bool = True, mini_batch_size: int = 50):
        """
        :param model: SentenceTransformerModel
        :param positive_margin: Positive margin, cos(anchor, positive) should be > positive_margin
        :param negative_margin: Negative margin, cos(anchor, negative) should be < negative_margin
        :param use_mini_batched_version: As large batch sizes require a lot of memory, we can use a mini-batched version. We break down the large batch with 500 examples to smaller batches with fewer examples.
        :param mini_batch_size: Size for the mini-batches. Should be a divisor for the batch size in your data loader.
        """
        super().__init__()
        self.model = model
        self.positive_margin = positive_margin
        self.negative_margin = negative_margin
        self.mini_batch_size = mini_batch_size
        # Bind the selected implementation as this instance's forward().
        self.forward = self.forward_mini_batched if use_mini_batched_version else self.forward_non_mini_batched

    def forward_mini_batched(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor) -> Tensor:
        """
        Compute the margin loss over the batch in slices of ``mini_batch_size``.

        Every mini-batch except the last one is back-propagated inside this
        method; the loss of the last mini-batch is returned so that the caller
        back-propagates it.

        :param sentence_features: Exactly two feature dicts, (anchor, positive), sharing the same keys.
        :param labels: Not used by this loss.
        :return: Mean loss of the last mini-batch.
        """
        anchor, positive = sentence_features
        feature_names = list(anchor.keys())

        # Embed all positives once, without gradients; these embeddings are
        # only used to search for hard negatives.
        with torch.no_grad():
            all_positive_emb = self.model(positive)['sentence_embedding'].detach()

        batch_size = len(all_positive_emb)
        diagonal_matrix = torch.eye(batch_size, batch_size, device=all_positive_emb.device)

        # Iterate over the triplets (anchor, positive, hardest_negative) in smaller mini_batch sizes
        for start_idx in range(0, batch_size, self.mini_batch_size):
            end_idx = start_idx + self.mini_batch_size
            anchor_emb = self.model({key: anchor[key][start_idx:end_idx] for key in feature_names})['sentence_embedding']

            # Find hard negatives. For each anchor, find the hardest negative.
            with torch.no_grad():
                cos_scores = util.pytorch_cos_sim(anchor_emb, all_positive_emb)
                # Subtract 2 at each anchor's own positive so that it is never
                # selected by the max() operation.
                negative_scores = cos_scores - 2 * diagonal_matrix[start_idx:end_idx]
                negatives_max, negatives_ids = torch.max(negative_scores, dim=1)

            # Gather the input features of the selected negatives so they can
            # be re-encoded with gradients enabled.
            hard_negative_features = {
                key: torch.stack([positive[key][negative_id] for negative_id in negatives_ids])
                for key in feature_names
            }

            # Compute differentiable negative and positive embeddings
            positive_emb = self.model({key: positive[key][start_idx:end_idx] for key in feature_names})['sentence_embedding']
            negative_emb = self.model(hard_negative_features)['sentence_embedding']

            assert anchor_emb.shape == positive_emb.shape
            assert anchor_emb.shape == negative_emb.shape

            # Compute loss
            pos_cosine = F.cosine_similarity(anchor_emb, positive_emb)
            neg_cosine = F.cosine_similarity(anchor_emb, negative_emb)
            losses = F.relu(self.positive_margin - pos_cosine) + F.relu(neg_cosine - self.negative_margin)
            losses = losses.mean()

            # Backpropagate unless it is the last mini batch. The last mini-batch
            # will be back propagated by the outside train loop.
            # Compare against the full batch size: cos_scores only has one row
            # per anchor of the current mini-batch, so its length is never
            # larger than end_idx and would disable this branch entirely.
            if end_idx < batch_size:
                losses.backward()

        return losses

    ##### Non mini-batched version ###
    def forward_non_mini_batched(self, sentence_features: Iterable[Dict[str, Tensor]], labels: Tensor) -> Tensor:
        """
        Compute the margin loss over the whole batch in a single pass.

        :param sentence_features: Exactly two feature dicts, (anchor, positive).
        :param labels: Not used by this loss.
        :return: Mean loss over the batch.
        """
        reps = [self.model(sentence_feature)['sentence_embedding'] for sentence_feature in sentence_features]
        embeddings_a, embeddings_b = reps

        cos_scores = util.pytorch_cos_sim(embeddings_a, embeddings_b)
        # The diagonal holds the score of each anchor with its own positive.
        positive_scores = torch.diagonal(cos_scores)
        # Remove positive scores along the diagonal so max() only sees negatives
        negative_scores = cos_scores - (2 * torch.eye(*cos_scores.shape, device=cos_scores.device))
        negatives_max, _ = torch.max(negative_scores, dim=1)
        losses = F.relu(self.positive_margin - positive_scores) + F.relu(negatives_max - self.negative_margin)
        return losses.mean()