gavinyuan
udpate: app.py import FSGenerator
a104d3f
import torch
import torch.nn.functional as F
from torch import nn
from torch.nn import Parameter
import numpy as np
__all__ = ['Softmax', 'AMCosFace', 'AMArcFace', ]
MIN_NUM_PATCHES = 16
""" All losses can run in 'torch.distributed.DistributedDataParallel'.
"""
class Softmax(nn.Module):
r"""Implementation of Softmax (normal classification head):
Args:
in_features: dimension (d_in) of input feature (B, d_in)
out_features: dimension (d_out) of output feature (B, d_out)
device_id: the ID of GPU where the model will be trained by data parallel (or DP). (not used)
if device_id=None, it will be trained on model parallel (or DDP). (recommend!)
"""
def __init__(self,
in_features: int,
out_features: int,
device_id,
):
super(Softmax, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.device_id = device_id
self.weight = Parameter(torch.FloatTensor(out_features, in_features))
self.bias = Parameter(torch.FloatTensor(out_features))
nn.init.xavier_uniform_(self.weight)
nn.init.zeros_(self.bias)
def forward(self, embedding, label):
"""
:param embedding: learned face representation
:param label:
- label >= 0: ground truth identity
- label = -1: invalid identity for this GPU (refer to 'PartialFC')
+ Example: label = torch.tensor([-1, 4, -1, 5, 3, -1])
:return:
"""
if self.device_id is None:
""" Regular linear layer.
"""
out = F.linear(embedding, self.weight, self.bias)
else:
raise ValueError('DataParallel is not implemented yet.')
x = input
sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0)
sub_biases = torch.chunk(self.bias, len(self.device_id), dim=0)
temp_x = x.cuda(self.device_id[0])
weight = sub_weights[0].cuda(self.device_id[0])
bias = sub_biases[0].cuda(self.device_id[0])
out = F.linear(temp_x, weight, bias)
for i in range(1, len(self.device_id)):
temp_x = x.cuda(self.device_id[i])
weight = sub_weights[i].cuda(self.device_id[i])
bias = sub_biases[i].cuda(self.device_id[i])
out = torch.cat((out, F.linear(temp_x, weight, bias).cuda(self.device_id[0])), dim=1)
return out
""" Not Used """
class ArcFace(nn.Module):
r"""Implement of ArcFace (https://arxiv.org/pdf/1801.07698v1.pdf):
Args:
in_features: size of each input sample
out_features: size of each output sample
device_id: the ID of GPU where the model will be trained by model parallel.
if device_id=None, it will be trained on CPU without model parallel.
s: norm of input feature
m: margin
cos(theta+m)
"""
def __init__(self, in_features, out_features, device_id, s=64.0, m=0.50, easy_margin=False):
super(ArcFace, self).__init__()
self.in_features = in_features
self.out_features = out_features
self.device_id = device_id
self.s = s
self.m = m
print('ArcFace, s=%.1f, m=%.2f' % (s, m))
self.weight = Parameter(torch.FloatTensor(out_features, in_features))
nn.init.xavier_uniform_(self.weight)
self.easy_margin = easy_margin
self.cos_m = np.cos(m)
self.sin_m = np.sin(m)
self.th = np.cos(np.pi - m)
self.mm = np.sin(np.pi - m) * m
def forward(self, input, label):
# --------------------------- cos(theta) & phi(theta) ---------------------------
if self.device_id == None:
cosine = F.linear(F.normalize(input), F.normalize(self.weight))
else:
x = input
sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0)
temp_x = x.cuda(self.device_id[0])
weight = sub_weights[0].cuda(self.device_id[0])
cosine = F.linear(F.normalize(temp_x), F.normalize(weight))
for i in range(1, len(self.device_id)):
temp_x = x.cuda(self.device_id[i])
weight = sub_weights[i].cuda(self.device_id[i])
cosine = torch.cat((cosine, F.linear(F.normalize(temp_x), F.normalize(weight)).cuda(self.device_id[0])),
dim=1)
sine = torch.sqrt(1.0 - torch.pow(cosine, 2))
phi = cosine * self.cos_m - sine * self.sin_m
if self.easy_margin:
phi = torch.where(cosine > 0, phi, cosine)
else:
phi = torch.where(cosine > self.th, phi, cosine - self.mm)
# --------------------------- convert label to one-hot ---------------------------
one_hot = torch.zeros(cosine.size())
if self.device_id != None:
one_hot = one_hot.cuda(self.device_id[0])
else:
one_hot = one_hot.cuda()
one_hot.scatter_(1, label.view(-1, 1).long(), 1)
# -------------torch.where(out_i = {x_i if condition_i else y_i) -------------
output = (one_hot * phi) + (
(1.0 - one_hot) * cosine) # you can use torch.where if your torch.__version__ is 0.4
output *= self.s
return output
""" Not Used """
class CosFace(nn.Module):
r"""Implement of CosFace (https://arxiv.org/pdf/1801.09414.pdf):
Args:
in_features: size of each input sample
out_features: size of each output sample
device_id: the ID of GPU where the model will be trained by model parallel.
if device_id=None, it will be trained on CPU without model parallel.
s: norm of input feature
m: margin
cos(theta)-m
"""
def __init__(self, in_features, out_features, device_id, s=64.0, m=0.4):
super(CosFace, self).__init__()
print('CosFace, s=%.1f, m=%.2f' % (s, m))
self.in_features = in_features
self.out_features = out_features
self.device_id = device_id
self.s = s
self.m = m
self.weight = Parameter(torch.FloatTensor(out_features, in_features))
nn.init.xavier_uniform_(self.weight)
def forward(self, input, label):
# --------------------------- cos(theta) & phi(theta) ---------------------------
if self.device_id == None:
cosine = F.linear(F.normalize(input), F.normalize(self.weight))
else:
x = input
sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0)
temp_x = x.cuda(self.device_id[0])
weight = sub_weights[0].cuda(self.device_id[0])
cosine = F.linear(F.normalize(temp_x), F.normalize(weight))
for i in range(1, len(self.device_id)):
temp_x = x.cuda(self.device_id[i])
weight = sub_weights[i].cuda(self.device_id[i])
cosine = torch.cat((cosine, F.linear(F.normalize(temp_x), F.normalize(weight)).cuda(self.device_id[0])),
dim=1)
phi = cosine - self.m
# --------------------------- convert label to one-hot ---------------------------
one_hot = torch.zeros(cosine.size()).cuda()
if self.device_id != None:
one_hot = one_hot.cuda(self.device_id[0])
# one_hot = one_hot.cuda() if cosine.is_cuda else one_hot
one_hot.scatter_(1, label.cuda(self.device_id[0]).view(-1, 1).long(), 1)
else:
one_hot.scatter_(1, label.view(-1, 1).long(), 1)
# -------------torch.where(out_i = {x_i if condition_i else y_i) -------------
output = (one_hot * phi) + (
(1.0 - one_hot) * cosine) # you can use torch.where if your torch.__version__ is 0.4
output *= self.s
return output
def __repr__(self):
return self.__class__.__name__ + '(' \
+ 'in_features = ' + str(self.in_features) \
+ ', out_features = ' + str(self.out_features) \
+ ', s = ' + str(self.s) \
+ ', m = ' + str(self.m) + ')'
class AMCosFace(nn.Module):
r"""Implementation of Adaptive Margin CosFace:
cos(theta)-m+k(theta-a)
When k is 0, AMCosFace degenerates into CosFace.
Args:
in_features: dimension (d_in) of input feature (B, d_in)
out_features: dimension (d_out) of output feature (B, d_out)
device_id: the ID of GPU where the model will be trained by data parallel (or DP). (not used)
if device_id=None, it will be trained on model parallel (or DDP). (recommend!)
s: norm of input feature
m: margin
a: AM Loss
k: AM Loss
"""
def __init__(self,
in_features: int,
out_features: int,
device_id,
s: float = 64.0,
m: float = 0.4,
a: float = 1.2,
k: float = 0.1,
):
super(AMCosFace, self).__init__()
print('AMCosFace, s=%.1f, m=%.2f, a=%.2f, k=%.2f' % (s, m, a, k))
self.in_features = in_features
self.out_features = out_features
self.device_id = device_id
self.s = s
self.m = m
self.a = a
self.k = k
""" Weight Matrix W (d_out, d_in) """
self.weight = Parameter(torch.FloatTensor(out_features, in_features))
nn.init.xavier_uniform_(self.weight)
def forward(self, embedding, label):
"""
:param embedding: learned face representation
:param label:
- label >= 0: ground truth identity
- label = -1: invalid identity for this GPU (refer to 'PartialFC')
+ Example: label = torch.tensor([-1, 4, -1, 5, 3, -1])
:return:
"""
if self.device_id is None:
""" - embedding: shape is (B, d_in)
- weight: shape is (d_out, d_in)
- cosine: shape is (B, d_out)
+ F.normalize is very important here.
"""
cosine = F.linear(F.normalize(embedding), F.normalize(self.weight)) # y = xA^T + b
else:
raise ValueError('DataParallel is not implemented yet.')
x = input
sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0)
temp_x = x.cuda(self.device_id[0])
weight = sub_weights[0].cuda(self.device_id[0])
cosine = F.linear(F.normalize(temp_x), F.normalize(weight))
for i in range(1, len(self.device_id)):
temp_x = x.cuda(self.device_id[i])
weight = sub_weights[i].cuda(self.device_id[i])
cosine = torch.cat((cosine, F.linear(F.normalize(temp_x),
F.normalize(weight)).cuda(self.device_id[0])),
dim=1)
""" - index: the index of valid identity in label, shape is (d_valid, )
+ torch.where() returns a tuple indicating the index of each dimension
+ Example: index = torch.tensor([1, 3, 4])
"""
index = torch.where(label != -1)[0]
""" - m_hot: one-hot tensor of margin m_2, shape is (d_valid, d_out)
+ torch.tensor.scatter_(dim, index, source) is usually used to generate ont-hot tensor
+ Example: label = torch.tensor([-1, 4, -1, 5, 3, -1])
index = torch.tensor([1, 3, 4]) # d_valid = index.shape[0] = 3
m_hot = torch.tensor([[0, 0, 0, 0, m, 0],
[0, 0, 0, 0, 0, m],
[0, 0, 0, m, 0, 0],
])
"""
m_hot = torch.zeros(index.size()[0], cosine.size()[1], device=cosine.device)
m_hot.scatter_(1, label[index, None], self.m)
""" logit(theta) = cos(theta) - m_2 + k * (theta - a)
- theta = cosine.acos_()
+ Example: m_hot = torch.tensor([[0, 0, 0, 0, m-k(theta[0,4]-a), 0],
[0, 0, 0, 0, 0, m-k(theta[1,5]-a)],
[0, 0, 0, m-k(theta[2,3]-a), 0, 0],
])
"""
a = self.a
k = self.k
m_hot[range(0, index.size()[0]), label[index]] -= k * (cosine[index, label[index]].acos_() - a)
cosine[index] -= m_hot
""" Because we have used F.normalize, we should rescale the logit term by s.
"""
output = cosine * self.s
return output
def __repr__(self):
return self.__class__.__name__ + '(' \
+ 'in_features = ' + str(self.in_features) \
+ ', out_features = ' + str(self.out_features) \
+ ', s = ' + str(self.s) \
+ ', m = ' + str(self.m) \
+ ', a = ' + str(self.a) \
+ ', k = ' + str(self.k) \
+ ')'
class AMArcFace(nn.Module):
r"""Implementation of Adaptive Margin ArcFace:
cos(theta+m-k(theta-a))
When k is 0, AMArcFace degenerates into ArcFace.
Args:
in_features: dimension (d_in) of input feature (B, d_in)
out_features: dimension (d_out) of output feature (B, d_out)
device_id: the ID of GPU where the model will be trained by data parallel (or DP). (not used)
if device_id=None, it will be trained on model parallel (or DDP). (recommend!)
s: norm of input feature
m: margin
a: AM Loss
k: AM Loss
"""
def __init__(self,
in_features: int,
out_features: int,
device_id,
s: float = 64.0,
m: float = 0.5,
a: float = 1.2,
k: float = 0.1,
):
super(AMArcFace, self).__init__()
print('AMArcFace, s=%.1f, m=%.2f, a=%.2f, k=%.2f' % (s, m, a, k))
self.in_features = in_features
self.out_features = out_features
self.device_id = device_id
self.s = s
self.m = m
self.a = a
self.k = k
""" Weight Matrix W (d_out, d_in) """
self.weight = Parameter(torch.FloatTensor(out_features, in_features))
nn.init.xavier_uniform_(self.weight)
def forward(self, embedding, label):
"""
:param embedding: learned face representation
:param label:
- label >= 0: ground truth identity
- label = -1: invalid identity for this GPU (refer to 'PartialFC')
+ Example: label = torch.tensor([-1, 4, -1, 5, 3, -1])
:return:
"""
if self.device_id is None:
""" - embedding: shape is (B, d_in)
- weight: shape is (d_out, d_in)
- cosine: shape is (B, d_out)
+ F.normalize is very important here.
"""
cosine = F.linear(F.normalize(embedding), F.normalize(self.weight)) # y = xA^T + b
else:
raise ValueError('DataParallel is not implemented yet.')
x = input
sub_weights = torch.chunk(self.weight, len(self.device_id), dim=0)
temp_x = x.cuda(self.device_id[0])
weight = sub_weights[0].cuda(self.device_id[0])
cosine = F.linear(F.normalize(temp_x), F.normalize(weight))
for i in range(1, len(self.device_id)):
temp_x = x.cuda(self.device_id[i])
weight = sub_weights[i].cuda(self.device_id[i])
cosine = torch.cat((cosine, F.linear(F.normalize(temp_x),
F.normalize(weight)).cuda(self.device_id[0])),
dim=1)
""" - index: the index of valid identity in label, shape is (d_valid, )
+ torch.where() returns a tuple indicating the index of each dimension
+ Example: index = torch.tensor([1, 3, 4])
"""
index = torch.where(label != -1)[0]
""" - m_hot: one-hot tensor of margin m_2, shape is (d_valid, d_out)
+ torch.tensor.scatter_(dim, index, source) is usually used to generate ont-hot tensor
+ Example: label = torch.tensor([-1, 4, -1, 5, 3, -1])
index = torch.tensor([1, 3, 4]) # d_valid = index.shape[0] = 3
m_hot = torch.tensor([[0, 0, 0, 0, m, 0],
[0, 0, 0, 0, 0, m],
[0, 0, 0, m, 0, 0],
])
"""
m_hot = torch.zeros(index.size()[0], cosine.size()[1], device=cosine.device)
m_hot.scatter_(1, label[index, None], self.m)
""" logit(theta) = cos(theta) - m_2 + k * (theta - a)
- theta = cosine.acos_()
+ Example: m_hot = torch.tensor([[0, 0, 0, 0, m-k(theta[0,4]-a), 0],
[0, 0, 0, 0, 0, m-k(theta[1,5]-a)],
[0, 0, 0, m-k(theta[2,3]-a), 0, 0],
])
"""
a = self.a
k = self.k
m_hot[range(0, index.size()[0]), label[index]] -= k * (cosine[index, label[index]].acos_() - a)
cosine.acos_()
cosine[index] += m_hot
cosine.cos_().mul_(self.s)
return cosine
def __repr__(self):
return self.__class__.__name__ + '(' \
+ 'in_features = ' + str(self.in_features) \
+ ', out_features = ' + str(self.out_features) \
+ ', s = ' + str(self.s) \
+ ', m = ' + str(self.m) \
+ ', a = ' + str(self.a) \
+ ', k = ' + str(self.k) \
+ ')'
if __name__ == '__main__':
cosine = torch.randn(6, 8) / 100
cosine[0][2] = 0.3
cosine[1][4] = 0.4
cosine[2][6] = 0.5
cosine[3][5] = 0.6
cosine[4][3] = 0.7
cosine[5][0] = 0.8
label = torch.tensor([-1, 4, -1, 5, 3, -1])
# layer = AMCosFace(in_features=8,
# out_features=8,
# device_id=None,
# m=0.35, s=1.0,
# a=1.2, k=0.1)
# layer = Softmax(in_features=8,
# out_features=8,
# device_id=None)
layer = AMArcFace(in_features=8,
out_features=8,
device_id=None,
m=0.5, s=1.0,
a=1.2, k=0.1)
logit = layer(cosine, label)
logit = F.softmax(logit, dim=-1)
from utils.vis_tensor import plot_tensor
plot_tensor((cosine, logit),
('embedding', 'logit'),
'AMArc.jpg')