# Uploaded by SunDou as data3/main.py with huggingface_hub
# (commit 0b75f52, verified).
import os
import subprocess
import csv
import shutil
import threading
import logging
import signal
import sys
from pathlib import Path
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
# ================= Configuration =================
# OpenAI API key. Secrets must not be hard-coded in source: the key is read
# from the environment (run `export OPENAI_API_KEY=...` before starting).
# A key that was previously committed here should be rotated.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
# Directory holding the source repositories (one sub-directory per project).
SOURCE_REPOS_DIR = Path("/home/weifengsun/tangou1/domain_code/src/workdir/repos_raw").resolve()
# Base output directory; each project gets its own sub-directory here.
BASE_OUTPUT_DIR = Path("~/chemrepo").expanduser().resolve()
# Global log that collects the details of every failed/interrupted project.
GLOBAL_ERROR_LOG = BASE_OUTPUT_DIR / "failures.log"
# CSV file that receives one row per processed project.
CSV_FILE = BASE_OUTPUT_DIR / "run.csv"
# Number of projects processed concurrently (one worker thread per project,
# each running one `repoagent` subprocess).
MAX_WORKERS = 256
# =================================================

# Export the key so child `repoagent` processes inherit it. When it is unset,
# warn early instead of letting every project fail with an opaque auth error.
if OPENAI_API_KEY:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
else:
    print("⚠️ OPENAI_API_KEY is not set; repoagent runs will likely fail.", file=sys.stderr)

# Make sure the base output directory exists before anything writes into it.
BASE_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# --- Global locks and state tracking ---
# Serializes appends to failures.log from the worker threads.
error_log_lock = threading.Lock()
# Names of projects currently being processed (used for cleanup on interrupt).
active_projects = set()
active_projects_lock = threading.Lock()
def add_active_project(name):
    """Mark project *name* as in progress so an interrupt can clean it up.

    The shared set is guarded by ``active_projects_lock`` because worker
    threads and the main thread touch it concurrently.
    """
    active_projects_lock.acquire()
    try:
        active_projects.add(name)
    finally:
        active_projects_lock.release()
def remove_active_project(name):
    """Forget project *name*; a no-op when it is not marked as in progress.

    Access to the shared set is serialized by ``active_projects_lock``.
    """
    active_projects_lock.acquire()
    try:
        active_projects.difference_update((name,))
    finally:
        active_projects_lock.release()
def log_failure_globally(project_name, content, extra_info=""):
    """Append a failure record for one project to ``GLOBAL_ERROR_LOG``.

    The record is a banner with the project name, a timestamp and a fixed
    "Failed/Interrupted" status, followed by *content* and, when truthy,
    *extra_info* under a ``[Details]`` tag. ``error_log_lock`` serializes
    writers so records from this process's threads do not interleave.

    Args:
        project_name: Name of the failed or interrupted project.
        content: Text body of the record (callers pass the project log or a
            short message).
        extra_info: Optional extra detail such as an exception message;
            falsy values ("" or None) are omitted.
    """
    rule = "=" * 40
    with error_log_lock, open(GLOBAL_ERROR_LOG, "a", encoding="utf-8") as sink:
        pieces = [
            f"\n{rule}\n",
            f"PROJECT: {project_name}\n",
            f"TIME: {datetime.now()}\n",
            "STATUS: Failed/Interrupted\n",
            f"{rule}\n",
            content,
        ]
        if extra_info:
            pieces.append(f"\n[Details]: {extra_info}\n")
        pieces.append(f"\n{rule}\n")
        sink.writelines(pieces)
def cleanup_project_folder(project_name):
    """Delete ``BASE_OUTPUT_DIR/<project_name>`` if it exists.

    Deletion errors (``OSError``) are reported on stdout instead of raised,
    so cleanup of one project never aborts the caller.
    """
    target = BASE_OUTPUT_DIR / project_name
    if not target.exists():
        return
    try:
        shutil.rmtree(target)
    except OSError as err:
        print(f"⚠️ Failed to delete directory {target}: {err}")
    else:
        print(f"🗑️ Deleted failed/interrupted directory: {target}")
def _timestamp():
    """Return the current local time in the format used by the CSV report."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def _append_log_best_effort(log_path, text):
    """Append *text* to *log_path*, ignoring I/O errors (the log may be what failed)."""
    try:
        with open(log_path, "a", encoding="utf-8") as log_f:
            log_f.write(text)
    except OSError:
        pass  # deliberate best effort: never mask the original error


def _read_log_best_effort(log_path):
    """Return the text of *log_path*: "" if missing, "Read Error" if unreadable."""
    if not log_path.exists():
        return ""
    try:
        return log_path.read_text(encoding="utf-8", errors="ignore")
    except OSError:
        return "Read Error"


def _run_repoagent(project_name, project_path, hp_dir, mdp_dir, log_f):
    """Run RepoAgent on one project, logging into *log_f*, and return its status.

    Returns "Success" when the mdp directory ends up non-empty, otherwise
    "EmptyProject". Propagates whatever ``subprocess.run`` raises, e.g.
    ``CalledProcessError`` on a non-zero exit or ``FileNotFoundError`` when
    ``repoagent`` is not on PATH.
    """
    log_f.write(f"[{datetime.now()}] Processing project: {project_name}\n")

    # Ensure the project has a .gitignore (an empty one is created if missing;
    # presumably RepoAgent expects the file to exist — TODO confirm).
    gitignore_path = project_path / ".gitignore"
    if not gitignore_path.exists():
        gitignore_path.touch()
        log_f.write(f"[{datetime.now()}] Created .gitignore file.\n")

    cmd = [
        "repoagent", "run",
        "-m", "gpt-5.1-2025-11-13",
        "-r", "1",
        "-tp", str(project_path.absolute()),
        "--print-hierarchy",
        "-hp", str(hp_dir),
        "-mdp", str(mdp_dir),
    ]
    log_f.write(f"[{datetime.now()}] Command: {' '.join(cmd)}\n")
    log_f.write(f"[{datetime.now()}] Starting RepoAgent...\n")
    # Flush our own lines first so they precede the child's output in the file.
    log_f.flush()

    subprocess.run(cmd, stdout=log_f, stderr=subprocess.STDOUT, check=True)

    # A run can exit 0 and still produce no documents; report that separately.
    if mdp_dir.exists() and any(mdp_dir.iterdir()):
        log_f.write(f"\n[{datetime.now()}] Completed successfully.\n")
        return "Success"
    log_f.write(f"\n[{datetime.now()}] Finished, but mdp folder is EMPTY. Marked as EmptyProject.\n")
    return "EmptyProject"


def process_single_project(project_path):
    """Run RepoAgent on one project directory and report the outcome.

    Args:
        project_path: ``Path`` to a project directory (one entry of
            ``SOURCE_REPOS_DIR``).

    Returns:
        dict with keys "project", "status", "start_time" and "end_time";
        status is one of "Skipped", "Success", "EmptyProject" or "Failed".

    Side effects:
        Creates ``BASE_OUTPUT_DIR/<name>/`` containing ``process.log``; may
        create an empty ``.gitignore`` inside *project_path*. On failure the
        local log is appended to the global failure log and the project's
        output directory is deleted. Never raises for per-project errors.
    """
    project_name = project_path.name
    start_time = _timestamp()

    project_out_dir = BASE_OUTPUT_DIR / project_name
    hp_dir = project_out_dir / "hp"
    mdp_dir = project_out_dir / "mdp"
    local_log_file = project_out_dir / "process.log"

    # 1. Resume support: any project whose hp/ and mdp/ folders already exist
    # is skipped, including ones previously marked EmptyProject. Delete the
    # project's output folder to force a re-run.
    if hp_dir.exists() and mdp_dir.exists():
        return {
            "project": project_name,
            "status": "Skipped",
            "start_time": start_time,
            "end_time": _timestamp(),
        }

    add_active_project(project_name)
    status = "Failed"
    python_error = None
    # try/finally guarantees the project leaves the active set on every path.
    try:
        try:
            # mkdir is inside the try so its failure is reported like any other.
            project_out_dir.mkdir(parents=True, exist_ok=True)
            with open(local_log_file, "w", encoding="utf-8") as log_f:
                status = _run_repoagent(project_name, project_path, hp_dir, mdp_dir, log_f)
        except Exception as e:
            status = "Failed"
            python_error = str(e)
            # Written after the log is closed, so the line is on disk before
            # the log is read back below.
            _append_log_best_effort(local_log_file, f"\n[{datetime.now()}] ERROR: {python_error}\n")
            print(f"❌ Error processing {project_name}: {python_error}")

        if status == "Failed":
            log_failure_globally(project_name, _read_log_best_effort(local_log_file), python_error)
            cleanup_project_folder(project_name)
    except Exception as e:
        # Last-resort guard (e.g. failures.log unwritable): don't let one project
        # kill its worker, but don't hide the problem either.
        status = "Failed"
        print(f"❌ Unexpected error while handling failure of {project_name}: {e}")
    finally:
        remove_active_project(project_name)

    return {
        "project": project_name,
        "status": status,
        "start_time": start_time,
        "end_time": _timestamp(),
    }
def main():
    """Process every project directory under ``SOURCE_REPOS_DIR`` concurrently.

    Projects are submitted in name order to a thread pool of ``MAX_WORKERS``
    workers; one CSV row per project is appended to ``CSV_FILE`` as results
    complete. On Ctrl+C, pending projects are cancelled, projects still in
    progress are logged to the global failure log and their output folders
    removed, and the process exits with status 130 (the SIGINT convention).
    Exits with status 1 if the source directory is missing.
    """
    if not SOURCE_REPOS_DIR.exists():
        print(f"Error: Source directory {SOURCE_REPOS_DIR} does not exist.")
        sys.exit(1)

    csv_headers = ["project", "status", "start_time", "end_time"]
    # Rows from earlier runs are kept. The header is written only when the
    # file is new or empty (a zero-byte run.csv would otherwise get none).
    write_header = not CSV_FILE.exists() or CSV_FILE.stat().st_size == 0

    # --- 1. Collect projects, sorted by name ---
    projects = sorted((p for p in SOURCE_REPOS_DIR.iterdir() if p.is_dir()), key=lambda p: p.name)
    print(f"Found {len(projects)} projects (Sorted A-Z).\nOutput Dir: {BASE_OUTPUT_DIR}")
    print(f"Failures Log: {GLOBAL_ERROR_LOG}")
    print(f"Starting concurrent processing with {MAX_WORKERS} workers...\n")
    print(f"💡 Press Ctrl+C to stop. Interrupted projects will be cleaned up automatically.\n")

    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
    try:
        future_to_project = {executor.submit(process_single_project, p): p for p in projects}
        with open(CSV_FILE, mode="a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=csv_headers)
            if write_header:
                writer.writeheader()
            for future in as_completed(future_to_project):
                try:
                    result = future.result()
                except Exception as e:
                    # One unexpected worker error must not stop reporting for
                    # all remaining projects.
                    project_name = future_to_project[future].name
                    print(f"❌ Unexpected error in worker for {project_name}: {e}")
                    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    result = {"project": project_name, "status": "Failed",
                              "start_time": now, "end_time": now}
                writer.writerow(result)
                f.flush()  # keep the CSV current in case the run is interrupted

                if result["status"] == "Success":
                    print(f"✅ {result['project']} Finished.")
                elif result["status"] == "EmptyProject":
                    print(f"⚠️ {result['project']} Finished (Empty - No Docs Generated).")
                elif result["status"] == "Skipped":
                    print(f"⏭️ {result['project']} Skipped.")
                else:
                    print(f"❌ {result['project']} Failed.")
    except KeyboardInterrupt:
        print("\n\n🛑 KeyboardInterrupt detected! Stopping workers...")
        # Cancels queued projects only. NOTE(review): workers already running
        # are not interrupted by this code, and interpreter exit still joins
        # the executor's threads.
        executor.shutdown(wait=False, cancel_futures=True)
        print("🧹 Cleaning up active incomplete projects...")
        with active_projects_lock:
            projects_to_clean = list(active_projects)
        for proj_name in projects_to_clean:
            log_failure_globally(proj_name, "Process terminated by User (KeyboardInterrupt).")
            cleanup_project_folder(proj_name)
        print("Done. Exiting.")
        sys.exit(130)

    executor.shutdown(wait=True)
    print(f"\nAll tasks completed. \nCSV: {CSV_FILE}")


if __name__ == "__main__":
    main()