gaia / agents /tools /mcp_tools.py
bstraehle's picture
Update agents/tools/mcp_tools.py
6615317 verified
# References:
# https://docs.crewai.com/introduction
import json, os
from agents.mcp.mcp_client import (
call_mcp_tool,
MCP_SSE_URL_CHESS_POSITION_EVALUATION,
MCP_TOOL_CHESS_POSITION_EVALUATION
)
from crewai.tools import tool
class MCPTools():
# Chess position evaluation
@tool("Best Move Tool")
def best_move_tool(fen: str) -> str:
"""Get best move with continuation in UCI notation for chess position in FEN.
Args:
fen (str): Chess position in FEN
Returns:
str: Best move with continuation in UCI notation (e.g., 'f7f2 e4e5 f2f1')
Raises:
RuntimeError: If processing fails
"""
print("")
print(f"๐Ÿ› ๏ธ MCPTools: best_move_tool: fen={fen}")
try:
mcp_url = os.getenv("MCP_SSE_URL", MCP_SSE_URL_CHESS_POSITION_EVALUATION)
raw_result = call_mcp_tool(
mcp_url=mcp_url,
tool_name=MCP_TOOL_CHESS_POSITION_EVALUATION,
arguments={"fen": fen}
)
if isinstance(raw_result, str):
try:
raw_result = eval(raw_result)
except:
pass
if isinstance(raw_result, tuple) and len(raw_result) > 0:
raw_result = raw_result[0]
result = None
if isinstance(raw_result, dict) and "continuation" in raw_result:
result = raw_result["continuation"]
else:
result = raw_result
print(f"๐Ÿ› ๏ธ MCPTools: best_move_tool: mcp_url={MCP_SSE_URL_CHESS_POSITION_EVALUATION}")
print(f"๐Ÿ› ๏ธ MCPTools: best_move_tool: tool_name={MCP_TOOL_CHESS_POSITION_EVALUATION}")
print(f"๐Ÿ› ๏ธ MCPTools: best_move_tool: result={result}")
return result
except Exception as e:
print(f"โš ๏ธ MCPTools: best_move_tool: exception={str(e)}")
raise RuntimeError(f"Processing failed: {str(e)}")