|
|
""" |
|
|
mcp_server.py β MCP tool server exposing: |
|
|
β’ query_database β SQLite sample |
|
|
β’ query_salesforce β Salesforce via simple-salesforce |
|
|
β’ query_hubspot β HubSpot via hubspot-api-client |
|
|
β’ query_quickbooks β QuickBooks Online via python-quickbooks |
|
|
β’ query_stripe β Stripe via stripe |
|
|
β’ query_googleads β Google Ads via google-ads |
|
|
β’ query_sharepoint β SharePoint via Office365-REST-Python-Client |
|
|
|
|
|
Secrets expected in env: |
|
|
SF_USER, SF_PASS, SF_TOKEN |
|
|
HUBSPOT_TOKEN |
|
|
QB_CLIENT_ID, QB_CLIENT_SECRET, QB_REFRESH_TOKEN, QB_REALM_ID |
|
|
STRIPE_API_KEY |
|
|
GOOGLE_ADS_DEVELOPER_TOKEN, GOOGLE_ADS_LOGIN_CUSTOMER_ID, GOOGLE_ADS_REFRESH_TOKEN, GOOGLE_ADS_CLIENT_ID, GOOGLE_ADS_CLIENT_SECRET |
|
|
SHAREPOINT_SITE_URL, SHAREPOINT_CLIENT_ID, SHAREPOINT_CLIENT_SECRET, SHAREPOINT_TENANT_ID |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import sqlite3 |
|
|
|
|
|
from mcp.server.fastmcp import FastMCP |
|
|
|
|
|
|
|
|
from simple_salesforce import Salesforce |
|
|
|
|
|
|
|
|
from hubspot import HubSpot |
|
|
from hubspot.crm.contacts import ApiException as HSContactsError |
|
|
|
|
|
|
|
|
from quickbooks import QuickBooks |
|
|
from quickbooks.objects.customer import Customer |
|
|
|
|
|
|
|
|
import stripe |
|
|
|
|
|
|
|
|
from google.ads.googleads.client import GoogleAdsClient |
|
|
from google.ads.googleads.errors import GoogleAdsException |
|
|
|
|
|
|
|
|
from office365.runtime.auth.client_credential import ClientCredential |
|
|
from office365.sharepoint.client_context import ClientContext |
|
|
|
|
|
|
|
|
mcp = FastMCP("EnterpriseData") |
|
|
|
|
|
|
|
|
conn = sqlite3.connect(":memory:", check_same_thread=False) |
|
|
cur = conn.cursor() |
|
|
cur.execute(""" |
|
|
CREATE TABLE Customers ( |
|
|
CustomerID INTEGER PRIMARY KEY AUTOINCREMENT, |
|
|
Name TEXT, |
|
|
Region TEXT, |
|
|
LastOrderDate TEXT |
|
|
) |
|
|
""") |
|
|
cur.executemany( |
|
|
"INSERT INTO Customers (Name, Region, LastOrderDate) VALUES (?,?,?)", |
|
|
[ |
|
|
("Acme Corp", "Northeast", "2024-12-01"), |
|
|
("Beta Inc", "West", "2025-06-01"), |
|
|
("Gamma Co", "Northeast", "2023-09-15"), |
|
|
("Delta LLC", "South", "2025-03-20"), |
|
|
("Epsilon Ltd","Northeast", "2025-07-10"), |
|
|
], |
|
|
) |
|
|
conn.commit() |
|
|
|
|
|
@mcp.tool() |
|
|
def query_database(sql: str) -> str: |
|
|
"""Run SQL against the in-memory Customers table and return JSON rows.""" |
|
|
try: |
|
|
cur.execute(sql) |
|
|
cols = [d[0] for d in cur.description or []] |
|
|
rows = [dict(zip(cols, r)) for r in cur.fetchall()] |
|
|
return json.dumps(rows) |
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
|
|
|
sf = Salesforce( |
|
|
username=os.getenv("SF_USER"), |
|
|
password=os.getenv("SF_PASS"), |
|
|
security_token=os.getenv("SF_TOKEN"), |
|
|
) |
|
|
|
|
|
@mcp.tool() |
|
|
def query_salesforce(soql: str) -> str: |
|
|
""" |
|
|
Run SOQL query via simple-salesforce. |
|
|
Example: SELECT Id, Name FROM Account WHERE Industry = 'Technology' |
|
|
""" |
|
|
try: |
|
|
result = sf.query_all(soql) |
|
|
records = result.get("records", []) |
|
|
|
|
|
for r in records: |
|
|
r.pop("attributes", None) |
|
|
return json.dumps(records) |
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
|
|
|
hs_client = HubSpot(access_token=os.getenv("HUBSPOT_TOKEN")) |
|
|
|
|
|
@mcp.tool() |
|
|
def query_hubspot(object_type: str, limit: int = 100) -> str: |
|
|
""" |
|
|
Fetch up to `limit` objects of type `contacts`, `companies`, or `deals`. |
|
|
Example: object_type="contacts" |
|
|
""" |
|
|
try: |
|
|
api = getattr(hs_client.crm, object_type) |
|
|
page = api.basic_api.get_page(limit=limit) |
|
|
items = [r.to_dict() for r in page.results] |
|
|
return json.dumps(items) |
|
|
except HSContactsError as he: |
|
|
return json.dumps({"error": he.body}) |
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
|
|
|
qb = QuickBooks( |
|
|
client_id=os.getenv("QB_CLIENT_ID"), |
|
|
client_secret=os.getenv("QB_CLIENT_SECRET"), |
|
|
refresh_token=os.getenv("QB_REFRESH_TOKEN"), |
|
|
company_id=os.getenv("QB_REALM_ID"), |
|
|
) |
|
|
|
|
|
@mcp.tool() |
|
|
def query_quickbooks(entity: str, max_results: int = 50) -> str: |
|
|
""" |
|
|
Query QuickBooks Online entity via python-quickbooks. |
|
|
entity="Customer" or "Invoice", etc. |
|
|
""" |
|
|
try: |
|
|
cls = globals().get(entity, None) |
|
|
if cls is None: |
|
|
raise ValueError(f"Unknown entity '{entity}'") |
|
|
objs = cls.where("", qb=qb, max_results=max_results) |
|
|
return json.dumps([o.to_dict() for o in objs]) |
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
|
|
|
stripe.api_key = os.getenv("STRIPE_API_KEY") |
|
|
|
|
|
@mcp.tool() |
|
|
def query_stripe(obj: str, limit: int = 10) -> str: |
|
|
""" |
|
|
List Stripe objects, e.g. obj="customers", "charges", "invoices" |
|
|
""" |
|
|
try: |
|
|
method = getattr(stripe, obj) |
|
|
items = method.list(limit=limit).data |
|
|
return json.dumps([i.to_dict() for i in items]) |
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
|
|
|
ga_config = { |
|
|
"developer_token": os.getenv("GOOGLE_ADS_DEVELOPER_TOKEN"), |
|
|
"login_customer_id": os.getenv("GOOGLE_ADS_LOGIN_CUSTOMER_ID"), |
|
|
"refresh_token": os.getenv("GOOGLE_ADS_REFRESH_TOKEN"), |
|
|
"client_id": os.getenv("GOOGLE_ADS_CLIENT_ID"), |
|
|
"client_secret": os.getenv("GOOGLE_ADS_CLIENT_SECRET"), |
|
|
} |
|
|
|
|
|
ga_client = GoogleAdsClient.load_from_dict(ga_config) |
|
|
|
|
|
@mcp.tool() |
|
|
def query_googleads(customer_id: str, query: str) -> str: |
|
|
""" |
|
|
Run a GAQL query against a Google Ads customer. |
|
|
Example: query="SELECT campaign.id, campaign.name FROM campaign ORDER BY campaign.id" |
|
|
""" |
|
|
try: |
|
|
service = ga_client.get_service("GoogleAdsService") |
|
|
response = service.search(customer_id=customer_id, query=query) |
|
|
rows = [] |
|
|
for row in response: |
|
|
|
|
|
rows.append({k: getattr(row, k) for k in row._pb.DESCRIPTOR.fields_by_name}) |
|
|
return json.dumps(rows) |
|
|
except GoogleAdsException as ge: |
|
|
return json.dumps({"error": ge.error.message}) |
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
|
|
|
site_url = os.getenv("SHAREPOINT_SITE_URL") |
|
|
client_id = os.getenv("SHAREPOINT_CLIENT_ID") |
|
|
client_secret = os.getenv("SHAREPOINT_CLIENT_SECRET") |
|
|
tenant_id = os.getenv("SHAREPOINT_TENANT_ID") |
|
|
|
|
|
auth_ctx = ClientContext(site_url).with_credentials( |
|
|
ClientCredential(client_id, client_secret) |
|
|
) |
|
|
|
|
|
@mcp.tool() |
|
|
def query_sharepoint(list_title: str, top: int = 50) -> str: |
|
|
""" |
|
|
Fetch items from a SharePoint list. |
|
|
Example: list_title="ContactsList" |
|
|
""" |
|
|
try: |
|
|
sp_list = auth_ctx.web.lists.get_by_title(list_title) |
|
|
items = sp_list.items.top(top).get().execute_query() |
|
|
out = [dict(i.properties) for i in items] |
|
|
return json.dumps(out, ensure_ascii=False) |
|
|
except Exception as e: |
|
|
return json.dumps({"error": str(e)}) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
mcp.run(transport="stdio") |
|
|
|