Spaces:
Restarting
Restarting
from fastapi import FastAPI, Request | |
from pydantic import BaseModel | |
from config.settings import Settings | |
from src.embedding import Embedding | |
from src.search import Search | |
from src.firestore_db import FirestoreDB | |
from src.gemini import Gemini | |
from src.cache import Cache | |
from src.lottery import Lottery | |
from src.logger import logger | |
import httpx | |
import json | |
from datetime import datetime | |
from typing import Dict, Optional | |
from functools import lru_cache | |
logger.info("Bắt đầu import các module") | |
app = FastAPI() | |
logger.info("Khởi tạo FastAPI hoàn tất") | |
# Tái sử dụng AsyncClient toàn cục | |
http_client = httpx.AsyncClient() | |
logger.info("Khởi tạo httpx.AsyncClient hoàn tất") | |
class CommandHandler: | |
"""Base class for command handlers.""" | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
raise NotImplementedError | |
class StartCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
await bot.send_message(chat_id, "Chào! Gửi câu hỏi hoặc dùng /help để xem các lệnh.") | |
class HelpCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
help_text = ( | |
"Hướng dẫn sử dụng @CotienBot:\n" | |
"- /start: Khởi động bot.\n" | |
"- /help: Hiển thị hướng dẫn này.\n" | |
"- /auth <mật_khẩu>: Xác thực để sử dụng các lệnh nhạy cảm.\n" | |
"- /train: Nhập dữ liệu đào tạo (FAISS và xổ số).\n" | |
"- /Gemini: Chuyển sang chế độ Gemini AI.\n" | |
"- /exit: Thoát chế độ Gemini.\n" | |
"- /lottery <đài> <ngày_bắt_đầu> <ngày_kết_thúc>: Phân tích tần suất, top bộ số (YYYY-MM-DD).\n" | |
"- /lottery_position <đài> <ngày> <số_ngày_trước> <độ_khớp>: Phân tích lồng quay, dự đoán (ngày DD/MM/YYYY).\n" | |
"- /load_lottery: Tải dữ liệu xổ số từ file JSON (data/lottery.json).\n" | |
"- Gửi câu hỏi thông thường để tìm kiếm thông tin." | |
) | |
cache_key = f"{user_id}:{text}" | |
bot.cache.set(cache_key, help_text) | |
await bot.send_message(chat_id, help_text) | |
class AuthCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
if await bot.is_user_authenticated(user_id): | |
await bot.send_message(chat_id, "Bạn đã xác thực rồi!") | |
return | |
password = text.split(" ", 1)[1] if len(text.split(" ")) > 1 else "" | |
if await bot.authenticate_user(user_id, password): | |
await bot.send_message(chat_id, "Xác thực thành công! Bạn có thể sử dụng bot.") | |
else: | |
await bot.send_message(chat_id, "Mật khẩu sai. Vui lòng thử lại với /auth <mật_khẩu>.") | |
class GeminiCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
bot.gemini_mode[user_id] = True | |
await bot.send_message(chat_id, "Chuyển sang chế độ Gemini. Gửi câu hỏi hoặc /exit.") | |
class ExitCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
bot.gemini_mode[user_id] = False | |
await bot.send_message(chat_id, "Đã thoát chế độ Gemini.") | |
class TrainCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
if not await bot.is_user_authenticated(user_id): | |
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>") | |
return | |
parts = text.split(maxsplit=1) | |
train_param = parts[1] if len(parts) > 1 else None | |
data_type = "đối thoại" # Mặc định là đối thoại | |
lottery_data = None | |
if train_param: | |
try: | |
parsed = json.loads(train_param) | |
if isinstance(parsed, dict) and "ngay" in parsed and "dai" in parsed and "giai" in parsed: | |
lottery_data = parsed | |
data_type = "xổ số" | |
except json.JSONDecodeError: | |
pass # Không phải JSON, coi là đối thoại | |
await bot.send_message(chat_id, f"Đang đào tạo {data_type}... (Dữ liệu: {train_param or 'Không có'})") | |
try: | |
from train import train_data | |
import asyncio | |
await asyncio.get_event_loop().run_in_executor( | |
None, train_data, bot.db, bot.search, bot.embedding, bot.lottery, train_param if not lottery_data else None, lottery_data | |
) | |
await bot.send_message(chat_id, f"Đào tạo {data_type} hoàn tất!") | |
except Exception as e: | |
error_type = "xổ số" if lottery_data else "đối thoại" | |
logger.error(f"Lỗi khi đào tạo {error_type}: {str(e)}") | |
await bot.send_message(chat_id, f"Lỗi khi đào tạo {error_type}: {str(e)}") | |
class LotteryPositionCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
if not await bot.is_user_authenticated(user_id): | |
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>") | |
return | |
parts = text.split() | |
if len(parts) != 5: | |
await bot.send_message(chat_id, "Sai định dạng. Vui lòng dùng: /lottery_position <đài> <ngày> <số_ngày_trước> <độ_khớp> (ngày DD/MM/YYYY).") | |
return | |
_, dai, date, days_before, match_threshold = parts | |
try: | |
date = datetime.strptime(date, "%d/%m/%Y").strftime("%Y-%m-%d") | |
except ValueError: | |
await bot.send_message(chat_id, "Định dạng ngày không hợp lệ. Vui lòng dùng DD/MM/YYYY.") | |
return | |
result = bot.lottery.analyze_position(dai, date, days_before, match_threshold) | |
await bot.send_message(chat_id, result) | |
class LotteryCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
if not await bot.is_user_authenticated(user_id): | |
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>") | |
return | |
parts = text.split() | |
if len(parts) != 4: | |
await bot.send_message(chat_id, "Sai định dạng. Vui lòng dùng: /lottery <đài> <ngày_bắt_đầu> <ngày_kết_thúc> (YYYY-MM-DD).") | |
return | |
_, dai, start_date, end_date = parts | |
result = bot.lottery.analyze(dai, start_date, end_date) | |
await bot.send_message(chat_id, result) | |
class LoadLotteryCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
if not await bot.is_user_authenticated(user_id): | |
await bot.send_message(chat_id, "Vui lòng xác thực trước khi sử dụng lệnh này. Gửi: /auth <mật_khẩu>") | |
return | |
json_file_path = "data/lottery_data.json" # Đường dẫn mặc định | |
result = bot.lottery.load_from_json(json_file_path) | |
await bot.send_message(chat_id, result) | |
class DefaultCommand(CommandHandler): | |
async def handle(self, bot, chat_id: int, user_id: int, text: str) -> None: | |
cache_key = f"{user_id}:{text}" | |
gemini_cache_key = f"{user_id}:{text}:GEMINI" | |
cache_type = "gemini" if bot.gemini_mode.get(user_id) else "default" | |
cache_key_to_use = gemini_cache_key if bot.gemini_mode.get(user_id) else cache_key | |
cached = bot.cache.get(cache_key_to_use, type=cache_type) | |
if cached: | |
await bot.send_message(chat_id, cached) | |
return | |
query_vector = bot.embedding.generate(text) | |
results = bot.search.search(query_vector) | |
context = results[0][0] if results else None | |
if bot.gemini_mode.get(user_id): | |
response = bot.gemini.query(text, context) | |
bot.cache.set(gemini_cache_key, response, type="gemini") | |
else: | |
response = context or "Không tìm thấy thông tin." | |
bot.cache.set(cache_key, response) | |
await bot.send_message(chat_id, response) | |
logger.info("Định nghĩa các CommandHandler hoàn tất") | |
class Bot: | |
def __init__(self): | |
logger.info("Bắt đầu khởi tạo Bot") | |
self.gemini_mode: Dict[int, bool] = {} | |
self.authenticated_users: Dict[int, bool] = {} | |
self._embedding = None | |
self._search = None | |
self._db = None | |
self._gemini = None | |
self._cache = None | |
self._lottery = None | |
self.command_handlers = { | |
"/start": StartCommand(), | |
"/help": HelpCommand(), | |
"/auth": AuthCommand(), | |
"/Gemini": GeminiCommand(), | |
"/exit": ExitCommand(), | |
"/train": TrainCommand(), | |
"/lottery": LotteryCommand(), | |
"/lottery_position": LotteryPositionCommand(), | |
"/load_lottery": LoadLotteryCommand(), # Thêm handler mới | |
} | |
logger.info("Khởi tạo Bot hoàn tất") | |
def embedding(self): | |
if self._embedding is None: | |
logger.info("Khởi tạo Embedding") | |
self._embedding = Embedding() | |
return self._embedding | |
def search(self): | |
if self._search is None: | |
logger.info("Khởi tạo Search") | |
self._search = Search() | |
return self._search | |
def db(self): | |
if self._db is None: | |
logger.info("Khởi tạo FirestoreDB") | |
self._db = FirestoreDB() | |
return self._db | |
def gemini(self): | |
if self._gemini is None: | |
logger.info("Khởi tạo Gemini") | |
self._gemini = Gemini() | |
return self._gemini | |
def cache(self): | |
if self._cache is None: | |
logger.info("Khởi tạo Cache") | |
self._cache = Cache() | |
return self._cache | |
def lottery(self): | |
if self._lottery is None: | |
logger.info("Khởi tạo Lottery") | |
self._lottery = Lottery(self.db, self.cache, self.embedding) | |
return self._lottery | |
async def send_message(self, chat_id: int, text: str) -> None: | |
url = f"https://api.telegram.org/bot{Settings.TELEGRAM_TOKEN}/sendMessage" | |
payload = {"chat_id": chat_id, "text": text} | |
try: | |
await http_client.post(url, json=payload) | |
except Exception as e: | |
logger.error(f"Lỗi khi gửi tin nhắn: {str(e)}") | |
async def is_user_authenticated(self, user_id: int) -> bool: | |
if user_id in self.authenticated_users: | |
return True | |
doc = self.db.db.collection("auth_users").document(str(user_id)).get() | |
if doc.exists and doc.to_dict().get("authenticated", False): | |
self.authenticated_users[user_id] = True | |
return True | |
return False | |
async def authenticate_user(self, user_id: int, password: str) -> bool: | |
if password == Settings.BOT_PASSWORD: | |
self.authenticated_users[user_id] = True | |
self.db.db.collection("auth_users").document(str(user_id)).set({"authenticated": True}) | |
logger.info(f"Người dùng {user_id} đã xác thực thành công") | |
return True | |
logger.warning(f"Xác thực thất bại cho người dùng {user_id}") | |
return False | |
async def handle_message(self, chat_id: int, user_id: int, text: str) -> None: | |
logger.info(f"Xử lý lệnh: {text} từ user_id: {user_id}") | |
command = text.split()[0] if text.startswith("/") else None | |
if command and command not in self.command_handlers: | |
await self.send_message(chat_id, "Lệnh không hợp lệ. Gửi /help để xem hướng dẫn.") | |
return | |
handler = self.command_handlers.get(command, DefaultCommand()) | |
await handler.handle(self, chat_id, user_id, text) | |
logger.info("Định nghĩa lớp Bot hoàn tất") | |
bot = Bot() | |
logger.info("Khởi tạo bot hoàn tất") | |
class WebhookUpdate(BaseModel): | |
message: dict | |
async def read_root(): | |
return {"status": "ok"} | |
async def health_check(): | |
return {"status": "healthy"} | |
async def webhook(update: WebhookUpdate): | |
message = update.message | |
chat_id = message["chat"]["id"] | |
user_id = message["from"]["id"] | |
text = message.get("text", "") | |
logger.info(f"Nhận tin nhắn từ {user_id}: {text}") | |
await bot.handle_message(chat_id, user_id, text) | |
return {"status": "ok"} | |
async def startup_event(): | |
logger.info("Bắt đầu startup_event") | |
webhook_url = f"{Settings.WEBHOOK_URL}/webhook" | |
url = f"https://api.telegram.org/bot{Settings.TELEGRAM_TOKEN}/setWebhook" | |
try: | |
logger.info(f"Gửi yêu cầu thiết lập webhook tới {url}") | |
response = await http_client.post(url, json={"url": webhook_url}, timeout=10.0) | |
logger.info(f"Phản hồi từ Telegram API: {response.status_code}") | |
if response.status_code == 200: | |
logger.info("Webhook đã được thiết lập") | |
else: | |
logger.error(f"Thiết lập webhook thất bại: {response.text}") | |
except Exception as e: | |
logger.error(f"Lỗi khi thiết lập webhook: {str(e)}") | |
logger.info("Kết thúc startup_event") | |
async def shutdown_event(): | |
logger.info("Bắt đầu shutdown_event") | |
await http_client.aclose() | |
logger.info("Đã đóng HTTP client") | |
logger.info("Định nghĩa các route và event handlers hoàn tất") |