import streamlit as st import pandas as pd import os import json import random import string from datetime import datetime, timedelta import time import threading from huggingface_hub import HfApi, hf_hub_download # --- Настройки Hugging Face --- REPO_ID = "Testbase1/testsett" # Замените на ваш репозиторий DB_FILENAME = "auth_system.json" # JSON база данных HF_TOKEN_WRITE = os.getenv("HF_TOKEN") # Токен для записи (загрузки) HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") # Токен для чтения (скачивания) # Глобальный блокировщик для доступа к JSON-файлу db_lock = threading.Lock() # --- Кеширование функций для повышения производительности --- @st.cache_data def load_db(): """Загружает базу данных из JSON-файла.""" if not os.path.exists(DB_FILENAME): data = {"users": [], "products": [], "cart": [], "sales": []} save_db(data) return data try: with open(DB_FILENAME, "r", encoding="utf8") as f: data = json.load(f) except json.JSONDecodeError: data = {"users": [], "products": [], "cart": [], "sales": []} save_db(data) return data def save_db(data): """Сохраняет базу данных в JSON-файл.""" with db_lock: with open(DB_FILENAME, "w", encoding="utf8") as f: json.dump(data, f, ensure_ascii=False, indent=4) def upload_db_to_hf(): """Загружает JSON-файл базы данных в репозиторий Hugging Face.""" try: api = HfApi() api.upload_file( path_or_fileobj=DB_FILENAME, path_in_repo=DB_FILENAME, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Автоматическое резервное копирование базы данных {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) print("Резервная копия JSON базы успешно загружена на Hugging Face.") except Exception as e: print("Ошибка при загрузке резервной копии:", e) def download_db_from_hf(): """Скачивает JSON-файл базы данных из репозитория Hugging Face.""" try: hf_hub_download( repo_id=REPO_ID, filename=DB_FILENAME, repo_type="dataset", token=HF_TOKEN_READ, local_dir=".", local_dir_use_symlinks=False ) print("JSON база успешно скачана из Hugging Face.") return True except Exception as e: print("Ошибка при скачивании JSON базы:", e) return False def periodic_backup(): """ Периодически, каждые 15 секунд, вызывается функция upload_db_to_hf(). """ while True: upload_db_to_hf() time.sleep(100) # --- Вспомогательные функции --- def generate_token(): """Генерирует случайный токен.""" return ''.join(random.choices(string.ascii_letters + string.digits, k=13)) def get_new_id(items): """Возвращает новый ID для элемента.""" return max([item.get("id", 0) for item in items], default=0) + 1 # --- Функции для работы с продажами --- def record_sales(): """Записывает информацию о продажах, обновляет базу данных и очищает корзину.""" with db_lock: data = load_db() cart_items = data["cart"] if not cart_items: return None, None, 0.0 sales_details = [] total_amount = 0.0 now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") sale_id = generate_token() for item in cart_items: product = next((p for p in data["products"] if p["id"] == item["product_id"]), None) if product: sale_amount = item["quantity"] * product["sale_price"] total_amount += sale_amount sales_details.append([product["id"], item["quantity"], sale_amount, now]) new_sale_id = get_new_id(data["sales"]) data["sales"].append({ "id": new_sale_id, "sale_id": sale_id, "product_id": product["id"], "quantity": item["quantity"], "total_amount": sale_amount, "sale_date": now }) data["cart"] = [] save_db(data) return sales_details, sale_id, total_amount # --- Страницы приложения --- def register(): """Страница регистрации.""" st.title('Регистрация') username = st.text_input("Введите ваше имя пользователя") admin_password = st.text_input("Введите пароль администратора", type="password") if st.button("Зарегистрироваться"): if admin_password == "morshenfullsumflpol": token = generate_token() with db_lock: data = load_db() if any(u["username"] == username for u in data["users"]): st.error("Это имя пользователя уже занято. Попробуйте другое.") else: new_id = get_new_id(data["users"]) data["users"].append({"id": new_id, "username": username, "token": token}) save_db(data) st.success(f"Регистрация успешна! Ваш токен: {token}") else: st.error("Неверный пароль администратора!") def login(): """Страница авторизации.""" st.title('Авторизация') token_input = st.text_input("Введите ваш токен") if st.button("Войти"): data = load_db() user = next((u for u in data["users"] if u["token"] == token_input), None) if user: st.session_state.logged_in = True st.session_state.username = user["username"] st.session_state.user_id = user["id"] st.success("Добро пожаловать!") else: st.error("Неверный токен!") def add_product(): """Страница добавления товара.""" st.title("Добавление товара") product_name = st.text_input("Название товара").strip().lower() product_description = st.text_area("Описание товара") purchase_price = st.number_input("Приходная цена", min_value=0.0, step=0.01) sale_price = st.number_input("Отпускная цена", min_value=0.0, step=0.01) product_quantity = st.number_input("Количество на складе", min_value=0, step=1) if st.button("Добавить товар"): if product_name and sale_price: with db_lock: data = load_db() new_id = get_new_id(data["products"]) data["products"].append({ "id": new_id, "name": product_name, "description": product_description, "purchase_price": purchase_price, "sale_price": sale_price, "quantity_in_stock": product_quantity, "user_id": st.session_state.user_id }) save_db(data) st.success("Товар успешно добавлен!") else: st.error("Пожалуйста, введите все обязательные данные.") def edit_products(): """Страница редактирования товара.""" st.title("Редактирование товара") data = load_db() products = [p for p in data["products"] if p["user_id"] == st.session_state.user_id] if products: product_names = [p["name"] for p in products] selected_product_name = st.selectbox("Выберите товар", product_names) product = next(p for p in products if p["name"] == selected_product_name) new_name = st.text_input("Новое название товара", product["name"]) new_description = st.text_area("Новое описание товара", product["description"]) new_purchase_price = st.number_input("Новая приходная цена", min_value=0.0, step=0.01, value=product["purchase_price"]) new_sale_price = st.number_input("Новая отпускная цена", min_value=0.0, step=0.01, value=product["sale_price"]) new_quantity_in_stock = st.number_input("Новое количество на складе", min_value=0, step=1, value=product["quantity_in_stock"]) if st.button("Сохранить изменения"): with db_lock: data = load_db() for p in data["products"]: if p["id"] == product["id"]: p["name"] = new_name p["description"] = new_description p["purchase_price"] = new_purchase_price p["sale_price"] = new_sale_price p["quantity_in_stock"] = new_quantity_in_stock break save_db(data) st.success("Товар успешно обновлен!") if st.button("Удалить товар"): with db_lock: data = load_db() data["products"] = [p for p in data["products"] if p["id"] != product["id"]] save_db(data) st.success("Товар успешно удален!") else: st.info("У вас пока нет добавленных товаров.") def add_to_cart(): """Страница добавления товара в корзину.""" st.title("Отпуск товара") search_term = st.text_input("Поиск товара").strip().lower() data = load_db() products = [p for p in data["products"] if p["user_id"] == st.session_state.user_id and search_term in p["name"].lower()] if products: for product in products: cols = st.columns(4) with cols[0]: st.write(product["name"]) st.write(f"**Отпускная цена**: {product['sale_price']:.2f}") st.write(f"**Остаток на складе**: {product['quantity_in_stock']}") with cols[1]: quantity = st.number_input(f"Количество для '{product['name']}'", min_value=0, max_value=product["quantity_in_stock"], key=f"quantity_{product['id']}") with cols[2]: add_button = st.button(f"Добавить в корзину", key=f"add_{product['id']}") if add_button: if quantity <= product["quantity_in_stock"]: with db_lock: data = load_db() cart = data["cart"] existing = next((item for item in cart if item["product_id"] == product["id"]), None) if existing: existing["quantity"] += quantity else: new_cart_id = get_new_id(cart) cart.append({"id": new_cart_id, "product_id": product["id"], "quantity": quantity}) for p in data["products"]: if p["id"] == product["id"]: p["quantity_in_stock"] -= quantity break save_db(data) st.success(f"Товар '{product['name']}' успешно добавлен в корзину!") else: st.error(f"Недостаточное количество товара '{product['name']}' на складе!") with cols[3]: remove_button = st.button(f"Удалить из корзины", key=f"remove_{product['id']}") if remove_button: if quantity > 0: with db_lock: data = load_db() cart = data["cart"] existing = next((item for item in cart if item["product_id"] == product["id"]), None) if existing: if existing["quantity"] > quantity: existing["quantity"] -= quantity else: cart.remove(existing) for p in data["products"]: if p["id"] == product["id"]: p["quantity_in_stock"] += quantity break save_db(data) st.success(f"Товар '{product['name']}' успешно удалён из корзины!") else: st.error(f"Введите количество для удаления товара '{product['name']}'.") st.subheader("Состояние корзины") data = load_db() cart_items = [] for item in data["cart"]: prod = next((p for p in data["products"] if p["id"] == item["product_id"]), None) if prod: total = item["quantity"] * prod["sale_price"] cart_items.append([prod["name"], item["quantity"], prod["sale_price"], total]) if cart_items: df = pd.DataFrame(cart_items, columns=["Название", "Количество", "Цена за единицу", "Итого"]) st.dataframe(df.style.format({"Цена за единицу": "{:.2f}", "Итого": "{:.2f}"}), use_container_width=True) total_quantity = sum(item[1] for item in cart_items) total_price = sum(item[3] for item in cart_items) st.write(f"Общее количество: {total_quantity}, Общая стоимость: {total_price:.2f}") if st.button("Пробить"): sales_details, sale_id, total_amount = record_sales() if sales_details: st.write("**Детали сделок:**") sale_details_df = pd.DataFrame(sales_details, columns=["ID товара", "Количество", "Сумма", "Дата и время"]) st.dataframe(sale_details_df.style.format({"Сумма": "{:.2f}", "Дата и время": lambda x: pd.to_datetime(x).strftime('%d-%m-%Y %H:%M:%S')}), use_container_width=True) st.write(f"**Общая сумма сделки:** {total_amount:.2f}") st.success("Корзина успешно пробита! Все товары добавлены в отчет и корзина очищена.") else: st.info("Корзина пуста.") else: st.info("По вашему запросу товары не найдены.") def monthly_report(): """Страница отчета о продажах за месяц.""" st.title("Отчет о продажах за месяц") data = load_db() start_date = datetime.now().replace(day=1) end_date = (datetime.now() + timedelta(days=31)).replace(day=1) sales = [] for sale in data["sales"]: sale_date = datetime.strptime(sale["sale_date"], "%Y-%m-%d %H:%M:%S") if start_date <= sale_date < end_date: prod = next((p for p in data["products"] if p["id"] == sale["product_id"]), None) if prod and prod["user_id"] == st.session_state.user_id: sales.append({ "name": prod["name"], "quantity": sale["quantity"], "sale_price": prod["sale_price"], "total_sales": sale["total_amount"], "profit": sale["quantity"] * (prod["sale_price"] - prod["purchase_price"]) }) report = {} for s in sales: key = s["name"] if key not in report: report[key] = {"quantity": 0, "sale_price": s["sale_price"], "total_sales": 0, "profit": 0} report[key]["quantity"] += s["quantity"] report[key]["total_sales"] += s["total_sales"] report[key]["profit"] += s["profit"] report_data = [] for name, values in report.items(): report_data.append([name, values["quantity"], values["sale_price"], values["total_sales"], values["profit"]]) total_sales = sum(item[3] for item in report_data) total_profit = sum(item[4] for item in report_data) if report_data: df = pd.DataFrame(report_data, columns=["Название", "Общее количество", "Отпускная цена", "Общие продажи", "Прибыль"]) st.write(f"Отчет за {datetime.now().strftime('%B %Y')}") st.dataframe(df.style.format({"Отпускная цена": "{:.2f}", "Общие продажи": "{:.2f}", "Прибыль": "{:.2f}"}), use_container_width=True) st.write(f"**Общая сумма продаж**: {total_sales:.2f}") st.write(f"**Общая сумма прибыли**: {total_profit:.2f}") if st.button("Сделки"): data = load_db() sales_grouped = {} for sale in data["sales"]: prod = next((p for p in data["products"] if p["id"] == sale["product_id"]), None) if prod and prod["user_id"] == st.session_state.user_id: key = sale["sale_id"] if key not in sales_grouped: sales_grouped[key] = {"sale_date": sale["sale_date"], "total_amount": 0, "details": []} sales_grouped[key]["total_amount"] += sale["total_amount"] sales_grouped[key]["details"].append([prod["name"], sale["quantity"], prod["sale_price"], sale["sale_date"]]) for key, group in sales_grouped.items(): st.write(f"**Сделка ID:** {key} ({group['sale_date']}) - **Общая сумма:** {group['total_amount']:.2f}") df_sales = pd.DataFrame(group["details"], columns=["Название товара", "Количество", "Цена за единицу", "Дата и время"]) st.dataframe(df_sales.style.format({"Дата и время": lambda x: pd.to_datetime(x).strftime('%d-%m-%Y %H:%M:%S')}), use_container_width=True) else: st.info("Нет данных о продажах за этот месяц.") # --- Главная функция приложения --- def main(): """Главная функция приложения.""" st.warning("Попытка скачивания базы данных из Hugging Face...") download_success = download_db_from_hf() if download_success: st.success("JSON база успешно скачана из Hugging Face!") try: os.chmod(DB_FILENAME, 0o666) st.info("Права доступа к JSON базе изменены (read-write).") except Exception as e: st.error(f"Не удалось изменить права доступа: {e}") else: if os.path.exists(DB_FILENAME): st.info("Не удалось скачать JSON базу, используется локальная копия.") else: st.error("Не удалось скачать JSON базу и локальной базы нет. Будет создана новая база.") with open(DB_FILENAME, "w", encoding="utf8") as f: json.dump({"users": [], "products": [], "cart": [], "sales": []}, f, ensure_ascii=False, indent=4) if 'logged_in' not in st.session_state: st.session_state.logged_in = False st.sidebar.title("Навигация") if st.session_state.logged_in: st.sidebar.title(f"Привет, {st.session_state.username}!") option = st.sidebar.selectbox("Выберите действие", ["Добавить товар", "Отпуск товара", "Редактировать товары", "Отчет за месяц", "Выйти"]) if option == "Добавить товар": add_product() elif option == "Отпуск товара": add_to_cart() elif option == "Редактировать товары": edit_products() elif option == "Отчет за месяц": monthly_report() elif option == "Выйти": st.session_state.logged_in = False st.session_state.username = None st.session_state.user_id = None st.success("Вы вышли из системы!") else: page = st.sidebar.selectbox("Выберите страницу", ["Авторизация", "Регистрация"]) if page == "Регистрация": register() elif page == "Авторизация": login() if __name__ == "__main__": # Запускаем фоновый поток резервного копирования сразу backup_thread = threading.Thread(target=periodic_backup, daemon=True) backup_thread.start() main()