# Source: mcp_ocr_tuner / app_json.py
# Author: Vachudev — Initial Commit (dc79584, verified)
# app.py — MCP server (single-file)
from mcp.server.fastmcp import FastMCP
from typing import Optional, List, Tuple, Any, Dict
import requests
import os
import gradio as gr
import json
import re
import logging
import gc
# --- Import OCR Engine & Prompts ---
try:
    from ocr_engine import extract_text_from_file
    from prompts import get_ocr_extraction_prompt, get_agent_prompt
except ImportError:
    # Degraded fallbacks so the server can still start without the OCR/prompt
    # modules: OCR yields no text, and both prompt builders pass their
    # input through unchanged.
    def extract_text_from_file(path): return ""
    def get_ocr_extraction_prompt(txt): return txt
    def get_agent_prompt(h, u): return u
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp_server")
# --- Load Config ---
try:
    from config import (
        CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE,
        INVOICE_API_BASE, ORGANIZATION_ID, LOCAL_MODEL
    )
except ImportError as e:
    # Narrowed from `except Exception`: a genuine bug inside config.py now
    # surfaces with its real traceback instead of a generic exit. The cause
    # is chained so "Config missing." still shows what actually failed.
    raise SystemExit("Config missing.") from e
mcp = FastMCP("ZohoCRMAgent")
# --- Globals (populated lazily by init_local_model) ---
LLM_PIPELINE = None  # transformers text-generation pipeline, or None until loaded
TOKENIZER = None     # tokenizer paired with LLM_PIPELINE, or None until loaded
# --- Helpers ---
def extract_json_safely(text: str) -> Optional[Any]:
    """Parse JSON out of *text*, tolerating surrounding prose.

    Tries a direct parse first; on failure, falls back to the first
    greedy brace/bracket-delimited span (DOTALL, so it works across
    newlines).

    Args:
        text: Raw string that may contain a JSON object or array.

    Returns:
        The parsed object, or None when no valid JSON can be recovered.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        match = re.search(r'(\{.*\}|\[.*\])', text, re.DOTALL)
        if not match:
            return None
        try:
            return json.loads(match.group(0))
        except json.JSONDecodeError:
            # Fix: the original re-raised here when the extracted span was
            # still invalid JSON, despite being documented as "safe".
            return None
def _normalize_local_path_args(args: Any) -> Any:
if not isinstance(args, dict): return args
fp = args.get("file_path") or args.get("path")
if isinstance(fp, str) and fp.startswith("/mnt/data/") and os.path.exists(fp):
args["file_url"] = f"file://{fp}"
return args
# --- Model Loading ---
def init_local_model():
    """Lazily load the local causal-LM pipeline into module globals.

    Idempotent: returns immediately when a pipeline is already cached in
    LLM_PIPELINE. Any failure (missing transformers, download error, OOM)
    is logged and swallowed so callers can degrade gracefully.
    """
    global LLM_PIPELINE, TOKENIZER
    if LLM_PIPELINE is not None:
        return
    try:
        from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM

        logger.info(f"Loading lighter model: {LOCAL_MODEL}...")
        TOKENIZER = AutoTokenizer.from_pretrained(LOCAL_MODEL)
        causal_lm = AutoModelForCausalLM.from_pretrained(
            LOCAL_MODEL,
            device_map="auto",
            torch_dtype="auto",
        )
        LLM_PIPELINE = pipeline(
            "text-generation", model=causal_lm, tokenizer=TOKENIZER
        )
        logger.info("Model loaded.")
    except Exception as e:
        logger.error(f"Model load error: {e}")
def local_llm_generate(prompt: str, max_tokens: int = 512) -> Dict[str, Any]:
    """Run the cached local pipeline on *prompt*.

    Args:
        prompt: The full prompt string to feed the model.
        max_tokens: Cap on newly generated tokens.

    Returns:
        A dict with 'text' (generated string, or an error message) and
        'raw' (the raw pipeline output list, or None on failure).
    """
    if LLM_PIPELINE is None:
        init_local_model()
    if LLM_PIPELINE is None:
        # Loading failed; report instead of raising so the chat flow continues.
        return {"text": "Model not loaded.", "raw": None}
    try:
        outputs = LLM_PIPELINE(
            prompt,
            max_new_tokens=max_tokens,
            return_full_text=False,
            do_sample=False,
        )
        generated = outputs[0]["generated_text"] if outputs else ""
        return {"text": generated, "raw": outputs}
    except Exception as e:
        return {"text": f"Error: {e}", "raw": None}
# --- Tools (Zoho) ---
def _get_valid_token_headers() -> dict:
    """Refresh the Zoho OAuth access token and build request headers.

    Returns:
        {'Authorization': 'Zoho-oauthtoken <token>'} on success, or an
        empty dict when the refresh request is rejected.
    """
    token_params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    resp = requests.post(
        "https://accounts.zoho.in/oauth/v2/token", params=token_params, timeout=10
    )
    if resp.status_code != 200:
        return {}
    return {"Authorization": f"Zoho-oauthtoken {resp.json().get('access_token')}"}
@mcp.tool()
def create_record(module_name: str, record_data: dict) -> str:
    """Create a record in a Zoho CRM module.

    Args:
        module_name: Target CRM module (e.g. 'Contacts', 'Leads').
        record_data: Field/value mapping for the new record.

    Returns:
        JSON string with status and the new record id on success; a JSON
        dump of the raw response when the id can't be located; the raw
        response text on HTTP failure; or 'Auth Failed' when the OAuth
        refresh fails.
    """
    h = _get_valid_token_headers()
    if not h:
        return "Auth Failed"
    # Fix: timeout added so a hung Zoho endpoint can't block the agent forever.
    r = requests.post(
        f"{API_BASE}/{module_name}",
        headers=h,
        json={"data": [record_data]},
        timeout=30,
    )
    if r.status_code in (200, 201):
        try:
            d = r.json().get("data", [{}])[0].get("details", {})
            return json.dumps({"status": "success", "id": d.get("id"), "zoho_response": r.json()})
        except (ValueError, KeyError, IndexError, AttributeError):
            # Fix: narrowed from bare `except:`. Unexpected response shape —
            # fall back to dumping whatever JSON the API returned.
            return json.dumps(r.json())
    return r.text
@mcp.tool()
def create_invoice(data: dict) -> str:
    """Create an invoice via the Zoho invoices API.

    Args:
        data: Invoice payload (customer_id, line_items, ...) in the shape
            the Zoho invoices endpoint expects.

    Returns:
        JSON string of the API response on success, the raw response text
        on HTTP failure, or 'Auth Failed' when the OAuth refresh fails.
    """
    h = _get_valid_token_headers()
    if not h:
        return "Auth Failed"
    # Fix: timeout added so a hung endpoint can't block the agent forever.
    r = requests.post(
        f"{INVOICE_API_BASE}/invoices",
        headers=h,
        params={"organization_id": ORGANIZATION_ID},
        json=data,
        timeout=30,
    )
    return json.dumps(r.json()) if r.status_code in (200, 201) else r.text
@mcp.tool()
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """OCR a local document and extract structured data via the local LLM.

    Args:
        file_path: Path of the uploaded file on disk.
        target_module: Intended Zoho module (kept for interface
            compatibility; not consulted during extraction).

    Returns:
        On success: {'status', 'file', 'extracted_data'}; on failure a
        dict with a single 'error' key.
    """
    if not os.path.exists(file_path):
        return {"error": f"File not found at path: {file_path}"}
    # Step 1: OCR the document into raw text.
    raw_text = extract_text_from_file(file_path)
    if not raw_text:
        return {"error": "OCR empty"}
    # Step 2: ask the local LLM to turn raw text into structured JSON.
    llm_out = local_llm_generate(get_ocr_extraction_prompt(raw_text), max_tokens=300)
    extracted = extract_json_safely(llm_out["text"])
    return {
        "status": "success",
        "file": os.path.basename(file_path),
        # When the model output isn't valid JSON, surface it verbatim.
        "extracted_data": extracted or {"raw": llm_out["text"]},
    }
# --- Executor ---
def parse_and_execute(model_text: str, history: list) -> str:
    """Parse tool-call JSON from model output and run the named tools.

    Accepts a single command dict or a list of them; each command has a
    'tool' name and optional 'args'. When a create_record call returns a
    new record id, it is auto-filled as customer_id on any following
    create_invoice command in the same batch.

    Args:
        model_text: Raw LLM output expected to contain tool-call JSON.
        history: Chat history (unused here; kept for interface
            compatibility with chat_logic).

    Returns:
        Newline-joined tool results, or an explanatory message when no
        valid tool call was found.
    """
    payload = extract_json_safely(model_text)
    if not payload:
        return "No valid tool call found."
    cmds = [payload] if isinstance(payload, dict) else payload
    results = []
    last_contact_id = None
    for cmd in cmds:
        if not isinstance(cmd, dict):
            continue
        tool = cmd.get("tool")
        args = _normalize_local_path_args(cmd.get("args", {}))
        if tool == "create_record":
            res = create_record(args.get("module_name", "Contacts"), args.get("record_data", {}))
            results.append(f"Record: {res}")
            try:
                rj = json.loads(res)
                if isinstance(rj, dict) and "id" in rj:
                    last_contact_id = rj["id"]
            except json.JSONDecodeError:
                # Fix: narrowed from bare `except:`. The tool returned a
                # non-JSON error string (e.g. "Auth Failed") — nothing to
                # chain a customer_id from.
                pass
        elif tool == "create_invoice":
            # Auto-fill customer_id from a record created earlier in this
            # batch. (Fix: the original re-checked `"customer_id" not in`
            # afterwards, but that branch was unreachable once this
            # assignment ran — dead code removed.)
            if not args.get("customer_id") and last_contact_id:
                args["customer_id"] = last_contact_id
            res = create_invoice(args)
            results.append(f"Invoice: {res}")
    return "\n".join(results)
# --- Chat Core ---
def chat_logic(message: str, file_path: str, history: list) -> str:
    """Core two-phase chat handler.

    Phase 1 (file attached): OCR + extraction only — no Zoho calls — then
    ask the user to review and confirm.
    Phase 2 (text only): let the LLM decide, from history plus the new
    message, whether to emit a tool call; execute it if so, otherwise
    reply as plain chat.

    Args:
        message: The user's text message.
        file_path: Local path of an attached file, or a falsy value.
        history: Chat history as (user, assistant) pairs.

    Returns:
        The assistant's reply text.
    """
    if file_path:
        # PHASE 1: extraction only. Zoho auth is deferred until the user
        # explicitly confirms in a later turn.
        logger.info(f"Processing file: {file_path}")
        doc = process_document(file_path)
        if doc.get("status") != "success":
            return f"OCR Failed: {doc.get('error')}"
        extracted_json = json.dumps(doc["extracted_data"], indent=2)
        # This reply lands in history; the user must answer with an explicit
        # confirmation before anything is pushed to Zoho.
        return (
            f"I extracted the following data from **{doc['file']}**:\n\n"
            f"```json\n{extracted_json}\n```\n\n"
            "Please review it. If it looks correct, type **'Create Invoice'** or **'Push to Zoho'**."
        )
    # PHASE 2: the agent prompt scans history for the extracted JSON and
    # waits for explicit save/push keywords before emitting a tool call.
    hist_txt = "\n".join(f"U: {h[0]}\nA: {h[1]}" for h in history)
    gen = local_llm_generate(get_agent_prompt(hist_txt, message), max_tokens=256)
    logger.info(f"LLM Decision: {gen['text']}")
    if extract_json_safely(gen["text"]):
        # User confirmed -> execute the tool call (this triggers Zoho auth).
        return parse_and_execute(gen["text"], history)
    # No tool call detected: plain chat / clarification.
    return gen["text"]
# --- UI ---
def chat_handler(msg, hist):
    """Gradio multimodal adapter: unpack {'text', 'files'} and delegate.

    Only the first attached file (if any) is forwarded to chat_logic.
    """
    attached = msg.get("files", [])
    first_file = attached[0] if attached else None
    return chat_logic(msg.get("text", ""), first_file, hist)
if __name__ == "__main__":
    # Reclaim any memory left over from import-time work before serving.
    gc.collect()
    # multimodal=True: messages arrive as {"text": ..., "files": [...]} dicts,
    # which chat_handler unpacks.
    demo = gr.ChatInterface(fn=chat_handler, multimodal=True)
    # Bind on all interfaces (container/Space deployment) on port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)