# Spaces: Running
import asyncio
import base64
import io
from contextlib import asynccontextmanager
from time import sleep
from typing import AsyncGenerator

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from PIL import Image
# Owner (user or organisation) and name of the Space; both are interpolated
# into the server URL built by get_mcp_session().
org_or_user = "a-mahla"
space_name = "test_desktop"
@asynccontextmanager
async def get_mcp_session() -> AsyncGenerator[ClientSession, None]:
    """Yield an initialized MCP client session for the configured Space.

    The server URL is built from the module-level ``org_or_user`` and
    ``space_name`` values. Decorated with ``asynccontextmanager`` so callers
    can write ``async with get_mcp_session() as session:``; a bare async
    generator does not support ``async with``.

    Yields:
        A ``ClientSession`` on which ``initialize()`` has already been awaited.
        Both the session and the underlying HTTP streams are closed when the
        ``async with`` block exits.
    """
    # Every underscore in the URL is replaced by a hyphen
    # (e.g. "test_desktop" -> "test-desktop").
    base_url = f"https://{org_or_user}-{space_name}.hf.space".replace("_", "-")
    server_url = f"{base_url}/mcp/"
    print("π¬ MCP Server started:", server_url)

    # Connect to the server; the third item yielded by the transport is unused.
    async with streamablehttp_client(server_url) as (
        read_stream,
        write_stream,
        _,
    ):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            yield session
async def main():
    """List the server's tools, run a short desktop demo, and save a screenshot.

    Steps: print every tool exposed by the MCP server, open the Hugging Face
    website through the ``open`` tool, perform three move-and-click
    interactions, then call the ``screenshot`` tool and write the decoded
    image to ``screenshot.png`` in the current working directory.

    Raises:
        RuntimeError: If the ``screenshot`` tool returns no content items.
    """
    async with get_mcp_session() as session:
        response = await session.list_tools()

        print("Available Tools:")
        print("=" * 50)
        for tool in response.tools:
            print("-" * 50)
            print(f"π '{tool.name}':", end="")
            if tool.description:
                desc = tool.description.strip()
                print(f" {desc}", end="\n")
            else:
                print(" No description", end="\n")
            # NOTE(review): blank line assumed to follow each tool entry; the
            # original indentation was lost, so confirm this belongs in the loop.
            print()

        print("Opening https://www.huggingface.co for research...")
        await session.call_tool("open", {"file_or_url": "https://www.huggingface.co/"})
        # Pauses use asyncio.sleep rather than time.sleep so the event loop
        # (and the MCP transport running on it) keeps being serviced.
        await asyncio.sleep(7)

        print(await session.call_tool("move_mouse", {"x": 1200, "y": 120}))
        await asyncio.sleep(2)
        print(await session.call_tool("left_click", {}))
        await asyncio.sleep(2)

        print(await session.call_tool("move_mouse", {"x": 1200, "y": 160}))
        print(await session.call_tool("left_click", {}))
        await asyncio.sleep(2)

        print(await session.call_tool("move_mouse", {"x": 1600, "y": 320}))
        print(await session.call_tool("left_click", {}))
        await asyncio.sleep(2)

        response = await session.call_tool("screenshot", {})
        if not response.content:
            # Fail with a clear message instead of an IndexError below.
            raise RuntimeError("screenshot tool returned no content")

        # The first content item's ``data`` is decoded as base64 image bytes.
        screenshot_base64 = response.content[0].data
        screenshot_bytes = base64.b64decode(screenshot_base64)
        image = Image.open(io.BytesIO(screenshot_bytes))
        image.save("screenshot.png")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())