Commit 5941cd9 committed by GitHub Actions · Parent: d1df4ad

Deploy a9a1f6e

app/core/config.py CHANGED
@@ -55,14 +55,6 @@ class Settings(BaseSettings):
     GEMINI_MODEL: str = "gemini-2.5-flash-lite"
     GEMINI_CONTEXT_PATH: str = "backend/app/services/gemini_context.toon"

-    # Durable GitHub interaction log — survives HF Space restarts.
-    # PERSONABOT_WRITE_TOKEN: fine-grained PAT with read+write Contents access
-    # on the PersonaBot repo. When set, every interaction is appended to
-    # data/interactions.jsonl in the repo so training signals persist.
-    # Leave unset in local dev (interactions stay in SQLite only).
-    PERSONABOT_WRITE_TOKEN: Optional[str] = None
-    PERSONABOT_REPO: str = "1337Xcode/PersonaBot"
-
     # HuggingFace Space model servers.
     # In local env, embedder/reranker run in-process (these URLs are ignored).
     # In prod, the API Space calls the HF embedder/reranker Spaces via HTTP.
app/main.py CHANGED
@@ -23,7 +23,6 @@ from app.pipeline.graph import build_pipeline
 from app.security.rate_limiter import limiter, custom_rate_limit_handler
 from app.services.embedder import Embedder
 from app.services.gemini_client import GeminiClient
-from app.services.github_log import GithubLog
 from app.services.llm_client import get_llm_client, TpmBucket
 from app.services.reranker import Reranker
 from app.services.semantic_cache import SemanticCache
@@ -69,17 +68,6 @@ def _normalize_qdrant_url(url: str) -> str:
     return raw


-def _sqlite_row_count(db_path: str) -> int:
-    """Return the current interactions row count, or 0 if the table doesn't exist."""
-    try:
-        with sqlite3.connect(db_path) as conn:
-            return conn.execute("SELECT COUNT(*) FROM interactions").fetchone()[0]
-    except sqlite3.OperationalError:
-        return 0
-    except Exception:
-        return 0
-
-
 async def _qdrant_keepalive_loop(
     qdrant: QdrantClient,
     interval_seconds: int,
@@ -112,31 +100,13 @@ async def lifespan(app: FastAPI):
     settings = get_settings()
     logger.info("Starting PersonaBot API | env=%s", settings.ENVIRONMENT)

-    # Durable GitHub interaction log — survives HF Space restarts.
-    # When PERSONABOT_WRITE_TOKEN is not set (local dev), GithubLog.enabled=False
-    # and all append calls are silent no-ops.
-    github_log = GithubLog(
-        write_token=settings.PERSONABOT_WRITE_TOKEN or "",
-        repo=settings.PERSONABOT_REPO,
-    )
-    app.state.github_log = github_log
-
     # Attach the in-memory semantic cache. No external service required.
     app.state.semantic_cache = SemanticCache(
         max_size=settings.SEMANTIC_CACHE_SIZE,
         ttl_seconds=settings.SEMANTIC_CACHE_TTL_SECONDS,
         similarity_threshold=settings.SEMANTIC_CACHE_SIMILARITY_THRESHOLD,
     )
-    app.state.conversation_store = ConversationStore(settings.DB_PATH, github_log=github_log)
-
-    # Issue 1: reconstruct SQLite conversation history from the durable GitHub log
-    # after an ephemeral HF Space restart. Only triggers when SQLite is empty
-    # (<10 rows) so a healthy Space with accumulated data is never overwritten.
-    if github_log.enabled and _sqlite_row_count(settings.DB_PATH) < 10:
-        logger.info("SQLite appears empty — attempting reconstruction from durable log.")
-        recent = await github_log.load_recent(500)
-        if recent:
-            app.state.conversation_store.populate_from_records(recent)
+    app.state.conversation_store = ConversationStore(settings.DB_PATH)

     # DagsHub/MLflow experiment tracking — optional, only active when token is set.
     # In prod with DAGSHUB_TOKEN set, experiments are tracked at dagshub.com.
@@ -222,7 +192,6 @@ async def lifespan(app: FastAPI):
         "vector_store": vector_store,
         "reranker": reranker,
         "db_path": settings.DB_PATH,
-        "github_log": github_log,
     })
     app.state.settings = settings
     app.state.qdrant = qdrant
app/pipeline/graph.py CHANGED
@@ -121,7 +121,7 @@ def build_pipeline(services: dict) -> CompiledStateGraph:
     # CRAG: query rewrite on failed retrieval — runs up to twice for portfolio queries.
     graph.add_node("rewrite_query", make_rewrite_query_node(services["gemini"]))
     graph.add_node("generate", make_generate_node(services["llm"], services["gemini"]))
-    graph.add_node("log_eval", make_log_eval_node(services["db_path"], services.get("github_log")))
+    graph.add_node("log_eval", make_log_eval_node(services["db_path"]))

     graph.set_entry_point("guard")

app/pipeline/nodes/log_eval.py CHANGED
@@ -1,3 +1,4 @@
+import asyncio
 import json
 import logging
 import sqlite3
@@ -6,16 +7,23 @@ from datetime import datetime, timezone
 from typing import Callable

 from app.models.pipeline import PipelineState
-from app.core.config import get_settings
+from app.services.loki_sink import ship_to_loki

 logger = logging.getLogger(__name__)
+_PENDING_TASKS: set[asyncio.Task[None]] = set()


-def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState], dict]:
+def _source_hit_proxy(state: PipelineState) -> int:
+    reranked_chunks = state.get("reranked_chunks", [])
+    chunk_count = len(reranked_chunks)
+    top_score = state.get("top_rerank_score")
+    return int(top_score is not None and top_score > -1.5 and chunk_count >= 2)
+
+
+def make_log_eval_node(db_path: str) -> Callable[[PipelineState], dict]:
     """
     Writes interaction to SQLite synchronously (<5ms) inside the request lifespan.
-    Also appends to the durable GitHub JSONL log (fire-and-forget background task)
-    so training signals survive HuggingFace Space restarts.
+    Ships a sanitised observability record to Grafana Loki asynchronously.

     The `path` field tags which pipeline branch produced the answer:
     "cache_hit" — served from semantic cache, no LLM called.
@@ -50,6 +58,7 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState], dict]:
              for c in state.get("reranked_chunks", [])]
         )
         path = state.get("path") or "rag"
+        source_hit_proxy = _source_hit_proxy(state)

         with sqlite3.connect(db_path) as conn:
             conn.execute(
@@ -71,7 +80,8 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState], dict]:
                     critic_completeness INTEGER,
                     critic_specificity INTEGER,
                     critic_quality TEXT,
-                    is_enumeration_query BOOLEAN DEFAULT 0
+                    is_enumeration_query BOOLEAN DEFAULT 0,
+                    source_hit_proxy INTEGER DEFAULT 0
                 )
                 """
             )
@@ -92,6 +102,7 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState], dict]:
                 # RC-13: retrieval diagnostics
                 ("sibling_expansion_count", "INTEGER"),
                 ("focused_source_type", "TEXT"),
+                ("source_hit_proxy", "INTEGER DEFAULT 0"),
             ]:
                 try:
                     conn.execute(f"ALTER TABLE interactions ADD COLUMN {col} {definition}")
@@ -104,8 +115,8 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState], dict]:
                 (timestamp, session_id, query, answer, chunks_used, rerank_scores,
                  reranked_chunks_json, latency_ms, cached, path,
                  critic_groundedness, critic_completeness, critic_specificity, critic_quality,
-                 is_enumeration_query, sibling_expansion_count, focused_source_type)
-                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                 is_enumeration_query, sibling_expansion_count, focused_source_type, source_hit_proxy)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                 """,
                 (
                     datetime.now(tz=timezone.utc).isoformat(),
@@ -125,49 +136,50 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState], dict]:
                     state.get("is_enumeration_query", False),
                     state.get("sibling_expansion_count"),
                     state.get("focused_source_type"),
+                    source_hit_proxy,
                 ),
             )
             return cursor.lastrowid  # type: ignore[return-value]

-    async def log_eval_node(state: PipelineState) -> dict:
+    def _build_loki_record(state: PipelineState) -> dict:
+        return {
+            "timestamp": datetime.now(tz=timezone.utc).isoformat(),
+            "session_id": state.get("session_id", ""),
+            "query": state.get("query", ""),
+            "answer": state.get("answer", ""),
+            "path": state.get("path") or "rag",
+            "cached": state.get("cached", False),
+            "latency_ms": state.get("latency_ms", 0),
+            "retrieval_attempts": state.get("retrieval_attempts", 0),
+            "top_rerank_score": state.get("top_rerank_score"),
+            "focused_source_type": state.get("focused_source_type"),
+            "sibling_expansion_count": state.get("sibling_expansion_count"),
+            "critic_groundedness": state.get("critic_groundedness"),
+            "critic_completeness": state.get("critic_completeness"),
+            "critic_specificity": state.get("critic_specificity"),
+            "critic_quality": state.get("critic_quality"),
+            "is_enumeration_query": state.get("is_enumeration_query", False),
+            "guard_passed": state.get("guard_passed", False),
+            "query_complexity": state.get("query_complexity", ""),
+            "is_followup": state.get("is_followup", False),
+            "is_audio_mode": state.get("is_audio_mode", False),
+            "follow_ups": state.get("follow_ups", []),
+            "reranked_chunks": state.get("reranked_chunks", []),
+            "source_hit_proxy": _source_hit_proxy(state),
+        }
+
+    def log_eval_node(state: PipelineState) -> dict:
         try:
             row_id = _write_to_sqlite(state)
-
-            # Append to durable GitHub log (fire-and-forget — never blocks the response).
-            if github_log is not None and github_log.enabled:
-                path = state.get("path") or "rag"
-                record = {
-                    "timestamp": datetime.now(tz=timezone.utc).isoformat(),
-                    "session_id": state.get("session_id", ""),
-                    "query": state.get("query", ""),
-                    "answer": state.get("answer", ""),
-                    "chunks_used": json.loads(
-                        json.dumps([c["metadata"]["doc_id"] for c in state.get("reranked_chunks", [])])
-                    ),
-                    "reranked_chunks_json": [
-                        {
-                            "doc_id": c["metadata"].get("doc_id", ""),
-                            "source_title": c["metadata"].get("source_title", ""),
-                            "source_type": c["metadata"].get("source_type", ""),
-                            "section": c["metadata"].get("section", ""),
-                        }
-                        for c in state.get("reranked_chunks", [])
-                    ],
-                    "rerank_scores": [
-                        c["metadata"].get("rerank_score", 0.0)
-                        for c in state.get("reranked_chunks", [])
-                    ],
-                    "latency_ms": state.get("latency_ms", 0),
-                    "cached": state.get("cached", False),
-                    "feedback": 0,
-                    "path": path,
-                    "critic_groundedness": state.get("critic_groundedness"),
-                    "critic_completeness": state.get("critic_completeness"),
-                    "critic_specificity": state.get("critic_specificity"),
-                    "critic_quality": state.get("critic_quality"),
-                    "is_enumeration_query": state.get("is_enumeration_query", False),
-                }
-                github_log.append(record)
+            try:
+                loop = asyncio.get_running_loop()
+                task = loop.create_task(ship_to_loki(_build_loki_record(state)))
+                _PENDING_TASKS.add(task)
+                task.add_done_callback(_PENDING_TASKS.discard)
+            except RuntimeError:
+                # Called outside an event loop (for example, synchronous unit tests).
+                # SQLite write still succeeds; Loki shipping is skipped.
+                logger.debug("No running event loop; skipping async Loki ship for this call.")

             return {"interaction_id": row_id}
         except Exception as e:
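
A note on the fire-and-forget shipping in log_eval_node above: loop.create_task(...) gives the event loop only a weak reference to the task, so the module-level _PENDING_TASKS set keeps a strong reference until the task finishes and the done-callback discards it. A minimal standalone sketch of the same idiom (the names _PENDING, _ship, and fire_and_forget here are illustrative, not part of the repo):

```python
import asyncio

_PENDING: set[asyncio.Task[None]] = set()


async def _ship(record: dict) -> None:
    # Stand-in for the real exporter (an HTTP push in this commit); it must swallow
    # its own errors, since nothing awaits the task.
    await asyncio.sleep(0)


def fire_and_forget(record: dict) -> None:
    # Schedule the coroutine without awaiting it. Keeping the task in a set prevents
    # it from being garbage-collected before it runs; the callback drops it when done.
    task = asyncio.get_running_loop().create_task(_ship(record))
    _PENDING.add(task)
    task.add_done_callback(_PENDING.discard)


async def main() -> None:
    fire_and_forget({"latency_ms": 42})
    await asyncio.sleep(0.1)  # give the background task a chance to run


if __name__ == "__main__":
    asyncio.run(main())
```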
app/services/conversation_store.py CHANGED
@@ -32,9 +32,8 @@ class ConversationStore:
     One instance is created at startup and shared across all requests via app.state.
     """

-    def __init__(self, db_path: str, github_log=None) -> None:
+    def __init__(self, db_path: str) -> None:
         self._db_path = db_path
-        self._github_log = github_log
         self._ensure_summary_table()

     def _ensure_summary_table(self) -> None:
@@ -136,79 +135,3 @@ class ConversationStore:
         except Exception as exc:
             logger.warning("ConversationStore.mark_last_negative SQLite failed: %s", exc)

-        if self._github_log is not None:
-            self._github_log.append_feedback(session_id, feedback=-1)
-
-    def populate_from_records(self, records: list[dict]) -> None:
-        """
-        Replay interaction records from the durable GitHub log into SQLite.
-        Called at startup when SQLite is empty after a Space restart.
-        """
-        import os
-        db_dir = os.path.dirname(self._db_path)
-        if db_dir:
-            os.makedirs(db_dir, exist_ok=True)
-
-        interaction_records = [
-            r for r in records
-            if r.get("type") != "feedback" and r.get("query")
-        ]
-        if not interaction_records:
-            return
-
-        try:
-            with sqlite3.connect(self._db_path) as conn:
-                conn.execute(
-                    """
-                    CREATE TABLE IF NOT EXISTS interactions (
-                        id INTEGER PRIMARY KEY AUTOINCREMENT,
-                        timestamp TEXT,
-                        session_id TEXT,
-                        query TEXT,
-                        answer TEXT,
-                        chunks_used TEXT,
-                        rerank_scores TEXT,
-                        reranked_chunks_json TEXT,
-                        latency_ms INTEGER,
-                        cached BOOLEAN,
-                        feedback INTEGER DEFAULT 0,
-                        path TEXT DEFAULT 'rag'
-                    )
-                    """
-                )
-                feedback_corrections: dict[str, int] = {}
-                for r in records:
-                    if r.get("type") == "feedback":
-                        feedback_corrections[r["session_id"]] = r.get("feedback", 0)
-
-                for r in interaction_records:
-                    sid = r.get("session_id", "")
-                    feedback = feedback_corrections.get(sid, r.get("feedback", 0))
-                    conn.execute(
-                        """
-                        INSERT INTO interactions
-                        (timestamp, session_id, query, answer, chunks_used,
-                         rerank_scores, reranked_chunks_json, latency_ms, cached, feedback, path)
-                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-                        """,
-                        (
-                            r.get("timestamp", datetime.now(tz=timezone.utc).isoformat()),
-                            sid,
-                            r.get("query", ""),
-                            r.get("answer", ""),
-                            json.dumps(r.get("chunks_used", [])),
-                            json.dumps(r.get("rerank_scores", [])),
-                            json.dumps(r.get("reranked_chunks_json", [])),
-                            r.get("latency_ms", 0),
-                            r.get("cached", False),
-                            feedback,
-                            r.get("path", "rag"),
-                        ),
-                    )
-                logger.info(
-                    "Reconstructed %d interactions from durable GitHub log into SQLite.",
-                    len(interaction_records),
-                )
-        except Exception as exc:
-            logger.warning("ConversationStore.populate_from_records failed: %s", exc)
-
app/services/loki_sink.py ADDED
@@ -0,0 +1,119 @@
+from __future__ import annotations
+
+import hashlib
+import json
+import logging
+import os
+import time
+from typing import Any
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+_LOKI_URL = os.environ.get("LOKI_URL", "").strip()
+_LOKI_USERNAME = os.environ.get("LOKI_USERNAME", "").strip()
+_LOKI_API_KEY = os.environ.get("LOKI_API_KEY", "").strip()
+
+_LOKI_ENABLED = bool(_LOKI_URL)
+if not _LOKI_ENABLED:
+    logger.warning("LOKI_URL is not set; Loki shipping is disabled.")
+
+
+def _sha_prefix(value: str, prefix_len: int) -> str:
+    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:prefix_len]
+
+
+def _extract_chunk_metrics(reranked_chunks: list[dict[str, Any]]) -> tuple[list[float], list[str], int, str]:
+    rerank_scores: list[float] = []
+    source_types: set[str] = set()
+    for chunk in reranked_chunks:
+        metadata = chunk.get("metadata", {})
+        score = metadata.get("rerank_score")
+        if isinstance(score, (int, float)):
+            rerank_scores.append(float(score))
+        source_type = str(metadata.get("source_type", "") or "")
+        if source_type:
+            source_types.add(source_type)
+
+    chunk_count = len(reranked_chunks)
+    top_chunk_doc_id = ""
+    if chunk_count > 0:
+        top_chunk_doc_id = str(reranked_chunks[0].get("metadata", {}).get("doc_id", "") or "")
+
+    return rerank_scores, sorted(source_types), chunk_count, top_chunk_doc_id
+
+
+def _to_float_or_none(value: Any) -> float | None:
+    if isinstance(value, (int, float)):
+        return float(value)
+    return None
+
+
+def _build_sanitized_record(record: dict[str, Any]) -> dict[str, Any]:
+    reranked_chunks = record.get("reranked_chunks") or []
+    query = str(record.get("query", ""))
+    session_id = str(record.get("session_id", ""))
+    rerank_scores, source_types_used, chunk_count, top_chunk_doc_id = _extract_chunk_metrics(reranked_chunks)
+
+    top_rerank_score = _to_float_or_none(record.get("top_rerank_score"))
+    source_hit_proxy = int(top_rerank_score is not None and top_rerank_score > -1.5 and chunk_count >= 2)
+
+    return {
+        "timestamp": str(record.get("timestamp", "")),
+        "session_id": _sha_prefix(session_id, 12) if session_id else "",
+        "path": str(record.get("path", "rag") or "rag"),
+        "cached": bool(record.get("cached", False)),
+        "latency_ms": int(record.get("latency_ms", 0) or 0),
+        "retrieval_attempts": int(record.get("retrieval_attempts", 0) or 0),
+        "top_rerank_score": top_rerank_score,
+        "focused_source_type": str(record.get("focused_source_type", "") or ""),
+        "sibling_expansion_count": int(record.get("sibling_expansion_count", 0) or 0),
+        "critic_groundedness": record.get("critic_groundedness"),
+        "critic_completeness": record.get("critic_completeness"),
+        "critic_specificity": record.get("critic_specificity"),
+        "critic_quality": str(record.get("critic_quality", "") or ""),
+        "is_enumeration_query": bool(record.get("is_enumeration_query", False)),
+        "guard_passed": bool(record.get("guard_passed", False)),
+        "query_complexity": str(record.get("query_complexity", "") or ""),
+        "is_followup": bool(record.get("is_followup", False)),
+        "is_audio_mode": bool(record.get("is_audio_mode", False)),
+        "query_hash": _sha_prefix(query, 16) if query else "",
+        "chunk_count": chunk_count,
+        "top_chunk_doc_id": top_chunk_doc_id,
+        "source_types_used": source_types_used,
+        "follow_up_count": len(record.get("follow_ups") or []),
+        "rerank_scores": rerank_scores,
+        "source_hit_proxy": source_hit_proxy,
+    }
+
+
+async def ship_to_loki(record: dict[str, Any]) -> None:
+    if not _LOKI_ENABLED:
+        return
+
+    try:
+        sanitized = _build_sanitized_record(record)
+        payload = {
+            "streams": [
+                {
+                    "stream": {
+                        "app": "personabot",
+                        "env": "production",
+                        "path": str(sanitized.get("path", "rag") or "rag"),
+                        "critic_quality": str(sanitized.get("critic_quality", "") or ""),
+                        "focused_source_type": str(sanitized.get("focused_source_type", "") or ""),
+                    },
+                    "values": [
+                        [str(time.time_ns()), json.dumps(sanitized)]
+                    ],
+                }
+            ]
+        }
+
+        timeout = httpx.Timeout(3.0)
+        async with httpx.AsyncClient(timeout=timeout, auth=(_LOKI_USERNAME, _LOKI_API_KEY)) as client:
+            response = await client.post(_LOKI_URL, json=payload)
+            response.raise_for_status()
+    except Exception as exc:
+        logger.warning("Loki ship failed: %s", exc)
tests/test_log_eval_privacy.py CHANGED
@@ -6,8 +6,7 @@ import pytest
 from app.pipeline.nodes.log_eval import make_log_eval_node


-@pytest.mark.asyncio
-async def test_log_eval_stores_chunk_metadata_without_text(tmp_path) -> None:
+def test_log_eval_stores_chunk_metadata_without_text(tmp_path) -> None:
     db_path = str(tmp_path / "interactions.db")
     node = make_log_eval_node(db_path)

@@ -33,7 +32,7 @@ async def test_log_eval_stores_chunk_metadata_without_text(tmp_path) -> None:
         "is_enumeration_query": False,
     }

-    await node(state)
+    node(state)

     with sqlite3.connect(db_path) as conn:
         row = conn.execute("SELECT reranked_chunks_json FROM interactions LIMIT 1").fetchone()
tests/test_loki_sink.py ADDED
@@ -0,0 +1,168 @@
+import asyncio
+import json
+import sqlite3
+
+import httpx
+import pytest
+
+from app.pipeline.nodes.log_eval import make_log_eval_node
+from app.services import loki_sink
+
+
+class _FakeResponse:
+    def raise_for_status(self) -> None:
+        return None
+
+
+@pytest.mark.asyncio
+async def test_ship_to_loki_sanitises_pii(monkeypatch) -> None:
+    captured: dict = {}
+
+    class _FakeAsyncClient:
+        def __init__(self, *args, **kwargs):
+            # No-op test double constructor.
+            self._args = args
+            self._kwargs = kwargs
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, exc_type, exc, tb):
+            return None
+
+        async def post(self, url, json):
+            await asyncio.sleep(0)
+            captured["url"] = url
+            captured["payload"] = json
+            return _FakeResponse()
+
+    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", True)
+    monkeypatch.setattr(loki_sink, "_LOKI_URL", "https://example.grafana.net/loki/api/v1/push")
+    monkeypatch.setattr(loki_sink, "_LOKI_USERNAME", "123")
+    monkeypatch.setattr(loki_sink, "_LOKI_API_KEY", "glc_test")
+    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FakeAsyncClient)
+
+    await loki_sink.ship_to_loki(
+        {
+            "timestamp": "2026-04-21T00:00:00Z",
+            "session_id": "session-123",
+            "query": "my private query",
+            "answer": "long text",
+            "path": "rag",
+            "cached": False,
+            "latency_ms": 100,
+            "retrieval_attempts": 1,
+            "top_rerank_score": -1.0,
+            "focused_source_type": "resume",
+            "sibling_expansion_count": 0,
+            "critic_groundedness": 3,
+            "critic_completeness": 3,
+            "critic_specificity": 3,
+            "critic_quality": "high",
+            "is_enumeration_query": False,
+            "guard_passed": True,
+            "query_complexity": "simple",
+            "is_followup": False,
+            "is_audio_mode": False,
+            "follow_ups": [],
+            "reranked_chunks": [],
+        }
+    )
+
+    values = captured["payload"]["streams"][0]["values"]
+    assert values
+    serialized = values[0][1]
+    assert "my private query" not in serialized
+    assert "long text" not in serialized
+
+    payload_record = json.loads(serialized)
+    assert payload_record.get("query_hash")
+
+
+@pytest.mark.asyncio
+async def test_ship_to_loki_noop_when_no_url(monkeypatch) -> None:
+    called = {"post": False}
+
+    class _NeverCalledClient:
+        def __init__(self, *args, **kwargs):
+            # No-op test double constructor.
+            self._args = args
+            self._kwargs = kwargs
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, exc_type, exc, tb):
+            return None
+
+        async def post(self, url, json):
+            await asyncio.sleep(0)
+            called["post"] = True
+            return _FakeResponse()
+
+    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", False)
+    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _NeverCalledClient)
+
+    await loki_sink.ship_to_loki({"query": "x", "answer": "y"})
+
+    assert called["post"] is False
+
+
+@pytest.mark.asyncio
+async def test_ship_to_loki_swallows_http_error(monkeypatch) -> None:
+    class _FailingClient:
+        def __init__(self, *args, **kwargs):
+            # No-op test double constructor.
+            self._args = args
+            self._kwargs = kwargs
+
+        async def __aenter__(self):
+            return self
+
+        async def __aexit__(self, exc_type, exc, tb):
+            return None
+
+        async def post(self, url, json):
+            await asyncio.sleep(0)
+            raise httpx.ConnectError("connection failed")
+
+    monkeypatch.setattr(loki_sink, "_LOKI_ENABLED", True)
+    monkeypatch.setattr(loki_sink, "_LOKI_URL", "https://example.grafana.net/loki/api/v1/push")
+    monkeypatch.setattr(loki_sink, "_LOKI_USERNAME", "123")
+    monkeypatch.setattr(loki_sink, "_LOKI_API_KEY", "glc_test")
+    monkeypatch.setattr(loki_sink.httpx, "AsyncClient", _FailingClient)
+
+    await loki_sink.ship_to_loki({"query": "q", "answer": "a", "path": "rag", "reranked_chunks": []})
+
+
+@pytest.mark.asyncio
+async def test_source_hit_proxy_logged_to_sqlite(tmp_path) -> None:
+    db_path = str(tmp_path / "interactions.db")
+    node = make_log_eval_node(db_path)
+
+    state = {
+        "session_id": "s1",
+        "query": "What work experience does Darshan have?",
+        "answer": "He worked at VK Live.",
+        "reranked_chunks": [
+            {"text": "a", "metadata": {"doc_id": "d1", "source_type": "resume", "rerank_score": -1.0}},
+            {"text": "b", "metadata": {"doc_id": "d2", "source_type": "resume", "rerank_score": -1.2}},
+            {"text": "c", "metadata": {"doc_id": "d3", "source_type": "resume", "rerank_score": -1.3}},
+        ],
+        "latency_ms": 123,
+        "cached": False,
+        "path": "rag",
+        "is_enumeration_query": False,
+        "top_rerank_score": -1.0,
+        "retrieval_attempts": 1,
+        "follow_ups": [],
+    }
+
+    result = node(state)
+    assert "interaction_id" in result
+
+    with sqlite3.connect(db_path) as conn:
+        row = conn.execute("SELECT source_hit_proxy FROM interactions LIMIT 1").fetchone()
+
+    assert row is not None
+    assert row[0] == 1