|
import streamlit as st |
|
import pandas as pd |
|
import os |
|
import re |
|
import json |
|
from PIL import Image |
|
from datetime import datetime |
|
from google.cloud import vision |
|
from google.oauth2 import service_account |
|
|
|
# Page configuration is issued ahead of every other piece of set-up in this
# script (Streamlit documents set_page_config as the first command of a page).
st.set_page_config(layout="wide")

# Project-local persistence module, imported only after the page is configured.
# NOTE(review): presumably kept below set_page_config so that nothing in it can
# issue a Streamlit command first -- confirm before moving it to the top.
import database as db

# Text of the GCP_SERVICE_ACCOUNT_JSON environment variable, or None when the
# variable is not set; get_vision_client() parses it as JSON.
GCP_SERVICE_ACCOUNT_JSON = os.environ.get("GCP_SERVICE_ACCOUNT_JSON")
|
|
|
def get_vision_client():
    """Build a Google Cloud Vision client from the configured service account.

    The credentials come from the module-level ``GCP_SERVICE_ACCOUNT_JSON``
    string, which is parsed as JSON and handed to
    ``service_account.Credentials.from_service_account_info``.

    Returns:
        A ``vision.ImageAnnotatorClient`` created with those credentials.

    Raises:
        Exception: If ``GCP_SERVICE_ACCOUNT_JSON`` is unset or empty.
        ValueError: If ``GCP_SERVICE_ACCOUNT_JSON`` is not valid JSON.
    """
    if not GCP_SERVICE_ACCOUNT_JSON:
        raise Exception("GCP service account JSON is missing. Check your environment variables.")

    try:
        credentials_dict = json.loads(GCP_SERVICE_ACCOUNT_JSON)
    except json.JSONDecodeError as err:
        # Report only the position of the problem: the raw value may contain
        # private-key material and must not end up in logs or on the page.
        raise ValueError(
            "GCP_SERVICE_ACCOUNT_JSON is set but is not valid JSON "
            f"(line {err.lineno}, column {err.colno})."
        ) from err

    credentials = service_account.Credentials.from_service_account_info(credentials_dict)
    return vision.ImageAnnotatorClient(credentials=credentials)
|
|
|
# Module-level Vision client used by extract_invoice_data(). get_vision_client()
# raises here, while the script is loading, if the service-account JSON is missing.
client = get_vision_client()
|
|
|
def parse_description(description):
    """Extract ``{"name", "price"}`` items from OCR text.

    A deliberately naive heuristic: every line containing a dollar amount
    (``$12.34``, ``$ 12.34`` or ``$1,234.56``) is treated as a price, and the
    closest non-blank line above it is used as the item name.

    Args:
        description: Full OCR text with one receipt line per text line. May be
            empty or ``None``.

    Returns:
        A list of dicts with ``"name"`` (str) and ``"price"`` (float), in the
        order the prices appear. A price line that has no non-blank line above
        it is skipped, because there is nothing to name the item with.
    """
    if not description:
        return []

    # "$", optional spaces, then either comma-grouped thousands or plain
    # digits, followed by exactly two decimals.
    price_pattern = re.compile(r"\$\s*((?:\d{1,3}(?:,\d{3})+|\d+)\.\d{2})")

    items = []
    last_text = ""  # most recent non-blank line seen above the current one
    for raw_line in description.splitlines():
        line = raw_line.strip()
        if not line:
            # Blank lines never become an item name; keep looking further up.
            continue
        match = price_pattern.search(line)
        if match and last_text:
            price = float(match.group(1).replace(",", ""))
            items.append({"name": last_text, "price": price})
        last_text = line

    return items
|
|
|
def extract_invoice_data(image_data: bytes):
    """Run Google Vision text detection on an image and parse out bill items.

    Args:
        image_data: Raw bytes of the uploaded image.

    Returns:
        The list of ``{"name", "price"}`` dicts produced by
        ``parse_description``; an empty list when the response carries no text
        annotations.

    Raises:
        ValueError: If ``image_data`` is empty.
        Exception: If the Vision API response carries an error message.
    """
    if not image_data:
        # Fail fast rather than spend an API call on an image that cannot exist.
        raise ValueError("No image data to process.")

    image = vision.Image(content=image_data)
    response = client.text_detection(image=image)
    if response.error.message:
        raise Exception(f"Vision API Error: {response.error.message}")

    # Read the field straight off the response instead of converting the whole
    # message (every annotation and its bounding polygon) into a dict first.
    annotations = response.text_annotations
    if not annotations:
        return []

    # Only the first annotation is used; per the Vision documentation it holds
    # the complete detected text, while later entries are individual fragments.
    return parse_description(annotations[0].description)
|
|
|
def load_existing_data():
    """Seed ``st.session_state`` with a default for every key the app reads.

    Keys that are already present are left untouched; only missing keys are
    created.
    """
    # Rebuilt on every call, so each list default is a brand-new object and is
    # never shared between keys or between calls.
    defaults = {
        'logged_in': False,
        'email': None,
        'section': None,
        'participants': [],
        'items': [],
        'selected_items': [],
        'submitted_items': [],
        'submitted': False,
        'sections_loaded': False,
        'existing_sections': [],
    }
    for key, value in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = value
|
|
|
def load_section_from_db(section_name):
    """Make ``section_name`` the active section using its stored document.

    The section is looked up for the logged-in user via ``db.get_section``.
    When a document comes back, the session switches to that section with the
    document's participants and the per-bill working values are reset. When
    nothing comes back, the session state is left unchanged.
    """
    state = st.session_state
    section_doc = db.get_section(state['email'], section_name)
    if not section_doc:
        return

    state['section'] = section_name
    state['participants'] = section_doc['participants']
    # Each key gets its own fresh list.
    for key in ('items', 'selected_items', 'submitted_items'):
        state[key] = []
    state['submitted'] = False
|
|
|
def save_section_to_db(section_name, participants):
    """Pass ``participants`` for ``section_name`` to ``db.update_section``.

    The owner is the e-mail address held in ``st.session_state['email']``.
    """
    db.update_section(st.session_state['email'], section_name, participants)
|
|
|
def update_submitted_items():
    """Refresh ``st.session_state['submitted_items']`` from the database.

    Stores whatever ``db.get_submitted_items`` returns for the logged-in user
    and the section currently held in the session.
    """
    state = st.session_state
    state['submitted_items'] = db.get_submitted_items(state['email'], state['section'])
|
|
|
def get_most_bought_item():
    """Return ``db.get_most_bought_item`` for the current user and section.

    Both arguments are read from ``st.session_state`` (``'email'`` and
    ``'section'``); the database result is returned unchanged.
    """
    state = st.session_state
    return db.get_most_bought_item(state['email'], state['section'])
|
|
|
# Label of the radio entry that opens the "create a section" form. It is a
# sentinel, so a real section must never be given this name.
_NEW_SECTION_OPTION = "Create New"


def _reset_working_bill():
    """Clear the per-bill working state: items, current selection, tagged items."""
    st.session_state['items'] = []
    st.session_state['selected_items'] = []
    st.session_state['submitted_items'] = []
    st.session_state['submitted'] = False


def _render_auth():
    """Draw the Login / Sign Up tabs shown while nobody is logged in."""
    login_tab, signup_tab = st.tabs(["Login", "Sign Up"])

    with login_tab:
        st.header("Login")
        email = st.text_input("Email")
        password = st.text_input("Password", type="password")
        if st.button("Login"):
            if db.get_user_by_email_and_password(email, password):
                st.session_state['logged_in'] = True
                st.session_state['email'] = email
                st.rerun()
            else:
                st.error("Invalid email or password")

    with signup_tab:
        st.header("Sign Up")
        signup_email = st.text_input("New Email")
        signup_password = st.text_input("New Password", type="password")
        confirm_password = st.text_input("Confirm Password", type="password")
        if st.button("Sign Up"):
            if not signup_email.strip() or not signup_password:
                # Never create an account that has no e-mail or no password.
                st.error("Email and password are required")
            elif signup_password != confirm_password:
                st.error("Passwords do not match")
            else:
                try:
                    db.create_user(signup_email, signup_password)
                except ValueError:
                    st.error("Email already exists")
                else:
                    st.success("Sign up successful! Please log in.")


def _render_create_section():
    """Draw the new-section form and create the section once it validates."""
    new_section_name = st.text_input("New Section Name")
    participants_str = st.text_area("Participants (comma-separated)")
    if not st.button("Create Section"):
        return

    section_name = new_section_name.strip()
    # dict.fromkeys drops repeated names while keeping the order they were typed in.
    participants_list = list(dict.fromkeys(
        p.strip() for p in participants_str.split(",") if p.strip()
    ))

    if not section_name:
        st.error("Please enter a section name.")
        return
    if section_name == _NEW_SECTION_OPTION:
        st.error(f"'{_NEW_SECTION_OPTION}' is reserved. Please choose another section name.")
        return
    if not participants_list:
        # The workspace is only drawn for sections that have participants, so
        # an empty section could never be opened again.
        st.error("Please enter at least one participant.")
        return

    try:
        db.create_section(st.session_state['email'], section_name, participants_list)
    except ValueError as e:
        st.error(str(e))
        return

    st.session_state['section'] = section_name
    st.session_state['participants'] = participants_list
    _reset_working_bill()
    st.session_state['existing_sections'].append(section_name)
    st.rerun()


def _render_existing_section_actions(section_name):
    """Draw the Load / Delete buttons for the section picked in the radio."""
    if st.button("Load Section"):
        load_section_from_db(section_name)
        if st.session_state['section'] == section_name:
            update_submitted_items()
            st.rerun()
        else:
            # The load did not switch sections; do not refresh the submitted
            # items, which would be fetched for the previously active section.
            st.error(f"Section '{section_name}' could not be loaded.")

    if st.button("Delete Section"):
        db.delete_section(st.session_state['email'], section_name)
        st.session_state['existing_sections'].remove(section_name)
        if st.session_state['section'] == section_name:
            # The deleted section was the active one: close its workspace too.
            st.session_state['section'] = None
            st.session_state['participants'] = []
            _reset_working_bill()
        st.rerun()


def _render_section_sidebar():
    """Draw the sidebar: greeting, sign-out and section create/load/delete."""
    # NOTE(review): the section picker is placed inside the sidebar together
    # with the sign-out button; confirm this matches the intended layout.
    with st.sidebar:
        st.write(f"Welcome, {st.session_state['email']}")
        if st.button("Sign Out"):
            st.session_state.clear()
            st.rerun()

        st.header("Billing Sections")

        # The section list is fetched once per session and then kept in
        # session state, where create/delete keep it up to date.
        if not st.session_state['sections_loaded']:
            existing_sections = db.get_all_sections(st.session_state['email'])
            st.session_state['sections_loaded'] = True
            st.session_state['existing_sections'] = existing_sections

        section_name = st.radio(
            "Select a section",
            [_NEW_SECTION_OPTION] + st.session_state['existing_sections'],
        )

        if section_name == _NEW_SECTION_OPTION:
            _render_create_section()
        else:
            _render_existing_section_actions(section_name)


def _render_ocr_upload():
    """Draw the bill-image uploader and append OCR-extracted items on request."""
    st.subheader("Bill Image (OCR)")
    uploaded_file = st.file_uploader(
        "Upload a bill image (jpg, jpeg, png)",
        type=["jpg", "jpeg", "png"],
    )
    if uploaded_file is None:
        return
    if not st.button("Extract Items via OCR"):
        return

    try:
        # getvalue() returns the complete upload regardless of the stream
        # position; read() comes back empty once the stream has been consumed,
        # e.g. on a second click for the same file.
        ocr_items = extract_invoice_data(uploaded_file.getvalue())
    except Exception as e:
        # UI boundary: show any OCR failure instead of crashing the page.
        st.error(f"OCR failed: {e}")
        return

    if ocr_items:
        st.session_state['items'].extend(ocr_items)
        st.success("OCR extraction successful. Items appended.")
    else:
        st.warning("No items found in the extracted text.")


def _render_manual_entry():
    """Draw the form for adding a single item by hand."""
    st.subheader("Manual Item Entry")
    manual_item_name = st.text_input("Item Name", key="manual_item_name")
    manual_item_price = st.number_input(
        "Item Price", min_value=0.0, step=0.01, key="manual_item_price"
    )

    if st.button("Add Item Manually"):
        cleaned_name = manual_item_name.strip()
        if cleaned_name:
            st.session_state['items'].append({
                "name": cleaned_name,
                "price": manual_item_price,
            })
            st.success(f"Added '{manual_item_name}' at ${manual_item_price:.2f}")
        else:
            st.warning("Please provide a non-empty item name.")


def _render_item_tagging():
    """List the available items and assign selected ones to a participant."""
    st.subheader("Available Items")
    items = st.session_state['items']
    if not items:
        st.write("No items found yet.")
        return

    st.write([f"{itm['name']} (${itm['price']})" for itm in items])

    # Items are identified by name; names already assigned are not offered again.
    not_submitted = [
        itm["name"] for itm in items
        if itm["name"] not in st.session_state['submitted_items']
    ]
    selected_items = st.multiselect("Select Items to Tag", not_submitted, key="item_select")

    if st.button("Tag Selected Items"):
        st.session_state['selected_items'] = selected_items
        st.session_state['submitted'] = True

    if not (st.session_state['submitted'] and st.session_state['selected_items']):
        return

    chosen_participant = st.selectbox(
        "Participant to Assign Items",
        st.session_state['participants'],
        key="participant_select",
    )
    if st.button("Confirm Assignment"):
        chosen_names = set(st.session_state['selected_items'])
        for it in items:
            if it['name'] in chosen_names:
                db.create_bill(
                    owner_email=st.session_state['email'],
                    section_name=st.session_state['section'],
                    participant=chosen_participant,
                    item=it['name'],
                    price=it['price'],
                )
                st.session_state['submitted_items'].append(it['name'])
        st.success("Items assigned successfully!")
        st.session_state['submitted'] = False
        st.session_state['selected_items'] = []


def _render_billing_history():
    """Show the rows returned by ``db.get_billing_history`` as a table."""
    st.subheader("Billing History")
    history = db.get_billing_history(st.session_state['email'], st.session_state['section'])
    if not history:
        st.write("No bills recorded yet.")
        return

    rows = [
        {
            "Participant": row["_id"],
            "Total Price": row["total_price"],
            "Last Updated": row["last_updated"],
        }
        for row in history
    ]
    st.dataframe(pd.DataFrame(rows))


def _render_most_bought():
    """Show the section's most bought item, if there is one."""
    st.subheader("Most Bought Item")
    top_item = get_most_bought_item()
    if top_item:
        item_name, count_val, price_val = top_item
        st.write(f"Item: {item_name}, Count: {count_val}, Price: ${price_val}")
    else:
        st.write("No items purchased yet.")


def _render_participant_management():
    """Draw the add/remove participant controls for the active section."""
    st.subheader("Manage Participants")
    new_participant = st.text_input("Add Participant")
    if st.button("Add Participant"):
        cleaned_name = new_participant.strip()
        if cleaned_name and cleaned_name in st.session_state['participants']:
            # Participants are identified by name, so a duplicate could not be
            # told apart when removing a participant or their items.
            st.warning(f"'{cleaned_name}' is already a participant.")
        elif cleaned_name:
            st.session_state['participants'].append(cleaned_name)
            save_section_to_db(st.session_state['section'], st.session_state['participants'])
            st.rerun()

    remove_part = st.selectbox(
        "Remove Participant",
        st.session_state['participants'],
        key="remove_participant",
    )
    if st.button("Remove Participant") and remove_part:
        if len(st.session_state['participants']) <= 1:
            # The workspace is hidden for a section without participants, so
            # removing the last one would leave no way to add anyone back.
            st.warning("A section needs at least one participant.")
        else:
            st.session_state['participants'].remove(remove_part)
            save_section_to_db(st.session_state['section'], st.session_state['participants'])
            # NOTE(review): items_to_remove=None presumably means "every item
            # of this participant" -- confirm against db.remove_items.
            db.remove_items(
                st.session_state['email'],
                st.session_state['section'],
                remove_part,
                items_to_remove=None,
            )
            st.rerun()


def _render_tagged_item_removal():
    """Let the user delete items that were assigned to the wrong participant."""
    st.subheader("Remove Wrongly Tagged Items")
    part_to_remove_from = st.selectbox(
        "Select Participant",
        st.session_state['participants'],
        key="remove_participant_select",
    )
    if not part_to_remove_from:
        return

    pipeline = [
        {"$match": {
            "owner_email": st.session_state['email'],
            "section_name": st.session_state['section'],
            "participant": part_to_remove_from,
        }}
    ]
    results = list(db.bills_coll.aggregate(pipeline))
    if not results:
        st.write("No items found for this participant.")
        return

    tagged_items = [doc["item"] for doc in results]
    remove_items_select = st.multiselect(
        "Select Items to Remove",
        tagged_items,
        key="remove_item_select",
    )
    if st.button("Remove Items"):
        if not remove_items_select:
            # Never reach the database with an empty selection: elsewhere
            # remove_items is called with items_to_remove=None, and an empty
            # list must not risk being handled the same way.
            st.warning("Select at least one item to remove.")
            return
        db.remove_items(
            st.session_state['email'],
            st.session_state['section'],
            part_to_remove_from,
            remove_items_select,
        )
        # Removed items become available for tagging again.
        for itm in remove_items_select:
            if itm in st.session_state['submitted_items']:
                st.session_state['submitted_items'].remove(itm)
        st.success("Items removed successfully!")
        st.rerun()


def _render_workspace():
    """Draw the two-column workspace of the active section."""
    # NOTE(review): bill entry, tagging and history are placed in the wide
    # column; statistics, participant management and item removal in the
    # narrow one. Confirm this matches the intended layout.
    col1, col2 = st.columns([3, 1])

    with col1:
        st.subheader(f"Section: {st.session_state['section']}")
        st.write("Participants: " + ", ".join(st.session_state['participants']))
        _render_ocr_upload()
        _render_manual_entry()
        _render_item_tagging()
        _render_billing_history()

    with col2:
        _render_most_bought()
        _render_participant_management()
        _render_tagged_item_removal()


def main():
    """Render the EzSplit page for the current Streamlit run.

    Initialises the session defaults and shows the title. Visitors who are not
    logged in get the login / sign-up tabs. Logged-in users get the section
    sidebar and, once a section with participants is active, the workspace
    for entering items, assigning them and reviewing the billing history.
    """
    load_existing_data()
    st.title("EzSplit - Scan. Split. Quit Arguing.")

    if not st.session_state['logged_in']:
        _render_auth()
        return

    _render_section_sidebar()

    if st.session_state['section'] and st.session_state['participants']:
        _render_workspace()
|
# Run the app when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|