Fifo-Table / app.py
AnnasBlackHat's picture
fix stock price order
67ef6b6
import streamlit as st
import pandas as pd
import requests
from datetime import datetime
from dotenv import load_dotenv
import os
import time
from datetime import datetime, date, timedelta
import re
# Pull settings from a local .env file into the environment, then read the API root.
load_dotenv()
base_url = os.getenv("BASE_URL")

# Use the full browser width for the page.
st.set_page_config(layout="wide")

# "Now" shifted forward by seven hours; show_table() uses it to label history entries.
later_time = datetime.now() + timedelta(hours=7)

# Make sure every session-state slot this script relies on exists.
for _slot in ("cache", "last_data", "param"):
    if _slot in st.session_state:
        continue
    if _slot == "cache":
        print('cache not in session...')
    st.session_state[_slot] = {}

# Streamlit re-executes the script on every interaction; this marks each run in the log.
print('>>>> start over <<<<<')
# Streamlit app
# st.title("Stock Movement Data")
# Function to make API request
# @st.cache_data
def make_api_request(timeout=30):
    """Query the FIFO simulation endpoint for the current filter values.

    Reads the module-level names ``base_url``, ``product_name``,
    ``outlet_name``, ``start_date`` and ``start_time``.

    Args:
        timeout: Seconds to wait for the server before giving up, so a
            stalled backend cannot hang the page indefinitely.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise ``None`` (an error message is shown on the page).
    """
    if not base_url:
        # os.getenv returns None when BASE_URL is unset; fail with a clear message
        # instead of a TypeError on string concatenation.
        st.error("Error: BASE_URL is not configured")
        return None

    api_url = base_url + "/v1/simulation/fifo"

    start_datetime = datetime.combine(start_date, start_time)
    # Epoch seconds are moved back seven hours before converting to milliseconds.
    # NOTE(review): presumably the inputs are UTC+7 wall-clock values and the
    # server clock is UTC -- confirm before changing this arithmetic.
    start_date_millis = (int(start_datetime.timestamp()) - (7 * 60 * 60)) * 1000

    params = {"product": product_name, "outlet": outlet_name, "start_date": start_date_millis}
    print(f'---> make a request: {params}')

    try:
        response = requests.get(api_url, params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Connection problems and timeouts are reported like any other failed request.
        st.error(f"Error: {exc}")
        print('request err: ', exc)
        return None

    if response.status_code == 200:
        return response.json()

    st.error(f"Error: {response.status_code}")
    print('request err: ', response)
    return None
# Function to format date
def format_date(date_str):
    """Render an epoch-millisecond value as ``dd/mm/YYYY HH:MM``, seven hours ahead.

    Missing input (``0``, NaN/NaT or ``None``) is returned as the integer ``0``.
    """
    is_missing = date_str == 0 or pd.isna(date_str) or date_str is None
    if is_missing:
        return 0

    # Interpret the value as milliseconds since the epoch, then shift by +7 hours.
    moment = pd.to_datetime(date_str, unit='ms')
    shifted = moment + pd.Timedelta(hours=7)
    return shifted.strftime("%d/%m/%Y %H:%M")
def remove_trailing_zeros(num):
    """Format *num* with at most three decimals and no trailing zeros.

    Args:
        num: A real number (anything that accepts the ``.3f`` format spec).

    Returns:
        The number as a string, e.g. ``1.5`` -> ``"1.5"``, ``2`` -> ``"2"``.
        Values that round to zero are always rendered as ``"0"``.
    """
    text = f"{num:.3f}"
    # Drop the zeros after the decimal point, and the point itself when
    # nothing else follows it ("2.000" -> "2", "10.250" -> "10.25").
    text = re.sub(r'\.?0+$', '', text)
    # Tiny negatives format as "-0.000", which would otherwise be shown as "-0".
    return "0" if text == "-0" else text
def format_df(data):
    """Turn raw FIFO rows into one main/detail table pair per stock price.

    Args:
        data: The row records to display (anything ``pd.json_normalize``
            accepts). Any empty value -- ``''``, ``None`` or ``[]`` --
            yields no tables instead of raising.

    Returns:
        A list of ``{'main': DataFrame, 'detail': DataFrame}`` dicts, one per
        ``stock_price_id``, ordered by ``created_at_in``. ``main`` is a
        one-row summary of the stock-in entry; ``detail`` lists the
        stock-out rows recorded against it.
    """
    tables = []
    if not data:
        return tables

    df = pd.json_normalize(data)

    # Numeric columns that may be missing from the payload default to 0.
    for col in ["price", "created_at"]:
        if col not in df.columns:
            df[col] = 0

    # String columns that may be missing from the payload default to "".
    for col in ["stock_out_source", "stock_out_id"]:
        if col not in df.columns:
            df[col] = ""

    # Numeric copy taken before `stock_out` is replaced by display strings below
    # (it is not part of the returned detail table).
    df['stock_out_origin'] = df['stock_out'].astype(float)

    # Human-readable "<source> #<id>" labels for both sides of a movement.
    df["stock in"] = df["stock_in_source"] + " #" + df["stock_in_id"].astype(str)
    df["stock out"] = df["stock_out_source"] + " #" + df["stock_out_id"].astype(str)

    df["date_in"] = df["created_at_in"].apply(format_date)
    df["date_out"] = df["created_at"].apply(format_date)

    df['stock_out'] = df['stock_out'].apply(remove_trailing_zeros)
    df['stock_out_total'] = df['stock_out_total'].apply(remove_trailing_zeros)

    fill_values = {'price': 0, 'created_at': 0}
    df = df.fillna(value=fill_values)

    # One summary row per stock_price_id, taking the first value seen for each column.
    grouped_data = df.groupby("stock_price_id").agg(
        {
            "product_name": "first",
            "outlet_name": "first",
            "price": "first",
            "stock_in": "first",
            "stock_out_total": "first",
            "stock_in_source": "first",
            "stock_in_id": "first",
            "stock in": "first",
            "date_in": "first",
            "created_at_in": "first",
        }
    ).reset_index()
    # Present the groups in the order the stock came in.
    grouped_data = grouped_data.sort_values(by="created_at_in")

    for _, row in grouped_data.iterrows():
        main_table = pd.DataFrame([row], columns=["stock in", "price", "stock_in"])
        main_table['price'] = main_table['price'].apply(remove_trailing_zeros)
        main_table['stock_out'] = row['stock_out_total']
        main_table['date_in'] = row['date_in']

        # All movement rows that belong to this stock price.
        detail_data = df[df["stock_price_id"] == row["stock_price_id"]]
        detail_table = detail_data[["stock out", "stock_out", "date_out"]]

        tables.append({'main': main_table, 'detail': detail_table})

    return tables
def show_table():
    """Fetch the FIFO simulation for the current filter values and render it.

    Reads the module-level names ``product_name``, ``outlet_name``,
    ``start_date``, ``start_time``, ``show_detail`` and ``later_time``.
    A non-empty result is stored in ``st.session_state.cache`` under the
    ``HH:MM:SS`` text of ``later_time`` and in ``st.session_state.last_data``,
    then drawn as one main table (left) and, when ``show_detail`` is set,
    one detail table (right) per stock price.
    """
    st.write("### Fifo Table")
    st.write(f'Product: **{product_name}**, Outlet: **{outlet_name}**, Start Date: **{start_date}** **{start_time}**')

    api_data = make_api_request()
    if not api_data:
        # Nothing usable came back; a failed request returns None after
        # make_api_request has shown its own error.
        return

    # .get() so a body without a "data" key is treated like an empty result
    # instead of raising KeyError.
    rows = api_data.get("data")
    if not rows:
        st.write("No data found. Make sure to stock opname the product!")
        return

    # Remember this result so the History section can show it again.
    timestamp = later_time.strftime("%H:%M:%S")
    st.session_state.cache[timestamp] = rows
    st.session_state.last_data = rows

    for table in format_df(rows):
        # Two side-by-side columns: summary on the left, movements on the right.
        col1, col2 = st.columns(2)
        col1.table(table['main'])
        if show_detail:
            col2.table(table['detail'])
        st.write("------")
# Sidebar filter form. The widget values are bound to module-level names that
# make_api_request() and show_table() read directly.
with st.sidebar.form("filter"):
    # Input fields
    # NOTE(review): this list is not referenced anywhere else in the visible code.
    outlet_suggestios = ['Toko Oke', 'Drawing Franco Studio', 'Dr Franco']
    product_name = st.text_input("Product Name")
    outlet_name = st.text_input("Outlet Name", autocomplete='Toko Oke')
    with st.expander("Show More"):
        # Defaults to the first day of the current month.
        start_date = st.date_input("Start Date (stock in)", value = date.today().replace(day = 1))
        # datetime.min has a 00:00 time part; presumably the widget starts at midnight -- TODO confirm.
        start_time = st.time_input("Start Time (stock in)", value = datetime.min )
        show_detail = st.checkbox("Show Detail Table", value=True)# Checkbox to show/hide detail table
    print('product-name: ', product_name)
    submitted = st.form_submit_button("Apply")# on_click=show_table
# Fetch and render the table only on the run in which "Apply" was pressed.
if submitted:
    print('submit....', product_name)
    show_table()
# Sidebar form for correcting the stock-in quantity of one table entry.
with st.sidebar.form("edit_form"):
    st.write("## Edit the data")
    stock_key = st.text_input("Table Key (e.g. stock_opname #802)")
    stock_in = st.number_input("Updated Qty Stock In", min_value=0)
    update = st.form_submit_button("Update")

if update:
    # The key has the form "<source> #<id>". Splitting on the last '#' and
    # checking every part avoids a ValueError on empty or malformed input.
    stock_in_source, separator, stock_in_id = stock_key.rpartition("#")
    stock_in_source = stock_in_source.strip()
    stock_in_id = stock_in_id.strip()

    if not (separator and stock_in_source and stock_in_id):
        st.error('Invalid table key. Use the form "<source> #<id>", e.g. "stock_opname #802".')
    else:
        # Prepare data for the API request
        data = {
            "qty": stock_in,
            "stock_in_source": stock_in_source,
            "stock_in_id": stock_in_id,
        }
        print('--- update: ', data)

        # Same endpoint as the read request; PUT applies the correction.
        api_url = base_url + "/v1/simulation/fifo"
        try:
            # A timeout keeps a stalled backend from hanging the page.
            response = requests.put(api_url, data=data, timeout=30)
        except requests.RequestException as exc:
            st.error(f"Update failed: {exc}")
        else:
            if response.status_code == 200:
                st.success("Update successful!")
                # Re-query so the page shows the corrected figures.
                show_table()
            else:
                st.error(f"Update failed. Status code: {response.status_code}")
# History: one tab per result stored by show_table() during this session.
st.write("### History")
cache = st.session_state.cache
print('size:: ', len(cache) if cache else 0)

# With nothing stored yet, fall back to a single placeholder tab
# (st.tabs needs at least one label).
if not cache:
    cache = {'None': ''}

tabs_title = list(cache)
tabs = st.tabs(tabs_title)
for tab, key in zip(tabs, tabs_title):
    with tab:
        # The cache key doubles as the widget key so every tab gets its own button.
        if st.button('Show', key=key):
            data = cache[key]
            if not data:
                # Placeholder tab (or an empty entry): there is no first row to
                # read the product and outlet names from.
                st.write("No history yet.")
                continue

            tables = format_df(data)
            st.write(f'History of **{data[0]["product_name"]}**, at **{data[0]["outlet_name"]}**')
            for table in tables:
                col1, col2 = st.columns(2)
                col1.write(table['main'])
                col2.write(table['detail'])
                st.write("---")
# Sidebar form holding a single button that empties the stored history.
with st.sidebar.form('form_cache'):
    clear_clicked = st.form_submit_button("Clear Cache")
    if clear_clicked:
        # Drop the existing entry (if any), then start again with an empty dict.
        if "cache" in st.session_state:
            del st.session_state["cache"]
        st.session_state["cache"] = {}