# NOTE: page-scrape residue ("Spaces: Sleeping") -- not part of the program.
| from mcp.server.fastmcp import FastMCP | |
| from typing import Optional | |
| import requests | |
| import os | |
| from config import CLIENT_ID, CLIENT_SECRET, REFRESH_TOKEN, API_BASE | |
# --- FastMCP server instance shared by this module ---
mcp = FastMCP(name="ZohoCRMAgent")
| # --- Token Refresh Utility --- | |
def _get_valid_token_headers() -> dict:
    """Return request headers carrying a freshly issued Zoho access token.

    Exchanges the configured refresh token for a new access token using the
    OAuth refresh-token grant. A new token is requested on every call; no
    caching is done here.

    Returns:
        A dict with a single ``Authorization`` entry of the form
        ``Zoho-oauthtoken <access_token>``.

    Raises:
        RuntimeError: If the token endpoint answers with a non-200 status, or
            with a 200 response that does not contain an ``access_token``.
        requests.RequestException: If the HTTP call fails or times out.
    """
    token_url = "https://accounts.zoho.in/oauth/v2/token"
    params = {
        "refresh_token": REFRESH_TOKEN,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "refresh_token",
    }
    # Bound the wait so a stalled connection cannot hang the caller forever.
    response = requests.post(token_url, params=params, timeout=30)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to refresh token: {response.text}")

    # A 200 response may still carry an error payload instead of a token;
    # without this check the header would silently read "Zoho-oauthtoken None".
    try:
        access_token = response.json().get("access_token")
    except ValueError:
        # Body was not valid JSON.
        access_token = None
    if not access_token:
        raise RuntimeError(f"Failed to refresh token: {response.text}")

    return {"Authorization": f"Zoho-oauthtoken {access_token}"}
| # --- MCP Tools for Zoho CRM and Zoho Books Operations --- | |
def authenticate_zoho() -> str:
    """Trigger a token refresh and report that it succeeded.

    The returned headers are discarded; the helper is called only so that a
    failed refresh surfaces as an exception instead of a success message.

    Returns:
        A fixed confirmation message.
    """
    _get_valid_token_headers()
    confirmation = "Zoho CRM access token successfully refreshed."
    return confirmation
def create_record(module_name: str, record_data: dict) -> str:
    """Create one record in the given Zoho CRM module.

    Args:
        module_name: Module segment appended to ``API_BASE`` to form the URL.
        record_data: Field values for the new record; sent wrapped as
            ``{"data": [record_data]}``.

    Returns:
        A success message on HTTP 200/201, otherwise an error message that
        embeds the response body.

    Raises:
        requests.RequestException: If the HTTP call fails or times out.
        Exception: Propagated from the token refresh when it fails.
    """
    headers = _get_valid_token_headers()
    payload = {"data": [record_data]}
    response = requests.post(
        f"{API_BASE}/{module_name}",
        headers=headers,
        json=payload,
        timeout=30,  # never wait indefinitely on a stalled connection
    )
    if response.status_code in (200, 201):
        return f"Record created successfully in {module_name}."
    return f"Error creating record: {response.text}"
def get_records(module_name: str, page: int = 1, per_page: int = 200) -> list:
    """Fetch one page of records from the given Zoho CRM module.

    Args:
        module_name: Module segment appended to ``API_BASE`` to form the URL.
        page: Page number, sent as the ``page`` query parameter.
        per_page: Page size, sent as the ``per_page`` query parameter.

    Returns:
        The ``data`` list from the JSON response on HTTP 200, or an empty
        list when the response has no ``data`` key or the server answers
        204 (no content). On any other status the failure is reported
        in-band as a one-element list holding an error string, so callers
        must tell that apart from record entries.

    Raises:
        requests.RequestException: If the HTTP call fails or times out.
        Exception: Propagated from the token refresh when it fails.
    """
    headers = _get_valid_token_headers()
    params = {"page": page, "per_page": per_page}
    response = requests.get(
        f"{API_BASE}/{module_name}",
        headers=headers,
        params=params,
        timeout=30,  # never wait indefinitely on a stalled connection
    )
    # 204 means "nothing to return" (empty body), not a failure; without this
    # branch an empty module would come back as an error entry.
    if response.status_code == 204:
        return []
    if response.status_code == 200:
        return response.json().get("data", [])
    return [f"Error retrieving records: {response.text}"]
def update_record(module_name: str, record_id: str, data: dict) -> str:
    """Update an existing record in the given Zoho CRM module.

    Args:
        module_name: Module segment appended to ``API_BASE`` in the URL.
        record_id: Identifier of the record, appended after the module name.
        data: Field values to write; sent wrapped as ``{"data": [data]}``.

    Returns:
        A success message on HTTP 200, otherwise an error message that
        embeds the response body.

    Raises:
        requests.RequestException: If the HTTP call fails or times out.
        Exception: Propagated from the token refresh when it fails.
    """
    headers = _get_valid_token_headers()
    payload = {"data": [data]}
    response = requests.put(
        f"{API_BASE}/{module_name}/{record_id}",
        headers=headers,
        json=payload,
        timeout=30,  # never wait indefinitely on a stalled connection
    )
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} updated successfully."
    return f"Error updating record: {response.text}"
def delete_record(module_name: str, record_id: str) -> str:
    """Delete a record from the given Zoho CRM module.

    Args:
        module_name: Module segment appended to ``API_BASE`` in the URL.
        record_id: Identifier of the record, appended after the module name.

    Returns:
        A success message on HTTP 200, otherwise an error message that
        embeds the response body.

    Raises:
        requests.RequestException: If the HTTP call fails or times out.
        Exception: Propagated from the token refresh when it fails.
    """
    headers = _get_valid_token_headers()
    response = requests.delete(
        f"{API_BASE}/{module_name}/{record_id}",
        headers=headers,
        timeout=30,  # never wait indefinitely on a stalled connection
    )
    if response.status_code == 200:
        return f"Record {record_id} in {module_name} deleted."
    return f"Error deleting record: {response.text}"
def create_invoice(data: dict) -> str:
    """Create an invoice by posting to the ``invoices`` endpoint.

    Args:
        data: Invoice fields; sent wrapped as ``{"data": [data]}``.

    Returns:
        A success message on HTTP 200/201, otherwise an error message that
        embeds the response body.

    Raises:
        requests.RequestException: If the HTTP call fails or times out.
        Exception: Propagated from the token refresh when it fails.
    """
    # NOTE(review): this is meant for Zoho Books, yet it reuses API_BASE and
    # the same {"data": [...]} envelope as the CRM calls -- confirm that the
    # configured base URL and payload shape match the Books invoices API.
    headers = _get_valid_token_headers()
    payload = {"data": [data]}
    response = requests.post(
        f"{API_BASE}/invoices",
        headers=headers,
        json=payload,
        timeout=30,  # never wait indefinitely on a stalled connection
    )
    if response.status_code in (200, 201):
        return "Invoice created successfully."
    return f"Error creating invoice: {response.text}"
def process_document(file_path: str, target_module: Optional[str] = "Contacts") -> dict:
    """Return a simulated extraction result for an uploaded document.

    This is a stub: the file is never opened. Only the base name of
    ``file_path`` and the ``target_module`` label are echoed back.

    Args:
        file_path: Path of the uploaded PDF/image.
        target_module: Module label interpolated into the simulated payload.

    Returns:
        A dict with the keys ``status``, ``file`` and ``extracted_data``.
    """
    # Placeholder: the intended pipeline (OCR of the file, then Gemini
    # parsing of the raw text into JSON) is not implemented yet.
    file_name = os.path.basename(file_path)
    simulated = f"Simulated structured data from {target_module} document."
    result = dict(status="success", file=file_name, extracted_data=simulated)
    return result