Spaces:
Sleeping
Sleeping
| # database/setup.py | |
| import os | |
| import sqlite3 | |
| import json | |
| from pymilvus import MilvusClient, FieldSchema, CollectionSchema, DataType | |
| from llama_index.embeddings.google import GooglePairedEmbeddings | |
| # 导入你的工具注册表 | |
| from tools.tool_registry import get_all_tools | |
| # --- 配置持久化路径 --- | |
| DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data") | |
| SQLITE_DB_PATH = os.path.join(DATA_DIR, "tools.metadata.db") | |
| MILVUS_DATA_PATH = os.path.join( | |
| DATA_DIR, "milvus_data.db" | |
| ) # Milvus Lite 将数据存在一个文件中 | |
| # --- Milvus Lite 配置 --- | |
| MILVUS_COLLECTION_NAME = "tool_embeddings" | |
| EMBEDDING_DIM = 768 # Google's text-embedding-004 model dimension | |
| # --- 全局变量,避免重复初始化 --- | |
| _db_initialized = False | |
| _milvus_initialized = False | |
| def initialize_system(): | |
| """ | |
| 系统的主初始化函数。 | |
| 它会创建目录、设置数据库和向量库,并加载工具。 | |
| 这个函数是幂等的,即多次调用不会产生副作用。 | |
| """ | |
| global _db_initialized, _milvus_initialized | |
| print("--- 开始系统初始化 ---") | |
| # 1. 创建数据目录 | |
| os.makedirs(DATA_DIR, exist_ok=True) | |
| # 2. 初始化SQLite数据库 | |
| if not _db_initialized: | |
| _init_sqlite_db() | |
| _db_initialized = True | |
| # 3. 初始化Milvus Lite向量数据库 | |
| if not _milvus_initialized: | |
| milvus_client = _init_milvus_lite() | |
| _milvus_initialized = True | |
| else: | |
| milvus_client = MilvusClient(uri=MILVUS_DATA_PATH) | |
| # 4. 获取所有工具定义 | |
| all_tools_definitions = get_all_tools() | |
| # 5. 将工具元数据同步到SQLite | |
| _sync_tools_to_sqlite(all_tools_definitions) | |
| # 6. 将工具描述的嵌入同步到Milvus Lite | |
| _sync_tool_embeddings_to_milvus(milvus_client) | |
| # 7. 从LlamaIndex创建工具推荐器 (在这里创建并返回) | |
| from core.tool_recommender import LlamaIndexToolRecommender | |
| tool_recommender = LlamaIndexToolRecommender( | |
| milvus_client=milvus_client, sqlite_db_path=SQLITE_DB_PATH | |
| ) | |
| print("--- 系统初始化完成 ---") | |
| return all_tools_definitions, tool_recommender | |
| def _init_sqlite_db(): | |
| """初始化SQLite数据库并创建表。""" | |
| print(f"SQLite DB 路径: {SQLITE_DB_PATH}") | |
| with sqlite3.connect(SQLITE_DB_PATH) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| """ | |
| CREATE TABLE IF NOT EXISTS tools ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT UNIQUE NOT NULL, | |
| description TEXT NOT NULL, | |
| parameters TEXT NOT NULL -- 存储JSON字符串 | |
| ) | |
| """ | |
| ) | |
| conn.commit() | |
| print("SQLite DB 表已确认存在。") | |
| def _init_milvus_lite(): | |
| """初始化Milvus Lite并创建集合和索引。""" | |
| print(f"Milvus Lite 数据路径: {MILVUS_DATA_PATH}") | |
| client = MilvusClient(uri=MILVUS_DATA_PATH) | |
| if not client.has_collection(collection_name=MILVUS_COLLECTION_NAME): | |
| print(f"Milvus集合 '{MILVUS_COLLECTION_NAME}' 不存在,正在创建...") | |
| fields = [ | |
| FieldSchema(name="id", dtype=DataType.INT64, is_primary=True), | |
| FieldSchema( | |
| name="embedding", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM | |
| ), | |
| ] | |
| schema = CollectionSchema(fields, description="Tool embedding collection") | |
| client.create_collection(collection_name=MILVUS_COLLECTION_NAME, schema=schema) | |
| index_params = client.prepare_index_params() | |
| index_params.add_index( | |
| field_name="embedding", | |
| index_type="AUTOINDEX", # 让Milvus自动选择最佳索引 | |
| metric_type="L2", | |
| ) | |
| client.create_index( | |
| collection_name=MILVUS_COLLECTION_NAME, index_params=index_params | |
| ) | |
| print("Milvus集合和索引创建完成。") | |
| else: | |
| print(f"Milvus集合 '{MILVUS_COLLECTION_NAME}' 已存在。") | |
| # 确保集合已加载到内存中以供搜索 | |
| client.load_collection(collection_name=MILVUS_COLLECTION_NAME) | |
| return client | |
| def _sync_tools_to_sqlite(tools_definitions): | |
| """将工具定义同步到SQLite数据库。""" | |
| print("正在同步工具元数据到SQLite...") | |
| with sqlite3.connect(SQLITE_DB_PATH) as conn: | |
| cursor = conn.cursor() | |
| for tool in tools_definitions: | |
| cursor.execute("SELECT id FROM tools WHERE name = ?", (tool.name,)) | |
| if cursor.fetchone() is None: | |
| # 工具不存在,插入新工具 | |
| cursor.execute( | |
| "INSERT INTO tools (name, description, parameters) VALUES (?, ?, ?)", | |
| (tool.name, tool.description, json.dumps(tool.args)), | |
| ) | |
| print(f" - 新增工具到SQLite: {tool.name}") | |
| conn.commit() | |
| print("SQLite同步完成。") | |
| def _sync_tool_embeddings_to_milvus(milvus_client): | |
| """计算并同步工具描述的嵌入到Milvus Lite。""" | |
| print("正在同步工具嵌入到Milvus...") | |
| # 1. 从SQLite获取所有工具 | |
| with sqlite3.connect(SQLITE_DB_PATH) as conn: | |
| cursor = conn.cursor() | |
| cursor.execute("SELECT id, description FROM tools") | |
| all_tools_in_db = cursor.fetchall() | |
| # 2. 获取Milvus中已存在的工具ID | |
| try: | |
| existing_milvus_ids_raw = milvus_client.query( | |
| collection_name=MILVUS_COLLECTION_NAME, | |
| filter="id > 0", | |
| output_fields=["id"], | |
| ) | |
| existing_milvus_ids = {item["id"] for item in existing_milvus_ids_raw} | |
| except Exception: | |
| existing_milvus_ids = set() | |
| # 3. 找出需要计算嵌入的新工具 | |
| new_tools_to_embed = [ | |
| (tool_id, description) | |
| for tool_id, description in all_tools_in_db | |
| if tool_id not in existing_milvus_ids | |
| ] | |
| if not new_tools_to_embed: | |
| print("所有工具嵌入已是最新,无需同步。") | |
| return | |
| print(f"发现 {len(new_tools_to_embed)} 个新工具需要生成嵌入...") | |
| # 4. 初始化嵌入模型 | |
| try: | |
| # 确保你的API Key已在环境中设置 | |
| embed_model = GooglePairedEmbeddings( | |
| model_name="models/text-embedding-004", | |
| task_type="retrieval_document", # 用于存储的文档嵌入 | |
| ) | |
| except Exception as e: | |
| print(f"错误:无法初始化Google嵌入模型。请检查API Key。 - {e}") | |
| return | |
| # 5. 生成嵌入并准备插入 | |
| tool_ids_to_insert = [tool[0] for tool in new_tools_to_embed] | |
| descriptions_to_embed = [tool[1] for tool in new_tools_to_embed] | |
| embeddings = embed_model.get_text_embedding_batch( | |
| descriptions_to_embed, show_progress=True | |
| ) | |
| data_to_insert = [ | |
| {"id": tool_id, "embedding": embedding} | |
| for tool_id, embedding in zip(tool_ids_to_insert, embeddings) | |
| ] | |
| # 6. 插入到Milvus | |
| milvus_client.insert(collection_name=MILVUS_COLLECTION_NAME, data=data_to_insert) | |
| milvus_client.flush([MILVUS_COLLECTION_NAME]) # 确保数据写入 | |
| print(f"成功将 {len(data_to_insert)} 个新嵌入插入到Milvus。") | |