"""
Analysis Synthesizer - Result Aggregation and Synthesis

Combines outputs from multiple specialized models into a comprehensive
medical document analysis.
"""

import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


class AnalysisSynthesizer:
    """
    Synthesizes results from multiple specialized models into
    a comprehensive medical document analysis.

    Implements:
    - Result aggregation
    - Conflict resolution
    - Confidence calibration
    - Clinical insights generation
    """

    def __init__(self):
        self.fusion_strategies = {
            "early": self._early_fusion,
            "late": self._late_fusion,
            "weighted": self._weighted_fusion
        }
        logger.info("Analysis Synthesizer initialized")
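
    # Note: the fusion strategies registered in __init__ are not yet invoked by
    # synthesize(); the corresponding methods at the end of this class are
    # placeholders. A hypothetical dispatch, if synthesize() were extended to
    # use them, might look like:
    #
    #     strategy = self.fusion_strategies.get(strategy_name, self._late_fusion)
    #     fused_result = strategy(successful_results)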

    async def synthesize(
        self,
        classification: Dict[str, Any],
        specialized_results: List[Dict[str, Any]],
        pdf_content: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Synthesize results from multiple models.

        Returns a comprehensive analysis with:
        - Aggregated findings
        - Key insights
        - Recommendations
        - Risk assessment
        - Confidence scores
        """
        try:
            logger.info(f"Synthesizing {len(specialized_results)} model results")

            # Keep only results from models that completed successfully
            successful_results = [
                r for r in specialized_results
                if r.get("status") == "completed"
            ]

            if not successful_results:
                return self._generate_fallback_analysis(classification, pdf_content)

            # Aggregate findings by medical domain
            aggregated_findings = self._aggregate_by_domain(successful_results)

            # Generate clinical insights from the aggregated findings
            insights = self._generate_insights(
                aggregated_findings,
                classification,
                pdf_content
            )

            # Calibrate an overall confidence score across all models
            overall_confidence = self._calculate_overall_confidence(successful_results)

            # Build the executive summary
            summary = self._generate_summary(
                classification,
                aggregated_findings,
                insights
            )

            # Derive recommendations from the findings and document type
            recommendations = self._generate_recommendations(
                aggregated_findings,
                classification
            )

            # Assemble the final analysis payload
            analysis = {
                "document_type": classification["document_type"],
                "classification_confidence": classification["confidence"],
                "overall_confidence": overall_confidence,
                "summary": summary,
                "aggregated_findings": aggregated_findings,
                "clinical_insights": insights,
                "recommendations": recommendations,
                "models_used": [
                    {
                        "model": r["model_name"],
                        "domain": r["domain"],
                        "confidence": r.get("result", {}).get("confidence", 0.0)
                    }
                    for r in successful_results
                ],
                "quality_metrics": {
                    "models_executed": len(successful_results),
                    "models_failed": len(specialized_results) - len(successful_results),
                    "overall_confidence": overall_confidence
                },
                "metadata": {
                    "synthesis_timestamp": datetime.now(timezone.utc).isoformat(),
                    "page_count": pdf_content.get("page_count", 0),
                    "has_images": len(pdf_content.get("images", [])) > 0,
                    "has_tables": len(pdf_content.get("tables", [])) > 0
                }
            }

            logger.info("Synthesis completed successfully")

            return analysis

        except Exception as e:
            logger.error(f"Synthesis failed: {e}", exc_info=True)
            return self._generate_fallback_analysis(classification, pdf_content)

    def _aggregate_by_domain(
        self,
        results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Aggregate results by medical domain"""
        aggregated = {}

        for result in results:
            domain = result.get("domain", "general")

            if domain not in aggregated:
                aggregated[domain] = {
                    "models": [],
                    "findings": [],
                    "confidence_scores": []
                }

            aggregated[domain]["models"].append(result["model_name"])

            # Collect findings under any of the supported result keys
            result_data = result.get("result", {})

            if "findings" in result_data:
                aggregated[domain]["findings"].append(result_data["findings"])

            if "key_findings" in result_data:
                aggregated[domain]["findings"].extend(result_data["key_findings"])

            if "analysis" in result_data:
                aggregated[domain]["findings"].append(result_data["analysis"])

            confidence = result_data.get("confidence", 0.0)
            aggregated[domain]["confidence_scores"].append(confidence)

        # Compute the average confidence per domain
        for domain in aggregated:
            scores = aggregated[domain]["confidence_scores"]
            aggregated[domain]["average_confidence"] = sum(scores) / len(scores) if scores else 0.0

        return aggregated

    def _generate_insights(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """Generate clinical insights from aggregated findings"""
        insights = []

        # Document structure
        page_count = pdf_content.get("page_count", 0)
        if page_count > 0:
            insights.append({
                "category": "Document Structure",
                "insight": f"Document contains {page_count} pages with {'comprehensive' if page_count > 5 else 'standard'} documentation",
                "importance": "medium"
            })

        # Document classification
        doc_type = classification["document_type"]
        confidence = classification["confidence"]
        insights.append({
            "category": "Document Classification",
            "insight": f"Document identified as {doc_type.replace('_', ' ').title()} with {confidence*100:.0f}% confidence",
            "importance": "high"
        })

        # Per-domain analysis coverage
        for domain, data in aggregated_findings.items():
            avg_confidence = data.get("average_confidence", 0.0)
            model_count = len(data.get("models", []))

            insights.append({
                "category": domain.replace("_", " ").title(),
                "insight": f"Analysis completed by {model_count} specialized model(s) with {avg_confidence*100:.0f}% average confidence",
                "importance": "high" if avg_confidence > 0.8 else "medium"
            })

        # Multimodal and structured content
        images = pdf_content.get("images", [])
        tables = pdf_content.get("tables", [])

        if images:
            insights.append({
                "category": "Multimodal Content",
                "insight": f"Document contains {len(images)} image(s) for enhanced analysis",
                "importance": "medium"
            })

        if tables:
            insights.append({
                "category": "Structured Data",
                "insight": f"Document contains {len(tables)} table(s) with structured information",
                "importance": "medium"
            })

        return insights

    def _calculate_overall_confidence(self, results: List[Dict[str, Any]]) -> float:
        """Calculate weighted overall confidence score"""
        if not results:
            return 0.0

        confidences = []
        weights = []

        for result in results:
            confidence = result.get("result", {}).get("confidence", 0.0)
            priority = result.get("priority", "secondary")

            # Primary models carry more weight than secondary ones
            weight = 1.5 if priority == "primary" else 1.0

            confidences.append(confidence)
            weights.append(weight)

        weighted_sum = sum(c * w for c, w in zip(confidences, weights))
        total_weight = sum(weights)

        return weighted_sum / total_weight if total_weight > 0 else 0.0
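
    # Worked example for _calculate_overall_confidence: two primary results at
    # confidence 0.9 and one secondary result at 0.6 give
    # (0.9 * 1.5 + 0.9 * 1.5 + 0.6 * 1.0) / (1.5 + 1.5 + 1.0) = 3.3 / 4.0 = 0.825.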

    def _generate_summary(
        self,
        classification: Dict[str, Any],
        aggregated_findings: Dict[str, Any],
        insights: List[Dict[str, str]]
    ) -> str:
        """Generate executive summary of analysis"""
        doc_type = classification["document_type"].replace("_", " ").title()

        summary_parts = [
            f"Medical Document Analysis: {doc_type}",
            f"\nThis document has been processed through our comprehensive AI analysis pipeline using {len(aggregated_findings)} specialized medical AI domain(s).",
        ]

        for domain, data in aggregated_findings.items():
            domain_name = domain.replace("_", " ").title()
            model_count = len(data.get("models", []))
            avg_conf = data.get("average_confidence", 0.0)

            summary_parts.append(
                f"\n\n{domain_name}: Analyzed by {model_count} model(s) with {avg_conf*100:.0f}% confidence. "
                f"{'High confidence analysis completed.' if avg_conf > 0.8 else 'Analysis completed with moderate confidence.'}"
            )

        high_importance = [i for i in insights if i.get("importance") == "high"]
        if high_importance:
            summary_parts.append(
                f"\n\nKey Findings: {len(high_importance)} high-priority insights identified for clinical review."
            )

        summary_parts.append(
            "\n\nThis analysis provides AI-assisted insights and should be reviewed by qualified healthcare professionals for clinical decision-making."
        )

        return "".join(summary_parts)

    def _generate_recommendations(
        self,
        aggregated_findings: Dict[str, Any],
        classification: Dict[str, Any]
    ) -> List[Dict[str, str]]:
        """Generate recommendations based on analysis"""
        recommendations = []

        # Document-type-specific clinical review recommendations
        doc_type = classification["document_type"]

        if doc_type == "radiology":
            recommendations.append({
                "category": "Clinical Review",
                "recommendation": "Radiologist review recommended for imaging findings confirmation",
                "priority": "high"
            })

        elif doc_type == "pathology":
            recommendations.append({
                "category": "Clinical Review",
                "recommendation": "Pathologist verification required for tissue analysis",
                "priority": "high"
            })

        elif doc_type == "laboratory":
            recommendations.append({
                "category": "Clinical Review",
                "recommendation": "Review laboratory values in context of patient history",
                "priority": "medium"
            })

        elif doc_type == "cardiology":
            recommendations.append({
                "category": "Clinical Review",
                "recommendation": "Cardiologist review recommended for cardiac findings",
                "priority": "high"
            })

        # General recommendations that always apply
        recommendations.append({
            "category": "Data Quality",
            "recommendation": "All AI-generated insights should be validated by qualified healthcare professionals",
            "priority": "high"
        })

        recommendations.append({
            "category": "Documentation",
            "recommendation": "Maintain this analysis report with patient medical records",
            "priority": "medium"
        })

        # Flag domains with lower-confidence results for manual review
        low_confidence_domains = [
            domain for domain, data in aggregated_findings.items()
            if data.get("average_confidence", 0.0) < 0.7
        ]

        if low_confidence_domains:
            recommendations.append({
                "category": "Analysis Quality",
                "recommendation": f"Lower confidence detected in {', '.join(low_confidence_domains)}. Consider manual review.",
                "priority": "medium"
            })

        return recommendations

    def _generate_fallback_analysis(
        self,
        classification: Dict[str, Any],
        pdf_content: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Generate fallback analysis when no models succeeded"""
        return {
            "document_type": classification["document_type"],
            "classification_confidence": classification["confidence"],
            "overall_confidence": 0.0,
            "summary": "Analysis could not be completed. Document was classified but specialized model processing failed.",
            "aggregated_findings": {},
            "clinical_insights": [],
            "recommendations": [{
                "category": "Manual Review",
                "recommendation": "Manual review required - automated analysis unavailable",
                "priority": "high"
            }],
            "models_used": [],
            "quality_metrics": {
                "models_executed": 0,
                "models_failed": 0,
                "overall_confidence": 0.0
            },
            "metadata": {
                "synthesis_timestamp": datetime.now(timezone.utc).isoformat(),
                "page_count": pdf_content.get("page_count", 0),
                "fallback": True
            }
        }

    def _early_fusion(self, results: List[Dict]) -> Dict:
        """Early fusion strategy - combine features before analysis"""
        pass

    def _late_fusion(self, results: List[Dict]) -> Dict:
        """Late fusion strategy - combine predictions after analysis"""
        pass

    def _weighted_fusion(self, results: List[Dict]) -> Dict:
        """Weighted fusion strategy - weight by model confidence"""
        pass
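

# Minimal usage sketch (illustrative only). The classification, specialized-result,
# and PDF-content dictionaries below are hypothetical inputs shaped to match what
# synthesize() reads; they are not produced by this module.
if __name__ == "__main__":
    import asyncio

    sample_classification = {"document_type": "radiology", "confidence": 0.92}
    sample_results = [{
        "status": "completed",
        "model_name": "example-radiology-model",
        "domain": "radiology",
        "priority": "primary",
        "result": {"confidence": 0.88, "key_findings": ["Example finding"]},
    }]
    sample_pdf = {"page_count": 3, "images": [], "tables": []}

    analysis = asyncio.run(
        AnalysisSynthesizer().synthesize(sample_classification, sample_results, sample_pdf)
    )
    print(analysis["summary"])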