# Streamlit page for inspecting and editing FIFO stock simulation data.
#
# The sidebar holds three forms: a filter form that queries the simulation
# API, an edit form that PUTs an updated stock-in quantity, and a form that
# clears the request history kept in st.session_state.
import os
import re
import time
from datetime import date, datetime, timedelta

import pandas as pd
import requests
import streamlit as st
from dotenv import load_dotenv

load_dotenv()

base_url = os.getenv("BASE_URL")

# Hours added to / subtracted from timestamps throughout this page.
# NOTE(review): looks like a fixed UTC+7 display offset that assumes the
# server clock runs in UTC -- confirm.
UTC_OFFSET_HOURS = 7

# Seconds to wait for the simulation API; without a timeout `requests` can
# block the Streamlit script run forever.
REQUEST_TIMEOUT = 30

# Set up the base config for Streamlit (full-width page). This must stay the
# first Streamlit command in the script.
st.set_page_config(layout="wide")

# Shifted "now", used as the key of a history entry in the session cache.
later_time = datetime.now() + timedelta(hours=UTC_OFFSET_HOURS)

# Initialise the per-session state on the first run of the script.
if "cache" not in st.session_state:
    print('cache not in session...')
    st.session_state.cache = {}

if "last_data" not in st.session_state:
    st.session_state.last_data = {}

if "param" not in st.session_state:
    st.session_state.param = {}

print('>>>> start over <<<<<')


def make_api_request():
    """Query the FIFO simulation endpoint with the current filter values.

    Reads the module-level widget values ``product_name``, ``outlet_name``,
    ``start_date`` and ``start_time`` set by the filter form.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None`` (an error
        message is shown on the page in that case).
    """
    if not base_url:
        st.error("BASE_URL is not configured.")
        return None

    api_url = base_url + "/v1/simulation/fifo"

    # Shift the chosen start back by the display offset before converting to
    # epoch milliseconds.
    start_datetime = datetime.combine(start_date, start_time)
    start_date_millis = (
        int(start_datetime.timestamp()) - UTC_OFFSET_HOURS * 60 * 60
    ) * 1000

    params = {"product": product_name, "outlet": outlet_name, "start_date": start_date_millis}
    print(f'---> make a request: {params}')

    try:
        response = requests.get(api_url, params=params, timeout=REQUEST_TIMEOUT)
    except requests.RequestException as err:
        # Connection failures and timeouts are reported instead of crashing.
        st.error(f"Error: {err}")
        print('request err: ', err)
        return None

    if response.status_code != 200:
        st.error(f"Error: {response.status_code}")
        print('request err: ', response)
        return None

    try:
        return response.json()
    except ValueError as err:
        # A 200 response whose body is not valid JSON.
        st.error(f"Error: {err}")
        print('request err: ', err)
        return None


def format_date(date_str):
    """Format an epoch-milliseconds value as ``dd/mm/YYYY HH:MM``.

    The value is shifted by ``UTC_OFFSET_HOURS`` before formatting.
    Returns ``0`` when the input is ``None``, NaN or ``0``.
    """
    if date_str is None or pd.isna(date_str) or date_str == 0:
        return 0

    shifted = pd.to_datetime(date_str, unit='ms') + pd.Timedelta(hours=UTC_OFFSET_HOURS)
    return shifted.strftime("%d/%m/%Y %H:%M")


def remove_trailing_zeros(num):
    """Render ``num`` with at most three decimals, dropping trailing zeros.

    For example ``1.5`` becomes ``"1.5"`` and ``2.0`` becomes ``"2"``.
    """
    return re.sub(r'\.?0+$', '', f"{num:.3f}")


def format_df(data):
    """Turn the API ``data`` payload into per-stock-in display tables.

    Args:
        data: Records accepted by ``pandas.json_normalize``. The code reads
            the columns ``stock_price_id``, ``stock_in``, ``stock_in_source``,
            ``stock_in_id``, ``stock_out``, ``stock_out_total``,
            ``created_at_in``, ``product_name`` and ``outlet_name``; ``price``,
            ``created_at``, ``stock_out_source`` and ``stock_out_id`` are
            filled with defaults when absent.

    Returns:
        A list of ``{'main': DataFrame, 'detail': DataFrame}`` dicts, one per
        ``stock_price_id``, ordered by ``created_at_in``. Empty input
        (``''``, ``None``, ``[]``) yields an empty list.
    """
    tables = []
    if not data:
        return tables

    df = pd.json_normalize(data)

    # Optional numeric columns default to 0, optional text columns to "".
    for col in ["price", "created_at"]:
        if col not in df.columns:
            df[col] = 0
    for col in ["stock_out_source", "stock_out_id"]:
        if col not in df.columns:
            df[col] = ""

    df['stock_out_origin'] = df['stock_out'].astype(float)
    df["stock in"] = df["stock_in_source"] + " #" + df["stock_in_id"].astype(str)
    df["stock out"] = df["stock_out_source"] + " #" + df["stock_out_id"].astype(str)
    df["date_in"] = df["created_at_in"].apply(format_date)
    df["date_out"] = df["created_at"].apply(format_date)
    df['stock_out'] = df['stock_out'].apply(remove_trailing_zeros)
    df['stock_out_total'] = df['stock_out_total'].apply(remove_trailing_zeros)
    df = df.fillna(value={'price': 0, 'created_at': 0})

    # One summary row per stock-in batch, oldest batch first.
    grouped_data = (
        df.groupby("stock_price_id")
        .agg(
            {
                "product_name": "first",
                "outlet_name": "first",
                "price": "first",
                "stock_in": "first",
                "stock_out_total": "first",
                "stock_in_source": "first",
                "stock_in_id": "first",
                "stock in": "first",
                "date_in": "first",
                "created_at_in": "first",
            }
        )
        .reset_index()
        .sort_values(by="created_at_in")
    )

    for _, row in grouped_data.iterrows():
        main_table = pd.DataFrame([row], columns=["stock in", "price", "stock_in"])
        main_table['price'] = main_table['price'].apply(remove_trailing_zeros)
        main_table['stock_out'] = row['stock_out_total']
        main_table['date_in'] = row['date_in']

        # Detail rows are the stock-out movements drawn from this batch.
        detail_data = df[df["stock_price_id"] == row["stock_price_id"]]
        detail_table = detail_data[["stock out", "stock_out", "date_out"]]

        tables.append({'main': main_table, 'detail': detail_table})

    return tables


def show_table():
    """Fetch the FIFO data for the current filter and render it.

    A successful, non-empty response is also stored in the session cache
    (keyed by the shifted time of this script run) so that it shows up in
    the History section.
    """
    st.write("### Fifo Table")
    st.write(f'Product: **{product_name}**, Outlet: **{outlet_name}**, Start Date: **{start_date}** **{start_time}**')

    api_data = make_api_request()
    if not api_data:
        # make_api_request already reported the failure.
        return

    data = api_data.get("data") if isinstance(api_data, dict) else None
    if not data:
        st.write("No data found. Make sure to stock opname the product!")
        return

    timestamp = later_time.strftime("%H:%M:%S")
    st.session_state.cache[timestamp] = data
    st.session_state.last_data = data

    for table in format_df(data):
        # Main table on the left, optional detail table on the right.
        col1, col2 = st.columns(2)
        col1.table(table['main'])
        if show_detail:
            col2.table(table['detail'])
        st.write("------")


def _parse_stock_key(stock_key):
    """Split a table key such as ``"stock_opname #802"`` into (source, id).

    Returns ``None`` when the key does not contain exactly one ``#`` with
    non-empty text on both sides.
    """
    source, separator, identifier = stock_key.partition("#")
    source = source.strip()
    identifier = identifier.strip()
    if not separator or not source or not identifier or "#" in identifier:
        return None
    return source, identifier


with st.sidebar.form("filter"):
    # Input fields. NOTE(review): `outlet_suggestios` is not used anywhere in
    # the visible code.
    outlet_suggestios = ['Toko Oke', 'Drawing Franco Studio', 'Dr Franco']
    product_name = st.text_input("Product Name")
    outlet_name = st.text_input("Outlet Name", autocomplete='Toko Oke')

    with st.expander("Show More"):
        start_date = st.date_input("Start Date (stock in)", value=date.today().replace(day=1))
        start_time = st.time_input("Start Time (stock in)", value=datetime.min)
        # Checkbox to show/hide the detail table.
        show_detail = st.checkbox("Show Detail Table", value=True)

    print('product-name: ', product_name)
    submitted = st.form_submit_button("Apply")

if submitted:
    print('submit....', product_name)
    show_table()

with st.sidebar.form("edit_form"):
    st.write("## Edit the data")
    stock_key = st.text_input("Table Key (e.g. stock_opname #802)")
    stock_in = st.number_input("Updated Qty Stock In", min_value=0)
    update = st.form_submit_button("Update")

if update:
    parsed_key = _parse_stock_key(stock_key)
    if parsed_key is None:
        # Previously a malformed key raised ValueError while unpacking.
        st.error("Invalid table key. Expected '<source> #<id>', e.g. stock_opname #802.")
    elif not base_url:
        st.error("BASE_URL is not configured.")
    else:
        stock_in_source, stock_in_id = parsed_key
        data = {
            "qty": stock_in,
            "stock_in_source": stock_in_source,
            "stock_in_id": stock_in_id,
        }
        print('--- update: ', data)

        api_url = base_url + "/v1/simulation/fifo"
        try:
            response = requests.put(api_url, data=data, timeout=REQUEST_TIMEOUT)
        except requests.RequestException as err:
            st.error(f"Update failed: {err}")
        else:
            if response.status_code == 200:
                st.success("Update successful!")
                show_table()
            else:
                st.error(f"Update failed. Status code: {response.status_code}")

st.write("### History")
cache = st.session_state.cache or {}
print('size:: ', len(cache.keys()))
if not cache:
    # st.tabs needs at least one label, so show a single placeholder tab.
    cache = {'None': ''}

tabs = st.tabs(list(cache))
for tab, key in zip(tabs, cache):
    with tab:
        if st.button('Show', key=key):
            data = cache[key]
            if not data:
                # Placeholder tab: there is no record to read names from.
                st.write("No history yet.")
                continue

            tables = format_df(data)
            st.write(f'History of **{data[0]["product_name"]}**, at **{data[0]["outlet_name"]}**')
            for table in tables:
                col1, col2 = st.columns(2)
                col1.write(table['main'])
                col2.write(table['detail'])
                st.write("---")

with st.sidebar.form('form_cache'):
    if st.form_submit_button("Clear Cache"):
        st.session_state.cache = {}