""" Integration with Cognee knowledge graph. This module provides functions to interact with the Cognee knowledge graph. """ import logging import asyncio import json import os from typing import Dict, Any, List, Optional, Literal import aiohttp from pydantic import BaseModel # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger("cognee-integration") class SearchResponse(BaseModel): response: str sources: Optional[List[Dict[str, Any]]] = None async def search_knowledge_graph( query: str, search_type: Literal['GRAPH_COMPLETION', 'COMPLETION', 'INSIGHTS'] = 'GRAPH_COMPLETION' ) -> Dict[str, Any]: """ Search the Cognee knowledge graph. Args: query (str): The search query. search_type (str): The type of search to perform. Returns: Dict[str, Any]: The search results. """ try: cognee_url = os.environ.get('COGNEE_MCP_URL', 'http://cognee-mcp:8082') async with aiohttp.ClientSession() as session: async with session.post( f"{cognee_url}/search", json={ "query": query, "search_type": search_type }, headers={"Content-Type": "application/json"} ) as response: if response.status != 200: error_text = await response.text() logger.error(f"Error searching knowledge graph: {error_text}") return SearchResponse( response="I encountered an error while searching the knowledge graph." ).dict() data = await response.json() return SearchResponse( response=data.get("response", "No response from knowledge graph."), sources=data.get("sources", []) ).dict() except Exception as e: logger.error(f"Error searching knowledge graph: {str(e)}") return SearchResponse( response=f"I encountered an error while searching the knowledge graph: {str(e)}" ).dict() if __name__ == "__main__": # Example usage async def main(): result = await search_knowledge_graph("What is the relationship between climate change and biodiversity?") print(json.dumps(result, indent=2)) asyncio.run(main())