| |
| """ |
| 批量分析JSON文件脚本 |
| |
| 从指定目录递归搜索JSON文件,读取其中的文本内容,向后端发送分析请求, |
| 并将结果保存到新文件。 |
| """ |
|
|
| import argparse |
| import json |
| import os |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Optional, Tuple |
|
|
| try: |
| import requests |
| except ImportError: |
| print("错误: 需要安装 requests 库") |
| print("请运行: pip install requests") |
| sys.exit(1) |
|
|
|
|
| |
| |
| DEFAULT_API_BASE = "http://localhost:5001" |
| API_ENDPOINT = "/api/analyze" |
|
|
| |
| HF_TOKEN_ENV = "HF_TOKEN" |
|
|
| |
| SEARCH_DIRS = [ |
| "data/demo/未读" |
| ] |
|
|
|
|
| def find_json_files(base_dir: str, search_dirs: list) -> list[Path]: |
| """递归搜索指定目录下的所有JSON文件""" |
| json_files = [] |
| base_path = Path(base_dir) |
| |
| for search_dir in search_dirs: |
| search_path = base_path / search_dir |
| if not search_path.exists(): |
| print(f"⚠️ 目录不存在: {search_path}") |
| continue |
| |
| |
| for json_file in search_path.rglob("*.json"): |
| json_files.append(json_file) |
| |
| return json_files |
|
|
|
|
| def load_json_file(file_path: Path) -> Optional[dict]: |
| """加载JSON文件""" |
| try: |
| with open(file_path, 'r', encoding='utf-8') as f: |
| return json.load(f) |
| except json.JSONDecodeError as e: |
| print(f"❌ JSON解析错误 {file_path}: {e}") |
| return None |
| except Exception as e: |
| print(f"❌ 读取文件错误 {file_path}: {e}") |
| return None |
|
|
|
|
| def extract_text_from_json(data: dict) -> Optional[str]: |
| """从JSON数据中提取文本内容""" |
| if not isinstance(data, dict): |
| return None |
| |
| |
| request = data.get('request', {}) |
| if isinstance(request, dict): |
| text = request.get('text') |
| if text: |
| return text |
| |
| |
| text = data.get('text') |
| if text: |
| return text |
| |
| return None |
|
|
|
|
| def get_model_from_json(data: dict) -> Optional[str]: |
| """从JSON数据中提取模型名称(从result.model读取)""" |
| if not isinstance(data, dict): |
| return None |
| |
| result = data.get('result', {}) |
| if isinstance(result, dict): |
| model = result.get('model') |
| if model: |
| return model |
| |
| return None |
|
|
|
|
| def generate_output_filename(input_path: Path, model_name: Optional[str] = None) -> Path: |
| """生成输出文件名,处理后缀逻辑""" |
| stem = input_path.stem |
| |
| |
| if stem.endswith('_qwen2.5'): |
| stem = stem[:-8] |
| elif stem.endswith('_qwen2'): |
| stem = stem[:-6] |
| |
| |
| if model_name: |
| |
| clean_model = model_name.replace(' ', '_') |
| stem = f"{stem}_{clean_model}" |
| |
| |
| return input_path.parent / f"{stem}.json" |
|
|
|
|
| def analyze_text(api_base: str, text: str, model: Optional[str] = None, token: Optional[str] = None, max_retries: int = 3) -> Optional[dict]: |
| """向后端发送分析请求,支持自动重试""" |
| |
| api_base = api_base.rstrip('/') |
| endpoint = API_ENDPOINT.lstrip('/') |
| url = f"{api_base}/{endpoint}" |
| |
| payload = { |
| "text": text, |
| "model": model if model else "default" |
| } |
| |
| |
| headers = {"Content-Type": "application/json"} |
| if token: |
| headers["Authorization"] = f"Bearer {token}" |
| |
| |
| last_error = None |
| for attempt in range(1, max_retries + 1): |
| try: |
| response = requests.post( |
| url, |
| json=payload, |
| headers=headers, |
| timeout=300 |
| ) |
| response.raise_for_status() |
| if attempt > 1: |
| print(f" ✅ 重试成功 (第 {attempt} 次尝试)") |
| return response.json() |
| |
| except requests.exceptions.SSLError as e: |
| last_error = e |
| if attempt < max_retries: |
| wait_time = attempt * 2 |
| print(f" ⚠️ SSL错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...") |
| time.sleep(wait_time) |
| else: |
| print(f"❌ SSL错误: {e}") |
| print(f" 💡 提示: 网络连接不稳定,已重试 {max_retries} 次仍失败") |
| |
| except requests.exceptions.RequestException as e: |
| last_error = e |
| |
| if hasattr(e, 'response') and e.response is not None: |
| status_code = e.response.status_code |
| if status_code in [404, 401, 400]: |
| print(f"❌ API请求错误: {e}") |
| print(f" 响应状态码: {status_code}") |
| if status_code == 404: |
| print(f" 💡 提示: 如果是Private Space,请确保设置了HF Token") |
| elif status_code == 401: |
| print(f" 💡 提示: Token无效或已过期,请检查HF Token") |
| return None |
| |
| |
| if attempt < max_retries: |
| wait_time = attempt * 2 |
| print(f" ⚠️ 请求错误 (尝试 {attempt}/{max_retries}),{wait_time}秒后重试...") |
| time.sleep(wait_time) |
| else: |
| print(f"❌ API请求错误: {e}") |
| if hasattr(e, 'response') and e.response is not None: |
| try: |
| error_detail = e.response.json() |
| print(f" 错误详情: {error_detail}") |
| except: |
| if e.response.text: |
| print(f" 响应内容: {e.response.text[:200]}") |
| |
| return None |
|
|
|
|
| def save_result(output_path: Path, result: dict): |
| """保存分析结果到文件""" |
| try: |
| |
| output_path.parent.mkdir(parents=True, exist_ok=True) |
| |
| with open(output_path, 'w', encoding='utf-8') as f: |
| json.dump(result, f, ensure_ascii=False, indent=2) |
| print(f"✅ 已保存") |
| except Exception as e: |
| print(f"❌ 保存文件错误 {output_path}: {e}") |
|
|
|
|
| def process_file( |
| file_path: Path, |
| api_base: str, |
| dry_run: bool = False, |
| no_write: bool = False, |
| token: Optional[str] = None |
| ) -> Tuple[bool, Optional[Path]]: |
| """处理单个JSON文件""" |
| print(f"\n📄 处理文件: {file_path}") |
| |
| |
| data = load_json_file(file_path) |
| if data is None: |
| return False, None |
| |
| |
| text = extract_text_from_json(data) |
| if not text: |
| print(f"⚠️ 未找到文本内容,跳过") |
| return False, None |
| |
| |
| original_model = get_model_from_json(data) |
| |
| print(f" 文本长度: {len(text)} 字符") |
| if original_model: |
| print(f" 原文件模型: {original_model} (将使用默认模型)") |
| else: |
| print(f" 将使用默认模型") |
| |
| if dry_run: |
| |
| print(f" [DRY RUN] 将发送分析请求(使用默认模型,不实际执行)") |
| |
| return True, None |
| |
| |
| print(f" 📤 发送分析请求(使用默认模型)...") |
| result = analyze_text(api_base, text, "default", token) |
| |
| if result is None: |
| print(f" ❌ 分析失败") |
| return False, None |
| |
| |
| if result.get('success') is False: |
| error_msg = result.get('message', '未知错误') |
| print(f" ❌ 分析失败: {error_msg}") |
| return False, None |
| |
| |
| response_model = None |
| result_info = result.get('result', {}) |
| if isinstance(result_info, dict): |
| response_model = result_info.get('model') |
| |
| if response_model: |
| print(f" 实际使用模型: {response_model}") |
| else: |
| |
| response_model = "default" |
| print(f" ⚠️ 响应中未找到模型名,使用 'default' 作为文件名后缀") |
| |
| |
| output_path = generate_output_filename(file_path, response_model) |
| print(f" 输出文件: {output_path}") |
| |
| |
| if no_write: |
| print(f" [NO WRITE] 跳过保存文件") |
| else: |
| save_result(output_path, result) |
| |
| return True, output_path |
|
|
|
|
| def main(): |
| parser = argparse.ArgumentParser( |
| description="批量分析JSON文件", |
| formatter_class=argparse.RawDescriptionHelpFormatter, |
| epilog=""" |
| 示例: |
| # 干运行模式(不实际分析) |
| python analyze_json.py --dry-run |
| |
| # 实际分析但不保存文件 |
| python analyze_json.py --no-write |
| |
| # 实际分析并保存文件 |
| python analyze_json.py |
| |
| # 指定自定义API地址 |
| python analyze_json.py --api-base http://localhost:5001 |
| |
| # 限制最多分析10个文件 |
| python analyze_json.py --max-file 10 |
| |
| # 使用HF Token访问Private Space |
| python analyze_json.py --hf-token hf_xxxxxxxxxxxxx |
| |
| # 或通过环境变量设置HF Token |
| export HF_TOKEN=hf_xxxxxxxxxxxxx |
| python analyze_json.py |
| """ |
| ) |
| |
| parser.add_argument( |
| '--dry-run', |
| action='store_true', |
| help='干运行模式,不实际发送分析请求' |
| ) |
| |
| parser.add_argument( |
| '--no-write', |
| action='store_true', |
| help='不实际保存文件(仍会发送分析请求)' |
| ) |
| |
| parser.add_argument( |
| '--api-base', |
| type=str, |
| default=DEFAULT_API_BASE, |
| help=f'后端API基础地址 (默认: {DEFAULT_API_BASE})' |
| ) |
| |
| parser.add_argument( |
| '--base-dir', |
| type=str, |
| default='.', |
| help='项目根目录 (默认: 当前目录)' |
| ) |
| |
| parser.add_argument( |
| '--max-file', |
| type=int, |
| default=None, |
| help='最多分析的文件数量 (默认: 无限制)' |
| ) |
| |
| parser.add_argument( |
| '--hf-token', |
| type=str, |
| default=None, |
| help=f'Hugging Face Token(用于Private Space,也可通过环境变量{HF_TOKEN_ENV}设置)' |
| ) |
| |
| args = parser.parse_args() |
| |
| |
| hf_token = args.hf_token or os.environ.get(HF_TOKEN_ENV) |
| |
| |
| print("=" * 60) |
| print("批量分析JSON文件") |
| print("=" * 60) |
| print(f"API地址: {args.api_base}") |
| print(f"基础目录: {args.base_dir}") |
| print(f"搜索目录: {', '.join(SEARCH_DIRS)}") |
| print(f"模式: {'DRY RUN (不实际分析)' if args.dry_run else '实际分析'}") |
| if args.max_file: |
| print(f"最大文件数: {args.max_file}") |
| if hf_token: |
| token_preview = hf_token[:10] + "..." if len(hf_token) > 10 else hf_token |
| print(f"HF Token: {token_preview} (已设置)") |
| else: |
| print(f"HF Token: 未设置 (如果是Private Space,请通过 --hf-token 或环境变量 {HF_TOKEN_ENV} 设置)") |
| print("=" * 60) |
| |
| |
| print(f"\n🔍 搜索JSON文件...") |
| json_files = find_json_files(args.base_dir, SEARCH_DIRS) |
| |
| if not json_files: |
| print("❌ 未找到任何JSON文件") |
| return |
| |
| print(f"✅ 找到 {len(json_files)} 个JSON文件") |
| |
| |
| if args.max_file and args.max_file > 0: |
| original_count = len(json_files) |
| json_files = json_files[:args.max_file] |
| if len(json_files) < original_count: |
| print(f"📌 限制处理数量: {len(json_files)} 个文件 (共找到 {original_count} 个)") |
| |
| |
| success_count = 0 |
| failed_count = 0 |
| total_to_process = len(json_files) |
| |
| for i, json_file in enumerate(json_files, 1): |
| print(f"\n[{i}/{total_to_process}]") |
| success, output_path = process_file(json_file, args.api_base, args.dry_run, args.no_write, hf_token) |
| |
| if success: |
| success_count += 1 |
| else: |
| failed_count += 1 |
| |
| |
| print("\n" + "=" * 60) |
| print("处理完成") |
| print("=" * 60) |
| print(f"成功: {success_count}") |
| print(f"失败: {failed_count}") |
| print(f"总计: {total_to_process}") |
| print("=" * 60) |
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|
|
|