# Spaces: Paused (page-header text captured along with the script; not part of the code)
import os | |
import pickle | |
import pandas as pd | |
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM | |
import torch | |
import re | |
from tqdm import tqdm | |
from sklearn.metrics.pairwise import cosine_similarity as sklearn_cosine_similarity | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
# Select the compute device: the MPS backend when PyTorch reports it as
# available, otherwise the CPU.
if torch.backends.mps.is_available():
    device = torch.device('mps')
else:
    device = torch.device('cpu')
print("Device:", device)
# Load the tokenizer and the causal language model for the chosen checkpoint.
model_id = "meta-llama/Meta-Llama-3-8B"
tokenizer = AutoTokenizer.from_pretrained(model_id)
# The weights are requested as bfloat16; the loaded model is then moved to the
# device selected earlier in the script.
model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.bfloat16)
model = model.to(device)
# Preprocess the dictionary words.
# Matches an optional introducing comma, optional whitespace, and the
# standalone qualifier "raw" or "nfs". The lookarounds reject matches inside a
# longer or hyphenated word ("rawhide", "raw-milk", "strawberry").
_QUALIFIER_PATTERN = re.compile(r',?\s*(?<![\w-])(?:raw|nfs)(?![\w-])')


def preprocess_dictionary_word(text):
    """Normalise a dictionary description before it is embedded.

    The text is stripped and lowercased, the standalone qualifiers "raw" and
    "nfs" (not further specified) are removed together with the comma that
    introduces them, and comma-separated parts are put in reverse order, so
    "Tomato, Roma" becomes "roma tomato" and "Squash, Italian, raw" becomes
    "italian squash".

    Args:
        text: A dictionary description string.

    Returns:
        The normalised, lowercased description. If removing the qualifiers
        would leave nothing (the description was just "raw"), the stripped,
        lowercased input is returned instead of an empty string.
    """
    # lowercase the word and remove leading/trailing whitespaces
    normalized = text.strip().lower()
    # Whole-word removal: a plain substring replace of " raw" would also eat
    # the start of words such as "rawhide", and would miss "raw eggplant" and
    # "eggplant,raw".
    without_qualifiers = _QUALIFIER_PATTERN.sub('', normalized)
    # Reverse the comma-separated parts; empty parts (left by stray commas or
    # by a removed qualifier) are dropped so no double spaces appear.
    parts = [part.strip() for part in without_qualifiers.split(',')]
    cleaned = ' '.join(part for part in reversed(parts) if part)
    return cleaned or normalized
def generate_embedding(sentence):
    """Return a single vector representing *sentence*.

    The sentence is tokenized, run through the module-level ``model`` without
    gradient tracking, and the model's output logits are averaged along
    dimension 1 (presumably the token axis -- confirm against the model's
    output layout). Size-1 dimensions are squeezed away and the result is
    moved to the CPU.
    """
    encoded = tokenizer(sentence, return_tensors='pt').to(device)
    with torch.no_grad():
        logits = model(**encoded).logits
    return logits.mean(dim=1).squeeze().cpu()
def cosine_similarity(embedding1, embedding2):
    """Return the cosine similarity of two embedding tensors as a Python number.

    The similarity is taken along dimension 0; ``.item()`` then requires the
    result to hold exactly one element, which is the case for two 1-D vectors
    of equal length.
    """
    similarity = torch.nn.functional.cosine_similarity(embedding1, embedding2, dim=0)
    return similarity.item()
# Load the dictionary
csv_file_path = './dictionary/dictionary.csv'
df_dictionary = pd.read_csv(csv_file_path)
# Missing descriptions are dropped before the string conversion: astype(str)
# alone turns NaN into the literal text "nan", which would then be embedded
# and offered as a match candidate.
dictionary = df_dictionary['description'].dropna().astype(str).tolist()

# Load the input words
input_file_path = 'raw/food-forward-2023-raw-data - food-forward-2023-raw-data.csv'
df_input = pd.read_csv(input_file_path)
# Same reasoning as above: rows without a description are skipped rather than
# being matched as the word "nan".
input_words = df_input['description'].dropna().astype(str).tolist()
# Dictionary embeddings are cached on disk, keyed by the original (not
# preprocessed) description. The cache knows nothing about the model or the
# preprocessing that produced it: delete the pickle file to force a full
# regeneration after changing either.
pickle_file_path = './dictionary_embeddings_llama.pkl'
if os.path.exists(pickle_file_path):
    # NOTE: unpickling can execute arbitrary code. Only load a cache file that
    # this script wrote itself.
    with open(pickle_file_path, 'rb') as f:
        dictionary_embeddings = pickle.load(f)
else:
    dictionary_embeddings = {}

# Embed only the descriptions the cache does not cover yet, so that rows added
# to the dictionary after the cache was written are not silently left out.
# dict.fromkeys removes duplicates while keeping first-seen order.
missing_descriptions = [
    desc for desc in dict.fromkeys(dictionary) if desc not in dictionary_embeddings
]
if missing_descriptions:
    for desc in tqdm(missing_descriptions, desc="Generating embeddings for dictionary words"):
        dictionary_embeddings[desc] = generate_embedding(preprocess_dictionary_word(desc))
    # Save the embeddings to a pickle file
    with open(pickle_file_path, 'wb') as f:
        pickle.dump(dictionary_embeddings, f)
# Find the most similar word in the dictionary for each input word.
# Each entry appended to `results` is the tuple
# (input_word, input_word_clean, most_similar_word, highest_score,
#  confidence_score, similar_words).
parenthetical_pattern = re.compile(r'\(.*?\)')  # parenthesised text is dropped before embedding
tie_margin = 0.05  # scores this close to the best one count as competing matches
results = []
for input_word in tqdm(input_words, desc="Processing input words"):
    if not isinstance(input_word, str) or not input_word:
        continue
    input_word_clean = parenthetical_pattern.sub('', input_word).strip()
    if not input_word_clean:
        # The word consisted only of parenthesised text; there is nothing
        # meaningful left to embed, so skip it like an empty input.
        continue
    print(f"Processing input word: {input_word}\nCleaned: {input_word_clean}")
    input_embedding = generate_embedding(input_word_clean)
    similarities = [
        (desc, cosine_similarity(input_embedding, dict_embedding))
        for desc, dict_embedding in dictionary_embeddings.items()
    ]
    most_similar_word, highest_score = max(similarities, key=lambda x: x[1])
    print(f"Most similar word: {most_similar_word}")
    # Calculate confidence score: 1 when the best match is the only candidate
    # within tie_margin of the top score (the best match itself always
    # qualifies), 0 when other candidates are that close as well.
    high_similarities = sorted(
        (pair for pair in similarities if abs(pair[1] - highest_score) <= tie_margin),
        key=lambda x: x[1],
        reverse=True,
    )
    confidence_score = 1 if len(high_similarities) <= 1 else 0
    similar_words = []
    if confidence_score == 0:
        similar_words = [desc for desc, score in high_similarities[:5]]  # Limit to top 5 similar words
    results.append((input_word, input_word_clean, most_similar_word, highest_score, confidence_score, similar_words))
# Print one six-line report per result, followed by a blank line.
for input_word, input_word_clean, most_similar_word, score, confidence, similar_words in results:
    print(
        f"Input word: {input_word}",
        f"Cleaned word: {input_word_clean}",
        f"Most similar word: {most_similar_word}",
        f"Similarity score: {score}",
        f"Confidence score: {confidence}",
        f"Similar words: {similar_words}\n",
        sep="\n",
    )
# Export results to CSV
output_file_path = './results/experiment2.csv'
df_results = pd.DataFrame(results, columns=['input_word', 'input_word_clean', 'match_word', 'similarity_score', 'confidence_score', 'similar_words'])
# to_csv does not create missing directories; make sure the target directory
# exists so the export cannot fail after the whole matching run has finished.
output_dir = os.path.dirname(output_file_path)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)
df_results.to_csv(output_file_path, index=False)
# TODO: when several results score within 0.01 of each other, all of them need to be considered
# (the matching loop currently treats scores within 0.05 of the best one as competing matches).
# Ad-hoc similarity checks, kept for reference:
# cosine_similarity(generate_embedding("Italian Squash"), generate_embedding("Squash, Italian, raw"))
# cosine_similarity(generate_embedding("Italian Squash"), generate_embedding("Italian Sausage"))
# cosine_similarity(generate_embedding("Tomato - Beefsteak Tomato"), generate_embedding("Beef with tomato-based sauce"))
# cosine_similarity(generate_embedding("Tomato - Beefsteak Tomato"), generate_embedding("Tomato, Roma"))
# cosine_similarity(generate_embedding("Tomato - Beefsteak Tomato"), generate_embedding("Tomato, raw"))
# cosine_similarity(generate_embedding("Eggplant"), generate_embedding("Eggplant dip"))
# cosine_similarity(generate_embedding("Eggplant"), generate_embedding("Eggplant,raw"))
# cosine_similarity(generate_embedding("Eggplant"), generate_embedding("Eggplant raw"))
# cosine_similarity(generate_embedding("Eggplant"), generate_embedding("raw Eggplant"))