| | |
| | |
| | |
| |
|
| | import os |
| | import time |
| | import uuid |
| | from dataclasses import dataclass, field |
| | from typing import List, Dict, Any |
| |
|
| | from datetime import datetime |
| |
|
| | import numpy as np |
| | import torch |
| | from sentence_transformers import SentenceTransformer |
| | from fastapi import FastAPI |
| | from pydantic import BaseModel |
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Fixed scalar the project calls "φ⁴³"; used only as a numeric regularizer
# (scoring bonus and [0,1] scaling) — NOTE(review): confirm its derivation.
PHI_43 = 22.93606797749979
# Tag stamped into every log line emitted by this service.
SYSTEM_ID = "QUANTARION_HYPERGRAPH-RAG-PROD"
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Best-effort file logging: lines are appended under ./Logs in the cwd.
LOG_DIR = os.path.join(os.getcwd(), "Logs")
os.makedirs(LOG_DIR, exist_ok=True)
# Fix: the log file was previously named ".text" — a hidden, extension-only
# dotfile with no stem. Use a descriptive, discoverable filename instead.
LOG_PATH = os.path.join(LOG_DIR, "quantarion_hypergraph_rag.log")
| |
|
def log_line(msg: str) -> None:
    """Print *msg* with a UTC timestamp and system tag, and append it to LOG_PATH.

    File logging is deliberately best-effort: any OS-level failure
    (permissions, full disk, ...) is swallowed so logging can never
    crash the service; the console print above has already happened.
    """
    # datetime.utcnow() is deprecated since Python 3.12 — use an aware UTC time.
    from datetime import timezone

    ts = datetime.now(timezone.utc).isoformat()
    line = f"[{ts}] [{SYSTEM_ID}] {msg}"
    print(line)
    try:
        with open(LOG_PATH, "a", encoding="utf-8") as f:
            # Fix: the original string literal was split across physical source
            # lines (the "\n" escape was lost); write an explicit newline.
            f.write(line + "\n")
    except OSError:
        # Best-effort file logging only; never propagate I/O errors.
        pass
| |
|
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class Hyperedge:
    """An n-ary relation: a single edge joining any number of vertices."""

    id: str                 # unique edge id (uuid4 string where built below)
    vertices: List[str]     # entity names connected by this edge
    weight: float           # edge weight (1.0 where built in this file)
    meta: Dict[str, Any] = field(default_factory=dict)  # e.g. doc_id / text
| |
|
| |
|
@dataclass
class Hypergraph:
    """A vertex set plus the hyperedges defined over it."""

    vertices: List[str]          # all known entity names
    hyperedges: List[Hyperedge]  # n-ary relations over those vertices
| |
|
| |
|
class QueryRequest(BaseModel):
    """Request body for POST /query."""

    query: str      # free-text query to ground against the hypergraph
    top_k: int = 5  # maximum number of hyperedges to retrieve
| |
|
| |
|
class QueryResponse(BaseModel):
    """Response body for POST /query."""

    query_id: str                               # uuid4 assigned per request
    query: str                                  # echoed request query
    selected_hyperedges: List[Dict[str, Any]]   # serialized Hyperedge dicts
    answer: str                                 # generated summary text
    phi43_check: float                          # phi43_check() value in [0, 1]
    latency_ms: float                           # end-to-end handler latency
| |
|
| |
|
| | |
| | |
| | |
| |
|
class HypergraphRAGEngine:
    """
    Production-grade Hypergraph RAG:
    - Embeddings via SentenceTransformer
    - Hyperedges = n-ary concept relations
    - Retrieval = minimal hyperedge cover approximation
    - φ⁴³ used as a numeric regularizer for scoring/stability
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """Load the sentence embedder and start with an empty index."""
        log_line("Initializing HypergraphRAGEngine…")
        self.model_name = model_name
        self.embedder = SentenceTransformer(model_name)
        self.hypergraph: Hypergraph = Hypergraph(vertices=[], hyperedges=[])
        self.vertex_embeddings: Dict[str, np.ndarray] = {}
        self.ready = False  # flips True once build_from_documents completes

    def build_from_documents(self, docs: List[Dict[str, Any]]) -> None:
        """
        Build the hypergraph index from annotated documents.

        docs: list of {"id": str, "text": str, "entities": [str, ...]}
              entities = extracted or annotated concept ids/names.
        Raises KeyError if a document with >= 2 entities lacks "id" or "text".
        """
        log_line(f"Building hypergraph from {len(docs)} documents…")

        # Collect the global vertex set across all documents.
        vertices_set = set()
        for d in docs:
            vertices_set.update(d.get("entities", []))
        vertices = sorted(vertices_set)  # sorted() takes any iterable; no list() needed

        # Embed every vertex once; normalize_embeddings=True means a plain
        # dot product against the query embedding is cosine similarity.
        if vertices:
            log_line(f"Embedding {len(vertices)} vertices…")
            embs = self.embedder.encode(vertices, normalize_embeddings=True)
            self.vertex_embeddings = {v: embs[i] for i, v in enumerate(vertices)}

        # One hyperedge per document that relates at least two distinct entities.
        hyperedges: List[Hyperedge] = []
        for d in docs:
            # Sorted dedupe: set() iteration order is not deterministic across
            # runs, so sort for reproducible edge vertex ordering.
            ents = sorted(set(d.get("entities", [])))
            if len(ents) < 2:
                continue  # a hyperedge needs >= 2 vertices
            hyperedges.append(
                Hyperedge(
                    id=str(uuid.uuid4()),
                    vertices=ents,
                    weight=1.0,
                    meta={
                        "doc_id": d["id"],
                        "text": d["text"],
                    },
                )
            )

        self.hypergraph = Hypergraph(vertices=vertices, hyperedges=hyperedges)
        self.ready = True
        log_line(
            f"Hypergraph built: |V|={len(self.hypergraph.vertices)}, |E|={len(self.hypergraph.hyperedges)}"
        )

    def _query_embedding(self, query: str) -> np.ndarray:
        """Return the unit-normalized embedding of a single query string."""
        return self.embedder.encode([query], normalize_embeddings=True)[0]

    def _hyperedge_score(self, query_emb: np.ndarray, he: Hyperedge) -> float:
        """Mean query↔vertex cosine similarity plus a small φ⁴³-scaled bonus."""
        sims = [
            float(np.dot(query_emb, ve))
            for v in he.vertices
            if (ve := self.vertex_embeddings.get(v)) is not None
        ]
        base = float(np.mean(sims)) if sims else 0.0
        # reg maps base from [-1, 1] into [0, 1]; PHI_43 / 23.0 ≈ 0.997, so the
        # regularizer adds at most ~0.01 — a stability nudge, not a re-ranking.
        reg = (base + 1.0) / 2.0
        return float(base + 0.01 * (PHI_43 / 23.0) * reg)

    def retrieve_hyperedges(self, query: str, top_k: int = 5) -> List[Hyperedge]:
        """Return up to *top_k* hyperedges ranked by descending score.

        Returns [] when the index has not been built or is empty.
        """
        if not self.ready or not self.hypergraph.hyperedges:
            return []

        q_emb = self._query_embedding(query)
        scored = [
            (self._hyperedge_score(q_emb, he), he)
            for he in self.hypergraph.hyperedges
        ]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [he for _, he in scored[:top_k]]

    def generate_answer(self, query: str, hyperedges: List[Hyperedge]) -> str:
        """
        Produce a concise, deterministic executive-style answer.

        In production, this would call QVNN/LLM with retrieved context.
        """
        if not hyperedges:
            return (
                "No sufficient hypergraph context was found for this query in the "
                "current Quantarion Hypergraph-RAG index."
            )

        texts = [he.meta.get("text", "") for he in hyperedges]
        snippet = " ".join(t for t in texts if t.strip())[:800]

        # Fix: the original string literals contained raw line breaks split
        # across physical source lines (a syntax error after extraction);
        # restore them as explicit \n escapes.
        return (
            "Executive hypergraph-grounded summary:\n"
            f"- Query: {query}\n"
            f"- Top hyperedges: {len(hyperedges)}\n"
            f"- Condensed context: {snippet}\n"
            "This answer is generated by selecting a minimal set of "
            "multi-entity hyperedges that best align with the query, "
            "using φ⁴³-regularized similarity scoring."
        )

    def phi43_check(self, hyperedges: List[Hyperedge]) -> float:
        """
        Simple φ-check: scale count of hyperedges into [0,1] vs PHI_43.
        """
        if not hyperedges:
            return 0.0
        return float(max(0.0, min(1.0, len(hyperedges) / PHI_43)))
| |
|
| |
|
| | |
| | |
| | |
| |
|
# FastAPI application and a single process-wide engine instance.
app = FastAPI(title="Quantarion Hypergraph-RAG Production API")

# NOTE(review): constructing the engine at import time loads the embedding
# model (and may download it) — confirm this is acceptable at deploy time.
engine = HypergraphRAGEngine()
| |
|
| |
|
@app.on_event("startup")
def _startup():
    """Populate the engine with a small built-in demo corpus at boot."""
    log_line("Startup: building demo hypergraph index…")
    # (id, text, entities) triples for the demo index.
    demo_corpus = [
        (
            "doc1",
            "Neuromorphic SNNs provide event-driven, low-power computation.",
            ["neuromorphic", "SNN", "event-driven"],
        ),
        (
            "doc2",
            "Hypergraph RAG uses hyperedges to capture multi-entity relations.",
            ["hypergraph", "RAG", "multi-entity"],
        ),
        (
            "doc3",
            "Hybrid retrieval combines dense, sparse, and graph-based signals.",
            ["hybrid retrieval", "dense", "sparse", "graph"],
        ),
    ]
    engine.build_from_documents(
        [{"id": doc_id, "text": text, "entities": ents} for doc_id, text, ents in demo_corpus]
    )
    log_line("Startup: Hypergraph-RAG demo index ready.")
| |
|
| |
|
@app.post("/query", response_model=QueryResponse)
def query_hypergraph_rag(req: QueryRequest):
    """Retrieve hyperedges for the query, synthesize an answer, and log timings."""
    started = time.time()
    query_id = str(uuid.uuid4())
    log_line(f"QUERY {query_id} | {req.query}")

    hits = engine.retrieve_hyperedges(req.query, top_k=req.top_k)
    answer_text = engine.generate_answer(req.query, hits)
    phi_score = engine.phi43_check(hits)
    elapsed_ms = (time.time() - started) * 1000.0

    log_line(
        f"QUERY {query_id} | hyperedges={len(hits)} | phi43_check={phi_score:.3f} | latency_ms={elapsed_ms:.1f}"
    )

    # Serialize the retrieved hyperedges as plain dicts for the response model.
    edge_payload = [
        {"id": he.id, "vertices": he.vertices, "weight": he.weight, "meta": he.meta}
        for he in hits
    ]
    return QueryResponse(
        query_id=query_id,
        query=req.query,
        selected_hyperedges=edge_payload,
        answer=answer_text,
        phi43_check=phi_score,
        latency_ms=elapsed_ms,
    )
| |
|
| |
|
if __name__ == "__main__":
    # uvicorn imported lazily: only needed when running this file directly.
    import uvicorn

    # Standalone/local deployment entry point; serves the ASGI app on all
    # interfaces, port 8000.
    log_line("Starting Quantarion Hypergraph-RAG Production server on 0.0.0.0:8000…")
    uvicorn.run(app, host="0.0.0.0", port=8000)