Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
import os | |
import json | |
import random | |
import string | |
from datetime import datetime, timedelta | |
import time | |
import threading | |
from huggingface_hub import HfApi, hf_hub_download | |
# --- Настройки Hugging Face --- | |
REPO_ID = "Testbase1/testsett" # Замените на ваш репозиторий | |
DB_FILENAME = "auth_system.json" # JSON база данных | |
HF_TOKEN_WRITE = os.getenv("HF_TOKEN") # Токен для записи (загрузки) | |
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") # Токен для чтения (скачивания) | |
# Глобальный блокировщик для доступа к JSON-файлу | |
db_lock = threading.Lock() | |
# --- Кеширование функций для повышения производительности --- | |
def load_db(): | |
"""Загружает базу данных из JSON-файла.""" | |
if not os.path.exists(DB_FILENAME): | |
data = {"users": [], "products": [], "cart": [], "sales": []} | |
save_db(data) | |
return data | |
try: | |
with open(DB_FILENAME, "r", encoding="utf8") as f: | |
data = json.load(f) | |
except json.JSONDecodeError: | |
data = {"users": [], "products": [], "cart": [], "sales": []} | |
save_db(data) | |
return data | |
def save_db(data): | |
"""Сохраняет базу данных в JSON-файл.""" | |
with db_lock: | |
with open(DB_FILENAME, "w", encoding="utf8") as f: | |
json.dump(data, f, ensure_ascii=False, indent=4) | |
def upload_db_to_hf(): | |
"""Загружает JSON-файл базы данных в репозиторий Hugging Face.""" | |
try: | |
api = HfApi() | |
api.upload_file( | |
path_or_fileobj=DB_FILENAME, | |
path_in_repo=DB_FILENAME, | |
repo_id=REPO_ID, | |
repo_type="dataset", | |
token=HF_TOKEN_WRITE, | |
commit_message=f"Автоматическое резервное копирование базы данных {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" | |
) | |
print("Резервная копия JSON базы успешно загружена на Hugging Face.") | |
except Exception as e: | |
print("Ошибка при загрузке резервной копии:", e) | |
def download_db_from_hf(): | |
"""Скачивает JSON-файл базы данных из репозитория Hugging Face.""" | |
try: | |
hf_hub_download( | |
repo_id=REPO_ID, | |
filename=DB_FILENAME, | |
repo_type="dataset", | |
token=HF_TOKEN_READ, | |
local_dir=".", | |
local_dir_use_symlinks=False | |
) | |
print("JSON база успешно скачана из Hugging Face.") | |
return True | |
except Exception as e: | |
print("Ошибка при скачивании JSON базы:", e) | |
return False | |
def periodic_backup(): | |
""" | |
Периодически, каждые 15 секунд, вызывается функция upload_db_to_hf(). | |
""" | |
while True: | |
upload_db_to_hf() | |
time.sleep(100) | |
# --- Вспомогательные функции --- | |
def generate_token(): | |
"""Генерирует случайный токен.""" | |
return ''.join(random.choices(string.ascii_letters + string.digits, k=13)) | |
def get_new_id(items): | |
"""Возвращает новый ID для элемента.""" | |
return max([item.get("id", 0) for item in items], default=0) + 1 | |
# --- Функции для работы с продажами --- | |
def record_sales(): | |
"""Записывает информацию о продажах, обновляет базу данных и очищает корзину.""" | |
with db_lock: | |
data = load_db() | |
cart_items = data["cart"] | |
if not cart_items: | |
return None, None, 0.0 | |
sales_details = [] | |
total_amount = 0.0 | |
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
sale_id = generate_token() | |
for item in cart_items: | |
product = next((p for p in data["products"] if p["id"] == item["product_id"]), None) | |
if product: | |
sale_amount = item["quantity"] * product["sale_price"] | |
total_amount += sale_amount | |
sales_details.append([product["id"], item["quantity"], sale_amount, now]) | |
new_sale_id = get_new_id(data["sales"]) | |
data["sales"].append({ | |
"id": new_sale_id, | |
"sale_id": sale_id, | |
"product_id": product["id"], | |
"quantity": item["quantity"], | |
"total_amount": sale_amount, | |
"sale_date": now | |
}) | |
data["cart"] = [] | |
save_db(data) | |
return sales_details, sale_id, total_amount | |
# --- Страницы приложения --- | |
def register(): | |
"""Страница регистрации.""" | |
st.title('Регистрация') | |
username = st.text_input("Введите ваше имя пользователя") | |
admin_password = st.text_input("Введите пароль администратора", type="password") | |
if st.button("Зарегистрироваться"): | |
if admin_password == "morshenfullsumflpol": | |
token = generate_token() | |
with db_lock: | |
data = load_db() | |
if any(u["username"] == username for u in data["users"]): | |
st.error("Это имя пользователя уже занято. Попробуйте другое.") | |
else: | |
new_id = get_new_id(data["users"]) | |
data["users"].append({"id": new_id, "username": username, "token": token}) | |
save_db(data) | |
st.success(f"Регистрация успешна! Ваш токен: {token}") | |
else: | |
st.error("Неверный пароль администратора!") | |
def login(): | |
"""Страница авторизации.""" | |
st.title('Авторизация') | |
token_input = st.text_input("Введите ваш токен") | |
if st.button("Войти"): | |
data = load_db() | |
user = next((u for u in data["users"] if u["token"] == token_input), None) | |
if user: | |
st.session_state.logged_in = True | |
st.session_state.username = user["username"] | |
st.session_state.user_id = user["id"] | |
st.success("Добро пожаловать!") | |
else: | |
st.error("Неверный токен!") | |
def add_product(): | |
"""Страница добавления товара.""" | |
st.title("Добавление товара") | |
product_name = st.text_input("Название товара").strip().lower() | |
product_description = st.text_area("Описание товара") | |
purchase_price = st.number_input("Приходная цена", min_value=0.0, step=0.01) | |
sale_price = st.number_input("Отпускная цена", min_value=0.0, step=0.01) | |
product_quantity = st.number_input("Количество на складе", min_value=0, step=1) | |
if st.button("Добавить товар"): | |
if product_name and sale_price: | |
with db_lock: | |
data = load_db() | |
new_id = get_new_id(data["products"]) | |
data["products"].append({ | |
"id": new_id, | |
"name": product_name, | |
"description": product_description, | |
"purchase_price": purchase_price, | |
"sale_price": sale_price, | |
"quantity_in_stock": product_quantity, | |
"user_id": st.session_state.user_id | |
}) | |
save_db(data) | |
st.success("Товар успешно добавлен!") | |
else: | |
st.error("Пожалуйста, введите все обязательные данные.") | |
def edit_products(): | |
"""Страница редактирования товара.""" | |
st.title("Редактирование товара") | |
data = load_db() | |
products = [p for p in data["products"] if p["user_id"] == st.session_state.user_id] | |
if products: | |
product_names = [p["name"] for p in products] | |
selected_product_name = st.selectbox("Выберите товар", product_names) | |
product = next(p for p in products if p["name"] == selected_product_name) | |
new_name = st.text_input("Новое название товара", product["name"]) | |
new_description = st.text_area("Новое описание товара", product["description"]) | |
new_purchase_price = st.number_input("Новая приходная цена", min_value=0.0, step=0.01, value=product["purchase_price"]) | |
new_sale_price = st.number_input("Новая отпускная цена", min_value=0.0, step=0.01, value=product["sale_price"]) | |
new_quantity_in_stock = st.number_input("Новое количество на складе", min_value=0, step=1, value=product["quantity_in_stock"]) | |
if st.button("Сохранить изменения"): | |
with db_lock: | |
data = load_db() | |
for p in data["products"]: | |
if p["id"] == product["id"]: | |
p["name"] = new_name | |
p["description"] = new_description | |
p["purchase_price"] = new_purchase_price | |
p["sale_price"] = new_sale_price | |
p["quantity_in_stock"] = new_quantity_in_stock | |
break | |
save_db(data) | |
st.success("Товар успешно обновлен!") | |
if st.button("Удалить товар"): | |
with db_lock: | |
data = load_db() | |
data["products"] = [p for p in data["products"] if p["id"] != product["id"]] | |
save_db(data) | |
st.success("Товар успешно удален!") | |
else: | |
st.info("У вас пока нет добавленных товаров.") | |
def add_to_cart(): | |
"""Страница добавления товара в корзину.""" | |
st.title("Отпуск товара") | |
search_term = st.text_input("Поиск товара").strip().lower() | |
data = load_db() | |
products = [p for p in data["products"] if p["user_id"] == st.session_state.user_id and search_term in p["name"].lower()] | |
if products: | |
for product in products: | |
cols = st.columns(4) | |
with cols[0]: | |
st.write(product["name"]) | |
st.write(f"**Отпускная цена**: {product['sale_price']:.2f}") | |
st.write(f"**Остаток на складе**: {product['quantity_in_stock']}") | |
with cols[1]: | |
quantity = st.number_input(f"Количество для '{product['name']}'", min_value=0, max_value=product["quantity_in_stock"], key=f"quantity_{product['id']}") | |
with cols[2]: | |
add_button = st.button(f"Добавить в корзину", key=f"add_{product['id']}") | |
if add_button: | |
if quantity <= product["quantity_in_stock"]: | |
with db_lock: | |
data = load_db() | |
cart = data["cart"] | |
existing = next((item for item in cart if item["product_id"] == product["id"]), None) | |
if existing: | |
existing["quantity"] += quantity | |
else: | |
new_cart_id = get_new_id(cart) | |
cart.append({"id": new_cart_id, "product_id": product["id"], "quantity": quantity}) | |
for p in data["products"]: | |
if p["id"] == product["id"]: | |
p["quantity_in_stock"] -= quantity | |
break | |
save_db(data) | |
st.success(f"Товар '{product['name']}' успешно добавлен в корзину!") | |
else: | |
st.error(f"Недостаточное количество товара '{product['name']}' на складе!") | |
with cols[3]: | |
remove_button = st.button(f"Удалить из корзины", key=f"remove_{product['id']}") | |
if remove_button: | |
if quantity > 0: | |
with db_lock: | |
data = load_db() | |
cart = data["cart"] | |
existing = next((item for item in cart if item["product_id"] == product["id"]), None) | |
if existing: | |
if existing["quantity"] > quantity: | |
existing["quantity"] -= quantity | |
else: | |
cart.remove(existing) | |
for p in data["products"]: | |
if p["id"] == product["id"]: | |
p["quantity_in_stock"] += quantity | |
break | |
save_db(data) | |
st.success(f"Товар '{product['name']}' успешно удалён из корзины!") | |
else: | |
st.error(f"Введите количество для удаления товара '{product['name']}'.") | |
st.subheader("Состояние корзины") | |
data = load_db() | |
cart_items = [] | |
for item in data["cart"]: | |
prod = next((p for p in data["products"] if p["id"] == item["product_id"]), None) | |
if prod: | |
total = item["quantity"] * prod["sale_price"] | |
cart_items.append([prod["name"], item["quantity"], prod["sale_price"], total]) | |
if cart_items: | |
df = pd.DataFrame(cart_items, columns=["Название", "Количество", "Цена за единицу", "Итого"]) | |
st.dataframe(df.style.format({"Цена за единицу": "{:.2f}", "Итого": "{:.2f}"}), use_container_width=True) | |
total_quantity = sum(item[1] for item in cart_items) | |
total_price = sum(item[3] for item in cart_items) | |
st.write(f"Общее количество: {total_quantity}, Общая стоимость: {total_price:.2f}") | |
if st.button("Пробить"): | |
sales_details, sale_id, total_amount = record_sales() | |
if sales_details: | |
st.write("**Детали сделок:**") | |
sale_details_df = pd.DataFrame(sales_details, columns=["ID товара", "Количество", "Сумма", "Дата и время"]) | |
st.dataframe(sale_details_df.style.format({"Сумма": "{:.2f}", | |
"Дата и время": lambda x: pd.to_datetime(x).strftime('%d-%m-%Y %H:%M:%S')}), | |
use_container_width=True) | |
st.write(f"**Общая сумма сделки:** {total_amount:.2f}") | |
st.success("Корзина успешно пробита! Все товары добавлены в отчет и корзина очищена.") | |
else: | |
st.info("Корзина пуста.") | |
else: | |
st.info("По вашему запросу товары не найдены.") | |
def monthly_report(): | |
"""Страница отчета о продажах за месяц.""" | |
st.title("Отчет о продажах за месяц") | |
data = load_db() | |
start_date = datetime.now().replace(day=1) | |
end_date = (datetime.now() + timedelta(days=31)).replace(day=1) | |
sales = [] | |
for sale in data["sales"]: | |
sale_date = datetime.strptime(sale["sale_date"], "%Y-%m-%d %H:%M:%S") | |
if start_date <= sale_date < end_date: | |
prod = next((p for p in data["products"] if p["id"] == sale["product_id"]), None) | |
if prod and prod["user_id"] == st.session_state.user_id: | |
sales.append({ | |
"name": prod["name"], | |
"quantity": sale["quantity"], | |
"sale_price": prod["sale_price"], | |
"total_sales": sale["total_amount"], | |
"profit": sale["quantity"] * (prod["sale_price"] - prod["purchase_price"]) | |
}) | |
report = {} | |
for s in sales: | |
key = s["name"] | |
if key not in report: | |
report[key] = {"quantity": 0, "sale_price": s["sale_price"], "total_sales": 0, "profit": 0} | |
report[key]["quantity"] += s["quantity"] | |
report[key]["total_sales"] += s["total_sales"] | |
report[key]["profit"] += s["profit"] | |
report_data = [] | |
for name, values in report.items(): | |
report_data.append([name, values["quantity"], values["sale_price"], values["total_sales"], values["profit"]]) | |
total_sales = sum(item[3] for item in report_data) | |
total_profit = sum(item[4] for item in report_data) | |
if report_data: | |
df = pd.DataFrame(report_data, columns=["Название", "Общее количество", "Отпускная цена", "Общие продажи", "Прибыль"]) | |
st.write(f"Отчет за {datetime.now().strftime('%B %Y')}") | |
st.dataframe(df.style.format({"Отпускная цена": "{:.2f}", "Общие продажи": "{:.2f}", "Прибыль": "{:.2f}"}), use_container_width=True) | |
st.write(f"**Общая сумма продаж**: {total_sales:.2f}") | |
st.write(f"**Общая сумма прибыли**: {total_profit:.2f}") | |
if st.button("Сделки"): | |
data = load_db() | |
sales_grouped = {} | |
for sale in data["sales"]: | |
prod = next((p for p in data["products"] if p["id"] == sale["product_id"]), None) | |
if prod and prod["user_id"] == st.session_state.user_id: | |
key = sale["sale_id"] | |
if key not in sales_grouped: | |
sales_grouped[key] = {"sale_date": sale["sale_date"], "total_amount": 0, "details": []} | |
sales_grouped[key]["total_amount"] += sale["total_amount"] | |
sales_grouped[key]["details"].append([prod["name"], sale["quantity"], prod["sale_price"], sale["sale_date"]]) | |
for key, group in sales_grouped.items(): | |
st.write(f"**Сделка ID:** {key} ({group['sale_date']}) - **Общая сумма:** {group['total_amount']:.2f}") | |
df_sales = pd.DataFrame(group["details"], columns=["Название товара", "Количество", "Цена за единицу", "Дата и время"]) | |
st.dataframe(df_sales.style.format({"Дата и время": lambda x: pd.to_datetime(x).strftime('%d-%m-%Y %H:%M:%S')}), | |
use_container_width=True) | |
else: | |
st.info("Нет данных о продажах за этот месяц.") | |
# --- Главная функция приложения --- | |
def main(): | |
"""Главная функция приложения.""" | |
st.warning("Попытка скачивания базы данных из Hugging Face...") | |
download_success = download_db_from_hf() | |
if download_success: | |
st.success("JSON база успешно скачана из Hugging Face!") | |
try: | |
os.chmod(DB_FILENAME, 0o666) | |
st.info("Права доступа к JSON базе изменены (read-write).") | |
except Exception as e: | |
st.error(f"Не удалось изменить права доступа: {e}") | |
else: | |
if os.path.exists(DB_FILENAME): | |
st.info("Не удалось скачать JSON базу, используется локальная копия.") | |
else: | |
st.error("Не удалось скачать JSON базу и локальной базы нет. Будет создана новая база.") | |
with open(DB_FILENAME, "w", encoding="utf8") as f: | |
json.dump({"users": [], "products": [], "cart": [], "sales": []}, f, ensure_ascii=False, indent=4) | |
if 'logged_in' not in st.session_state: | |
st.session_state.logged_in = False | |
st.sidebar.title("Навигация") | |
if st.session_state.logged_in: | |
st.sidebar.title(f"Привет, {st.session_state.username}!") | |
option = st.sidebar.selectbox("Выберите действие", ["Добавить товар", "Отпуск товара", "Редактировать товары", "Отчет за месяц", "Выйти"]) | |
if option == "Добавить товар": | |
add_product() | |
elif option == "Отпуск товара": | |
add_to_cart() | |
elif option == "Редактировать товары": | |
edit_products() | |
elif option == "Отчет за месяц": | |
monthly_report() | |
elif option == "Выйти": | |
st.session_state.logged_in = False | |
st.session_state.username = None | |
st.session_state.user_id = None | |
st.success("Вы вышли из системы!") | |
else: | |
page = st.sidebar.selectbox("Выберите страницу", ["Авторизация", "Регистрация"]) | |
if page == "Регистрация": | |
register() | |
elif page == "Авторизация": | |
login() | |
if __name__ == "__main__": | |
# Запускаем фоновый поток резервного копирования сразу | |
backup_thread = threading.Thread(target=periodic_backup, daemon=True) | |
backup_thread.start() | |
main() | |