|
import streamlit as st |
|
from openai import OpenAI |
|
import json, os |
|
import requests |
|
|
|
|
|
# When True, the analysis helpers print their intermediate results to the console.
debug_mode = True

# Shared OpenAI client; the API key comes from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Base URL of the data-extractor service. Endpoint names ("extract", "product")
# are appended to it verbatim, so the trailing slash matters.
data_extractor_url = "https://data-extractor-67qj89pa0-sonikas-projects-9936eaad.vercel.app/"
|
|
|
def extract_data_from_product_image(image_links, data_extractor_url):
    """POST product image links to the data-extractor service.

    Args:
        image_links: Image URLs, sent as the ``image_links`` field of the
            JSON request body.
        data_extractor_url: Base URL of the service. ``"extract"`` is appended
            verbatim, so the base is expected to end with a slash.

    Returns:
        The decoded JSON body when the service answers 200 or 201. An empty
        dict on any other status, on a transport error, or when the body is
        not valid JSON.
    """
    url = data_extractor_url + "extract"
    data = {"image_links": image_links}

    try:
        response = requests.post(url, json=data)
        if response.status_code not in (200, 201):
            print(f"POST Request failed with status code: {response.status_code}")
            return {}
        # Decode once and reuse the result instead of parsing the body twice.
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions where
        # that error is not a RequestException subclass.
        print(f"Error occurred: {e}")
        return {}

    print("POST Response:", payload)
    return payload
|
|
|
def get_product_data_from_db(product_name, data_extractor_url):
    """Fetch a stored product record from the data-extractor service.

    Args:
        product_name: Sent as the ``name`` query parameter.
        data_extractor_url: Base URL of the service. ``"product"`` is appended
            verbatim, so the base is expected to end with a slash.

    Returns:
        The decoded JSON body when the service answers 200. An empty dict on
        any other status, on a transport error, or when the body is not
        valid JSON.
    """
    url = data_extractor_url + "product"
    params = {"name": product_name}

    try:
        response = requests.get(url, params=params)
        if response.status_code != 200:
            print(f"GET Request failed with status code: {response.status_code}")
            return {}
        # Decode once and reuse the result instead of parsing the body twice.
        payload = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decode failures on requests versions where
        # that error is not a RequestException subclass.
        print(f"Error occurred: {e}")
        return {}

    print("GET Response:", payload)
    return payload
|
|
|
|
|
|
|
|
|
|
|
|
|
@st.cache_resource
def _provision_assistant(name, instructions, vector_store_name, file_paths):
    """Create one file-search assistant backed by its own vector store.

    Streamlit re-executes this script on every user interaction. Caching the
    result stops each rerun from creating another assistant and vector store
    and uploading the documents again.

    Args:
        name: Display name of the assistant.
        instructions: System instructions for the assistant.
        vector_store_name: Name of the vector store to create.
        file_paths: Paths of the documents to upload into the vector store.

    Returns:
        A ``(assistant, vector_store, file_batch)`` tuple, where ``assistant``
        is the assistant after it has been linked to the vector store.
    """
    assistant = client.beta.assistants.create(
        name=name,
        instructions=instructions,
        model="gpt-4o",
        tools=[{"type": "file_search"}],
        temperature=0,
        top_p=0.85,
    )

    vector_store = client.beta.vector_stores.create(name=vector_store_name)

    file_streams = []
    try:
        for path in file_paths:
            file_streams.append(open(path, "rb"))
        file_batch = client.beta.vector_stores.file_batches.upload_and_poll(
            vector_store_id=vector_store.id, files=file_streams
        )
    finally:
        # The upload has finished (or failed) by now; release the file handles.
        for stream in file_streams:
            stream.close()

    assistant = client.beta.assistants.update(
        assistant_id=assistant.id,
        tool_resources={"file_search": {"vector_store_ids": [vector_store.id]}},
    )
    return assistant, vector_store, file_batch


assistant1, vector_store1, file_batch1 = _provision_assistant(
    "Processing Level",
    "You are an expert dietician. Use you knowledge base to answer questions about the processing level of food product.",
    "Processing Level Vec",
    ("Processing_Level.docx",),
)
print(file_batch1.status)
print(file_batch1.file_counts)

assistant2, vector_store2, file_batch2 = _provision_assistant(
    "Harmful Ingredients",
    "You are an expert dietician. Use you knowledge base to answer questions about the ingredients in food product.",
    "Harmful Ingredients Vec",
    ("Ingredients.docx",),
)
print(file_batch2.status)
print(file_batch2.file_counts)

assistant3, vector_store3, file_batch3 = _provision_assistant(
    "Misleading Claims",
    "You are an expert dietician. Use you knowledge base to answer questions about the misleading claims about food product.",
    "Misleading Claims Vec",
    ("MisLeading_Claims.docx",),
)
print(file_batch3.status)
print(file_batch3.file_counts)
|
|
|
def analyze_processing_level(ingredients, brand_name, product_name, assistant_id):
    """Ask an assistant to classify the product into Group A, B or C.

    Args:
        ingredients: Ingredient name strings; joined with ``", "`` in the prompt.
        brand_name: Brand name string, concatenated into the prompt.
        product_name: Product name string, concatenated into the prompt.
        assistant_id: ID of the assistant that runs the thread.

    Returns:
        The assistant's reply with the text of every annotation removed, or
        an empty string when the run produced no message.
    """
    prompt = (
        "Categorize product " + brand_name + " " + product_name
        + " that has following ingredients: " + ', '.join(ingredients)
        + " into Group A, Group B, or Group C based on the document. The output must only be the group category name (Group A, Group B, or Group C) alongwith the definition of the respective category. If the group category cannot be determined, output 'NOT FOUND'."
    )
    thread = client.beta.threads.create(
        messages=[{"role": "user", "content": prompt}]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=assistant_id,
        include=["step_details.tool_calls[*].file_search.results[*].content"],
    )

    messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id))
    # A run that did not complete leaves no message behind; report "nothing
    # found" instead of failing on messages[0].
    if not messages or not messages[0].content:
        if debug_mode:
            print(f"Processing level run returned no message (run status: {run.status})")
        return ""

    message_content = messages[0].content[0].text
    processing_level_str = message_content.value
    # Drop the inline annotation markers the assistant inserted into the text.
    for annotation in message_content.annotations:
        processing_level_str = processing_level_str.replace(annotation.text, "")

    if debug_mode:
        print(processing_level_str)
    return processing_level_str
|
|
|
def analyze_harmful_ingredients(ingredients, brand_name, product_name, assistant_id):
    """Ask an assistant for per-ingredient information and flatten it to text.

    The assistant is asked to answer with a JSON object mapping each
    ingredient to its description, prefixing values it could not ground in
    the documents with ``(NOT FOUND IN DOCUMENT)``.

    Args:
        ingredients: Ingredient name strings; joined with ``", "`` in the prompt.
        brand_name: Brand name string, concatenated into the prompt.
        product_name: Product name string, concatenated into the prompt.
        assistant_id: ID of the assistant that runs the thread.

    Returns:
        One ``"<ingredient>: <information>\\n"`` line per ingredient, with the
        not-found marker removed. An empty string when the run produced no
        message. The reply text itself (marker removed) when it does not
        contain a JSON object.
    """
    not_found_marker = "(NOT FOUND IN DOCUMENT)"
    prompt = (
        "Provide detailed information about product " + brand_name + " " + product_name
        + " that has following ingredients: " + ', '.join(ingredients)
        + ". The output must be in JSON format: {<ingredient_name>: <information from the document>}. If information about an ingredient is not found in the documents, the value for that ingredient must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge."
    )
    thread = client.beta.threads.create(
        messages=[{"role": "user", "content": prompt}]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=assistant_id,
        include=["step_details.tool_calls[*].file_search.results[*].content"],
    )

    messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id))
    # A run that did not complete leaves no message behind; report "nothing
    # found" instead of failing on messages[0].
    if not messages or not messages[0].content:
        if debug_mode:
            print(f"Harmful ingredients run returned no message (run status: {run.status})")
        return ""

    message_content = messages[0].content[0].text
    reply = message_content.value
    # Remove the inline markers of file citations from the reply text.
    for annotation in message_content.annotations:
        if getattr(annotation, "file_citation", None):
            reply = reply.replace(annotation.text, "")

    if debug_mode:
        print(reply)

    # Parse only the outermost {...} span. This skips a ```json fence and any
    # surrounding prose without deleting "json" from inside the payload.
    start, end = reply.find("{"), reply.rfind("}")
    harmful_ingredient_analysis = None
    if start != -1 and end > start:
        try:
            harmful_ingredient_analysis = json.loads(reply[start:end + 1])
        except json.JSONDecodeError:
            harmful_ingredient_analysis = None

    if not isinstance(harmful_ingredient_analysis, dict):
        # The result only feeds a text prompt downstream, so fall back to the
        # reply itself rather than failing on malformed model output.
        if debug_mode:
            print("Harmful ingredients reply is not a JSON object; using the raw text")
        return reply.replace(not_found_marker + " ", "")

    if debug_mode:
        ingredients_not_found_in_doc = [
            key
            for key, value in harmful_ingredient_analysis.items()
            if isinstance(value, str) and value.startswith(not_found_marker)
        ]
        print(f"Ingredients not found in the harmful ingredients doc are {','.join(ingredients_not_found_in_doc)}")

    lines = []
    for key, value in harmful_ingredient_analysis.items():
        if isinstance(value, str):
            value = value.replace(not_found_marker + " ", "")
        lines.append(f"{key}: {value}\n")
    return "".join(lines)
|
|
|
def analyze_claims(claims, assistant_id):
    """Ask an assistant for information on each product claim and flatten it to text.

    The assistant is asked to answer with a JSON object mapping each claim to
    its assessment, prefixing values it could not ground in the documents
    with ``(NOT FOUND IN DOCUMENT)``.

    Args:
        claims: Claim strings; joined with ``", "`` in the prompt.
        assistant_id: ID of the assistant that runs the thread.

    Returns:
        One ``"<claim>: <information>\\n"`` line per claim, with the not-found
        marker removed. An empty string when the run produced no message. The
        reply text itself (marker removed) when it does not contain a JSON
        object.
    """
    not_found_marker = "(NOT FOUND IN DOCUMENT)"
    prompt = (
        "Provide detailed information about the following claims: " + ', '.join(claims)
        + ". The output must be in JSON format: {<claim_name>: <information from the document>}. If information about a claim is not found in the documents, the value for that claim must start with the prefix '(NOT FOUND IN DOCUMENT)' followed by the LLM's response based on its own knowledge."
    )
    thread = client.beta.threads.create(
        messages=[{"role": "user", "content": prompt}]
    )

    run = client.beta.threads.runs.create_and_poll(
        thread_id=thread.id,
        assistant_id=assistant_id,
        include=["step_details.tool_calls[*].file_search.results[*].content"],
    )

    messages = list(client.beta.threads.messages.list(thread_id=thread.id, run_id=run.id))
    # A run that did not complete leaves no message behind; report "nothing
    # found" instead of failing on messages[0].
    if not messages or not messages[0].content:
        if debug_mode:
            print(f"Claims run returned no message (run status: {run.status})")
        return ""

    message_content = messages[0].content[0].text
    reply = message_content.value
    # Remove the inline markers of file citations from the reply text.
    for annotation in message_content.annotations:
        if getattr(annotation, "file_citation", None):
            reply = reply.replace(annotation.text, "")

    if debug_mode:
        print(reply)

    # Parse only the outermost {...} span. This skips a ```json fence and any
    # surrounding prose without deleting "json" from inside the payload.
    start, end = reply.find("{"), reply.rfind("}")
    claims_analysis = None
    if start != -1 and end > start:
        try:
            claims_analysis = json.loads(reply[start:end + 1])
        except json.JSONDecodeError:
            claims_analysis = None

    if not isinstance(claims_analysis, dict):
        # The result only feeds a text prompt downstream, so fall back to the
        # reply itself rather than failing on malformed model output.
        if debug_mode:
            print("Claims reply is not a JSON object; using the raw text")
        return reply.replace(not_found_marker + " ", "")

    if debug_mode:
        claims_not_found_in_doc = [
            key
            for key, value in claims_analysis.items()
            if isinstance(value, str) and value.startswith(not_found_marker)
        ]
        print(f"Claims not found in the doc are {','.join(claims_not_found_in_doc)}")

    lines = []
    for key, value in claims_analysis.items():
        if isinstance(value, str):
            value = value.replace(not_found_marker + " ", "")
        lines.append(f"{key}: {value}\n")
    return "".join(lines)
|
|
|
def generate_final_analysis(product_info, processing_level, harmful_ingredient_analysis, claims_analysis):
    """Turn the three partial analyses into a consumption recommendation.

    Args:
        product_info: Mapping that may carry ``brandName`` and ``productName``;
            a missing key is rendered as an empty string.
        processing_level: Text of the processing-level analysis.
        harmful_ingredient_analysis: Text of the ingredient analysis.
        claims_analysis: Text of the claims analysis.

    Returns:
        The content of the first choice of the chat completion.
    """
    system_prompt = """You are provided with a detailed analysis of a food product. Your task is to generate actionable insights to help the user decide whether to consume the product, at what frequency, and identify any potential harms or benefits. Consider the context of consumption to ensure the advice is personalized and practical.

Use the following criteria to generate your response:

1. **Nutrition Analysis:**
- How processed is the product?

2. **Harmful Ingredients:**
- Identify any harmful or questionable ingredients.

3. **Misleading Claims:**
- Are there any misleading claims made by the brand?

Additionally, consider the following while generating insights:

1. **Consumption Context:**
- Is the product being consumed for health reasons or as a treat?
- Could the consumer be overlooking hidden harms?
- If the product is something they could consume daily, should they?
- If they are consuming it daily, what potential harm are they not noticing?
- If the product is intended for health purposes, are there concerns the user might miss?

**Output:**
- Recommend whether the product should be consumed or avoided.
- If recommended, specify the appropriate frequency and intended functionality (e.g., treat vs. health).
- Highlight any risks or benefits at that level of consumption."""

    # Look the names up defensively: the caller may hand over a dict that
    # lacks them, and indexing it directly would raise KeyError.
    brand_name = product_info.get("brandName", "")
    product_name = product_info.get("productName", "")

    user_prompt = f"""
Product Name: {brand_name} {product_name}

Processing Level:
{processing_level}

Ingredient Analysis:
{harmful_ingredient_analysis}

Claims Analysis:
{claims_analysis}
"""
    if debug_mode:
        print(f"\nuser_prompt : \n {user_prompt}")

    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    )

    return completion.choices[0].message.content
|
|
|
|
|
def main():
    """Render the Streamlit page and run the product analysis on submit.

    Reads comma-separated image links from a form, sends them to the
    data-extractor service, loads the stored product record, runs the
    processing-level, ingredient and claims analyses, and shows the combined
    recommendation.
    """
    st.title("Food Product Analysis")

    with st.form("product_analysis_form"):
        st.write("Please enter the following information about the product:")
        image_links = st.text_area("Image Links (separated by commas)")
        submitted = st.form_submit_button("Analyze Product")

    if submitted:
        image_links_list = []
        if image_links:
            # Skip blank entries produced by stray or trailing commas.
            image_links_list = [
                image_link.strip()
                for image_link in image_links.split(',')
                if image_link.strip()
            ]

        with st.spinner("Analyzing the product... This may take a moment."):
            processing_level = ""
            harmful_ingredient_analysis = ""
            claims_analysis = ""
            ingredients_list = []
            claims_list = []
            brand_name = ""
            product_name = ""
            # Raised when a field is absent or the record is not a mapping.
            lookup_errors = (KeyError, TypeError, IndexError)

            if image_links_list:
                is_added = extract_data_from_product_image(image_links_list, data_extractor_url)
                print(f"DEBUG : {is_added}")

                if "data added" in is_added:
                    # NOTE(review): the original read product_name here before
                    # it was ever assigned. The extractor response schema is
                    # not visible in this file; presumably it carries the
                    # stored product's name under "productName" (the key the
                    # DB record uses) -- confirm against the service.
                    if isinstance(is_added, dict):
                        product_name = is_added.get("productName", "")

                    product_info_from_db = get_product_data_from_db(product_name, data_extractor_url)

                    try:
                        brand_name = product_info_from_db["brandName"]
                    except lookup_errors:
                        print(f"Brand Name not found in product_info {product_info_from_db}")

                    try:
                        product_name = product_info_from_db["productName"]
                    except lookup_errors:
                        print(f"Product Name not found in product_info {product_info_from_db}")

                    try:
                        ingredients_list = [ingredient["name"] for ingredient in product_info_from_db["ingredients"]]
                    except lookup_errors:
                        print(f"Ingredient list not found in product_info {product_info_from_db}")

                    try:
                        claims_list = product_info_from_db["claims"]
                    except lookup_errors:
                        print(f"Claims list not found in product_info {product_info_from_db}")

            if ingredients_list:
                processing_level = analyze_processing_level(ingredients_list, brand_name, product_name, assistant1.id)
                harmful_ingredient_analysis = analyze_harmful_ingredients(ingredients_list, brand_name, product_name, assistant2.id)
            else:
                print("No ingredients specified by the user!")

            if claims_list:
                claims_analysis = analyze_claims(claims_list, assistant3.id)
            else:
                print("No claims specified by the user!")

            if processing_level != "" or harmful_ingredient_analysis != "" or claims_analysis != "":
                # Hand over the names that were actually resolved; the final
                # prompt reads both keys.
                product_info = {"brandName": brand_name, "productName": product_name}
                final_analysis = generate_final_analysis(
                    product_info,
                    processing_level,
                    harmful_ingredient_analysis,
                    claims_analysis,
                )
            else:
                final_analysis = "Sorry, No product information found! Please re-try"

        st.success("Analysis complete!")
        st.subheader("Final Analysis:")
        st.write(final_analysis)

        if st.button("Analyze Another Product"):
            st.rerun()


if __name__ == "__main__":
    main()