"""ConsumeWise: a Streamlit chatbot that analyzes packaged food products.

The app looks a product up by name (or extracts it from label image URLs)
through the data-extractor backend, asks three OpenAI assistants about the
processing level, the ingredients and the marketing claims, and then asks a
chat model to turn those three analyses into a consumption recommendation.
"""
import streamlit as st
from openai import OpenAI
import json, os
import requests

# Prefix the assistants are told to put in front of answers that do not come
# from the uploaded documents.
NOT_FOUND_MARKER = "(NOT FOUND IN DOCUMENT)"

# Extra radio option that lets the user reject every suggested product.
NO_MATCH_OPTION = "None of the above"

# Upper bound for a single backend call. Timeouts surface as
# requests.exceptions.RequestException, which the callers already handle.
REQUEST_TIMEOUT_SECONDS = 120

# Ask the Assistants API to include the file-search result content in the run.
FILE_SEARCH_INCLUDE = ["step_details.tool_calls[*].file_search.results[*].content"]

# (assistant name, instructions, vector store name, knowledge files)
_ASSISTANT_SPECS = (
    (
        "Processing Level",
        "You are an expert dietician. Use you knowledge base to answer questions about the processing level of food product.",
        "Processing Level Vec",
        ["Processing_Level.docx"],
    ),
    (
        "Harmful Ingredients",
        "You are an expert dietician. Use you knowledge base to answer questions about the ingredients in food product.",
        "Harmful Ingredients Vec",
        ["Ingredients.docx"],
    ),
    (
        "Misleading Claims",
        "You are an expert dietician. Use you knowledge base to answer questions about the misleading claims about food product.",
        "Misleading Claims Vec",
        ["MisLeading_Claims.docx"],
    ),
)


# st.cache_resource makes Streamlit run this function once and hand the same
# (debug flag, client) pair back on every later call and script rerun.
@st.cache_resource
def get_openai_client():
    """Return ``(debug_mode, client)``; the client reads OPENAI_API_KEY from the environment."""
    # Enable debug mode for testing only
    return True, OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


@st.cache_resource
def get_backend_urls():
    """Return the base URL (with trailing slash) of the data-extractor backend."""
    data_extractor_url = "https://data-extractor-67qj89pa0-sonikas-projects-9936eaad.vercel.app/"
    return data_extractor_url


debug_mode, client = get_openai_client()
data_extractor_url = get_backend_urls()


def _request_json(method, url, ok_statuses=(200,), **request_kwargs):
    """Call the backend and return the decoded JSON body, or ``{}`` on any failure.

    Failures (unexpected status code, network error, timeout, undecodable
    body) are printed and reported to the caller as an empty dict.
    """
    label = method.upper()
    try:
        response = requests.request(
            method, url, timeout=REQUEST_TIMEOUT_SECONDS, **request_kwargs
        )
        if response.status_code not in ok_statuses:
            print(f"{label} Request failed with status code: {response.status_code}")
            return {}
        # Decode once; older requests versions raise a plain ValueError here.
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error occurred: {e}")
        return {}
    print(f"{label} Response:", payload)
    return payload


def _coerce_to_dict(payload):
    """Normalize a backend payload (dict or JSON-encoded string) to a dict.

    Anything that cannot be interpreted as a JSON object yields ``{}``.
    """
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except ValueError:
            return {}
    return payload if isinstance(payload, dict) else {}


def extract_data_from_product_image(image_links, data_extractor_url):
    """POST the product label image URLs to the extractor's ``extract`` endpoint.

    Returns the decoded JSON response, or ``{}`` if the request fails.
    """
    return _request_json(
        "post",
        data_extractor_url + "extract",
        ok_statuses=(200, 201),
        json={"image_links": image_links},
    )


def get_product_data_from_db(product_name, data_extractor_url):
    """GET the stored data for ``product_name`` from the extractor's ``product`` endpoint.

    Returns the decoded JSON response, or ``{}`` if the request fails.
    """
    return _request_json(
        "get", data_extractor_url + "product", params={"name": product_name}
    )


def get_product_list(product_name_by_user, data_extractor_url):
    """GET products similar to the user's text from the ``find_product`` endpoint.

    Returns the decoded JSON response, or ``{}`` if the request fails.
    """
    return _request_json(
        "get",
        data_extractor_url + "find_product",
        params={"name": product_name_by_user},
    )


def _create_assistant_with_knowledge(name, instructions, vector_store_name, file_paths):
    """Create one file-search assistant backed by its own vector store."""
    assistant = client.beta.assistants.create(
        name=name,
        instructions=instructions,
        model="gpt-4o",
        tools=[{"type": "file_search"}],
        temperature=0,
        top_p=0.85,
    )
    vector_store = client.beta.vector_stores.create(name=vector_store_name)

    file_streams = []
    try:
        for path in file_paths:
            file_streams.append(open(path, "rb"))
        # The SDK helper uploads the files, adds them to the vector store and
        # polls the file batch until it finishes.
        file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
            vector_store_id=vector_store.id, files=file_streams
        )
    finally:
        # Close the handles even when the upload fails.
        for stream in file_streams:
            stream.close()

    print(file_batch.status)
    print(file_batch.file_counts)

    return client.beta.assistants.update(
        assistant_id=assistant.id,
        tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
    )


# Cached so assistants and vector stores are created once, not on every rerun.
@st.cache_resource
def initialize_assistants_and_vector_stores():
    """Create the three knowledge-backed assistants.

    Returns:
        ``(processing_level, harmful_ingredients, misleading_claims)``
        assistant objects, each already linked to its vector store.
    """
    assistant1, assistant2, assistant3 = (
        _create_assistant_with_knowledge(*spec) for spec in _ASSISTANT_SPECS
    )
    return assistant1, assistant2, assistant3


assistant1, assistant2, assistant3 = initialize_assistants_and_vector_stores()


def _ask_assistant(prompt, assistant_id, citations_only):
    """Run ``prompt`` on a fresh thread and return the reply text without annotation markers.

    Args:
        prompt: User message sent to the assistant.
        assistant_id: ID of the assistant that should answer.
        citations_only: When true only file-citation annotations are removed
            from the text; otherwise every annotation is removed.

    Returns:
        The reply text, or ``""`` when the run produced no message.
    """
    thread = client.beta.threads.create(messages=[{"role": "user", "content": prompt}])
    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=assistant_id,
        include=FILE_SEARCH_INCLUDE,
    )
    messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id))
    if not messages or not messages[0].content:
        print(f"Assistant run produced no message (status: {getattr(run, 'status', 'unknown')})")
        return ""

    message_content = messages[0].content[0].text
    reply = message_content.value
    for annotation in message_content.annotations:
        if citations_only and not getattr(annotation, "file_citation", None):
            continue
        reply = reply.replace(annotation.text, "")
    return reply


def _parse_json_object(reply):
    """Parse the outermost ``{...}`` object found in ``reply``.

    Slicing from the first ``{`` to the last ``}`` skips Markdown code fences
    and language tags without touching the word "json" inside the payload.

    Raises:
        ValueError: If no braces are found or the slice is not valid JSON.
    """
    start = reply.find("{")
    end = reply.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object in assistant reply")
    return json.loads(reply[start:end + 1])


def _summarize_json_reply(reply, debug_label):
    """Turn a ``{name: information}`` JSON reply into ``"name: information\\n"`` lines.

    The NOT FOUND marker is stripped from the values. In debug mode the raw
    reply and the names that carried the marker are printed after
    ``debug_label``. A reply that is not parseable JSON is returned as plain
    text with the marker removed, instead of raising.
    """
    if debug_mode:
        print(reply)
    try:
        findings = _parse_json_object(reply)
    except ValueError as e:
        print(f"Could not parse assistant reply as JSON: {e}")
        return reply.replace(NOT_FOUND_MARKER, "").strip()

    missing_from_doc = []
    lines = []
    for key, value in findings.items():
        text = str(value)  # values are expected to be strings; tolerate others
        if text.startswith(NOT_FOUND_MARKER):
            missing_from_doc.append(key)
            text = text[len(NOT_FOUND_MARKER):].lstrip()
        lines.append(f"{key}: {text}\n")

    if debug_mode:
        print(f"{debug_label} {','.join(missing_from_doc)}")
    return "".join(lines)


def analyze_processing_level(ingredients, brand_name, product_name, assistant_id):
    """Ask the processing-level assistant to categorize the product.

    Args:
        ingredients: Ingredient names (strings).
        brand_name: Unused; kept for interface compatibility.
        product_name: Unused; kept for interface compatibility.
        assistant_id: ID of the processing-level assistant.

    Returns:
        The assistant's reply text with annotation markers removed.
    """
    prompt = (
        "Categorize food product that has following ingredients: "
        + ', '.join(ingredients)
        + " into Group A, Group B, or Group C based on the document. The output must only be the group category name (Group A, Group B, or Group C) alongwith the reason behind assigning that respective category to the product. If the group category cannot be determined, output 'NOT FOUND'."
    )
    processing_level_str = _ask_assistant(prompt, assistant_id, citations_only=False)
    if debug_mode:
        print(processing_level_str)
    return processing_level_str


def analyze_harmful_ingredients(ingredients, brand_name, product_name, assistant_id):
    """Ask the ingredients assistant about each ingredient.

    Args:
        ingredients: Ingredient names (strings).
        brand_name: Unused; kept for interface compatibility.
        product_name: Unused; kept for interface compatibility.
        assistant_id: ID of the harmful-ingredients assistant.

    Returns:
        One ``"ingredient: information"`` line per ingredient.
    """
    # NOTE(review): the "{: }" placeholder below looks like it lost its field
    # names (e.g. during HTML extraction) - confirm the intended format.
    prompt = (
        "Provide detailed information about food product that has following ingredients: "
        + ', '.join(ingredients)
        + ". The output must be in JSON format: {: }. If information about an ingredient is not found in the documents, the value for that ingredient must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge."
    )
    reply = _ask_assistant(prompt, assistant_id, citations_only=True)
    return _summarize_json_reply(
        reply, "Ingredients not found in the harmful ingredients doc are"
    )


def analyze_claims(claims, assistant_id):
    """Ask the misleading-claims assistant about each claim.

    Args:
        claims: Claim texts (strings).
        assistant_id: ID of the misleading-claims assistant.

    Returns:
        One ``"claim: information"`` line per claim.
    """
    # NOTE(review): the "{: }" placeholder below looks like it lost its field
    # names (e.g. during HTML extraction) - confirm the intended format.
    prompt = (
        "Provide detailed information about the food product with following claims: "
        + ', '.join(claims)
        + ". The output must be in JSON format: {: }. If information about a claim is not found in the documents, the value for that claim must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge."
    )
    reply = _ask_assistant(prompt, assistant_id, citations_only=True)
    return _summarize_json_reply(reply, "Claims not found in the doc are")


def generate_final_analysis(brand_name, product_name, processing_level, harmful_ingredient_analysis, claims_analysis):
    """Combine the three analyses into a recommendation using a chat completion.

    Returns:
        The content of the model's first choice.
    """
    # NOTE(review): the line breaks inside the two prompts below were
    # reconstructed; the wording is unchanged.
    system_prompt = """You are provided with a detailed analysis of a food product. Your task is to generate actionable insights to help the user decide whether to consume the product, at what frequency, and identify any potential harms or benefits. Consider the context of consumption to ensure the advice is personalized and practical.

Use the following criteria to generate your response:

1. **Nutrition Analysis:**
- How processed is the product?

2. **Harmful Ingredients:**
- Identify any harmful or questionable ingredients.

3. **Misleading Claims:**
- Are there any misleading claims made by the brand?

Additionally, consider the following while generating insights:

1. **Consumption Context:**
- Is the product being consumed for health reasons or as a treat?
- Could the consumer be overlooking hidden harms?
- If the product is something they could consume daily, should they?
- If they are consuming it daily, what potential harm are they not noticing?
- If the product is intended for health purposes, are there concerns the user might miss?

**Output:**
- Recommend whether the product should be consumed or avoided.
- If recommended, specify the appropriate frequency and intended functionality (e.g., treat vs. health).
- Highlight any risks or benefits at that level of consumption."""

    user_prompt = f"""
Product Name: {brand_name} {product_name}

Processing Level:
{processing_level}

Ingredient Analysis:
{harmful_ingredient_analysis}

Claims Analysis:
{claims_analysis}
"""
    if debug_mode:
        print(f"\nuser_prompt : \n {user_prompt}")

    completion = client.chat.completions.create(
        model="gpt-4o",  # Make sure to use an appropriate model
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )
    return completion.choices[0].message.content


def analyze_product(product_info_raw):
    """Run the full analysis pipeline for one product.

    Args:
        product_info_raw: Product data from the backend, either a dict or a
            JSON-encoded string. Keys read: ``brandName``, ``productName``,
            ``ingredients`` (items with a ``name`` key) and ``claims``.

    Returns:
        The final recommendation text, or an apology when no product data
        is available.
    """
    product_info_from_db = _coerce_to_dict(product_info_raw)
    if not product_info_from_db:
        return "I'm sorry, product information could not be extracted from the url."

    brand_name = product_info_from_db.get("brandName", "")
    product_name = product_info_from_db.get("productName", "")
    ingredients_list = [
        ingredient["name"]
        for ingredient in product_info_from_db.get("ingredients") or []
        if isinstance(ingredient, dict) and ingredient.get("name")
    ]
    claims_list = product_info_from_db.get("claims") or []

    # Default every section to "" so the final prompt can still be built when
    # the product has no ingredients or no claims.
    processing_level = ""
    harmful_ingredient_analysis = ""
    claims_analysis = ""

    if ingredients_list:
        processing_level = analyze_processing_level(ingredients_list, brand_name, product_name, assistant1.id)
        harmful_ingredient_analysis = analyze_harmful_ingredients(ingredients_list, brand_name, product_name, assistant2.id)
    if claims_list:
        claims_analysis = analyze_claims(claims_list, assistant3.id)

    return generate_final_analysis(brand_name, product_name, processing_level, harmful_ingredient_analysis, claims_analysis)


# Streamlit app
# Initialize session state
if 'messages' not in st.session_state:
    st.session_state.messages = []


def _looks_like_url(text):
    """Return True when ``text`` contains an http(s) scheme prefix."""
    lowered = text.lower()
    return "http:/" in lowered or "https:/" in lowered


def chatbot_response(image_urls_str, product_name_by_user, data_extractor_url, extract_info = True):
    """Produce the chatbot's answer for one user input.

    Args:
        image_urls_str: Comma-separated image URLs; used only when
            ``product_name_by_user`` is empty.
        product_name_by_user: Product name typed by the user, or ``""``.
        data_extractor_url: Base URL of the data-extractor backend.
        extract_info: With a product name, ``False`` returns the list of
            similar product names and ``True`` returns the full analysis.

    Returns:
        A list of similar products (empty when none are found) in lookup
        mode, otherwise the analysis or a help message as a string.
    """
    if product_name_by_user != "":
        if not extract_info:
            similar_products = _coerce_to_dict(
                get_product_list(product_name_by_user, data_extractor_url)
            )
            if similar_products:
                print(f"similar_product_list_json : {similar_products}")
            return similar_products.get("product_list") or []

        with st.spinner("Analyzing the product... This may take a moment."):
            product_info_raw = get_product_data_from_db(product_name_by_user, data_extractor_url)
            print(f"DEBUG product_info_raw : {product_info_raw}")
            return analyze_product(product_info_raw)

    if _looks_like_url(image_urls_str):
        # Keep only the comma-separated pieces that look like URLs and drop
        # the whitespace users tend to leave after the commas.
        image_urls = [
            url.strip() for url in image_urls_str.split(",") if _looks_like_url(url)
        ]
        with st.spinner("Analyzing the product... This may take a moment."):
            product_info_raw = extract_data_from_product_image(image_urls, data_extractor_url)
            print(f"DEBUG product_info_raw : {product_info_raw}")
            return analyze_product(product_info_raw)

    return "I'm here to analyze food products. Please provide an image URL (Example : http://example.com/image.jpg) or product name (Example : Harvest Gold Bread)"


# Streamlit app
st.title("ConsumeWise")
st.write("Hello! I'm your food product analysis assistant. Select one of the following options")

# Planned: give the user the following four options to pick from
# 1. I want to know more about this product
# 2. How much % of my daily allowance of calorie, fat, protein, carb, sugar or any other nutrient will I exhaust if I eat this?
# 3. I want to understand what this ingredient name means
# 4. I want to understand what different nutrients mean
# If the user selects 1, ask for image URLs separated by commas, else do nothing.
# Repeat the options after the response is generated.
# Do not allow the user to send prompts while the chatbot is still generating a response.

# User input
product_name_by_user = st.text_input("Enter product name")

# Display chat messages
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

if product_name_by_user != "":
    # Add user message to chat history
    st.session_state.messages.append({"role": "user", "content": product_name_by_user})
    with st.chat_message("user"):
        st.markdown(product_name_by_user)

    # Prevent further input while generating response
    st.session_state["response_pending"] = True

    similar_product_list = chatbot_response("", product_name_by_user, data_extractor_url, extract_info = False)

    choice = None
    response = ""
    if similar_product_list:
        # Build a new list: list.append() returns None and must not be passed
        # to st.radio as the options.
        options = list(similar_product_list) + [NO_MATCH_OPTION]
        choice = st.radio("Select a product:", options)
        if choice != NO_MATCH_OPTION:
            # NOTE(review): the analysis uses the typed name, not `choice` -
            # confirm which of the two the backend expects.
            response = chatbot_response("", product_name_by_user, data_extractor_url, extract_info = True)

    if not similar_product_list or choice == NO_MATCH_OPTION:
        image_urls_str = st.text_input("Enter image URLs separated by commas")
        if image_urls_str:
            # Add user message to chat history (skip the empty placeholder
            # shown before the user has typed anything).
            st.session_state.messages.append({"role": "user", "content": image_urls_str})
            with st.chat_message("user"):
                st.markdown(image_urls_str)
        response = chatbot_response(image_urls_str, "", data_extractor_url, extract_info = True)

    st.session_state.messages.append({"role": "assistant", "content": response})
    with st.chat_message("assistant"):
        st.markdown(response)

    st.session_state["response_pending"] = False

# Option to clear chat history
if st.button("Clear Chat History"):
    st.session_state.messages = []
    st.rerun()