# Spaces: Paused
import os | |
import psycopg2 | |
import logging | |
from sklearn.preprocessing import LabelEncoder | |
from db.db_utils import get_connection | |
from dotenv import load_dotenv | |
from psycopg2.extras import DictCursor | |
import pandas as pd | |
from datasets import Dataset | |
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments, EarlyStoppingCallback | |
import torch | |
import transformers | |
# Logging: INFO level for this script, and info-level verbosity for transformers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
transformers.logging.set_verbosity_info()

# Load environment variables
load_dotenv()

# Select the compute device: CUDA when available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
logger.info(f"Using device: {device}")
# Fetch data from database
def fetch_data():
    """Read the training rows from the ``mappings`` and ``dictionary`` tables.

    Opens a connection through ``get_connection()``, runs two SELECT queries,
    and always closes the cursor and the connection afterwards, including
    when cursor creation or one of the queries raises.

    Returns:
        A ``(mappings_data, dictionary_data)`` tuple holding the
        ``cursor.fetchall()`` results of
        ``SELECT input_word, dictionary_word FROM mappings`` and
        ``SELECT description FROM dictionary`` respectively.
    """
    logger.info("Connecting to the database...")
    conn = get_connection()
    try:
        # Create the cursor inside the try block so that a failure here
        # still releases the connection.
        cursor = conn.cursor(cursor_factory=DictCursor)
        try:
            logger.info("Fetching data from mappings table...")
            cursor.execute("SELECT input_word, dictionary_word FROM mappings")
            mappings_data = cursor.fetchall()
            logger.info(f"Fetched {len(mappings_data)} records from mappings table.")

            logger.info("Fetching data from dictionary table...")
            cursor.execute("SELECT description FROM dictionary")
            dictionary_data = cursor.fetchall()
            logger.info(f"Fetched {len(dictionary_data)} records from dictionary table.")

            return mappings_data, dictionary_data
        finally:
            cursor.close()
    finally:
        conn.close()
        logger.info("Database connection closed.")
# Load data
logger.info("Loading data from database...")
mappings_data, dictionary_data = fetch_data()

# Prepare data for model
logger.info("Preparing data for the model...")
mappings_df = pd.DataFrame(mappings_data, columns=['word', 'usda_item'])
dictionary_df = pd.DataFrame(dictionary_data, columns=['usda_item'])

# Dictionary rows carry no input word of their own. Without this assignment,
# concat fills 'word' with NaN and astype(str) turns it into the literal text
# "nan", so every dictionary entry would be trained as "nan" -> <item>.
# Use the description itself as the input text so each item maps to itself.
dictionary_df['word'] = dictionary_df['usda_item']

# Combine the mappings and dictionary data. The index is reset after dropping
# duplicates so Dataset.from_pandas does not carry the old index along as an
# extra column.
data = (
    pd.concat([mappings_df, dictionary_df], ignore_index=True)
    .drop_duplicates()
    .reset_index(drop=True)
)

# Ensure the 'word' column is of type str
data['word'] = data['word'].astype(str)

# Show the first 100 rows of the dataset
# logger.info("Showing the first 100 rows of the dataset...")
# print(data.head(100))

# Encode the USDA items as integer class labels
logger.info("Encoding USDA items as labels...")
label_encoder = LabelEncoder()
data['label'] = label_encoder.fit_transform(data['usda_item'])

# Prepare the dataset
logger.info("Creating dataset from the data frame...")
dataset = Dataset.from_pandas(data)

# Split data into training and validation sets (10% held out for evaluation)
logger.info("Splitting data into training and validation sets...")
train_test = dataset.train_test_split(test_size=0.1)
train_dataset = train_test['train']
eval_dataset = train_test['test']
# Tokenizer and sequence-classification model, with one output class per
# label known to the encoder.
model_name = "roberta-base"
logger.info(f"Loading tokenizer and model: {model_name}...")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=len(label_encoder.classes_),
)

# Place the model on the selected device.
model.to(device)

# Tokenization of the datasets follows.
logger.info("Tokenizing data...")
def preprocess_data(examples):
    """Tokenize the ``'word'`` entries of a batch.

    Args:
        examples: A mapping with a ``'word'`` key holding the text to encode.

    Returns:
        The tokenizer output for those words, produced with
        ``truncation=True`` and ``padding='max_length'``.
    """
    words = examples['word']
    encoded = tokenizer(words, truncation=True, padding='max_length')
    return encoded
train_dataset = train_dataset.map(preprocess_data, batched=True)
eval_dataset = eval_dataset.map(preprocess_data, batched=True)

# Set format for PyTorch: expose only the tensors the model consumes.
logger.info("Setting dataset format for PyTorch...")
train_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])
eval_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'label'])

# Define training arguments
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy`; adjust the keyword if the installed version rejects it.
logger.info("Defining training arguments...")
training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="epoch",     # or 'steps' to match save_strategy
    save_strategy="epoch",           # Must match evaluation_strategy for load_best_model_at_end
    logging_dir='./logs',            # Directory for storing logs
    logging_steps=10,                # Log every 10 steps
    learning_rate=2e-5,              # Try different values like 1e-5, 3e-5, etc.
    per_device_train_batch_size=16,  # Try different values like 32, 64, etc.
    per_device_eval_batch_size=16,
    num_train_epochs=5,              # Experiment with 3, 5, 10, etc.
    weight_decay=0.01,               # Try different values like 0.1
    warmup_steps=500,                # Number of warmup steps for learning rate scheduler
    save_total_limit=2,              # Limit the total amount of checkpoints
    load_best_model_at_end=True,     # Load the best model at the end
    # Mixed precision needs a CUDA device. The script falls back to the CPU
    # when CUDA is missing, so only request fp16 when a GPU is present.
    fp16=torch.cuda.is_available(),
)
# Initialize Trainer
logger.info("Initializing Trainer...")
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=3)],  # Early stopping
)

# Train the model
logger.info("Starting model training...")
logger.info(f"Using device: {device}")
trainer.train()
logger.info("Model training completed.")

# Evaluate the model and report the metrics instead of discarding them.
logger.info("Evaluating the model...")
eval_metrics = trainer.evaluate()
logger.info(f"Evaluation metrics: {eval_metrics}")
logger.info("Model evaluation completed.")

# Save the trained model and tokenizer
output_dir = "./trained_model"
logger.info(f"Saving model and tokenizer to {output_dir}...")
# Store the label names in the model config so the saved checkpoint can be
# decoded without the in-memory LabelEncoder from this run.
id2label = {index: str(name) for index, name in enumerate(label_encoder.classes_)}
model.config.id2label = id2label
model.config.label2id = {name: index for index, name in id2label.items()}
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
logger.info("Model and tokenizer saved.")
# Function to predict USDA food item
def predict_usda_item(word):
    """Predict the USDA item label for a single input word.

    Args:
        word: The text to classify.

    Returns:
        The label that ``label_encoder`` maps the highest-scoring class
        index back to.
    """
    logger.info(f"Predicting USDA food item for the word: {word}")
    inputs = tokenizer(word, return_tensors="pt", truncation=True, padding="max_length").to(device)

    # Inference only: switch off dropout and skip building the autograd graph
    # so predictions are deterministic and no gradient memory is allocated.
    model.eval()
    with torch.no_grad():
        outputs = model(**inputs)

    predicted_label = outputs.logits.argmax(-1).item()
    predicted_item = label_encoder.inverse_transform([predicted_label])[0]
    logger.info(f"Predicted USDA food item: {predicted_item}")
    return predicted_item
# Smoke-test the prediction function on a sample word and print the result.
logger.info("Testing the prediction function...")
sample_prediction = predict_usda_item("Squash")
print(sample_prediction)
logger.info("Script completed.")