# Bloom_Ware/core/database/optimized.py
"""
Optimized database access layer.
Combines caching, batched writes, request coalescing, and related optimization strategies.
"""
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
import asyncio
from .base import (
firestore_db, users_collection, chats_collection,
messages_collection,
get_user_by_id as _get_user_by_id_original,
get_chat as _get_chat_original,
get_user_chats as _get_user_chats_original,
save_message as _save_message_original,
save_chat_message as _save_chat_message_original,
get_user_memories as _get_user_memories_original,
)
from .cache import db_cache
logger = logging.getLogger("DatabaseOptimized")
# ==================== User operation optimizations ====================
async def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
"""
優化版:獲取用戶資料(帶緩存)
優化策略:
1. 先查緩存
2. 緩存未命中時,使用請求合併避免重複查詢
3. 查詢結果寫入緩存
"""
    # 1. Check the cache
    cached_user = await db_cache.get_user_cached(user_id)
    if cached_user is not None:
        logger.debug(f"User cache hit: {user_id}")
        return cached_user
    # 2. Cache miss: coalesce concurrent requests
    cache_key = f"fetch_user:{user_id}"
    async def fetch_user():
        logger.debug(f"Fetching user from database: {user_id}")
        user = await _get_user_by_id_original(user_id)
        if user:
            # Write the result back to the cache
            await db_cache.set_user_cache(user_id, user)
        return user
return await db_cache.coalesce_request(cache_key, fetch_user)
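# Illustrative sketch (not called anywhere in the app): concurrent callers asking
# for the same user should result in a single database read, assuming that
# db_cache.coalesce_request deduplicates in-flight requests sharing a cache_key,
# which is what the strategy described above relies on.
async def _example_concurrent_user_fetch(user_id: str) -> None:
    """Fetch the same user three times in parallel; only one DB query is expected."""
    results = await asyncio.gather(
        get_user_by_id(user_id),
        get_user_by_id(user_id),
        get_user_by_id(user_id),
    )
    logger.debug(f"Coalesced fetch returned {len(results)} results for user {user_id}")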
# ==================== Chat operation optimizations ====================
async def get_chat(chat_id: str) -> Dict[str, Any]:
"""
優化版:獲取對話(帶緩存)
優化策略:
1. 先查緩存
2. 緩存未命中時查詢數據庫
3. 結果寫入緩存
"""
    # 1. Check the cache
    cached_chat = await db_cache.get_chat_cached(chat_id)
    if cached_chat is not None:
        logger.debug(f"Chat cache hit: {chat_id}")
        return {"success": True, "chat": cached_chat}
    # 2. Cache miss: coalesce concurrent requests
    cache_key = f"fetch_chat:{chat_id}"
    async def fetch_chat():
        logger.debug(f"Fetching chat from database: {chat_id}")
        result = await _get_chat_original(chat_id)
        if result.get("success") and result.get("chat"):
            # Write the result back to the cache
            await db_cache.set_chat_cache(chat_id, result["chat"])
        return result
return await db_cache.coalesce_request(cache_key, fetch_chat)
async def get_user_chats(user_id: str) -> Dict[str, Any]:
"""
優化版:獲取用戶對話列表(帶緩存)
優化策略:
1. 先查緩存
2. 緩存未命中時查詢數據庫
3. 結果寫入緩存(短時間)
"""
    # 1. Check the cache
    cached_chats = await db_cache.get_user_chats_cached(user_id)
    if cached_chats is not None:
        logger.debug(f"User chat list cache hit: {user_id}")
        return {"success": True, "chats": cached_chats}
    # 2. Cache miss: coalesce concurrent requests
    cache_key = f"fetch_user_chats:{user_id}"
    async def fetch_chats():
        logger.debug(f"Fetching user chat list from database: {user_id}")
        result = await _get_user_chats_original(user_id)
        if result.get("success") and result.get("chats"):
            # Write the result back to the cache
            await db_cache.set_user_chats_cache(user_id, result["chats"])
        return result
return await db_cache.coalesce_request(cache_key, fetch_chats)
async def save_chat_message(chat_id: str, sender: str, content: str, buffered: bool = False) -> Dict[str, Any]:
"""
優化版:保存對話消息(支援批量緩衝)
優化策略:
1. buffered=True: 先放入緩衝區,10秒或50條後批量寫入(適用於高頻場景)
2. buffered=False: 直接寫入(確保數據一致性,適用於關鍵操作)
3. 使對話緩存失效
"""
    try:
        if buffered:
            # Buffered write (non-blocking)
            should_flush = await db_cache.buffer_write("messages", {
                "chat_id": chat_id,
                "sender": sender,
                "content": content,
                "timestamp": datetime.now()
            })
            # If the buffer is full, trigger a batch write in the background
            if should_flush:
                asyncio.create_task(_flush_message_buffer())
            # Invalidate the chat cache
            await db_cache.invalidate_chat_cache(chat_id)
            return {"success": True, "buffered": True}
        else:
            # Direct write (blocking)
            result = await _save_chat_message_original(chat_id, sender, content)
            if result.get("success"):
                # Invalidate the chat cache
                await db_cache.invalidate_chat_cache(chat_id)
                logger.debug(f"Message saved and chat cache invalidated: {chat_id}")
            return result
    except Exception as e:
        logger.error(f"Failed to save message: {e}")
        return {"success": False, "error": str(e)}
async def _flush_message_buffer():
"""清空消息緩衝區並批量寫入"""
try:
result = await db_cache.flush_write_buffer("messages")
if result.get("messages", 0) > 0:
logger.info(f"✅ 批量寫入 {result['messages']} 條消息")
except Exception as e:
logger.error(f"❌ 批量寫入失敗: {e}")
# ==================== Memory operation optimizations ====================
async def get_user_memories(
user_id: str,
memory_type: Optional[str] = None,
limit: int = 10,
min_importance: float = 0.0
) -> Dict[str, Any]:
"""
優化版:獲取用戶記憶(帶緩存)
優化策略:
1. 先查緩存(只緩存常見查詢)
2. 緩存未命中時查詢數據庫
3. 結果寫入緩存
"""
    # Only cache the standard query shape (avoids cache fragmentation)
    use_cache = (limit == 10 and min_importance == 0.0)
    if use_cache:
        # 1. Check the cache
        cached_memories = await db_cache.get_memories_cached(user_id, memory_type)
        if cached_memories is not None:
            logger.debug(f"Memory cache hit: {user_id}, type={memory_type}")
            return {"success": True, "memories": cached_memories}
    # 2. Query the database (with request coalescing)
    cache_key = f"fetch_memories:{user_id}:{memory_type}:{limit}:{min_importance}"
    async def fetch_memories():
        logger.debug(f"Fetching memories from database: {user_id}")
        result = await _get_user_memories_original(user_id, memory_type, limit, min_importance)
        if use_cache and result.get("success") and result.get("memories"):
            # Write the result back to the cache
            await db_cache.set_memories_cache(user_id, result["memories"], memory_type)
        return result
return await db_cache.coalesce_request(cache_key, fetch_memories)
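# Illustrative sketch: only the default query shape (limit=10, min_importance=0.0)
# is cached, so the filtered call below always goes to Firestore, while the default
# call can be served from cache on repeat. The helper and its arguments are
# hypothetical and exist only to show the cache-bypass behaviour.
async def _example_memory_queries(user_id: str) -> None:
    default_result = await get_user_memories(user_id)
    filtered_result = await get_user_memories(user_id, limit=50, min_importance=0.5)
    logger.debug(
        f"default query: {len(default_result.get('memories') or [])} memories, "
        f"filtered query: {len(filtered_result.get('memories') or [])} memories"
    )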
# ==================== Batch operation optimizations ====================
class BatchWriter:
"""批量寫入管理器"""
def __init__(self):
self.batch_queue: Dict[str, List[Dict[str, Any]]] = {
"messages": [],
"memories": [],
}
self.batch_size = 50
self.lock = asyncio.Lock()
async def add_message(self, user_id: str, content: str, is_bot: bool = False):
"""添加消息到批量寫入隊列"""
async with self.lock:
self.batch_queue["messages"].append({
"user_id": user_id,
"content": content,
"is_bot": is_bot,
"timestamp": datetime.now(),
})
# 達到批量大小時觸發寫入
if len(self.batch_queue["messages"]) >= self.batch_size:
await self._flush_messages()
async def _flush_messages(self):
"""批量寫入消息"""
if not self.batch_queue["messages"]:
return
messages = self.batch_queue["messages"]
self.batch_queue["messages"] = []
logger.info(f"批量寫入 {len(messages)} 條消息到 Firestore")
try:
# Firestore 批量寫入
batch = firestore_db.batch()
for msg in messages:
doc_ref = messages_collection.document()
batch.set(doc_ref, msg)
# 執行批量寫入
await asyncio.to_thread(batch.commit)
logger.info(f"✅ 批量寫入成功: {len(messages)} 條消息")
except Exception as e:
logger.error(f"批量寫入失敗: {e}")
# 失敗時重新加入隊列
self.batch_queue["messages"].extend(messages)
async def flush_all(self):
"""清空所有隊列"""
async with self.lock:
await self._flush_messages()
# Global batch writer instance
batch_writer = BatchWriter()
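# Illustrative sketch of the intended BatchWriter lifecycle (hypothetical helper):
# enqueue messages during normal operation, then flush at shutdown so a final
# partial batch (fewer than batch_size entries) is not lost.
async def _example_batch_writer_usage(user_id: str, lines: List[str]) -> None:
    for line in lines:
        await batch_writer.add_message(user_id, line, is_bot=False)
    # Typically called from an application shutdown hook.
    await batch_writer.flush_all()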
# ==================== Query optimization tools ====================
class QueryOptimizer:
"""查詢優化工具"""
def __init__(self):
        # Query statistics keyed by query type
self.query_stats: Dict[str, Dict[str, int]] = {}
self.lock = asyncio.Lock()
async def record_query(self, query_type: str, from_cache: bool):
"""記錄查詢統計"""
async with self.lock:
if query_type not in self.query_stats:
self.query_stats[query_type] = {"total": 0, "cached": 0, "db": 0}
self.query_stats[query_type]["total"] += 1
if from_cache:
self.query_stats[query_type]["cached"] += 1
else:
self.query_stats[query_type]["db"] += 1
def get_stats(self) -> Dict[str, Any]:
"""獲取查詢統計"""
stats = {}
for query_type, counts in self.query_stats.items():
total = counts["total"]
cached = counts["cached"]
cache_hit_rate = (cached / total * 100) if total > 0 else 0
stats[query_type] = {
"total": total,
"from_cache": cached,
"from_db": counts["db"],
"cache_hit_rate": f"{cache_hit_rate:.2f}%"
}
return stats
async def reset_stats(self):
"""重置統計"""
async with self.lock:
self.query_stats.clear()
# Global query optimizer instance
query_optimizer = QueryOptimizer()
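# query_optimizer is instantiated here but not yet wired into the getters above.
# A minimal sketch of how a caller could record hit-rate statistics, assuming that
# probing db_cache.get_user_cached is an acceptable way to classify the query;
# the wrapper name is hypothetical.
async def _example_tracked_get_user(user_id: str) -> Optional[Dict[str, Any]]:
    from_cache = (await db_cache.get_user_cached(user_id)) is not None
    user = await get_user_by_id(user_id)
    await query_optimizer.record_query("get_user_by_id", from_cache=from_cache)
    return user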
# ==================== Exported interface ====================
__all__ = [
"get_user_by_id",
"get_chat",
"get_user_chats",
"save_chat_message",
"get_user_memories",
"batch_writer",
"query_optimizer",
]