|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
import re |
|
|
import json |
|
|
import sys |
|
|
import os |
|
|
import random |
|
|
from io import StringIO |
|
|
from typing import List, Dict, Tuple, Annotated, Literal, Optional |
|
|
|
|
|
import gradio as gr |
|
|
import requests |
|
|
from bs4 import BeautifulSoup |
|
|
from markdownify import markdownify as md |
|
|
from readability import Document |
|
|
from urllib.parse import urlparse |
|
|
from ddgs import DDGS |
|
|
from PIL import Image |
|
|
from huggingface_hub import InferenceClient |
|
|
import time |
|
|
import tempfile |
|
|
import uuid |
|
|
import threading |
|
|
from datetime import datetime |
|
|
|
|
|
|
|
|
import numpy as np |
|
|
try: |
|
|
import torch |
|
|
except Exception: |
|
|
torch = None |
|
|
try: |
|
|
from kokoro import KModel, KPipeline |
|
|
except Exception: |
|
|
KModel = None |
|
|
KPipeline = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _http_get_enhanced(url: str) -> requests.Response: |
|
|
""" |
|
|
Download the page with enhanced headers, timeout handling, and better error recovery. |
|
|
""" |
|
|
headers = { |
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", |
|
|
"Accept-Language": "en-US,en;q=0.9", |
|
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", |
|
|
"Accept-Encoding": "gzip, deflate, br", |
|
|
"DNT": "1", |
|
|
"Connection": "keep-alive", |
|
|
"Upgrade-Insecure-Requests": "1", |
|
|
} |
|
|
|
|
|
|
|
|
_fetch_rate_limiter.acquire() |
|
|
|
|
|
try: |
|
|
response = requests.get( |
|
|
url, |
|
|
headers=headers, |
|
|
timeout=30, |
|
|
allow_redirects=True, |
|
|
stream=False |
|
|
) |
|
|
response.raise_for_status() |
|
|
return response |
|
|
except requests.exceptions.Timeout: |
|
|
raise requests.exceptions.RequestException("Request timed out. The webpage took too long to respond.") |
|
|
except requests.exceptions.ConnectionError: |
|
|
raise requests.exceptions.RequestException("Connection error. Please check the URL and your internet connection.") |
|
|
except requests.exceptions.HTTPError as e: |
|
|
if response.status_code == 403: |
|
|
raise requests.exceptions.RequestException("Access forbidden. The website may be blocking automated requests.") |
|
|
elif response.status_code == 404: |
|
|
raise requests.exceptions.RequestException("Page not found. Please check the URL.") |
|
|
elif response.status_code == 429: |
|
|
raise requests.exceptions.RequestException("Rate limited. Please try again in a few minutes.") |
|
|
else: |
|
|
raise requests.exceptions.RequestException(f"HTTP error {response.status_code}: {str(e)}") |
|
|
|
|
|
def _normalize_whitespace(text: str) -> str: |
|
|
""" |
|
|
Squeeze extra spaces and blank lines to keep things compact. |
|
|
(Layman's terms: tidy up the text so it’s not full of weird spacing.) |
|
|
""" |
|
|
text = re.sub(r"[ \t\u00A0]+", " ", text) |
|
|
text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip()) |
|
|
return text.strip() |
|
|
|
|
|
|
|
|
def _truncate(text: str, max_chars: int) -> Tuple[str, bool]: |
|
|
""" |
|
|
Cut text if it gets too long; return the text and whether we trimmed. |
|
|
(Layman's terms: shorten long text and tell us if we had to cut it.) |
|
|
""" |
|
|
if max_chars is None or max_chars <= 0 or len(text) <= max_chars: |
|
|
return text, False |
|
|
return text[:max_chars].rstrip() + " …", True |
|
|
|
|
|
|
|
|
def _shorten(text: str, limit: int) -> str: |
|
|
""" |
|
|
Hard cap a string with an ellipsis to keep tokens small. |
|
|
(Layman's terms: force a string to a max length with an ellipsis.) |
|
|
""" |
|
|
if limit <= 0 or len(text) <= limit: |
|
|
return text |
|
|
return text[: max(0, limit - 1)].rstrip() + "…" |
|
|
|
|
|
|
|
|
def _domain_of(url: str) -> str: |
|
|
""" |
|
|
Show a friendly site name like "example.com". |
|
|
(Layman's terms: pull the website's domain.) |
|
|
""" |
|
|
try: |
|
|
return urlparse(url).netloc or "" |
|
|
except Exception: |
|
|
return "" |
|
|
|
|
|
|
|
|
def _meta(soup: BeautifulSoup, name: str) -> str | None: |
|
|
tag = soup.find("meta", attrs={"name": name}) |
|
|
return tag.get("content") if tag and tag.has_attr("content") else None |
|
|
|
|
|
|
|
|
def _og(soup: BeautifulSoup, prop: str) -> str | None: |
|
|
tag = soup.find("meta", attrs={"property": prop}) |
|
|
return tag.get("content") if tag and tag.has_attr("content") else None |
|
|
|
|
|
|
|
|
def _extract_metadata(soup: BeautifulSoup, final_url: str) -> Dict[str, str]: |
|
|
""" |
|
|
Pull the useful bits: title, description, site name, canonical URL, language, etc. |
|
|
(Layman's terms: gather page basics like title/description/address.) |
|
|
""" |
|
|
meta: Dict[str, str] = {} |
|
|
|
|
|
|
|
|
title_candidates = [ |
|
|
(soup.title.string if soup.title and soup.title.string else None), |
|
|
_og(soup, "og:title"), |
|
|
_meta(soup, "twitter:title"), |
|
|
] |
|
|
meta["title"] = next((t.strip() for t in title_candidates if t and t.strip()), "") |
|
|
|
|
|
|
|
|
desc_candidates = [ |
|
|
_meta(soup, "description"), |
|
|
_og(soup, "og:description"), |
|
|
_meta(soup, "twitter:description"), |
|
|
] |
|
|
meta["description"] = next((d.strip() for d in desc_candidates if d and d.strip()), "") |
|
|
|
|
|
|
|
|
link_canonical = soup.find("link", rel=lambda v: v and "canonical" in v) |
|
|
meta["canonical"] = (link_canonical.get("href") or "").strip() if link_canonical else "" |
|
|
|
|
|
|
|
|
meta["site_name"] = (_og(soup, "og:site_name") or "").strip() |
|
|
html_tag = soup.find("html") |
|
|
meta["lang"] = (html_tag.get("lang") or "").strip() if html_tag else "" |
|
|
|
|
|
|
|
|
meta["fetched_url"] = final_url |
|
|
meta["domain"] = _domain_of(final_url) |
|
|
|
|
|
return meta |
|
|
|
|
|
|
|
|
def _extract_main_text(html: str) -> Tuple[str, BeautifulSoup]: |
|
|
""" |
|
|
Use Readability to isolate the main article and turn it into clean text. |
|
|
Returns (clean_text, soup_of_readable_html). |
|
|
(Layman's terms: find the real article text and clean it.) |
|
|
""" |
|
|
|
|
|
doc = Document(html) |
|
|
readable_html = doc.summary(html_partial=True) |
|
|
|
|
|
|
|
|
s = BeautifulSoup(readable_html, "lxml") |
|
|
|
|
|
|
|
|
for sel in ["script", "style", "noscript", "iframe", "svg"]: |
|
|
for tag in s.select(sel): |
|
|
tag.decompose() |
|
|
|
|
|
|
|
|
text_parts: List[str] = [] |
|
|
for p in s.find_all(["p", "li", "h2", "h3", "h4", "blockquote"]): |
|
|
chunk = p.get_text(" ", strip=True) |
|
|
if chunk: |
|
|
text_parts.append(chunk) |
|
|
|
|
|
clean_text = _normalize_whitespace("\n\n".join(text_parts)) |
|
|
return clean_text, s |
|
|
|
|
|
|
|
|
def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str) -> str:
    """
    Convert the full page soup to Markdown: strip script/style/nav/footer clutter,
    pick the most likely main-content container, run it through markdownify, and
    prepend the page <title> as an H1. (base_url is accepted but not currently used.)
    """
|
|
|
|
|
|
|
|
for element in full_soup.select("script, style, nav, footer, header, aside"): |
|
|
element.decompose() |
|
|
|
|
|
|
|
|
main = ( |
|
|
full_soup.find("main") |
|
|
or full_soup.find("article") |
|
|
or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I)) |
|
|
or full_soup.find("body") |
|
|
) |
|
|
|
|
|
if not main: |
|
|
return "No main content found on the webpage." |
|
|
|
|
|
|
|
|
markdown_text = md(str(main), heading_style="ATX") |
|
|
|
|
|
|
|
|
markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text) |
|
|
markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text) |
|
|
markdown_text = re.sub(r"[ \t]+", " ", markdown_text) |
|
|
markdown_text = markdown_text.strip() |
|
|
|
|
|
|
|
|
title = full_soup.find("title") |
|
|
if title and title.get_text(strip=True): |
|
|
markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}" |
|
|
|
|
|
return markdown_text or "No content could be extracted." |
|
|
|
|
|
|
|
|
def _truncate_markdown(markdown: str, max_chars: int) -> str: |
|
|
""" |
|
|
Truncate markdown content to a maximum character count while preserving structure. |
|
|
Tries to break at paragraph boundaries when possible. |
|
|
""" |
|
|
if len(markdown) <= max_chars: |
|
|
return markdown |
|
|
|
|
|
|
|
|
truncated = markdown[:max_chars] |
|
|
|
|
|
|
|
|
last_paragraph = truncated.rfind('\n\n') |
|
|
if last_paragraph > max_chars * 0.7: |
|
|
truncated = truncated[:last_paragraph] |
|
|
|
|
|
|
|
|
elif '.' in truncated[-100:]: |
|
|
last_period = truncated.rfind('.') |
|
|
if last_period > max_chars * 0.8: |
|
|
truncated = truncated[:last_period + 1] |
|
|
|
|
|
return truncated.rstrip() + "\n\n> *[Content truncated for brevity]*" |
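

# Tiny illustrative check (not executed on import): when content exceeds the limit,
# _truncate_markdown prefers a paragraph boundary past 70% of the limit, then a
# sentence boundary past 80%, before appending the truncation notice.
def _demo_truncate_markdown() -> None:
    sample = "# Title\n\n" + "First paragraph. " * 10 + "\n\n" + "Second paragraph. " * 10
    print(_truncate_markdown(sample, 200))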
|
|
|
|
|
|
|
|
def Fetch_Webpage( |
|
|
url: Annotated[str, "The absolute URL to fetch (must return HTML)."], |
|
|
verbosity: Annotated[str, "Controls output length: 'Brief' (1000 chars), 'Standard' (3000 chars), or 'Full' (complete page)."] = "Standard", |
|
|
) -> str: |
|
|
""" |
|
|
Fetch a web page and return it converted to Markdown format with configurable length. |
|
|
|
|
|
This function retrieves a webpage and converts its main content to clean Markdown, |
|
|
preserving headings, formatting, and structure. It automatically removes navigation, |
|
|
footers, scripts, and other non-content elements to focus on the main article or |
|
|
content area. |
|
|
|
|
|
Args: |
|
|
url (str): The absolute URL to fetch (must return HTML). |
|
|
verbosity (str): Controls output length: |
|
|
- "Brief": Truncate to 1000 characters for quick summaries |
|
|
- "Standard": Truncate to 3000 characters for balanced content |
|
|
- "Full": Return complete page content with no length limit |
|
|
|
|
|
Returns: |
|
|
str: The webpage content converted to Markdown format with: |
|
|
- Page title as H1 header |
|
|
- Main content converted to clean Markdown |
|
|
- Preserved heading hierarchy |
|
|
- Clean formatting without navigation/sidebar elements |
|
|
- Length controlled by verbosity setting |
|
|
""" |
|
|
_log_call_start("Fetch_Webpage", url=url, verbosity=verbosity) |
|
|
if not url or not url.strip(): |
|
|
result = "Please enter a valid URL." |
|
|
_log_call_end("Fetch_Webpage", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
try: |
|
|
resp = _http_get_enhanced(url) |
|
|
resp.raise_for_status() |
|
|
except requests.exceptions.RequestException as e: |
|
|
result = f"An error occurred: {e}" |
|
|
_log_call_end("Fetch_Webpage", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
final_url = str(resp.url) |
|
|
ctype = resp.headers.get("Content-Type", "") |
|
|
if "html" not in ctype.lower(): |
|
|
result = f"Unsupported content type for extraction: {ctype or 'unknown'}" |
|
|
_log_call_end("Fetch_Webpage", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
|
|
|
resp.encoding = resp.encoding or resp.apparent_encoding |
|
|
html = resp.text |
|
|
|
|
|
|
|
|
full_soup = BeautifulSoup(html, "lxml") |
|
|
markdown_content = _fullpage_markdown_from_soup(full_soup, final_url) |
|
|
|
|
|
|
|
|
if verbosity == "Brief": |
|
|
result = _truncate_markdown(markdown_content, 1000) |
|
|
elif verbosity == "Standard": |
|
|
result = _truncate_markdown(markdown_content, 3000) |
|
|
else: |
|
|
result = markdown_content |
|
|
_log_call_end("Fetch_Webpage", f"markdown_chars={len(result)}") |
|
|
return result |
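

# Usage sketch (illustrative only; requires network access and is not run on import):
def _demo_fetch_webpage() -> None:
    print(Fetch_Webpage("https://example.com", verbosity="Brief"))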
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timedelta |
|
|
|
|
|
class RateLimiter: |
|
|
def __init__(self, requests_per_minute: int = 30): |
|
|
self.requests_per_minute = requests_per_minute |
|
|
self.requests = [] |
|
|
|
|
|
def acquire(self): |
|
|
"""Synchronous rate limiting for non-async context""" |
|
|
now = datetime.now() |
|
|
|
|
|
self.requests = [ |
|
|
req for req in self.requests if now - req < timedelta(minutes=1) |
|
|
] |
|
|
|
|
|
if len(self.requests) >= self.requests_per_minute: |
|
|
|
|
|
wait_time = 60 - (now - self.requests[0]).total_seconds() |
|
|
if wait_time > 0: |
|
|
time.sleep(max(1, wait_time)) |
|
|
|
|
|
self.requests.append(now) |
|
|
|
|
|
|
|
|
_search_rate_limiter = RateLimiter(requests_per_minute=20) |
|
|
_fetch_rate_limiter = RateLimiter(requests_per_minute=25) |
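

# Illustrative sketch (never called by the app): the limiter records a timestamp per
# call and sleeps once the per-minute budget is exhausted. This demo stays under the
# budget so it returns immediately.
def _demo_rate_limiter() -> None:
    limiter = RateLimiter(requests_per_minute=3)
    limiter.acquire()
    limiter.acquire()
    print(f"requests recorded in the current window: {len(limiter.requests)}")  # -> 2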
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _truncate_for_log(value: str, limit: int = 500) -> str: |
|
|
"""Truncate long strings for concise terminal logging.""" |
|
|
if len(value) <= limit: |
|
|
return value |
|
|
return value[:limit - 1] + "…" |
|
|
|
|
|
|
|
|
def _serialize_input(val): |
|
|
"""Best-effort compact serialization of arbitrary input values for logging.""" |
|
|
try: |
|
|
if isinstance(val, (str, int, float, bool)) or val is None: |
|
|
return val |
|
|
if isinstance(val, (list, tuple)): |
|
|
return [_serialize_input(v) for v in list(val)[:10]] + (["…"] if len(val) > 10 else []) |
|
|
if isinstance(val, dict): |
|
|
out = {} |
|
|
for i, (k, v) in enumerate(val.items()): |
|
|
if i >= 12: |
|
|
out["…"] = "…" |
|
|
break |
|
|
out[str(k)] = _serialize_input(v) |
|
|
return out |
|
|
return repr(val)[:120] |
|
|
except Exception: |
|
|
return "<unserializable>" |
|
|
|
|
|
|
|
|
def _log_call_start(func_name: str, **kwargs) -> None: |
|
|
try: |
|
|
compact = {k: _serialize_input(v) for k, v in kwargs.items()} |
|
|
print(f"[TOOL CALL] {func_name} inputs: {json.dumps(compact, ensure_ascii=False)[:800]}", flush=True) |
|
|
except Exception as e: |
|
|
print(f"[TOOL CALL] {func_name} (failed to log inputs: {e})", flush=True) |
|
|
|
|
|
|
|
|
def _log_call_end(func_name: str, output_desc: str) -> None: |
|
|
try: |
|
|
print(f"[TOOL RESULT] {func_name} output: {output_desc}", flush=True) |
|
|
except Exception as e: |
|
|
print(f"[TOOL RESULT] {func_name} (failed to log output: {e})", flush=True) |
|
|
|
|
|
def Search_DuckDuckGo( |
|
|
query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."], |
|
|
max_results: Annotated[int, "Number of results to return (1–20)."] = 5, |
|
|
) -> str: |
|
|
""" |
|
|
Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries. |
|
|
|
|
|
Args: |
|
|
query (str): The search query string. Supports operators like site:, quotes for exact matching, |
|
|
OR for alternatives, and other DuckDuckGo search syntax. |
|
|
Examples: |
|
|
- Basic search: "Python programming" |
|
|
- Site search: "site:example.com" |
|
|
- Exact phrase: "artificial intelligence" |
|
|
- Exclude terms: "cats -dogs" |
|
|
max_results (int): Number of results to return (1–20). Default: 5. |
|
|
|
|
|
Returns: |
|
|
str: Search results in readable format with titles, URLs, and snippets as a numbered list. |
|
|
""" |
|
|
_log_call_start("Search_DuckDuckGo", query=query, max_results=max_results) |
|
|
if not query or not query.strip(): |
|
|
result = "No search query provided. Please enter a search term." |
|
|
_log_call_end("Search_DuckDuckGo", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
|
|
|
max_results = max(1, min(20, max_results)) |
|
|
|
|
|
try: |
|
|
|
|
|
_search_rate_limiter.acquire() |
|
|
|
|
|
|
|
|
with DDGS() as ddgs: |
|
|
raw = ddgs.text(query, max_results=max_results) |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Search failed: {str(e)[:200]}" |
|
|
if "blocked" in str(e).lower() or "rate" in str(e).lower(): |
|
|
error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes." |
|
|
elif "timeout" in str(e).lower(): |
|
|
error_msg = "Search timed out. Please try again with a simpler query." |
|
|
elif "network" in str(e).lower() or "connection" in str(e).lower(): |
|
|
error_msg = "Network connection error. Please check your internet connection and try again." |
|
|
result = f"Error: {error_msg}" |
|
|
_log_call_end("Search_DuckDuckGo", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
if not raw: |
|
|
result = f"No results found for query: {query}" |
|
|
_log_call_end("Search_DuckDuckGo", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
results = [] |
|
|
|
|
|
for r in raw or []: |
|
|
title = (r.get("title") or "").strip() |
|
|
url = (r.get("href") or r.get("link") or "").strip() |
|
|
body = (r.get("body") or r.get("snippet") or "").strip() |
|
|
|
|
|
if not url: |
|
|
continue |
|
|
|
|
|
result_obj = { |
|
|
"title": title or _domain_of(url), |
|
|
"url": url, |
|
|
"snippet": body |
|
|
} |
|
|
|
|
|
results.append(result_obj) |
|
|
|
|
|
if not results: |
|
|
result = f"No valid results found for query: {query}" |
|
|
_log_call_end("Search_DuckDuckGo", _truncate_for_log(result)) |
|
|
return result |
|
|
|
|
|
|
|
|
lines = [f"Found {len(results)} search results for: {query}\n"] |
|
|
for i, result in enumerate(results, 1): |
|
|
lines.append(f"{i}. {result['title']}") |
|
|
lines.append(f" URL: {result['url']}") |
|
|
if result['snippet']: |
|
|
lines.append(f" Summary: {result['snippet']}") |
|
|
lines.append("") |
|
|
result = "\n".join(lines) |
|
|
_log_call_end("Search_DuckDuckGo", f"results={len(results)} chars={len(result)}") |
|
|
return result |
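

# Usage sketch (illustrative only; requires network access, results vary over time):
def _demo_search_duckduckgo() -> None:
    print(Search_DuckDuckGo("site:python.org asyncio", max_results=3))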
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def Execute_Python(code: Annotated[str, "Python source code to run; stdout is captured and returned."]) -> str: |
|
|
""" |
|
|
    Execute arbitrary Python code and return captured stdout or an error message.

    Note: the code runs unsandboxed in the same process as the server, so this tool
    should only be exposed in trusted environments.
|
|
|
|
|
Args: |
|
|
code (str): Python source code to run; stdout is captured and returned. |
|
|
|
|
|
Returns: |
|
|
str: Combined stdout produced by the code, or the exception text if |
|
|
execution failed. |
|
|
""" |
|
|
_log_call_start("Execute_Python", code=_truncate_for_log(code or "", 300)) |
|
|
if code is None: |
|
|
result = "No code provided." |
|
|
_log_call_end("Execute_Python", result) |
|
|
return result |
|
|
|
|
|
old_stdout = sys.stdout |
|
|
redirected_output = sys.stdout = StringIO() |
|
|
try: |
|
|
exec(code) |
|
|
result = redirected_output.getvalue() |
|
|
except Exception as e: |
|
|
result = str(e) |
|
|
finally: |
|
|
sys.stdout = old_stdout |
|
|
_log_call_end("Execute_Python", _truncate_for_log(result)) |
|
|
return result |
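

# Usage sketch (illustrative only; never invoked automatically). Only printed output
# is returned, so expressions must be wrapped in print() to show up in the result.
def _demo_execute_python() -> None:
    print(Execute_Python("for i in range(3):\n    print(i * i)"))  # -> "0\n1\n4\n"
    print(Execute_Python("1 / 0"))                                 # -> "division by zero"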
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_KOKORO_STATE = { |
|
|
"initialized": False, |
|
|
"device": "cpu", |
|
|
"model": None, |
|
|
"pipelines": {}, |
|
|
} |
|
|
|
|
|
|
|
|
def get_kokoro_voices(): |
|
|
"""Get comprehensive list of available Kokoro voice IDs (54 total).""" |
|
|
try: |
|
|
from huggingface_hub import list_repo_files |
|
|
|
|
|
files = list_repo_files('hexgrad/Kokoro-82M') |
|
|
voice_files = [f for f in files if f.endswith('.pt') and f.startswith('voices/')] |
|
|
voices = [f.replace('voices/', '').replace('.pt', '') for f in voice_files] |
|
|
return sorted(voices) if voices else _get_fallback_voices() |
|
|
except Exception: |
|
|
return _get_fallback_voices() |
|
|
|
|
|
|
|
|
def _get_fallback_voices(): |
|
|
"""Return comprehensive fallback list of known Kokoro voices (54 total).""" |
|
|
return [ |
|
|
|
|
|
"af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica", |
|
|
"af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky", |
|
|
|
|
|
"am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam", |
|
|
"am_michael", "am_onyx", "am_puck", "am_santa", |
|
|
|
|
|
"bf_alice", "bf_emma", "bf_isabella", "bf_lily", |
|
|
|
|
|
"bm_daniel", "bm_fable", "bm_george", "bm_lewis", |
|
|
|
|
|
"ef_dora", "em_alex", "em_santa", |
|
|
|
|
|
"ff_siwis", |
|
|
|
|
|
"hf_alpha", "hf_beta", "hm_omega", "hm_psi", |
|
|
|
|
|
"if_sara", "im_nicola", |
|
|
|
|
|
"jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo", |
|
|
|
|
|
"pf_dora", "pm_alex", "pm_santa", |
|
|
|
|
|
"zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi", |
|
|
"zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang" |
|
|
] |
|
|
|
|
|
|
|
|
def _init_kokoro() -> None: |
|
|
"""Lazy-initialize Kokoro model and pipelines on first use. |
|
|
|
|
|
Tries CUDA if torch is present and available; falls back to CPU. Keeps a |
|
|
minimal English pipeline and custom lexicon tweak for the word "kokoro". |
|
|
""" |
|
|
if _KOKORO_STATE["initialized"]: |
|
|
return |
|
|
|
|
|
if KModel is None or KPipeline is None: |
|
|
raise RuntimeError( |
|
|
"Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4)." |
|
|
) |
|
|
|
|
|
device = "cpu" |
|
|
if torch is not None: |
|
|
try: |
|
|
if torch.cuda.is_available(): |
|
|
device = "cuda" |
|
|
except Exception: |
|
|
device = "cpu" |
|
|
|
|
|
model = KModel().to(device).eval() |
|
|
pipelines = {"a": KPipeline(lang_code="a", model=False)} |
|
|
|
|
|
try: |
|
|
pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO" |
|
|
except Exception: |
|
|
pass |
|
|
|
|
|
_KOKORO_STATE.update( |
|
|
{ |
|
|
"initialized": True, |
|
|
"device": device, |
|
|
"model": model, |
|
|
"pipelines": pipelines, |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
def List_Kokoro_Voices() -> List[str]: |
|
|
""" |
|
|
Get a list of all available Kokoro voice identifiers. |
|
|
|
|
|
This MCP tool helps clients discover the 54 available voice options |
|
|
for the Generate_Speech tool. |
|
|
|
|
|
Returns: |
|
|
List[str]: A list of voice identifiers (e.g., ["af_heart", "am_adam", "bf_alice", ...]) |
|
|
|
|
|
Voice naming convention: |
|
|
- First 2 letters: Language/Region (af=American Female, am=American Male, bf=British Female, etc.) |
|
|
- Following letters: Voice name (heart, adam, alice, etc.) |
|
|
|
|
|
Available categories: |
|
|
- American Female/Male (20 voices) |
|
|
- British Female/Male (8 voices) |
|
|
        - Spanish Female/Male (3 voices)
|
|
- French Female (1 voice) |
|
|
- Hindi Female/Male (4 voices) |
|
|
- Italian Female/Male (2 voices) |
|
|
- Japanese Female/Male (5 voices) |
|
|
- Portuguese Female/Male (3 voices) |
|
|
- Chinese Female/Male (8 voices) |
|
|
""" |
|
|
return get_kokoro_voices() |
|
|
|
|
|
|
|
|
def Generate_Speech( |
|
|
text: Annotated[str, "The text to synthesize (English)."], |
|
|
speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25, |
|
|
voice: Annotated[str, "Voice identifier from 54 available options."] = "af_heart", |
|
|
) -> Tuple[int, np.ndarray]: |
|
|
""" |
|
|
Synthesize speech from text using the Kokoro-82M TTS model. |
|
|
|
|
|
This function returns raw audio suitable for a Gradio Audio component and is |
|
|
also exposed as an MCP tool. It supports 54 different voices across multiple |
|
|
languages and accents including American, British, European, Hindi, Italian, |
|
|
Japanese, Portuguese, and Chinese speakers. |
|
|
|
|
|
Args: |
|
|
text (str): The text to synthesize. Works best with English but supports multiple languages. |
|
|
speed (float): Speech speed multiplier in 0.5–2.0; 1.0 = normal speed. Default: 1.25 (slightly brisk). |
|
|
voice (str): Voice identifier from 54 available options. Default: 'af_heart'. |
|
|
|
|
|
Returns: |
|
|
A tuple of (sample_rate_hz, audio_waveform) where: |
|
|
- sample_rate_hz: int sample rate in Hz (24_000) |
|
|
- audio_waveform: numpy.ndarray float32 mono waveform in range [-1, 1] |
|
|
""" |
|
|
_log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), speed=speed, voice=voice) |
|
|
    if not text or not text.strip():

        _log_call_end("Generate_Speech", "error=empty text")

        raise gr.Error("Please provide non-empty text to synthesize.")
|
|
|
|
|
_init_kokoro() |
|
|
model = _KOKORO_STATE["model"] |
|
|
pipelines = _KOKORO_STATE["pipelines"] |
|
|
|
|
|
pipeline = pipelines.get("a") |
|
|
if pipeline is None: |
|
|
raise gr.Error("Kokoro English pipeline not initialized.") |
|
|
|
|
|
|
|
|
audio_segments = [] |
|
|
pack = pipeline.load_voice(voice) |
|
|
|
|
|
try: |
|
|
|
|
|
segments = list(pipeline(text, voice, speed)) |
|
|
total_segments = len(segments) |
|
|
|
|
|
|
|
|
for segment_idx, (text_chunk, ps, _) in enumerate(segments): |
|
|
ref_s = pack[len(ps) - 1] |
|
|
try: |
|
|
audio = model(ps, ref_s, float(speed)) |
|
|
audio_segments.append(audio.detach().cpu().numpy()) |
|
|
|
|
|
|
|
|
if total_segments > 10 and (segment_idx + 1) % 5 == 0: |
|
|
print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...") |
|
|
|
|
|
except Exception as e: |
|
|
raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {str(e)}") |
|
|
|
|
|
if not audio_segments: |
|
|
raise gr.Error("No audio was generated (empty synthesis result).") |
|
|
|
|
|
|
|
|
if len(audio_segments) == 1: |
|
|
final_audio = audio_segments[0] |
|
|
else: |
|
|
final_audio = np.concatenate(audio_segments, axis=0) |
|
|
|
|
|
duration = len(final_audio) / 24_000 |
|
|
if total_segments > 1: |
|
|
print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio") |
|
|
|
|
|
|
|
|
_log_call_end("Generate_Speech", f"samples={final_audio.shape[0]} duration_sec={len(final_audio)/24_000:.2f}") |
|
|
return 24_000, final_audio |
|
|
|
|
|
except gr.Error as e: |
|
|
_log_call_end("Generate_Speech", f"gr_error={str(e)}") |
|
|
raise |
|
|
except Exception as e: |
|
|
_log_call_end("Generate_Speech", f"error={str(e)[:120]}") |
|
|
raise gr.Error(f"Error during speech generation: {str(e)}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MEMORY_FILE = os.path.join(os.path.dirname(__file__), "memories.json") |
|
|
_MEMORY_LOCK = threading.RLock() |
|
|
_MAX_MEMORIES = 10_000 |
|
|
|
|
|
|
|
|
def _now_iso() -> str: |
|
|
return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
|
|
|
|
|
def _load_memories() -> List[Dict[str, str]]: |
|
|
"""Internal helper: load memory list from disk. |
|
|
|
|
|
Returns an empty list if the file does not exist or is unreadable. |
|
|
If the JSON is corrupted, a *.corrupt backup is written once and a |
|
|
fresh empty list is returned (fail‑open philosophy for tool usage). |
|
|
""" |
|
|
if not os.path.exists(MEMORY_FILE): |
|
|
return [] |
|
|
try: |
|
|
with open(MEMORY_FILE, "r", encoding="utf-8") as f: |
|
|
data = json.load(f) |
|
|
if isinstance(data, list): |
|
|
|
|
|
cleaned: List[Dict[str, str]] = [] |
|
|
for item in data: |
|
|
if isinstance(item, dict) and "id" in item and "text" in item: |
|
|
cleaned.append(item) |
|
|
return cleaned |
|
|
return [] |
|
|
except Exception: |
|
|
|
|
|
try: |
|
|
backup = MEMORY_FILE + ".corrupt" |
|
|
if not os.path.exists(backup): |
|
|
os.replace(MEMORY_FILE, backup) |
|
|
except Exception: |
|
|
pass |
|
|
return [] |
|
|
|
|
|
|
|
|
def _save_memories(memories: List[Dict[str, str]]) -> None: |
|
|
"""Persist memory list atomically to disk (write temp then replace).""" |
|
|
tmp_path = MEMORY_FILE + ".tmp" |
|
|
with open(tmp_path, "w", encoding="utf-8") as f: |
|
|
json.dump(memories, f, ensure_ascii=False, indent=2) |
|
|
os.replace(tmp_path, MEMORY_FILE) |
|
|
|
|
|
|
|
|
def _mem_save( |
|
|
text: Annotated[str, "Raw textual content to remember (will be stored verbatim)."], |
|
|
tags: Annotated[str, "Optional comma-separated tags for lightweight categorization (e.g. 'user, preference')."] = "", |
|
|
) -> str: |
|
|
"""(Internal) Persist a new memory record. |
|
|
|
|
|
Summary: |
|
|
Adds a memory object to the local JSON store (no external database). |
|
|
|
|
|
Stored Fields: |
|
|
- id (str, UUID4) |
|
|
- text (str, verbatim user content) |
|
|
- timestamp (UTC "YYYY-MM-DD HH:MM:SS") |
|
|
- tags (str, original comma-separated tag string) |
|
|
|
|
|
Behavior / Rules: |
|
|
1. Whitespace is trimmed; empty text is rejected. |
|
|
2. If the most recent existing memory has identical text, the new one is skipped (light dedupe heuristic). |
|
|
3. When total entries exceed _MAX_MEMORIES, oldest entries are pruned (soft cap). |
|
|
4. Operation is protected by an in‑process reentrant lock only (no cross‑process locking). |
|
|
|
|
|
Returns: |
|
|
        str: Human-readable confirmation containing the new memory's UUID.
|
|
|
|
|
Security / Privacy: |
|
|
Data is plaintext JSON on local disk; do NOT store secrets or regulated data. |
|
|
""" |
|
|
text_clean = (text or "").strip() |
|
|
if not text_clean: |
|
|
return "Error: memory text is empty." |
|
|
|
|
|
with _MEMORY_LOCK: |
|
|
memories = _load_memories() |
|
|
if memories and memories[-1].get("text") == text_clean: |
|
|
return "Skipped: identical to last stored memory." |
|
|
|
|
|
mem_id = str(uuid.uuid4()) |
|
|
entry = { |
|
|
"id": mem_id, |
|
|
"text": text_clean, |
|
|
"timestamp": _now_iso(), |
|
|
"tags": tags.strip(), |
|
|
} |
|
|
memories.append(entry) |
|
|
if len(memories) > _MAX_MEMORIES: |
|
|
|
|
|
overflow = len(memories) - _MAX_MEMORIES |
|
|
memories = memories[overflow:] |
|
|
_save_memories(memories) |
|
|
return f"Memory saved: {mem_id}" |
|
|
|
|
|
|
|
|
def _mem_list( |
|
|
limit: Annotated[int, "Maximum number of most recent memories to return (1–200)."] = 20, |
|
|
include_tags: Annotated[bool, "If true, include tags column in output."] = True, |
|
|
) -> str: |
|
|
"""(Internal) List most recent memories. |
|
|
|
|
|
Parameters: |
|
|
limit (int): Max rows to return; clamped to [1, 200]. |
|
|
include_tags (bool): Include tags section when True. |
|
|
|
|
|
Output Format (one per line): |
|
|
<uuid_prefix> [YYYY-MM-DD HH:MM:SS] <text> | tags: <tag list> |
|
|
(Tag column omitted if empty or include_tags=False.) |
|
|
|
|
|
Returns: |
|
|
str: Joined newline string or a friendly "No memories stored." message. |
|
|
""" |
|
|
    limit = max(1, min(200, limit))

    with _MEMORY_LOCK:

        memories = _load_memories()

        if not memories:

            return "No memories stored yet."

        chosen = memories[-limit:][::-1]

        lines: List[str] = []

        for m in chosen:

            base = f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}"

            if include_tags and m.get("tags"):

                base += f" | tags: {m['tags']}"

            lines.append(base)

        omitted = len(memories) - len(chosen)

        if omitted > 0:

            lines.append(f"… ({omitted} older memor{'ies' if omitted != 1 else 'y'} omitted; total={len(memories)})")

        return "\n".join(lines)
|
|
|
|
|
|
|
|
def _mem_search( |
|
|
query: Annotated[str, "Case-insensitive substring search; space-separated terms are ANDed."], |
|
|
limit: Annotated[int, "Maximum number of matches (1–200)."] = 20, |
|
|
) -> str: |
|
|
"""(Internal) Full-text style AND search across text and tags. |
|
|
|
|
|
Search Semantics: |
|
|
- Split query on whitespace into individual terms. |
|
|
- A memory matches only if EVERY term appears (case-insensitive) in the text OR tags field. |
|
|
- Results are ordered newest-first (descending timestamp). |
|
|
|
|
|
Parameters: |
|
|
query (str): Raw user query string; must contain at least one non-space character. |
|
|
limit (int): Max rows to return; clamped to [1, 200]. |
|
|
|
|
|
Returns: |
|
|
str: Formatted lines identical to _mem_list output or "No matches". |
|
|
""" |
|
|
q = (query or "").strip() |
|
|
if not q: |
|
|
return "Error: empty query." |
|
|
terms = [t.lower() for t in q.split() if t.strip()] |
|
|
if not terms: |
|
|
return "Error: no valid search terms." |
|
|
limit = max(1, min(200, limit)) |
|
|
with _MEMORY_LOCK: |
|
|
memories = _load_memories() |
|
|
|
|
|
matches: List[Dict[str, str]] = [] |
|
|
total_matches = 0 |
|
|
for m in reversed(memories): |
|
|
hay = (m.get("text", "") + " " + m.get("tags", "")).lower() |
|
|
if all(t in hay for t in terms): |
|
|
total_matches += 1 |
|
|
if len(matches) < limit: |
|
|
matches.append(m) |
|
|
if not matches: |
|
|
return f"No matches for: {query}" |
|
|
lines = [ |
|
|
f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}" + (f" | tags: {m['tags']}" if m.get('tags') else "") |
|
|
for m in matches |
|
|
] |
|
|
omitted = total_matches - len(matches) |
|
|
if omitted > 0: |
|
|
lines.append(f"… ({omitted} additional match{'es' if omitted!=1 else ''} omitted; total_matches={total_matches})") |
|
|
return "\n".join(lines) |
|
|
|
|
|
|
|
|
def _mem_delete( |
|
|
memory_id: Annotated[str, "Full UUID or a unique prefix (>=4 chars) of the memory id to delete."], |
|
|
) -> str: |
|
|
"""(Internal) Delete one memory by UUID or unique prefix. |
|
|
|
|
|
Parameters: |
|
|
memory_id (str): Full UUID4 (preferred) OR a unique prefix (>=4 chars). If prefix is ambiguous, no deletion occurs. |
|
|
|
|
|
Returns: |
|
|
str: One of: success message, ambiguity notice, or not-found message. |
|
|
|
|
|
Safety: |
|
|
Ambiguous prefixes are rejected to prevent accidental mass deletion. |
|
|
""" |
|
|
key = (memory_id or "").strip().lower() |
|
|
if len(key) < 4: |
|
|
return "Error: supply at least 4 characters of the id." |
|
|
with _MEMORY_LOCK: |
|
|
memories = _load_memories() |
|
|
matched = [m for m in memories if m["id"].lower().startswith(key)] |
|
|
if not matched: |
|
|
return "Memory not found." |
|
|
if len(matched) > 1 and key != matched[0]["id"].lower(): |
|
|
|
|
|
sample = ", ".join(m["id"][:8] for m in matched[:5]) |
|
|
more = "…" if len(matched) > 5 else "" |
|
|
return f"Ambiguous prefix (matches {len(matched)} ids: {sample}{more}). Provide more characters." |
|
|
|
|
|
target_id = matched[0]["id"] |
|
|
memories = [m for m in memories if m["id"] != target_id] |
|
|
_save_memories(memories) |
|
|
return f"Deleted memory: {target_id}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fetch_interface = gr.Interface( |
|
|
fn=Fetch_Webpage, |
|
|
inputs=[ |
|
|
gr.Textbox(label="URL", placeholder="https://example.com/article"), |
|
|
gr.Dropdown( |
|
|
label="Verbosity", |
|
|
choices=["Brief", "Standard", "Full"], |
|
|
value="Standard", |
|
|
info="Brief: 1000 chars, Standard: 3000 chars, Full: complete page" |
|
|
), |
|
|
], |
|
|
outputs=gr.Markdown(label="Extracted Markdown"), |
|
|
title="Fetch Webpage", |
|
|
description=( |
|
|
"<div style=\"text-align:center\">Convert any webpage to clean Markdown format with configurable length, preserving structure and formatting while removing navigation and clutter.</div>" |
|
|
), |
|
|
api_description=( |
|
|
"Fetch a web page and return it converted to Markdown format with configurable length. " |
|
|
"Parameters: url (str - absolute URL), verbosity (str - Brief/Standard/Full controlling output length: Brief=1000 chars, Standard=3000 chars, Full=complete page)." |
|
|
), |
|
|
flagging_mode="never", |
|
|
) |
|
|
|
|
|
|
|
|
concise_interface = gr.Interface( |
|
|
fn=Search_DuckDuckGo, |
|
|
inputs=[ |
|
|
gr.Textbox(label="Query", placeholder="topic OR site:example.com"), |
|
|
gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"), |
|
|
], |
|
|
outputs=gr.Textbox(label="Search Results", interactive=False), |
|
|
title="DuckDuckGo Search", |
|
|
description=( |
|
|
"<div style=\"text-align:center\">Web search with readable output format. Supports advanced search operators.</div>" |
|
|
), |
|
|
api_description=( |
|
|
"Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries. " |
|
|
"Supports advanced search operators: site: for specific domains, quotes for exact phrases, " |
|
|
"OR for alternatives, and - to exclude terms. Examples: 'Python programming', 'site:example.com', " |
|
|
"'\"artificial intelligence\"', 'cats -dogs', 'Python OR JavaScript'." |
|
|
), |
|
|
flagging_mode="never", |
|
|
submit_btn="Search", |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
code_interface = gr.Interface( |
|
|
fn=Execute_Python, |
|
|
inputs=gr.Code(label="Python Code", language="python"), |
|
|
outputs=gr.Textbox(label="Output"), |
|
|
title="Python Code Executor", |
|
|
description=( |
|
|
"<div style=\"text-align:center\">Execute Python code and see the output.</div>" |
|
|
), |
|
|
api_description=( |
|
|
"Execute arbitrary Python code and return captured stdout or an error message. " |
|
|
"Supports any valid Python code including imports, variables, functions, loops, and calculations. " |
|
|
"Examples: 'print(2+2)', 'import math; print(math.sqrt(16))', 'for i in range(3): print(i)'. " |
|
|
"Parameters: code (str - Python source code to execute). " |
|
|
"Returns: Combined stdout output or exception text if execution fails." |
|
|
), |
|
|
flagging_mode="never", |
|
|
) |
|
|
|
|
|
CSS_STYLES = """ |
|
|
.gradio-container h1 { |
|
|
text-align: center; |
|
|
/* Ensure main title appears first, then our two subtitle lines */ |
|
|
display: grid; |
|
|
justify-items: center; |
|
|
} |
|
|
/* Place bold tools list on line 2, normal auth note on line 3 (below title) */ |
|
|
.gradio-container h1::before { |
|
|
grid-row: 2; |
|
|
content: "Fetch Webpage | Search DuckDuckGo | Python Interpreter | Memory Manager | Kokoro TTS | Image Generation | Video Generation"; |
|
|
display: block; |
|
|
font-size: 1rem; |
|
|
font-weight: 700; |
|
|
opacity: 0.9; |
|
|
margin-top: 6px; |
|
|
white-space: pre-wrap; |
|
|
} |
|
|
.gradio-container h1::after { |
|
|
grid-row: 3; |
|
|
content: "Authentication is optional but Image/Video Generation require a `HF_READ_TOKEN` in env secrets. They are hidden otherwise. Same with Memory (intended for local use)."; |
|
|
display: block; |
|
|
font-size: 1rem; |
|
|
font-weight: 400; |
|
|
opacity: 0.9; |
|
|
margin-top: 2px; |
|
|
white-space: pre-wrap; |
|
|
} |
|
|
|
|
|
/* Remove inside tab panels so it doesn't duplicate under each tool title */ |
|
|
.gradio-container [role=\"tabpanel\"] h1::before, |
|
|
.gradio-container [role=\"tabpanel\"] h1::after { |
|
|
content: none !important; |
|
|
} |
|
|
""" |
|
|
|
|
|
|
|
|
available_voices = get_kokoro_voices() |
|
|
kokoro_interface = gr.Interface( |
|
|
fn=Generate_Speech, |
|
|
inputs=[ |
|
|
gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4), |
|
|
gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"), |
|
|
gr.Dropdown( |
|
|
label="Voice", |
|
|
choices=available_voices, |
|
|
value="af_heart", |
|
|
info="Select from 54 available voices across multiple languages and accents" |
|
|
), |
|
|
], |
|
|
outputs=gr.Audio(label="Audio", type="numpy", format="wav", show_download_button=True), |
|
|
title="Kokoro TTS", |
|
|
description=( |
|
|
"<div style=\"text-align:center\">Generate speech with Kokoro-82M. Supports multiple languages and accents. Runs on CPU or CUDA if available.</div>" |
|
|
), |
|
|
api_description=( |
|
|
"Synthesize speech from text using Kokoro-82M TTS model. Returns (sample_rate, waveform) suitable for playback. " |
|
|
"Supports unlimited text length by processing all segments. Voice examples: 'af_heart' (US female), 'am_onyx' (US male), " |
|
|
"'bf_emma' (British female), 'af_sky' (US female), 'af_nicole' (US female), " |
|
|
"Parameters: text (str), speed (float 0.5–2.0, default 1.25x), voice (str from 54 available options, default 'af_heart'). " |
|
|
"Return the generated media to the user in this format ``" |
|
|
), |
|
|
flagging_mode="never", |
|
|
) |
|
|
|
|
|
def Memory_Manager( |
|
|
action: Annotated[Literal["save","list","search","delete"], "Action to perform: save | list | search | delete"], |
|
|
text: Annotated[Optional[str], "Text content (Save only)"] = None, |
|
|
tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None, |
|
|
query: Annotated[Optional[str], "Search query terms (Search only)"] = None, |
|
|
limit: Annotated[int, "Max results (List/Search only)"] = 20, |
|
|
memory_id: Annotated[Optional[str], "Full UUID or unique prefix (Delete only)"] = None, |
|
|
include_tags: Annotated[bool, "Include tags (List/Search only)"] = True, |
|
|
) -> str: |
|
|
"""Manage lightweight local JSON “memories” (save | list | search | delete) in one MCP tool. |
|
|
|
|
|
Overview: |
|
|
This tool provides simple, local, append‑only style persistence for short text memories |
|
|
with optional tags. Data is stored in a plaintext JSON file ("memories.json") beside the |
|
|
application; no external database or network access is required. |
|
|
|
|
|
Supported Actions: |
|
|
- save : Store a new memory (requires 'text'; optional 'tags'). |
|
|
- list : Return the most recent memories (respects 'limit' + 'include_tags'). |
|
|
- search : AND match space‑separated terms across text and tags (uses 'query', 'limit'). |
|
|
- delete : Remove one memory by full UUID or unique prefix (uses 'memory_id'). |
|
|
|
|
|
Parameter Usage by Action: |
|
|
action=save -> text (required), tags (optional) |
|
|
action=list -> limit, include_tags |
|
|
action=search -> query (required), limit, include_tags |
|
|
action=delete -> memory_id (required) |
|
|
|
|
|
Parameters: |
|
|
action (Literal[save|list|search|delete]): Operation selector (case-insensitive). |
|
|
text (str): Raw memory content; leading/trailing whitespace trimmed (save only). |
|
|
tags (str): Optional comma-separated tags; stored verbatim (save only). |
|
|
query (str): Space-separated terms (AND logic, case-insensitive) across text+tags (search only). |
|
|
limit (int): Maximum rows for list/search (clamped internally to 1–200). |
|
|
memory_id (str): Full UUID or unique prefix (>=4 chars) (delete only). |
|
|
include_tags (bool): When True, show tag column in list/search output. |
|
|
|
|
|
Storage Format (per entry): |
|
|
{"id": "<uuid4>", "text": "<original text>", "timestamp": "YYYY-MM-DD HH:MM:SS", "tags": "tag1, tag2"} |
|
|
|
|
|
Lifecycle & Constraints: |
|
|
        - A soft cap of 10,000 entries (_MAX_MEMORIES) is enforced by pruning oldest records on save.
|
|
- A light duplicate guard skips saving if the newest existing entry has identical text. |
|
|
        - All operations are protected by an in-process reentrant lock (NOT multi-process safe).
|
|
|
|
|
Returns: |
|
|
str: Human‑readable status / result lines (never raw JSON) suitable for direct model consumption. |
|
|
|
|
|
Error Modes: |
|
|
- Invalid action -> error string. |
|
|
- Missing required field for the chosen action -> explanatory message. |
|
|
- Ambiguous or unknown memory_id on delete -> clarification message. |
|
|
|
|
|
Security & Privacy: |
|
|
Plaintext JSON; do not store secrets, credentials, or regulated personal data. |
|
|
""" |
|
|
act = (action or "").lower().strip() |
|
|
|
|
|
|
|
|
text = text or "" |
|
|
tags = tags or "" |
|
|
query = query or "" |
|
|
memory_id = memory_id or "" |
|
|
|
|
|
if act == "save": |
|
|
if not text.strip(): |
|
|
return "Error: 'text' is required when action=save." |
|
|
return _mem_save(text=text, tags=tags) |
|
|
if act == "list": |
|
|
return _mem_list(limit=limit, include_tags=include_tags) |
|
|
if act == "search": |
|
|
if not query.strip(): |
|
|
return "Error: 'query' is required when action=search." |
|
|
return _mem_search(query=query, limit=limit) |
|
|
if act == "delete": |
|
|
if not memory_id.strip(): |
|
|
return "Error: 'memory_id' is required when action=delete." |
|
|
return _mem_delete(memory_id=memory_id) |
|
|
return "Error: invalid action (use save|list|search|delete)." |
|
|
|
|
|
memory_interface = gr.Interface( |
|
|
fn=Memory_Manager, |
|
|
inputs=[ |
|
|
gr.Dropdown(label="Action", choices=["save","list","search","delete"], value="list"), |
|
|
gr.Textbox(label="Text", lines=3, placeholder="Memory text (save)"), |
|
|
gr.Textbox(label="Tags", placeholder="tag1, tag2"), |
|
|
gr.Textbox(label="Query", placeholder="Search terms (search)"), |
|
|
gr.Slider(1, 200, value=20, step=1, label="Limit"), |
|
|
gr.Textbox(label="Memory ID / Prefix", placeholder="UUID or prefix (delete)"), |
|
|
gr.Checkbox(value=True, label="Include Tags"), |
|
|
], |
|
|
outputs=gr.Textbox(label="Result", lines=14), |
|
|
title="Memory Manager", |
|
|
description=( |
|
|
"<div style=\"text-align:center\">Lightweight local JSON memory store (no external DB). Choose an Action, fill only the relevant fields, and run.</div>" |
|
|
), |
|
|
api_description=( |
|
|
"Manage short text memories with optional tags. Actions: save(text,tags), list(limit,include_tags), " |
|
|
"search(query,limit,include_tags), delete(memory_id). Returns plaintext JSON. Action parameter is always required. " |
|
|
"Use Memory_Manager whenever you are given information worth remembering about the user, and search for memories when relevant." |
|
|
), |
|
|
flagging_mode="never", |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
HF_API_TOKEN = os.getenv("HF_READ_TOKEN") |
|
|
|
|
|
|
|
|
def Generate_Image( |
|
|
prompt: Annotated[str, "Text description of the image to generate."], |
|
|
model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name' (e.g., black-forest-labs/FLUX.1-Krea-dev)."] = "black-forest-labs/FLUX.1-Krea-dev", |
|
|
negative_prompt: Annotated[str, "What should NOT appear in the image." ] = ( |
|
|
"(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, " |
|
|
"missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, " |
|
|
"mutated, ugly, disgusting, blurry, amputation, misspellings, typos" |
|
|
), |
|
|
steps: Annotated[int, "Number of denoising steps (1–100). Higher = slower, potentially higher quality."] = 35, |
|
|
cfg_scale: Annotated[float, "Classifier-free guidance scale (1–20). Higher = follow the prompt more closely."] = 7.0, |
|
|
sampler: Annotated[str, "Sampling method label (UI only). Common options: 'DPM++ 2M Karras', 'DPM++ SDE Karras', 'Euler', 'Euler a', 'Heun', 'DDIM'."] = "DPM++ 2M Karras", |
|
|
seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1, |
|
|
width: Annotated[int, "Output width in pixels (64–1216, multiple of 32 recommended)."] = 1024, |
|
|
height: Annotated[int, "Output height in pixels (64–1216, multiple of 32 recommended)."] = 1024, |
|
|
) -> Image.Image: |
|
|
""" |
|
|
Generate a single image from a text prompt using a Hugging Face model via serverless inference. |
|
|
|
|
|
Args: |
|
|
prompt (str): Text description of the image to generate. |
|
|
model_id (str): The Hugging Face model id (creator/model-name). Defaults to "black-forest-labs/FLUX.1-Krea-dev". |
|
|
negative_prompt (str): What should NOT appear in the image. |
|
|
steps (int): Number of denoising steps (1–100). Higher can improve quality. |
|
|
cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely. |
|
|
sampler (str): Sampling method label for UI; not all providers expose this control. |
|
|
seed (int): Random seed. Use -1 to randomize on each call. |
|
|
width (int): Output width in pixels (64–1216; multiples of 32 recommended). |
|
|
height (int): Output height in pixels (64–1216; multiples of 32 recommended). |
|
|
|
|
|
Returns: |
|
|
PIL.Image.Image: The generated image. |
|
|
|
|
|
Error modes: |
|
|
- Raises gr.Error with a user-friendly message on auth/model/load errors. |
|
|
""" |
|
|
_log_call_start("Generate_Image", prompt=_truncate_for_log(prompt, 200), model_id=model_id, steps=steps, cfg_scale=cfg_scale, seed=seed, size=f"{width}x{height}") |
|
|
if not prompt or not prompt.strip(): |
|
|
_log_call_end("Generate_Image", "error=empty prompt") |
|
|
raise gr.Error("Please provide a non-empty prompt.") |
|
|
|
|
|
|
|
|
enhanced_prompt = f"{prompt} | ultra detail, ultra elaboration, ultra quality, perfect." |
|
|
|
|
|
|
|
|
providers = ["auto", "replicate", "fal-ai"] |
|
|
last_error: Exception | None = None |
|
|
|
|
|
for provider in providers: |
|
|
try: |
|
|
client = InferenceClient(api_key=HF_API_TOKEN, provider=provider) |
|
|
image = client.text_to_image( |
|
|
prompt=enhanced_prompt, |
|
|
negative_prompt=negative_prompt, |
|
|
model=model_id, |
|
|
width=width, |
|
|
height=height, |
|
|
num_inference_steps=steps, |
|
|
guidance_scale=cfg_scale, |
|
|
seed=seed if seed != -1 else random.randint(1, 1_000_000_000), |
|
|
) |
|
|
_log_call_end("Generate_Image", f"provider={provider} size={image.size}") |
|
|
return image |
|
|
except Exception as e: |
|
|
last_error = e |
|
|
continue |
|
|
|
|
|
|
|
|
    msg = str(last_error) if last_error else "Unknown error"

    _log_call_end("Generate_Image", f"error={_truncate_for_log(msg, 200)}")

    if "404" in msg:

        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.")

    if "503" in msg:

        raise gr.Error("The model is warming up. Please try again shortly.")

    if "401" in msg or "403" in msg:

        raise gr.Error("Authentication failed. Set HF_READ_TOKEN environment variable with access to the model.")

    raise gr.Error(f"Image generation failed: {msg}")
|
|
|
|
|
|
|
|
image_generation_interface = gr.Interface( |
|
|
fn=Generate_Image, |
|
|
inputs=[ |
|
|
gr.Textbox(label="Prompt", placeholder="Enter a prompt", lines=2), |
|
|
gr.Textbox(label="Model", value="black-forest-labs/FLUX.1-Krea-dev", placeholder="creator/model-name"), |
|
|
gr.Textbox( |
|
|
label="Negative Prompt", |
|
|
value=( |
|
|
"(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, " |
|
|
"missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, " |
|
|
"mutated, ugly, disgusting, blurry, amputation, misspellings, typos" |
|
|
), |
|
|
lines=2, |
|
|
), |
|
|
gr.Slider(minimum=1, maximum=100, value=35, step=1, label="Steps"), |
|
|
gr.Slider(minimum=1.0, maximum=20.0, value=7.0, step=0.1, label="CFG Scale"), |
|
|
gr.Radio(label="Sampler", value="DPM++ 2M Karras", choices=[ |
|
|
"DPM++ 2M Karras", "DPM++ SDE Karras", "Euler", "Euler a", "Heun", "DDIM" |
|
|
]), |
|
|
gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"), |
|
|
gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Width"), |
|
|
gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Height"), |
|
|
], |
|
|
outputs=gr.Image(label="Generated Image"), |
|
|
title="Image Generation", |
|
|
description=( |
|
|
"<div style=\"text-align:center\">Generate images via Hugging Face serverless inference. " |
|
|
"Default model is FLUX.1-Krea-dev.</div>" |
|
|
), |
|
|
api_description=( |
|
|
"Generate a single image from a text prompt using a Hugging Face model via serverless inference. " |
|
|
"Supports creative prompts like 'a serene mountain landscape at sunset', 'portrait of a wise owl', " |
|
|
"'futuristic city with flying cars'. Default model: FLUX.1-Krea-dev. " |
|
|
"Parameters: prompt (str), model_id (str, creator/model-name), negative_prompt (str), steps (int, 1–100), " |
|
|
"cfg_scale (float, 1–20), sampler (str), seed (int, -1=random), width/height (int, 64–1216). " |
|
|
"Returns a PIL.Image. Return the generated media to the user in this format ``" |
|
|
), |
|
|
flagging_mode="never", |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _write_video_tmp(data_iter_or_bytes: object, suffix: str = ".mp4") -> str: |
|
|
"""Write video bytes or iterable of bytes to a system temporary file and return its path. |
|
|
|
|
|
This avoids polluting the project directory. The file is created in the OS temp |
|
|
location; Gradio will handle serving & offering the download button. |
|
|
""" |
|
|
fd, fname = tempfile.mkstemp(suffix=suffix) |
|
|
try: |
|
|
with os.fdopen(fd, "wb") as f: |
|
|
if isinstance(data_iter_or_bytes, (bytes, bytearray)): |
|
|
f.write(data_iter_or_bytes) |
|
|
elif hasattr(data_iter_or_bytes, "read"): |
|
|
f.write(data_iter_or_bytes.read()) |
|
|
elif hasattr(data_iter_or_bytes, "content"): |
|
|
f.write(data_iter_or_bytes.content) |
|
|
elif hasattr(data_iter_or_bytes, "__iter__") and not isinstance(data_iter_or_bytes, (str, dict)): |
|
|
for chunk in data_iter_or_bytes: |
|
|
if chunk: |
|
|
f.write(chunk) |
|
|
else: |
|
|
raise gr.Error("Unsupported video data type returned by provider.") |
|
|
except Exception: |
|
|
|
|
|
try: |
|
|
os.remove(fname) |
|
|
except Exception: |
|
|
pass |
|
|
raise |
|
|
return fname |
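

# Small sketch of the temp-file helper above (illustrative only): raw bytes, streams,
# response objects, and chunk iterators are all accepted; the file lands in the OS temp dir.
def _demo_write_video_tmp() -> None:
    path = _write_video_tmp(b"\x00" * 16, suffix=".mp4")
    print(path, os.path.getsize(path))  # -> <temp path> 16
    os.remove(path)  # clean up the throwaway file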
|
|
|
|
|
|
|
|
HF_VIDEO_TOKEN = os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN") |
|
|
|
|
|
|
|
|
def Generate_Video( |
|
|
prompt: Annotated[str, "Text description of the video to generate (e.g., 'a red fox running through a snowy forest at sunrise')."], |
|
|
model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name'. Defaults to Wan-AI/Wan2.2-T2V-A14B."] = "Wan-AI/Wan2.2-T2V-A14B", |
|
|
negative_prompt: Annotated[str, "What should NOT appear in the video."] = "", |
|
|
steps: Annotated[int, "Number of denoising steps (1–100). Higher can improve quality but is slower."] = 25, |
|
|
cfg_scale: Annotated[float, "Guidance scale (1–20). Higher = follow the prompt more closely, lower = more creative."] = 3.5, |
|
|
seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1, |
|
|
width: Annotated[int, "Output width in pixels (multiples of 8 recommended)."] = 768, |
|
|
height: Annotated[int, "Output height in pixels (multiples of 8 recommended)."] = 768, |
|
|
fps: Annotated[int, "Frames per second of the output video (e.g., 24)."] = 24, |
|
|
duration: Annotated[float, "Target duration in seconds (provider/model dependent, commonly 2–6s)."] = 4.0, |
|
|
) -> str: |
|
|
""" |
|
|
Generate a short video from a text prompt using a Hugging Face model via serverless inference. |
|
|
|
|
|
Args: |
|
|
prompt (str): Text description of the video to generate. |
|
|
model_id (str): The Hugging Face model id (creator/model-name). Defaults to "Wan-AI/Wan2.2-T2V-A14B". |
|
|
negative_prompt (str): What should NOT appear in the video. |
|
|
steps (int): Number of denoising steps (1–100). Higher can improve quality but is slower. |
|
|
cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely. |
|
|
seed (int): Random seed. Use -1 to randomize on each call. |
|
|
width (int): Output width in pixels. |
|
|
height (int): Output height in pixels. |
|
|
fps (int): Frames per second. |
|
|
duration (float): Target duration in seconds. |
|
|
|
|
|
Returns: |
|
|
str: Path to an MP4 file on disk (Gradio will serve this file; MCP converts it to a file URL). |
|
|
|
|
|
Error modes: |
|
|
- Raises gr.Error with a user-friendly message on auth/model/load errors or unsupported parameters. |
|
|
""" |
|
|
_log_call_start("Generate_Video", prompt=_truncate_for_log(prompt, 160), model_id=model_id, steps=steps, cfg_scale=cfg_scale, fps=fps, duration=duration, size=f"{width}x{height}") |
|
|
if not prompt or not prompt.strip(): |
|
|
_log_call_end("Generate_Video", "error=empty prompt") |
|
|
raise gr.Error("Please provide a non-empty prompt.") |
|
|
|
|
|
if not HF_VIDEO_TOKEN: |
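        # No token found: generation is still attempted, and any authentication
        # failure from the provider is reported by the error handling further below.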
|
|
|
|
|
pass |
|
|
|
|
|
providers = ["auto", "replicate", "fal-ai"] |
|
|
last_error: Exception | None = None |
|
|
|
|
|
|
|
|
parameters = { |
|
|
"negative_prompt": negative_prompt or None, |
|
|
"num_inference_steps": steps, |
|
|
"guidance_scale": cfg_scale, |
|
|
"seed": seed if seed != -1 else random.randint(1, 1_000_000_000), |
|
|
"width": width, |
|
|
"height": height, |
|
|
"fps": fps, |
|
|
|
|
|
|
|
|
"duration": duration, |
|
|
} |
|
|
|
|
|
for provider in providers: |
|
|
try: |
|
|
client = InferenceClient(api_key=HF_VIDEO_TOKEN, provider=provider) |
|
|
|
|
|
if hasattr(client, "text_to_video"): |
|
|
|
|
|
num_frames = int(duration * fps) if duration and fps else None |
|
|
|
|
|
|
|
|
extra_body = {} |
|
|
if width: |
|
|
extra_body["width"] = width |
|
|
if height: |
|
|
extra_body["height"] = height |
|
|
if fps: |
|
|
extra_body["fps"] = fps |
|
|
if duration: |
|
|
extra_body["duration"] = duration |
|
|
|
|
|
result = client.text_to_video( |
|
|
prompt=prompt, |
|
|
model=model_id, |
|
|
guidance_scale=cfg_scale, |
|
|
negative_prompt=[negative_prompt] if negative_prompt else None, |
|
|
num_frames=num_frames, |
|
|
num_inference_steps=steps, |
|
|
seed=parameters["seed"], |
|
|
extra_body=extra_body if extra_body else None, |
|
|
) |
|
|
else: |
|
|
|
|
|
result = client.post( |
|
|
model=model_id, |
|
|
json={ |
|
|
"inputs": prompt, |
|
|
"parameters": {k: v for k, v in parameters.items() if v is not None}, |
|
|
}, |
|
|
) |
|
|
|
|
|
|
|
|
path = _write_video_tmp(result, suffix=".mp4") |
|
|
try: |
|
|
size = os.path.getsize(path) |
|
|
except Exception: |
|
|
size = -1 |
|
|
_log_call_end("Generate_Video", f"provider={provider} path={os.path.basename(path)} bytes={size}") |
|
|
return path |
|
|
except Exception as e: |
|
|
last_error = e |
|
|
continue |
|
|
|
|
|
    msg = str(last_error) if last_error else "Unknown error"

    _log_call_end("Generate_Video", f"error={_truncate_for_log(msg, 200)}")

    if "404" in msg:

        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and HF token access.")

    if "503" in msg:

        raise gr.Error("The model is warming up. Please try again shortly.")

    if "401" in msg or "403" in msg:

        raise gr.Error("Authentication failed or not permitted. Set HF_READ_TOKEN/HF_TOKEN with inference access.")

    raise gr.Error(f"Video generation failed: {msg}")
|
|
|
|
|
|
|
|
video_generation_interface = gr.Interface( |
|
|
fn=Generate_Video, |
|
|
inputs=[ |
|
|
gr.Textbox(label="Prompt", placeholder="Enter a prompt for the video", lines=2), |
|
|
gr.Textbox(label="Model", value="Wan-AI/Wan2.2-T2V-A14B", placeholder="creator/model-name"), |
|
|
gr.Textbox(label="Negative Prompt", value="", lines=2), |
|
|
gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Steps"), |
|
|
gr.Slider(minimum=1.0, maximum=20.0, value=3.5, step=0.1, label="CFG Scale"), |
|
|
gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"), |
|
|
gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Width"), |
|
|
gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Height"), |
|
|
gr.Slider(minimum=4, maximum=60, value=24, step=1, label="FPS"), |
|
|
gr.Slider(minimum=1.0, maximum=10.0, value=4.0, step=0.5, label="Duration (s)"), |
|
|
], |
|
|
outputs=gr.Video(label="Generated Video", show_download_button=True, format="mp4"), |
|
|
title="Video Generation", |
|
|
description=( |
|
|
"<div style=\"text-align:center\">Generate short videos via Hugging Face serverless inference. " |
|
|
"Default model is Wan2.2-T2V-A14B.</div>" |
|
|
), |
|
|
api_description=( |
|
|
"Generate a short video from a text prompt using a Hugging Face model via serverless inference. " |
|
|
"Create dynamic scenes like 'a red fox running through a snowy forest at sunrise', 'waves crashing on a rocky shore', " |
|
|
"'time-lapse of clouds moving across a blue sky'. Default model: Wan2.2-T2V-A14B (2-6 second videos). " |
|
|
"Parameters: prompt (str), model_id (str), negative_prompt (str), steps (int), cfg_scale (float), seed (int), " |
|
|
"width/height (int), fps (int), duration (float in seconds). Returns MP4 file path. " |
|
|
"Return the generated media to the user in this format ``" |
|
|
), |
|
|
flagging_mode="never", |
|
|
) |
|
|
|
|
|
|
|
|
HAS_HF_TOKEN = bool(HF_API_TOKEN or HF_VIDEO_TOKEN) |
|
|
|
|
|
_interfaces = [ |
|
|
fetch_interface, |
|
|
concise_interface, |
|
|
code_interface, |
|
|
kokoro_interface, |
|
|
] |
|
|
_tab_names = [ |
|
|
"Fetch Webpage", |
|
|
"DuckDuckGo Search", |
|
|
"Python Code Executor", |
|
|
"Kokoro TTS", |
|
|
] |
|
|
|
|
|
|
|
|
HAS_HF_READ = bool(HF_API_TOKEN) |
|
|
if HAS_HF_READ: |
|
|
|
|
|
insert_index = 3 if len(_interfaces) >= 3 else len(_interfaces) |
|
|
_interfaces.insert(insert_index, memory_interface) |
|
|
_tab_names.insert(insert_index, "Memory Manager") |
|
|
|
|
|
if HAS_HF_TOKEN: |
|
|
_interfaces.extend([image_generation_interface, video_generation_interface]) |
|
|
_tab_names.extend(["Image Generation", "Video Generation"]) |
|
|
|
|
|
demo = gr.TabbedInterface( |
|
|
interface_list=_interfaces, |
|
|
tab_names=_tab_names, |
|
|
title="Tools MCP", |
|
|
theme="Nymbo/Nymbo_Theme", |
|
|
css=CSS_STYLES, |
|
|
) |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
demo.launch(mcp_server=True) |