mtyrrell committed
Commit 99fffc4 · 1 Parent(s): aa7f532

cleanup of all cache-related code/comments; README revision

Files changed (3)
  1. README.md +11 -72
  2. app/main.py +3 -13
  3. app/nodes.py +5 -69
README.md CHANGED
@@ -95,29 +95,17 @@ The orchestrator implements a dual-mode workflow designed to handle non-standard
 
 #### Mode 1: Direct Output (DIRECT_OUTPUT = True)
 
-**Purpose:** Immediately return long-running ingestor results to the user without LLM processing, then use those results as context for follow-up questions.
+**Purpose:** Immediately return long-running ingestor results to the user without Generator (LLM) processing. Results are maintained in message history context for follow-up questions.
 
-**First File Upload:**
+**File Upload:**
 ```
 File Upload → Detect Type → Direct Output Ingest → Return Raw Results
-
-Cache Result (by file hash)
-```
-
-**Subsequent Conversation Turns:**
-```
-Follow-up Query → Detect Cached File → Retrieved Context → Combined Context → Generator
-
-Use Cached Ingestor Output as Context
 ```
 
 **Key Behaviors:**
-- First upload returns raw ingestor output immediately (no LLM generation)
-- File content is hashed (SHA256) for deduplication
-- Ingestor results are cached with the file hash
-- All follow-up queries in the conversation use the cached ingestor output as retrieval context
-
-**Notable Unintuitive Behavior:** Once the file is cached on the orchestrator, re-uploading the same file (even with a different filename in a different chat) skips re-processing.
+- File uploads return raw ingestor output immediately (no LLM generation)
+- Each file upload is processed through the ingestor
+- Suitable for immediate analysis results (e.g., Whisp API responses)
 
 **Example Conversation Flow:**
 ```
@@ -125,15 +113,15 @@ User: [Uploads plot_boundaries.geojson]
 System: [Returns API analysis results directly - no LLM processing]
 
 User: "What deforestation risks were identified?"
-System: [Uses cached GeoJSON results + retrieval LLM generation]
+System: [Conversation history + Retrieval → Generator - processes as standard query]
 
 User: "How does this compare to EUDR requirements?"
-System: [Uses same cached results + conversation history + retrieval LLM generation]
+System: [Conversation history + Retrieval → Generator - processes as standard query]
 ```
 
 #### Mode 2: Standard RAG (DIRECT_OUTPUT = False)
 
-**Purpose:** Traditional RAG pipeline where uploaded files are treated as additional context for generation from the first turn.
+**Purpose:** Traditional RAG pipeline where uploaded files are treated as additional context for query-based generation from the first turn.
 
 **Every Query (with or without file):**
 ```
@@ -146,7 +134,6 @@ Query + Optional File → Detect Type → Ingest → Retrieved Context → Combi
 - Files are processed through ingestor when uploaded
 - Ingestor output is added to the retrieval context (not returned directly)
 - Generator always processes the combined context (ingestor + retriever)
-- No special caching or deduplication logic
 
 **Example Conversation Flow:**
 ```
@@ -157,29 +144,6 @@ User: "Summarize section 3"
 System: [Retrieval → Combined Context → Generator]
 ```
 
-### File Hash Caching Mechanism
-
-The orchestrator uses SHA256 hashing to detect duplicate file uploads:
-
-**Cache Structure:**
-```python
-{
-    "a3f5c91...": {
-        "ingestor_context": "API results...",
-        "timestamp": "2025-10-02T14:30:00",
-        "filename": "boundaries.geojson",
-        "file_type": "geojson"
-    }
-}
-```
-
-**Detection Logic:**
-1. File is uploaded
-2. Compute SHA256 hash of file content
-3. Check if hash exists in cache
-4. If not found: Process through ingestor, cache results
-5. If found: Use cached results (Skip Ingestion → Retrieved Context → Combined Context → Generator)
-
 
 ### Conversation Context Management
 
@@ -196,7 +160,7 @@ The system maintains conversation history separately from file processing with a
 - Ensures relevant document retrieval based on current question
 
 **Generation Context:**
-- Combines: Conversation history + Retrieved context + Cached file results
+- Combines: Conversation history + Retrieved context + File ingestor results (if present)
 - Generator uses full context to produce coherent, contextually-aware responses
 
 
@@ -207,7 +171,6 @@ The system maintains conversation history separately from file processing with a
 - LangServe endpoints for ChatUI integration
 - Gradio web interface for testing
 - FastAPI endpoints for diagnostics and future use (e.g. /health)
-- Cache management endpoint (for direct output use cases)
 
 **Key Functions:**
 - `chatui_adapter()`: Handles text-only queries
@@ -230,8 +193,6 @@ LangGraph nodes that implement the processing pipeline:
 **Helper Functions:**
 
 - `process_query_streaming()`: Unified streaming interface
-- `compute_file_hash()`: SHA256 hashing for deduplication
-- `clear_direct_output_cache()`: Cache management
 
 ### 3. Data Models (`models.py`)
 
@@ -567,19 +528,6 @@ Content-Type: application/json
 }
 ```
 
-#### Clear Cache
-```
-POST /clear-cache
-```
-Clears the direct output file cache.
-
-**Response:**
-```json
-{
-    "status": "cache cleared"
-}
-```
-
 ### Gradio Interface
 
 #### Interactive Query
@@ -612,16 +560,7 @@ Gradio's default API endpoint for UI interactions. If running on huggingface spa
 - Consider enabling `DIRECT_OUTPUT` for suitable file types
 - Check logs for retrieval/generation bottlenecks
 
-#### 3. Cache Not Clearing
-
-**Symptoms:** Same file shows cached results when it shouldn't
-
-**Solutions:**
-- Call `/clear-cache` endpoint
-- Restart the service (clears in-memory cache)
-- Check if `DIRECT_OUTPUT=True` in config
-
-#### 4. Service Connection Errors
+#### 3. Service Connection Errors
 
 **Symptoms:** "Connection refused" or timeout errors
 
@@ -635,7 +574,7 @@ Gradio's default API endpoint for UI interactions. If running on huggingface spa
 ### Version History
 
 - **v1.0.0**: Initial release with LangGraph orchestration
-- Current implementation supports streaming, caching, and dual-mode processing
+- Current implementation supports streaming and dual-mode processing
 
 ---
 
 
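The mode switch documented in the README hunks above is a single config flag. A minimal sketch of the toggle, assuming only the section and option names visible in the `config.getboolean` call in `app/nodes.py`; the inline config text stands in for the real `params.cfg`, whose layout is not shown in this commit:

```python
# Sketch of the DIRECT_OUTPUT toggle. Only the section/option names
# ("file_processing", "DIRECT_OUTPUT") come from nodes.py; the config
# body below is illustrative, not the actual params.cfg.
from configparser import ConfigParser

config = ConfigParser()
config.read_string("""
[file_processing]
DIRECT_OUTPUT = true
""")

# Mirrors the module-level flag read in nodes.py
DIRECT_OUTPUT_ENABLED = config.getboolean("file_processing", "DIRECT_OUTPUT", fallback=False)
print(DIRECT_OUTPUT_ENABLED)  # True -> Mode 1 (direct output); False -> Mode 2 (standard RAG)
```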
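For orientation, a hedged client-side sketch of exercising the two modes over the streaming route named in the README. The `/chatfed-with-file-stream` prefix is documented above and `/stream` is LangServe's standard sub-route, but the input field names (`query`, `filename`, `file_content`), the encoding, and the host/port are assumptions, not a confirmed schema:

```python
# Hedged sketch: stream a query (optionally with a file) to the orchestrator.
import base64
import httpx

BASE = "http://localhost:8000"  # assumed host/port

def stream_chat(query, filename=None, file_bytes=None):
    body = {"input": {"query": query}}  # field names are assumptions
    if filename and file_bytes is not None:
        body["input"]["filename"] = filename
        body["input"]["file_content"] = base64.b64encode(file_bytes).decode()
    with httpx.stream("POST", f"{BASE}/chatfed-with-file-stream/stream",
                      json=body, timeout=120) as resp:
        for line in resp.iter_lines():
            if line:
                print(line)  # Mode 1: raw ingestor output; Mode 2: generated tokens
```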
app/main.py CHANGED
@@ -40,14 +40,13 @@ workflow.add_node("direct_output", direct_output_node)
 workflow.add_node("retrieve", retrieve_node)
 workflow.add_node("generate", generate_node_streaming)
 
-# Simple linear path - node logic handles caching and routing
+# Simple linear path - node logic handles routing
 workflow.add_edge(START, "detect_file_type")
 workflow.add_edge("detect_file_type", "ingest")
 
 # Route after ingestion based on direct output mode
-# Direct output files route to direct_output only on FIRST upload
-# This allows for follow-up queries to go through the full pipeline
-# Cached direct output files route to standard (retrieve + generate)
+# Direct output mode routes to direct_output (return ingestor results)
+# Standard mode routes to retrieve + generate (full RAG pipeline)
 workflow.add_conditional_edges(
     "ingest",
     route_workflow,
@@ -264,20 +263,11 @@ async def root():
         "health": "/health",
         "chatfed-ui-stream": "/chatfed-ui-stream (LangServe)",
         "chatfed-with-file-stream": "/chatfed-with-file-stream (LangServe)",
-        "clear-cache": "/clear-cache (Clear direct output cache)",
         "gradio": "/gradio"
     }
 }
 
 
-@app.post("/clear-cache")
-async def clear_cache():
-    """Clear the direct output file cache"""
-    from nodes import clear_direct_output_cache
-    clear_direct_output_cache()
-    return {"status": "cache cleared"}
-
-
 #----------------------------------------
 # LANGSERVE ROUTES - endpoints for ChatUI
 #----------------------------------------
 
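The `add_conditional_edges` call above is truncated before its path map. Below is a self-contained sketch of the wiring the comments imply; the mapping dict, the `retrieve → generate` edge, and the terminal edges are assumptions consistent with `route_workflow` returning "direct_output" or "standard", not code copied from the commit:

```python
# Sketch of the post-cleanup graph shape, with stub nodes standing in
# for the real node functions in app/nodes.py.
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict, total=False):
    workflow_type: str

def noop(state: State) -> State:  # stand-in for the real node functions
    return state

def route_workflow(state: State) -> str:
    # nodes.py: 'direct_output' when DIRECT_OUTPUT=True, 'standard' otherwise
    return state.get("workflow_type", "standard")

g = StateGraph(State)
for name in ["detect_file_type", "ingest", "direct_output", "retrieve", "generate"]:
    g.add_node(name, noop)
g.add_edge(START, "detect_file_type")
g.add_edge("detect_file_type", "ingest")
g.add_conditional_edges("ingest", route_workflow,
                        {"direct_output": "direct_output", "standard": "retrieve"})
g.add_edge("retrieve", "generate")  # assumed remainder of the linear path
g.add_edge("direct_output", END)    # assumed terminal edges
g.add_edge("generate", END)
graph = g.compile()
```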
app/nodes.py CHANGED
@@ -1,6 +1,5 @@
 import tempfile
 import os
-import hashlib
 from models import GraphState
 from datetime import datetime
 from gradio_client import Client, file
@@ -8,7 +7,7 @@ import logging
 import dotenv
 import httpx
 import json
-from typing import Generator, Optional, Dict
+from typing import Generator, Optional
 
 from utils import detect_file_type, convert_context_to_list, merge_state, getconfig
 from retriever_adapter import RetrieverAdapter
@@ -29,16 +28,6 @@ DIRECT_OUTPUT_ENABLED = config.getboolean("file_processing", "DIRECT_OUTPUT", fa
 
 retriever_adapter = RetrieverAdapter("params.cfg")
 
-# Cache ONLY for direct output from ingestor (to prevent re-displaying results)
-# Standard output file uploads are NOT cached - they always go through normal processing
-# This means ChatUI will resend the file alongside conversation history with each turn
-_direct_output_cache: Dict[str, dict] = {}
-
-
-def compute_file_hash(file_content: bytes) -> str:
-    """Compute SHA256 hash of file content for duplicate detection"""
-    return hashlib.sha256(file_content).hexdigest()
-
 
 #----------------------------------------
 # LANGGRAPH NODE FUNCTIONS
@@ -48,23 +37,13 @@ def detect_file_type_node(state: GraphState) -> GraphState:
     """Detect file type and determine workflow"""
     file_type = "unknown"
     workflow_type = "standard"
-    is_cached_direct_output = False
 
     if state.get("file_content") and state.get("filename"):
        file_type = detect_file_type(state["filename"], state["file_content"])
 
         # Check if direct output mode is enabled
         if DIRECT_OUTPUT_ENABLED:
-            # Check if we've already shown direct output for this exact file
-            file_hash = compute_file_hash(state["file_content"])
-
-            # Comment out the cache check:
-            # if file_hash in _direct_output_cache:
-            #     logger.info(f"Direct output file already processed (hash: {file_hash[:8]}...) - will route to standard RAG for follow-up")
-            #     workflow_type = "standard"  # Override to standard for follow-up queries
-            #     is_cached_direct_output = True
-            # else:
-            logger.info(f"Direct output mode enabled - new file will show ingestor results directly")
+            logger.info(f"Direct output mode enabled - file will show ingestor results directly")
             workflow_type = "direct_output"
         else:
             # Direct output disabled - use standard workflow
@@ -75,16 +54,12 @@ def detect_file_type_node(state: GraphState) -> GraphState:
     metadata.update({
         "file_type": file_type,
         "workflow_type": workflow_type,
-        "is_cached_direct_output": is_cached_direct_output,
         "direct_output_enabled": DIRECT_OUTPUT_ENABLED
     })
 
-    file_hash = compute_file_hash(state["file_content"]) if state.get("file_content") else None
-
     return {
         "file_type": file_type,
         "workflow_type": workflow_type,
-        "file_hash": file_hash,
         "metadata": metadata
     }
 
@@ -98,25 +73,6 @@ def ingest_node(state: GraphState) -> GraphState:
         return {"ingestor_context": "", "metadata": state.get("metadata", {})}
 
     file_type = state.get("file_type", "unknown")
-    file_hash = state.get("file_hash")
-
-    # Check cache ONLY if direct output is enabled and file was previously processed
-    # if DIRECT_OUTPUT_ENABLED and file_hash and file_hash in _direct_output_cache:
-    #     cached_data = _direct_output_cache[file_hash]
-    #     logger.info(f"Using cached result for direct output file: {state['filename']}")
-
-    #     metadata = state.get("metadata", {})
-    #     metadata.update({
-    #         "ingestion_duration": 0,
-    #         "ingestor_context_length": len(cached_data["ingestor_context"]),
-    #         "ingestion_success": True,
-    #         "cached": True,
-    #         "cache_timestamp": cached_data["timestamp"]
-    #     })
-
-    #     return {"ingestor_context": cached_data["ingestor_context"], "metadata": metadata}
-
-    # Standard processing (both for new direct output files and all standard files)
     logger.info(f"Ingesting {file_type} file: {state['filename']}")
 
     try:
@@ -139,24 +95,13 @@ def ingest_node(state: GraphState) -> GraphState:
     finally:
         os.unlink(tmp_file_path)
 
-    # Cache ONLY if direct output mode is enabled
-    # if DIRECT_OUTPUT_ENABLED and file_hash:
-    #     _direct_output_cache[file_hash] = {
-    #         "ingestor_context": ingestor_context,
-    #         "timestamp": datetime.now().isoformat(),
-    #         "filename": state["filename"],
-    #         "file_type": file_type
-    #     }
-    #     logger.info(f"Cached direct output result for file hash: {file_hash[:8]}...")
-
     duration = (datetime.now() - start_time).total_seconds()
     metadata = state.get("metadata", {})
     metadata.update({
         "ingestion_duration": duration,
         "ingestor_context_length": len(ingestor_context) if ingestor_context else 0,
         "ingestion_success": True,
-        "ingestor_used": ingestor_url,
-        "cached": False
+        "ingestor_used": ingestor_url
     })
 
     return {"ingestor_context": ingestor_context, "metadata": metadata}
@@ -177,7 +122,6 @@
 def direct_output_node(state: GraphState) -> GraphState:
     """
     For files when direct output mode is enabled, return ingestor results directly.
-    This node is only reached on FIRST upload when DIRECT_OUTPUT=True.
     """
     file_type = state.get('file_type', 'unknown')
     logger.info(f"Direct output mode - returning ingestor results for {file_type} file")
@@ -384,8 +328,7 @@ async def generate_node_streaming(state: GraphState) -> Generator[GraphState, No
 def route_workflow(state: GraphState) -> str:
     """
     Conditional routing based on workflow type after ingestion.
-    Returns 'direct_output' for NEW files when DIRECT_OUTPUT=True,
-    'standard' for everything else (standard mode + cached direct output files).
+    Returns 'direct_output' when DIRECT_OUTPUT=True, 'standard' otherwise.
     """
     workflow_type = state.get("workflow_type", "standard")
     logger.info(f"Routing to: {workflow_type}")
@@ -527,11 +470,4 @@ async def process_query_streaming(
         if output_format == "structured":
             yield {"type": "error", "content": f"Error: {str(e)}"}
         else:
-            yield f"Error: {str(e)}"
-
-
-def clear_direct_output_cache():
-    """Utility function to clear the direct output cache"""
-    global _direct_output_cache
-    _direct_output_cache.clear()
-    logger.info("Direct output cache cleared")
+            yield f"Error: {str(e)}"
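Taken together, these hunks reduce routing to a single flag check. A hedged reconstruction of the simplified behavior; the `return` in `route_workflow` and the helper below are inferred from the docstrings and log lines, not copied verbatim from the commit:

```python
# Post-cleanup routing in miniature: no hashing, no cache, one flag.
import logging

logger = logging.getLogger(__name__)
DIRECT_OUTPUT_ENABLED = True  # normally read from params.cfg (see first hunk)

def route_workflow(state: dict) -> str:
    """Returns 'direct_output' when DIRECT_OUTPUT=True, 'standard' otherwise."""
    workflow_type = state.get("workflow_type", "standard")
    logger.info(f"Routing to: {workflow_type}")
    return workflow_type

def detect_workflow_type(has_file: bool) -> str:
    # Mirrors detect_file_type_node's branch: a file routes to direct
    # output only when the flag is on; everything else is standard RAG.
    return "direct_output" if (has_file and DIRECT_OUTPUT_ENABLED) else "standard"

assert route_workflow({"workflow_type": detect_workflow_type(True)}) == "direct_output"
assert route_workflow({"workflow_type": detect_workflow_type(False)}) == "standard"
```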
 