# AgenticResearch / mcp-agents / cognee_integration.py
# kahsuen's picture
# Upload 1083 files
# cf0f589 verified
"""
Integration with Cognee knowledge graph.
This module provides functions to interact with the Cognee knowledge graph.
"""
import logging
import asyncio
import json
import os
from typing import Dict, Any, List, Optional, Literal
import aiohttp
from pydantic import BaseModel
# Configure the root logger when this module is imported: INFO level,
# records formatted as "time - logger name - level - message".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Named logger for messages emitted by this module.
logger = logging.getLogger("cognee-integration")
class SearchResponse(BaseModel):
    """Uniform shape for the result of a knowledge-graph search.

    Both successful and failed searches are expressed with this model so
    that callers always receive the same two keys once it is converted to
    a plain dict.
    """

    # Answer text, or a human-readable error/fallback message.
    response: str
    # Supporting source records; defaults to None when none are supplied.
    sources: Optional[List[Dict[str, Any]]] = None
async def search_knowledge_graph(
    query: str,
    search_type: Literal['GRAPH_COMPLETION', 'COMPLETION', 'INSIGHTS'] = 'GRAPH_COMPLETION'
) -> Dict[str, Any]:
    """
    Search the Cognee knowledge graph.

    Sends ``query`` and ``search_type`` as a JSON POST to ``<base>/search``,
    where ``<base>`` is read from the ``COGNEE_MCP_URL`` environment variable
    (default ``http://cognee-mcp:8082``). Failures derived from ``Exception``
    are not propagated: they are logged and reported through the ``response``
    text of the returned dict.

    Args:
        query (str): The search query.
        search_type (str): The type of search to perform.

    Returns:
        Dict[str, Any]: The search results, as a dict with the fields of
            ``SearchResponse``: ``response`` (answer or error message) and
            ``sources`` (value supplied by the service, ``[]`` when the
            service omits it, ``None`` on failure).
    """
    # Drop trailing slashes so a configured "http://host/" does not turn
    # into "http://host//search".
    base_url = os.environ.get('COGNEE_MCP_URL', 'http://cognee-mcp:8082').rstrip('/')
    payload = {"query": query, "search_type": search_type}

    # NOTE(review): `.dict()` is deprecated under pydantic v2 in favour of
    # `.model_dump()`; it is kept here because the installed pydantic version
    # is not visible from this file — confirm before switching.
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{base_url}/search",
                json=payload,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    # Include the status code so failures can be told apart
                    # even when the body is empty.
                    logger.error(
                        "Error searching knowledge graph (HTTP %s): %s",
                        response.status,
                        error_text,
                    )
                    return SearchResponse(
                        response="I encountered an error while searching the knowledge graph."
                    ).dict()

                data = await response.json()

                # A JSON null for "response" would bypass the .get() default
                # and fail the model's `str` validation, so treat it the same
                # as a missing key.
                answer = data.get("response")
                if answer is None:
                    answer = "No response from knowledge graph."

                return SearchResponse(
                    response=answer,
                    sources=data.get("sources", []),
                ).dict()
    except Exception as e:
        # Boundary handler: callers get an error dict instead of an
        # exception. logger.exception keeps the traceback in the log.
        logger.exception("Error searching knowledge graph: %s", e)
        return SearchResponse(
            response=f"I encountered an error while searching the knowledge graph: {str(e)}"
        ).dict()
if __name__ == "__main__":
    # Example usage: run one sample query and pretty-print the result dict.
    async def main():
        sample_query = "What is the relationship between climate change and biodiversity?"
        outcome = await search_knowledge_graph(sample_query)
        rendered = json.dumps(outcome, indent=2)
        print(rendered)

    asyncio.run(main())