import streamlit as st import pandas as pd import os import re import json from PIL import Image from datetime import datetime from google.cloud import vision from google.oauth2 import service_account st.set_page_config(layout="wide") import database as db GCP_SERVICE_ACCOUNT_JSON = os.getenv("GCP_SERVICE_ACCOUNT_JSON") def get_vision_client(): if not GCP_SERVICE_ACCOUNT_JSON: raise Exception("GCP service account JSON is missing. Check your environment variables.") credentials_dict = json.loads(GCP_SERVICE_ACCOUNT_JSON) credentials = service_account.Credentials.from_service_account_info(credentials_dict) return vision.ImageAnnotatorClient(credentials=credentials) client = get_vision_client() def parse_description(description): """ A naive regex approach that grabs lines containing $XX.XX and pairs them with the line above as the item name. """ lines = description.split("\n") items = [] price_pattern = r"\$(\d+\.\d{2})" for i in range(1, len(lines)): line = lines[i].strip() prev_line = lines[i - 1].strip() match = re.search(price_pattern, line) if match: item_name = prev_line price = float(match.group(1)) items.append({"name": item_name, "price": price}) return items def extract_invoice_data(image_data: bytes): """ Calls Google Vision to detect text from the uploaded image bytes, then uses parse_description to produce a list of item dicts. """ image = vision.Image(content=image_data) response = client.text_detection(image=image) if response.error.message: raise Exception(f"Vision API Error: {response.error.message}") response_dict = vision.AnnotateImageResponse.to_dict(response) annotations = response_dict.get("text_annotations", []) if not annotations: return [] description = annotations[0]["description"] return parse_description(description) def load_existing_data(): if 'logged_in' not in st.session_state: st.session_state['logged_in'] = False if 'email' not in st.session_state: st.session_state['email'] = None if 'section' not in st.session_state: st.session_state['section'] = None if 'participants' not in st.session_state: st.session_state['participants'] = [] if 'items' not in st.session_state: st.session_state['items'] = [] if 'selected_items' not in st.session_state: st.session_state['selected_items'] = [] if 'submitted_items' not in st.session_state: st.session_state['submitted_items'] = [] if 'submitted' not in st.session_state: st.session_state['submitted'] = False if 'sections_loaded' not in st.session_state: st.session_state['sections_loaded'] = False if 'existing_sections' not in st.session_state: st.session_state['existing_sections'] = [] def load_section_from_db(section_name): owner_email = st.session_state['email'] section_doc = db.get_section(owner_email, section_name) if section_doc: st.session_state['section'] = section_name st.session_state['participants'] = section_doc['participants'] st.session_state['items'] = [] st.session_state['selected_items'] = [] st.session_state['submitted_items'] = [] st.session_state['submitted'] = False def save_section_to_db(section_name, participants): owner_email = st.session_state['email'] db.update_section(owner_email, section_name, participants) def update_submitted_items(): owner_email = st.session_state['email'] section_name = st.session_state['section'] st.session_state['submitted_items'] = db.get_submitted_items(owner_email, section_name) def get_most_bought_item(): owner_email = st.session_state['email'] section_name = st.session_state['section'] return db.get_most_bought_item(owner_email, section_name) def main(): load_existing_data() st.title("EzSplit - Scan. Split. Quit Arguing.") if not st.session_state['logged_in']: tab1, tab2 = st.tabs(["Login", "Sign Up"]) with tab1: st.header("Login") email = st.text_input("Email") password = st.text_input("Password", type="password") if st.button("Login"): user_doc = db.get_user_by_email_and_password(email, password) if user_doc: st.session_state['logged_in'] = True st.session_state['email'] = email st.rerun() else: st.error("Invalid email or password") with tab2: st.header("Sign Up") signup_email = st.text_input("New Email") signup_password = st.text_input("New Password", type="password") confirm_password = st.text_input("Confirm Password", type="password") if st.button("Sign Up"): if signup_password == confirm_password: try: db.create_user(signup_email, signup_password) st.success("Sign up successful! Please log in.") except ValueError: st.error("Email already exists") else: st.error("Passwords do not match") else: with st.sidebar: st.write(f"Welcome, {st.session_state['email']}") if st.button("Sign Out"): st.session_state.clear() st.rerun() st.header("Billing Sections") if not st.session_state['sections_loaded']: owner_email = st.session_state['email'] existing_sections = db.get_all_sections(owner_email) st.session_state['sections_loaded'] = True st.session_state['existing_sections'] = existing_sections section_name = st.radio("Select a section", ["Create New"] + st.session_state['existing_sections']) if section_name == "Create New": new_section_name = st.text_input("New Section Name") participants_str = st.text_area("Participants (comma-separated)") if st.button("Create Section"): try: participants_list = [p.strip() for p in participants_str.split(",") if p.strip()] db.create_section(st.session_state['email'], new_section_name, participants_list) st.session_state['section'] = new_section_name st.session_state['participants'] = participants_list st.session_state['items'] = [] st.session_state['selected_items'] = [] st.session_state['submitted_items'] = [] st.session_state['submitted'] = False st.session_state['existing_sections'].append(new_section_name) st.rerun() except ValueError as e: st.error(str(e)) else: if st.button("Load Section"): load_section_from_db(section_name) update_submitted_items() st.rerun() if st.button("Delete Section"): db.delete_section(st.session_state['email'], section_name) st.session_state['existing_sections'].remove(section_name) if st.session_state['section'] == section_name: st.session_state['section'] = None st.session_state['participants'] = [] st.session_state['items'] = [] st.session_state['selected_items'] = [] st.session_state['submitted_items'] = [] st.session_state['submitted'] = False st.rerun() if st.session_state['section'] and st.session_state['participants']: col1, col2 = st.columns([3, 1]) with col1: st.subheader(f"Section: {st.session_state['section']}") st.write("Participants: " + ", ".join(st.session_state['participants'])) st.subheader("Bill Image (OCR)") uploaded_file = st.file_uploader("Upload a bill image (jpg, jpeg, png)", type=["jpg", "jpeg", "png"]) if uploaded_file is not None: if st.button("Extract Items via OCR"): try: image_data = uploaded_file.read() ocr_items = extract_invoice_data(image_data) if ocr_items: st.session_state['items'].extend(ocr_items) st.success("OCR extraction successful. Items appended.") else: st.warning("No items found in the extracted text.") except Exception as e: st.error(f"OCR failed: {e}") st.subheader("Manual Item Entry") manual_item_name = st.text_input("Item Name", key="manual_item_name") manual_item_price = st.number_input("Item Price", min_value=0.0, step=0.01, key="manual_item_price") if st.button("Add Item Manually"): if manual_item_name.strip(): st.session_state['items'].append({ "name": manual_item_name.strip(), "price": manual_item_price }) st.success(f"Added '{manual_item_name}' at ${manual_item_price:.2f}") else: st.warning("Please provide a non-empty item name.") st.subheader("Available Items") if st.session_state['items']: st.write([f"{itm['name']} (${itm['price']})" for itm in st.session_state['items']]) else: st.write("No items found yet.") if st.session_state['items']: not_submitted = [ itm["name"] for itm in st.session_state['items'] if itm["name"] not in st.session_state['submitted_items'] ] selected_items = st.multiselect("Select Items to Tag", not_submitted, key="item_select") if st.button("Tag Selected Items"): st.session_state['selected_items'] = selected_items st.session_state['submitted'] = True if st.session_state['submitted'] and st.session_state['selected_items']: chosen_participant = st.selectbox("Participant to Assign Items", st.session_state['participants'], key="participant_select") if st.button("Confirm Assignment"): for it in st.session_state['items']: if it['name'] in st.session_state['selected_items']: db.create_bill( owner_email=st.session_state['email'], section_name=st.session_state['section'], participant=chosen_participant, item=it['name'], price=it['price'] ) st.session_state['submitted_items'].append(it['name']) st.success("Items assigned successfully!") st.session_state['submitted'] = False st.session_state['selected_items'] = [] st.subheader("Billing History") history = db.get_billing_history(st.session_state['email'], st.session_state['section']) if history: data_list = [] for row in history: data_list.append({ "Participant": row["_id"], "Total Price": row["total_price"], "Last Updated": row["last_updated"] }) df = pd.DataFrame(data_list) st.dataframe(df) else: st.write("No bills recorded yet.") with col2: st.subheader("Most Bought Item") top_item = get_most_bought_item() if top_item: item_name, count_val, price_val = top_item st.write(f"Item: {item_name}, Count: {count_val}, Price: ${price_val}") else: st.write("No items purchased yet.") st.subheader("Manage Participants") new_participant = st.text_input("Add Participant") if st.button("Add Participant"): if new_participant.strip(): st.session_state['participants'].append(new_participant.strip()) save_section_to_db(st.session_state['section'], st.session_state['participants']) st.rerun() remove_part = st.selectbox("Remove Participant", st.session_state['participants'], key="remove_participant") if st.button("Remove Participant"): if remove_part: st.session_state['participants'].remove(remove_part) save_section_to_db(st.session_state['section'], st.session_state['participants']) db.remove_items( st.session_state['email'], st.session_state['section'], remove_part, items_to_remove=None ) st.rerun() st.subheader("Remove Wrongly Tagged Items") part_to_remove_from = st.selectbox("Select Participant", st.session_state['participants'], key="remove_participant_select") if part_to_remove_from: pipeline = [ {"$match": { "owner_email": st.session_state['email'], "section_name": st.session_state['section'], "participant": part_to_remove_from }} ] results = list(db.bills_coll.aggregate(pipeline)) if results: tagged_items = [doc["item"] for doc in results] remove_items_select = st.multiselect("Select Items to Remove", tagged_items, key="remove_item_select") if st.button("Remove Items"): db.remove_items( st.session_state['email'], st.session_state['section'], part_to_remove_from, remove_items_select ) for itm in remove_items_select: if itm in st.session_state['submitted_items']: st.session_state['submitted_items'].remove(itm) st.success("Items removed successfully!") st.rerun() else: st.write("No items found for this participant.") if __name__ == "__main__": main()