Fifo-Table / app.py
AnnasBlackHat's picture
fix sock price order
67ef6b6
raw
history blame contribute delete
No virus
8.52 kB
import streamlit as st
import pandas as pd
import requests
from datetime import datetime
from dotenv import load_dotenv
import os
import time
from datetime import datetime, date, timedelta
import re
# Load variables from a .env file (if present) and read the API base URL.
load_dotenv()
base_url = os.getenv("BASE_URL")

# Page-level Streamlit configuration: use the wide layout.
st.set_page_config(layout="wide")

# Time of this script run shifted forward by 7 hours; used to label history
# entries.
# NOTE(review): presumably the host clock is UTC and the display zone is
# UTC+7 - confirm.
later_time = datetime.now() + timedelta(hours=7)

# Per-session containers, created the first time a session runs the script.
if "cache" not in st.session_state:
    print('cache not in session...')
    st.session_state.cache = {}
for _state_key in ("last_data", "param"):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = {}

print('>>>> start over <<<<<')
# Streamlit app
# st.title("Stock Movement Data")
# Function to make API request
# @st.cache_data
def make_api_request(timeout=30):
    """Request FIFO simulation data for the filters currently set in the sidebar.

    Reads the module-level ``base_url``, ``product_name``, ``outlet_name``,
    ``start_date`` and ``start_time`` values and issues a GET request to
    ``<base_url>/v1/simulation/fifo``.

    Args:
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The decoded JSON body when the server answers with HTTP 200.
        ``None`` when the base URL is missing, the request fails, the status
        is not 200, or the body is not valid JSON; in each of those cases an
        error is shown on the page.
    """
    if not base_url:
        # Without this guard the concatenation below raises TypeError.
        st.error("Error: BASE_URL is not configured")
        return None

    api_url = base_url + "/v1/simulation/fifo"

    # Combine the date and time inputs, shift back by seven hours and convert
    # to epoch milliseconds.
    # NOTE(review): presumably converts UTC+7 input to UTC on a UTC host - confirm.
    start_datetime = datetime.combine(start_date, start_time)
    start_date_millis = (int(start_datetime.timestamp()) - (7 * 60 * 60)) * 1000

    params = {"product": product_name, "outlet": outlet_name, "start_date": start_date_millis}
    print(f'---> make a request: {params}')

    try:
        response = requests.get(api_url, params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Connection errors and timeouts are reported instead of crashing the app.
        st.error(f"Error: {exc}")
        print('request err: ', exc)
        return None

    if response.status_code != 200:
        st.error(f"Error: {response.status_code}")
        print('request err: ', response)
        return None

    try:
        return response.json()
    except ValueError as exc:
        # The server answered 200 but the body could not be decoded as JSON.
        st.error(f"Error: {exc}")
        print('request err: ', exc)
        return None
# Function to format date
def format_date(date_str):
    """Render an epoch-millisecond value as ``DD/MM/YYYY HH:MM`` shifted by +7 hours.

    Returns the integer ``0`` when the input is ``0``, ``None`` or a
    missing value (NaN/NaT).
    """
    is_missing = date_str is None or date_str == 0 or pd.isna(date_str)
    if is_missing:
        return 0

    # Interpret the value as milliseconds since the epoch, then add the offset.
    shifted = pd.to_datetime(date_str, unit='ms') + pd.Timedelta(hours=7)
    return shifted.strftime("%d/%m/%Y %H:%M")
def remove_trailing_zeros(num):
    """Format *num* with three decimals, then drop trailing zeros and a dangling dot.

    Returns the result as a string, e.g. ``1.5`` -> ``"1.5"`` and
    ``2.0`` -> ``"2"``.
    """
    fixed_point = format(num, ".3f")
    # Strip the run of zeros at the end, together with the decimal point when
    # nothing but zeros followed it.
    return re.sub(r'\.?0+$', '', fixed_point)
def format_df(data):
    """Turn FIFO simulation records into per-batch display tables.

    The records are flattened with ``pd.json_normalize`` and grouped by
    ``stock_price_id``. Groups are ordered by ``created_at_in``.

    Args:
        data: Records as returned by the API (a list of dicts). Any empty
            value (``''``, ``[]``, ``None``) yields an empty result.

    Returns:
        A list with one dict per ``stock_price_id``:
        ``'main'`` is a one-row DataFrame with the columns ``stock in``,
        ``price``, ``stock_in``, ``stock_out`` and ``date_in``;
        ``'detail'`` is a DataFrame with the columns ``stock out``,
        ``stock_out`` and ``date_out`` holding that group's rows.
    """
    # Nothing to tabulate; also avoids a KeyError on the columns used below.
    if not data:
        return []

    df = pd.json_normalize(data)

    # Make sure the optional numeric columns exist.
    for col in ("price", "created_at"):
        if col not in df.columns:
            df[col] = 0

    # Make sure the optional string columns exist.
    for col in ("stock_out_source", "stock_out_id"):
        if col not in df.columns:
            df[col] = ""

    # Human-readable labels of the form "<source> #<id>".
    df["stock in"] = df["stock_in_source"] + " #" + df["stock_in_id"].astype(str)
    df["stock out"] = df["stock_out_source"] + " #" + df["stock_out_id"].astype(str)

    df["date_in"] = df["created_at_in"].apply(format_date)
    df["date_out"] = df["created_at"].apply(format_date)
    df['stock_out'] = df['stock_out'].apply(remove_trailing_zeros)
    df['stock_out_total'] = df['stock_out_total'].apply(remove_trailing_zeros)
    df = df.fillna(value={'price': 0, 'created_at': 0})

    # One summary row per stock_price_id, taking the first value of each column.
    summary_columns = (
        "product_name", "outlet_name", "price", "stock_in", "stock_out_total",
        "stock_in_source", "stock_in_id", "stock in", "date_in", "created_at_in",
    )
    grouped_data = (
        df.groupby("stock_price_id")
        .agg({col: "first" for col in summary_columns})
        .reset_index()
        .sort_values(by="created_at_in")
    )

    tables = []
    for _, row in grouped_data.iterrows():
        main_table = pd.DataFrame([row], columns=["stock in", "price", "stock_in"])
        main_table['price'] = main_table['price'].apply(remove_trailing_zeros)
        main_table['stock_out'] = row['stock_out_total']
        main_table['date_in'] = row['date_in']

        # All rows that belong to this summary row.
        detail_data = df[df["stock_price_id"] == row["stock_price_id"]]
        detail_table = detail_data[["stock out", "stock_out", "date_out"]]

        tables.append({'main': main_table, 'detail': detail_table})
    return tables
def show_table():
    """Fetch FIFO data for the current filters, record it in the session, and render it.

    Reads the module-level filter values (``product_name``, ``outlet_name``,
    ``start_date``, ``start_time``, ``show_detail``). A non-empty result is
    stored in ``st.session_state.cache`` under the ``later_time`` label
    (``HH:MM:SS``) and in ``st.session_state.last_data``.
    """
    st.write("### Fifo Table")
    st.write(f'Product: **{product_name}**, Outlet: **{outlet_name}**, Start Date: **{start_date}** **{start_time}**')

    api_data = make_api_request()
    if not api_data:
        return

    records = api_data['data']
    if records is None:
        st.write("No data found. Make sure to stock opname the product!")
        return
    if not records:
        return

    # Remember this result for the History section.
    timestamp = later_time.strftime("%H:%M:%S")
    st.session_state.cache[timestamp] = records
    st.session_state.last_data = records

    # NOTE(review): the separator is assumed to be written once per table;
    # the original nesting is not recoverable from the source.
    for table in format_df(records):
        # Main table on the left, optional detail table on the right.
        col1, col2 = st.columns(2)
        col1.table(table['main'])
        if show_detail:
            col2.table(table['detail'])
        st.write("------")
# Sidebar form holding the query filters; values are applied on "Apply".
with st.sidebar.form("filter"):
    # Not referenced anywhere else in the visible code.
    outlet_suggestios = ['Toko Oke', 'Drawing Franco Studio', 'Dr Franco']

    product_name = st.text_input("Product Name")
    outlet_name = st.text_input("Outlet Name", autocomplete='Toko Oke')

    # NOTE(review): all three optional inputs are assumed to live inside the
    # expander; the original nesting is not recoverable from the source.
    with st.expander("Show More"):
        # Defaults to the first day of the current month at 00:00.
        start_date = st.date_input(
            "Start Date (stock in)",
            value=date.today().replace(day=1),
        )
        start_time = st.time_input(
            "Start Time (stock in)",
            value=datetime.min,
        )
        # Toggles the detail table next to each main table.
        show_detail = st.checkbox("Show Detail Table", value=True)

    print('product-name: ', product_name)
    submitted = st.form_submit_button("Apply")

# Render the result in the main area once the form has been submitted.
if submitted:
    print('submit....', product_name)
    show_table()
# Sidebar form for correcting the stock-in quantity of a single table.
with st.sidebar.form("edit_form"):
    st.write("## Edit the data")
    stock_key = st.text_input("Table Key (e.g. stock_opname #802)")
    stock_in = st.number_input("Updated Qty Stock In", min_value=0)
    update = st.form_submit_button("Update")

if update:
    # The key must look like "<source> #<id>"; validate it before unpacking so
    # an empty or malformed key shows a message instead of raising ValueError.
    key_parts = [part.strip() for part in stock_key.split("#")]
    if len(key_parts) != 2 or not all(key_parts):
        st.error("Invalid table key. Expected '<source> #<id>', e.g. stock_opname #802")
    else:
        stock_in_source, stock_in_id = key_parts

        # Form-encoded payload for the update request.
        data = {
            "qty": stock_in,
            "stock_in_source": stock_in_source,
            "stock_in_id": stock_in_id,
        }
        print('--- update: ', data)

        api_url = base_url + "/v1/simulation/fifo"
        try:
            # A timeout keeps the app from hanging on an unresponsive server.
            response = requests.put(api_url, data=data, timeout=30)
        except requests.RequestException as exc:
            st.error(f"Update failed: {exc}")
        else:
            if response.status_code == 200:
                st.success("Update successful!")
                # Refresh the table so the new quantity is visible.
                show_table()
            else:
                st.error(f"Update failed. Status code: {response.status_code}")
# History section: one tab per stored result, labelled with its timestamp.
st.write("### History")

cache = st.session_state.cache
print('size:: ', len(cache) if cache else 0)

# With nothing stored yet, fall back to a single placeholder tab so that
# st.tabs always receives at least one label.
if not cache:
    cache = {'None': ''}

tabs_title = list(cache)
tabs = st.tabs(tabs_title)

for tab, key in zip(tabs, tabs_title):
    with tab:
        if st.button('Show', key=key):
            data = cache[key]
            if not data:
                # Placeholder tab: there is no record to index into.
                st.write("No history yet.")
            else:
                tables = format_df(data)
                st.write(f'History of **{data[0]["product_name"]}**, at **{data[0]["outlet_name"]}**')
                for table in tables:
                    col1, col2 = st.columns(2)
                    col1.write(table['main'])
                    col2.write(table['detail'])
                    st.write("---")
# show_table()
# Sidebar form with a single button that empties the stored history.
with st.sidebar.form('form_cache'):
    clear_requested = st.form_submit_button("Clear Cache")
    if clear_requested:
        # Drop the existing entry (if any), then start again with an empty dict.
        if "cache" in st.session_state:
            del st.session_state["cache"]
        st.session_state["cache"] = {}