Prathamesh Sable commited on
Commit
da8dc09
·
1 Parent(s): 5013d7d

added agent and other services update

Browse files
services/ai_agent.py DELETED
@@ -1,122 +0,0 @@
1
- from sqlalchemy.orm import Session
2
- from fastapi import HTTPException
3
- from utils.fetch_data import fetch_product_data_from_api
4
- from utils.file_operations import save_json_file
5
- from models.ingredient import Ingredient
6
- from models.product import Product
7
- from services.ingredients import get_ingredient_by_name, save_ingredient_data, fetch_ingredient_data_from_api
8
- from typing import Dict, Any
9
- import json
10
- from transformers import pipeline
11
-
12
- def preprocess_data(barcode: str) -> Dict[str, Any]:
13
- data = fetch_product_data_from_api(barcode)
14
- product = data.get('product', {})
15
-
16
- product_info = {
17
- "product_name": product.get('product_name_en', product.get('product_name', 'N/A')),
18
- "generic_name": product.get('generic_name_en', product.get('generic_name', 'N/A')),
19
- "brands": product.get('brands', 'N/A'),
20
- "ingredients": [],
21
- "ingredients_text": product.get('ingredients_text_en', product.get('ingredients_text', 'N/A')),
22
- "ingredients_analysis": product.get('ingredients_analysis', {}),
23
- "nutriscore": product.get('nutriscore', {}),
24
- "nutrient_levels": product.get('nutrient_levels', {}),
25
- "nutriments": product.get('nutriments', {}),
26
- "data_quality_warnings": product.get('data_quality_warnings_tags', [])
27
- }
28
-
29
- ingredients_list = product.get('ingredients', [])
30
- for ingredient in ingredients_list:
31
- ingredient_info = {
32
- "text": ingredient.get('text', 'N/A'),
33
- "percent": ingredient.get('percent', ingredient.get('percent_estimate', 'N/A')),
34
- "vegan": ingredient.get('vegan', 'N/A'),
35
- "vegetarian": ingredient.get('vegetarian', 'N/A'),
36
- "sub_ingredients": []
37
- }
38
- sub_ingredients = ingredient.get('ingredients', [])
39
- for sub_ingredient in sub_ingredients:
40
- sub_ingredient_info = {
41
- "text": sub_ingredient.get('text', 'N/A'),
42
- "percent": sub_ingredient.get('percent', sub_ingredient.get('percent_estimate', 'N/A')),
43
- "vegan": sub_ingredient.get('vegan', 'N/A'),
44
- "vegetarian": sub_ingredient.get('vegetarian', 'N/A')
45
- }
46
- ingredient_info["sub_ingredients"].append(sub_ingredient_info)
47
- product_info["ingredients"].append(ingredient_info)
48
-
49
- return product_info
50
-
51
- def validate_data(data: Dict[str, Any]) -> bool:
52
- required_fields = ["product_name", "generic_name", "brands", "ingredients", "nutriscore", "nutrient_levels", "nutriments"]
53
- for field in required_fields:
54
- if field not in data or not data[field]:
55
- return False
56
- return True
57
-
58
- def clean_data(data: Dict[str, Any]) -> Dict[str, Any]:
59
- for ingredient in data["ingredients"]:
60
- if "percent" in ingredient and ingredient["percent"] == "N/A":
61
- ingredient["percent"] = 0
62
- for sub_ingredient in ingredient["sub_ingredients"]:
63
- if "percent" in sub_ingredient and sub_ingredient["percent"] == "N/A":
64
- sub_ingredient["percent"] = 0
65
- return data
66
-
67
- def standardize_data(data: Dict[str, Any]) -> Dict[str, Any]:
68
- for ingredient in data["ingredients"]:
69
- ingredient["text"] = ingredient["text"].lower()
70
- for sub_ingredient in ingredient["sub_ingredients"]:
71
- sub_ingredient["text"] = sub_ingredient["text"].lower()
72
- return data
73
-
74
- def enrich_data(db: Session, data: Dict[str, Any]) -> Dict[str, Any]:
75
- for ingredient in data["ingredients"]:
76
- ingredient_data = get_ingredient_by_name(db, ingredient["text"])
77
- if not ingredient_data:
78
- ingredient_data = fetch_ingredient_data_from_api(ingredient["text"])
79
- save_ingredient_data(db, ingredient["text"], ingredient_data)
80
- ingredient["nutritional_info"] = ingredient_data
81
- return data
82
-
83
- def process_data(db: Session, barcode: str) -> Dict[str, Any]:
84
- data = preprocess_data(barcode)
85
- if not validate_data(data):
86
- raise HTTPException(status_code=400, detail="Invalid data")
87
- data = clean_data(data)
88
- data = standardize_data(data)
89
- data = enrich_data(db, data)
90
-
91
- # Save product details in the Product model
92
- product = Product(
93
- product_name=data["product_name"],
94
- generic_name=data["generic_name"],
95
- brands=data["brands"],
96
- ingredients=data["ingredients"],
97
- ingredients_text=data["ingredients_text"],
98
- ingredients_analysis=data["ingredients_analysis"],
99
- nutriscore=data["nutriscore"],
100
- nutrient_levels=data["nutrient_levels"],
101
- nutriments=data["nutriments"],
102
- data_quality_warnings=data["data_quality_warnings"]
103
- )
104
- db.add(product)
105
- db.commit()
106
- db.refresh(product)
107
-
108
- # Save ingredient details in the Ingredient model
109
- for ingredient in data["ingredients"]:
110
- ingredient_data = get_ingredient_by_name(db, ingredient["text"])
111
- if not ingredient_data:
112
- ingredient_data = fetch_ingredient_data_from_api(ingredient["text"])
113
- save_ingredient_data(db, ingredient["text"], ingredient_data)
114
- ingredient["nutritional_info"] = ingredient_data
115
-
116
- save_json_file(barcode, data)
117
- return data
118
-
119
- def integrate_hugging_face_transformers(model_name: str, text: str) -> str:
120
- nlp = pipeline("fill-mask", model=model_name)
121
- result = nlp(text)
122
- return result[0]['sequence']
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
services/analysis_agent.py DELETED
@@ -1,49 +0,0 @@
1
- from sqlalchemy.orm import Session
2
- from fastapi import HTTPException
3
- from typing import Dict, Any, List
4
- from models.user_preferences import UserPreferences
5
- from services.ingredients import get_ingredient_data, filter_ingredients_by_preferences
6
- from models.ingredient import Ingredient
7
-
8
- def analyze_ingredients(db: Session, ingredients: List[Dict[str, Any]], user_id: int) -> Dict[str, Any]:
9
- preferences = db.query(UserPreferences).filter(UserPreferences.user_id == user_id).first()
10
- if not preferences:
11
- raise HTTPException(status_code=404, detail="User preferences not found")
12
-
13
- filtered_ingredients = filter_ingredients_by_preferences(ingredients, preferences.__dict__)
14
- analysis_results = {
15
- "safe_ingredients": [],
16
- "unsafe_ingredients": [],
17
- "additional_facts": []
18
- }
19
-
20
- for ingredient in filtered_ingredients:
21
- ingredient_data = get_ingredient_data(db, ingredient["text"])
22
- if ingredient_data:
23
- analysis_results["safe_ingredients"].append({
24
- "name": ingredient["text"],
25
- "nutritional_info": ingredient_data
26
- })
27
- else:
28
- analysis_results["unsafe_ingredients"].append({
29
- "name": ingredient["text"],
30
- "reason": "Information not found"
31
- })
32
-
33
- return analysis_results
34
-
35
- def provide_personalized_recommendations(db: Session, user_id: int) -> Dict[str, Any]:
36
- preferences = db.query(UserPreferences).filter(UserPreferences.user_id == user_id).first()
37
- if not preferences:
38
- raise HTTPException(status_code=404, detail="User preferences not found")
39
-
40
- recommended_ingredients = []
41
- all_ingredients = db.query(Ingredient).all()
42
- for ingredient in all_ingredients:
43
- if ingredient.name not in preferences.disliked_ingredients:
44
- recommended_ingredients.append({
45
- "name": ingredient.name,
46
- "nutritional_info": ingredient.nutritional_info
47
- })
48
-
49
- return {"recommended_ingredients": recommended_ingredients}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
services/auth_service.py CHANGED
@@ -4,9 +4,10 @@ from datetime import datetime, timedelta
4
  from fastapi import Depends, HTTPException, status
5
  from fastapi.security import OAuth2PasswordBearer
6
  from sqlalchemy.orm import Session,Mapped
7
- from database import get_db
8
- from models.user import User
9
- from pydantic import BaseModel
 
10
 
11
  # to get a string like this run:
12
  # openssl rand -hex 32
@@ -17,42 +18,62 @@ ACCESS_TOKEN_EXPIRE_MINUTES = 30
17
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
18
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
19
 
20
- class Token(BaseModel):
21
- access_token: str
22
- token_type: str
23
-
24
- class TokenData(BaseModel):
25
- username: str | None = None
26
 
27
 
28
  def verify_password(plain_password, hashed_password):
29
- return pwd_context.verify(plain_password, hashed_password)
 
 
 
 
 
30
 
31
  def get_password_hash(password):
32
- return pwd_context.hash(password)
 
 
 
 
 
33
 
34
  def get_user(db, username: str):
35
- return db.query(User).filter(User.username == username).first()
 
 
 
 
 
36
 
37
  def authenticate_user(db, username: str, password: str):
38
- user = get_user(db, username)
39
- if not user:
40
- return False
41
- if not verify_password(password, user.hashed_password):
42
- return False
43
- return user
 
 
 
 
 
44
 
45
  def create_access_token(data: dict, expires_delta: timedelta | None = None):
46
- to_encode = data.copy()
47
- if expires_delta:
48
- expire = datetime.utcnow() + expires_delta
49
- else:
50
- expire = datetime.utcnow() + timedelta(minutes=15)
51
- to_encode.update({"exp": expire})
52
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
53
- return encoded_jwt
 
 
 
 
 
54
 
55
  async def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
 
56
  credentials_exception = HTTPException(
57
  status_code=status.HTTP_401_UNAUTHORIZED,
58
  detail="Could not validate credentials",
@@ -64,22 +85,36 @@ async def get_current_user(db: Session = Depends(get_db), token: str = Depends(o
64
  if username is None:
65
  raise credentials_exception
66
  token_data = TokenData(username=username)
67
- except JWTError:
 
68
  raise credentials_exception
 
 
 
69
  user = get_user(db, username=token_data.username)
70
  if user is None:
71
  raise credentials_exception
72
  return user
73
 
74
  async def get_current_active_user(current_user: User = Depends(get_current_user)):
75
- if not current_user.is_active:
76
- raise HTTPException(status_code=400, detail="Inactive user")
77
- return current_user
 
 
 
 
 
78
 
79
  def create_user(db: Session, username: str, email: str, password: str):
80
- hashed_password = get_password_hash(password)
81
- db_user = User(username=username, email=email, hashed_password=hashed_password)
82
- db.add(db_user)
83
- db.commit()
84
- db.refresh(db_user)
85
- return db_user
 
 
 
 
 
 
4
  from fastapi import Depends, HTTPException, status
5
  from fastapi.security import OAuth2PasswordBearer
6
  from sqlalchemy.orm import Session,Mapped
7
+ from db.database import get_db
8
+ from db.models import User
9
+ from interfaces.authModels import UserResponse,TokenData
10
+ from logger_manager import log_info, log_error
11
 
12
  # to get a string like this run:
13
  # openssl rand -hex 32
 
18
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
19
  oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
20
 
 
 
 
 
 
 
21
 
22
 
23
  def verify_password(plain_password, hashed_password):
24
+ log_info("Verifying password")
25
+ try:
26
+ return pwd_context.verify(plain_password, hashed_password)
27
+ except Exception as e:
28
+ log_error(f"Error verifying password: {str(e)}")
29
+ raise HTTPException(status_code=500, detail="Internal Server Error")
30
 
31
  def get_password_hash(password):
32
+ log_info("Hashing password")
33
+ try:
34
+ return pwd_context.hash(password)
35
+ except Exception as e:
36
+ log_error(f"Error hashing password: {str(e)}")
37
+ raise HTTPException(status_code=500, detail="Internal Server Error")
38
 
39
  def get_user(db, username: str):
40
+ log_info(f"Getting user: {username}")
41
+ try:
42
+ return db.query(User).filter(User.username == username).first()
43
+ except Exception as e:
44
+ log_error(f"Error getting user: {str(e)}")
45
+ raise HTTPException(status_code=500, detail="Internal Server Error")
46
 
47
  def authenticate_user(db, username: str, password: str):
48
+ log_info(f"Authenticating user: {username}")
49
+ try:
50
+ user = get_user(db, username)
51
+ if not user:
52
+ return False
53
+ if not verify_password(password, user.hashed_password):
54
+ return False
55
+ return user
56
+ except Exception as e:
57
+ log_error(f"Error authenticating user: {str(e)}")
58
+ raise HTTPException(status_code=500, detail="Internal Server Error")
59
 
60
  def create_access_token(data: dict, expires_delta: timedelta | None = None):
61
+ log_info("Creating access token")
62
+ try:
63
+ to_encode = data.copy()
64
+ if expires_delta:
65
+ expire = datetime.utcnow() + expires_delta
66
+ else:
67
+ expire = datetime.utcnow() + timedelta(minutes=15)
68
+ to_encode.update({"exp": expire})
69
+ encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
70
+ return encoded_jwt
71
+ except Exception as e:
72
+ log_error(f"Error creating access token: {str(e)}")
73
+ raise HTTPException(status_code=500, detail="Internal Server Error")
74
 
75
  async def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)):
76
+ log_info("Getting current user")
77
  credentials_exception = HTTPException(
78
  status_code=status.HTTP_401_UNAUTHORIZED,
79
  detail="Could not validate credentials",
 
85
  if username is None:
86
  raise credentials_exception
87
  token_data = TokenData(username=username)
88
+ except JWTError as e:
89
+ log_error(f"JWT error: {str(e)}")
90
  raise credentials_exception
91
+ except Exception as e:
92
+ log_error(f"Error decoding token: {str(e)}")
93
+ raise HTTPException(status_code=500, detail="Internal Server Error")
94
  user = get_user(db, username=token_data.username)
95
  if user is None:
96
  raise credentials_exception
97
  return user
98
 
99
  async def get_current_active_user(current_user: User = Depends(get_current_user)):
100
+ log_info("Getting current active user")
101
+ try:
102
+ if not current_user.is_active:
103
+ raise HTTPException(status_code=400, detail="Inactive user")
104
+ return UserResponse.from_orm(current_user)
105
+ except Exception as e:
106
+ log_error(f"Error getting current active user: {str(e)}")
107
+ raise HTTPException(status_code=500, detail="Internal Server Error")
108
 
109
  def create_user(db: Session, username: str, email: str, password: str):
110
+ log_info(f"Creating user: {username}")
111
+ try:
112
+ hashed_password = get_password_hash(password)
113
+ db_user = User(username=username, email=email, hashed_password=hashed_password)
114
+ db.add(db_user)
115
+ db.commit()
116
+ db.refresh(db_user)
117
+ return db_user
118
+ except Exception as e:
119
+ log_error(f"Error creating user: {str(e)}")
120
+ raise HTTPException(status_code=500, detail="Internal Server Error")
services/ingredientFinderAgent.py ADDED
@@ -0,0 +1,628 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import traceback
4
+ import requests
5
+ import pandas as pd
6
+ from dotenv import load_dotenv
7
+
8
+ from typing import Dict, Any
9
+ from langchain_google_genai import ChatGoogleGenerativeAI
10
+ from langchain_community.tools import DuckDuckGoSearchRun
11
+ from langchain_community.tools import WikipediaQueryRun
12
+ from langchain_community.utilities.wikipedia import WikipediaAPIWrapper
13
+ from langchain_core.tools import tool
14
+
15
+ # modular
16
+ from logger_manager import logger
17
+ from interfaces.ingredientModels import IngredientAnalysisResult,IngredientState
18
+
19
+ # Load environment variables from .env file
20
+ load_dotenv()
21
+
22
+ # Load Scraped Database
23
+ SCRAPED_DB_PATH = "data/Food_Aditives_E_numbers.csv" # Ensure this file exists
24
+ if os.path.exists(SCRAPED_DB_PATH):
25
+ additives_df = pd.read_csv(SCRAPED_DB_PATH)
26
+ logger.info(f"Loaded database with {len(additives_df)} entries")
27
+ else:
28
+ additives_df = None
29
+ logger.warning("Scraped database not found!")
30
+
31
+
32
+
33
+ # Define tool functions
34
+ @tool("search_local_db")
35
+ def search_local_db(ingredient: str) -> Dict[str, Any]:
36
+ """Search local database for ingredient information. E number database scrapped"""
37
+ logger.info(f"Searching local DB for: {ingredient}")
38
+ if additives_df is not None:
39
+ match = additives_df[additives_df['Name of Aditive'].str.contains(ingredient, case=False, na=False)]
40
+ if not match.empty:
41
+ return {"source": "Local DB", "found": True, "data": match.iloc[0].to_dict()}
42
+ return {"source": "Local DB", "found": False, "data": None}
43
+
44
+ @tool("search_open_food_facts")
45
+ def search_open_food_facts(ingredient: str) -> Dict[str, Any]:
46
+ """Search Open Food Facts database for ingredient information."""
47
+ logger.info(f"Searching Open Food Facts for: {ingredient}")
48
+
49
+ try:
50
+ open_food_facts_api = "https://world.openfoodfacts.org/api/v0"
51
+ # Search for the ingredient
52
+ search_url = f"{open_food_facts_api}/ingredient/{ingredient.lower().replace(' ', '-')}.json"
53
+ response = requests.get(search_url, timeout=10)
54
+
55
+ if response.status_code == 200:
56
+ data = response.json()
57
+ if data.get("status") == 1: # Successfully found
58
+ return {
59
+ "source": "Open Food Facts",
60
+ "found": True,
61
+ "data": data
62
+ }
63
+
64
+ # Try searching products containing this ingredient
65
+ product_search_url = f"{open_food_facts_api}/search.json?ingredients_tags={ingredient.lower().replace(' ', '_')}&page_size=5"
66
+ response = requests.get(product_search_url, timeout=10)
67
+
68
+ if response.status_code == 200:
69
+ data = response.json()
70
+ if data.get("count") > 0:
71
+ return {
72
+ "source": "Open Food Facts Products",
73
+ "found": True,
74
+ "data": data
75
+ }
76
+
77
+ return {"source": "Open Food Facts", "found": False, "data": None}
78
+
79
+ except Exception as e:
80
+ logger.error(f"Error searching Open Food Facts: {e}")
81
+ return {"source": "Open Food Facts", "found": False, "error": str(e)}
82
+
83
+ @tool("search_usda")
84
+ def search_usda(ingredient: str) -> Dict[str, Any]:
85
+ """Search USDA FoodData Central for ingredient information."""
86
+ logger.info(f"Searching USDA for: {ingredient}")
87
+
88
+ try:
89
+ usda_api = "https://api.nal.usda.gov/fdc/v1"
90
+ usda_api_key = os.getenv("USDA_API_KEY", "DEMO_KEY") # Use DEMO_KEY if not provided
91
+
92
+ # Search for the ingredient
93
+ search_url = f"{usda_api}/foods/search"
94
+ params = {
95
+ "api_key": usda_api_key,
96
+ "query": ingredient,
97
+ "dataType": ["Foundation", "SR Legacy", "Branded"],
98
+ "pageSize": 5
99
+ }
100
+
101
+ response = requests.get(search_url, params=params, timeout=10)
102
+
103
+ if response.status_code == 200:
104
+ data = response.json()
105
+ if data.get("totalHits", 0) > 0:
106
+ return {
107
+ "source": "USDA FoodData Central",
108
+ "found": True,
109
+ "data": data
110
+ }
111
+
112
+ return {"source": "USDA FoodData Central", "found": False, "data": None}
113
+
114
+ except Exception as e:
115
+ logger.error(f"Error searching USDA: {e}")
116
+ return {"source": "USDA FoodData Central", "found": False, "error": str(e)}
117
+
118
+ @tool("search_pubchem")
119
+ def search_pubchem(ingredient: str) -> Dict[str, Any]:
120
+ """Search PubChem for chemical information about the ingredient."""
121
+ logger.info(f"Searching PubChem for: {ingredient}")
122
+
123
+ try:
124
+ pubchem_api = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"
125
+
126
+ # First try to get compound information by name
127
+ search_url = f"{pubchem_api}/compound/name/{ingredient}/JSON"
128
+ response = requests.get(search_url, timeout=10)
129
+
130
+ if response.status_code == 200:
131
+ data = response.json()
132
+ if "PC_Compounds" in data:
133
+ compound_id = data["PC_Compounds"][0]["id"]["id"]["cid"]
134
+
135
+ # Get more detailed information using the CID
136
+ property_url = f"{pubchem_api}/compound/cid/{compound_id}/property/MolecularFormula,MolecularWeight,IUPACName,InChI,InChIKey,CanonicalSMILES/JSON"
137
+ prop_response = requests.get(property_url, timeout=10)
138
+ properties_data = None
139
+ if prop_response.status_code == 200:
140
+ properties_data = prop_response.json()
141
+
142
+ # Get classifications and categories
143
+ classification_url = f"{pubchem_api}/compound/cid/{compound_id}/classification/JSON"
144
+ class_response = requests.get(classification_url, timeout=10)
145
+ classification_data = None
146
+ if class_response.status_code == 200:
147
+ classification_data = class_response.json()
148
+
149
+ return {
150
+ "source": "PubChem",
151
+ "found": True,
152
+ "data": {
153
+ "compound_info": data,
154
+ "properties": properties_data,
155
+ "classification": classification_data
156
+ }
157
+ }
158
+
159
+ return {"source": "PubChem", "found": False, "data": None}
160
+
161
+ except Exception as e:
162
+ logger.error(f"Error searching PubChem: {e}")
163
+ return {"source": "PubChem", "found": False, "error": str(e)}
164
+
165
+ @tool("search_wikipedia")
166
+ def search_wikipedia(ingredient: str) -> Dict[str, Any]:
167
+ """Search Wikipedia for ingredient information."""
168
+ logger.info(f"Searching Wikipedia for: {ingredient}")
169
+
170
+ try:
171
+ wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
172
+ wiki_result = wikipedia.run(ingredient)
173
+
174
+ if wiki_result and len(wiki_result) > 100: # Only count substantial results
175
+ return {
176
+ "source": "Wikipedia",
177
+ "found": True,
178
+ "data": wiki_result
179
+ }
180
+ else:
181
+ # Try with more specific searches
182
+ food_wiki = wikipedia.run(f"{ingredient} food additive")
183
+ if food_wiki and len(food_wiki) > 100:
184
+ return {
185
+ "source": "Wikipedia",
186
+ "found": True,
187
+ "data": food_wiki
188
+ }
189
+
190
+ chemical_wiki = wikipedia.run(f"{ingredient} chemical compound")
191
+ if chemical_wiki and len(chemical_wiki) > 100:
192
+ return {
193
+ "source": "Wikipedia",
194
+ "found": True,
195
+ "data": chemical_wiki
196
+ }
197
+
198
+ return {"source": "Wikipedia", "found": False, "data": None}
199
+
200
+ except Exception as e:
201
+ logger.error(f"Error searching Wikipedia: {e}")
202
+ return {"source": "Wikipedia", "found": False, "error": str(e)}
203
+
204
+ @tool("search_web")
205
+ def search_web(ingredient: str) -> Dict[str, Any]:
206
+ """Search web for ingredient information using DuckDuckGo."""
207
+ logger.info(f"Searching web for: {ingredient}")
208
+
209
+ try:
210
+ duckduckgo = DuckDuckGoSearchRun()
211
+ search_queries = [f"{ingredient} food ingredient safety", f"{ingredient} E-number food additive"]
212
+ all_results = []
213
+ for query in search_queries:
214
+ result = duckduckgo.run(query)
215
+ if result:
216
+ all_results.append({"query": query, "result": result})
217
+ return {"source": "DuckDuckGo", "found": bool(all_results), "data": all_results}
218
+ except Exception as e:
219
+ logger.error(f"Web search error: {e}")
220
+ return {"source": "DuckDuckGo", "found": False, "error": str(e)}
221
+
222
+ def create_summary_from_source(source: Dict[str, Any]) -> str:
223
+ """Create a meaningful summary from source data."""
224
+ source_name = source.get("source", "Unknown")
225
+ source_data = source.get("data")
226
+
227
+ if not source_data:
228
+ return "Data found but empty"
229
+
230
+ # Handle different types of sources
231
+ if source_name == "Local DB":
232
+ if isinstance(source_data, dict):
233
+ # Get the most informative fields from local DB
234
+ return f"E-Number: {source_data.get('E No.', 'N/A')}, " \
235
+ f"Category: {source_data.get('Functional Class', 'N/A')}, " \
236
+ f"Description: {source_data.get('Main Use', '')[:100]}..."
237
+
238
+ elif source_name == "DuckDuckGo":
239
+ if isinstance(source_data, list) and source_data:
240
+ # Get the first query and a snippet of the result
241
+ first_result = source_data[0]
242
+ query = first_result.get("query", "")
243
+ result_snippet = first_result.get("result", "")[:150]
244
+ return f"Query: '{query}', Result: '{result_snippet}...'"
245
+
246
+ elif source_name == "Wikipedia":
247
+ # For wikipedia, return the first paragraph
248
+ if isinstance(source_data, str):
249
+ first_paragraph = source_data.split("\n\n")[0][:200]
250
+ return f"Wikipedia excerpt: {first_paragraph}..."
251
+
252
+ elif source_name in ["Open Food Facts", "Open Food Facts Products"]:
253
+ if isinstance(source_data, dict):
254
+ # Try to extract product name or ingredient description
255
+ if "product" in source_data:
256
+ return f"Product info: {source_data.get('product', {}).get('product_name', 'Unknown')}"
257
+ elif "ingredients_text" in source_data:
258
+ return f"Ingredients: {source_data.get('ingredients_text', '')[:150]}..."
259
+ else:
260
+ return f"Found data with {len(source_data)} fields"
261
+
262
+ elif source_name == "USDA FoodData Central":
263
+ if isinstance(source_data, dict) and "foods" in source_data:
264
+ foods = source_data.get("foods", [])
265
+ if foods:
266
+ first_food = foods[0]
267
+ return f"Food: {first_food.get('description', 'Unknown')}, " \
268
+ f"Category: {first_food.get('foodCategory', 'N/A')}"
269
+ else:
270
+ return "Found USDA data, but no specific foods listed"
271
+
272
+ elif source_name == "PubChem":
273
+ if isinstance(source_data, dict):
274
+ compound_info = source_data.get("compound_info", {})
275
+ properties = source_data.get("properties", {})
276
+
277
+ if "PC_Compounds" in compound_info and compound_info["PC_Compounds"]:
278
+ compound = compound_info["PC_Compounds"][0]
279
+ return f"Chemical ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}, " \
280
+ f"Found chemical property data"
281
+
282
+ # Default for unknown or complex sources
283
+ return f"Found data from {source_name} ({type(source_data).__name__})"
284
+
285
+ def analyze_ingredient(state: IngredientState) -> IngredientState:
286
+ """Analyze ingredient data with LLM to generate structured information.
287
+
288
+ Takes the current state with collected sources_data and uses an LLM to generate
289
+ a comprehensive analysis of the ingredient including safety rating, health effects,
290
+ description, and alternate names.
291
+
292
+ Args:
293
+ state: The current IngredientState containing all collected data
294
+
295
+ Returns:
296
+ Updated state with analysis results
297
+ """
298
+ # Get API key and model from environment
299
+ api_key = os.getenv("GOOGLE_API_KEY")
300
+ model_name = os.getenv("LLM_MODEL_NAME", "gemini-1.5-pro")
301
+
302
+ # Basic validation
303
+ if not api_key:
304
+ logger.error("No Google API key found in environment variables")
305
+ new_state = state.copy()
306
+ new_state["result"] = {
307
+ "name": state["ingredient"],
308
+ "is_found": False,
309
+ "description": "Error: Missing API credentials for analysis"
310
+ }
311
+ new_state["analysis_done"] = True
312
+ new_state["status"] = "analysis_error"
313
+ return new_state
314
+
315
+ # Initialize LLM
316
+ try:
317
+ llm = ChatGoogleGenerativeAI(
318
+ google_api_key=api_key,
319
+ model=model_name,
320
+ temperature=0.3, # Lower temperature for more factual responses
321
+ convert_system_message_to_human=True
322
+ )
323
+ except Exception as e:
324
+ logger.error(f"Error initializing LLM: {e}")
325
+ new_state = state.copy()
326
+ new_state["result"] = {
327
+ "name": state["ingredient"],
328
+ "is_found": False,
329
+ "description": f"Error initializing LLM: {str(e)}"
330
+ }
331
+ new_state["analysis_done"] = True
332
+ new_state["status"] = "analysis_error"
333
+ return new_state
334
+
335
+ # Get sources from state
336
+ sources_data = state["sources_data"]
337
+ logger.info(f"Analyzing ingredient with {len(sources_data)} total sources")
338
+
339
+ # Filter for successful sources only
340
+ found_sources = [source for source in sources_data if source.get('found', False)]
341
+ logger.info(f"Found {len(found_sources)} sources with usable data")
342
+
343
+ # Create default result structure
344
+ result = {
345
+ "name": state["ingredient"],
346
+ "alternate_names": [],
347
+ "is_found": len(found_sources) > 0,
348
+ "safety_rating": 5, # Default middle rating
349
+ "description": "No reliable information found." if not found_sources else "",
350
+ "health_effects": ["Unknown - insufficient data"] if not found_sources else [],
351
+ "details_with_source": [
352
+ {
353
+ "source": source.get("source", "Unknown"),
354
+ "found": source.get("found", False),
355
+ "summary": create_summary_from_source(source) if source.get("found", False) else "No data found",
356
+ }
357
+ for source in sources_data
358
+ ]
359
+ }
360
+
361
+ # If we have data, analyze it
362
+ if found_sources:
363
+ # Format source data for the prompt
364
+ source_texts = []
365
+ for i, source in enumerate(found_sources):
366
+ source_name = source.get('source', f'Source {i+1}')
367
+ source_data = source.get('data')
368
+
369
+ # Process different data formats appropriately
370
+ try:
371
+ if isinstance(source_data, dict):
372
+ source_text = format_dict_source(source_name, source_data)
373
+ elif isinstance(source_data, list):
374
+ source_text = format_list_source(source_name, source_data)
375
+ elif isinstance(source_data, str):
376
+ # For string data, include as is (limiting length)
377
+ source_text = f"--- {source_name} ---\n{source_data[:1500]}"
378
+ else:
379
+ # For other types, convert to string
380
+ source_text = f"--- {source_name} ---\n{str(source_data)[:1000]}"
381
+
382
+ source_texts.append(source_text)
383
+ except Exception as e:
384
+ logger.error(f"Error formatting source {source_name}: {e}")
385
+ source_texts.append(f"--- {source_name} ---\nError formatting data: {str(e)}")
386
+
387
+ # Combine all source texts
388
+ combined_data = "\n\n".join(source_texts)
389
+ logger.debug(f"Combined data for analysis:\n{combined_data[:500]}...(truncated)")
390
+
391
+ # Create the analysis prompt
392
+ analysis_prompt = f"""
393
+ Task: Analyze food ingredient data and provide a structured assessment.
394
+
395
+ Ingredient: {state["ingredient"]}
396
+
397
+ Based on the following data sources, provide:
398
+ 1. Safety rating (scale 1-10, where 1=unsafe for consumption, 5=moderate concerns, 10=very safe)
399
+ 2. List of potential health effects (both positive & negative, maximum 5 points)
400
+ 3. Brief description of what this ingredient is, how it's used, and its properties
401
+ 4. Alternative names for this ingredient
402
+
403
+ Available data:
404
+ {combined_data}
405
+
406
+ Format your response as a JSON object with these keys:
407
+ - "safety_rating": (number between 1-10)
408
+ - "health_effects": (array of strings)
409
+ - "description": (string)
410
+ - "alternate_names": (array of strings)
411
+
412
+ Only include factual information supported by the provided data. If information is
413
+ unavailable for any field, use appropriate default values.
414
+ """
415
+
416
+ # Process with LLM
417
+ try:
418
+ logger.info("Sending analysis prompt to LLM")
419
+ llm_response = llm.invoke(analysis_prompt)
420
+ logger.info("Received LLM response")
421
+
422
+ # Extract and parse JSON from LLM response
423
+ try:
424
+ analysis_text = llm_response.content
425
+ logger.debug(f"LLM response: {analysis_text[:500]}...(truncated)")
426
+
427
+ # Find JSON in the response
428
+ start_idx = analysis_text.find('{')
429
+ end_idx = analysis_text.rfind('}') + 1
430
+
431
+ if start_idx >= 0 and end_idx > start_idx:
432
+ json_str = analysis_text[start_idx:end_idx]
433
+ analysis = json.loads(json_str)
434
+
435
+ # Update result with analyzed data
436
+ result.update({
437
+ "safety_rating": analysis.get("safety_rating", 5),
438
+ "description": analysis.get("description", "No description available."),
439
+ "health_effects": analysis.get("health_effects", []),
440
+ "alternate_names": analysis.get("alternate_names", [])
441
+ })
442
+ logger.info(f"Analysis complete - Safety Rating: {result['safety_rating']}")
443
+ else:
444
+ logger.warning("Could not find JSON in LLM response")
445
+ result["description"] = "Error: Failed to parse LLM analysis output."
446
+ except json.JSONDecodeError as e:
447
+ logger.error(f"JSON parsing error: {e}")
448
+ result["description"] = f"Error parsing analysis: {str(e)}"
449
+
450
+ except Exception as e:
451
+ logger.error(f"Error in LLM analysis: {e}")
452
+ logger.error(traceback.format_exc())
453
+ result.update({
454
+ "description": f"Error in analysis: {str(e)}",
455
+ "health_effects": ["Error in analysis"],
456
+ })
457
+
458
+ # Update state with results
459
+ new_state = state.copy()
460
+ new_state["result"] = result
461
+ new_state["analysis_done"] = True
462
+ new_state["status"] = "analysis_complete"
463
+ return new_state
464
+
465
+ def format_dict_source(source_name: str, source_data: dict) -> str:
466
+ """Format dictionary source data for LLM consumption."""
467
+ source_text = f"--- {source_name} ---\n"
468
+
469
+ # Handle different sources appropriately
470
+ if source_name == "Local DB":
471
+ relevant_keys = [k for k in source_data.keys()]
472
+ for key in relevant_keys:
473
+ source_text += f"{key}: {source_data[key]}\n"
474
+ elif source_name == "DuckDuckGo":
475
+ if isinstance(source_data, list):
476
+ for item in source_data:
477
+ source_text += f"Query: {item.get('query', '')}\n"
478
+ source_text += f"Summary: {item.get('result', '')[:500]}...\n"
479
+ elif source_name in ["Open Food Facts", "USDA FoodData Central"]:
480
+ # Extract key info for food databases
481
+ if "ingredients_text" in source_data:
482
+ source_text += f"Ingredients: {source_data['ingredients_text']}\n"
483
+ if "description" in source_data:
484
+ source_text += f"Description: {source_data['description']}\n"
485
+ if "categories" in source_data:
486
+ source_text += f"Categories: {source_data['categories']}\n"
487
+ # Include top-level fields only
488
+ for key, value in source_data.items():
489
+ if not isinstance(value, (dict, list)) and key not in ["ingredients_text", "description", "categories"]:
490
+ source_text += f"{key}: {value}\n"
491
+ elif source_name == "PubChem":
492
+ # Extract key chemical information
493
+ if "compound_info" in source_data:
494
+ source_text += "Chemical information:\n"
495
+ compound_data = source_data.get("compound_info", {})
496
+ if "PC_Compounds" in compound_data and len(compound_data["PC_Compounds"]) > 0:
497
+ compound = compound_data["PC_Compounds"][0]
498
+ source_text += f"Compound ID: {compound.get('id', {}).get('id', {}).get('cid', 'N/A')}\n"
499
+
500
+ if "properties" in source_data and source_data["properties"]:
501
+ properties = source_data["properties"]
502
+ if "PropertyTable" in properties:
503
+ prop_table = properties["PropertyTable"]
504
+ if "Properties" in prop_table and len(prop_table["Properties"]) > 0:
505
+ props = prop_table["Properties"][0]
506
+ source_text += "Properties:\n"
507
+ for key, value in props.items():
508
+ source_text += f"{key}: {value}\n"
509
+ else:
510
+ # Generic dictionary handling for other sources
511
+ for key, value in source_data.items():
512
+ if not isinstance(value, (dict, list)) or len(str(value)) < 100:
513
+ source_text += f"{key}: {value}\n"
514
+ else:
515
+ source_text += f"{key}: [Complex data]\n"
516
+
517
+ return source_text
518
+
519
+ def format_list_source(source_name: str, source_data: list) -> str:
520
+ """Format list source data for LLM consumption."""
521
+ source_text = f"--- {source_name} ---\n"
522
+
523
+ # Handle different list structures
524
+ if len(source_data) > 0:
525
+ if isinstance(source_data[0], dict):
526
+ # List of dictionaries
527
+ source_text += f"Found {len(source_data)} items:\n"
528
+ for i, item in enumerate(source_data[:3]): # Limit to first 3 items
529
+ source_text += f"Item {i+1}:\n"
530
+ for key, value in item.items():
531
+ if not isinstance(value, (dict, list)):
532
+ source_text += f" {key}: {value}\n"
533
+ else:
534
+ # List of other types
535
+ source_text += f"Data points ({len(source_data)}):\n"
536
+ for i, item in enumerate(source_data[:5]): # Limit to first 5 items
537
+ source_text += f"{i+1}. {str(item)[:200]}\n"
538
+ else:
539
+ source_text += "Empty list\n"
540
+
541
+ return source_text
542
+
543
+ class IngredientInfoAgentLangGraph:
544
+ # Add this method to your IngredientInfoAgentLangGraph class
545
+ def process_ingredient(self, ingredient: str) -> IngredientAnalysisResult:
546
+ """Process an ingredient using direct sequential approach instead of LangGraph."""
547
+ logger.info(f"=== Direct sequential processing for: {ingredient} ===")
548
+
549
+ # Initialize empty sources data
550
+ sources_data = []
551
+
552
+ # Run each tool directly in sequence and collect results
553
+ logger.info(f"Searching local database for {ingredient}")
554
+ result = search_local_db(ingredient)
555
+ if result.get("found", False):
556
+ sources_data.append(result)
557
+ logger.info(f"Local DB found data for {ingredient}")
558
+
559
+ logger.info(f"Searching web for {ingredient}")
560
+ result = search_web(ingredient)
561
+ if result.get("found", False):
562
+ sources_data.append(result)
563
+ logger.info(f"Web search found data for {ingredient}")
564
+
565
+ logger.info(f"Searching Wikipedia for {ingredient}")
566
+ result = search_wikipedia(ingredient)
567
+ if result.get("found", False):
568
+ sources_data.append(result)
569
+ logger.info(f"Wikipedia found data for {ingredient}")
570
+
571
+ logger.info(f"Searching Open Food Facts for {ingredient}")
572
+ result = search_open_food_facts(ingredient)
573
+ if result.get("found", False):
574
+ sources_data.append(result)
575
+ logger.info(f"Open Food Facts found data for {ingredient}")
576
+
577
+ # Optional - Add these if needed:
578
+ logger.info(f"Searching USDA for {ingredient}")
579
+ result = search_usda(ingredient)
580
+ if result.get("found", False):
581
+ sources_data.append(result)
582
+ logger.info(f"USDA found data for {ingredient}")
583
+
584
+ logger.info(f"Searching PubChem for {ingredient}")
585
+ result = search_pubchem(ingredient)
586
+ if result.get("found", False):
587
+ sources_data.append(result)
588
+ logger.info(f"PubChem found data for {ingredient}")
589
+
590
+ # Create a state for analysis
591
+ state = {
592
+ "ingredient": ingredient,
593
+ "sources_data": sources_data,
594
+ "result": None,
595
+ "status": "ready_for_analysis",
596
+ "analysis_done": False,
597
+ "local_db_checked": True,
598
+ "web_search_done": True,
599
+ "wikipedia_checked": True,
600
+ "open_food_facts_checked": True,
601
+ "usda_checked": True,
602
+ "pubchem_checked": True
603
+ }
604
+
605
+ # Run the analysis with the collected data
606
+ final_state = analyze_ingredient(state)
607
+
608
+ # Extract the result or create a default
609
+ if final_state.get("result"):
610
+ logger.info(f"Analysis complete for {ingredient}")
611
+ return IngredientAnalysisResult(**final_state["result"])
612
+ else:
613
+ logger.info(f"No result in final state for {ingredient}, returning default")
614
+ return IngredientAnalysisResult(
615
+ name=ingredient,
616
+ is_found=len(sources_data) > 0,
617
+ details_with_source=sources_data
618
+ )
619
+
620
+ if __name__ == "__main__":
621
+ agent = IngredientInfoAgentLangGraph()
622
+
623
+ # Use the simple method that works reliably
624
+ result = agent.process_ingredient("SODIUM TRIPOLYPHOSPHATE")
625
+ print(json.dumps(result.model_dump(), indent=2))
626
+
627
+ benzoate_result = agent.process_ingredient("Sodium Benzoate")
628
+ print(json.dumps(benzoate_result.model_dump(), indent=2))
services/ingredients.py CHANGED
@@ -1,10 +1,10 @@
1
  from sqlalchemy.orm import Session
2
- from models.ingredient import Ingredient
3
  from fastapi import HTTPException
4
  from cachetools import cached, TTLCache
5
  from typing import List, Dict, Any
6
  import requests
7
- from utils.fetch_data import fetch_ingredient_data_from_api
8
 
9
  cache = TTLCache(maxsize=100, ttl=300)
10
 
@@ -55,15 +55,3 @@ def save_ingredient_data(db: Session, name: str, data: Dict[str, Any]):
55
  db.add(ingredient)
56
  db.commit()
57
  db.refresh(ingredient)
58
-
59
- def filter_ingredients_by_preferences(ingredients: List[Dict[str, Any]], preferences: Dict[str, Any]) -> List[Dict[str, Any]]:
60
- filtered_ingredients = []
61
- for ingredient in ingredients:
62
- if preferences.get("low_sugar") and ingredient.get("sugar", 0) > 5:
63
- continue
64
- if preferences.get("low_fat") and ingredient.get("fat", 0) > 5:
65
- continue
66
- if preferences.get("allergens") and any(allergen in ingredient.get("allergens", []) for allergen in preferences["allergens"]):
67
- continue
68
- filtered_ingredients.append(ingredient)
69
- return filtered_ingredients
 
1
  from sqlalchemy.orm import Session
2
+ from db.models import Ingredient
3
  from fastapi import HTTPException
4
  from cachetools import cached, TTLCache
5
  from typing import List, Dict, Any
6
  import requests
7
+ from utils.fetch_data import fetch_product_data_from_api
8
 
9
  cache = TTLCache(maxsize=100, ttl=300)
10
 
 
55
  db.add(ingredient)
56
  db.commit()
57
  db.refresh(ingredient)
 
 
 
 
 
 
 
 
 
 
 
 
services/logging_service.py DELETED
@@ -1,38 +0,0 @@
1
- import logging
2
- from logging.handlers import RotatingFileHandler
3
-
4
- # Configure logging
5
- logger = logging.getLogger("food_analyzer")
6
- logger.setLevel(logging.DEBUG)
7
-
8
- # Create a file handler that logs debug and higher level messages
9
- file_handler = RotatingFileHandler("food_analyzer.log", maxBytes=5*1024*1024, backupCount=3)
10
- file_handler.setLevel(logging.DEBUG)
11
-
12
- # Create a console handler that logs error and higher level messages
13
- console_handler = logging.StreamHandler()
14
- console_handler.setLevel(logging.ERROR)
15
-
16
- # Create a formatter and set it for both handlers
17
- formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
18
- file_handler.setFormatter(formatter)
19
- console_handler.setFormatter(formatter)
20
-
21
- # Add the handlers to the logger
22
- logger.addHandler(file_handler)
23
- logger.addHandler(console_handler)
24
-
25
- def log_debug(message: str):
26
- logger.debug(message)
27
-
28
- def log_info(message: str):
29
- logger.info(message)
30
-
31
- def log_warning(message: str):
32
- logger.warning(message)
33
-
34
- def log_error(message: str):
35
- logger.error(message)
36
-
37
- def log_critical(message: str):
38
- logger.critical(message)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
services/scan_history.py CHANGED
@@ -1,20 +1,35 @@
 
1
  from sqlalchemy.orm import Session
2
- from models.scan_history import ScanHistory
3
  from datetime import datetime
 
4
 
5
  def record_scan(db: Session, user_id: int, product_id: int) -> ScanHistory:
6
- scan_entry = ScanHistory(
7
- user_id=user_id,
8
- product_id=product_id,
9
- scan_date=datetime.utcnow()
10
- )
11
- db.add(scan_entry)
12
- db.commit()
13
- db.refresh(scan_entry)
14
- return scan_entry
 
 
 
 
 
 
15
 
16
  def get_scan_history(db: Session, user_id: int) -> list[ScanHistory]:
17
- return db.query(ScanHistory)\
18
- .filter(ScanHistory.user_id == user_id)\
19
- .order_by(ScanHistory.scan_date.desc())\
20
- .all()
 
 
 
 
 
 
 
 
1
+ from fastapi import HTTPException
2
  from sqlalchemy.orm import Session
3
+ from db.models import ScanHistory
4
  from datetime import datetime
5
+ from logger_manager import log_info, log_error
6
 
7
  def record_scan(db: Session, user_id: int, product_id: int) -> ScanHistory:
8
+ log_info("Recording scan")
9
+ try:
10
+ scan_entry = ScanHistory(
11
+ user_id=user_id,
12
+ product_id=product_id,
13
+ scan_date=datetime.utcnow()
14
+ )
15
+ db.add(scan_entry)
16
+ db.commit()
17
+ db.refresh(scan_entry)
18
+ log_info("Scan recorded successfully")
19
+ return scan_entry
20
+ except Exception as e:
21
+ log_error(f"Error recording scan: {str(e)}")
22
+ raise HTTPException(status_code=500, detail="Internal Server Error")
23
 
24
  def get_scan_history(db: Session, user_id: int) -> list[ScanHistory]:
25
+ log_info("Getting scan history")
26
+ try:
27
+ scan_history = db.query(ScanHistory)\
28
+ .filter(ScanHistory.user_id == user_id)\
29
+ .order_by(ScanHistory.scan_date.desc())\
30
+ .all()
31
+ log_info("Scan history retrieved successfully")
32
+ return scan_history
33
+ except Exception as e:
34
+ log_error(f"Error getting scan history: {str(e)}")
35
+ raise HTTPException(status_code=500, detail="Internal Server Error")