import enum from lynxkite_core import ops from lynxkite_graph_analytics import core import fastapi from lynxscribe.components.chat.api import ChatAPI from lynxscribe.components.rag.rag_chatbot import Mode, RAGChatbot, Scenario, ScenarioSelector from lynxscribe.core.llm.base import get_llm_engine from lynxscribe.core.models.prompts import ChatCompletionPrompt lsop = ops.op_registration( "LynxKite Graph Analytics", "LynxScribe", dir="bottom-to-top", color="blue" ) dsop = ops.op_registration("LynxKite Graph Analytics", "Data Science") Placeholder = str @lsop("Chat frontend", color="gray", outputs=[], view="service") def chat_frontend(agent: Placeholder): return ChatAPIService(agent) @lsop("Agent") def agent( tools: list[Placeholder], *, purpose: str = "", description: ops.LongStr = "This agent helps with various tasks.", system_prompt: ops.LongStr = "You are a helpful assistant.", ): ts = [] for t in tools: t = "\n ".join(t.split("\n")) ts.append(f"- {t}\n") return f"Agent for {purpose} with {len(tools)} tools:\n{''.join(ts)}" @lsop("MCP: Query database with SQL", color="green") def sql_tool(db: Placeholder): return f"SQL over {db}" @ops.output_position(output="top") @dsop("Database") def db(data_pipeline: list[core.Bundle]): return f"DB with {len(data_pipeline)} bundles" class MCPSearchEngine(str, enum.Enum): Google = "Google" Bing = "Bing" DuckDuckGo = "DuckDuckGo" @lsop("MCP: Search web", color="green") def web_search(*, engine: MCPSearchEngine = MCPSearchEngine.Google): return f"Web search ({engine.name})" @lsop("MCP: Run ComfyUI workflow", color="green") def run_comfyui_workflow(*, workflow_name: str): return f"Run comfyui workflow ({workflow_name})" @lsop("MCP: Calculator", color="green") def calculator(): return "Calculator" class ChatAPIService: def __init__(self, agent: str): self.agent = agent self.chat_api = ChatAPI( chatbot=RAGChatbot( None, ScenarioSelector([Scenario(name="single", mode=Mode.LLM_ONLY)]), llm=get_llm_engine(name="openai"), ), model="gpt-4o-mini", ) async def get(self, request: fastapi.Request) -> dict: if request.state.remaining_path == "models": return { "object": "list", "data": [ { "id": "LynxScribe", "object": "model", "created": 0, "owned_by": "lynxkite", "meta": {"profile_image_url": "https://lynxkite.com/favicon.png"}, } ], } return {"error": "Not found"} async def post(self, request: fastapi.Request) -> dict: if request.state.remaining_path == "chat/completions": request = await request.json() if request["stream"]: from sse_starlette.sse import EventSourceResponse return EventSourceResponse(self.stream_chat_api_response(request)) else: return await self.get_chat_api_response(request) return {"error": "Not found"} async def stream_chat_api_response(self, request): request = ChatCompletionPrompt(**request) async for chunk in await self.chat_api.answer(request, stream=True): chunk.choices[0].delta.content += f"({self.agent})" yield chunk.model_dump_json() async def get_chat_api_response(self, request): request = ChatCompletionPrompt(**request) response = await self.chat_api.answer(request, stream=False) return response.model_dump()