""" Defines the Encoder, Decoder and Sequence to Sequence models used in this projet """ import logging import torch logging.basicConfig(level=logging.DEBUG) class Encoder(torch.nn.Module): def __init__( self, vocab_size: int, embeddings_dim: int, hidden_size: int, dropout: int, device, ): # Une idiosyncrasie de torch, pour qu'iel puisse faire sa magie super().__init__() self.device = device # On ajoute un mot supplémentaire au vocabulaire : # on s'en servira pour les mots inconnus self.embeddings = torch.nn.Embedding(vocab_size, embeddings_dim) self.embeddings.to(device) self.hidden = torch.nn.LSTM( embeddings_dim, hidden_size, dropout=dropout) # Comme on va calculer la log-vraisemblance, # c'est le log-softmax qui nous intéresse self.dropout = torch.nn.Dropout(dropout) self.dropout.to(self.device) # Dropout def forward(self, inpt): inpt.to(self.device) emb = self.dropout(self.embeddings(inpt)).to(self.device) emb = emb.to(self.device) output, (hidden, cell) = self.hidden(emb) output.to(self.device) hidden = hidden.to(self.device) cell = cell.to(self.device) return hidden, cell class Decoder(torch.nn.Module): def __init__( self, vocab_size: int, embeddings_dim: int, hidden_size: int, dropout: int, device, ): # Une idiosyncrasie de torch, pour qu'iel puisse faire sa magie super().__init__() self.device = device # On ajoute un mot supplémentaire au vocabulaire : # on s'en servira pour les mots inconnus self.vocab_size = vocab_size self.embeddings = torch.nn.Embedding(vocab_size, embeddings_dim) self.hidden = torch.nn.LSTM( embeddings_dim, hidden_size, dropout=dropout) self.output = torch.nn.Linear(hidden_size, vocab_size) # Comme on va calculer la log-vraisemblance, # c'est le log-softmax qui nous intéresse self.dropout = torch.nn.Dropout(dropout) def forward(self, input, hidden, cell): input = input.unsqueeze(0) input = input.to(self.device) emb = self.dropout(self.embeddings(input)).to(self.device) emb = emb.to(self.device) output, (hidden, cell) = self.hidden(emb, (hidden, cell)) output = output.to(self.device) out = self.output(output.squeeze(0)).to(self.device) return out, hidden, cell class EncoderDecoderModel(torch.nn.Module): def __init__(self, encoder, decoder, vectoriser, device): # Une idiosyncrasie de torch, pour qu'iel puisse faire sa magie super().__init__() self.encoder = encoder self.decoder = decoder self.vectoriser = vectoriser self.device = device def forward(self, source, num_beams=3, summary_len=0.2): """ :param source: tensor the input text :param num_beams: int the number of outputs to iterate on for beam_search :param summary_len: int length ratio of the summary compared to the text """ # The ratio must be inferior to 1 to allow text compression assert summary_len < 1, f"number lesser than 1 expected, got {summary_len}" # Expected summary length (in words) target_len = int(summary_len * source.shape[0]) # Word Embedding length target_vocab_size = self.decoder.vocab_size # Output of the right format (expected summmary length x word # embedding length) filled with zeros. On each iteration, we # will replace one of the row of this matrix with the choosen # word embedding outputs = torch.zeros(target_len, target_vocab_size) # put the tensors on the device (useless if CPU bus very useful in # case of GPU) outputs.to(self.device) source.to(self.device) # last hidden state of the encoder is used # as the initial hidden state of the decoder # Encode the input text hidden, cell = self.encoder(source) # Encode the first word of the summary input = self.vectoriser.encode("") # put the tensors on the device hidden.to(self.device) cell.to(self.device) input.to(self.device) # BEAM SEARCH # # If you wonder, b stands for better values = None b_outputs = torch.zeros(target_len, target_vocab_size).to(self.device) b_outputs.to(self.device) for i in range(1, target_len): # On va déterminer autant de mot que la taille du texte souhaité # insert input token embedding, previous hidden and previous cell states # receive output tensor (predictions) and new hidden and cell # states. # replace predictions in a tensor holding predictions for each token # logging.debug(f"output : {output}") ####### DÉBUT DU BEAM SEARCH ########## if values is None: # On calcule une première fois les premières probabilité de mot # après output, hidden, cell = self.decoder(input, hidden, cell) output.to(self.device) b_hidden = hidden b_cell = cell # On choisi les k meilleurs scores pour choisir la meilleure probabilité # sur deux itérations ensuite values, indices = output.topk(num_beams, sorted=True) else: # On instancie le dictionnaire qui contiendra les scores pour # chaque possibilité scores = {} # Pour chacune des meilleures valeurs, on va calculer l'output for value, indice in zip(values, indices): indice.to(self.device) # On calcule l'output b_output, b_hidden, b_cell = self.decoder( indice, b_hidden, b_cell) # On empêche le modèle de se répéter d'un mot sur l'autre en mettant # de force la probabilité du mot précédent à 0 b_output[indice] = torch.zeros(1) # On choisit le meilleur résultat pour cette possibilité highest_value = torch.log(b_output).max() # On calcule le score des 2 itérations ensembles score = highest_value * torch.log(value) scores[score] = (b_output, b_hidden, b_cell) # On garde le meilleur score sur LES 2 ITÉRATIONS b_output, b_hidden, b_cell = scores.get(max(scores)) # Et du coup on rempli la place de i-1 à la place de i b_outputs[i - 1] = b_output.to(self.device) # On instancies nos nouvelles valeurs pour la prochaine # itération values, indices = b_output.topk(num_beams, sorted=True) ################################## # outputs[i] = output.to(self.device) # input = output.argmax(dim=-1).to(self.device) # input.to(self.device) # logging.debug(f"{vectoriser.decode(outputs.argmax(dim=-1))}") return b_outputs.to(self.device)