import lightning.pytorch as pl
from transformers import (
    AdamW,
    AutoModel,
    get_linear_schedule_with_warmup,
)
import torch
from torch import nn

from loss import (
    ContrastiveLoss_simcse,
    ContrastiveLoss_simcse_w,
    ContrastiveLoss_samp,
    ContrastiveLoss_samp_w,
)


class BERTContrastiveLearning_simcse(pl.LightningModule):
    """Contrastive learner on Bio_ClinicalBERT [CLS] embeddings (SimCSE-style).

    Independent dropout is applied to the anchor (row 0) and to the remaining
    rows of the projected batch; trained with ContrastiveLoss_simcse(out, label).
    """

    def __init__(self, n_batches=None, n_epochs=None, lr=None, **kwargs):
        super().__init__()
        ### Parameters
        self.n_batches = n_batches
        self.n_epochs = n_epochs
        self.lr = lr

        ### Architecture
        self.bert = AutoModel.from_pretrained(
            "emilyalsentzer/Bio_ClinicalBERT", return_dict=True
        )
        # Unfreeze encoder
        self.bert_layer_num = sum(1 for _ in self.bert.named_parameters())
        self.num_unfreeze_layer = self.bert_layer_num
        self.ratio_unfreeze_layer = 0.0
        if kwargs:
            for key, value in kwargs.items():
                if key == "unfreeze" and isinstance(value, float):
                    assert (
                        value >= 0.0 and value <= 1.0
                    ), "ValueError: value must be a ratio between 0.0 and 1.0"
                    self.ratio_unfreeze_layer = value
        if self.ratio_unfreeze_layer > 0.0:
            self.num_unfreeze_layer = int(
                self.bert_layer_num * self.ratio_unfreeze_layer
            )
            for param in list(self.bert.parameters())[: -self.num_unfreeze_layer]:
                param.requires_grad = False
        # Random dropouts
        self.dropout1 = nn.Dropout(p=0.1)
        self.dropout2 = nn.Dropout(p=0.1)
        # Linear projector
        self.projector = nn.Linear(self.bert.config.hidden_size, 128)
        print("Model Initialized!")

        ### Loss
        self.criterion = ContrastiveLoss_simcse()

        ### Logs
        self.train_loss, self.val_loss, self.test_loss = [], [], []
        self.training_step_outputs = []
        self.validation_step_outputs = []

    def configure_optimizers(self):
        # Optimizer
        self.trainable_params = [
            param for param in self.parameters() if param.requires_grad
        ]
        optimizer = AdamW(self.trainable_params, lr=self.lr)
        # Scheduler
        # warmup_steps = self.n_batches // 3
        # total_steps = self.n_batches * self.n_epochs - warmup_steps
        # scheduler = get_linear_schedule_with_warmup(
        #     optimizer, warmup_steps, total_steps
        # )
        return [optimizer]

    def forward(self, input_ids, attention_mask):
        emb = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls = emb.pooler_output
        out = self.projector(cls)
        anchor_out = self.dropout1(out[0:1])
        rest_out = self.dropout2(out[1:])
        output = torch.cat([anchor_out, rest_out])
        return cls, output

    def training_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label)
        logs = {"loss": loss}
        self.training_step_outputs.append(logs)
        self.log("train_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_train_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.training_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.train_loss.append(loss)
        print("train_epoch:", self.current_epoch, "avg_loss:", loss)
        self.training_step_outputs.clear()

    def validation_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label)
        logs = {"loss": loss}
        self.validation_step_outputs.append(logs)
        self.log("validation_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_validation_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.validation_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.val_loss.append(loss)
        print("val_epoch:", self.current_epoch, "avg_loss:", loss)
        self.validation_step_outputs.clear()


class BERTContrastiveLearning_simcse_w(pl.LightningModule):
    """Score-weighted SimCSE-style variant.

    Same architecture as BERTContrastiveLearning_simcse, but the loss
    ContrastiveLoss_simcse_w additionally takes the per-example batch["score"].
    """

    def __init__(self, n_batches=None, n_epochs=None, lr=None, **kwargs):
        super().__init__()
        ### Parameters
        self.n_batches = n_batches
        self.n_epochs = n_epochs
        self.lr = lr

        ### Architecture
        self.bert = AutoModel.from_pretrained(
            "emilyalsentzer/Bio_ClinicalBERT", return_dict=True
        )
        # Unfreeze encoder
        self.bert_layer_num = sum(1 for _ in self.bert.named_parameters())
        self.num_unfreeze_layer = self.bert_layer_num
        self.ratio_unfreeze_layer = 0.0
        if kwargs:
            for key, value in kwargs.items():
                if key == "unfreeze" and isinstance(value, float):
                    assert (
                        value >= 0.0 and value <= 1.0
                    ), "ValueError: value must be a ratio between 0.0 and 1.0"
                    self.ratio_unfreeze_layer = value
        if self.ratio_unfreeze_layer > 0.0:
            self.num_unfreeze_layer = int(
                self.bert_layer_num * self.ratio_unfreeze_layer
            )
            for param in list(self.bert.parameters())[: -self.num_unfreeze_layer]:
                param.requires_grad = False
        # Random dropouts
        self.dropout1 = nn.Dropout(p=0.1)
        self.dropout2 = nn.Dropout(p=0.1)
        # Linear projector
        self.projector = nn.Linear(self.bert.config.hidden_size, 128)
        print("Model Initialized!")

        ### Loss
        self.criterion = ContrastiveLoss_simcse_w()

        ### Logs
        self.train_loss, self.val_loss, self.test_loss = [], [], []
        self.training_step_outputs = []
        self.validation_step_outputs = []

    def configure_optimizers(self):
        # Optimizer
        self.trainable_params = [
            param for param in self.parameters() if param.requires_grad
        ]
        optimizer = AdamW(self.trainable_params, lr=self.lr)
        # Scheduler
        # warmup_steps = self.n_batches // 3
        # total_steps = self.n_batches * self.n_epochs - warmup_steps
        # scheduler = get_linear_schedule_with_warmup(
        #     optimizer, warmup_steps, total_steps
        # )
        return [optimizer]

    def forward(self, input_ids, attention_mask):
        emb = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls = emb.pooler_output
        out = self.projector(cls)
        anchor_out = self.dropout1(out[0:1])
        rest_out = self.dropout2(out[1:])
        output = torch.cat([anchor_out, rest_out])
        return cls, output

    def training_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        score = batch["score"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label, score)
        logs = {"loss": loss}
        self.training_step_outputs.append(logs)
        self.log("train_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_train_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.training_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.train_loss.append(loss)
        print("train_epoch:", self.current_epoch, "avg_loss:", loss)
        self.training_step_outputs.clear()

    def validation_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        score = batch["score"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label, score)
        logs = {"loss": loss}
        self.validation_step_outputs.append(logs)
        self.log("validation_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_validation_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.validation_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.val_loss.append(loss)
        print("val_epoch:", self.current_epoch, "avg_loss:", loss)
        self.validation_step_outputs.clear()


class BERTContrastiveLearning_samp(pl.LightningModule):
    """Sampling-based contrastive learner.

    No dropout augmentation: projected [CLS] embeddings are passed directly to
    ContrastiveLoss_samp(out, label).
    """

    def __init__(self, n_batches=None, n_epochs=None, lr=None, **kwargs):
        super().__init__()
        ### Parameters
        self.n_batches = n_batches
        self.n_epochs = n_epochs
        self.lr = lr

        ### Architecture
        self.bert = AutoModel.from_pretrained(
            "emilyalsentzer/Bio_ClinicalBERT", return_dict=True
        )
        # Unfreeze encoder
        self.bert_layer_num = sum(1 for _ in self.bert.named_parameters())
        self.num_unfreeze_layer = self.bert_layer_num
        self.ratio_unfreeze_layer = 0.0
        if kwargs:
            for key, value in kwargs.items():
                if key == "unfreeze" and isinstance(value, float):
                    assert (
                        value >= 0.0 and value <= 1.0
                    ), "ValueError: value must be a ratio between 0.0 and 1.0"
                    self.ratio_unfreeze_layer = value
        if self.ratio_unfreeze_layer > 0.0:
            self.num_unfreeze_layer = int(
                self.bert_layer_num * self.ratio_unfreeze_layer
            )
            for param in list(self.bert.parameters())[: -self.num_unfreeze_layer]:
                param.requires_grad = False
        # Linear projector
        self.projector = nn.Linear(self.bert.config.hidden_size, 128)
        print("Model Initialized!")

        ### Loss
        self.criterion = ContrastiveLoss_samp()

        ### Logs
        self.train_loss, self.val_loss, self.test_loss = [], [], []
        self.training_step_outputs = []
        self.validation_step_outputs = []

    def configure_optimizers(self):
        # Optimizer
        self.trainable_params = [
            param for param in self.parameters() if param.requires_grad
        ]
        optimizer = AdamW(self.trainable_params, lr=self.lr)
        # Scheduler
        # warmup_steps = self.n_batches // 3
        # total_steps = self.n_batches * self.n_epochs - warmup_steps
        # scheduler = get_linear_schedule_with_warmup(
        #     optimizer, warmup_steps, total_steps
        # )
        return [optimizer]

    def forward(self, input_ids, attention_mask):
        emb = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls = emb.pooler_output
        out = self.projector(cls)
        return cls, out

    def training_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label)
        logs = {"loss": loss}
        self.training_step_outputs.append(logs)
        self.log("train_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_train_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.training_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.train_loss.append(loss)
        print("train_epoch:", self.current_epoch, "avg_loss:", loss)
        self.training_step_outputs.clear()

    def validation_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label)
        logs = {"loss": loss}
        self.validation_step_outputs.append(logs)
        self.log("validation_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_validation_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.validation_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.val_loss.append(loss)
        print("val_epoch:", self.current_epoch, "avg_loss:", loss)
        self.validation_step_outputs.clear()


class BERTContrastiveLearning_samp_w(pl.LightningModule):
    """Score-weighted sampling-based variant.

    Same architecture as BERTContrastiveLearning_samp, but the loss
    ContrastiveLoss_samp_w additionally takes the per-example batch["score"].
    """

    def __init__(self, n_batches=None, n_epochs=None, lr=None, **kwargs):
        super().__init__()
        ### Parameters
        self.n_batches = n_batches
        self.n_epochs = n_epochs
        self.lr = lr

        ### Architecture
        self.bert = AutoModel.from_pretrained(
            "emilyalsentzer/Bio_ClinicalBERT", return_dict=True
        )
        # Unfreeze encoder
        self.bert_layer_num = sum(1 for _ in self.bert.named_parameters())
        self.num_unfreeze_layer = self.bert_layer_num
        self.ratio_unfreeze_layer = 0.0
        if kwargs:
            for key, value in kwargs.items():
                if key == "unfreeze" and isinstance(value, float):
                    assert (
                        value >= 0.0 and value <= 1.0
                    ), "ValueError: value must be a ratio between 0.0 and 1.0"
                    self.ratio_unfreeze_layer = value
        if self.ratio_unfreeze_layer > 0.0:
            self.num_unfreeze_layer = int(
                self.bert_layer_num * self.ratio_unfreeze_layer
            )
            for param in list(self.bert.parameters())[: -self.num_unfreeze_layer]:
                param.requires_grad = False
        # Linear projector
        self.projector = nn.Linear(self.bert.config.hidden_size, 128)
        print("Model Initialized!")

        ### Loss
        self.criterion = ContrastiveLoss_samp_w()

        ### Logs
        self.train_loss, self.val_loss, self.test_loss = [], [], []
        self.training_step_outputs = []
        self.validation_step_outputs = []

    def configure_optimizers(self):
        # Optimizer
        self.trainable_params = [
            param for param in self.parameters() if param.requires_grad
        ]
        optimizer = AdamW(self.trainable_params, lr=self.lr)
        # Scheduler
        # warmup_steps = self.n_batches // 3
        # total_steps = self.n_batches * self.n_epochs - warmup_steps
        # scheduler = get_linear_schedule_with_warmup(
        #     optimizer, warmup_steps, total_steps
        # )
        return [optimizer]

    def forward(self, input_ids, attention_mask):
        emb = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls = emb.pooler_output
        out = self.projector(cls)
        return cls, out

    def training_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        score = batch["score"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label, score)
        logs = {"loss": loss}
        self.training_step_outputs.append(logs)
        self.log("train_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_train_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.training_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.train_loss.append(loss)
        print("train_epoch:", self.current_epoch, "avg_loss:", loss)
        self.training_step_outputs.clear()

    def validation_step(self, batch, batch_idx):
        label = batch["label"]
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        score = batch["score"]
        cls, out = self(
            input_ids,
            attention_mask,
        )
        loss = self.criterion(out, label, score)
        logs = {"loss": loss}
        self.validation_step_outputs.append(logs)
        self.log("validation_loss", loss, prog_bar=True, logger=True, sync_dist=True)
        return loss

    def on_validation_epoch_end(self):
        loss = (
            torch.stack([x["loss"] for x in self.validation_step_outputs])
            .mean()
            .detach()
            .cpu()
            .numpy()
        )
        self.val_loss.append(loss)
        print("val_epoch:", self.current_epoch, "avg_loss:", loss)
        self.validation_step_outputs.clear()
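

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; not part of the original training pipeline).
# It assumes batches are dicts with "input_ids", "attention_mask" and "label"
# (plus "score" for the *_w variants), with the anchor sequence in row 0, and
# that ContrastiveLoss_simcse accepts (out, label) as called above. The toy
# sentences, label tensor, and hyperparameters below are made up for the demo.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from torch.utils.data import DataLoader
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")

    # One anchor followed by two candidates (hypothetical clinical phrases).
    sentences = [
        "chest pain radiating to the left arm",  # anchor
        "angina with radiation to the left arm",  # assumed positive
        "closed fracture of the right tibia",  # assumed negative
    ]
    encoded = tokenizer(
        sentences, padding=True, truncation=True, max_length=64, return_tensors="pt"
    )
    batch = {
        "input_ids": encoded["input_ids"],
        "attention_mask": encoded["attention_mask"],
        "label": torch.tensor([1, 1, 0]),  # assumed label format for the loss
    }

    model = BERTContrastiveLearning_simcse(
        n_batches=1, n_epochs=1, lr=2e-5, unfreeze=0.5
    )

    # Plain forward pass: cls is the pooled BERT output, out the 128-d projection.
    cls, out = model(batch["input_ids"], batch["attention_mask"])
    print(cls.shape, out.shape)  # torch.Size([3, 768]) torch.Size([3, 128])

    # Fitting with Lightning; the single pre-collated batch stands in for a real
    # DataLoader built from the project's dataset.
    trainer = pl.Trainer(max_epochs=1, accelerator="auto", devices=1, logger=False)
    trainer.fit(model, DataLoader([batch], batch_size=None))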