| from fastapi import APIRouter, Depends, HTTPException, status, Request |
| from pydantic import BaseModel |
| from typing import List, Dict, Any, Optional |
| from celery.result import AsyncResult |
| from redis import Redis |
|
|
| from .auth import get_current_user, User |
| from ..config import settings |
| from ..core.neo4j_store import Neo4jStore |
| from ..workers.celery_worker import celery_app |
|
|
| router = APIRouter(prefix="/api/admin", tags=["Admin Dashboard"]) |
|
|
| |
| |
| |
| |
| def get_graph_store(request: Request) -> Neo4jStore: |
| """Return the app-level shared Neo4jStore (set during startup).""" |
| store: Optional[Neo4jStore] = getattr(request.app.state, "graph_store", None) |
| if store is None: |
| raise HTTPException( |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, |
| detail="Graph store not initialised yet.", |
| ) |
| return store |
|
|
|
|
| |
| |
| def check_admin_scope(user: User = Depends(get_current_user)): |
| """Dependency to check if user has admin scope""" |
| if "admin" not in user.scopes: |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, |
| detail="Admin access required - Not enough permissions" |
| ) |
| return user |
|
|
| class SystemConfig(BaseModel): |
| llm_provider: str |
| embedding_provider: str |
| chunk_size: int |
| workers_online: int |
|
|
| class TaskDashboardResponse(BaseModel): |
| active_tasks: int |
| pending_tasks: List[Dict[str, Any]] |
| failed_tasks: List[Dict[str, Any]] |
|
|
| @router.get("/stats", summary="Get global admin statistics") |
| async def get_admin_stats( |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| """Get system-wide stats like document counts, node sizes, LLM costs (mocked for now)""" |
| try: |
| |
| nodes_q = "MATCH (n) RETURN count(n) as count" |
| nodes_res = await store.execute_query(nodes_q) |
| nodes_count = nodes_res[0]["count"] if nodes_res else 0 |
| |
| edges_q = "MATCH ()-[r]->() RETURN count(r) as count" |
| edges_res = await store.execute_query(edges_q) |
| edges_count = edges_res[0]["count"] if edges_res else 0 |
|
|
| |
| estimated_cost_usd = 2.45 |
| |
| return { |
| "graph": { |
| "nodes": nodes_count, |
| "relationships": edges_count |
| }, |
| "costs": { |
| "total_estimated_usd": estimated_cost_usd, |
| "tokens_processed": 145000 |
| }, |
| "system": { |
| "provider": settings.default_llm_provider, |
| "environment": settings.environment |
| } |
| } |
| except Exception as exc: |
| raise HTTPException(status_code=500, detail=str(exc)) |
|
|
| @router.get("/tasks", summary="Get pending and active celery tasks") |
| async def get_tasks(admin_user: User = Depends(check_admin_scope)): |
| """Fetch all tasks from workers (integration with Flower/Celery events)""" |
| |
| i = celery_app.control.inspect() |
| active = i.active() or {} |
| reserved = i.reserved() or {} |
| |
| active_list = [] |
| for worker, tasks in active.items(): |
| active_list.extend([{"worker": worker, "id": t["id"], "name": t["name"]} for t in tasks]) |
| |
| reserved_list = [] |
| for worker, tasks in reserved.items(): |
| reserved_list.extend([{"worker": worker, "id": t["id"], "name": t["name"]} for t in tasks]) |
|
|
| return TaskDashboardResponse( |
| active_tasks=len(active_list), |
| pending_tasks=reserved_list, |
| failed_tasks=[] |
| ) |
|
|
| @router.post("/config", summary="Update system LLM configuration live") |
| async def update_config(config: SystemConfig, admin_user: User = Depends(check_admin_scope)): |
| """Dynamically update configurations - requires restart logic usually, but here we mock DB save""" |
| |
| settings.default_llm_provider = config.llm_provider |
| return {"status": "success", "message": f"Updated config to use {config.llm_provider}"} |
|
|
| @router.get("/entities/review", summary="Get entities flagged for human review") |
| async def get_review_queue( |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| """Fetch entities that resolved between 0.85-0.95 confidence""" |
| |
| |
| query = "MATCH (e:Entity) WHERE e.needs_review = true RETURN e.id as id, e.name as name LIMIT 50" |
| res = await store.execute_query(query) |
| return {"queue": res} |
|
|
| @router.post("/entities/merge", summary="Force merge two entities") |
| async def force_merge_entities( |
| source_id: str, |
| target_id: str, |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| """Admin override to merge two nodes""" |
| try: |
| import json |
| import re |
| |
| |
| props_q = "MATCH (e:Entity) WHERE e.id IN [$id1, $id2] RETURN e.id as id, e.properties as props" |
| props_res = await store.execute_query(props_q, {"id1": source_id, "id2": target_id}) |
| |
| props1, props2 = {}, {} |
| for r in props_res: |
| try: |
| p = json.loads(r["props"]) if isinstance(r["props"], str) else (r["props"] or {}) |
| except Exception: |
| p = {} |
| if r["id"] == source_id: |
| props1 = p |
| else: |
| props2 = p |
| |
| |
| merged_props = {**props2, **props1} |
| |
| |
| upd_q = "MATCH (e1:Entity {id: $id1}) SET e1.properties = $props" |
| await store.execute_query(upd_q, {"id1": source_id, "props": json.dumps(merged_props)}) |
| |
| |
| rel_q = """ |
| MATCH (e2:Entity {id: $id2})-[r]->(other) RETURN type(r) as t, properties(r) as p, id(other) as oid, 'out' as dir |
| UNION ALL |
| MATCH (other)-[r]->(e2:Entity {id: $id2}) RETURN type(r) as t, properties(r) as p, id(other) as oid, 'in' as dir |
| """ |
| rels = await store.execute_query(rel_q, {"id2": target_id}) |
| |
| for rel in rels: |
| t = rel["t"].upper().replace(" ", "_") |
| if not re.match(r'^[A-Z0-9_]+$', t): |
| t = "RELATED_TO" |
| |
| if rel["dir"] == "out": |
| merge_q = f"MATCH (e1:Entity {{id: $id1}}), (other) WHERE id(other) = $oid MERGE (e1)-[r:`{t}`]->(other) SET r = $p" |
| else: |
| merge_q = f"MATCH (e1:Entity {{id: $id1}}), (other) WHERE id(other) = $oid MERGE (other)-[r:`{t}`]->(e1) SET r = $p" |
| await store.execute_query(merge_q, {"id1": source_id, "oid": rel["oid"], "p": rel["p"]}) |
| |
| |
| del_q = "MATCH (e2:Entity {id: $id2}) DETACH DELETE e2" |
| await store.execute_query(del_q, {"id2": target_id}) |
| |
| return {"status": "merged", "result": source_id} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| |
|
|
| @router.get("/graph/nodes", summary="Search graph nodes for CRUD") |
| async def search_nodes( |
| query: str = "", |
| limit: int = 50, |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| if query: |
| cypher = """ |
| MATCH (n) |
| WHERE any(prop in keys(n) WHERE toString(n[prop]) CONTAINS $query) OR head(labels(n)) CONTAINS $query |
| RETURN id(n) as id, labels(n) as labels, properties(n) as properties LIMIT $limit |
| """ |
| else: |
| cypher = "MATCH (n) RETURN id(n) as id, labels(n) as labels, properties(n) as properties LIMIT $limit" |
| |
| res = await store.execute_query(cypher, {"query": query, "limit": limit}) |
| return {"nodes": res} |
|
|
| @router.delete("/graph/nodes/{node_id}", summary="Delete a node and its edges") |
| async def delete_node( |
| node_id: int, |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| |
| _DELETABLE_LABELS = {"Entity", "Chunk", "Document", "OntologyProposal", "Community"} |
| |
| label_q = "MATCH (n) WHERE id(n) = $node_id RETURN labels(n) as labels" |
| label_res = await store.execute_query(label_q, {"node_id": node_id}) |
| if not label_res: |
| raise HTTPException(status_code=404, detail=f"Node {node_id} not found.") |
| node_labels = set(label_res[0].get("labels", [])) |
| if not node_labels.intersection(_DELETABLE_LABELS): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, |
| detail=f"Deletion of node with labels {node_labels} is not permitted.", |
| ) |
| cypher = "MATCH (n) WHERE id(n) = $node_id DETACH DELETE n" |
| await store.execute_query(cypher, {"node_id": node_id}) |
| return {"status": "success", "message": f"Node {node_id} deleted."} |
|
|
| |
|
|
| @router.get("/documents", summary="List all ingested documents") |
| async def list_documents( |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| cypher = "MATCH (d:Document) RETURN d.id as id, d.filename as filename, d.status as status, d.uploaded_at as uploaded_at" |
| res = await store.execute_query(cypher) |
| return {"documents": res} |
|
|
| @router.delete("/documents/{doc_id}", summary="Delete document and cascade graph chunks") |
| async def delete_document( |
| doc_id: str, |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| |
| cypher = """ |
| MATCH (d:Document {id: $doc_id}) |
| OPTIONAL MATCH (d)-[:CONTAINS]->(c:Chunk) |
| DETACH DELETE c, d |
| """ |
| await store.execute_query(cypher, {"doc_id": doc_id}) |
| return {"status": "success", "message": f"Document {doc_id} and components deleted."} |
|
|
| |
|
|
| @router.get("/ontology/pending", summary="List pending ontology suggestions") |
| async def get_pending_ontology( |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| cypher = "MATCH (o:DriftReport) WHERE o.status = 'pending' RETURN o.id as id, o.new_entity_types as new_entity_types, o.new_relationship_types as new_relationship_types" |
| res = await store.execute_query(cypher) |
| return {"proposals": res} |
|
|
| @router.post("/ontology/approve/{prop_id}", summary="Approve ontology type") |
| async def approve_ontology( |
| prop_id: str, |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store) |
| ): |
| from ..services.ontology_drift_detector import OntologyDriftDetector |
| from ..config import settings |
| detector = OntologyDriftDetector( |
| graph_store=store, |
| llm_provider=settings.default_llm_provider, |
| ) |
| success = await detector.apply_drift_report( |
| report_id=prop_id, |
| approved_by=admin_user.username, |
| ) |
| if not success: |
| raise HTTPException(status_code=404, detail="Drift report not found") |
| return {"status": "approved", "id": prop_id} |
|
|
| @router.post("/ontology/reject/{prop_id}", summary="Reject ontology type") |
| async def reject_ontology( |
| prop_id: str, |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store) |
| ): |
| cypher = "MATCH (o:DriftReport {id: $prop_id}) SET o.status = 'rejected' RETURN o" |
| await store.execute_query(cypher, {"prop_id": prop_id}) |
| return {"status": "rejected", "id": prop_id} |
|
|
| |
|
|
| @router.get("/users", summary="List all system users") |
| async def list_users( |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store), |
| ): |
| cypher = "MATCH (u:User) RETURN u.username as username, u.scopes as scopes, u.disabled as disabled" |
| res = await store.execute_query(cypher) |
| |
| if not res: |
| res = [] |
| return {"users": res} |
|
|
| @router.put("/users/{username}/role", summary="Update user role/scopes") |
| async def update_user_role( |
| username: str, |
| payload: dict, |
| admin_user: User = Depends(check_admin_scope), |
| store: Neo4jStore = Depends(get_graph_store) |
| ): |
| scopes = payload.get("scopes", []) |
| if username == admin_user.username and "admin" not in scopes: |
| raise HTTPException(status_code=400, detail="Cannot remove your own admin privileges.") |
| cypher = "MATCH (u:User {username: $username}) SET u.scopes = $scopes RETURN u" |
| await store.execute_query(cypher, {"username": username, "scopes": scopes}) |
| return {"status": "success", "username": username, "new_scopes": scopes} |
|
|
|
|