FCT / services /domain_plugins /base_plugin.py
Parthnuwal7
Adding analytical content
3d015cd
"""Base Domain Plugin Interface"""
from abc import ABC, abstractmethod
from typing import Dict, Tuple, List, Optional
from dataclasses import dataclass
@dataclass
class DomainScore:
"""Standardized domain scoring output"""
domain_type: str
score: float # 0-1
confidence: float # 0-1
raw_features: Dict # Raw feature values for explainability
processing_time_ms: float
def to_dict(self):
return {
'domain_type': self.domain_type,
'score': round(self.score, 3),
'confidence': round(self.confidence, 3),
'raw_features': self.raw_features,
'processing_time_ms': round(self.processing_time_ms, 2)
}
class BaseDomainPlugin(ABC):
"""Abstract base class for all domain plugins"""
def __init__(self):
self.domain_type = self._get_domain_type()
self.feature_weights = self._get_feature_weights()
@abstractmethod
def _get_domain_type(self) -> str:
"""Return domain identifier (e.g., 'tech', 'business')"""
pass
@abstractmethod
def _get_feature_weights(self) -> Dict[str, float]:
"""Return feature name to weight mapping"""
pass
@abstractmethod
def get_required_fields(self) -> List[str]:
"""Return list of required input fields for this domain"""
pass
@abstractmethod
def get_optional_fields(self) -> List[str]:
"""Return list of optional input fields"""
pass
def validate_inputs(self, evidence_data: Dict) -> Tuple[bool, Optional[str]]:
"""
Validate input data completeness
Returns: (is_valid, error_message)
"""
required = self.get_required_fields()
missing = [f for f in required if not evidence_data.get(f)]
if missing:
return False, f"Missing required fields: {', '.join(missing)}"
return True, None
@abstractmethod
def score(self, evidence_data: Dict) -> DomainScore:
"""
Main scoring method - must be implemented by each plugin
Args:
evidence_data: Dictionary containing domain-specific inputs
Returns:
DomainScore object with score, confidence, and features
"""
pass
def explain(self, features: Dict) -> Dict:
"""Generate human-readable explanation of scoring"""
explanations = {
'top_features': [],
'recommendations': []
}
# Sort features by value
sorted_features = sorted(features.items(), key=lambda x: x[1], reverse=True)
# Top 3 features
for feat, val in sorted_features[:3]:
if val > 0.3:
explanations['top_features'].append({
'feature': feat,
'value': round(val, 2),
'weight': self.feature_weights.get(feat, 0)
})
return explanations
def calculate_confidence(self, evidence_data: Dict) -> float:
"""
Calculate confidence based on data completeness and quality
Returns: 0-1 confidence score
"""
required_fields = self.get_required_fields()
optional_fields = self.get_optional_fields()
total_fields = len(required_fields) + len(optional_fields)
filled_required = sum(1 for f in required_fields if evidence_data.get(f))
filled_optional = sum(1 for f in optional_fields if evidence_data.get(f))
# Base confidence from required fields (70%)
required_confidence = (filled_required / len(required_fields)) * 0.7 if required_fields else 0.7
# Bonus from optional fields (30%)
optional_confidence = (filled_optional / len(optional_fields)) * 0.3 if optional_fields else 0.3
return min(required_confidence + optional_confidence, 1.0)