"""Flask OCR microservice.

Exposes:
  POST /extract_expense  -- OCR an uploaded receipt image and extract the main amount.
  POST /message          -- run NLP analysis (nlp_service.analyze_text) on a text message.
  GET  /health           -- liveness probe.

PaddleOCR models are not shared between threads; a small pool of worker
threads each loads its own model and serves jobs from a shared queue.
"""

import atexit
import functools
import os
import re
import tempfile
import threading
from queue import Queue
from threading import Event, Thread

from flask import Flask, jsonify, request
from paddleocr import PaddleOCR
from PIL import Image  # noqa: F401  -- kept from original file; not used directly here

# --- NEW: Import the NLP analysis function ---
from nlp_service import analyze_text

# --- Configuration ---
LANG = 'en'        # Default OCR language, can be overridden if needed
NUM_WORKERS = 2    # Number of OCR worker threads


# --- PaddleOCR Model Manager ---
class PaddleOCRModelManager(object):
    """Thread pool that serializes OCR requests onto per-thread model instances.

    Each worker thread builds its own model via ``model_factory`` (PaddleOCR
    objects are not thread-safe) and pulls ``(args, kwargs, result_queue)``
    jobs from a shared queue. ``infer`` blocks until one worker answers.
    """

    def __init__(self, num_workers, model_factory):
        """Start ``num_workers`` daemon threads, waiting for each model load.

        Workers are started one at a time: the shared Event is set by a
        worker once its model is loaded (or failed to load), then cleared
        before the next worker starts, so startup is sequential.
        """
        super().__init__()
        self._model_factory = model_factory
        self._queue = Queue()
        self._workers = []
        self._model_initialized_event = Event()
        print(f"Initializing {num_workers} OCR worker(s)...")
        for i in range(num_workers):
            print(f"Starting worker {i+1}...")
            worker = Thread(target=self._worker, daemon=True)
            worker.start()
            self._model_initialized_event.wait()  # Wait for this worker's model
            self._model_initialized_event.clear()
            self._workers.append(worker)
        print("All OCR workers initialized.")

    def infer(self, *args, **kwargs):
        """Run one OCR request on the next free worker and return its result.

        Returns the first page's line list (possibly empty). Re-raises any
        exception the worker hit while processing this request.
        """
        result_queue = Queue(maxsize=1)
        self._queue.put((args, kwargs, result_queue))
        success, payload = result_queue.get()
        if success:
            return payload
        print(f"Error during OCR inference: {payload}")
        raise payload

    def close(self):
        """Signal every worker to exit and wait briefly for them to drain.

        One ``None`` sentinel per worker is enqueued; each worker consumes
        exactly one and breaks out of its loop. Joining is best-effort
        (threads are daemons), bounded so atexit cannot hang.
        """
        print("Shutting down OCR workers...")
        for _ in self._workers:
            self._queue.put(None)
        # FIX: previously the threads were signaled but never joined; give
        # them a bounded chance to finish in-flight work before interpreter
        # teardown invalidates their state.
        for worker in self._workers:
            worker.join(timeout=5)
        print("OCR worker shutdown signaled.")

    def _worker(self):
        """Worker loop: load a private model, then serve jobs until sentinel.

        Always sets the init Event (even on load failure) so __init__ never
        deadlocks; a failed worker simply exits without serving jobs.
        """
        print(f"Worker thread {threading.current_thread().name}: Loading PaddleOCR model ({LANG})...")
        try:
            model = self._model_factory()
            print(f"Worker thread {threading.current_thread().name}: Model loaded.")
            self._model_initialized_event.set()
        except Exception as e:
            print(f"FATAL: Worker thread {threading.current_thread().name} failed to load model: {e}")
            self._model_initialized_event.set()  # Unblock __init__ even on failure
            return
        while True:
            item = self._queue.get()
            if item is None:  # Shutdown sentinel from close()
                print(f"Worker thread {threading.current_thread().name}: Exiting.")
                break
            args, kwargs, result_queue = item
            try:
                result = model.ocr(*args, **kwargs)
                # PaddleOCR returns a list per page; callers want page 0's lines.
                if result and result[0]:
                    result_queue.put((True, result[0]))
                else:
                    result_queue.put((True, []))
            except Exception as e:
                print(f"Worker thread {threading.current_thread().name}: Error processing request: {e}")
                result_queue.put((False, e))
            finally:
                self._queue.task_done()


# --- Amount Extraction Logic ---
# NOTE(review): the original amount regex and candidate-collection code were
# lost to file corruption (`re.compile(r'(?= 0.01]` was all that survived).
# The pattern and keyword lists below are a reconstruction consistent with
# the surviving filter/fallback logic -- verify against real receipts.
_AMOUNT_RE = re.compile(
    r'(?:[$\u20ac\u00a3\u20b9]\s*)?'          # optional currency symbol
    r'(\d{1,3}(?:,\d{3})*(?:\.\d{1,2})?'      # 1,234.56 style
    r'|\d+\.\d{1,2}'                          # 1234.56 style
    r'|\d+)'                                  # bare integer
)
_TOTAL_KEYWORDS = ('grand total', 'amount due', 'balance due', 'total')
_SUBTOTAL_KEYWORDS = ('subtotal', 'sub-total', 'sub total')


def _numbers_in(text):
    """Return every monetary-looking value in *text* as a float list."""
    values = []
    for match in _AMOUNT_RE.finditer(text):
        token = match.group(1).replace(',', '')
        try:
            values.append(float(token))
        except ValueError:
            continue  # defensive; pattern should always parse
    return values


def find_main_amount(ocr_results):
    """Heuristically pick the main (payable) amount from OCR line results.

    ``ocr_results`` is a list of PaddleOCR line entries shaped like
    ``[box, (text, confidence)]``. Strategy:
      1. Collect numbers per line, tagging lines as "subtotal" or "total"
         by keyword (subtotal checked first so "subtotal" never matches
         the "total" keyword).
      2. If any explicit total-line numbers exist, return the largest.
      3. Otherwise keep plausible numbers (>= 0.01; drop large integers
         >= 50000 with no decimal part -- likely IDs / phone numbers).
      4. Prefer the largest plausible non-subtotal number.
      5. Fall back to the largest subtotal/plausible number.
      6. If still nothing, return None.

    Returns a float, or ``None`` when no amount could be determined.
    """
    if not ocr_results:
        return None

    texts = []
    for line in ocr_results:
        if line and len(line) > 1 and len(line[1]) > 0:
            texts.append(str(line[1][0]))

    total_numbers = []
    subtotal_numbers = []
    all_numbers = []
    for text in texts:
        lowered = text.lower()
        nums = _numbers_in(text)
        all_numbers.extend(nums)
        if any(k in lowered for k in _SUBTOTAL_KEYWORDS):
            subtotal_numbers.extend(nums)
        elif any(k in lowered for k in _TOTAL_KEYWORDS):
            total_numbers.extend(nums)

    # Explicit "total" lines win outright.
    if total_numbers:
        return max(total_numbers)

    plausible_numbers = [n for n in all_numbers if n >= 0.01]  # Keep small decimals too
    # Stricter filter for large numbers: exclude large integers (likely IDs,
    # phone numbers). Keep numbers < 50000 OR numbers with a non-zero
    # decimal part.
    plausible_numbers = [n for n in plausible_numbers if n < 50000 or n != int(n)]

    # If we have plausible numbers other than subtotals, prefer them.
    non_subtotal_plausible = [n for n in plausible_numbers if n not in subtotal_numbers]
    if non_subtotal_plausible:
        return max(non_subtotal_plausible)
    if plausible_numbers:
        # Only subtotals (or nothing else plausible) were found; return the
        # largest as a last resort.
        return max(plausible_numbers)

    # 6. If still nothing, return None.
    print("Warning: Could not determine main amount.")
    return None


# --- Flask App Setup ---
app = Flask(__name__)

# --- Initialize OCR Manager ---
ocr_model_factory = functools.partial(
    PaddleOCR, lang=LANG, use_angle_cls=True, use_gpu=False, show_log=False
)
ocr_manager = PaddleOCRModelManager(num_workers=NUM_WORKERS, model_factory=ocr_model_factory)

# Register cleanup function so workers are signaled on interpreter exit.
atexit.register(ocr_manager.close)


# --- API Endpoint ---
@app.route('/extract_expense', methods=['POST'])
def extract_expense():
    """OCR an uploaded image ('file' form field) and return text + main amount.

    Responds 400 on a missing/empty upload, 500 on internal failure, else
    ``{"type": "photo", "extracted_text": ..., "main_amount_ocr": ...}``.
    The upload is spooled to a temp file (PaddleOCR takes a path) which is
    always removed afterwards.
    """
    if 'file' not in request.files:
        return jsonify({"error": "No file part in the request"}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({"error": "No selected file"}), 400
    if file:
        temp_file_path = None  # Initialize so `finally` is safe pre-creation
        try:
            # Save to a temporary file, preserving the original extension.
            _, file_extension = os.path.splitext(file.filename)
            with tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) as temp_file:
                file.save(temp_file.name)
                temp_file_path = temp_file.name

            # Perform OCR via the worker pool.
            ocr_result = ocr_manager.infer(temp_file_path, cls=True)

            # Flatten recognized lines into text and run amount extraction.
            extracted_text = ""
            main_amount_ocr = None
            if ocr_result:
                extracted_lines = [
                    line[1][0]
                    for line in ocr_result
                    if line and len(line) > 1 and len(line[1]) > 0
                ]
                extracted_text = "\n".join(extracted_lines)
                main_amount_ocr = find_main_amount(ocr_result)

            # Construct the response (only OCR results).
            response_data = {
                "type": "photo",
                "extracted_text": extracted_text,
                "main_amount_ocr": main_amount_ocr,  # Amount found by OCR regex logic
            }
            return jsonify(response_data)
        except Exception as e:
            print(f"Error processing file: {e}")
            import traceback
            traceback.print_exc()
            return jsonify({"error": f"An internal error occurred: {str(e)}"}), 500
        finally:
            if temp_file_path and os.path.exists(temp_file_path):
                os.remove(temp_file_path)

    return jsonify({"error": "File processing failed"}), 500


# --- NEW: NLP Message Endpoint ---
@app.route('/message', methods=['POST'])
def process_message():
    """Run NLP analysis on a JSON payload's 'text' field.

    400 on missing/empty text or when the NLP service reports status
    "failed"; 200 with the NLP result otherwise (including status
    "fallback_required", which the caller must handle); 500 if calling
    the NLP function itself raises.
    """
    data = request.get_json()
    if not data or 'text' not in data:
        return jsonify({"error": "Missing 'text' field in JSON payload"}), 400
    text_message = data['text']
    if not text_message:
        return jsonify({"error": "'text' field cannot be empty"}), 400

    nlp_error = None
    try:
        # Call the imported analysis function.
        nlp_analysis_result = analyze_text(text_message)
        print(f"NLP Service Analysis Result: {nlp_analysis_result}")

        # Check if the NLP analysis itself reported an error/failure or
        # requires fallback.
        status = nlp_analysis_result.get("status")
        if status == "failed":
            nlp_error = nlp_analysis_result.get("message", "NLP processing failed")
            # Use 400 for client-side errors like empty text.
            return jsonify(nlp_analysis_result), 400
        elif status == "fallback_required":
            # Return 200, but indicate fallback needed (e.g. for queries).
            return jsonify(nlp_analysis_result), 200

        # Return the successful analysis result.
        return jsonify(nlp_analysis_result)
    except Exception as nlp_e:
        nlp_error = f"Error calling NLP analysis function: {nlp_e}"
        print(f"Error calling NLP function: {nlp_error}")
        return jsonify({
            "error": "An internal error occurred during NLP processing",
            "details": nlp_error,
        }), 500


# --- NEW: Health Check Endpoint ---
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe. Could be extended to check OCR workers are alive."""
    return jsonify({"status": "ok"}), 200


# --- Run the App ---
if __name__ == '__main__':
    # Use port 7860 as expected by Hugging Face Spaces.
    # Use host='0.0.0.0' for accessibility within Docker/Spaces.
    app.run(host='0.0.0.0', port=7860, debug=False)