from flask import Flask, request, jsonify
import pandas as pd
from transformers import pipeline
import os
import re
import json
import requests
import random
from difflib import get_close_matches
from textblob import TextBlob
from nltk.tokenize import word_tokenize, sent_tokenize
import nltk
import ast
from urllib.parse import quote
def force_download_nltk():
    nltk_data_dir = os.environ.get("NLTK_DATA", "/app/nltk_data")
    transformers_cache_dir = os.environ.get("TRANSFORMERS_CACHE", "/app/transformers_cache")
    os.makedirs(nltk_data_dir, exist_ok=True)
    os.makedirs(transformers_cache_dir, exist_ok=True)
    os.environ["NLTK_DATA"] = nltk_data_dir
    os.environ["TRANSFORMERS_CACHE"] = transformers_cache_dir
    needed_packages = ["punkt"]
    for package in needed_packages:
        try:
            nltk.data.find(f"tokenizers/{package}")
        except LookupError:
            print(f"Downloading NLTK package: {package} to {nltk_data_dir}")
            nltk.download(package, download_dir=nltk_data_dir)
force_download_nltk()
domain_words = {
"carb", "carbs", "carbo", "carbohydrate", "carbohydrates",
"fat", "fats", "protein", "proteins", "fiber", "cholesterol",
"calcium", "iron", "magnesium", "potassium", "sodium", "vitamin", "vitamin c",
"calories", "calorie"
}
def smart_correct_spelling(text, domain_set):
    tokens = word_tokenize(text)
    corrected_tokens = []
    for token in tokens:
        if token.isalpha() and token.lower() not in domain_set:
            corrected_word = str(TextBlob(token).correct())
            corrected_tokens.append(corrected_word)
        else:
            corrected_tokens.append(token)
    return " ".join(corrected_tokens)
qa_pipeline = pipeline("question-answering", model="distilbert-base-cased-distilled-squad")  # extractive QA (used by extract_unwanted_ingredients below)
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")  # include/exclude intent per clause
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")  # condenses very long queries before parsing
def summarize_input(text):
    summary = summarizer(text, max_length=130, min_length=30, do_sample=False)
    return summary[0]['summary_text']
df = pd.read_csv("Datasets/Final used Datasets/food_dataset_with_nutriition.csv")
print(f"Starting with {len(df)} recipes in dataset")
nutrition_columns = ["calories", "Total fats", "Carbohydrate", "Fiber", "Protein",
                     "Cholesterol", "Calcium", "Iron", "Magnesium", "Potassium", "Sodium", "Vitamin C"]
# Coerce nutrition columns to numeric; unparseable values become NaN.
for col in nutrition_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')
disease_df = pd.read_csv("Datasets/Final used Datasets/disease_food_nutrition_mapping.csv")
disease_df["Disease"] = disease_df["Disease"].str.lower()
# Load the misspellings and common-ingredient helper files, seeding them with defaults if missing.
os.makedirs("docs", exist_ok=True)
try:
    with open("docs/common_misspellings.json", "r") as file:
        common_misspellings = json.load(file)
except FileNotFoundError:
    common_misspellings = {"suger": "sugar", "milc": "milk"}
    with open("docs/common_misspellings.json", "w") as file:
        json.dump(common_misspellings, file, indent=2)
try:
    with open("docs/common_ingredients.json", "r") as file:
        common_ingredients = json.load(file)
except FileNotFoundError:
    common_ingredients = ["sugar", "salt", "flour", "milk", "eggs", "butter", "oil", "water"]
    with open("docs/common_ingredients.json", "w") as file:
        json.dump(common_ingredients, file, indent=2)
def create_ingredient_dictionary(dataframe, common_ingredients_list):
    all_ingredients = []
    all_ingredients.extend(common_ingredients_list)
    all_ingredients.extend(set(common_misspellings.values()))
    for ingredients_list in dataframe['ingredients']:
        parts = re.split(r',|\sand\s|\sor\s|;', str(ingredients_list))
        for part in parts:
            # Strip quantities/units (plural forms such as "cups" included) and common prep words.
            clean_part = re.sub(
                r'\d+[\s/]*(oz|ounce|cup|tbsp|tsp|tablespoon|teaspoon|pound|lb|g|ml|l|pinch|dash)s?\b\.?',
                '', part)
            clean_part = re.sub(
                r'\b(fresh|freshly|chopped|minced|diced|sliced|grated|ground|powdered|crushed|toasted|roasted)\b',
                '', clean_part)
            clean_part = re.sub(r'\(.*?\)', '', clean_part)
            clean_part = clean_part.strip()
            subparts = re.split(r'\sand\s|\sor\s', clean_part)
            for subpart in subparts:
                cleaned_subpart = subpart.strip().lower()
                if cleaned_subpart and len(cleaned_subpart) > 2:
                    all_ingredients.append(cleaned_subpart)
    unique_ingredients = list(set(all_ingredients))
    unique_ingredients.sort(key=len, reverse=True)
    return unique_ingredients
food_dictionary = create_ingredient_dictionary(df, common_ingredients)
def identify_food_ingredient(text, ingredient_dict, misspellings_dict):
    cleaned = re.sub(
        r'\d+[\s/]*(oz|ounce|cup|tbsp|tsp|tablespoon|teaspoon|pound|lb|g|ml|l|pinch|dash)s?\b\.?',
        '', text)
    cleaned = re.sub(
        r'\b(fresh|freshly|chopped|minced|diced|sliced|grated|ground|powdered|crushed|toasted|roasted)\b',
        '', cleaned)
    cleaned = re.sub(r'\(.*?\)', '', cleaned)
    cleaned = cleaned.strip().lower()
    if cleaned in misspellings_dict:
        return misspellings_dict[cleaned]
    if cleaned in ingredient_dict:
        return cleaned
    words = cleaned.split()
    for word in words:
        if word in ingredient_dict:
            return word
        if word in misspellings_dict:
            return misspellings_dict[word]
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.8)
    if close_matches:
        return close_matches[0]
    for dict_ingredient in ingredient_dict:
        if dict_ingredient in cleaned:
            return dict_ingredient
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.6)
    if close_matches:
        return close_matches[0]
    return None
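# Resolution order (illustrative): exact misspelling lookup (e.g. "milc" -> "milk"),
# exact dictionary hit, per-word lookup, close fuzzy match (cutoff 0.8), substring
# containment, then a looser fuzzy match (cutoff 0.6); None is returned only when
# every strategy fails.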
def correct_food_ingredient(ingredient, ingredient_dict, misspellings_dict):
    cleaned = re.sub(
        r'\d+[\s/]*(oz|ounce|cup|tbsp|tsp|tablespoon|teaspoon|pound|lb|g|ml|l|pinch|dash)s?\b\.?',
        '', ingredient)
    cleaned = re.sub(
        r'\b(fresh|freshly|chopped|minced|diced|sliced|grated|ground|powdered|crushed|toasted|roasted)\b',
        '', cleaned)
    cleaned = re.sub(r'\(.*?\)', '', cleaned)
    cleaned = cleaned.strip().lower()
    if cleaned in misspellings_dict:
        return misspellings_dict[cleaned]
    if cleaned in ingredient_dict:
        return cleaned
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.8)
    if close_matches:
        return close_matches[0]
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.6)
    if close_matches:
        return close_matches[0]
    for dict_ingredient in ingredient_dict:
        if cleaned in dict_ingredient or dict_ingredient in cleaned:
            return dict_ingredient
    return cleaned
def add_misspelling(misspelled, correct):
    try:
        with open("docs/common_misspellings.json", "r") as file:
            misspellings = json.load(file)
        misspellings[misspelled.lower()] = correct.lower()
        with open("docs/common_misspellings.json", "w") as file:
            json.dump(misspellings, file, indent=2, sort_keys=True)
        return True
    except Exception:
        return False
def extract_unwanted_ingredients(input_text):
question = "What ingredients should be excluded?"
result = qa_pipeline(question=question, context=input_text)
raw_answer = result['answer']
potential_ingredients = []
for part in raw_answer.split(','):
for subpart in part.split(' and '):
for item in subpart.split(' or '):
clean_item = item.strip()
if clean_item:
potential_ingredients.append(clean_item)
valid_ingredients = []
for item in potential_ingredients:
corrected = identify_food_ingredient(item, food_dictionary, common_misspellings)
if corrected:
valid_ingredients.append(corrected)
return valid_ingredients if valid_ingredients else [raw_answer]
def classify_clause(clause):
    candidate_labels = ["include", "exclude"]
    result = classifier(clause, candidate_labels, hypothesis_template="This clause means the ingredient should be {}.")
    return result["labels"][0].lower()
def extract_ingredients_from_clause(clause, ingredient_dict, misspellings_dict):
    found = []
    for ingredient in ingredient_dict:
        if ingredient.lower() in clause.lower():
            normalized = identify_food_ingredient(ingredient, ingredient_dict, misspellings_dict)
            if normalized:
                found.append(normalized)
    return list(set(found))
def classify_ingredients_in_query(query, ingredient_dict, misspellings_dict):
    include_ingredients = []
    exclude_ingredients = []
    nutrition_terms = ['calories', 'calorie', 'fat', 'fats', 'carb', 'carbs', 'protein',
                       'fiber', 'cholesterol', 'calcium', 'iron', 'magnesium',
                       'potassium', 'sodium', 'vitamin']
    # Drop "low/high <nutrient>" phrases so they are not mistaken for ingredients.
    modified_query = query
    for term in nutrition_terms:
        pattern = re.compile(r'(low|high)\s+' + term, re.IGNORECASE)
        modified_query = pattern.sub('', modified_query)
    clauses = re.split(r'\bbut\b|,', modified_query, flags=re.IGNORECASE)
    for clause in clauses:
        clause = clause.strip()
        if not clause:
            continue
        intent = classify_clause(clause)
        ingredients_found = extract_ingredients_from_clause(clause, ingredient_dict, misspellings_dict)
        if intent == "include":
            include_ingredients.extend(ingredients_found)
        elif intent == "exclude":
            exclude_ingredients.extend(ingredients_found)
    return list(set(include_ingredients)), list(set(exclude_ingredients))
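# Illustrative example (actual labels depend on the zero-shot model): for
# "I want chicken, no peanuts" the query is split into the clauses "I want chicken"
# and "no peanuts"; the classifier is expected to label them "include" and "exclude"
# respectively, and dictionary ingredients found in each clause land in the matching list.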
def extract_nutrition_from_clause(clause, nutrition_dict, misspellings_dict):
    found = []
    clause_lower = clause.lower()
    sorted_terms = sorted(nutrition_dict, key=lambda x: -len(x))
    for term in sorted_terms:
        pattern = r'\b' + re.escape(term.lower()) + r'\b'
        if re.search(pattern, clause_lower):
            found.append(term.lower())
    return list(set(found))
def classify_nutrition_in_query(query, nutrition_dict, misspellings_dict):
    include_nutrition = []
    exclude_nutrition = []
    # Word boundaries around "but" prevent splitting inside words such as "butter".
    clauses = re.split(r'\band\b|\bbut\b|,', query, flags=re.IGNORECASE)
    overall_intent = "exclude" if re.search(r'sensitivity|allergy|exclude', query, flags=re.IGNORECASE) else "include"
    for clause in clauses:
        clause = clause.strip()
        if not clause:
            continue
        intent = "include" if "i want" in clause.lower() else overall_intent
        numbers = re.findall(r'\d+(?:\.\d+)?', clause)
        threshold = float(numbers[0]) if numbers else None
        if re.search(r'\b(high|over|above|more than|exceeding)\b', clause, flags=re.IGNORECASE):
            modifier = "high"
        elif re.search(r'\b(low|under|less than|below)\b', clause, flags=re.IGNORECASE):
            modifier = "low"
        else:
            modifier = "high" if intent == "exclude" else "low"
        terms_found = extract_nutrition_from_clause(clause, nutrition_dict, misspellings_dict)
        for term in terms_found:
            norm_term = nutrition_terms_dictionary.get(term, term)
            condition = (modifier, norm_term, threshold) if threshold is not None else (modifier, norm_term)
            if intent == "include":
                include_nutrition.append(condition)
            elif intent == "exclude":
                exclude_nutrition.append(condition)
    return list(set(include_nutrition)), list(set(exclude_nutrition))
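# Illustrative example: "I have a sodium sensitivity" sets the overall intent to
# "exclude" (keyword "sensitivity"); with no explicit high/low wording the modifier
# defaults to "high", producing the exclusion condition ("high", "Sodium").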
nutrition_terms_dictionary = {
"calorie": "calories",
"calories": "calories",
"fat": "Total fats",
"fats": "Total fats",
"total fat": "Total fats",
"total fats": "Total fats",
"carb": "Carbohydrate",
"carbs": "Carbohydrate",
"carbo": "Carbohydrate",
"carbohydrate": "Carbohydrate",
"carbohydrates": "Carbohydrate",
"fiber": "Fiber",
"protein": "Protein",
"proteins": "Protein",
"cholesterol": "Cholesterol",
"calcium": "Calcium",
"iron": "Iron",
"magnesium": "Magnesium",
"potassium": "Potassium",
"sodium": "Sodium",
"vitamin c": "Vitamin C"
}
fixed_thresholds = {
"calories": 700,
"Total fats": 60,
"Carbohydrate": 120,
"Fiber": 10,
"Protein": 30,
"Cholesterol": 100,
"Calcium": 300,
"Iron": 5,
"Magnesium": 100,
"Potassium": 300,
"Sodium": 400,
"Vitamin C": 50
}
def filter_by_nutrition_condition(df, condition):
    if isinstance(condition, tuple):
        if len(condition) == 3:
            direction, nutrition_term, threshold = condition
        elif len(condition) == 2:
            direction, nutrition_term = condition
            threshold = fixed_thresholds.get(nutrition_term)
        else:
            return df
        column = nutrition_term
        if column is None or threshold is None:
            return df
        if direction == "low":
            return df[df[column] < threshold]
        elif direction == "high":
            return df[df[column] >= threshold]
    return df
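# Illustrative usage: ("low", "Sodium", 200) keeps rows with Sodium < 200, while
# ("high", "Protein") carries no threshold and falls back to
# fixed_thresholds["Protein"] (30), keeping rows with Protein >= 30.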
def score_recipe_ingredients(recipe_ingredients, include_list):
    recipe_lower = recipe_ingredients.lower()
    match_count = sum(
        1 for ingredient in include_list
        if ingredient.lower() in recipe_lower
    )
    return match_count
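# Example: score_recipe_ingredients("butter, sugar, flour", ["sugar", "milk"]) -> 1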
def filter_and_rank_recipes(df, include_list, exclude_list, include_nutrition, exclude_nutrition):
    filtered_df = df.copy()
    print(f"Starting with {len(filtered_df)} recipes for filtering")
    if include_list:
        # Primary pass: require at least two of the requested ingredients per recipe.
        filtered_df['ingredient_match_count'] = filtered_df['ingredients'].apply(
            lambda x: score_recipe_ingredients(str(x), include_list)
        )
        filtered_df = filtered_df[filtered_df['ingredient_match_count'] >= 2]
        print(f"After requiring at least 2 included ingredients: {len(filtered_df)} recipes remain")
    for ingredient in exclude_list:
        before_count = len(filtered_df)
        filtered_df = filtered_df[
            ~filtered_df['ingredients']
            .str.lower()
            .fillna('')
            .str.contains(re.escape(ingredient.lower()))
        ]
        print(f"After excluding '{ingredient}': {len(filtered_df)} recipes remain (removed {before_count - len(filtered_df)})")
    for i, cond in enumerate(include_nutrition):
        before_count = len(filtered_df)
        filtered_df = filter_by_nutrition_condition(filtered_df, cond)
        after_count = len(filtered_df)
        print(f"After applying nutrition condition {i+1} (include) '{cond}': {after_count} recipes remain (removed {before_count - after_count})")
    for i, cond in enumerate(exclude_nutrition):
        before_count = len(filtered_df)
        temp_df = filter_by_nutrition_condition(df.copy(), cond)
        filtered_df = filtered_df[~filtered_df.index.isin(temp_df.index)]
        after_count = len(filtered_df)
        print(f"After applying nutrition condition {i+1} (exclude) '{cond}': {after_count} recipes remain (removed {before_count - after_count})")
    if filtered_df.empty:
        print("\nNo recipes match all criteria. Implementing fallback approach...")
        # Fallback pass: relax the ingredient requirement to a single match, keep exclusions.
        fallback_df = df.copy()
        if include_list:
            fallback_df['ingredient_match_count'] = fallback_df['ingredients'].apply(
                lambda x: score_recipe_ingredients(str(x), include_list)
            )
            fallback_df = fallback_df[fallback_df['ingredient_match_count'] >= 1]
        else:
            fallback_df['ingredient_match_count'] = 1
        for ingredient in exclude_list:
            fallback_df = fallback_df[
                ~fallback_df['ingredients']
                .str.lower()
                .fillna('')
                .str.contains(re.escape(ingredient.lower()))
            ]
        if fallback_df.empty:
            fallback_df = df.sample(min(5, len(df)))
            fallback_df['ingredient_match_count'] = 0
            print("No matches found. Showing random recipes as a fallback")
        filtered_df = fallback_df
    if 'ingredient_match_count' not in filtered_df.columns:
        filtered_df['ingredient_match_count'] = 0
    filtered_df = filtered_df.sort_values('ingredient_match_count', ascending=False)
    return filtered_df
def get_disease_recommendations(user_text, disease_mapping_df):
    user_text_lower = user_text.lower()
    matches = disease_mapping_df[disease_mapping_df['Disease'].apply(lambda d: d in user_text_lower)]
    if not matches.empty:
        disease_info = matches.iloc[0]

        def safe_parse_list(x):
            if isinstance(x, str):
                try:
                    return ast.literal_eval(x)
                except (ValueError, SyntaxError):
                    return [item.strip() for item in x.split(',') if item.strip()]
            return x

        best_foods = safe_parse_list(disease_info.get("Best_Foods", "[]"))
        worst_foods = safe_parse_list(disease_info.get("Worst_Foods", "[]"))
        best_nutrition = safe_parse_list(disease_info.get("Best_Nutrition", "[]"))
        worst_nutrition = safe_parse_list(disease_info.get("Worst_Nutrition", "[]"))
        recommendations = {
            "Disease": disease_info['Disease'],
            "Best_Foods": best_foods,
            "Worst_Foods": worst_foods,
            "Best_Nutrition": best_nutrition,
            "Worst_Nutrition": worst_nutrition
        }
        return recommendations
    return None
def get_recipe_output(recipe_row):
    recipe_name = recipe_row['title']
    ner_info = recipe_row.get('NER', '')
    try:
        ner_list = json.loads(ner_info)
        ner_str = ", ".join(ner_list)
    except Exception:
        ner_str = ner_info
    nutrition_details = {col: float(recipe_row[col]) for col in nutrition_columns}
    result = {
        "Meal name": recipe_name,
        "NER": ner_str,
        "Nutrition details": nutrition_details
    }
    print(f"Meal name: {recipe_name}")
    print(f"NER: {ner_str}")
    print(f"Nutrition details: {nutrition_details}")
    return result
def process_long_query(query):
    if len(query.split()) > 500:
        print("Long input detected. Summarizing...")
        query = summarize_input(query)
    print(f"Processed Query: \"{query}\"")
    corrected = smart_correct_spelling(query, domain_words)
    sentences = sent_tokenize(corrected)
    aggregated_include = []
    aggregated_exclude = []
    aggregated_include_nutrition = []
    aggregated_exclude_nutrition = []
    for sentence in sentences:
        inc, exc = classify_ingredients_in_query(sentence, food_dictionary, common_misspellings)
        aggregated_include.extend(inc)
        aggregated_exclude.extend(exc)
        inc_nut, exc_nut = classify_nutrition_in_query(sentence, list(nutrition_terms_dictionary.keys()), common_misspellings)
        aggregated_include_nutrition.extend(inc_nut)
        aggregated_exclude_nutrition.extend(exc_nut)
    return corrected, list(set(aggregated_include)), list(set(aggregated_exclude)), \
        list(set(aggregated_include_nutrition)), list(set(aggregated_exclude_nutrition))
def send_to_api(meal_data, parent_id):
    try:
        api_endpoint = "http://54.242.19.19:3000/api/ResturantMenu/add"
        meal_id = random.randint(1000, 9999)
        meal_name = meal_data.get("Meal name", "No meal name available")
        ner_info = meal_data.get("NER", "")
        images_public = "https://kero.beshoy.me/recipe_images/"
        image_url = ""
        try:
            # Build the hosted image URL from the meal name (no upload is performed here).
            image_url = images_public + quote(meal_name, safe="") + ".jpg"
            print(f"Using hosted image URL for {meal_name}: {image_url}")
        except Exception as cl_err:
            print(f"Error building image URL: {cl_err}")
        if not image_url:
            image_url = "https://picsum.photos/200"
        payload = {
            "id": str(meal_id),
            "name": meal_name,
            "description": ner_info,
            "photo": image_url,
            "parentId": parent_id
        }
        print(f"\nSending payload to API: {payload}")
        response = requests.post(api_endpoint, json=payload)
        print(f"API Response for meal {meal_name}: {response.status_code}")
        try:
            return response.json()
        except Exception:
            return {"error": response.text}
    except Exception as e:
        print(f"Error sending meal to API: {e}")
        return {"error": str(e)}
app = Flask(__name__)
@app.route('/process', methods=['POST'])
def process():
    try:
        input_text = ""
        parent_id = ""
        if request.is_json:
            data = request.json
            input_text = data.get("description", "")
            parent_id = data.get("parentId", "")
            if not input_text:
                return jsonify({"error": "Missing description in request"}), 400
            if not parent_id:
                return jsonify({"error": "Missing parentId in request"}), 400
        else:
            input_text_json = request.form
            input_text = input_text_json.get("description", "")
            parent_id = input_text_json.get("parentId", "")
            if not input_text:
                return jsonify({"error": "Missing description in request"}), 400
            if not parent_id:
                return jsonify({"error": "Missing parentId in request"}), 400
            print("WARNING: Using raw data format. Please consider using JSON format.")
        raw_input_text = input_text
        processed_input, user_include, user_exclude, user_include_nutrition, user_exclude_nutrition = process_long_query(raw_input_text)
        include_list, exclude_list = [], []
        include_nutrition, exclude_nutrition = [], []
        # Merge disease-driven recommendations (if any) with the user's explicit requests.
        disease_recs = get_disease_recommendations(processed_input, disease_df)
        if disease_recs:
            print("\nDisease-related Recommendations Detected:")
            print(f"Disease: {disease_recs['Disease']}")
            print(f"Best Foods: {disease_recs['Best_Foods']}")
            print(f"Worst Foods: {disease_recs['Worst_Foods']}")
            print(f"Best Nutrition: {disease_recs['Best_Nutrition']}")
            print(f"Worst Nutrition: {disease_recs['Worst_Nutrition']}")
            include_list.extend(disease_recs["Best_Foods"])
            exclude_list.extend(disease_recs["Worst_Foods"])

            def parse_nutrition_condition(nutrition_phrase):
                parts = nutrition_phrase.strip().split()
                if len(parts) == 2:
                    direction = parts[0].lower()
                    nutrient = parts[1].lower()
                    mapped_nutrient = nutrition_terms_dictionary.get(nutrient, nutrient)
                    return (direction, mapped_nutrient)
                return None

            for bn in disease_recs["Best_Nutrition"]:
                cond = parse_nutrition_condition(bn)
                if cond:
                    include_nutrition.append(cond)
            for wn in disease_recs["Worst_Nutrition"]:
                cond = parse_nutrition_condition(wn)
                if cond:
                    exclude_nutrition.append(cond)
        include_list.extend(user_include)
        exclude_list.extend(user_exclude)
        include_nutrition.extend(user_include_nutrition)
        exclude_nutrition.extend(user_exclude_nutrition)
        include_list = list(set(include_list))
        exclude_list = list(set(exclude_list))
        include_nutrition = list(set(include_nutrition))
        exclude_nutrition = list(set(exclude_nutrition))
        print("\nFinal Lists After Combining Disease + User Query:")
        print(f"Ingredients to include: {include_list}")
        print(f"Ingredients to exclude: {exclude_list}")
        print(f"Nutrition conditions to include: {include_nutrition}")
        print(f"Nutrition conditions to exclude: {exclude_nutrition}")
        corrected_include = [correct_food_ingredient(ingredient, food_dictionary, common_misspellings) for ingredient in include_list]
        corrected_exclude = [correct_food_ingredient(ingredient, food_dictionary, common_misspellings) for ingredient in exclude_list]
        include_list = list(set(corrected_include))
        exclude_list = list(set(corrected_exclude))
        filtered_df = filter_and_rank_recipes(
            df,
            include_list,
            exclude_list,
            include_nutrition,
            exclude_nutrition
        )
        final_output = {}
        api_responses = []
        if not filtered_df.empty:
            # Shuffle the ranked matches and return up to six meals.
            filtered_df = filtered_df.sample(frac=1)
            meal_count = min(6, len(filtered_df))
            for i in range(meal_count):
                if i == 0:
                    print("\nRecommended Meal:")
                    meal_data = get_recipe_output(filtered_df.iloc[i])
                    final_output["Recommended Meal"] = meal_data
                else:
                    print(f"\nOption {i}:")
                    meal_data = get_recipe_output(filtered_df.iloc[i])
                    final_output[f"Option {i}"] = meal_data
                api_response = send_to_api(meal_data, parent_id)
                api_responses.append(api_response)
        else:
            error_message = f"No recipes found that match your criteria.\nIngredients to include: {', '.join(include_list)}\nIngredients to exclude: {', '.join(exclude_list)}\nNutrition Include: {', '.join(str(cond) for cond in include_nutrition)}\nNutrition Exclude: {', '.join(str(cond) for cond in exclude_nutrition)}."
            print(error_message)
            final_output["Message"] = error_message
            return jsonify({"error": error_message}), 404
        return jsonify({
            "original_response": final_output,
            "api_responses": api_responses,
            "message": f"Successfully processed {len(api_responses)} meals"
        })
    except Exception as e:
        print(f"Error processing request: {str(e)}")
        return jsonify({"error": f"Internal server error: {str(e)}"}), 500
if __name__ == '__main__':
    port = int(os.environ.get("PORT", 7860))
    app.run(host="0.0.0.0", port=port, debug=False)