| | import logging |
| | import json |
| | import json5 |
| | import time |
| | from datetime import datetime |
| | from typing import List, Dict, Any, Optional |
| | from pydantic import BaseModel, Field |
| |
|
| | |
| | logger = logging.getLogger(__name__) |
| |
|
| | class PhaseTransitionResponse(BaseModel): |
| | goals_progress: Dict[str, float] |
| | should_transition: bool |
| | next_phase: str |
| | reasoning: str |
| |
|
| | class SessionCharacteristics(BaseModel): |
| | alliance_strength: float = Field(ge=0.0, le=1.0) |
| | engagement_level: float = Field(ge=0.0, le=1.0) |
| | emotional_pattern: str |
| | cognitive_pattern: str |
| | coping_mechanisms: List[str] = Field(min_items=2) |
| | progress_quality: float = Field(ge=0.0, le=1.0) |
| | recommended_focus: str |
| |
|
| | class ConversationPhase(BaseModel): |
| | name: str |
| | description: str |
| | goals: List[str] |
| | typical_duration: int |
| | started_at: Optional[str] = None |
| | ended_at: Optional[str] = None |
| | completion_metrics: Dict[str, float] = Field(default_factory=dict) |
| |
|
| | class FlowManager: |
| | |
| | |
| | PHASES = { |
| | 'introduction': { |
| | 'description': 'Establishing rapport and identifying main concerns', |
| | 'goals': [ |
| | 'build therapeutic alliance', |
| | 'identify primary concerns', |
| | 'understand client expectations', |
| | 'establish session structure' |
| | ], |
| | 'typical_duration': 5 |
| | }, |
| | 'exploration': { |
| | 'description': 'In-depth exploration of issues and their context', |
| | 'goals': [ |
| | 'examine emotional responses', |
| | 'explore thought patterns', |
| | 'identify behavioral patterns', |
| | 'understand situational context', |
| | 'recognize relationship dynamics' |
| | ], |
| | 'typical_duration': 15 |
| | }, |
| | 'intervention': { |
| | 'description': 'Providing strategies, insights, and therapeutic interventions', |
| | 'goals': [ |
| | 'introduce coping techniques', |
| | 'reframe negative thinking', |
| | 'provide emotional validation', |
| | 'offer perspective shifts', |
| | 'suggest behavioral modifications' |
| | ], |
| | 'typical_duration': 20 |
| | }, |
| | 'conclusion': { |
| | 'description': 'Summarizing insights and establishing next steps', |
| | 'goals': [ |
| | 'review key insights', |
| | 'consolidate learning', |
| | 'identify action items', |
| | 'set intentions', |
| | 'provide closure' |
| | ], |
| | 'typical_duration': 5 |
| | } |
| | } |
| | |
| | def __init__(self, llm, session_duration: int = 45): |
| | |
| | self.llm = llm |
| | self.session_duration = session_duration * 60 |
| | |
| | |
| | self.user_sessions = {} |
| | |
| | logger.info(f"Initialized FlowManager with {session_duration} minute sessions") |
| | |
| | def _ensure_user_session(self, user_id: str): |
| | |
| | if user_id not in self.user_sessions: |
| | self.initialize_session(user_id) |
| | |
| | def initialize_session(self, user_id: str): |
| | |
| | now = datetime.now().isoformat() |
| | |
| | |
| | initial_phase = ConversationPhase( |
| | name='introduction', |
| | description=self.PHASES['introduction']['description'], |
| | goals=self.PHASES['introduction']['goals'], |
| | typical_duration=self.PHASES['introduction']['typical_duration'], |
| | started_at=now |
| | ) |
| | |
| | |
| | session_id = f"{user_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}" |
| | |
| | |
| | self.user_sessions[user_id] = { |
| | 'session_id': session_id, |
| | 'user_id': user_id, |
| | 'started_at': now, |
| | 'updated_at': now, |
| | 'current_phase': initial_phase, |
| | 'phase_history': [initial_phase], |
| | 'message_count': 0, |
| | 'emotion_history': [], |
| | 'emotion_progression': [], |
| | 'flags': { |
| | 'crisis_detected': False, |
| | 'long_silences': False |
| | }, |
| | 'llm_context': { |
| | 'session_characteristics': {} |
| | } |
| | } |
| | |
| | logger.info(f"Initialized new session for user {user_id}") |
| | return self.user_sessions[user_id] |
| | |
| | def process_message(self, user_id: str, message: str, emotions: Dict[str, float]) -> Dict[str, Any]: |
| |
|
| | self._ensure_user_session(user_id) |
| | session = self.user_sessions[user_id] |
| | |
| | |
| | now = datetime.now().isoformat() |
| | session['updated_at'] = now |
| | session['message_count'] += 1 |
| | |
| | |
| | emotion_entry = { |
| | 'timestamp': now, |
| | 'emotions': emotions, |
| | 'message_idx': session['message_count'] |
| | } |
| | session['emotion_history'].append(emotion_entry) |
| | |
| | |
| | if not session.get('emotion_progression'): |
| | session['emotion_progression'] = [] |
| | |
| | |
| | primary_emotion = max(emotions.items(), key=lambda x: x[1])[0] |
| | session['emotion_progression'].append(primary_emotion) |
| | |
| | |
| | self._check_phase_transition(user_id, message, emotions) |
| | |
| | |
| | if session['message_count'] % 5 == 0: |
| | self._update_session_characteristics(user_id) |
| | |
| | |
| | flow_context = self._create_flow_context(user_id) |
| | |
| | return flow_context |
| | |
| | def _check_phase_transition(self, user_id: str, message: str, emotions: Dict[str, float]): |
| | |
| | session = self.user_sessions[user_id] |
| | current_phase = session['current_phase'] |
| | |
| | |
| | started_at = datetime.fromisoformat(session['started_at']) |
| | now = datetime.now() |
| | elapsed_seconds = (now - started_at).total_seconds() |
| | session_progress = elapsed_seconds / self.session_duration |
| | |
| | |
| | phase_context = { |
| | 'current': current_phase.name, |
| | 'description': current_phase.description, |
| | 'goals': current_phase.goals, |
| | 'time_in_phase': (now - datetime.fromisoformat(current_phase.started_at)).total_seconds() / 60, |
| | 'session_progress': session_progress, |
| | 'message_count': session['message_count'] |
| | } |
| | |
| | |
| | min_time_in_phase_minutes = max(2, current_phase.typical_duration * 0.5) |
| | if phase_context['time_in_phase'] < min_time_in_phase_minutes: |
| | return |
| | |
| | prompt = f""" |
| | Evaluate whether this therapeutic conversation should transition to the next phase. |
| | |
| | Current conversation state: |
| | - Current phase: {current_phase.name} ("{current_phase.description}") |
| | - Goals for this phase: {', '.join(current_phase.goals)} |
| | - Time spent in this phase: {phase_context['time_in_phase']:.1f} minutes |
| | - Session progress: {session_progress * 100:.1f}% complete |
| | - Message count: {session['message_count']} |
| | |
| | Latest message from user: "{message}" |
| | |
| | Current emotions: {', '.join([f"{e} ({score:.2f})" for e, score in |
| | sorted(emotions.items(), key=lambda x: x[1], reverse=True)[:3]])} |
| | |
| | Phases in a therapeutic conversation: |
| | 1. introduction: {self.PHASES['introduction']['description']} |
| | 2. exploration: {self.PHASES['exploration']['description']} |
| | 3. intervention: {self.PHASES['intervention']['description']} |
| | 4. conclusion: {self.PHASES['conclusion']['description']} |
| | |
| | Consider: |
| | 1. Have the goals of the current phase been sufficiently addressed? |
| | 2. Is the timing appropriate considering overall session progress? |
| | 3. Is there a natural transition point in the conversation? |
| | 4. Does the emotional content suggest readiness to move forward? |
| | |
| | First, provide your analysis of whether the key goals of the current phase have been met. |
| | Then decide if the conversation should transition to the next phase. |
| | |
| | Respond with a JSON object in this format: |
| | {{ |
| | "goals_progress": {{ |
| | "goal1": 0.5, |
| | "goal2": 0.7 |
| | }}, |
| | "should_transition": false, |
| | "next_phase": "exploration", |
| | "reasoning": "brief explanation" |
| | }} |
| | |
| | Output ONLY valid JSON without additional text. |
| | """ |
| | |
| | response = self.llm.invoke(prompt) |
| | |
| | try: |
| | |
| | evaluation = json.loads(response) |
| | |
| | phase_transition = PhaseTransitionResponse.parse_obj(evaluation) |
| | |
| | |
| | for goal, score in phase_transition.goals_progress.items(): |
| | if goal in current_phase.goals: |
| | current_phase.completion_metrics[goal] = score |
| | |
| | |
| | if phase_transition.should_transition: |
| | if phase_transition.next_phase in self.PHASES: |
| | self._transition_to_phase(user_id, phase_transition.next_phase, phase_transition.reasoning) |
| | except (json.JSONDecodeError, ValueError): |
| | self._check_time_based_transition(user_id) |
| | |
| | def _check_time_based_transition(self, user_id: str): |
| | |
| | session = self.user_sessions[user_id] |
| | current_phase = session['current_phase'] |
| | |
| | |
| | started_at = datetime.fromisoformat(session['started_at']) |
| | now = datetime.now() |
| | elapsed_minutes = (now - started_at).total_seconds() / 60 |
| | |
| | |
| | intro_threshold = self.PHASES['introduction']['typical_duration'] |
| | explore_threshold = intro_threshold + self.PHASES['exploration']['typical_duration'] |
| | intervention_threshold = explore_threshold + self.PHASES['intervention']['typical_duration'] |
| | |
| | |
| | next_phase = None |
| | if current_phase.name == 'introduction' and elapsed_minutes >= intro_threshold: |
| | next_phase = 'exploration' |
| | elif current_phase.name == 'exploration' and elapsed_minutes >= explore_threshold: |
| | next_phase = 'intervention' |
| | elif current_phase.name == 'intervention' and elapsed_minutes >= intervention_threshold: |
| | next_phase = 'conclusion' |
| | |
| | if next_phase: |
| | self._transition_to_phase(user_id, next_phase, "Time-based transition") |
| | |
| | def _transition_to_phase(self, user_id: str, next_phase_name: str, reason: str): |
| | |
| | session = self.user_sessions[user_id] |
| | current_phase = session['current_phase'] |
| | |
| | |
| | now = datetime.now().isoformat() |
| | current_phase.ended_at = now |
| | |
| | |
| | new_phase = ConversationPhase( |
| | name=next_phase_name, |
| | description=self.PHASES[next_phase_name]['description'], |
| | goals=self.PHASES[next_phase_name]['goals'], |
| | typical_duration=self.PHASES[next_phase_name]['typical_duration'], |
| | started_at=now |
| | ) |
| | |
| | |
| | session['current_phase'] = new_phase |
| | session['phase_history'].append(new_phase) |
| | |
| | logger.info(f"User {user_id} transitioned from {current_phase.name} to {next_phase_name}: {reason}") |
| | |
| | def _update_session_characteristics(self, user_id: str): |
| | session = self.user_sessions[user_id] |
| | |
| | |
| | if session['message_count'] < 5: |
| | return |
| | |
| | |
| | message_sample = [] |
| | emotion_summary = {} |
| | |
| | |
| | for i, emotion_data in enumerate(session['emotion_history'][-10:]): |
| | msg_idx = emotion_data['message_idx'] |
| | if i % 2 == 0: |
| | message_sample.append(f"Message {msg_idx}: User emotions: {', '.join([f'{e}({s:.2f})' for e, s in sorted(emotion_data['emotions'].items(), key=lambda x: x[1], reverse=True)[:2]])}") |
| | |
| | |
| | for emotion, score in emotion_data['emotions'].items(): |
| | if score > 0.3: |
| | emotion_summary[emotion] = emotion_summary.get(emotion, 0) + score |
| | |
| | |
| | if emotion_summary: |
| | total = sum(emotion_summary.values()) |
| | emotion_summary = {e: s/total for e, s in emotion_summary.items()} |
| | |
| | |
| | prompt = f""" |
| | Analyze this therapy session and provide a JSON response with the following characteristics: |
| | |
| | Current session state: |
| | - Phase: {session['current_phase'].name} ({session['current_phase'].description}) |
| | - Message count: {session['message_count']} |
| | - Emotion summary: {', '.join([f'{e}({s:.2f})' for e, s in sorted(emotion_summary.items(), key=lambda x: x[1], reverse=True)])} |
| | |
| | Recent messages: |
| | {chr(10).join(message_sample)} |
| | |
| | Required JSON format: |
| | {{ |
| | "alliance_strength": 0.8, |
| | "engagement_level": 0.7, |
| | "emotional_pattern": "brief description of emotional pattern", |
| | "cognitive_pattern": "brief description of cognitive pattern", |
| | "coping_mechanisms": ["mechanism1", "mechanism2"], |
| | "progress_quality": 0.6, |
| | "recommended_focus": "brief therapeutic recommendation" |
| | }} |
| | |
| | Important: |
| | 1. Respond with ONLY the JSON object |
| | 2. Use numbers between 0.0 and 1.0 for alliance_strength, engagement_level, and progress_quality |
| | 3. Keep descriptions brief and focused |
| | 4. Include at least 2 coping mechanisms |
| | 5. Provide a specific recommended focus |
| | |
| | JSON Response: |
| | """ |
| | |
| | response = self.llm.invoke(prompt) |
| | |
| | try: |
| | |
| | characteristics = json.loads(response) |
| | |
| | session_chars = SessionCharacteristics.parse_obj(characteristics) |
| | session['llm_context']['session_characteristics'] = session_chars.dict() |
| | logger.info(f"Updated session characteristics for user {user_id}") |
| | except (json.JSONDecodeError, ValueError) as e: |
| | logger.warning(f"Failed to parse session characteristics: {e}") |
| | |
| | def _create_flow_context(self, user_id: str) -> Dict[str, Any]: |
| | |
| | session = self.user_sessions[user_id] |
| | current_phase = session['current_phase'] |
| | |
| | |
| | started_at = datetime.fromisoformat(session['started_at']) |
| | now = datetime.now() |
| | elapsed_seconds = (now - started_at).total_seconds() |
| | remaining_seconds = max(0, self.session_duration - elapsed_seconds) |
| | |
| | |
| | emotions_summary = {} |
| | for emotion_data in session['emotion_history'][-3:]: |
| | for emotion, score in emotion_data['emotions'].items(): |
| | emotions_summary[emotion] = emotions_summary.get(emotion, 0) + score |
| | |
| | if emotions_summary: |
| | primary_emotions = sorted(emotions_summary.items(), key=lambda x: x[1], reverse=True)[:3] |
| | else: |
| | primary_emotions = [] |
| | |
| | |
| | phase_guidance = [] |
| | |
| | |
| | if current_phase.name == 'introduction': |
| | phase_guidance.append("Build rapport and identify main concerns") |
| | if session['message_count'] > 3: |
| | phase_guidance.append("Begin exploring emotional context") |
| | |
| | elif current_phase.name == 'exploration': |
| | phase_guidance.append("Deepen understanding of issues and contexts") |
| | phase_guidance.append("Connect emotional patterns to identify themes") |
| | |
| | elif current_phase.name == 'intervention': |
| | phase_guidance.append("Offer support strategies and therapeutic insights") |
| | if remaining_seconds < 600: |
| | phase_guidance.append("Begin consolidating key insights") |
| | |
| | elif current_phase.name == 'conclusion': |
| | phase_guidance.append("Summarize insights and establish next steps") |
| | phase_guidance.append("Provide closure while maintaining supportive presence") |
| | |
| | |
| | if 'session_characteristics' in session['llm_context']: |
| | char = session['llm_context']['session_characteristics'] |
| | |
| | |
| | if char.get('alliance_strength', 0.8) < 0.6: |
| | phase_guidance.append("Focus on strengthening therapeutic alliance") |
| | |
| | |
| | if char.get('engagement_level', 0.8) < 0.6: |
| | phase_guidance.append("Increase engagement with more personalized responses") |
| | |
| | |
| | if 'recommended_focus' in char: |
| | phase_guidance.append(char['recommended_focus']) |
| | |
| | |
| | flow_context = { |
| | 'phase': { |
| | 'name': current_phase.name, |
| | 'description': current_phase.description, |
| | 'goals': current_phase.goals |
| | }, |
| | 'session': { |
| | 'elapsed_minutes': elapsed_seconds / 60, |
| | 'remaining_minutes': remaining_seconds / 60, |
| | 'progress_percentage': (elapsed_seconds / self.session_duration) * 100, |
| | 'message_count': session['message_count'] |
| | }, |
| | 'emotions': [{'name': e, 'intensity': s} for e, s in primary_emotions], |
| | 'guidance': phase_guidance |
| | } |
| | |
| | return flow_context |