# medical-report-analyzer / analysis_synthesizer.py
# From commit 13d5ab4 — "Deploy backend with monitoring infrastructure - Complete Medical AI Platform"
"""
Analysis Synthesizer - Result Aggregation and Synthesis
Combines outputs from multiple specialized models
"""
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# Module-level logger namespaced to this module, per stdlib logging convention.
logger = logging.getLogger(__name__)
class AnalysisSynthesizer:
    """
    Combines the outputs of multiple specialized medical models into one
    comprehensive document analysis.

    Responsibilities:
    - Result aggregation
    - Conflict resolution
    - Confidence calibration
    - Clinical insights generation
    """

    def __init__(self):
        # Map each strategy name to its fusion handler (handlers are
        # currently stubs; see the _*_fusion methods below).
        strategy_names = ("early", "late", "weighted")
        handlers = (self._early_fusion, self._late_fusion, self._weighted_fusion)
        self.fusion_strategies = dict(zip(strategy_names, handlers))
        logger.info("Analysis Synthesizer initialized")
async def synthesize(
    self,
    classification: Dict[str, Any],
    specialized_results: List[Dict[str, Any]],
    pdf_content: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Synthesize results from multiple models into a single analysis.

    Args:
        classification: Classifier output; must carry "document_type"
            and "confidence" keys.
        specialized_results: One dict per specialized model run; only
            entries whose "status" is "completed" contribute.
        pdf_content: Extracted PDF data ("page_count", "images",
            "tables" are read here).

    Returns:
        Comprehensive analysis dict with summary, aggregated findings,
        clinical insights, recommendations, confidence scores, quality
        metrics and metadata. Degrades to a minimal fallback payload
        when no model succeeded or synthesis itself raises.
    """
    try:
        # Lazy %-args: the message is only formatted if the record is emitted.
        logger.info("Synthesizing %d model results", len(specialized_results))

        # Only successfully completed model runs feed the synthesis.
        successful_results = [
            r for r in specialized_results
            if r.get("status") == "completed"
        ]
        if not successful_results:
            return self._generate_fallback_analysis(classification, pdf_content)

        aggregated_findings = self._aggregate_by_domain(successful_results)
        insights = self._generate_insights(
            aggregated_findings,
            classification,
            pdf_content
        )
        overall_confidence = self._calculate_overall_confidence(successful_results)
        summary = self._generate_summary(
            classification,
            aggregated_findings,
            insights
        )
        recommendations = self._generate_recommendations(
            aggregated_findings,
            classification
        )

        analysis = {
            "document_type": classification["document_type"],
            "classification_confidence": classification["confidence"],
            "overall_confidence": overall_confidence,
            "summary": summary,
            "aggregated_findings": aggregated_findings,
            "clinical_insights": insights,
            "recommendations": recommendations,
            "models_used": [
                {
                    "model": r["model_name"],
                    "domain": r["domain"],
                    "confidence": r.get("result", {}).get("confidence", 0.0)
                }
                for r in successful_results
            ],
            "quality_metrics": {
                "models_executed": len(successful_results),
                "models_failed": len(specialized_results) - len(successful_results),
                "overall_confidence": overall_confidence
            },
            "metadata": {
                # Timezone-aware timestamp; datetime.utcnow() is deprecated
                # since Python 3.12 and returns a naive datetime.
                "synthesis_timestamp": datetime.now(timezone.utc).isoformat(),
                "page_count": pdf_content.get("page_count", 0),
                "has_images": len(pdf_content.get("images", [])) > 0,
                "has_tables": len(pdf_content.get("tables", [])) > 0
            }
        }
        logger.info("Synthesis completed successfully")
        return analysis
    except Exception:
        # Boundary handler: log full traceback, never propagate —
        # callers always receive at least the fallback analysis.
        logger.exception("Synthesis failed")
        return self._generate_fallback_analysis(classification, pdf_content)
def _aggregate_by_domain(
self,
results: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""Aggregate results by medical domain"""
aggregated = {}
for result in results:
domain = result.get("domain", "general")
if domain not in aggregated:
aggregated[domain] = {
"models": [],
"findings": [],
"confidence_scores": []
}
aggregated[domain]["models"].append(result["model_name"])
# Extract findings from result
result_data = result.get("result", {})
if "findings" in result_data:
aggregated[domain]["findings"].append(result_data["findings"])
if "key_findings" in result_data:
aggregated[domain]["findings"].extend(result_data["key_findings"])
if "analysis" in result_data:
aggregated[domain]["findings"].append(result_data["analysis"])
confidence = result_data.get("confidence", 0.0)
aggregated[domain]["confidence_scores"].append(confidence)
# Calculate average confidence per domain
for domain in aggregated:
scores = aggregated[domain]["confidence_scores"]
aggregated[domain]["average_confidence"] = sum(scores) / len(scores) if scores else 0.0
return aggregated
def _generate_insights(
self,
aggregated_findings: Dict[str, Any],
classification: Dict[str, Any],
pdf_content: Dict[str, Any]
) -> List[Dict[str, str]]:
"""Generate clinical insights from aggregated findings"""
insights = []
# Document structure insight
page_count = pdf_content.get("page_count", 0)
if page_count > 0:
insights.append({
"category": "Document Structure",
"insight": f"Document contains {page_count} pages with {'comprehensive' if page_count > 5 else 'standard'} documentation",
"importance": "medium"
})
# Classification insight
doc_type = classification["document_type"]
confidence = classification["confidence"]
insights.append({
"category": "Document Classification",
"insight": f"Document identified as {doc_type.replace('_', ' ').title()} with {confidence*100:.0f}% confidence",
"importance": "high"
})
# Domain-specific insights
for domain, data in aggregated_findings.items():
avg_confidence = data.get("average_confidence", 0.0)
model_count = len(data.get("models", []))
insights.append({
"category": domain.replace("_", " ").title(),
"insight": f"Analysis completed by {model_count} specialized model(s) with {avg_confidence*100:.0f}% average confidence",
"importance": "high" if avg_confidence > 0.8 else "medium"
})
# Data richness insight
has_images = pdf_content.get("images", [])
has_tables = pdf_content.get("tables", [])
if has_images:
insights.append({
"category": "Multimodal Content",
"insight": f"Document contains {len(has_images)} image(s) for enhanced analysis",
"importance": "medium"
})
if has_tables:
insights.append({
"category": "Structured Data",
"insight": f"Document contains {len(has_tables)} table(s) with structured information",
"importance": "medium"
})
return insights
def _calculate_overall_confidence(self, results: List[Dict[str, Any]]) -> float:
"""Calculate weighted overall confidence score"""
if not results:
return 0.0
confidences = []
weights = []
for result in results:
confidence = result.get("result", {}).get("confidence", 0.0)
priority = result.get("priority", "secondary")
# Weight by priority
weight = 1.5 if priority == "primary" else 1.0
confidences.append(confidence)
weights.append(weight)
# Weighted average
weighted_sum = sum(c * w for c, w in zip(confidences, weights))
total_weight = sum(weights)
return weighted_sum / total_weight if total_weight > 0 else 0.0
def _generate_summary(
self,
classification: Dict[str, Any],
aggregated_findings: Dict[str, Any],
insights: List[Dict[str, str]]
) -> str:
"""Generate executive summary of analysis"""
doc_type = classification["document_type"].replace("_", " ").title()
summary_parts = [
f"Medical Document Analysis: {doc_type}",
f"\nThis document has been processed through our comprehensive AI analysis pipeline using {len(aggregated_findings)} specialized medical AI domain(s).",
]
# Add domain summaries
for domain, data in aggregated_findings.items():
domain_name = domain.replace("_", " ").title()
model_count = len(data.get("models", []))
avg_conf = data.get("average_confidence", 0.0)
summary_parts.append(
f"\n\n{domain_name}: Analyzed by {model_count} model(s) with {avg_conf*100:.0f}% confidence. "
f"{'High confidence analysis completed.' if avg_conf > 0.8 else 'Analysis completed with moderate confidence.'}"
)
# Add insights summary
high_importance = [i for i in insights if i.get("importance") == "high"]
if high_importance:
summary_parts.append(
f"\n\nKey Findings: {len(high_importance)} high-priority insights identified for clinical review."
)
summary_parts.append(
"\n\nThis analysis provides AI-assisted insights and should be reviewed by qualified healthcare professionals for clinical decision-making."
)
return "".join(summary_parts)
def _generate_recommendations(
self,
aggregated_findings: Dict[str, Any],
classification: Dict[str, Any]
) -> List[Dict[str, str]]:
"""Generate recommendations based on analysis"""
recommendations = []
# Classification-based recommendations
doc_type = classification["document_type"]
if doc_type == "radiology":
recommendations.append({
"category": "Clinical Review",
"recommendation": "Radiologist review recommended for imaging findings confirmation",
"priority": "high"
})
elif doc_type == "pathology":
recommendations.append({
"category": "Clinical Review",
"recommendation": "Pathologist verification required for tissue analysis",
"priority": "high"
})
elif doc_type == "laboratory":
recommendations.append({
"category": "Clinical Review",
"recommendation": "Review laboratory values in context of patient history",
"priority": "medium"
})
elif doc_type == "cardiology":
recommendations.append({
"category": "Clinical Review",
"recommendation": "Cardiologist review recommended for cardiac findings",
"priority": "high"
})
# General recommendations
recommendations.append({
"category": "Data Quality",
"recommendation": "All AI-generated insights should be validated by qualified healthcare professionals",
"priority": "high"
})
recommendations.append({
"category": "Documentation",
"recommendation": "Maintain this analysis report with patient medical records",
"priority": "medium"
})
# Confidence-based recommendations
low_confidence_domains = [
domain for domain, data in aggregated_findings.items()
if data.get("average_confidence", 0.0) < 0.7
]
if low_confidence_domains:
recommendations.append({
"category": "Analysis Quality",
"recommendation": f"Lower confidence detected in {', '.join(low_confidence_domains)}. Consider manual review.",
"priority": "medium"
})
return recommendations
def _generate_fallback_analysis(
self,
classification: Dict[str, Any],
pdf_content: Dict[str, Any]
) -> Dict[str, Any]:
"""Generate fallback analysis when no models succeeded"""
return {
"document_type": classification["document_type"],
"classification_confidence": classification["confidence"],
"overall_confidence": 0.0,
"summary": "Analysis could not be completed. Document was classified but specialized model processing failed.",
"aggregated_findings": {},
"clinical_insights": [],
"recommendations": [{
"category": "Manual Review",
"recommendation": "Manual review required - automated analysis unavailable",
"priority": "high"
}],
"models_used": [],
"quality_metrics": {
"models_executed": 0,
"models_failed": 0,
"overall_confidence": 0.0
},
"metadata": {
"synthesis_timestamp": datetime.utcnow().isoformat(),
"page_count": pdf_content.get("page_count", 0),
"fallback": True
}
}
def _early_fusion(self, results: List[Dict]) -> Dict:
"""Early fusion strategy - combine features before analysis"""
pass
def _late_fusion(self, results: List[Dict]) -> Dict:
"""Late fusion strategy - combine predictions after analysis"""
pass
def _weighted_fusion(self, results: List[Dict]) -> Dict:
"""Weighted fusion strategy - weight by model confidence"""
pass