import logging
import os

import joblib
import pandas as pd
import torch
import transformers
from datasets import Dataset
from dotenv import load_dotenv
from psycopg2.extras import DictCursor
from sklearn.preprocessing import LabelEncoder
from transformers import (
    AutoModelForSequenceClassification,
    AutoTokenizer,
    EarlyStoppingCallback,
    Trainer,
    TrainingArguments,
)

from db.db_utils import get_connection

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
transformers.logging.set_verbosity_info()

# Load environment variables
load_dotenv()

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")

# Fetch data from the database
def fetch_data():
    logger.info("Connecting to the database...")
    conn = get_connection()
    cursor = conn.cursor(cursor_factory=DictCursor)
    try:
        logger.info("Fetching data from mappings table...")
        cursor.execute("SELECT input_word, dictionary_word FROM mappings")
        mappings_data = cursor.fetchall()
        logger.info(f"Fetched {len(mappings_data)} records from mappings table.")

        logger.info("Fetching data from dictionary table...")
        cursor.execute("SELECT description FROM dictionary")
        dictionary_data = cursor.fetchall()
        logger.info(f"Fetched {len(dictionary_data)} records from dictionary table.")

        return mappings_data, dictionary_data
    finally:
        cursor.close()
        conn.close()
        logger.info("Database connection closed.")

# Load data
logger.info("Loading data from database...")
mappings_data, dictionary_data = fetch_data()

# Prepare data for the model
logger.info("Preparing data for the model...")
mappings_df = pd.DataFrame(mappings_data, columns=['word', 'usda_item'])  # input_word -> word, dictionary_word -> usda_item
dictionary_df = pd.DataFrame(dictionary_data, columns=['usda_item'])      # description -> usda_item

# Combine the mappings and dictionary data
data = pd.concat([mappings_df, dictionary_df], ignore_index=True).drop_duplicates()

# Dictionary rows carry no 'word'; let each description map to itself so that
# astype(str) does not turn the missing values into literal "nan" inputs
data['word'] = data['word'].fillna(data['usda_item']).astype(str)

# Show the first 100 rows of the dataset
# logger.info("Showing the first 100 rows of the dataset...")
# print(data.head(100))

# Encode the USDA items as labels
logger.info("Encoding USDA items as labels...")
label_encoder = LabelEncoder()
data['label'] = label_encoder.fit_transform(data['usda_item'])

# Prepare the dataset (preserve_index=False keeps the pandas index out of the columns)
logger.info("Creating dataset from the data frame...")
dataset = Dataset.from_pandas(data, preserve_index=False)

# Split data into training and validation sets
logger.info("Splitting data into training and validation sets...")
train_test = dataset.train_test_split(test_size=0.1)
train_dataset = train_test['train']
eval_dataset = train_test['test']

# Initialize tokenizer and model
model_name = "roberta-base"
logger.info(f"Loading tokenizer and model: {model_name}...")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(label_encoder.classes_))

# Move model to device
model.to(device)

# Tokenize data (pads to the model max length; a smaller max_length would be
# faster for single-word inputs)
logger.info("Tokenizing data...")
def preprocess_data(examples):
    return tokenizer(examples['word'], truncation=True, padding='max_length')

train_dataset = train_dataset.map(preprocess_data, batched=True)
eval_dataset = eval_dataset.map(preprocess_data, batched=True)

# Set format for PyTorch
logger.info("Setting dataset format for PyTorch...")
train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
eval_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
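
# Optional accuracy metric for evaluation (a minimal sketch; without it the
# Trainer only reports eval loss). The Trainer passes (logits, label ids) here.
import numpy as np

def compute_metrics(eval_pred):
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return {"accuracy": float((predictions == labels).mean())}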
arguments...") training_args = TrainingArguments( output_dir="./results", evaluation_strategy="epoch", # or 'steps' to match save_strategy save_strategy="epoch", # Ensure this matches evaluation_strategy logging_dir='./logs', # Directory for storing logs logging_steps=10, # Log every 10 steps learning_rate=2e-5, # Try different values like 1e-5, 3e-5, etc. per_device_train_batch_size=16, # Try different values like 32, 64, etc. per_device_eval_batch_size=16, num_train_epochs=5, # Experiment with 3, 5, 10, etc. weight_decay=0.01, # Try different values like 0.1 warmup_steps=500, # Number of warmup steps for learning rate scheduler save_total_limit=2, # Limit the total amount of checkpoints load_best_model_at_end=True, # Load the best model at the end fp16=True, # Use mixed precision training ) # Initialize Trainer logger.info("Initializing Trainer...") trainer = Trainer( model=model, args=training_args, train_dataset=train_dataset, eval_dataset=eval_dataset, callbacks=[EarlyStoppingCallback(early_stopping_patience=3)] # Early stopping ) # Train the model logger.info("Starting model training...") logger.info(f"Using device: {device}") trainer.train() logger.info("Model training completed.") # Evaluate the model logger.info("Evaluating the model...") trainer.evaluate() logger.info("Model evaluation completed.") # Save the trained model and tokenizer output_dir = "./trained_model" logger.info(f"Saving model and tokenizer to {output_dir}...") model.save_pretrained(output_dir) tokenizer.save_pretrained(output_dir) logger.info("Model and tokenizer saved.") # Function to predict USDA food item def predict_usda_item(word): logger.info(f"Predicting USDA food item for the word: {word}") inputs = tokenizer(word, return_tensors="pt", truncation=True, padding="max_length").to(device) outputs = model(**inputs) predictions = outputs.logits.argmax(-1) predicted_label = predictions.item() predicted_item = label_encoder.inverse_transform([predicted_label])[0] logger.info(f"Predicted USDA food item: {predicted_item}") return predicted_item # Test the function logger.info("Testing the prediction function...") print(predict_usda_item("Squash")) logger.info("Script completed.")