# NOTE: page-capture residue, not part of the program: "Spaces: Sleeping Sleeping"
import os
import re
import time
from datetime import date, datetime, timedelta, timezone

import pandas as pd
import requests
import streamlit as st
from dotenv import load_dotenv
# Load settings from a local .env file; BASE_URL is the root of the backend API.
load_dotenv()
base_url = os.getenv("BASE_URL")

# Setup base config for streamlit: set the page to full width.
st.set_page_config(layout="wide")

# Current wall-clock time in UTC+7 for this script run. Built from an explicit
# UTC+7 offset instead of "now + 7 hours" so the value is correct whatever
# timezone the server itself runs in.
later_time = datetime.now(timezone(timedelta(hours=7)))

# Init per-session variables (each one starts as an empty dict).
if "cache" not in st.session_state:
    print('cache not in session...')
    st.session_state.cache = {}
if "last_data" not in st.session_state:
    st.session_state.last_data = {}
if "param" not in st.session_state:
    st.session_state.param = {}

print('>>>> start over <<<<<')
# Streamlit app | |
# st.title("Stock Movement Data") | |
# Function to make API request | |
# @st.cache_data | |
def make_api_request():
    """Request the FIFO simulation for the current filter values.

    Reads the module-level ``base_url``, ``product_name``, ``outlet_name``,
    ``start_date`` and ``start_time`` and sends a GET request to
    ``<base_url>/v1/simulation/fifo``.

    Returns:
        The decoded JSON body when the server answers with HTTP 200,
        otherwise ``None`` (an error is shown in the app first).
    """
    if not base_url:
        # Without this guard the string concatenation below raises TypeError.
        st.error("Error: BASE_URL is not configured")
        return None
    api_url = base_url + "/v1/simulation/fifo"

    # The selected date/time is interpreted as UTC+7 wall-clock time. Attaching
    # the offset explicitly gives the same epoch value as "naive timestamp
    # minus 7 hours" on a UTC server, and the right value on any other server.
    start_datetime = datetime.combine(start_date, start_time)
    utc_plus_7 = timezone(timedelta(hours=7))
    start_date_millis = int(start_datetime.replace(tzinfo=utc_plus_7).timestamp()) * 1000

    # Make the API request with query parameters
    params = {"product": product_name, "outlet": outlet_name, "start_date": start_date_millis}
    print(f'---> make a request: {params}')
    try:
        # A timeout keeps the app from hanging forever on an unresponsive API.
        response = requests.get(api_url, params=params, timeout=30)
    except requests.RequestException as exc:
        st.error(f"Error: {exc}")
        print('request err: ', exc)
        return None

    if response.status_code != 200:
        st.error(f"Error: {response.status_code}")
        print('request err: ', response)
        return None

    try:
        return response.json()
    except ValueError as exc:
        # The body was not valid JSON despite the 200 status.
        st.error(f"Error: {exc}")
        print('request err: ', exc)
        return None
# Function to format date | |
def format_date(date_str):
    """Format an epoch-milliseconds value as ``dd/mm/YYYY HH:MM``, shifted by +7 hours.

    Args:
        date_str: Milliseconds since the Unix epoch. ``0`` and missing values
            (``None``, NaN, NaT, ``pd.NA``) mean "no date".

    Returns:
        The formatted string, or the integer ``0`` when there is no date.
    """
    # Test for missing values first: pd.isna already covers None, and comparing
    # pd.NA with 0 would raise "boolean value of NA is ambiguous".
    if pd.isna(date_str) or date_str == 0:
        return 0
    # Convert to a timestamp, add 7 hours, and format it.
    shifted = pd.to_datetime(date_str, unit="ms") + pd.Timedelta(hours=7)
    return shifted.strftime("%d/%m/%Y %H:%M")
def remove_trailing_zeros(num):
    """Render a number with at most three decimals and no trailing zeros.

    Args:
        num: A number, or a string holding a number (e.g. ``"5.000"``).

    Returns:
        The compact text, e.g. ``1.5 -> "1.5"``, ``100 -> "100"``,
        ``2.3456 -> "2.346"``.
    """
    if isinstance(num, str):
        # Numeric strings cannot be formatted with ".3f" directly.
        num = float(num)
    text = f"{num:.3f}"
    # Only strip when there is a decimal part ("nan"/"inf" have none), so the
    # zeros of an integer part are never touched.
    if "." in text:
        text = text.rstrip("0").rstrip(".")
    # A tiny negative value rounds to "-0.000"; show it as plain zero.
    return "0" if text == "-0" else text
def format_df(data):
    """Turn the API's FIFO records into per-stock-in display tables.

    Args:
        data: Records accepted by ``pd.json_normalize`` (a list of dicts).
            An empty value (``''``, ``None`` or ``[]``) yields no tables.

    Returns:
        A list with one dict per ``stock_price_id``, ordered by
        ``created_at_in``. Each dict has a ``'main'`` one-row DataFrame
        (stock in, price, stock_in, stock_out, date_in) and a ``'detail'``
        DataFrame (stock out, stock_out, date_out) with that group's rows.
    """
    tables = []
    # Nothing to normalize; an empty frame would fail on the column lookups below.
    if not data:
        return tables

    df = pd.json_normalize(data)

    # Columns that may be absent from the payload get a default so the
    # expressions below can always reference them.
    for col in ("price", "created_at"):  # numeric columns
        if col not in df.columns:
            df[col] = 0
    for col in ("stock_out_source", "stock_out_id"):  # string columns
        if col not in df.columns:
            df[col] = ""

    # Keep the numeric quantity before 'stock_out' is replaced by display text.
    df["stock_out_origin"] = df["stock_out"].astype(float)

    # Human-readable keys of the form "<source> #<id>".
    df["stock in"] = df["stock_in_source"] + " #" + df["stock_in_id"].astype(str)
    df["stock out"] = df["stock_out_source"] + " #" + df["stock_out_id"].astype(str)

    df["date_in"] = df["created_at_in"].apply(format_date)
    df["date_out"] = df["created_at"].apply(format_date)
    df["stock_out"] = df["stock_out"].apply(remove_trailing_zeros)
    df["stock_out_total"] = df["stock_out_total"].apply(remove_trailing_zeros)
    df = df.fillna(value={"price": 0, "created_at": 0})

    # One summary row per stock_price_id, taking the first value of each field.
    summary_fields = [
        "product_name", "outlet_name", "price", "stock_in", "stock_out_total",
        "stock_in_source", "stock_in_id", "stock in", "date_in", "created_at_in",
    ]
    grouped_data = (
        df.groupby("stock_price_id")
        .agg({field: "first" for field in summary_fields})
        .reset_index()
        .sort_values(by="created_at_in")
    )

    # Build the main and detail tables for every group.
    for _, row in grouped_data.iterrows():
        main_table = pd.DataFrame([row], columns=["stock in", "price", "stock_in"])
        main_table["price"] = main_table["price"].apply(remove_trailing_zeros)
        main_table["stock_out"] = row["stock_out_total"]
        main_table["date_in"] = row["date_in"]

        detail_data = df[df["stock_price_id"] == row["stock_price_id"]]
        detail_table = detail_data[["stock out", "stock_out", "date_out"]]

        tables.append({"main": main_table, "detail": detail_table})
    return tables
def show_table():
    """Fetch the FIFO data for the current filters and render it.

    Writes a heading and the active filter values, calls
    ``make_api_request()``, stores a non-empty result in
    ``st.session_state.cache`` (keyed by the ``later_time`` clock string) and
    in ``st.session_state.last_data``, then renders one main table per group
    with its detail table beside it when ``show_detail`` is set.
    """
    st.write("### Fifo Table")
    st.write(f'Product: **{product_name}**, Outlet: **{outlet_name}**, Start Date: **{start_date}** **{start_time}**')

    # Make the API request
    api_data = make_api_request()
    if not api_data:
        # No usable response; nothing to render.
        return

    # .get() avoids a KeyError when the response has no "data" key; an empty
    # list is reported the same way as a missing value instead of silently
    # rendering nothing.
    data = api_data.get("data")
    if not data:
        st.write("No data found. Make sure to stock opname the product!")
        return

    # Remember this result so it can be shown again from the history section.
    timestamp = later_time.strftime("%H:%M:%S")
    st.session_state.cache[timestamp] = data
    st.session_state.last_data = data

    for table in format_df(data):
        # Two columns: main table on the left, detail table on the right.
        col1, col2 = st.columns(2)
        col1.table(table['main'])
        if show_detail:
            col2.table(table['detail'])
        st.write("------")
with st.sidebar.form("filter"):
    # Filter inputs. The values are kept as module-level names because the
    # request and rendering functions read them directly.
    outlet_suggestios = ['Toko Oke', 'Drawing Franco Studio', 'Dr Franco']
    product_name = st.text_input("Product Name")
    outlet_name = st.text_input("Outlet Name", autocomplete='Toko Oke')

    # NOTE(review): indentation was lost in this copy of the file; the three
    # widgets below are assumed to sit inside the expander - confirm.
    with st.expander("Show More"):
        first_of_month = date.today().replace(day=1)
        start_date = st.date_input("Start Date (stock in)", value=first_of_month)
        start_time = st.time_input("Start Time (stock in)", value=datetime.min)
        # Checkbox to show/hide detail table
        show_detail = st.checkbox("Show Detail Table", value=True)

    print('product-name: ', product_name)
    submitted = st.form_submit_button("Apply")

# NOTE(review): the handler is assumed to run outside the form so the tables
# render in the main area rather than inside the sidebar - confirm.
if submitted:
    print('submit....', product_name)
    show_table()
with st.sidebar.form("edit_form"):
    st.write("## Edit the data")
    stock_key = st.text_input("Table Key (e.g. stock_opname #802)")
    stock_in = st.number_input("Updated Qty Stock In", min_value=0)
    update = st.form_submit_button("Update")

# NOTE(review): indentation was lost in this copy of the file; the handler is
# assumed to run outside the form so results render in the main area - confirm.
if update:
    # The key must look like "<source> #<id>": exactly one "#" with text on
    # both sides. Anything else is reported instead of crashing on unpacking.
    key_parts = [part.strip() for part in stock_key.split("#")]
    if len(key_parts) != 2 or not all(key_parts):
        st.error("Invalid table key. Expected a value like: stock_opname #802")
    else:
        stock_in_source, stock_in_id = key_parts
        # Prepare data for the API request
        data = {
            "qty": stock_in,
            "stock_in_source": stock_in_source,
            "stock_in_id": stock_in_id,
        }
        print('--- update: ', data)
        # API endpoint
        api_url = base_url + "/v1/simulation/fifo"
        try:
            # Make the PUT request; the timeout prevents an indefinite hang.
            response = requests.put(api_url, data=data, timeout=30)
        except requests.RequestException as exc:
            st.error(f"Update failed: {exc}")
        else:
            if response.status_code == 200:
                st.success("Update successful!")
                # Re-fetch and render so the change is visible immediately.
                show_table()
            else:
                st.error(f"Update failed. Status code: {response.status_code}")
st.write("### History")
cache = st.session_state.cache
print('size:: ', len(cache) if cache else 0)
# If the cache is empty, fall back to a single placeholder tab so there is
# always at least one tab label to render.
if not cache:
    cache = {'None': ''}
tabs_title = list(cache)
tabs = st.tabs(tabs_title)
for tab, key in zip(tabs, tabs_title):
    with tab:
        if st.button('Show', key=key):
            data = cache[key]
            if not data:
                # Placeholder tab (or an empty entry): data[0] would fail below.
                st.write("No history to show yet.")
                continue
            tables = format_df(data)
            st.write(f'History of **{data[0]["product_name"]}**, at **{data[0]["outlet_name"]}**')
            for table in tables:
                # Main table on the left, its detail rows on the right.
                col1, col2 = st.columns(2)
                col1.write(table['main'])
                col2.write(table['detail'])
                st.write("---")
with st.sidebar.form('form_cache'):
    # A one-button form: pressing it resets the per-session history cache.
    clear_requested = st.form_submit_button("Clear Cache")
    if clear_requested:
        # Assigning a fresh dict replaces whatever was stored under "cache".
        st.session_state["cache"] = {}