# app.py
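# FastAPI service for hybrid code analysis: uploaded files are chunked (AST-based),
# embedded with sentence-transformers, indexed in FAISS alongside a lightweight code
# graph, and queried via a local GGUF model (llama.cpp) or the HF Inference API.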
import os
import json
import hashlib
import logging
import threading
from pathlib import Path
from typing import List, Dict, Any, Tuple
import numpy as np
import faiss
import pickle
import ast as python_ast
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download
from monitor import get_current_metrics, start_monitoring_thread
from memory import get_history, save_history
# =========================
# Logging setup
# =========================
logging.basicConfig(
level=logging.INFO,
format="🪵 [%(asctime)s] [%(levelname)s] %(message)s"
)
logger = logging.getLogger("app")
# =========================
# Constants and paths
# =========================
DATA_DIR = Path("data")
CACHE_DIR = DATA_DIR / "cache"
INDEX_DIR = DATA_DIR / "index"
FILES_DIR = DATA_DIR / "files" # تخزين النص الكامل لكل ملف
REPORT_FILE = DATA_DIR / "analysis_report.md"
GRAPH_FILE = DATA_DIR / "code_graph.json"
EMB_FILE = INDEX_DIR / "embeddings.faiss"
META_FILE = INDEX_DIR / "chunks.pkl"
HASH_MAP_FILE = INDEX_DIR / "hash_map.json"
for p in [DATA_DIR, CACHE_DIR, INDEX_DIR, FILES_DIR]:
p.mkdir(parents=True, exist_ok=True)
# Env
HF_TOKEN = os.getenv("HF_TOKEN", "")
MODEL_REPO = os.getenv("MODEL_REPO", "Qwen/Qwen3-8B")
# Local GGUF model (if available)
LOCAL_GGUF_REPO = os.getenv("LOCAL_GGUF_REPO", "Triangle104/Qwen3-8B-Q4_K_M-GGUF")
LOCAL_GGUF_FILE = os.getenv("LOCAL_GGUF_FILE", "qwen3-8b-q4_k_m.gguf")
LOCAL_GGUF_PATH = CACHE_DIR / LOCAL_GGUF_FILE
# Embeddings
EMBED_MODEL_NAME = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
EMBED_DIM = int(os.getenv("EMBED_DIM", "384"))
# Code chunking
CHUNK_STEP = int(os.getenv("CHUNK_STEP", "40"))  # ✅ configurable via env
MAX_FILE_BYTES = int(os.getenv("MAX_FILE_BYTES", str(10 * 1024 * 1024)))  # 10 MB safety limit
# Plain system prompt text; the ChatML markers (<|im_start|>/<|im_end|>) are added
# by the prompt builders below, so they are omitted here to avoid double-wrapping.
SYSTEM_PROMPT = """You are a senior AI code analyst. Analyze projects with hybrid indexing (code graph + retrieval).
Return structured, accurate, concise answers. Use Arabic + English labels in the final report."""
# =========================
# Global state and lock
# =========================
embed_model: SentenceTransformer | None = None
faiss_index: faiss.Index | None = None
all_chunks: List[Tuple[str, str]] = [] # (file_name, chunk_text)
code_graph: Dict[str, Any] = {"files": {}}
hash_map: Dict[str, str] = {}
index_lock = threading.RLock()  # ✅ guards indexing/retrieval
# =========================
# LLM (local / hosted)
# =========================
try:
from llama_cpp import Llama
except Exception:
Llama = None
llm = None  # local model object, if available
def load_local_model_if_configured():
"""تحميل GGUF محليًا إن كان مفعّلًا."""
global llm
if Llama is None:
logger.info("ℹ️ llama_cpp غير متوفر. سيتم الاعتماد على HF Inference عند الحاجة.")
return
if not LOCAL_GGUF_PATH.exists():
try:
logger.info(f"⬇️ تنزيل GGUF: {LOCAL_GGUF_REPO}/{LOCAL_GGUF_FILE}")
hf_hub_download(
repo_id=LOCAL_GGUF_REPO,
filename=LOCAL_GGUF_FILE,
local_dir=str(CACHE_DIR),
token=HF_TOKEN or None
)
except Exception as e:
logger.warning(f"⚠️ تعذر تنزيل GGUF: {e}. سيتجاهل التحميل المحلي.")
return
try:
llm = Llama(
model_path=str(LOCAL_GGUF_PATH),
n_ctx=int(os.getenv("N_CTX", "32768")),
rope_scaling={"type": "yarn", "factor": 4.0},
n_threads=int(os.getenv("N_THREADS", "2")),
n_gpu_layers=int(os.getenv("N_GPU_LAYERS", "0")),
n_batch=int(os.getenv("N_BATCH", "64")),
use_mlock=False,
verbose=False
)
logger.info("✅ تم تحميل النموذج المحلي (GGUF).")
except Exception as e:
llm = None
logger.warning(f"⚠️ فشل تحميل النموذج المحلي: {e}")
def call_local_llm(prompt: str, max_tokens: int = 800) -> str:
if llm is None or Llama is None:
return ""
try:
res = llm(
prompt,
max_tokens=max_tokens,
temperature=0.4,
top_p=0.9,
stop=["<|im_end|>", "<|im_start|>"],
echo=False
)
return res["choices"][0]["text"].strip()
except Exception as e:
logger.warning(f"⚠️ local LLM call failed: {e}")
return ""
def call_hf_inference(prompt: str, max_new_tokens: int = 900) -> str:
import requests
if not HF_TOKEN:
raise RuntimeError("HF_TOKEN is not set and local LLM unavailable")
url = f"https://api-inference.huggingface.co/models/{MODEL_REPO}"
headers = {"Authorization": f"Bearer {HF_TOKEN}"}
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": max_new_tokens,
"temperature": 0.4,
"top_p": 0.9,
"return_full_text": False
}
}
r = requests.post(url, headers=headers, json=payload, timeout=120)
r.raise_for_status()
data = r.json()
if isinstance(data, list) and data and "generated_text" in data[0]:
return data[0]["generated_text"]
if isinstance(data, dict) and "generated_text" in data:
return data["generated_text"]
if isinstance(data, dict) and "error" in data:
raise RuntimeError(data["error"])
return json.dumps(data)
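# call_llm() below prefers the local GGUF model and falls back to the hosted
# HF Inference API when the local model is unavailable or returns an empty string.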
def call_llm(prompt: str, max_tokens: int = 900) -> str:
out = call_local_llm(prompt, max_tokens)
if out:
return out
return call_hf_inference(prompt, max_tokens)
# =========================
# Embedding and indexing utilities
# =========================
def sha256_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def init_embed():
"""تهيئة نموذج التضمين والقراءات من القرص."""
global embed_model, faiss_index, all_chunks, hash_map, code_graph
embed_model = SentenceTransformer(EMBED_MODEL_NAME)
if EMB_FILE.exists() and META_FILE.exists():
try:
faiss_index = faiss.read_index(str(EMB_FILE))
with open(META_FILE, "rb") as f:
all_chunks = pickle.load(f)
logger.info(f"✅ تم تحميل الفهرس ({faiss_index.ntotal} متجه) من القرص.")
except Exception as e:
logger.warning(f"⚠️ تعذر تحميل الفهرس: {e}. إنشاء فهرس جديد.")
faiss_index = faiss.IndexFlatL2(EMBED_DIM)
all_chunks = []
else:
faiss_index = faiss.IndexFlatL2(EMBED_DIM)
all_chunks = []
if HASH_MAP_FILE.exists():
try:
hash_map = json.loads(HASH_MAP_FILE.read_text(encoding="utf-8"))
except Exception:
hash_map = {}
else:
hash_map = {}
if GRAPH_FILE.exists():
try:
code_graph = json.loads(GRAPH_FILE.read_text(encoding="utf-8"))
except Exception:
code_graph = {"files": {}}
else:
code_graph = {"files": {}}
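# Illustrative chunking behaviour (not executed): a module with two top-level
# definitions yields two chunks, one per AST node; if parsing fails, the file is
# cut into fixed blocks of CHUNK_STEP lines instead.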
def chunk_code_structured(code: str) -> List[str]:
"""قسّم الشيفرة إلى كتل حسب AST؛ وإن فشل فاقطع أسطريًا."""
try:
tree = python_ast.parse(code)
chunks: List[str] = []
lines = code.splitlines()
for node in tree.body:
s = max(getattr(node, "lineno", 1) - 1, 0)
e = getattr(node, "end_lineno", s + 1)
e = max(e, s + 1)
chunks.append("\n".join(lines[s:e]))
if chunks:
return chunks
except Exception as e:
        # ✅ Log the exception type (and location, if available)
msg = f"{type(e).__name__}: {e}"
logger.debug(f"AST parse failed; fallback to line-based. Reason: {msg}")
    step = CHUNK_STEP  # ✅ from env
lines = code.splitlines()
return ["\n".join(lines[i:i + step]) for i in range(0, len(lines), step)]
def parse_code_meta(file_name: str, code: str) -> Dict[str, Any]:
"""استخرج رموز/استيرادات/نداءات لملف."""
meta = {"hash": sha256_text(code), "symbols": [], "calls": [], "imports": []}
try:
tree = python_ast.parse(code)
for node in python_ast.walk(tree):
if isinstance(node, python_ast.FunctionDef):
meta["symbols"].append({"name": node.name, "kind": "function", "line": node.lineno})
elif isinstance(node, python_ast.ClassDef):
meta["symbols"].append({"name": node.name, "kind": "class", "line": node.lineno})
elif isinstance(node, python_ast.Assign):
for t in getattr(node, "targets", []):
if isinstance(t, python_ast.Name):
meta["symbols"].append({"name": t.id, "kind": "variable", "line": node.lineno})
elif isinstance(node, python_ast.Import):
for alias in node.names:
meta["imports"].append(alias.name or "")
elif isinstance(node, python_ast.ImportFrom):
meta["imports"].append(node.module or "")
elif isinstance(node, python_ast.Call):
f = node.func
if hasattr(f, "id"):
meta["calls"].append(getattr(f, "id", ""))
elif hasattr(f, "attr"):
meta["calls"].append(getattr(f, "attr", ""))
except Exception as e:
logger.debug(f"parse_code_meta failed for {file_name}: {type(e).__name__}: {e}")
return meta
def reconstruct_all_vectors() -> np.ndarray:
"""إعادة بناء جميع المتجهات من الفهرس."""
if faiss_index is None or faiss_index.ntotal == 0:
return np.array([], dtype=np.float32)
xs = [faiss_index.reconstruct(i) for i in range(faiss_index.ntotal)]
return np.array(xs, dtype=np.float32) if xs else np.array([], dtype=np.float32)
def persist_index():
"""حفظ الفهرس والميتا والخرائط."""
faiss.write_index(faiss_index, str(EMB_FILE))
with open(META_FILE, "wb") as f:
pickle.dump(all_chunks, f)
GRAPH_FILE.write_text(json.dumps(code_graph, ensure_ascii=False, indent=2), encoding="utf-8")
HASH_MAP_FILE.write_text(json.dumps(hash_map, ensure_ascii=False, indent=2), encoding="utf-8")
def upsert_file_to_index(file_name: str, content: str):
"""إدراج/تحديث ملف داخل الفهرس والميتا والحفظ على القرص."""
global faiss_index, all_chunks, code_graph, hash_map
    # ✅ Save the latest copy to local disk
file_path = FILES_DIR / file_name
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
content_hash = sha256_text(content)
prev_hash = hash_map.get(file_name)
if prev_hash == content_hash:
return
chunks = chunk_code_structured(content)
embeds = embed_model.encode(chunks, normalize_embeddings=True)
    with index_lock:
        # Rebuild the flat index, dropping any stale vectors for this file first so
        # that re-indexing an updated file does not accumulate duplicate chunks.
        old_vecs = reconstruct_all_vectors()
        if old_vecs.size and old_vecs.shape[0] == len(all_chunks):
            keep = np.array([fn != file_name for fn, _ in all_chunks], dtype=bool)
            old_vecs = old_vecs[keep]
            all_chunks = [pair for pair in all_chunks if pair[0] != file_name]
        new_vecs = np.array(embeds, dtype=np.float32)
        merged = new_vecs if old_vecs.size == 0 else np.vstack([old_vecs, new_vecs])
        faiss_index = faiss.IndexFlatL2(EMBED_DIM)
        faiss_index.add(merged)
        all_chunks.extend([(file_name, c) for c in chunks])
        code_graph["files"][file_name] = parse_code_meta(file_name, content)
        hash_map[file_name] = content_hash
        persist_index()
def rebuild_index_from_files():
"""إعادة بناء الفهرس بالكامل من محتويات data/files/."""
global faiss_index, all_chunks, code_graph, hash_map
with index_lock:
faiss_index = faiss.IndexFlatL2(EMBED_DIM)
all_chunks = []
code_graph = {"files": {}}
hash_map = {}
for p in sorted(FILES_DIR.rglob("*")):
if not p.is_file():
continue
try:
content = p.read_text(encoding="utf-8")
except Exception:
                # Skip binary / non-text files
continue
fname = str(p.relative_to(FILES_DIR)).replace("\\", "/")
chunks = chunk_code_structured(content)
if not chunks:
continue
embeds = embed_model.encode(chunks, normalize_embeddings=True)
vecs = np.array(embeds, dtype=np.float32)
if vecs.size:
faiss_index.add(vecs)
all_chunks.extend([(fname, c) for c in chunks])
code_graph["files"][fname] = parse_code_meta(fname, content)
hash_map[fname] = sha256_text(content)
persist_index()
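# Illustrative usage of retrieve() (assuming an index has been built):
#   retrieve("where is TOKEN defined", k=5)
# returns up to k (file_name, chunk, L2 distance) tuples, smallest distance first.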
def retrieve(query: str, k: int = 8) -> List[Tuple[str, str, float]]:
"""استرجاع أفضل k كتل للسياق."""
if faiss_index is None or faiss_index.ntotal == 0 or embed_model is None:
return []
q = embed_model.encode([query], normalize_embeddings=True)
D, I = faiss_index.search(np.array(q, dtype=np.float32), k)
out: List[Tuple[str, str, float]] = []
for score, idx in zip(D[0], I[0]):
if idx < 0 or idx >= len(all_chunks):
continue
file_name, chunk = all_chunks[idx]
        # Small boost (lower L2 distance) when the query mentions one of the file's
        # imports or called names; empty tokens from the parser are ignored.
        boost = 1.0
        meta = code_graph["files"].get(file_name, {})
        imports = set(meta.get("imports", []))
        calls = set(meta.get("calls", []))
        if any(tok and tok in query for tok in (list(imports) + list(calls))):
            boost = 0.9
out.append((file_name, chunk, float(score) * boost))
out.sort(key=lambda x: x[2])
return out[:k]
def render_graph_overview(limit: int = 100) -> str:
lines = []
files = list(code_graph.get("files", {}).items())[:limit]
for fname, meta in files:
syms = ", ".join([f"{s.get('kind')}:{s.get('name')}" for s in meta.get("symbols", [])][:8])
imps = ", ".join(meta.get("imports", [])[:6])
cls = ", ".join(meta.get("calls", [])[:8])
lines.append(f"- File: {fname}\n Symbols: {syms}\n Imports: {imps}\n Calls: {cls}")
return "\n".join(lines)
def build_chat_prompt(history: List[List[str]], user_msg: str, extra: str = "") -> str:
msgs = [("system", SYSTEM_PROMPT)]
for u, a in history[-8:]:
msgs.append(("user", u))
msgs.append(("assistant", a))
msgs.append(("user", (user_msg or "") + ("\n" + extra if extra else "")))
out = []
for role, content in msgs:
if role == "system":
out.append(f"<|im_start|>system\n{content}<|im_end|>")
elif role == "user":
out.append(f"<|im_start|>user\n{content}<|im_end|>")
else:
out.append(f"<|im_start|>assistant\n{content}<|im_end|>")
out.append("<|im_start|>assistant\n")
return "\n".join(out)
def build_analysis_prompt(query: str, retrieved_docs: List[Tuple[str, str, float]]) -> str:
graph_overview = render_graph_overview(120)
ctx = []
for i, (fname, chunk, score) in enumerate(retrieved_docs, 1):
ctx.append(f"### Source {i}\n[File] {fname}\n[Score] {score:.4f}\n```\n{chunk}\n```")
context_block = "\n\n".join(ctx)
instructions = (
"المطلوب: تحليل الملفات المسترجعة مع السياق التالي لإنتاج تقرير تحليلي شامل يشمل:\n"
"1) التسلسل المنطقي (Logical Sequence) وخطوات التنفيذ\n"
"2) التسلسل الوظيفي (Functional Flow) والمخطط التدفق النصي (Flow Outline)\n"
"3) التبعيات بين الملفات (Dependencies) والاستدعاءات (Call Relations)\n"
"4) العلاقات بين الملفات والمتغيرات العامة ومكان تعريفها (Global Vars Map)\n"
"5) تحديد نقاط الضعف المحتملة (Logic/Security/Performance) إن وجدت\n"
"6) توصيات اصلاح عملية\n"
"صيغة المخرجات: Markdown منظم بعناوين عربية + English labels:\n"
"## نظرة عامة / Overview\n"
"## خريطة التبعيات / Dependency Map\n"
"## المخطط التدفق / Flow Outline\n"
"## تحليل منطقي ووظيفي / Logical & Functional Analysis\n"
"## المتغيرات العامة / Global Variables\n"
"## مشاكل محتملة / Potential Issues\n"
"## توصيات / Recommendations"
)
user = f"سؤال التحليل: {query}\n\n[Graph Overview]\n{graph_overview}\n\n[Retrieved Context]\n{context_block}"
prompt = (
f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
f"<|im_start|>user\n{instructions}\n\n{user}\n<|im_end|>\n"
f"<|im_start|>assistant\n"
)
return prompt
# =========================
# Request/response models
# =========================
class ChatRequest(BaseModel):
session_id: str
message: str
class ChatResponse(BaseModel):
response: str
updated_history: list[list[str]]
class AnalyzeRequest(BaseModel):
files: dict[str, str] # name -> content
class AnalyzeAndReportRequest(BaseModel):
session_id: str
query: str
top_k: int | None = 10
class DiffRequest(BaseModel):
modified: dict[str, str] = {} # filename -> full new content
deleted: list[str] = []
# =========================
# (Optional) Drive/GitHub hooks
# =========================
def maybe_upload_to_drive(local_path: Path):
"""هوك اختياري: ارفع نسخة إلى جوجل درايف إن كان الاتصال مهيّأً."""
# اتركها فارغة الآن حتى تضيف اعتماد Google Drive.
# يمكن لاحقًا قراءة ENV مثل DRIVE_ENABLED=1 و OAuth كشفيرة.
pass
def maybe_commit_to_github(local_path: Path, message: str = "auto: update file"):
"""هوك اختياري: قم بكومِت/دَفع تلقائي إن كان الاتصال مهيّأً."""
# اتركها فارغة الآن لحين ربط GitHub عبر توكن/Repo.
pass
# =========================
# Analysis / report operations
# =========================
def analyze_and_report_internal(session_id: str, query: str, k: int = 10) -> str:
retrieved_docs = retrieve(query, k=k)
if not retrieved_docs:
raise HTTPException(status_code=400, detail="لا توجد بيانات مفهرسة بعد. استخدم /analyze-files أولًا.")
prompt = build_analysis_prompt(query, retrieved_docs)
report = call_llm(prompt, max_tokens=1400)
REPORT_FILE.write_text(report, encoding="utf-8")
history = get_history(session_id)
updated = (history + [[f"[ANALYZE] {query}", report]])[-8:]
save_history(session_id, updated)
return report
# =========================
# FastAPI application
# =========================
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_credentials=True,
allow_methods=["*"], allow_headers=["*"]
)
@app.on_event("startup")
async def on_startup():
load_local_model_if_configured()
start_monitoring_thread()
init_embed()
logger.info("🚀 التطبيق جاهز.")
@app.get("/")
def root():
return {"message": "الخادم يعمل", "status": "ok"}
@app.get("/model-status")
def model_status():
status = "local_loaded" if llm else ("hf_ready" if HF_TOKEN else "no_model")
return {"status": status, "repo": MODEL_REPO, "local": str(LOCAL_GGUF_PATH) if llm else None}
@app.get("/metrics")
def read_metrics():
return get_current_metrics()
@app.get("/monitor-log")
def read_monitor_log():
log_path = DATA_DIR / "monitor.log"
if not log_path.exists():
log_path.touch()
return {"log": log_path.read_text(encoding="utf-8")}
@app.post("/analyze-files")
def analyze_files(req: AnalyzeRequest):
    # ✅ Validate sizes, then save/replace the latest copies locally and index them
    for fname, content in req.files.items():
        size = len(content.encode("utf-8", errors="ignore"))
        if MAX_FILE_BYTES and size > MAX_FILE_BYTES:
            raise HTTPException(status_code=413, detail=f"الملف {fname} يتجاوز الحجم المسموح.")
for fname, content in req.files.items():
upsert_file_to_index(fname, content)
        # Optional: upload a copy for cloud archiving
try:
maybe_upload_to_drive(FILES_DIR / fname)
except Exception as e:
logger.warning(f"Drive upload skipped for {fname}: {e}")
try:
maybe_commit_to_github(FILES_DIR / fname, "auto: analyze & save")
except Exception as e:
logger.warning(f"GitHub commit skipped for {fname}: {e}")
return {"status": "Files analyzed and cached", "files_indexed": list(req.files.keys())}
@app.post("/diff-files")
def diff_files(req: DiffRequest):
"""تطبيق تعديلات Git (modified/deleted) مع إعادة بناء نظيفة للفهرس."""
    # 1) Delete the requested files (from disk and from the maps)
for fname in req.deleted:
try:
            # Remove from disk
fp = FILES_DIR / fname
if fp.exists():
fp.unlink()
except Exception as e:
logger.warning(f"⚠️ تعذر حذف {fname} من القرص: {e}")
        # Clean up the maps
hash_map.pop(fname, None)
code_graph.get("files", {}).pop(fname, None)
    # 2) Write/update the modified files on disk
for fname, content in req.modified.items():
if MAX_FILE_BYTES and len(content.encode("utf-8", errors="ignore")) > MAX_FILE_BYTES:
raise HTTPException(status_code=413, detail=f"الملف {fname} يتجاوز الحجم المسموح.")
fp = FILES_DIR / fname
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(content, encoding="utf-8")
    # 3) Rebuild the index entirely from the current files
rebuild_index_from_files()
    # 4) (Optional) Upload the modified copies for archiving
for fname in req.modified.keys():
try:
maybe_upload_to_drive(FILES_DIR / fname)
except Exception as e:
logger.warning(f"Drive upload skipped for {fname}: {e}")
try:
maybe_commit_to_github(FILES_DIR / fname, "auto: diff-files update")
except Exception as e:
logger.warning(f"GitHub commit skipped for {fname}: {e}")
return {
"status": "ok",
"deleted": req.deleted,
"modified": list(req.modified.keys()),
"total_index_vectors": int(faiss_index.ntotal) if faiss_index else 0
}
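# Example request for /analyze-and-report (illustrative values):
#   POST /analyze-and-report
#   {"session_id": "demo", "query": "Summarize the project architecture", "top_k": 10}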
@app.post("/analyze-and-report")
def analyze_and_report(req: AnalyzeAndReportRequest):
report = analyze_and_report_internal(req.session_id, req.query, k=req.top_k or 10)
return {"status": "ok", "report_path": str(REPORT_FILE), "preview": report[:1200]}
def classify_intent(history: List[List[str]], message: str) -> Dict[str, Any]:
inst = (
"أعد JSON فقط دون أي نص آخر.\n"
"المفاتيح: intent (string), confidence (0-1), action (RETRIEVE_ONLY|ANALYZE_AND_REPORT|TRACE_SUBSET|NONE), "
"targets (list of strings), reason (string).\n"
"أمثلة:\n"
"س: ما عمل الملف X؟ → {\"intent\":\"ASK_FILE_ROLE\",\"confidence\":0.9,\"action\":\"RETRIEVE_ONLY\",\"targets\":[\"X\"],\"reason\":\"...\"}\n"
"س: لماذا لا تعمل ميزة الدخول؟ → {\"intent\":\"WHY_FEATURE_NOT_WORKING\",\"confidence\":0.85,\"action\":\"ANALYZE_AND_REPORT\",\"targets\":[],\"reason\":\"...\"}\n"
"س: اين يُعرّف المتغير TOKEN وكيف يتغير؟ → {\"intent\":\"CODE_FLOW_TRACE\",\"confidence\":0.8,\"action\":\"TRACE_SUBSET\",\"targets\":[\"TOKEN\"],\"reason\":\"...\"}\n"
)
p = (
f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
f"<|im_start|>user\n{inst}\nالسؤال: {message}\nأعد JSON فقط.\n<|im_end|>\n"
f"<|im_start|>assistant\n"
)
txt = call_llm(p, max_tokens=200)
try:
start = txt.find("{")
end = txt.rfind("}")
obj = json.loads(txt[start:end+1]) if start != -1 and end != -1 else {}
except Exception:
obj = {}
if not isinstance(obj, dict):
obj = {}
obj.setdefault("intent", "UNKNOWN")
obj.setdefault("confidence", 0.0)
obj.setdefault("action", "NONE")
obj.setdefault("targets", [])
obj.setdefault("reason", "")
return obj
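# Example request for /chat (illustrative values):
#   POST /chat
#   {"session_id": "demo", "message": "What does utils.py do?"}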
@app.post("/chat", response_model=ChatResponse)
def chat(req: ChatRequest):
history = get_history(req.session_id)
decision = classify_intent(history, req.message)
action = decision.get("action", "NONE")
response_text = ""
if action == "ANALYZE_AND_REPORT":
try:
report = analyze_and_report_internal(req.session_id, req.message, k=10)
response_text = "تم إنشاء تقرير تحليلي:\n\n" + report
except Exception as e:
raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
elif action == "RETRIEVE_ONLY":
retrieved_docs = retrieve(req.message, k=6)
ctx = []
for fname, chunk, score in retrieved_docs:
ctx.append(f"From {fname} (score={score:.4f}):\n{chunk}")
extra = "\n\n[Context]\n" + "\n\n".join(ctx) + "\n\n" + render_graph_overview(60)
prompt = build_chat_prompt(history, req.message, extra)
try:
response_text = call_llm(prompt, max_tokens=700)
except Exception as e:
raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
elif action == "TRACE_SUBSET":
targets = decision.get("targets", [])
key = " ".join(targets) if targets else req.message
retrieved_docs = retrieve(key, k=10)
ctx = []
for fname, chunk, score in retrieved_docs:
ctx.append(f"From {fname} (score={score:.4f}):\n{chunk}")
flow_query = req.message + "\nPlease trace variables/functions: " + ", ".join(targets)
prompt = build_analysis_prompt(flow_query, retrieved_docs)
try:
trace_report = call_llm(prompt, max_tokens=1200)
REPORT_FILE.write_text(trace_report, encoding="utf-8")
response_text = "تقرير التتبع:\n\n" + trace_report
except Exception as e:
raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
else:
prompt = build_chat_prompt(history, req.message, "")
try:
response_text = call_llm(prompt, max_tokens=600)
except Exception as e:
raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
updated = (history + [[req.message, response_text]])[-8:]
save_history(req.session_id, updated)
return ChatResponse(response=response_text, updated_history=updated)