AI_SQL / mcp_server.py
mgbam's picture
Update mcp_server.py
936bec2 verified
raw
history blame
8.58 kB
"""
mcp_server.py – MCP tool server exposing:
β€’ query_database – SQLite sample
β€’ query_salesforce – Salesforce via simple-salesforce
β€’ query_hubspot – HubSpot via hubspot-api-client
β€’ query_quickbooks – QuickBooks Online via python-quickbooks
β€’ query_stripe – Stripe via stripe
β€’ query_googleads – Google Ads via google-ads
β€’ query_sharepoint – SharePoint via Office365-REST-Python-Client
Secrets expected in env:
SF_USER, SF_PASS, SF_TOKEN
HUBSPOT_TOKEN
QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN, QB_REALM_ID
STRIPE_API_KEY
GOOGLE_ADS_DEVELOPER_TOKEN, GOOGLE_ADS_LOGIN_CUSTOMER_ID, GOOGLE_ADS_REFRESH_TOKEN, GOOGLE_ADS_CLIENT_ID, GOOGLE_ADS_CLIENT_SECRET
SHAREPOINT_SITE_URL, SHAREPOINT_CLIENT_ID, SHAREPOINT_CLIENT_SECRET, SHAREPOINT_TENANT_ID
"""
import os
import json
import sqlite3
from mcp.server.fastmcp import FastMCP
# Salesforce
from simple_salesforce import Salesforce
# HubSpot
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException as HSContactsError
# QuickBooks
from quickbooks import QuickBooks
from quickbooks.objects.customer import Customer # example object
# Stripe
import stripe
# Google Ads
from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.errors import GoogleAdsException
# SharePoint
from office365.runtime.auth.client_credential import ClientCredential
from office365.sharepoint.client_context import ClientContext
# ────────────────────────────────────────────────────────────────────────────
mcp = FastMCP("EnterpriseData")
# ─── 1) In-memory SQLite sample ─────────────────────────────────────────────
conn = sqlite3.connect(":memory:", check_same_thread=False)
cur = conn.cursor()
cur.execute("""
CREATE TABLE Customers (
CustomerID INTEGER PRIMARY KEY AUTOINCREMENT,
Name TEXT,
Region TEXT,
LastOrderDate TEXT
)
""")
cur.executemany(
"INSERT INTO Customers (Name, Region, LastOrderDate) VALUES (?,?,?)",
[
("Acme Corp", "Northeast", "2024-12-01"),
("Beta Inc", "West", "2025-06-01"),
("Gamma Co", "Northeast", "2023-09-15"),
("Delta LLC", "South", "2025-03-20"),
("Epsilon Ltd","Northeast", "2025-07-10"),
],
)
conn.commit()
@mcp.tool()
def query_database(sql: str) -> str:
"""Run SQL against the in-memory Customers table and return JSON rows."""
try:
cur.execute(sql)
cols = [d[0] for d in cur.description or []]
rows = [dict(zip(cols, r)) for r in cur.fetchall()]
return json.dumps(rows)
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 2) Salesforce tool ─────────────────────────────────────────────────────
sf = Salesforce(
username=os.getenv("SF_USER"),
password=os.getenv("SF_PASS"),
security_token=os.getenv("SF_TOKEN"),
)
@mcp.tool()
def query_salesforce(soql: str) -> str:
"""
Run SOQL query via simple-salesforce.
Example: SELECT Id, Name FROM Account WHERE Industry = 'Technology'
"""
try:
result = sf.query_all(soql)
records = result.get("records", [])
# remove attributes block
for r in records:
r.pop("attributes", None)
return json.dumps(records)
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 3) HubSpot tool ────────────────────────────────────────────────────────
hs_client = HubSpot(access_token=os.getenv("HUBSPOT_TOKEN"))
@mcp.tool()
def query_hubspot(object_type: str, limit: int = 100) -> str:
"""
Fetch up to `limit` objects of type `contacts`, `companies`, or `deals`.
Example: object_type="contacts"
"""
try:
api = getattr(hs_client.crm, object_type)
page = api.basic_api.get_page(limit=limit)
items = [r.to_dict() for r in page.results]
return json.dumps(items)
except HSContactsError as he:
return json.dumps({"error": he.body})
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 4) QuickBooks tool ────────────────────────────────────────────────────
qb = QuickBooks(
client_id=os.getenv("QB_CLIENT_ID"),
client_secret=os.getenv("QB_CLIENT_SECRET"),
refresh_token=os.getenv("QB_REFRESH_TOKEN"),
company_id=os.getenv("QB_REALM_ID"),
)
@mcp.tool()
def query_quickbooks(entity: str, max_results: int = 50) -> str:
"""
Query QuickBooks Online entity via python-quickbooks.
entity="Customer" or "Invoice", etc.
"""
try:
cls = globals().get(entity, None)
if cls is None:
raise ValueError(f"Unknown entity '{entity}'")
objs = cls.where("", qb=qb, max_results=max_results)
return json.dumps([o.to_dict() for o in objs])
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 5) Stripe tool ─────────────────────────────────────────────────────────
stripe.api_key = os.getenv("STRIPE_API_KEY")
@mcp.tool()
def query_stripe(obj: str, limit: int = 10) -> str:
"""
List Stripe objects, e.g. obj="customers", "charges", "invoices"
"""
try:
method = getattr(stripe, obj)
items = method.list(limit=limit).data
return json.dumps([i.to_dict() for i in items])
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 6) Google Ads tool ────────────────────────────────────────────────────
ga_config = {
"developer_token": os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN"),
"login_customer_id": os.getenv("GOOGLE_ADS_LOGIN_CUSTOMER_ID"),
"refresh_token": os.getenv("GOOGLE_ADS_REFRESH_TOKEN"),
"client_id": os.getenv("GOOGLE_ADS_CLIENT_ID"),
"client_secret": os.getenv("GOOGLE_ADS_CLIENT_SECRET"),
}
ga_client = GoogleAdsClient.load_from_dict(ga_config)
@mcp.tool()
def query_googleads(customer_id: str, query: str) -> str:
"""
Run a GAQL query against a Google Ads customer.
Example: query="SELECT campaign.id, campaign.name FROM campaign ORDER BY campaign.id"
"""
try:
service = ga_client.get_service("GoogleAdsService")
response = service.search(customer_id=customer_id, query=query)
rows = []
for row in response:
# convert protobuf row to dict
rows.append({k: getattr(row, k) for k in row._pb.DESCRIPTOR.fields_by_name})
return json.dumps(rows)
except GoogleAdsException as ge:
return json.dumps({"error": ge.error.message})
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 7) SharePoint tool ────────────────────────────────────────────────────
site_url = os.getenv("SHAREPOINT_SITE_URL")
client_id = os.getenv("SHAREPOINT_CLIENT_ID")
client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET")
tenant_id = os.getenv("SHAREPOINT_TENANT_ID")
auth_ctx = ClientContext(site_url).with_credentials(
ClientCredential(client_id, client_secret)
)
@mcp.tool()
def query_sharepoint(list_title: str, top: int = 50) -> str:
"""
Fetch items from a SharePoint list.
Example: list_title="ContactsList"
"""
try:
sp_list = auth_ctx.web.lists.get_by_title(list_title)
items = sp_list.items.top(top).get().execute_query()
out = [dict(i.properties) for i in items]
return json.dumps(out, ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)})
# ─── 8) Start the MCP server ───────────────────────────────────────────────
if __name__ == "__main__":
mcp.run(transport="stdio")