Spaces:
Running
Running
# app.py β simple one-shot scenario mapper (MCP-ready) | |
import io, os, uuid, hashlib, json, warnings | |
from datetime import datetime | |
from pathlib import Path | |
import gradio as gr | |
import numpy as np | |
import matplotlib.pyplot as plt | |
from duckduckgo_search import DDGS | |
from sklearn.cluster import KMeans | |
from sklearn.decomposition import PCA | |
from PIL import Image as PILImage | |
# ββ optional fancy clustering via OpenAI embeddings ββββββββββββββββββββββββ | |
try: | |
import openai | |
except ImportError: | |
openai = None | |
warnings.warn("`openai` package not found; falling back to hash clustering.") | |
# ββ in-memory store (one run = one entry) βββββββββββββββββββββββββββββββββββ | |
LABS: dict[str, dict] = {} | |
# ββ helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def web_search(q: str, k: int = 20): | |
with DDGS() as ddgs: | |
return [f"{r['title']} β {r.get('body','')}" for r in ddgs.text(q, max_results=k)] | |
def deterministic_xy(txt: str): | |
h = int(hashlib.sha256(txt.encode()).hexdigest(), 16) | |
return ((h % 1000) / 500 - 1, ((h >> 12) % 1000) / 500 - 1) | |
def embed(texts: list[str], key: str | None): | |
if openai is None or not key: | |
return None # Fallback: hashing | |
openai.api_key = key | |
resp = openai.embeddings.create(model="text-embedding-3-small", input=texts) | |
return np.array([d.embedding for d in resp.data]) | |
def cluster(snips: list[str], embeds): | |
if embeds is None: | |
return [(*deterministic_xy(s), s[:40]) for s in snips[:16]] | |
k = min(max(len(snips) // 5, 4), 12) | |
km = KMeans(n_clusters=k, n_init="auto", random_state=0).fit(embeds) | |
p2 = PCA(2, random_state=0).fit_transform(km.cluster_centers_) | |
labels = [ | |
snips[int(np.argmin(np.linalg.norm(embeds - c, axis=1)))][:40] | |
for c in km.cluster_centers_ | |
] | |
xs, ys = p2[:, 0], p2[:, 1] | |
xs = (xs - xs.min()) / (np.ptp(xs) + 1e-4) * 2 - 1 | |
ys = (ys - ys.min()) / (np.ptp(ys) + 1e-4) * 2 - 1 | |
return list(zip(xs, ys, labels)) | |
def draw(points, ax1, ax2): | |
fig, ax = plt.subplots(figsize=(5, 5)) | |
ax.axhline(0); ax.axvline(0) | |
ax.set_xlim(-1.1, 1.1); ax.set_ylim(-1.1, 1.1) | |
ax.set_xlabel(ax1); ax.set_ylabel(ax2) | |
for x, y, lbl in points: | |
ax.scatter(x, y); ax.text(x, y, lbl, fontsize=8) | |
buf = io.BytesIO() | |
fig.tight_layout(); fig.savefig(buf, format="png"); plt.close(fig) | |
buf.seek(0) | |
return PILImage.open(buf) | |
# ββ MCP tool 1 ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def scenario_lab(axis1: str, axis2: str, openai_key: str | None = None): | |
""" | |
Make a scenario map once and return {session_id, scenarios}. | |
Pass \"*****\" or leave blank to use the HF Space secret OPENAI_API_KEY. | |
""" | |
# Secret handling -------------------------------------------------------- | |
if not openai_key or openai_key.strip() == "*****": | |
openai_key = os.getenv("OPENAI_API_KEY", "") | |
# Basic validation ------------------------------------------------------- | |
axis1, axis2 = axis1.strip(), axis2.strip() | |
if not axis1 or not axis2: | |
raise gr.Error("Both axes are required.") | |
# Build map -------------------------------------------------------------- | |
snippets = web_search(f"{axis1} {axis2}", 20) | |
embeds = embed(snippets, openai_key) | |
points = cluster(snippets, embeds) | |
sid = str(uuid.uuid4()) | |
LABS[sid] = dict(axis1=axis1, axis2=axis2, points=points) | |
labels = [p[2] for p in points] | |
return {"session_id": sid, "scenarios": labels} | |
# ββ MCP tool 2 ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def get_plot(session_id: str): | |
if session_id not in LABS: | |
raise gr.Error("Unknown session_id") | |
d = LABS[session_id] | |
return draw(d["points"], d["axis1"], d["axis2"]) | |
# ββ Minimal UI so you can try it in a browser βββββββββββββββββββββββββββββββ | |
create_ui = gr.Interface( | |
fn=scenario_lab, | |
inputs=[gr.Textbox(label="Axis 1"), | |
gr.Textbox(label="Axis 2"), | |
gr.Textbox(label="OpenAI key (opt. or β*****β)", type="password")], | |
outputs="json", | |
title="Create Scenarios", | |
api_name="scenario_lab" # exposes as MCP tool | |
) | |
plot_ui = gr.Interface( | |
fn=get_plot, | |
inputs=gr.Textbox(label="session_id"), | |
outputs="image", | |
title="Get Plot", | |
api_name="get_plot" # exposes as MCP tool | |
) | |
demo = gr.TabbedInterface([create_ui, plot_ui], ["Create", "Plot"]) | |
if __name__ == "__main__": | |
demo.launch(mcp_server=True) # UI + MCP server | |