import ast
import json
import os
import random
import re
from difflib import get_close_matches
from urllib.parse import quote

import nltk
import pandas as pd
import requests
from flask import Flask, request, jsonify
from nltk.tokenize import word_tokenize, sent_tokenize
from textblob import TextBlob
from transformers import pipeline

def force_download_nltk():
    # Ensure the NLTK and transformers cache directories exist and are writable.
    # NOTE: transformers may resolve TRANSFORMERS_CACHE at import time, so for
    # reliability this variable should also be set in the environment before the
    # process starts (e.g. in the Dockerfile), not only here.
    nltk_data_dir = os.environ.get("NLTK_DATA", "/app/nltk_data")
    transformers_cache_dir = os.environ.get("TRANSFORMERS_CACHE", "/app/transformers_cache")
    os.makedirs(nltk_data_dir, exist_ok=True)
    os.makedirs(transformers_cache_dir, exist_ok=True)
    os.environ["NLTK_DATA"] = nltk_data_dir
    os.environ["TRANSFORMERS_CACHE"] = transformers_cache_dir
    needed_packages = ["punkt"]
    for package in needed_packages:
        try:
            nltk.data.find(f"tokenizers/{package}")
        except LookupError:
            print(f"Downloading NLTK package: {package} to {nltk_data_dir}")
            nltk.download(package, download_dir=nltk_data_dir)


force_download_nltk()

domain_words = {
    "carb", "carbs", "carbo", "carbohydrate", "carbohydrates",
    "fat", "fats", "protein", "proteins", "fiber", "cholesterol",
    "calcium", "iron", "magnesium", "potassium", "sodium", "vitamin", "vitamin c",
    "calories", "calorie",
}


def smart_correct_spelling(text, domain_set):
    tokens = word_tokenize(text)
    corrected_tokens = []
    for token in tokens:
        if token.isalpha() and token.lower() not in domain_set:
            corrected_word = str(TextBlob(token).correct())
            corrected_tokens.append(corrected_word)
        else:
            corrected_tokens.append(token)
    return " ".join(corrected_tokens)

qa_pipeline = pipeline("question-answering", model="distilbert-base-cased-distilled-squad")
classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
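
# For reference, the Hugging Face pipeline return shapes relied on below:
#   qa_pipeline(question=..., context=...) -> {"score", "start", "end", "answer"}
#   classifier(text, labels, hypothesis_template=...) -> {"sequence", "labels", "scores"}
#   summarizer(text, ...) -> [{"summary_text": ...}]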

def summarize_input(text):
    summary = summarizer(text, max_length=130, min_length=30, do_sample=False)
    return summary[0]['summary_text']


df = pd.read_csv("Datasets/Final used Datasets/food_dataset_with_nutriition.csv")
print(f"Starting with {len(df)} recipes in dataset")
nutrition_columns = ["calories", "Total fats", "Carbohydrate", "Fiber", "Protein",
                     "Cholesterol", "Calcium", "Iron", "Magnesium", "Potassium", "Sodium", "Vitamin C"]
for col in nutrition_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')

disease_df = pd.read_csv("Datasets/Final used Datasets/disease_food_nutrition_mapping.csv")
disease_df["Disease"] = disease_df["Disease"].str.lower()

# Ensure the docs/ folder exists so the fallback writes below cannot fail.
os.makedirs("docs", exist_ok=True)

try:
    with open("docs/common_misspellings.json", "r") as file:
        common_misspellings = json.load(file)
except FileNotFoundError:
    common_misspellings = {"suger": "sugar", "milc": "milk"}
    with open("docs/common_misspellings.json", "w") as file:
        json.dump(common_misspellings, file, indent=2)

try:
    with open("docs/common_ingredients.json", "r") as file:
        common_ingredients = json.load(file)
except FileNotFoundError:
    common_ingredients = ["sugar", "salt", "flour", "milk", "eggs", "butter", "oil", "water"]
    with open("docs/common_ingredients.json", "w") as file:
        json.dump(common_ingredients, file, indent=2)

def create_ingredient_dictionary(dataframe, common_ingredients_list):
    all_ingredients = []
    all_ingredients.extend(common_ingredients_list)
    all_ingredients.extend(set(common_misspellings.values()))
    for ingredients_list in dataframe['ingredients']:
        parts = re.split(r',|\sand\s|\sor\s|;', str(ingredients_list))
        for part in parts:
            clean_part = re.sub(
                r'\d+[\s/]*(oz|ounce|cup|tbsp|tsp|tablespoon|teaspoon|pound|lb|g|ml|l|pinch|dash)\b\.?',
                '', part)
            clean_part = re.sub(
                r'\b(fresh|freshly|chopped|minced|diced|sliced|grated|ground|powdered|crushed|toasted|roasted)\b',
                '', clean_part)
            clean_part = re.sub(r'\(.*?\)', '', clean_part)
            clean_part = clean_part.strip()
            subparts = re.split(r'\sand\s|\sor\s', clean_part)
            for subpart in subparts:
                cleaned_subpart = subpart.strip().lower()
                if cleaned_subpart and len(cleaned_subpart) > 2:
                    all_ingredients.append(cleaned_subpart)
    unique_ingredients = list(set(all_ingredients))
    unique_ingredients.sort(key=len, reverse=True)
    return unique_ingredients


food_dictionary = create_ingredient_dictionary(df, common_ingredients)
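
# Illustrative cleaning example (hypothetical recipe string): an 'ingredients'
# value of "1 cup freshly chopped basil, 2 tsp salt and pepper" yields the
# dictionary entries "basil", "salt", and "pepper" once the unit/descriptor
# regexes strip "1 cup", "freshly chopped", and "2 tsp".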

def identify_food_ingredient(text, ingredient_dict, misspellings_dict):
    cleaned = re.sub(
        r'\d+[\s/]*(oz|ounce|cup|tbsp|tsp|tablespoon|teaspoon|pound|lb|g|ml|l|pinch|dash)\b\.?',
        '', text)
    cleaned = re.sub(
        r'\b(fresh|freshly|chopped|minced|diced|sliced|grated|ground|powdered|crushed|toasted|roasted)\b',
        '', cleaned)
    cleaned = re.sub(r'\(.*?\)', '', cleaned)
    cleaned = cleaned.strip().lower()
    if cleaned in misspellings_dict:
        return misspellings_dict[cleaned]
    if cleaned in ingredient_dict:
        return cleaned
    words = cleaned.split()
    for word in words:
        if word in ingredient_dict:
            return word
        if word in misspellings_dict:
            return misspellings_dict[word]
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.8)
    if close_matches:
        return close_matches[0]
    for dict_ingredient in ingredient_dict:
        if dict_ingredient in cleaned:
            return dict_ingredient
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.6)
    if close_matches:
        return close_matches[0]
    return None
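
# Lookup cascade, strictest to loosest: exact misspelling fix -> exact dictionary
# hit -> per-word hits -> fuzzy match (cutoff 0.8) -> substring containment ->
# fuzzy match (cutoff 0.6) -> None when nothing plausible remains.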

def correct_food_ingredient(ingredient, ingredient_dict, misspellings_dict):
    cleaned = re.sub(
        r'\d+[\s/]*(oz|ounce|cup|tbsp|tsp|tablespoon|teaspoon|pound|lb|g|ml|l|pinch|dash)\b\.?',
        '', ingredient)
    cleaned = re.sub(
        r'\b(fresh|freshly|chopped|minced|diced|sliced|grated|ground|powdered|crushed|toasted|roasted)\b',
        '', cleaned)
    cleaned = re.sub(r'\(.*?\)', '', cleaned)
    cleaned = cleaned.strip().lower()
    if cleaned in misspellings_dict:
        return misspellings_dict[cleaned]
    if cleaned in ingredient_dict:
        return cleaned
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.8)
    if close_matches:
        return close_matches[0]
    close_matches = get_close_matches(cleaned, ingredient_dict, n=3, cutoff=0.6)
    if close_matches:
        return close_matches[0]
    for dict_ingredient in ingredient_dict:
        if cleaned in dict_ingredient or dict_ingredient in cleaned:
            return dict_ingredient
    return cleaned
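
# Unlike identify_food_ingredient, this variant never gives up: when no match is
# found it returns the cleaned string itself, so callers always get a usable
# token for filtering.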

def add_misspelling(misspelled, correct):
    try:
        with open("docs/common_misspellings.json", "r") as file:
            misspellings = json.load(file)
        misspellings[misspelled.lower()] = correct.lower()
        with open("docs/common_misspellings.json", "w") as file:
            json.dump(misspellings, file, indent=2, sort_keys=True)
        return True
    except Exception:
        return False

def extract_unwanted_ingredients(input_text):
    question = "What ingredients should be excluded?"
    result = qa_pipeline(question=question, context=input_text)
    raw_answer = result['answer']
    potential_ingredients = []
    for part in raw_answer.split(','):
        for subpart in part.split(' and '):
            for item in subpart.split(' or '):
                clean_item = item.strip()
                if clean_item:
                    potential_ingredients.append(clean_item)
    valid_ingredients = []
    for item in potential_ingredients:
        corrected = identify_food_ingredient(item, food_dictionary, common_misspellings)
        if corrected:
            valid_ingredients.append(corrected)
    return valid_ingredients if valid_ingredients else [raw_answer]


def classify_clause(clause):
    candidate_labels = ["include", "exclude"]
    result = classifier(clause, candidate_labels, hypothesis_template="This clause means the ingredient should be {}.")
    return result["labels"][0].lower()

def extract_ingredients_from_clause(clause, ingredient_dict, misspellings_dict):
    found = []
    for ingredient in ingredient_dict:
        if ingredient.lower() in clause.lower():
            normalized = identify_food_ingredient(ingredient, ingredient_dict, misspellings_dict)
            if normalized:
                found.append(normalized)
    return list(set(found))


def classify_ingredients_in_query(query, ingredient_dict, misspellings_dict):
    include_ingredients = []
    exclude_ingredients = []

    # Scrub "low/high <nutrient>" phrases so nutrition talk is not mistaken for ingredients.
    nutrition_terms = ['calories', 'calorie', 'fat', 'fats', 'carb', 'carbs', 'protein',
                       'fiber', 'cholesterol', 'calcium', 'iron', 'magnesium',
                       'potassium', 'sodium', 'vitamin']
    modified_query = query
    for term in nutrition_terms:
        pattern = re.compile(r'(low|high)\s+' + term, re.IGNORECASE)
        modified_query = pattern.sub('', modified_query)
    clauses = re.split(r'\bbut\b|,', modified_query, flags=re.IGNORECASE)
    for clause in clauses:
        clause = clause.strip()
        if not clause:
            continue
        intent = classify_clause(clause)
        ingredients_found = extract_ingredients_from_clause(clause, ingredient_dict, misspellings_dict)
        if intent == "include":
            include_ingredients.extend(ingredients_found)
        elif intent == "exclude":
            exclude_ingredients.extend(ingredients_found)
    return list(set(include_ingredients)), list(set(exclude_ingredients))
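
# Illustrative flow (hypothetical classifier output): for "I want chicken and
# rice, but no garlic", the nutrition scrub leaves the text unchanged, the split
# yields ["I want chicken and rice", "no garlic"], and per-clause zero-shot
# intent produces roughly (["chicken", "rice"], ["garlic"]).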

def extract_nutrition_from_clause(clause, nutrition_dict, misspellings_dict):
    found = []
    clause_lower = clause.lower()
    sorted_terms = sorted(nutrition_dict, key=lambda x: -len(x))
    for term in sorted_terms:
        pattern = r'\b' + re.escape(term.lower()) + r'\b'
        if re.search(pattern, clause_lower):
            found.append(term.lower())
    return list(set(found))


def classify_nutrition_in_query(query, nutrition_dict, misspellings_dict):
    include_nutrition = []
    exclude_nutrition = []
    # \bbut\b (not bare "but") so that words like "butter" do not split the clause.
    clauses = re.split(r'\band\b|,|\bbut\b', query, flags=re.IGNORECASE)
    overall_intent = "exclude" if re.search(r'sensitivity|allergy|exclude', query, flags=re.IGNORECASE) else "include"
    for clause in clauses:
        clause = clause.strip()
        if not clause:
            continue
        intent = "include" if "i want" in clause.lower() else overall_intent
        numbers = re.findall(r'\d+(?:\.\d+)?', clause)
        threshold = float(numbers[0]) if numbers else None
        if re.search(r'\b(high|over|above|more than|exceeding)\b', clause, flags=re.IGNORECASE):
            modifier = "high"
        elif re.search(r'\b(low|under|less than|below)\b', clause, flags=re.IGNORECASE):
            modifier = "low"
        else:
            modifier = "high" if intent == "exclude" else "low"
        terms_found = extract_nutrition_from_clause(clause, nutrition_dict, misspellings_dict)
        for term in terms_found:
            norm_term = nutrition_terms_dictionary.get(term, term)
            condition = (modifier, norm_term, threshold) if threshold is not None else (modifier, norm_term)
            if intent == "include":
                include_nutrition.append(condition)
            elif intent == "exclude":
                exclude_nutrition.append(condition)
    return list(set(include_nutrition)), list(set(exclude_nutrition))
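
# Conditions are (modifier, column) or (modifier, column, threshold) tuples.
# Example (illustrative): "I want meals under 500 calories" produces the include
# condition ("low", "calories", 500.0), while "high sodium" in an allergy-style
# sentence lands in the exclude list as ("high", "Sodium").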

nutrition_terms_dictionary = {
    "calorie": "calories",
    "calories": "calories",
    "fat": "Total fats",
    "fats": "Total fats",
    "total fat": "Total fats",
    "total fats": "Total fats",
    "carb": "Carbohydrate",
    "carbs": "Carbohydrate",
    "carbo": "Carbohydrate",
    "carbohydrate": "Carbohydrate",
    "carbohydrates": "Carbohydrate",
    "fiber": "Fiber",
    "protein": "Protein",
    "proteins": "Protein",
    "cholesterol": "Cholesterol",
    "calcium": "Calcium",
    "iron": "Iron",
    "magnesium": "Magnesium",
    "potassium": "Potassium",
    "sodium": "Sodium",
    "vitamin c": "Vitamin C",
}

fixed_thresholds = {
    "calories": 700,
    "Total fats": 60,
    "Carbohydrate": 120,
    "Fiber": 10,
    "Protein": 30,
    "Cholesterol": 100,
    "Calcium": 300,
    "Iron": 5,
    "Magnesium": 100,
    "Potassium": 300,
    "Sodium": 400,
    "Vitamin C": 50,
}

def filter_by_nutrition_condition(df, condition):
    if isinstance(condition, tuple):
        if len(condition) == 3:
            direction, nutrition_term, threshold = condition
        elif len(condition) == 2:
            direction, nutrition_term = condition
            threshold = fixed_thresholds.get(nutrition_term)
        else:
            return df
        column = nutrition_term
        # Skip conditions that cannot be applied (unknown column or no threshold).
        if column not in df.columns or threshold is None:
            return df
        if direction == "low":
            return df[df[column] < threshold]
        elif direction == "high":
            return df[df[column] >= threshold]
    return df
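
# Semantics (per the thresholds above): ("low", "calories", 500) keeps rows with
# calories < 500; ("high", "Protein") falls back to fixed_thresholds["Protein"]
# and keeps rows with Protein >= 30.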

def score_recipe_ingredients(recipe_ingredients, include_list):
    recipe_lower = recipe_ingredients.lower()
    match_count = sum(
        1 for ingredient in include_list
        if ingredient.lower() in recipe_lower
    )
    return match_count


def filter_and_rank_recipes(df, include_list, exclude_list, include_nutrition, exclude_nutrition):
    filtered_df = df.copy()
    print(f"Starting with {len(filtered_df)} recipes for filtering")
    if include_list:
        filtered_df['ingredient_match_count'] = filtered_df['ingredients'].apply(
            lambda x: score_recipe_ingredients(str(x), include_list)
        )
        # Require at least two included ingredients to match, but never more than
        # the user actually asked for (a single-item include list stays satisfiable).
        required_matches = min(2, len(include_list))
        filtered_df = filtered_df[filtered_df['ingredient_match_count'] >= required_matches]
        print(f"After requiring at least {required_matches} included ingredient(s): {len(filtered_df)} recipes remain")
    for ingredient in exclude_list:
        before_count = len(filtered_df)
        filtered_df = filtered_df[
            ~filtered_df['ingredients']
            .str.lower()
            .fillna('')
            .str.contains(re.escape(ingredient.lower()))
        ]
        print(f"After excluding '{ingredient}': {len(filtered_df)} recipes remain (removed {before_count - len(filtered_df)})")
    for i, cond in enumerate(include_nutrition):
        before_count = len(filtered_df)
        filtered_df = filter_by_nutrition_condition(filtered_df, cond)
        after_count = len(filtered_df)
        print(f"After applying nutrition condition {i+1} (include) '{cond}': {after_count} recipes remain (removed {before_count - after_count})")
    for i, cond in enumerate(exclude_nutrition):
        before_count = len(filtered_df)
        # Drop every recipe that satisfies the excluded condition.
        temp_df = filter_by_nutrition_condition(df.copy(), cond)
        filtered_df = filtered_df[~filtered_df.index.isin(temp_df.index)]
        after_count = len(filtered_df)
        print(f"After applying nutrition condition {i+1} (exclude) '{cond}': {after_count} recipes remain (removed {before_count - after_count})")
    if filtered_df.empty:
        print("\nNo recipes match all criteria. Implementing fallback approach...")
        fallback_df = df.copy()
        if include_list:
            fallback_df['ingredient_match_count'] = fallback_df['ingredients'].apply(
                lambda x: score_recipe_ingredients(str(x), include_list)
            )
            fallback_df = fallback_df[fallback_df['ingredient_match_count'] >= 1]
        else:
            fallback_df['ingredient_match_count'] = 1
        for ingredient in exclude_list:
            fallback_df = fallback_df[
                ~fallback_df['ingredients']
                .str.lower()
                .fillna('')
                .str.contains(re.escape(ingredient.lower()))
            ]
        if fallback_df.empty:
            fallback_df = df.sample(min(5, len(df)))
            fallback_df['ingredient_match_count'] = 0
            print("No matches found. Showing random recipes as a fallback")
        filtered_df = fallback_df
    if 'ingredient_match_count' not in filtered_df.columns:
        filtered_df['ingredient_match_count'] = 0
    filtered_df = filtered_df.sort_values('ingredient_match_count', ascending=False)
    return filtered_df

def get_disease_recommendations(user_text, disease_mapping_df):
    user_text_lower = user_text.lower()
    # Guard against non-string (NaN) disease names in the mapping file.
    matches = disease_mapping_df[disease_mapping_df['Disease'].apply(
        lambda d: isinstance(d, str) and d in user_text_lower)]
    if not matches.empty:
        disease_info = matches.iloc[0]

        def safe_parse_list(x):
            # Columns may hold Python-style list literals or plain comma-separated text.
            if isinstance(x, str):
                try:
                    return ast.literal_eval(x)
                except (ValueError, SyntaxError):
                    return [item.strip() for item in x.split(',') if item.strip()]
            return x

        best_foods = safe_parse_list(disease_info.get("Best_Foods", "[]"))
        worst_foods = safe_parse_list(disease_info.get("Worst_Foods", "[]"))
        best_nutrition = safe_parse_list(disease_info.get("Best_Nutrition", "[]"))
        worst_nutrition = safe_parse_list(disease_info.get("Worst_Nutrition", "[]"))
        recommendations = {
            "Disease": disease_info['Disease'],
            "Best_Foods": best_foods,
            "Worst_Foods": worst_foods,
            "Best_Nutrition": best_nutrition,
            "Worst_Nutrition": worst_nutrition,
        }
        return recommendations
    return None
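
# Illustrative result (actual values depend on the mapping CSV): a query
# mentioning "diabetes" might return {"Disease": "diabetes", "Best_Foods": [...],
# "Worst_Foods": [...], "Best_Nutrition": ["low carbohydrates"],
# "Worst_Nutrition": ["high carbohydrates"]}.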

def get_recipe_output(recipe_row):
    recipe_name = recipe_row['title']
    ner_info = recipe_row.get('NER', '')
    try:
        ner_list = json.loads(ner_info)
        ner_str = ", ".join(ner_list)
    except Exception:
        ner_str = ner_info
    # NaN is not valid JSON, so missing nutrition values are serialized as None.
    nutrition_details = {
        col: (float(recipe_row[col]) if pd.notna(recipe_row[col]) else None)
        for col in nutrition_columns
    }
    result = {
        "Meal name": recipe_name,
        "NER": ner_str,
        "Nutrition details": nutrition_details
    }
    print(f"Meal name: {recipe_name}")
    print(f"NER: {ner_str}")
    print(f"Nutrition details: {nutrition_details}")
    return result

def process_long_query(query):
    if len(query.split()) > 500:
        print("Long input detected. Summarizing...")
        query = summarize_input(query)
    print(f"Processed Query: \"{query}\"")
    corrected = smart_correct_spelling(query, domain_words)
    sentences = sent_tokenize(corrected)
    aggregated_include = []
    aggregated_exclude = []
    aggregated_include_nutrition = []
    aggregated_exclude_nutrition = []
    for sentence in sentences:
        inc, exc = classify_ingredients_in_query(sentence, food_dictionary, common_misspellings)
        aggregated_include.extend(inc)
        aggregated_exclude.extend(exc)
        inc_nut, exc_nut = classify_nutrition_in_query(sentence, list(nutrition_terms_dictionary.keys()), common_misspellings)
        aggregated_include_nutrition.extend(inc_nut)
        aggregated_exclude_nutrition.extend(exc_nut)
    return corrected, list(set(aggregated_include)), list(set(aggregated_exclude)), \
        list(set(aggregated_include_nutrition)), list(set(aggregated_exclude_nutrition))
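
# Returns a 5-tuple: (corrected_query, include_ingredients, exclude_ingredients,
# include_nutrition_conditions, exclude_nutrition_conditions), with duplicates
# removed across sentences.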

def send_to_api(meal_data, parent_id):
    try:
        api_endpoint = "http://54.242.19.19:3000/api/ResturantMenu/add"
        meal_id = random.randint(1000, 9999)
        meal_name = meal_data.get("Meal name", "No meal name available")
        ner_info = meal_data.get("NER", "")
        images_public = "https://kero.beshoy.me/recipe_images/"
        # Build the expected image URL from the meal name; fall back to a
        # placeholder if construction fails.
        image_url = ""
        try:
            image_url = images_public + quote(meal_name, safe="") + ".jpg"
            print(f"Constructed image URL for {meal_name}: {image_url}")
        except Exception as url_err:
            print(f"Error constructing image URL: {url_err}")
        if not image_url:
            image_url = "https://picsum.photos/200"
        payload = {
            "id": str(meal_id),
            "name": meal_name,
            "description": ner_info,
            "photo": image_url,
            "parentId": parent_id
        }
        print(f"\nSending payload to API: {payload}")
        response = requests.post(api_endpoint, json=payload, timeout=15)
        print(f"API Response for meal {meal_name}: {response.status_code}")

        try:
            return response.json()
        except Exception:
            return {"error": response.text}
    except Exception as e:
        print(f"Error sending meal to API: {e}")
        return {"error": str(e)}

app = Flask(__name__)


@app.route('/process', methods=['POST'])
def process():
    try:
        # Accept JSON bodies preferentially; fall back to form data.
        if request.is_json:
            data = request.get_json()
        else:
            data = request.form
            print("WARNING: Using raw data format. Please consider using JSON format.")
        input_text = data.get("description", "")
        parent_id = data.get("parentId", "")

        if not input_text:
            return jsonify({"error": "Missing description in request"}), 400
        if not parent_id:
            return jsonify({"error": "Missing parentId in request"}), 400

        raw_input_text = input_text
        processed_input, user_include, user_exclude, user_include_nutrition, user_exclude_nutrition = process_long_query(raw_input_text)

        include_list, exclude_list = [], []
        include_nutrition, exclude_nutrition = [], []

        disease_recs = get_disease_recommendations(processed_input, disease_df)

        if disease_recs:
            print("\nDisease-related Recommendations Detected:")
            print(f"Disease: {disease_recs['Disease']}")
            print(f"Best Foods: {disease_recs['Best_Foods']}")
            print(f"Worst Foods: {disease_recs['Worst_Foods']}")
            print(f"Best Nutrition: {disease_recs['Best_Nutrition']}")
            print(f"Worst Nutrition: {disease_recs['Worst_Nutrition']}")

            include_list.extend(disease_recs["Best_Foods"])
            exclude_list.extend(disease_recs["Worst_Foods"])

            def parse_nutrition_condition(nutrition_phrase):
                # Expects phrases of the form "<low|high> <nutrient>", e.g. "low sodium".
                parts = nutrition_phrase.strip().split()
                if len(parts) == 2:
                    direction = parts[0].lower()
                    nutrient = parts[1].lower()
                    mapped_nutrient = nutrition_terms_dictionary.get(nutrient, nutrient)
                    return (direction, mapped_nutrient)
                return None

            for bn in disease_recs["Best_Nutrition"]:
                cond = parse_nutrition_condition(bn)
                if cond:
                    include_nutrition.append(cond)
            for wn in disease_recs["Worst_Nutrition"]:
                cond = parse_nutrition_condition(wn)
                if cond:
                    exclude_nutrition.append(cond)

        include_list.extend(user_include)
        exclude_list.extend(user_exclude)
        include_nutrition.extend(user_include_nutrition)
        exclude_nutrition.extend(user_exclude_nutrition)

        include_list = list(set(include_list))
        exclude_list = list(set(exclude_list))
        include_nutrition = list(set(include_nutrition))
        exclude_nutrition = list(set(exclude_nutrition))

        print("\nFinal Lists After Combining Disease + User Query:")
        print(f"Ingredients to include: {include_list}")
        print(f"Ingredients to exclude: {exclude_list}")
        print(f"Nutrition conditions to include: {include_nutrition}")
        print(f"Nutrition conditions to exclude: {exclude_nutrition}")

        corrected_include = [correct_food_ingredient(ingredient, food_dictionary, common_misspellings) for ingredient in include_list]
        corrected_exclude = [correct_food_ingredient(ingredient, food_dictionary, common_misspellings) for ingredient in exclude_list]

        include_list = list(set(corrected_include))
        exclude_list = list(set(corrected_exclude))
        filtered_df = filter_and_rank_recipes(
            df,
            include_list,
            exclude_list,
            include_nutrition,
            exclude_nutrition
        )

        final_output = {}
        api_responses = []

        if not filtered_df.empty:
            # Shuffle for variety; note this deliberately discards the
            # ingredient_match_count ranking applied in filter_and_rank_recipes.
            filtered_df = filtered_df.sample(frac=1)
            meal_count = min(6, len(filtered_df))

            for i in range(meal_count):
                if i == 0:
                    print("\nRecommended Meal:")
                    meal_data = get_recipe_output(filtered_df.iloc[i])
                    final_output["Recommended Meal"] = meal_data
                else:
                    print(f"\nOption {i}:")
                    meal_data = get_recipe_output(filtered_df.iloc[i])
                    final_output[f"Option {i}"] = meal_data

                api_response = send_to_api(meal_data, parent_id)
                api_responses.append(api_response)
        else:
            error_message = (
                "No recipes found that match your criteria.\n"
                f"Ingredients to include: {', '.join(include_list)}\n"
                f"Ingredients to exclude: {', '.join(exclude_list)}\n"
                f"Nutrition Include: {', '.join(str(cond) for cond in include_nutrition)}\n"
                f"Nutrition Exclude: {', '.join(str(cond) for cond in exclude_nutrition)}."
            )
            print(error_message)
            final_output["Message"] = error_message
            return jsonify({"error": error_message}), 404

        return jsonify({
            "original_response": final_output,
            "api_responses": api_responses,
            "message": f"Successfully processed {len(api_responses)} meals"
        })

    except Exception as e:
        print(f"Error processing request: {str(e)}")
        return jsonify({"error": f"Internal server error: {str(e)}"}), 500

if __name__ == '__main__':
    port = int(os.environ.get("PORT", 7860))
    app.run(host="0.0.0.0", port=port, debug=False)