Kalpokoch committed
Commit 01687a9 · verified · 1 Parent(s): bbdcb91

Update app/app.py

Files changed (1)
  1. app/app.py +132 -98
app/app.py CHANGED
@@ -9,21 +9,6 @@ from pydantic import BaseModel
 from llama_cpp import Llama
 from app.policy_vector_db import PolicyVectorDB, ensure_db_populated
 
-# -----------------------------
-# ✅ Optimized Configuration for Hugging Face Free Tier
-# -----------------------------
-DB_PERSIST_DIRECTORY = os.getenv("DB_PERSIST_DIRECTORY", "/app/vector_database")
-CHUNKS_FILE_PATH = os.getenv("CHUNKS_FILE_PATH", "/app/granular_chunks_final.jsonl")
-MODEL_PATH = os.getenv("MODEL_PATH", "/app/tinyllama_dop_q4_k_m.gguf")
-LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", "60"))  # Reduced timeout for free tier
-RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.3"))
-TOP_K_SEARCH = int(os.getenv("TOP_K_SEARCH", "3"))  # Reduced for efficiency
-TOP_K_CONTEXT = int(os.getenv("TOP_K_CONTEXT", "2"))
-
-# ✅ Single-threaded CPU optimization
-LLM_THREADS = 1  # Single thread for free tier
-MAX_CONCURRENT_REQUESTS = 1  # Process one request at a time
-
 # -----------------------------
 # ✅ Logging Configuration
 # -----------------------------
@@ -36,12 +21,25 @@ class RequestIdAdapter(logging.LoggerAdapter):
 logger = logging.getLogger("app")
 
 # -----------------------------
-# ✅ Initialize FastAPI App with Request Limiting
+# ✅ Configuration - Restored Original Efficient Settings
 # -----------------------------
-app = FastAPI(title="NEEPCO DoP RAG Chatbot", version="2.4.0")
+DB_PERSIST_DIRECTORY = os.getenv("DB_PERSIST_DIRECTORY", "/app/vector_database")
+CHUNKS_FILE_PATH = os.getenv("CHUNKS_FILE_PATH", "/app/granular_chunks_final.jsonl")
+MODEL_PATH = os.getenv("MODEL_PATH", "/app/tinyllama_dop_q4_k_m.gguf")
+LLM_TIMEOUT_SECONDS = int(os.getenv("LLM_TIMEOUT_SECONDS", "90"))  # Back to original timeout
+RELEVANCE_THRESHOLD = float(os.getenv("RELEVANCE_THRESHOLD", "0.3"))
+TOP_K_SEARCH = int(os.getenv("TOP_K_SEARCH", "3"))  # Keep reduced for efficiency
+TOP_K_CONTEXT = int(os.getenv("TOP_K_CONTEXT", "1"))  # Keep reduced for efficiency
 
-# ✅ Request queue to ensure single processing
-request_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
+# ✅ Single request processing without blocking semaphore
+MAX_CONCURRENT_REQUESTS = 1
+request_in_progress = False
+request_lock = asyncio.Lock()
+
+# -----------------------------
+# ✅ Initialize FastAPI App
+# -----------------------------
+app = FastAPI(title="NEEPCO DoP RAG Chatbot", version="2.5.0")
 
 @app.middleware("http")
 async def add_request_id(request: Request, call_next):
@@ -73,23 +71,19 @@ except Exception as e:
     db_ready = False
 
 # -----------------------------
-# ✅ Memory-Optimized GGUF Model Loading for Free Tier
+# ✅ Load GGUF Model - Restored Original Efficient Settings
 # -----------------------------
-logger.info(f"Loading GGUF model for single-threaded processing from: {MODEL_PATH}")
+logger.info(f"Loading GGUF model from: {MODEL_PATH}")
 try:
     llm = Llama(
         model_path=MODEL_PATH,
-        n_ctx=2048,              # Reduced context size for memory efficiency
-        n_threads=LLM_THREADS,   # Single thread
-        n_batch=256,             # Smaller batch size for memory efficiency
-        use_mlock=False,         # Disable memory locking
-        use_mmap=True,           # Enable memory mapping for efficiency
-        verbose=False,
-        n_gpu_layers=0,          # CPU only
-        f16_kv=True,             # Use 16-bit for key-value cache to save memory
-        low_vram=True,           # Enable low VRAM mode for better memory usage
+        n_ctx=4096,      # ✅ Restored original context size
+        n_threads=4,     # ✅ Restored original thread count for efficient CPU usage
+        n_batch=512,     # ✅ Restored original batch size
+        use_mlock=True,  # ✅ Restored original memory settings
+        verbose=False
     )
-    logger.info("GGUF model loaded successfully for single-threaded processing.")
+    logger.info("GGUF model loaded successfully.")
     model_ready = True
 except Exception as e:
     logger.error(f"FATAL: Failed to load GGUF model: {e}", exc_info=True)
@@ -111,7 +105,7 @@ class Feedback(BaseModel):
     comment: str | None = None
 
 # -----------------------------
-# ✅ Query Processing Functions (Unchanged)
+# ✅ Enhanced Query Processing Functions
 # -----------------------------
 def classify_query_type(question: str) -> str:
     """Classify the type of query to choose appropriate search strategy."""
@@ -210,33 +204,34 @@ Your task is to answer the user's question based ONLY on the provided context.
     return prompt
 
 # -----------------------------
-# ✅ Synchronous LLM Response Generation (No Threading)
+# ✅ Efficient LLM Response Generation - Restored Original Async Pattern
 # -----------------------------
-def generate_llm_response_sync(prompt: str, request_id: str) -> str:
-    """Synchronous LLM generation optimized for single-threaded processing."""
-    try:
-        # ✅ Optimized parameters for free tier CPU
-        response = llm(
+async def generate_llm_response(prompt: str, request_id: str):
+    """Async LLM generation using original efficient pattern."""
+    loop = asyncio.get_running_loop()
+
+    def llm_call():
+        return llm(
             prompt,
-            max_tokens=1024,  # Reduced token limit for faster processing
+            max_tokens=2048,  # ✅ Restored original token limit
             stop=["###", "Question:", "Context:", "</s>"],
-            temperature=0.1,  # Lower temperature for consistent responses
-            top_p=0.9,
-            repeat_penalty=1.1,
+            temperature=0.05,  # ✅ Restored original temperature
             echo=False
         )
-
-        if response and "choices" in response and len(response["choices"]) > 0:
-            return response["choices"][0]["text"].strip()
-        else:
-            raise ValueError("Empty or invalid response from LLM")
-
-    except Exception as e:
-        logger.error(f"LLM generation error for request {request_id}: {e}")
-        raise
+
+    # ✅ Use original async executor pattern for efficient CPU usage
+    response = await loop.run_in_executor(None, llm_call)
+
+    if response and "choices" in response and len(response["choices"]) > 0:
+        answer = response["choices"][0]["text"].strip()
+        if not answer:
+            raise ValueError("Empty response from LLM")
+        return answer
+    else:
+        raise ValueError("Invalid response from LLM")
 
 # -----------------------------
-# ✅ Endpoints with Request Limiting
+# ✅ Endpoints with Lightweight Request Management
 # -----------------------------
 def get_logger_adapter(request: Request):
     return RequestIdAdapter(logger, {'request_id': getattr(request.state, 'request_id', 'N/A')})
@@ -244,10 +239,10 @@ def get_logger_adapter(request: Request):
 @app.get("/")
 async def root():
     return {
-        "status": "✅ Server is running on Hugging Face Free Tier",
-        "mode": "Single-threaded processing",
-        "max_concurrent_requests": MAX_CONCURRENT_REQUESTS,
-        "llm_threads": LLM_THREADS
+        "status": "✅ Server is running efficiently",
+        "mode": "CPU optimized for Hugging Face",
+        "model_loaded": model_ready,
+        "db_ready": db_ready
     }
 
 @app.get("/health")
@@ -256,8 +251,7 @@ async def health_check():
         "status": "ok",
         "database_status": "ready" if db_ready else "error",
         "model_status": "ready" if model_ready else "error",
-        "processing_mode": "single_threaded",
-        "max_concurrent_requests": MAX_CONCURRENT_REQUESTS
+        "processing_mode": "efficient_cpu_usage"
     }
     if not db_ready or not model_ready:
         raise HTTPException(status_code=503, detail=status)
@@ -265,14 +259,19 @@
 
 @app.post("/chat")
 async def chat(query: Query, request: Request):
-    # ✅ Acquire semaphore to ensure single request processing
-    async with request_semaphore:
+    global request_in_progress
+
+    # ✅ Lightweight request management - reject if busy instead of blocking
+    async with request_lock:
+        if request_in_progress:
+            raise HTTPException(status_code=429, detail="Server is busy processing another request. Please try again in a moment.")
+        request_in_progress = True
+
+    try:
         adapter = get_logger_adapter(request)
-        adapter.info("Processing request (single-threaded mode)")
-
         question_lower = query.question.strip().lower()
 
-        # Greeting handling
+        # --- GREETING & INTRO HANDLING ---
         greeting_keywords = ["hello", "hi", "hey", "what can you do", "who are you"]
        if question_lower in greeting_keywords:
             adapter.info(f"Handling a greeting or introductory query: '{query.question}'")
@@ -300,22 +299,32 @@
 
         search_results = []
 
+        # Enhanced search strategy
         if query_type == "monetary":
             amount = extract_monetary_amount(query.question)
             if amount:
                 adapter.info(f"Extracted monetary amount: ₹{amount}")
-                monetary_results = db.search_by_amount(amount, comparison=">=", top_k=TOP_K_SEARCH)
-                if monetary_results:
-                    search_results = monetary_results
-                    adapter.info(f"Found {len(search_results)} results using monetary search")
+                try:
+                    monetary_results = db.search_by_amount(amount, comparison=">=", top_k=TOP_K_SEARCH)
+                    if monetary_results:
+                        search_results = monetary_results
+                        adapter.info(f"Found {len(search_results)} results using monetary search")
+                except:
+                    adapter.info("Monetary search not available, falling back to semantic search")
 
         if not search_results:
-            search_results = db.search_with_context(
-                query.question,
-                top_k=TOP_K_SEARCH,
-                include_related=True
-            )
-            adapter.info(f"Found {len(search_results)} results using semantic search with context")
+            # Use enhanced search if available, otherwise fallback to basic search
+            try:
+                search_results = db.search_with_context(
+                    query.question,
+                    top_k=TOP_K_SEARCH,
+                    include_related=True
+                )
+                adapter.info(f"Found {len(search_results)} results using enhanced semantic search")
+            except:
+                # Fallback to basic search
+                search_results = db.search(query.question, top_k=TOP_K_SEARCH)
+                adapter.info(f"Found {len(search_results)} results using basic search")
 
         if not search_results:
             adapter.warning("No relevant context found in vector DB.")
@@ -326,43 +335,62 @@
                 "answer": "Sorry, I could not find a relevant policy to answer that question. Please try rephrasing or ask about specific delegation limits, approval authorities, or procedures."
             }
 
-        # Log search results with metadata
-        result_info = []
-        for i, result in enumerate(search_results):
-            metadata = result.get('metadata', {})
-            role = metadata.get('role', 'N/A')
-            section = metadata.get('section', 'N/A')
-            score = result.get('relevance_score', 0)
-            result_info.append(f"#{i+1}: Score={score:.3f}, Role={role}, Section={section}")
-
-        adapter.info(f"Search results: {' | '.join(result_info)}")
+        # Log search results
+        scores = [f"{result.get('relevance_score', 0):.4f}" for result in search_results]
+        adapter.info(f"Found {len(search_results)} relevant chunks with scores: {scores}")
 
-        # Prepare context with metadata
+        # Prepare context with metadata if available
         context_chunks = []
         for result in search_results[:TOP_K_CONTEXT]:
             chunk_text = result['text']
             metadata = result.get('metadata', {})
 
-            if metadata.get('section') or metadata.get('role'):
+            if metadata and (metadata.get('section') or metadata.get('role')):
                 metadata_prefix = f"[Section: {metadata.get('section', 'N/A')}, Role: {metadata.get('role', 'N/A')}] "
                 chunk_text = metadata_prefix + chunk_text
 
             context_chunks.append(chunk_text)
 
         context = "\n---\n".join(context_chunks)
-        prompt = build_enhanced_prompt(query.question, context, query_type, search_results)
 
-        # Generate response synchronously
+        # Build prompt - use enhanced if search results have metadata, otherwise simple
+        if any(result.get('metadata') for result in search_results):
+            prompt = build_enhanced_prompt(query.question, context, query_type, search_results)
+            adapter.info(f"Using enhanced prompt for {query_type} query")
+        else:
+            # Fallback to original simple prompt
+            prompt = f"""<|system|>
+You are a precise and factual assistant for NEEPCO's Delegation of Powers (DoP) policy.
+Your task is to answer the user's question based ONLY on the provided context.
+
+- **Formatting Rule:** If the answer contains a list of items or steps, you **MUST** separate each item with a pipe symbol (`|`). For example: `First item|Second item|Third item`.
+- **Content Rule:** If the information is not in the provided context, you **MUST** reply with the exact phrase: "The provided policy context does not contain information on this topic."
+</s>
+<|user|>
+### Relevant Context:
+```
+{context}
+```
+
+### Question:
+{query.question}
+</s>
+<|assistant|>
+### Detailed Answer:
+"""
+            adapter.info("Using original simple prompt")
+
+        # Generate response using original efficient async pattern
         answer = "An error occurred while processing your request."
         try:
-            adapter.info(f"Sending prompt to LLM for {query_type} query (synchronous processing)...")
-
-            # ✅ Direct synchronous call - no threading or async execution
-            raw_answer = generate_llm_response_sync(prompt, request.state.request_id)
-
+            adapter.info("Sending prompt to LLM for generation...")
+            raw_answer = await asyncio.wait_for(
+                generate_llm_response(prompt, request.state.request_id),
+                timeout=LLM_TIMEOUT_SECONDS
+            )
             adapter.info(f"LLM generation successful. Raw response: {raw_answer[:250]}...")
 
-            # Post-processing
+            # Post-processing logic
             if '|' in raw_answer:
                 adapter.info("Pipe separator found. Formatting response as a bulleted list.")
                 items = raw_answer.split('|')
@@ -371,10 +399,14 @@
             else:
                 answer = raw_answer.strip()
 
+            # Add monetary context if needed
             if query_type == "monetary" and "₹" not in answer and extract_monetary_amount(query.question):
                 amount = extract_monetary_amount(query.question)
                 answer = f"For amounts of ₹{amount:,.0f}:\n\n{answer}"
 
+        except asyncio.TimeoutError:
+            adapter.warning(f"LLM generation timed out after {LLM_TIMEOUT_SECONDS} seconds.")
+            answer = "Sorry, the request took too long to process. Please try again with a simpler question."
         except Exception as e:
             adapter.error(f"An unexpected error occurred during LLM generation: {e}", exc_info=True)
             answer = "Sorry, an unexpected error occurred while generating a response."
@@ -385,10 +417,13 @@
             "question": query.question,
             "context_used": context,
             "answer": answer,
-            "query_type": query_type,
-            "search_strategy": "monetary" if query_type == "monetary" and extract_monetary_amount(query.question) else "semantic_with_context",
-            "processing_mode": "single_threaded"
+            "query_type": query_type if 'query_type' in locals() else "general"
         }
+
+    finally:
+        # ✅ Always release the lock
+        async with request_lock:
+            request_in_progress = False
 
 @app.post("/feedback")
 async def collect_feedback(feedback: Feedback, request: Request):
@@ -405,7 +440,6 @@ async def collect_feedback(feedback: Feedback, request: Request):
     adapter.info(json.dumps(feedback_log))
     return {"status": "✅ Feedback recorded. Thank you!"}
 
-# ✅ No cleanup needed for single-threaded processing
 @app.on_event("shutdown")
 async def shutdown_event():
-    logger.info("Application shutting down (single-threaded mode).")
+    logger.info("Application shutting down.")
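
Note on the new busy-handling behaviour: with this commit, /chat no longer waits on request_semaphore but immediately returns HTTP 429 while another request is in flight, so callers need a small retry loop. Below is a minimal client-side sketch; the BASE_URL, port, and ask() helper are illustrative assumptions and are not part of app/app.py.

    # Illustrative client sketch (assumed BASE_URL; not part of the commit).
    import time
    import requests

    BASE_URL = "http://localhost:7860"  # hypothetical URL of the deployed Space

    def ask(question: str, retries: int = 5, backoff_seconds: float = 2.0) -> str:
        """POST a question to /chat and retry briefly when the server replies 429 (busy)."""
        for attempt in range(retries):
            resp = requests.post(f"{BASE_URL}/chat", json={"question": question}, timeout=120)
            if resp.status_code == 429:
                # The server now rejects concurrent requests instead of queueing them.
                time.sleep(backoff_seconds * (attempt + 1))
                continue
            resp.raise_for_status()
            return resp.json()["answer"]
        raise RuntimeError("Server stayed busy after several retries.")

    print(ask("What is the delegation limit for open tenders?"))

Because request_in_progress is reset in the handler's finally block, a failed or timed-out request releases the slot, and a retry loop like the one above will eventually get through.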