justtesting / app.py
hra's picture
Create app.py
7254b09 verified
# app.py – MCP wrapper for triggering a Databricks spam check job
import os
import time
import json
import requests
import gradio as gr
# ── Environment setup ─────────────────────────────────────────
DATABRICKS_HOST = os.getenv("DATABRICKS_HOST")
DATABRICKS_TOKEN = os.getenv("DATABRICKS_TOKEN")
DATABRICKS_NOTEBOOK_PATH = os.getenv("DATABRICKS_NOTEBOOK_PATH")
EXISTING_CLUSTER_ID = os.getenv("EXISTING_CLUSTER_ID")
HEADERS = {
"Authorization": f"Bearer {DATABRICKS_TOKEN}",
"Content-Type": "application/json"
}
# ── MCP Tool ──────────────────────────────────────────────────
def run_spam_check(phone_number: str):
# 1. Submit the job to Databricks
submit_payload = {
"run_name": f"FraudCheck_{phone_number}",
"notebook_task": {
"notebook_path": DATABRICKS_NOTEBOOK_PATH,
"base_parameters": {
"phone_number": phone_number
}
},
"existing_cluster_id": EXISTING_CLUSTER_ID
}
response = requests.post(
f"{DATABRICKS_HOST}/api/2.1/jobs/runs/submit",
headers=HEADERS,
json=submit_payload
)
if response.status_code != 200:
return {"error": "Failed to start job", "details": response.text}
run_id = response.json()["run_id"]
# 2. Poll for completion
while True:
status_response = requests.get(
f"{DATABRICKS_HOST}/api/2.1/jobs/runs/get?run_id={run_id}",
headers=HEADERS
)
status_json = status_response.json()
run_state = status_json["state"]["life_cycle_state"]
if run_state in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
break
time.sleep(2)
# 3. Get output
output_response = requests.get(
f"{DATABRICKS_HOST}/api/2.1/jobs/runs/get-output?run_id={run_id}",
headers=HEADERS
)
if output_response.status_code != 200:
return {"error": "Failed to fetch output", "details": output_response.text}
output_json = output_response.json()
notebook_output = output_json.get("notebook_output", {}).get("result")
if notebook_output:
try:
return json.loads(notebook_output) if isinstance(notebook_output, str) else notebook_output
except json.JSONDecodeError:
return {"raw_output": notebook_output}
else:
return {"warning": "No result returned in notebook output."}
# ── Minimal UI to test ────────────────────────────────────────
demo = gr.Interface(
fn=run_spam_check,
inputs=gr.Textbox(label="Phone Number to Check"),
outputs="json",
title="Databricks Spam Check",
description="Triggers a Databricks job to check for spam/fraud using a phone number.",
api_name="spam_check"
)
if __name__ == "__main__":
demo.launch(mcp_server=True)