ThreatLevelD
Upgrade EILProcessor to world-class signal normalization: adds subphrase/keyword blend detection, chunk weighting by model confidence, negation/contrast handling, emotion arc trajectory output, and sentiment-to-emotion mapping for non-EI language. Significantly improves long-form and ambiguous emotional inference.
9b2f3b7
# core/eil_processor.py
# MEC EIL Processor – World-Class Signal Normalization Edition
import yaml | |
import re | |
from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
import torch | |
import torch.nn.functional as F | |
class EILProcessor:
    """Infer an emotion packet from free text using layered strategies.

    ``infer_emotion`` tries, in order: exact story-pattern match, story
    chunking with blend aggregation, crosswalk lookup, alias lookup,
    keyword blend detection with negation handling, sentiment-cue mapping,
    and finally a transformer emotion classifier.

    The ``codex_informer`` collaborator must provide ``build_alias_lookup()``
    and ``resolve_emotion_family(code)``; the latter is indexed with the keys
    ``emotion_family``, ``primary_emotion_code``, ``arc`` and ``resonance``.
    """

    EMOTION_MODEL_NAME = 'cardiffnlp/twitter-roberta-base-emotion'
    SENTIMENT_MODEL_NAME = 'cardiffnlp/twitter-roberta-base-sentiment-latest'

    # Upper bound on tokens fed to either classifier; longer inputs are
    # truncated instead of overflowing the models' position embeddings.
    _MAX_MODEL_TOKENS = 512

    # Leading phrases removed by normalize_text. Order matters: the longer
    # "i am feeling " / "i'm feeling " must be tried before "i am " / "i'm ".
    _LEADING_PHRASES = (
        "i am feeling ",
        "i feel ",
        "feeling ",
        "i'm feeling ",
        "i am ",
        "i'm ",
    )

    _STORY_PUNCTUATION = (',', ';', '.')
    _STORY_WORD_MARKER_RE = re.compile(
        r"\b(?:but|because|so that|which|when|while)\b", re.IGNORECASE
    )
    _CHUNK_SPLIT_RE = re.compile(
        r'[.,;!?]|\b(?:and|but|because|so|although|though|while|when)\b',
        re.IGNORECASE,
    )

    # Cues that cancel a following emotion keyword ("not sad", "never angry").
    _NEGATION_CUES = r"(?:not|no longer|never|no|without)"

    def __init__(self, codex_informer, softmax_threshold=0.6,
                 crosswalk_path='config/crosswalk.yaml'):
        """Load lookups, keyword tables and both transformer models.

        Args:
            codex_informer: Codex access object (see class docstring).
            softmax_threshold: Minimum top-class probability for the emotion
                model's label to be trusted; below it the label becomes
                ``'neutral'``.
            crosswalk_path: YAML file holding a required ``crosswalk`` list
                (entries with ``phrase`` / ``emotion_code``) and an optional
                ``story_patterns`` list (entries with ``pattern`` /
                ``emotion_code``).
        """
        self.codex_informer = codex_informer
        self.softmax_threshold = softmax_threshold

        # Build alias lookup from Codex
        self.alias_lookup = self.codex_informer.build_alias_lookup()
        print(f"[EILProcessor] Alias map loaded with {len(self.alias_lookup)} entries")

        # Load crosswalk.yaml
        with open(crosswalk_path, 'r', encoding='utf-8') as f:
            yaml_data = yaml.safe_load(f)
        crosswalk_data = yaml_data['crosswalk']
        story_pattern_data = yaml_data.get('story_patterns', [])

        # Keys are normalized so lookups can use normalized input directly.
        self.crosswalk_lookup = {
            self.normalize_text(entry['phrase']): entry['emotion_code']
            for entry in crosswalk_data
        }
        self.story_patterns_lookup = {
            self.normalize_text(entry['pattern']): entry['emotion_code']
            for entry in story_pattern_data
        }
        print(f"[EILProcessor] Crosswalk loaded with {len(self.crosswalk_lookup)} entries")
        print(f"[EILProcessor] Story Patterns loaded with {len(self.story_patterns_lookup)} entries")

        # Emotion keyword dictionary for signal normalization/blending
        self.emotion_keyword_map = {
            "FAM-ANG": ["anger", "angry", "hate", "furious", "rage", "resentment"],
            "FAM-HEL": ["helpless", "powerless", "can't", "unable", "trapped", "stuck", "overwhelmed", "overwhelm"],
            "FAM-SAD": ["sad", "down", "unhappy", "miserable", "depressed", "blue", "empty"],
            "FAM-FEA": ["afraid", "scared", "fear", "terrified", "worried", "nervous", "anxious", "can't sleep"],
            "FAM-LOV": ["love", "loved", "loving", "caring", "affection", "proud"],
            "FAM-JOY": ["joy", "happy", "excited", "delighted", "content", "proud"],
            "FAM-SUR": ["surprised", "amazed", "astonished", "shocked"],
            "FAM-DIS": ["disgust", "disgusted", "gross", "revolted"],
            "FAM-SHA": ["ashamed", "shame", "embarrassed", "humiliated"],
            "FAM-GUI": ["guilty", "guilt", "remorse", "regret"],
            # Add more as needed
        }

        # For sentiment-to-emotion mapping of ambiguous/indirect language:
        # (sentiment, regex or cue, mapped emotion)
        self.sentiment_cue_map = [
            ("negative", r"can.?t sleep|insomnia|restless|wake up", "FAM-FEA"),
            ("negative", r"too much|overwhelmed|can.?t cope|can.?t deal", "FAM-HEL"),
            ("negative", r"nothing feels right|empty|pointless|no purpose", "FAM-SAD"),
            ("negative", r"don't care|apathy|numb", "FAM-LON"),
            ("positive", r"did it|proud|relieved", "FAM-JOY"),
            ("neutral", r"just tired|exhausted", "FAM-HEL"),
            # ...add more for coverage
        ]

        # Load emotion and sentiment models
        self.tokenizer = AutoTokenizer.from_pretrained(self.EMOTION_MODEL_NAME)
        self.model = AutoModelForSequenceClassification.from_pretrained(self.EMOTION_MODEL_NAME)
        self.sentiment_tokenizer = AutoTokenizer.from_pretrained(self.SENTIMENT_MODEL_NAME)
        self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(self.SENTIMENT_MODEL_NAME)

    def normalize_text(self, text):
        """Lower-case and trim *text*, drop one leading "I feel"-style phrase
        and remove every ``.``, ``!`` and ``?`` character.

        At most one leading phrase is removed, and the remainder is not
        trimmed again afterwards.
        """
        text = text.lower().strip()
        for prefix in self._LEADING_PHRASES:
            if text.startswith(prefix):
                text = text[len(prefix):]
                break
        return re.sub(r'[.!?]', '', text)

    def is_story_input(self, text):
        """Return True when *text* looks multi-clause or long.

        A text qualifies when it has more than 12 whitespace-separated tokens,
        contains ``,``, ``;`` or ``.``, or contains one of the clause words
        (but, because, so that, which, when, while) as a whole word.
        Whole-word matching keeps words such as "button" or "whenever" from
        being mistaken for clause markers.
        """
        if len(text.split()) > 12:
            return True
        if any(mark in text for mark in self._STORY_PUNCTUATION):
            return True
        return self._STORY_WORD_MARKER_RE.search(text) is not None

    def chunk_story(self, text):
        """Split *text* on punctuation and common conjunctions.

        Returns the non-empty, whitespace-trimmed pieces in original order.
        """
        return [
            piece.strip()
            for piece in self._CHUNK_SPLIT_RE.split(text)
            if piece and piece.strip()
        ]

    def detect_emotion_blend_with_negation(self, norm_text):
        """Count emotion keywords per family, skipping negated keywords.

        A keyword counts when it occurs at the start of a word, so "sadness"
        counts for "sad" but "unhappy" does not count for "happy". A keyword
        is ignored entirely when any occurrence is directly preceded by a
        negation cue (not, no longer, never, no, without).

        Returns:
            dict mapping family code to an unnormalized float count; empty
            when nothing matched.
        """
        blend = {}
        for fam, keywords in self.emotion_keyword_map.items():
            for kw in keywords:
                kw_pattern = re.escape(kw)
                negated = re.search(
                    rf"\b{self._NEGATION_CUES}\s+{kw_pattern}", norm_text
                )
                if negated:
                    continue
                if re.search(rf"\b{kw_pattern}", norm_text):
                    blend[fam] = blend.get(fam, 0) + 1.0
        return blend

    def _classify(self, tokenizer, model, text):
        """Run *model* on *text* and return ``(top_label, top_probability)``."""
        tokens = tokenizer(
            text,
            return_tensors='pt',
            truncation=True,
            max_length=self._MAX_MODEL_TOKENS,
        )
        with torch.no_grad():
            logits = model(**tokens).logits
        probs = F.softmax(logits, dim=-1).squeeze()
        top_prob, top_idx = torch.max(probs, dim=-1)
        return model.config.id2label[top_idx.item()], top_prob.item()

    def get_sentiment(self, norm_text):
        """Return ``(lower-cased sentiment label, probability)`` for *norm_text*."""
        label, prob = self._classify(
            self.sentiment_tokenizer, self.sentiment_model, norm_text
        )
        return label.lower(), prob

    @staticmethod
    def _build_packet(input_text, source, input_type, candidate,
                      emotion_data, blend, trajectory):
        """Assemble the result dict shared by every single-phrase strategy."""
        return {
            'phrases': [input_text],
            'emotion_candidates': [{'phrase': input_text, 'candidate_emotion': candidate}],
            'metadata': {'source': source, 'input_type': input_type},
            'emotion_family': emotion_data['emotion_family'],
            'primary_emotion_code': emotion_data['primary_emotion_code'],
            'arc': emotion_data['arc'],
            'resonance': emotion_data['resonance'],
            'blend': blend,
            'trajectory': trajectory,
        }

    def _infer_story(self, input_text, chunks):
        """Infer each chunk separately and merge the results into one packet.

        Each chunk's blend is weighted by that chunk's ``confidence`` (1.0
        when absent), the sum is normalized to 1, and the heaviest family
        becomes the primary emotion. ``trajectory`` lists the chunks' primary
        codes in order.
        """
        chunk_results = []
        blend_accum = {}
        trajectory = []
        for chunk in chunks:
            sub_result = self.infer_emotion(chunk)
            chunk_results.append(sub_result)
            conf = sub_result.get('confidence', 1.0)
            for fam, val in sub_result.get('blend', {}).items():
                blend_accum[fam] = blend_accum.get(fam, 0) + val * conf
            if 'primary_emotion_code' in sub_result:
                trajectory.append(sub_result['primary_emotion_code'])

        if blend_accum:
            total = sum(blend_accum.values())
            blend_accum = {fam: weight / total for fam, weight in blend_accum.items()}
            # On ties the family seen first wins (dict insertion order).
            dominant_family = max(blend_accum, key=blend_accum.get)
        else:
            dominant_family = "FAM-NEU"
            blend_accum = {"FAM-NEU": 1.0}
            trajectory = ["FAM-NEU"]

        emotion_data = self.codex_informer.resolve_emotion_family(dominant_family)
        return {
            'phrases': [input_text] + [r['phrases'][0] for r in chunk_results],
            'emotion_candidates': [
                {
                    'phrase': r['phrases'][0],
                    'candidate_emotion': r.get('primary_emotion_code', 'FAM-NEU'),
                }
                for r in chunk_results
            ],
            'metadata': {'source': 'EILProcessor (story mode)', 'input_type': 'story'},
            'emotion_family': emotion_data['emotion_family'],
            'primary_emotion_code': emotion_data['primary_emotion_code'],
            'arc': emotion_data['arc'],
            'resonance': emotion_data['resonance'],
            'blend': blend_accum,
            'trajectory': trajectory,
        }

    def infer_emotion(self, input_text):
        """Infer the emotion packet for *input_text*.

        Returns:
            dict with keys ``phrases``, ``emotion_candidates``, ``metadata``,
            ``emotion_family``, ``primary_emotion_code``, ``arc``,
            ``resonance``, ``blend`` and ``trajectory``; the model fallback
            additionally sets ``confidence``.
        """
        norm_text = self.normalize_text(input_text)

        # 1. Story pattern override
        if norm_text in self.story_patterns_lookup:
            primary_emotion_code = self.story_patterns_lookup[norm_text]
            emotion_data = self.codex_informer.resolve_emotion_family(primary_emotion_code)
            print(f"[EILProcessor] Story Pattern match: '{norm_text}' → {primary_emotion_code}")
            resolved_code = emotion_data['primary_emotion_code']
            return self._build_packet(
                input_text, 'EILProcessor (story pattern)', 'story',
                primary_emotion_code, emotion_data,
                {resolved_code: 1.0}, [resolved_code],
            )

        # 2. Story detection (chunking and blend aggregation)
        input_type = 'phrase'
        if self.is_story_input(norm_text):
            chunks = self.chunk_story(norm_text)
            # Recurse only when chunking made progress. A long or "which"-style
            # text with no split point comes back as a single chunk equal to
            # the input; recursing on it would never terminate, so it is
            # handled as a plain phrase below.
            if chunks != [norm_text]:
                print(f"[EILProcessor] Story mode activated for input: '{norm_text}'")
                return self._infer_story(input_text, chunks)

        # 3. Crosswalk check
        if norm_text in self.crosswalk_lookup:
            primary_emotion_code = self.crosswalk_lookup[norm_text]
            emotion_data = self.codex_informer.resolve_emotion_family(primary_emotion_code)
            print(f"[EILProcessor] Crosswalk match: '{norm_text}' → {primary_emotion_code}")
            resolved_code = emotion_data['primary_emotion_code']
            return self._build_packet(
                input_text, 'EILProcessor (crosswalk)', input_type,
                primary_emotion_code, emotion_data,
                {resolved_code: 1.0}, [resolved_code],
            )

        # 4. Alias lookup
        if norm_text in self.alias_lookup:
            variant_code = self.alias_lookup[norm_text]
            # NOTE(review): assumes variant codes look like "<prefix>-<FAMILY>..."
            # so the second dash-separated field names the family - confirm
            # against the Codex.
            emotion_family = variant_code.split('-')[1]
            family_code = f"FAM-{emotion_family}"
            print(f"[EILProcessor] Alias match: '{norm_text}' → {variant_code}")
            alias_data = {
                'emotion_family': family_code,
                'primary_emotion_code': variant_code,
                'arc': 'Pending',
                'resonance': 'Pending',
            }
            return self._build_packet(
                input_text, 'EILProcessor (alias match)', input_type,
                variant_code, alias_data,
                {variant_code: 1.0}, [variant_code],
            )

        # 5. Signal normalization - blend detection & negation
        blend = self.detect_emotion_blend_with_negation(norm_text)
        if blend:
            total = sum(blend.values())
            blend = {fam: count / total for fam, count in blend.items()}
            primary_code = max(blend, key=blend.get)
            emotion_data = self.codex_informer.resolve_emotion_family(primary_code)
            print(f"[EILProcessor] Signal normalization keyword blend: {blend} (primary: {primary_code})")
            return self._build_packet(
                input_text, 'EILProcessor (signal normalization)', input_type,
                primary_code, emotion_data, blend, [primary_code],
            )

        # 6. Sentiment-to-emotion mapping for non-EI language
        sentiment, sentiment_conf = self.get_sentiment(norm_text)
        print(f"[EILProcessor] Sentiment fallback: {sentiment} ({sentiment_conf:.2f})")
        for sent, cue, fam in self.sentiment_cue_map:
            if sent == sentiment and re.search(cue, norm_text):
                emotion_data = self.codex_informer.resolve_emotion_family(fam)
                return self._build_packet(
                    input_text, 'EILProcessor (sentiment-to-emotion)', input_type,
                    fam, emotion_data, {fam: 1.0}, [fam],
                )

        # 7. Model fallback (last resort)
        print(f"[EILProcessor] No crosswalk/alias/keyword/sentiment match — running model on: '{norm_text}'")
        predicted_label, confidence = self._classify(self.tokenizer, self.model, norm_text)
        if confidence < self.softmax_threshold:
            predicted_label = 'neutral'
            print(f"[EILProcessor] Low confidence ({confidence:.2f}) — setting to 'neutral'")
        print(f"[EILProcessor] Model prediction: {predicted_label} ({confidence:.2f})")

        # Labels missing from this table fall back to FAM-NEU.
        # NOTE(review): confirm the table covers every label the emotion
        # checkpoint can emit.
        model_to_codex_map = {
            "joy": "FAM-JOY",
            "anger": "FAM-ANG",
            "sadness": "FAM-SAD",
            "fear": "FAM-FEA",
            "love": "FAM-LOV",
            "surprise": "FAM-SUR",
            "disgust": "FAM-DIS",
            "neutral": "FAM-NEU"
        }
        primary_emotion_code = model_to_codex_map.get(predicted_label.lower(), "FAM-NEU")
        emotion_data = self.codex_informer.resolve_emotion_family(primary_emotion_code)
        resolved_code = emotion_data['primary_emotion_code']
        # Unlike the other strategies, the candidate here is the raw model
        # label rather than a Codex code.
        packet = self._build_packet(
            input_text, 'EILProcessor (model)', input_type,
            predicted_label, emotion_data,
            {resolved_code: 1.0}, [resolved_code],
        )
        packet['metadata']['confidence'] = confidence
        packet['confidence'] = confidence
        return packet