ThreatLevelD
Upgrade EILProcessor to world-class signal normalization: adds subphrase/keyword blend detection, chunk weighting by model confidence, negation/contrast handling, emotion arc trajectory output, and sentiment-to-emotion mapping for non-EI language. Significantly improves long-form and ambiguous emotional inference.
9b2f3b7
# core/eil_processor.py
# MEC EIL Processor – World-Class Signal Normalization Edition
import yaml
import re
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import torch.nn.functional as F
class EILProcessor:
    """Turn free text into an emotion "packet" through a layered pipeline.

    ``infer_emotion`` tries, in order: story-pattern override, story mode
    (clause chunking with blend aggregation), crosswalk lookup, alias lookup,
    keyword blend detection with negation handling, sentiment-cue mapping,
    and finally the transformer emotion classifier.

    Every packet is a dict with the keys ``phrases``, ``emotion_candidates``,
    ``metadata``, ``emotion_family``, ``primary_emotion_code``, ``arc``,
    ``resonance``, ``blend`` and ``trajectory``; packets produced by the
    model fallback additionally carry ``confidence``.
    """

    # Lead-in phrases stripped from the start of an input. Only the first
    # matching entry is removed, so longer variants must precede the shorter
    # ones they contain ("i am feeling " before "i am ").
    _LEAD_INS = (
        "i am feeling ",
        "i feel ",
        "feeling ",
        "i'm feeling ",
        "i am ",
        "i'm ",
    )

    # Punctuation that marks multi-clause ("story") input.
    _CLAUSE_PUNCTUATION = (',', ';', '.')

    # Clause words are matched as whole words so that e.g. "butter" or
    # "whenever" do not count as a clause boundary.
    _CLAUSE_WORD_RE = re.compile(
        r'\b(?:but|because|so that|which|when|while)\b', re.IGNORECASE
    )

    # Split points used by chunk_story.
    _CHUNK_SPLIT_RE = re.compile(
        r'[.,;!?]|\b(?:and|but|because|so|although|though|while|when)\b',
        re.IGNORECASE,
    )

    # Optional negation cue directly in front of a keyword. The named group
    # is None when the keyword occurrence is not negated.
    _NEGATION_PREFIX = (
        r"(?:(?P<neg>(?<!\w)(?:not|no longer|never|no|without)|n't)\s+)?"
    )

    # Inputs are truncated to this many tokens before being fed to a model.
    _MAX_MODEL_TOKENS = 512

    # Classifier label -> Codex family code. Labels missing here resolve to
    # FAM-NEU at lookup time.
    # NOTE(review): the emotion model may emit labels that are not listed
    # (e.g. an "optimism" class) - confirm against the model card whether
    # such labels should map to a non-neutral family.
    _MODEL_TO_CODEX = {
        "joy": "FAM-JOY",
        "anger": "FAM-ANG",
        "sadness": "FAM-SAD",
        "fear": "FAM-FEA",
        "love": "FAM-LOV",
        "surprise": "FAM-SUR",
        "disgust": "FAM-DIS",
        "neutral": "FAM-NEU",
    }

    def __init__(self, codex_informer, softmax_threshold=0.6,
                 crosswalk_path='config/crosswalk.yaml'):
        """Load lookups, keyword tables and the two transformer models.

        Args:
            codex_informer: Object providing ``build_alias_lookup()`` and
                ``resolve_emotion_family(code)``.
            softmax_threshold: Minimum top-class probability for the emotion
                model's prediction to be kept; below it the label becomes
                ``'neutral'``.
            crosswalk_path: YAML file holding a ``crosswalk`` list (entries
                with ``phrase`` / ``emotion_code``) and an optional
                ``story_patterns`` list (``pattern`` / ``emotion_code``).

        Raises:
            KeyError: If the YAML file has no ``crosswalk`` key.
        """
        self.codex_informer = codex_informer
        self.softmax_threshold = softmax_threshold

        # Build alias lookup from Codex
        self.alias_lookup = self.codex_informer.build_alias_lookup()
        print(f"[EILProcessor] Alias map loaded with {len(self.alias_lookup)} entries")

        # Load crosswalk.yaml. An empty file parses to None; treat it as an
        # empty mapping so the failure below is a clear KeyError.
        with open(crosswalk_path, 'r', encoding='utf-8') as f:
            yaml_data = yaml.safe_load(f) or {}
        crosswalk_data = yaml_data['crosswalk']
        # A present-but-empty "story_patterns:" key parses to None as well.
        story_pattern_data = yaml_data.get('story_patterns') or []

        # Both lookups are keyed by normalized text so that they can be
        # matched against normalized input directly.
        self.crosswalk_lookup = {
            self.normalize_text(entry['phrase']): entry['emotion_code']
            for entry in crosswalk_data
        }
        self.story_patterns_lookup = {
            self.normalize_text(entry['pattern']): entry['emotion_code']
            for entry in story_pattern_data
        }
        print(f"[EILProcessor] Crosswalk loaded with {len(self.crosswalk_lookup)} entries")
        print(f"[EILProcessor] Story Patterns loaded with {len(self.story_patterns_lookup)} entries")

        # Emotion keyword dictionary for signal normalization/blending
        self.emotion_keyword_map = {
            "FAM-ANG": ["anger", "angry", "hate", "furious", "rage", "resentment"],
            "FAM-HEL": ["helpless", "powerless", "can't", "unable", "trapped", "stuck", "overwhelmed", "overwhelm"],
            "FAM-SAD": ["sad", "down", "unhappy", "miserable", "depressed", "blue", "empty"],
            "FAM-FEA": ["afraid", "scared", "fear", "terrified", "worried", "nervous", "anxious", "can't sleep"],
            "FAM-LOV": ["love", "loved", "loving", "caring", "affection", "proud"],
            "FAM-JOY": ["joy", "happy", "excited", "delighted", "content", "proud"],
            "FAM-SUR": ["surprised", "amazed", "astonished", "shocked"],
            "FAM-DIS": ["disgust", "disgusted", "gross", "revolted"],
            "FAM-SHA": ["ashamed", "shame", "embarrassed", "humiliated"],
            "FAM-GUI": ["guilty", "guilt", "remorse", "regret"],
            # Add more as needed
        }

        # For sentiment-to-emotion mapping of ambiguous/indirect language.
        # NOTE(review): "FAM-LON" does not appear in emotion_keyword_map or
        # in the model label map - confirm the Codex can resolve it.
        self.sentiment_cue_map = [
            # (sentiment, regex or cue, mapped emotion)
            ("negative", r"can.?t sleep|insomnia|restless|wake up", "FAM-FEA"),
            ("negative", r"too much|overwhelmed|can.?t cope|can.?t deal", "FAM-HEL"),
            ("negative", r"nothing feels right|empty|pointless|no purpose", "FAM-SAD"),
            ("negative", r"don't care|apathy|numb", "FAM-LON"),
            ("positive", r"did it|proud|relieved", "FAM-JOY"),
            ("neutral", r"just tired|exhausted", "FAM-HEL"),
            # ...add more for coverage
        ]

        # Load emotion and sentiment models
        self.tokenizer = AutoTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-emotion')
        self.model = AutoModelForSequenceClassification.from_pretrained('cardiffnlp/twitter-roberta-base-emotion')
        self.sentiment_tokenizer = AutoTokenizer.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')
        self.sentiment_model = AutoModelForSequenceClassification.from_pretrained('cardiffnlp/twitter-roberta-base-sentiment-latest')

    def normalize_text(self, text):
        """Lowercase and trim ``text``, drop one lead-in phrase and ``.!?``.

        At most one lead-in (e.g. "i am feeling ", "i'm ") is removed, and
        only from the start. Every ``.``, ``!`` and ``?`` is removed wherever
        it occurs; commas and semicolons are kept.
        """
        text = text.lower().strip()
        for lead_in in self._LEAD_INS:
            if text.startswith(lead_in):
                text = text[len(lead_in):]
                break
        return re.sub(r'[.!?]', '', text)

    def is_story_input(self, text):
        """Return True when ``text`` looks like multi-clause ("story") input.

        The text qualifies when it has more than 12 whitespace-separated
        tokens, contains ``,``, ``;`` or ``.``, or contains one of the clause
        words (but, because, so that, which, when, while) as a whole word.

        NOTE(review): ``infer_emotion`` passes normalized text, from which
        ``normalize_text`` has already removed ``.``; the ``.`` marker can
        therefore only fire for callers that pass un-normalized text.
        """
        if len(text.split()) > 12:
            return True
        if any(mark in text for mark in self._CLAUSE_PUNCTUATION):
            return True
        return self._CLAUSE_WORD_RE.search(text) is not None

    def chunk_story(self, text):
        """Split ``text`` on clause punctuation and conjunctions.

        Returns the non-empty, whitespace-trimmed pieces in order; the
        separators themselves are discarded.
        """
        pieces = (piece.strip() for piece in self._CHUNK_SPLIT_RE.split(text))
        return [piece for piece in pieces if piece]

    def detect_emotion_blend_with_negation(self, norm_text):
        """Count un-negated emotion keywords per family in ``norm_text``.

        Each keyword that occurs at least once as a whole word without a
        directly preceding negation cue (not, no longer, never, no, without,
        or an ``n't`` contraction) adds 1.0 to its family. Whole-word
        matching keeps "whatever" from counting as "hate" and keeps "loved"
        from counting for both "love" and "loved".

        Returns:
            dict mapping family code -> raw (un-normalized) score; empty
            when no keyword matches.
        """
        blend = {}
        for fam, keywords in self.emotion_keyword_map.items():
            for kw in keywords:
                pattern = (
                    self._NEGATION_PREFIX
                    + r"(?<!\w)" + re.escape(kw) + r"(?!\w)"
                )
                # Negation is judged per occurrence: one negated mention
                # must not hide a separate affirmative one.
                if any(m.group('neg') is None
                       for m in re.finditer(pattern, norm_text)):
                    blend[fam] = blend.get(fam, 0) + 1.0
        return blend

    def _top_label(self, tokenizer, model, text):
        """Run ``model`` on ``text`` and return ``(label, probability)``.

        The input is truncated to ``_MAX_MODEL_TOKENS`` tokens so that long
        text cannot exceed the model's position limit.
        """
        tokens = tokenizer(
            text,
            return_tensors='pt',
            truncation=True,
            max_length=self._MAX_MODEL_TOKENS,
        )
        with torch.no_grad():
            logits = model(**tokens).logits
        probs = F.softmax(logits, dim=-1).squeeze()
        top_prob, top_idx = torch.max(probs, dim=-1)
        return model.config.id2label[top_idx.item()], top_prob.item()

    def get_sentiment(self, norm_text):
        """Return ``(lowercased sentiment label, probability)`` for the text."""
        label, prob = self._top_label(
            self.sentiment_tokenizer, self.sentiment_model, norm_text
        )
        return label.lower(), prob

    @staticmethod
    def _make_packet(input_text, candidate, source, input_type,
                     emotion_data, blend, trajectory):
        """Assemble the packet dict shared by all single-phrase branches."""
        return {
            'phrases': [input_text],
            'emotion_candidates': [{'phrase': input_text, 'candidate_emotion': candidate}],
            'metadata': {'source': source, 'input_type': input_type},
            'emotion_family': emotion_data['emotion_family'],
            'primary_emotion_code': emotion_data['primary_emotion_code'],
            'arc': emotion_data['arc'],
            'resonance': emotion_data['resonance'],
            'blend': blend,
            'trajectory': trajectory,
        }

    def infer_emotion(self, input_text):
        """Infer the emotion packet for ``input_text``.

        Multi-clause input is chunked and each chunk is inferred on its own;
        the chunk blends are confidence-weighted, summed and normalized, and
        the chunk codes form the ``trajectory``. Everything else goes through
        the single-phrase pipeline.

        Returns:
            dict: the emotion packet described in the class docstring.
        """
        return self._infer(input_text, allow_story=True)

    def _infer(self, input_text, allow_story):
        """Route ``input_text`` to the story or single-phrase pipeline.

        ``allow_story`` is False for chunks produced by story mode. A chunk
        contains no split point, so re-chunking it would only reproduce the
        chunk and recurse without end.
        """
        norm_text = self.normalize_text(input_text)

        # 1) Story pattern override
        if norm_text in self.story_patterns_lookup:
            primary_emotion_code = self.story_patterns_lookup[norm_text]
            emotion_data = self.codex_informer.resolve_emotion_family(primary_emotion_code)
            print(f"[EILProcessor] Story Pattern match: '{norm_text}' → {primary_emotion_code}")
            return self._make_packet(
                input_text, primary_emotion_code,
                'EILProcessor (story pattern)', 'story', emotion_data,
                {emotion_data['primary_emotion_code']: 1.0},
                [emotion_data['primary_emotion_code']],
            )

        # 2) Story detection (chunking and blend aggregation)
        input_type = 'phrase'
        if self.is_story_input(norm_text):
            input_type = 'story'
            if allow_story:
                chunks = self.chunk_story(norm_text)
                # Text that looks like a story but has no split point (e.g.
                # long text without delimiters) is handled as one phrase.
                if chunks != [norm_text]:
                    return self._infer_story(input_text, norm_text, chunks)

        return self._infer_phrase(input_text, norm_text, input_type)

    def _infer_story(self, input_text, norm_text, chunks):
        """Aggregate per-chunk inferences into a single story packet."""
        print(f"[EILProcessor] Story mode activated for input: '{norm_text}'")
        chunk_results = []
        blend_accum = {}
        trajectory = []
        for chunk in chunks:
            sub_result = self._infer(chunk, allow_story=False)
            chunk_results.append(sub_result)
            # Only model-fallback packets carry a confidence; all other
            # branches weigh in at 1.0.
            conf = sub_result.get('confidence', 1.0)
            for fam, val in sub_result.get('blend', {}).items():
                blend_accum[fam] = blend_accum.get(fam, 0) + val * conf
            if 'primary_emotion_code' in sub_result:
                trajectory.append(sub_result['primary_emotion_code'])

        if blend_accum:
            total = sum(blend_accum.values())
            for fam in blend_accum:
                blend_accum[fam] /= total
            dominant_family = max(blend_accum.items(), key=lambda item: item[1])[0]
        else:
            # No chunk produced a blend (e.g. the input was only delimiters).
            dominant_family = "FAM-NEU"
            blend_accum = {"FAM-NEU": 1.0}
            trajectory = ["FAM-NEU"]

        emotion_data = self.codex_informer.resolve_emotion_family(dominant_family)
        return {
            'phrases': [input_text] + [r['phrases'][0] for r in chunk_results],
            'emotion_candidates': [
                {
                    'phrase': r['phrases'][0],
                    'candidate_emotion': r.get('primary_emotion_code', 'FAM-NEU'),
                }
                for r in chunk_results
            ],
            'metadata': {'source': 'EILProcessor (story mode)', 'input_type': 'story'},
            'emotion_family': emotion_data['emotion_family'],
            'primary_emotion_code': emotion_data['primary_emotion_code'],
            'arc': emotion_data['arc'],
            'resonance': emotion_data['resonance'],
            'blend': blend_accum,
            'trajectory': trajectory,
        }

    def _infer_phrase(self, input_text, norm_text, input_type):
        """Run the single-phrase pipeline (crosswalk through model fallback)."""
        # 3) Crosswalk check
        if norm_text in self.crosswalk_lookup:
            primary_emotion_code = self.crosswalk_lookup[norm_text]
            emotion_data = self.codex_informer.resolve_emotion_family(primary_emotion_code)
            print(f"[EILProcessor] Crosswalk match: '{norm_text}' → {primary_emotion_code}")
            return self._make_packet(
                input_text, primary_emotion_code,
                'EILProcessor (crosswalk)', input_type, emotion_data,
                {emotion_data['primary_emotion_code']: 1.0},
                [emotion_data['primary_emotion_code']],
            )

        # 4) Alias lookup. The family is taken from the second dash-separated
        # field of the variant code; arc and resonance are left 'Pending'.
        if norm_text in self.alias_lookup:
            variant_code = self.alias_lookup[norm_text]
            emotion_family = variant_code.split('-')[1]
            family_code = f"FAM-{emotion_family}"
            print(f"[EILProcessor] Alias match: '{norm_text}' → {variant_code}")
            alias_data = {
                'emotion_family': family_code,
                'primary_emotion_code': variant_code,
                'arc': 'Pending',
                'resonance': 'Pending',
            }
            return self._make_packet(
                input_text, variant_code,
                'EILProcessor (alias match)', input_type, alias_data,
                {variant_code: 1.0}, [variant_code],
            )

        # 5) Signal normalization - blend detection & negation
        blend = self.detect_emotion_blend_with_negation(norm_text)
        if blend:
            total = sum(blend.values())
            for fam in blend:
                blend[fam] /= total
            primary_code = max(blend.items(), key=lambda item: item[1])[0]
            emotion_data = self.codex_informer.resolve_emotion_family(primary_code)
            print(f"[EILProcessor] Signal normalization keyword blend: {blend} (primary: {primary_code})")
            return self._make_packet(
                input_text, primary_code,
                'EILProcessor (signal normalization)', input_type,
                emotion_data, blend, [primary_code],
            )

        # 6) Sentiment-to-emotion mapping for non-EI language: a cue only
        # applies when the sentiment model agrees with its polarity.
        sentiment, sentiment_conf = self.get_sentiment(norm_text)
        print(f"[EILProcessor] Sentiment fallback: {sentiment} ({sentiment_conf:.2f})")
        for sent, cue, fam in self.sentiment_cue_map:
            if sent == sentiment and re.search(cue, norm_text):
                emotion_data = self.codex_informer.resolve_emotion_family(fam)
                return self._make_packet(
                    input_text, fam,
                    'EILProcessor (sentiment-to-emotion)', input_type,
                    emotion_data, {fam: 1.0}, [fam],
                )

        # 7) Model fallback (last resort)
        print(f"[EILProcessor] No crosswalk/alias/keyword/sentiment match — running model on: '{norm_text}'")
        predicted_label, confidence = self._top_label(
            self.tokenizer, self.model, norm_text
        )
        if confidence < self.softmax_threshold:
            predicted_label = 'neutral'
            print(f"[EILProcessor] Low confidence ({confidence:.2f}) — setting to 'neutral'")
        print(f"[EILProcessor] Model prediction: {predicted_label} ({confidence:.2f})")

        primary_emotion_code = self._MODEL_TO_CODEX.get(predicted_label.lower(), "FAM-NEU")
        emotion_data = self.codex_informer.resolve_emotion_family(primary_emotion_code)
        # NOTE(review): unlike the other branches, the candidate here is the
        # raw model label rather than a Codex code; kept as-is for callers.
        packet = self._make_packet(
            input_text, predicted_label,
            'EILProcessor (model)', input_type, emotion_data,
            {emotion_data['primary_emotion_code']: 1.0},
            [emotion_data['primary_emotion_code']],
        )
        packet['metadata']['confidence'] = confidence
        packet['confidence'] = confidence
        return packet