GitHub Actions committed on
Commit d1df4ad · 1 Parent(s): b8ddf25

Deploy 8ca4892

app/models/pipeline.py CHANGED
@@ -60,8 +60,8 @@ class PipelineState(TypedDict):
     session_id: str
     query_embedding: Optional[list[float]]  # set by cache node, reused by retrieve
     expanded_queries: Annotated[list[str], operator.add]
-    retrieved_chunks: Annotated[list[Chunk], operator.add]
-    reranked_chunks: Annotated[list[Chunk], operator.add]
+    retrieved_chunks: list[Chunk]
+    reranked_chunks: list[Chunk]
     answer: str
     sources: Annotated[list[SourceRef], operator.add]
     cached: bool
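
A note on the state-shape change above: removing the operator.add reducer flips LangGraph's merge semantics for these two keys from append to last-write-wins, so a retry pass now replaces chunks instead of accumulating duplicates. A minimal sketch of the difference, assuming LangGraph-style TypedDict state; Chunk = dict is a hypothetical stand-in for the project's Chunk type:

import operator
from typing import Annotated, TypedDict

Chunk = dict  # hypothetical stand-in for the project's Chunk type


class AppendState(TypedDict):
    # With a reducer, LangGraph concatenates each node's return value,
    # so a CRAG-style retry appends its chunks to the first pass's list.
    reranked_chunks: Annotated[list[Chunk], operator.add]


class OverwriteState(TypedDict):
    # Without a reducer, the latest write wins: each retrieve pass
    # replaces the previous chunk list instead of accumulating duplicates.
    reranked_chunks: list[Chunk]

The defensive cap added in generate.py further down (truncating reranked_chunks at 12) reads as a guard against the same accumulation concern.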
app/pipeline/nodes/gemini_fast.py CHANGED
@@ -1,22 +1,7 @@
-"""
-backend/app/pipeline/nodes/gemini_fast.py
-
-Fast-path node: Gemini 2.0 Flash answers conversational / general queries
-directly from a TOON-encoded portfolio context summary, avoiding full RAG.
-
-Decision logic:
-- Gemini answers → state.answer is set, pipeline skips retrieve/generate.
-- Gemini calls search_knowledge_base() → state.thinking=True, pipeline
-  goes to retrieve+generate so the user gets a cited answer.
+"""Deterministic fast-path node used only for small-talk handling.
 
-Bug A fix (Issue 2): Gemini fast-path answers now run through the same
-is_low_trust() quality gate as Groq answers. If the gate fires (hedge phrase
-detected, or suspiciously short complex answer), the answer is discarded
-and the pipeline routes to full RAG instead of returning a low-quality answer.
-
-Issue 7 fix: _is_complex() now requires BOTH a keyword match AND query length
-> 8 words, eliminating false-positive complex classifications for short
-conversational queries like "How?" or "How many projects?".
+All substantive queries are routed to full RAG so answers remain retrieval-grounded
+and citation-capable. No parametric Gemini answer generation is used here.
 """
 from __future__ import annotations
 
@@ -26,11 +11,8 @@ from typing import Any
 
 from langgraph.config import get_stream_writer
 
-from app.core.portfolio_context import is_portfolio_relevant
-from app.core.portfolio_context import KNOWN_ORGS, KNOWN_PROJECTS
 from app.models.pipeline import PipelineState
 from app.services.gemini_client import GeminiClient
-from app.core.quality import is_low_trust
 
 logger = logging.getLogger(__name__)
 
@@ -61,92 +43,6 @@ _SMALL_TALK_ANSWER = (
     "and I'll find the details for you."
 )
 
-# Words that reliably indicate the visitor wants a deep, cited answer.
-# Kept for the quality gate docstring; no longer used for routing.
-_COMPLEX_SIGNALS: frozenset[str] = frozenset({
-    "how", "why", "explain", "implement", "architecture", "deep",
-    "detail", "technical", "compare", "difference", "algorithm",
-    "code", "example", "breakdown", "analysis", "source", "cite",
-    "reference", "proof", "derive", "calculate", "optimise", "optimize",
-    # Follow-up depth signals — these phrases appear in pill-generated questions
-    # and always indicate the user wants a cited, retrieved answer not a summary.
-    "tell me more", "more detail", "more about", "what about",
-    "explain that", "go deeper", "expand", "elaborate", "dig into",
-})
-
-# Minimum token count for a query to be classified as complex.
-# Queries shorter than this are almost always conversational or simple
-# biographical lookups regardless of vocabulary. "How?" alone currently
-# triggers 70B without this gate; "How many projects?" should not.
-# Documented in copilot-instructions.md — do not lower without profiling.
-_COMPLEX_MIN_WORDS: int = 8
-
-# Named-entity detection: a capitalised word that is NOT at position 0.
-# Presence of a NE after the first word in a short query means it is asking
-# about something specific (a company, project, technology) that needs Qdrant.
-_NE_RE = re.compile(r'(?<=\s)[A-Z][a-zA-Z0-9]+')
-
-# Navigation phrases that Gemini can answer from the TOON portfolio summary.
-# Keep this tight — anything not in this set routes to RAG.
-_TRIVIAL_PHRASES: frozenset[str] = frozenset({
-    "what is this", "what's this",
-    "who are you", "what are you",
-    "what can you do", "what can you help with", "what can you help me with",
-    "what do you do",
-})
-
-_ENTITY_SPECIFIC_NOUNS: frozenset[str] = KNOWN_PROJECTS | KNOWN_ORGS
-
-
-def _is_trivial(query: str) -> bool:
-    """
-    True when the query is pure navigation — safe for Gemini fast-path.
-
-    A query is trivial ONLY when its stripped form exactly matches a known
-    navigation phrase from _TRIVIAL_PHRASES. All other queries — including
-    short career/skills/internship questions — route to full RAG so they
-    receive citations backed by Qdrant evidence, not the stale TOON summary.
-
-    Removing the <4-word bypass (RC-10): queries like "his skills?" or
-    "any internships?" previously hit the TOON fast-path and could return
-    un-cited, outdated answers. They now always go through retrieval.
-    """
-    stripped = query.strip().rstrip("?!.")
-    return stripped.lower() in _TRIVIAL_PHRASES
-
-
-def _is_complex(query: str) -> bool:
-    """
-    O(1) heuristic — true when the query signals a need for a cited answer.
-
-    Used only to pick the LLM model tier (70B vs 8B), NOT for RAG routing.
-    Routing is decided by _is_trivial(); _is_complex() is a post-routing concern.
-
-    A query is complex only when BOTH conditions hold:
-      1. It contains a complexity-signal keyword (architecture, explain, etc.)
-      2. Its length exceeds _COMPLEX_MIN_WORDS (eliminates "How?" false positives)
-         OR it is extremely long (>20 tokens, reliably indicates detailed request).
-    """
-    tokens = set(query.lower().split())
-    token_count = len(tokens)
-    if token_count > 20:
-        return True
-    return bool(tokens & _COMPLEX_SIGNALS) and token_count > _COMPLEX_MIN_WORDS
-
-
-def _is_entity_specific_portfolio_query(query: str) -> bool:
-    tokens = re.findall(r"[a-z0-9]+", query.lower())
-    if not tokens:
-        return False
-    for token in tokens:
-        if token in _ENTITY_SPECIFIC_NOUNS:
-            return True
-    for a, b in zip(tokens, tokens[1:]):
-        if f"{a} {b}" in _ENTITY_SPECIFIC_NOUNS:
-            return True
-    return False
-
-
 def make_gemini_fast_node(gemini_client: GeminiClient) -> Any:
     """
     Returns a LangGraph-compatible async node function.
@@ -187,74 +83,13 @@ def make_gemini_fast_node(gemini_client: GeminiClient) -> Any:
             "path": "gemini_fast",
         }
 
-        complexity = "complex" if _is_complex(query) else "simple"
-
-        # Force RAG for entity-specific portfolio queries (project/org names).
-        # Broad intent-only phrasing (e.g., "what tech stack does he use") first
-        # goes through Gemini fast-path and falls back to RAG if low-trust.
-        if (
-            not _is_trivial(query)
-            and is_portfolio_relevant(query)
-            and _is_entity_specific_portfolio_query(query)
-        ):
-            logger.debug("Non-trivial query — routing directly to RAG: %r", query[:60])
-            writer({"type": "status", "label": "Searching portfolio..."})
-            return {
-                "query_complexity": complexity,
-                "expanded_queries": [query],
-                "thinking": False,
-            }
-
-        # When Gemini is not configured (GEMINI_API_KEY not set), route all
-        # traffic straight to RAG — behaviour is identical to the old graph.
-        if not gemini_client.is_configured:
-            logger.debug("Gemini not configured; routing query to RAG.")
-            writer({"type": "status", "label": "Needs deeper search, checking portfolio..."})
-            return {
-                "query_complexity": complexity,
-                "expanded_queries": [query],
-                "thinking": False,
-            }
-
-        answer, tool_query = await gemini_client.fast_answer(
-            query,
-            history=state.get("conversation_history") or [],
-        )
-
-        if answer is not None:
-            # Run the same quality gate that guards Groq answers.
-            if is_low_trust(answer, [], complexity):
-                logger.debug(
-                    "Gemini fast-path answer failed quality gate — routing to RAG."
-                )
-                writer({"type": "status", "label": "Needs deeper search, checking portfolio..."})
-                return {
-                    "query_complexity": complexity,
-                    "expanded_queries": [query],
-                    "thinking": True,
-                }
-
-            # Gemini answered and passed the quality gate.
-            logger.debug("Gemini fast-path answered query (len=%d)", len(answer))
-            writer({"type": "status", "label": "Got a direct answer, writing now..."})
-            # Gemini does not stream; emit the complete answer as one token event.
-            writer({"type": "token", "text": answer})
-            return {
-                "query_complexity": complexity,
-                "answer": answer,
-                "sources": [],
-                "thinking": False,
-                "path": "gemini_fast",
-            }
-
-        # Gemini called search_knowledge_base() — route to full RAG.
-        rag_query = tool_query or query
-        logger.debug("Gemini routed to RAG (tool_query=%r)", rag_query)
+        # Substantive queries are always retrieval-grounded.
+        logger.debug("Routing query to RAG path from deterministic fast node.")
         writer({"type": "status", "label": "Needs deeper search, checking portfolio..."})
         return {
-            "query_complexity": complexity,
-            "expanded_queries": [rag_query],
-            "thinking": True,
+            "query_complexity": "simple",
+            "expanded_queries": [query],
+            "thinking": False,
        }
 
     return gemini_fast
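
Condensed, the node now reduces to a two-way branch. A hypothetical, self-contained sketch of that control flow; _is_small_talk and the abbreviated _SMALL_TALK_ANSWER here are assumed stand-ins for the detection and canned reply kept in unchanged lines the diff does not show, and the real node takes only state and obtains writer via get_stream_writer():

from typing import Any

_SMALL_TALK_ANSWER = "Hi! Ask me about a project or skill and I'll find the details for you."  # abbreviated stand-in


def _is_small_talk(query: str) -> bool:
    # Assumed placeholder for the real check in unshown context lines.
    return query.strip().lower().rstrip("?!.") in {"hi", "hello", "hey", "thanks"}


async def gemini_fast(state: dict[str, Any], writer) -> dict[str, Any]:
    query = state["query"]
    if _is_small_talk(query):
        # Deterministic canned reply; no model call of any kind.
        writer({"type": "token", "text": _SMALL_TALK_ANSWER})
        return {"answer": _SMALL_TALK_ANSWER, "sources": [], "path": "gemini_fast"}
    # Everything else is handed to retrieval for a citation-backed answer.
    writer({"type": "status", "label": "Needs deeper search, checking portfolio..."})
    return {"query_complexity": "simple", "expanded_queries": [query], "thinking": False}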
app/pipeline/nodes/generate.py CHANGED
@@ -8,7 +8,6 @@ from langgraph.config import get_stream_writer
 
 from app.models.chat import SourceRef
 from app.models.pipeline import PipelineState
-from app.core.portfolio_context import SUGGESTION_HINT
 from app.services.llm_client import LLMClient
 from app.core.quality import is_low_trust
 logger = logging.getLogger(__name__)
@@ -19,7 +18,10 @@ logger = logging.getLogger(__name__)
 _THINK_COMPLETE_RE = re.compile(r"<think>[\s\S]*?</think>", re.DOTALL)
 _THINK_OPEN_RE = re.compile(r"<think>")
 _THINK_CLOSE_RE = re.compile(r"</think>")
-_GEN_HTML_TAG_RE = re.compile(r"</?[a-zA-Z][^>]*>")
+_GEN_HTML_TAG_RE = re.compile(
+    r"</?(?:div|span|p|br|ul|ol|li|strong|em|code|pre|h[1-6]|a|section|article|header|footer|main|aside|blockquote|table|thead|tbody|tr|td|th|img)[^>]*>",
+    re.IGNORECASE,
+)
 _VERSION_PARITY_RE = re.compile(r"\b(up[- ]?to[- ]?date|latest|current|in sync|same version|version)\b", re.IGNORECASE)
 _WORD_RE = re.compile(r"[a-zA-Z0-9]+")
 
@@ -163,6 +165,8 @@ def _format_history(state: "PipelineState") -> str:
     history = state.get("conversation_history") or []
     if not history:
         return ""
+    # Keep fallback prompt context bounded for clarity and token predictability.
+    history = history[-3:]
     lines = [
         f"[T{i + 1}] Q: {t['q']} | A: {t['a']}"
         for i, t in enumerate(history)
@@ -330,54 +334,19 @@ def _build_low_trust_fallback(query: str, source_refs: list[SourceRef]) -> str:
     if not source_refs:
         return _NOT_FOUND_ANSWER
 
-    first = source_refs[0]
-    title = first.title or "the retrieved source"
-
     if _VERSION_PARITY_RE.search(query):
         return (
-            f"The retrieved sources confirm links/details for {title} [1], but they do not explicitly "
+            "The indexed sources include related details [1], but they do not explicitly "
             "confirm whether the GitHub code and live demo are currently in sync, so version parity "
-            "cannot be verified from the indexed content alone [1]."
+            "cannot be verified from indexed content alone [1]."
         )
 
     return (
-        f"Based on the retrieved evidence, the answer is grounded in {title} [1]. "
-        "If you want deeper detail, ask for a specific section, implementation part, or comparison."
+        "I couldn't find enough specific information to answer that confidently. "
+        "Try asking about a specific project, blog post, skill, or company."
     )
 
 
-def _select_chunks_for_prompt(query: str, reranked_chunks: list[dict]) -> list[dict]:
-    """
-    Prefer chunks whose source title is explicitly referenced in the query.
-
-    This prevents focused questions (e.g. one project) from receiving multi-project
-    blended context that can trigger verbose, low-quality comparison answers.
-    """
-    if not reranked_chunks:
-        return reranked_chunks
-
-    query_lower = query.lower()
-    focused: list[dict] = []
-
-    for chunk in reranked_chunks:
-        title = str(chunk["metadata"].get("source_title", "")).strip()
-        if not title:
-            continue
-        title_lower = title.lower()
-        if len(title_lower) >= 4 and title_lower in query_lower:
-            focused.append(chunk)
-            continue
-
-        title_tokens = [t for t in _WORD_RE.findall(title_lower) if len(t) >= 4]
-        if title_tokens and sum(1 for tok in title_tokens if tok in query_lower) >= min(2, len(title_tokens)):
-            focused.append(chunk)
-
-    if focused:
-        return focused[:6]
-
-    return reranked_chunks[:8]
-
-
 def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[PipelineState], dict]:  # noqa: ANN001
     # Number of token chunks to buffer before deciding there is no CoT block.
     # Llama 3.1 8B may omit <think> entirely; Llama 3.3 70B always starts with one.
@@ -389,6 +358,9 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
         query = state["query"]
         complexity = state.get("query_complexity", "simple")
         reranked_chunks = state.get("reranked_chunks", [])
+        if len(reranked_chunks) > 12:
+            logger.warning("generate: unusually large reranked chunk set (%d); truncating to 12.", len(reranked_chunks))
+            reranked_chunks = reranked_chunks[:12]
 
         # ── Enumeration path (Fix 1) ──────────────────────────────────────────────
         # enumerate_query node already set is_enumeration_query=True and populated
@@ -433,23 +405,8 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
         # ── Not-found path ────────────────────────────────────────────────────────────
         if not reranked_chunks:
             writer({"type": "status", "label": "Could not find specific information, responding carefully..."})
-
-            # Fix 2 Rule 2: generate a specific, topical redirect suggestion using
-            # Gemini with the TOON portfolio entity list. Fires here after CRAG
-            # retries are exhausted so the user gets contextual guidance.
-            query_topic = state.get("query_topic") or "this topic"
-            not_found_answer = _NOT_FOUND_ANSWER
-            if gemini_client is not None and getattr(gemini_client, "is_configured", False):
-                suggestion = await gemini_client.generate_specific_suggestion(
-                    query=query,
-                    query_topic=query_topic,
-                    suggestion_hint=SUGGESTION_HINT,
-                )
-                if suggestion:
-                    not_found_answer = suggestion
-
-            writer({"type": "token", "text": not_found_answer})
-            return {"answer": not_found_answer, "sources": [], "path": "rag"}
+            writer({"type": "token", "text": _NOT_FOUND_ANSWER})
+            return {"answer": _NOT_FOUND_ANSWER, "sources": [], "path": "rag"}
 
         # ── Build numbered context block ────────────────────────────────────
         # Merge chunks from the same source URL first so every [N] in the prompt
@@ -457,7 +414,7 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
         # TextOps become [1] and [2] — the LLM cites both in the same sentence,
         # which looks like self-citing hallucination even though it is technically
         # correct. _merge_by_source preserves all text; nothing is discarded.
-        selected_chunks = _select_chunks_for_prompt(query, reranked_chunks)
+        selected_chunks = reranked_chunks[:8]
         merged_chunks = _merge_by_source(selected_chunks)
         context_parts: list[str] = []
         source_refs: list[SourceRef] = []
@@ -578,7 +535,8 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
                 if clean_buf:
                     writer({"type": "token", "text": clean_buf})
         except (Exception, asyncio.CancelledError) as exc:
-            logger.warning("Generation stream interrupted: %s / Returning partial answer.", exc)
+            logger.warning("Generation stream interrupted (%s: %s) / Returning partial answer.", type(exc).__name__, exc)
+            writer({"type": "status", "label": "Stream interrupted; finalizing partial answer..."})
             # Flush whatever we have if the stream was cut
             if buf:
                 clean_buf = _strip_think_tags(buf)
@@ -594,15 +552,6 @@ def make_generate_node(llm_client: LLMClient, gemini_client=None) -> Callable[[P
         # It handles complete pairs, orphaned opens, and orphaned closes.
         full_answer = _strip_think_tags(raw_answer)
 
-        # ── Quality gate: Gemini editorial reformat ──────────────────────────
-        # Fires when: (a) criticism detected — always reformat, or
-        # (b) low-trust heuristic flags the draft. Zero extra cost on good responses.
-        if gemini_client is not None and (is_criticism or is_low_trust(full_answer, reranked_chunks, complexity)):
-            logger.debug("Triggering Gemini reformat (criticism=%s).", is_criticism)
-            reformatted = await gemini_client.reformat_rag_answer(query, context_block, full_answer)
-            if reformatted:
-                full_answer = reformatted
-
         full_answer = _normalise_answer_text(full_answer, max_citation_index=len(source_refs))
 
         # Final guardrail: if answer still looks low-trust after reformat + cleanup,
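
On the _GEN_HTML_TAG_RE tightening: the old pattern </?[a-zA-Z][^>]*> removed any angle-bracketed token, including non-HTML text such as generic type parameters. A quick standalone check, not from the repo, of why the tag allowlist matters:

import re

_GEN_HTML_TAG_RE = re.compile(
    r"</?(?:div|span|p|br|ul|ol|li|strong|em|code|pre|h[1-6]|a|section|article"
    r"|header|footer|main|aside|blockquote|table|thead|tbody|tr|td|th|img)[^>]*>",
    re.IGNORECASE,
)
_OLD_TAG_RE = re.compile(r"</?[a-zA-Z][^>]*>")  # previous, over-broad pattern

sample = "Wrap it in <strong>Qdrant</strong> filters, e.g. a vector<float> index."
print(_OLD_TAG_RE.sub("", sample))       # also strips "<float>": a false positive
print(_GEN_HTML_TAG_RE.sub("", sample))  # strips only the real HTML tags

Worth noting the allowlist still matches on prefixes (for instance <python> would match via the p alternative), so it narrows rather than eliminates false positives.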
app/pipeline/nodes/log_eval.py CHANGED
@@ -38,10 +38,15 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState
         rerank_scores = json.dumps(
             [c["metadata"].get("rerank_score", 0.0) for c in state.get("reranked_chunks", [])]
         )
-        # Store chunk texts verbatim so data_prep.py can build reranker triplets
-        # without needing to re-fetch from Qdrant.
+        # Store only retrieval metadata to reduce PII exposure in interaction logs.
+        # Full text can be reconstructed on-demand by doc_id from Qdrant if needed.
         reranked_chunks_json = json.dumps(
-            [{"text": c["text"], "doc_id": c["metadata"]["doc_id"]}
+            [{
+                "doc_id": c["metadata"].get("doc_id", ""),
+                "source_title": c["metadata"].get("source_title", ""),
+                "source_type": c["metadata"].get("source_type", ""),
+                "section": c["metadata"].get("section", ""),
+            }
             for c in state.get("reranked_chunks", [])]
         )
         path = state.get("path") or "rag"
@@ -140,7 +145,12 @@ def make_log_eval_node(db_path: str, github_log=None) -> Callable[[PipelineState
             json.dumps([c["metadata"]["doc_id"] for c in state.get("reranked_chunks", [])])
         ),
         "reranked_chunks_json": [
-            {"text": c["text"], "doc_id": c["metadata"]["doc_id"]}
+            {
+                "doc_id": c["metadata"].get("doc_id", ""),
+                "source_title": c["metadata"].get("source_title", ""),
+                "source_type": c["metadata"].get("source_type", ""),
+                "section": c["metadata"].get("section", ""),
+            }
             for c in state.get("reranked_chunks", [])
         ],
         "rerank_scores": [
app/pipeline/nodes/retrieve.py CHANGED
@@ -97,6 +97,29 @@ _CAPABILITY_QUERY_HINTS: frozenset[str] = frozenset(
     }
 )
 
+_BIOGRAPHY_QUERY_HINTS: frozenset[str] = frozenset(
+    {
+        "work",
+        "experience",
+        "employment",
+        "career",
+        "internship",
+        "internships",
+        "education",
+        "degree",
+        "university",
+        "background",
+        "resume",
+        "cv",
+        "company",
+        "companies",
+        "role",
+        "roles",
+    }
+)
+
+_BIO_SOURCE_TYPES: frozenset[str] = frozenset({"resume", "cv", "bio"})
+
 _NORMALISATION_STOPWORDS: frozenset[str] = frozenset(
     {
         "tell",
@@ -242,6 +265,11 @@ def _is_capability_query(query: str) -> bool:
     return bool(tokens & _CAPABILITY_QUERY_HINTS)
 
 
+def _is_biography_query(query: str) -> bool:
+    tokens = frozenset(re.findall(r"[a-z0-9]+", query.lower()))
+    return bool(tokens & _BIOGRAPHY_QUERY_HINTS)
+
+
 def _is_informative_chunk(chunk: Chunk) -> bool:
     """True when chunk text has enough lexical content for cross-encoder reranking."""
     text = (chunk.get("contextualised_text") or chunk["text"] or "").strip()
@@ -505,6 +533,23 @@ def make_retrieve_node(
         if not rerank_candidates:
             rerank_candidates = unique_chunks
 
+        # Biography-focused queries should prioritize resume/CV evidence and avoid
+        # project/blog code passages crowding out personal background facts.
+        if _is_biography_query(retrieval_query):
+            bio_candidates = [
+                chunk
+                for chunk in rerank_candidates
+                if chunk["metadata"].get("source_type", "") in _BIO_SOURCE_TYPES
+            ]
+            if bio_candidates:
+                rerank_candidates = bio_candidates
+                writer(
+                    {
+                        "type": "status",
+                        "label": "Prioritizing resume and background sources...",
+                    }
+                )
+
         try:
             reranked = await reranker.rerank(retrieval_query, rerank_candidates, top_k=10)  # RC-5: raised from 7
         except (Exception, asyncio.CancelledError) as exc:
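
The new biography gate is a pure token-overlap check, so it can be sanity-checked in isolation. Copied out of the diff above, with two assertions that should hold given the hint set:

import re

_BIOGRAPHY_QUERY_HINTS = frozenset({
    "work", "experience", "employment", "career", "internship", "internships",
    "education", "degree", "university", "background", "resume", "cv",
    "company", "companies", "role", "roles",
})


def _is_biography_query(query: str) -> bool:
    # Lowercase alphanumeric tokens, so "Internships?" still matches "internships".
    tokens = frozenset(re.findall(r"[a-z0-9]+", query.lower()))
    return bool(tokens & _BIOGRAPHY_QUERY_HINTS)


assert _is_biography_query("Any internships or work experience?")
assert not _is_biography_query("How does the reranker score chunks?")

Also note the retrieve-node change only narrows the pool when at least one resume/CV/bio chunk survived retrieval, so the gate can never empty the rerank candidates.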
tests/test_generate_focus_selection.py CHANGED
@@ -1,37 +1,36 @@
-from app.pipeline.nodes.generate import _select_chunks_for_prompt
-
-
-def _chunk(title: str, section: str = "Overview") -> dict:
-    return {
-        "text": f"Info about {title}",
-        "metadata": {
-            "source_title": title,
-            "section": section,
-            "source_url": "",
-            "source_type": "project",
-        },
-    }
-
-
-def test_select_chunks_prefers_explicitly_mentioned_source_title() -> None:
-    chunks = [
-        _chunk("Sorting Demo"),
-        _chunk("EchoEcho"),
-        _chunk("Donut.asm"),
-    ]
-
-    selected = _select_chunks_for_prompt(
-        "Is the source code on GitHub up-to-date with the Sorting Demo live demo?",
-        chunks,
-    )
-
-    assert selected
-    assert all(c["metadata"]["source_title"] == "Sorting Demo" for c in selected)
-
-
-def test_select_chunks_falls_back_to_top_ranked_when_no_title_match() -> None:
-    chunks = [_chunk("A"), _chunk("B"), _chunk("C")]
-
-    selected = _select_chunks_for_prompt("What technologies are used?", chunks)
-
-    assert selected == chunks
+from app.pipeline.nodes.generate import _format_history
+
+
+def test_format_history_uses_summary_when_present() -> None:
+    state = {
+        "conversation_summary": "User asked about PersonaBot architecture and caching.",
+        "conversation_history": [
+            {"q": "Q1", "a": "A1"},
+            {"q": "Q2", "a": "A2"},
+        ],
+    }
+
+    rendered = _format_history(state)
+
+    assert "Running conversation context:" in rendered
+    assert "PersonaBot architecture" in rendered
+    assert "Q1" not in rendered
+
+
+def test_format_history_caps_raw_history_to_three_turns() -> None:
+    state = {
+        "conversation_summary": None,
+        "conversation_history": [
+            {"q": "Q1", "a": "A1"},
+            {"q": "Q2", "a": "A2"},
+            {"q": "Q3", "a": "A3"},
+            {"q": "Q4", "a": "A4"},
+        ],
+    }
+
+    rendered = _format_history(state)
+
+    assert "Q1" not in rendered
+    assert "Q2" in rendered
+    assert "Q3" in rendered
+    assert "Q4" in rendered
tests/test_generate_not_found_fallback.py CHANGED
@@ -8,14 +8,14 @@ _WRITER_PATCH = "app.pipeline.nodes.gemini_fast.get_stream_writer"
 
 
 @pytest.mark.asyncio
-async def test_non_portfolio_conversational_query_uses_gemini_answer() -> None:
+async def test_small_talk_returns_deterministic_fast_path_answer() -> None:
     gemini = MagicMock()
     gemini.is_configured = True
-    gemini.fast_answer = AsyncMock(return_value=("Hey there!", None))
+    gemini.fast_answer = AsyncMock(return_value=("unused", None))
 
     node = make_gemini_fast_node(gemini)
     state = {
-        "query": "I said hello.",
+        "query": "Hi",
         "is_followup": False,
         "conversation_history": [],
     }
@@ -23,9 +23,9 @@ async def test_non_portfolio_conversational_query_uses_gemini_answer() -> None:
     with patch(_WRITER_PATCH, return_value=MagicMock()):
         result = await node(state)
 
-    assert result["answer"] == "Hey there!"
+    assert "Darshan's portfolio assistant" in result["answer"]
     assert result["path"] == "gemini_fast"
-    gemini.fast_answer.assert_awaited_once()
+    gemini.fast_answer.assert_not_awaited()
 
 
 @pytest.mark.asyncio
@@ -48,22 +48,23 @@ async def test_portfolio_specific_query_forces_rag() -> None:
     assert result["expanded_queries"] == ["How does TextOps work?"]
     gemini.fast_answer.assert_not_awaited()
 
-@pytest.mark.asyncio
-async def test_broad_portfolio_intent_can_use_gemini_fast_path() -> None:
-    gemini = MagicMock()
-    gemini.is_configured = True
-    gemini.fast_answer = AsyncMock(return_value=("He uses a broad stack.", None))
-
-    node = make_gemini_fast_node(gemini)
-    state = {
-        "query": "What tech stack does he use?",
-        "is_followup": False,
-        "conversation_history": [],
-    }
-
-    with patch(_WRITER_PATCH, return_value=MagicMock()):
-        result = await node(state)
-
-    assert result["answer"] == "He uses a broad stack."
-    assert result["path"] == "gemini_fast"
-    gemini.fast_answer.assert_awaited_once()
+
+@pytest.mark.asyncio
+async def test_non_small_talk_routes_to_rag() -> None:
+    gemini = MagicMock()
+    gemini.is_configured = True
+    gemini.fast_answer = AsyncMock(return_value=("unused", None))
+
+    node = make_gemini_fast_node(gemini)
+    state = {
+        "query": "What tech stack does he use?",
+        "is_followup": False,
+        "conversation_history": [],
+    }
+
+    with patch(_WRITER_PATCH, return_value=MagicMock()):
+        result = await node(state)
+
+    assert "answer" not in result
+    assert result["expanded_queries"] == ["What tech stack does he use?"]
+    gemini.fast_answer.assert_not_awaited()
tests/test_generate_quality_fallback.py CHANGED
@@ -36,5 +36,5 @@ def test_low_trust_fallback_general_query_is_concise() -> None:
         sources,
     )
 
-    assert "Sorting Demo" in answer
-    assert "[1]" in answer
+    assert "specific project" in answer.lower()
+    assert "sorting demo" not in answer.lower()
tests/test_log_eval_privacy.py ADDED
@@ -0,0 +1,45 @@
+import json
+import sqlite3
+
+import pytest
+
+from app.pipeline.nodes.log_eval import make_log_eval_node
+
+
+@pytest.mark.asyncio
+async def test_log_eval_stores_chunk_metadata_without_text(tmp_path) -> None:
+    db_path = str(tmp_path / "interactions.db")
+    node = make_log_eval_node(db_path)
+
+    state = {
+        "session_id": "s1",
+        "query": "What work experience does Darshan have?",
+        "answer": "He worked at VK Live.",
+        "reranked_chunks": [
+            {
+                "text": "Phone +44 7818 975908 and email someone@example.com",
+                "metadata": {
+                    "doc_id": "resume-rag",
+                    "source_title": "Resume",
+                    "source_type": "resume",
+                    "section": "Work Experience",
+                    "rerank_score": 0.9,
+                },
+            }
+        ],
+        "latency_ms": 123,
+        "cached": False,
+        "path": "rag",
+        "is_enumeration_query": False,
+    }
+
+    await node(state)
+
+    with sqlite3.connect(db_path) as conn:
+        row = conn.execute("SELECT reranked_chunks_json FROM interactions LIMIT 1").fetchone()
+
+    assert row is not None
+    payload = json.loads(row[0])
+    assert payload and payload[0]["doc_id"] == "resume-rag"
+    assert payload[0]["source_title"] == "Resume"
+    assert "text" not in payload[0]