File size: 10,882 Bytes
90d2de3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64c1f54
90d2de3
 
 
 
 
64c1f54
90d2de3
40f9684
 
 
90d2de3
40f9684
90d2de3
 
64c1f54
 
 
40f9684
90d2de3
 
 
 
 
 
 
 
 
 
 
 
 
64c1f54
 
90d2de3
 
 
 
 
 
 
 
 
 
 
 
 
 
40f9684
 
 
 
 
90d2de3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b8d1a98
40f9684
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21a4e41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
"""
This module contains utility functions for text processing and other helper functions.
"""

import re
import os
import base64


def has_meaningful_content(text):
    """
    Check if explanation text has meaningful content.

    Text is considered meaningless when it is empty or None, whitespace-only,
    composed solely of separator characters (=, -, _, *), or is a bare
    "## REASONING" header with no substantive content after it.

    Args:
        text (str): The text to check

    Returns:
        bool: True if the text has meaningful content, False otherwise
    """
    if not text:
        return False

    stripped_text = text.strip()
    # Bug fix: whitespace-only input previously slipped through, because the
    # separator regex below requires at least one character to match.
    if not stripped_text:
        return False

    # Reject text that is just separator characters (e.g. "====" or "---\n---");
    # internal newlines are removed first so multi-line rules still match.
    if re.match(r'^[=\-_*]+$', stripped_text.replace('\n', '')):
        return False

    # Reject a bare "## REASONING" header with no actual content behind it.
    if "## REASONING" in stripped_text and len(stripped_text) < 20:
        return False

    return True


def remove_reasoning_and_sources(text):
    """
    Remove reasoning, follow-up questions, and sources sections from the main response text.

    Cleanup runs in several ordered passes:
      1. Regex passes that delete whole "Reasoning", "Follow-up Questions",
         and "Sources"/"References" sections, matching from the section
         header up to the next recognized header or end of text.
      2. Removal of inline bracket citations such as ``[1]`` or ``[source_id]``.
      3. A stateful line-by-line sweep that drops the bodies of any section
         headers the regex passes missed (e.g. headers at the very start of
         the text, which the ``\\n+``-anchored patterns cannot match).
      4. Final tidy-up: markdown links are reduced to their link text, and
         "Immediate Response" / "Main Response" headers are stripped while
         the content under them is preserved.

    Args:
        text (str): The text to clean

    Returns:
        str: Text without reasoning, follow-up questions, and sources sections
    """
    if not text:
        return text
        
    # Pass 1a: remove reasoning sections (plain, bold, or markdown-heading
    # style headers), consuming everything up to the next known section.
    pattern_reasoning = r'(?i)(\n+\s*reasoning:|\n+\s*\*{0,2}reasoning\*{0,2}:?|\n+\s*#{1,3}\s*reasoning).*?(?=\n+\s*(?:#{1,3}|follow[ -]?up questions:|sources:|references:|\Z))'
    cleaned_text = re.sub(pattern_reasoning, '', text, flags=re.DOTALL)
    
    # Pass 1b: remove follow-up / additional questions sections.
    pattern_followup = r'(?i)(\n+\s*follow[ -]?up questions:|\n+\s*additional questions:|\n+\s*\*{0,2}follow[ -]?up questions\*{0,2}:?|\n+\s*#{1,3}\s*follow[ -]?up questions).*?(?=\n+\s*(?:#{1,3}|reasoning:|sources:|references:|\Z))'
    cleaned_text = re.sub(pattern_followup, '', cleaned_text, flags=re.DOTALL)
    
    # Pass 1c: remove sources/references sections.
    pattern_sources = r'(?i)(\n+\s*sources:|\n+\s*references:|\n+\s*\*{0,2}sources\*{0,2}:?|\n+\s*\*{0,2}references\*{0,2}:?|\n+\s*#{1,3}\s*sources|\n+\s*#{1,3}\s*references).*?(?=\n+\s*(?:#{1,3}|\Z))'
    cleaned_text = re.sub(pattern_sources, '', cleaned_text, flags=re.DOTALL)
    
    # Pass 2: strip inline source citations like [1] or [source_id:3].
    cleaned_text = re.sub(r'\[([\w\d:_\-\.+]+)\]', '', cleaned_text)
    
    # Pass 3: line-by-line sweep. skip_section stays True from a matched
    # section header until the next header-like line, dropping the body.
    lines = cleaned_text.split('\n')
    filtered_lines = []
    skip_section = False
    
    for line in lines:
        # Header of a section we want removed: start skipping (the header
        # line itself is also dropped via `continue`).
        if re.search(r'(?i)^(\s*reasoning:|\s*follow[ -]?up questions:|\s*additional questions:|\s*sources:|\s*references:|\s*\*{0,2}reasoning\*{0,2}:?|\s*\*{0,2}follow[ -]?up questions\*{0,2}:?|\s*\*{0,2}sources\*{0,2}:?|\s*\*{0,2}references\*{0,2}:?|\s*#{1,3}\s*reasoning|\s*#{1,3}\s*follow[ -]?up questions|\s*#{1,3}\s*sources|\s*#{1,3}\s*references)', line):
            skip_section = True
            continue
        # Any other heading or "Word:" label marks the start of a section we
        # want to keep, so stop skipping.
        elif skip_section and re.search(r'(?i)^(\s*#{1,3}|\s*[a-zA-Z]+:)', line):
            skip_section = False
        
        # Keep lines that are not inside a skipped section.
        if not skip_section:
            filtered_lines.append(line)
    
    # Pass 4a: collapse markdown links to their visible text, removing the URL.
    result = '\n'.join(filtered_lines).strip()
    result = re.sub(r'\[([^\]]+)\]\(https?://[^)]+\)', r'\1', result)
    
    # Pass 4b: drop "Immediate Response" / "Main Response" header lines while
    # preserving the content under them (only the header line is removed).
    # NOTE: with no re.MULTILINE flag, '^' only matches at the start of the
    # string, so only a leading header is stripped.
    result = re.sub(r'(?i)^(\s*#{1,3}\s*)?immediate response:?\s*\n', '', result)
    result = re.sub(r'(?i)^(\s*#{1,3}\s*)?main response:?\s*\n', '', result)
    
    return result


def clean_explanation(text):
    """
    Remove duplicate sources sections and data availability notes from explanation.

    Processing steps:
      1. Delete any "DATA AVAILABILITY NOTE" heading and its body.
      2. Normalize reasoning-point formatting (demote numbered points that
         were emitted as headings, indent them consistently).
      3. Find every "SOURCES"/"Sources" section, remove them all from the
         text, then re-append a single section under a uniform
         "## Sources" heading.

    NOTE: `source_content` is overwritten on each match in the scan loop, so
    when the text contains multiple distinct sources sections, only the LAST
    one's content survives in the re-appended section.

    Args:
        text (str): The explanation text to clean

    Returns:
        str: Cleaned explanation text
    """
    if not text:
        return text
        
    # Step 1: remove the DATA AVAILABILITY NOTE section (header + body, up to
    # the next heading or end of text).
    pattern_data_note = r'\n+\s*#{1,3}\s*DATA AVAILABILITY NOTE.*?(?=\n+\s*#{1,3}|\Z)'
    cleaned_text = re.sub(pattern_data_note, '', text, flags=re.DOTALL)
    
    # Step 2a: numbered points under a REASONING heading sometimes arrive as
    # headings themselves ("### 1. ..."); strip their leading '#'s.
    pattern_reasoning_headers = r'(#{1,3}\s*REASONING[^#]*?)#{1,3}\s*(\d+\.\s+)'
    cleaned_text = re.sub(pattern_reasoning_headers, r'\1\2', cleaned_text, flags=re.DOTALL)
    
    # Step 2b: split "REASONING1." fused tokens, which render as one big heading.
    cleaned_text = re.sub(r'(#{1,3}\s*REASONING)(\d+\.)', r'\1', cleaned_text)
    
    # Step 2c: indent every numbered point at the start of a line by three
    # spaces so all reasoning points share one format.
    cleaned_text = re.sub(r'(\n+)(\d+\.)', r'\1   \2', cleaned_text)
    
    # Step 3: split on "SOURCES"/"Sources [USED]" headers; the capturing group
    # keeps each header as its own element of `sections`.
    pattern_sources = r'(\n+\s*#{1,3}\s+(?:SOURCES|Sources)(?:\s+USED)?[^\n]*)'
    sections = re.split(pattern_sources, cleaned_text)
    
    # Scan state: pair each source header with the content element that
    # follows it, collecting full "header + body" strings.
    source_sections = []
    current_section = ""
    in_source = False
    source_content = ""
    
    for i, section in enumerate(sections):
        # A source section header starts a new header+body pair.
        if re.match(r'\s*#{1,3}\s+(?:SOURCES|Sources)(?:\s+USED)?', section.strip()):
            in_source = True
            current_section = section
        # The element right after a header is that section's body.
        elif in_source and i > 0:
            source_content = section  # last section's body wins (see docstring)
            current_section += section
            source_sections.append(current_section)
            in_source = False
            current_section = ""
    
    # Remove every collected sources section from the text.
    for section in source_sections:
        cleaned_text = cleaned_text.replace(section, '')
    
    # Collapse runs of 3+ newlines left behind by the removals.
    cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text)
    
    # Re-append the (last) sources body under one consistent heading.
    if source_content.strip():
        # Work with the body only; the original header is discarded.
        source_content = source_content.strip()
        
        # Convert numbered source entries ("1.") to bullet points ("•").
        source_content = re.sub(r'^(\s*)(\d+\.)', r'\1•', source_content, flags=re.MULTILINE)
        
        # Separate the sources block from the main text with a blank line.
        cleaned_text = cleaned_text.strip()
        if cleaned_text:
            cleaned_text += "\n\n"
        cleaned_text += "## Sources\n" + source_content
    
    return cleaned_text.strip()


def get_image_base64(image_path):
    """
    Encode an image file's bytes as a base64 string.

    Args:
        image_path (str): Path to the image file

    Returns:
        str: Base64 encoded image, or None when the file is missing or
        cannot be read (a message is printed in either failure case)
    """
    try:
        # Guard clause: bail out early when there is nothing to read.
        if not os.path.exists(image_path):
            print(f"Image not found: {image_path}")
            return None
        with open(image_path, "rb") as fh:
            raw_bytes = fh.read()
        return base64.b64encode(raw_bytes).decode()
    except Exception as e:
        # Best-effort helper: report and return None rather than raising.
        print(f"Error loading image: {e}")
        return None


def format_conversation_history(history, patient_info=None):
    """
    Format the conversation history into a string suitable for LLM processing.

    Args:
        history (list): List of message dictionaries
        patient_info (dict, optional): Dictionary with patient information

    Returns:
        str: Formatted conversation text for report generation
    """
    parts = ["# Medical Consultation\n\n"]

    # Optional patient header block.
    if patient_info:
        parts.append("## Patient Information\n")
        parts.append(f"* Name: {patient_info.get('name', '')}\n")
        parts.append(f"* Age: {patient_info.get('age', '')}\n")
        parts.append(f"* Gender: {patient_info.get('gender', '')}\n\n")

    parts.append("## Conversation Transcript\n\n")

    for message in history:
        role = message.get("role", "").strip().lower()
        content = message.get("content", "").strip()

        # Empty messages carry no information for the report.
        if not content:
            continue

        if role == "user":
            parts.append(f"PATIENT: {content}\n\n")
        elif role == "assistant":
            parts.append(f"ASSISTANT: {content}\n\n")
            # Explanations often contain diagnostic reasoning worth keeping.
            if message.get("explanation"):
                explanation = message.get("explanation", "").strip()
                if explanation:
                    parts.append(f"REASONING: {explanation}\n\n")

    return "".join(parts)


def format_follow_up_questions(questions_text):
    """
    Format follow-up questions text for display.

    Strips any leading "Follow-up Questions" header, renumbers every
    question sequentially (replacing existing numbers or bullets), folds
    wrapped lines into the preceding question when that question looks
    unfinished, and guarantees each question ends with a question mark.

    Args:
        questions_text (str): Raw follow-up questions text

    Returns:
        str: Formatted follow-up questions
    """
    if not questions_text:
        return ""

    # Drop a leading "Follow-up Questions" header, if present.
    body = re.sub(r'(?i)^(\s*#{1,3}\s*)?follow[ -]?up questions:?\s*\n', '', questions_text)

    numbered = []
    counter = 1

    for raw_line in body.split('\n'):
        # Lines that start with a number or bullet are questions: renumber them.
        bullet = re.match(r'^\s*(?:\d+\.|\-|\•|\*)\s*(.*)', raw_line)
        if bullet:
            numbered.append(f"{counter}. {bullet.group(1).strip()}")
            counter += 1
            continue

        stripped = raw_line.strip()
        if not stripped:
            continue

        if numbered and not numbered[-1].endswith('?'):
            # Unmarked text after an unfinished question: treat as a
            # continuation of that question.
            numbered[-1] += " " + stripped
        else:
            # Either the first line, or the previous question is complete:
            # start a new question.
            numbered.append(f"{counter}. {stripped}")
            counter += 1

    # Guarantee every question ends with a question mark.
    return '\n'.join(q if q.endswith('?') else q + '?' for q in numbered)