|
|
import os |
|
|
import logging |
|
|
from typing import Dict, Any, List, Optional |
|
|
from src.tools.base_tool import BaseTool |
|
|
|
|
|
logger = logging.getLogger("healthcare-mcp") |
|
|
|
|
|
class HealthFinderTool(BaseTool): |
|
|
"""Tool for accessing health information from Health.gov""" |
|
|
|
|
|
def __init__(self): |
|
|
"""Initialize the HealthFinder tool with base URL and HTTP client""" |
|
|
super().__init__(cache_db_path="healthcare_cache.db") |
|
|
self.base_url = "https://health.gov/myhealthfinder/api/v3" |
|
|
|
|
|
import requests |
|
|
self.http_client = requests |
|
|
|
|
|
async def get_health_topics(self, topic: str, language: str = "en") -> Dict[str, Any]: |
|
|
""" |
|
|
Get evidence-based health information on various topics with caching |
|
|
|
|
|
Args: |
|
|
topic: Health topic to search for information |
|
|
language: Language for content (en or es) |
|
|
|
|
|
Returns: |
|
|
Dictionary containing health information or error details |
|
|
""" |
|
|
|
|
|
if not topic: |
|
|
return self._format_error_response("Topic is required") |
|
|
|
|
|
|
|
|
language = language.lower() |
|
|
if language not in ["en", "es"]: |
|
|
language = "en" |
|
|
|
|
|
|
|
|
cache_key = self._get_cache_key("health_topics", topic, language) |
|
|
|
|
|
|
|
|
cached_result = self.cache.get(cache_key) |
|
|
if cached_result: |
|
|
logger.info(f"Cache hit for health topics: {topic}, language={language}") |
|
|
return cached_result |
|
|
|
|
|
try: |
|
|
logger.info(f"Fetching health information for topic: {topic}, language={language}") |
|
|
|
|
|
|
|
|
endpoint = f"{self.base_url}/topicsearch.json" |
|
|
params = { |
|
|
"keyword": topic, |
|
|
"lang": language |
|
|
} |
|
|
|
|
|
|
|
|
data = await self._make_request( |
|
|
url=endpoint, |
|
|
method="GET", |
|
|
params=params |
|
|
) |
|
|
|
|
|
|
|
|
result_data = data.get("Result", {}) |
|
|
|
|
|
|
|
|
topics = await self._extract_topics(result_data) |
|
|
|
|
|
|
|
|
result = self._format_success_response( |
|
|
search_term=topic, |
|
|
language=language, |
|
|
total_results=result_data.get("Total", 0) if isinstance(result_data, dict) else 0, |
|
|
topics=topics |
|
|
) |
|
|
|
|
|
|
|
|
self.cache.set(cache_key, result, ttl=604800) |
|
|
|
|
|
return result |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching health information: {str(e)}") |
|
|
return self._format_error_response(f"Error fetching health information: {str(e)}") |
|
|
|
|
|
async def _extract_topics(self, result_data: Dict[str, Any]) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Extract topics from Health.gov API response |
|
|
|
|
|
Args: |
|
|
result_data: Result data from Health.gov API |
|
|
|
|
|
Returns: |
|
|
List of processed topic data |
|
|
""" |
|
|
topics = [] |
|
|
resources = result_data.get("Resources", {}) |
|
|
|
|
|
|
|
|
if not isinstance(resources, dict): |
|
|
logger.warning("Resources is not a dictionary") |
|
|
return topics |
|
|
|
|
|
resource_list = resources.get("Resource", []) |
|
|
|
|
|
|
|
|
if not isinstance(resource_list, list): |
|
|
resource_list = [resource_list] if resource_list else [] |
|
|
|
|
|
|
|
|
for resource in resource_list: |
|
|
|
|
|
if not isinstance(resource, dict): |
|
|
continue |
|
|
|
|
|
|
|
|
topic = { |
|
|
"title": resource.get("Title", ""), |
|
|
"url": resource.get("AccessibleVersion", ""), |
|
|
"last_updated": resource.get("LastUpdate", ""), |
|
|
"section": resource.get("Section", ""), |
|
|
"description": "" |
|
|
} |
|
|
|
|
|
|
|
|
categories = resource.get("Categories", {}) |
|
|
if isinstance(categories, dict): |
|
|
category_list = categories.get("Category", []) |
|
|
if not isinstance(category_list, list): |
|
|
category_list = [category_list] if category_list else [] |
|
|
|
|
|
if category_list and isinstance(category_list[0], dict): |
|
|
topic["description"] = category_list[0].get("Title", "") |
|
|
|
|
|
|
|
|
if "Sections" in resource and isinstance(resource["Sections"], dict): |
|
|
sections = resource["Sections"].get("Section", []) |
|
|
if not isinstance(sections, list): |
|
|
sections = [sections] if sections else [] |
|
|
|
|
|
content = [] |
|
|
for section in sections: |
|
|
if isinstance(section, dict) and "Content" in section: |
|
|
content.append(section["Content"]) |
|
|
|
|
|
if content: |
|
|
topic["content"] = content |
|
|
|
|
|
topics.append(topic) |
|
|
|
|
|
return topics |
|
|
|