File size: 10,882 Bytes
90d2de3 64c1f54 90d2de3 64c1f54 90d2de3 40f9684 90d2de3 40f9684 90d2de3 64c1f54 40f9684 90d2de3 64c1f54 90d2de3 40f9684 90d2de3 b8d1a98 40f9684 21a4e41 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
"""
This module contains utility functions for text processing and other helper functions.
"""
import re
import os
import base64
def has_meaningful_content(text):
    """
    Check if explanation has meaningful content.

    Text is considered empty of meaning when it is missing, blank, made up
    solely of separator characters (``=``, ``-``, ``_``, ``*``), or is just a
    bare "## REASONING" header with nothing after it.

    Args:
        text (str): The text to check

    Returns:
        bool: True if the text has meaningful content, False otherwise
    """
    if not text:
        return False

    stripped_text = text.strip()
    # Whitespace-only input carries no content either.
    if not stripped_text:
        return False

    # Check if the text is just equal signs or other separators. All
    # whitespace is dropped first so that rules spread over several lines
    # ("====\n----"), separated by spaces ("* * *") or using "\r\n" line
    # endings are still recognised as separators.
    if re.fullmatch(r'[=\-_*]+', re.sub(r'\s+', '', stripped_text)):
        return False

    # Check if the text only contains "## REASONING" with no actual content
    if "## REASONING" in stripped_text and len(stripped_text) < 20:
        return False

    return True
def remove_reasoning_and_sources(text):
    """
    Remove reasoning, follow-up questions, and sources sections from the main response text.

    The text is cleaned in several passes:
      1. whole-section regex removal (reasoning, follow-up questions, sources),
      2. removal of inline citations such as ``[1]`` or ``[source_id]``,
      3. a line-by-line pass that drops any remaining section bodies,
      4. markdown links are reduced to their label,
      5. a leading "Immediate Response" / "Main Response" header is removed
         (the content under it is kept).

    Args:
        text (str): The text to clean

    Returns:
        str: Text without reasoning, follow-up questions, and sources sections.
            Falsy input (``None`` or ``""``) is returned unchanged.
    """
    if not text:
        return text

    # First, remove any reasoning sections
    pattern_reasoning = r'(?i)(\n+\s*reasoning:|\n+\s*\*{0,2}reasoning\*{0,2}:?|\n+\s*#{1,3}\s*reasoning).*?(?=\n+\s*(?:#{1,3}|follow[ -]?up questions:|sources:|references:|\Z))'
    cleaned_text = re.sub(pattern_reasoning, '', text, flags=re.DOTALL)

    # Remove follow-up questions sections
    pattern_followup = r'(?i)(\n+\s*follow[ -]?up questions:|\n+\s*additional questions:|\n+\s*\*{0,2}follow[ -]?up questions\*{0,2}:?|\n+\s*#{1,3}\s*follow[ -]?up questions).*?(?=\n+\s*(?:#{1,3}|reasoning:|sources:|references:|\Z))'
    cleaned_text = re.sub(pattern_followup, '', cleaned_text, flags=re.DOTALL)

    # Then, remove any sources/references sections
    pattern_sources = r'(?i)(\n+\s*sources:|\n+\s*references:|\n+\s*\*{0,2}sources\*{0,2}:?|\n+\s*\*{0,2}references\*{0,2}:?|\n+\s*#{1,3}\s*sources|\n+\s*#{1,3}\s*references).*?(?=\n+\s*(?:#{1,3}|\Z))'
    cleaned_text = re.sub(pattern_sources, '', cleaned_text, flags=re.DOTALL)

    # Also remove any source citations in the text (e.g., [1], [source_id]).
    # The negative lookahead keeps the label of a markdown link
    # "[label](https://...)" intact; otherwise the label would be deleted here
    # and the link-unwrapping step below would leave a dangling "(https://...)".
    cleaned_text = re.sub(r'\[([\w\d:_\-\.+]+)\](?!\(https?://)', '', cleaned_text)

    # Process line by line to handle sections more comprehensively (this also
    # catches sections the regexes above miss, e.g. a section that runs to the
    # end of a text that has no trailing newline).
    section_header = re.compile(
        r'(?i)^(\s*reasoning:|\s*follow[ -]?up questions:|\s*additional questions:|\s*sources:|\s*references:|\s*\*{0,2}reasoning\*{0,2}:?|\s*\*{0,2}follow[ -]?up questions\*{0,2}:?|\s*\*{0,2}sources\*{0,2}:?|\s*\*{0,2}references\*{0,2}:?|\s*#{1,3}\s*reasoning|\s*#{1,3}\s*follow[ -]?up questions|\s*#{1,3}\s*sources|\s*#{1,3}\s*references)'
    )
    # A markdown heading or a "Label:" line starts a new section. "(?!//)"
    # stops a bare URL ("https://...") from being mistaken for a "Label:"
    # line, which would end the skip and leak source URLs into the result.
    new_section = re.compile(r'(?i)^(\s*#{1,3}|\s*[a-zA-Z]+:(?!//))')

    filtered_lines = []
    skip_section = False
    for line in cleaned_text.split('\n'):
        # Header of a section we want to drop: skip it and everything after it
        if section_header.search(line):
            skip_section = True
            continue
        # Check if we're entering a new (wanted) section
        if skip_section and new_section.search(line):
            skip_section = False
        # Only keep lines that aren't in sections we want to skip
        if not skip_section:
            filtered_lines.append(line)

    result = '\n'.join(filtered_lines).strip()

    # Reduce markdown links to their label, dropping the URL
    result = re.sub(r'\[([^\]]+)\]\(https?://[^)]+\)', r'\1', result)

    # Remove a leading "Immediate Response" / "Main Response" header.
    # We want to preserve the content but remove the header itself.
    result = re.sub(r'(?i)^(\s*#{1,3}\s*)?immediate response:?\s*\n', '', result)
    result = re.sub(r'(?i)^(\s*#{1,3}\s*)?main response:?\s*\n', '', result)

    return result
def clean_explanation(text):
    """
    Remove duplicate sources sections and data availability notes from explanation.

    All "## SOURCES" / "## Sources" / "## SOURCES USED" sections are removed
    from the body and the content of the last one is appended once at the end
    under a consistent "## Sources" heading, with numbered entries turned into
    bullets. A sources section ends at the next markdown heading, so other
    sections that follow it (for example a "## REASONING" block sitting
    between two duplicate sources sections) are kept in the body.

    Args:
        text (str): The explanation text to clean

    Returns:
        str: Cleaned explanation text. Falsy input (``None`` or ``""``) is
            returned unchanged.
    """
    if not text:
        return text

    # Remove DATA AVAILABILITY NOTE section
    pattern_data_note = r'\n+\s*#{1,3}\s*DATA AVAILABILITY NOTE.*?(?=\n+\s*#{1,3}|\Z)'
    cleaned_text = re.sub(pattern_data_note, '', text, flags=re.DOTALL)

    # Fix formatting issues with reasoning points: a point written as a
    # heading ("### 1. ...") under REASONING becomes a plain numbered item
    pattern_reasoning_headers = r'(#{1,3}\s*REASONING[^#]*?)#{1,3}\s*(\d+\.\s+)'
    cleaned_text = re.sub(pattern_reasoning_headers, r'\1\2', cleaned_text, flags=re.DOTALL)

    # Remove any "REASONING1." pattern which creates the heading effect
    cleaned_text = re.sub(r'(#{1,3}\s*REASONING)(\d+\.)', r'\1', cleaned_text)

    # Normalize all reasoning points to use the same format
    cleaned_text = re.sub(r'(\n+)(\d+\.)', r'\1 \2', cleaned_text)

    # Split on sources headers. Because the pattern has one capture group the
    # result is [body, header1, content1, header2, content2, ...].
    pattern_sources = r'(\n+\s*#{1,3}\s+(?:SOURCES|Sources)(?:\s+USED)?[^\n]*)'
    sections = re.split(pattern_sources, cleaned_text)

    kept_parts = [sections[0]]
    source_content = ""
    for content in sections[2::2]:
        # The sources list stops at the next markdown heading; whatever
        # follows belongs to another section and must stay in the body.
        next_heading = re.search(r'\n+\s*#{1,6}\s', content)
        if next_heading:
            source_content = content[:next_heading.start()]
            kept_parts.append(content[next_heading.start():])
        else:
            source_content = content

    # Body without any sources sections
    cleaned_text = ''.join(kept_parts)

    # Clean up any runs of blank lines
    cleaned_text = re.sub(r'\n{3,}', '\n\n', cleaned_text)

    # Add the (last) sources section back with a consistent heading
    source_content = source_content.strip()
    if source_content:
        # Turn numbered entries into bullets. Leading indentation is dropped
        # so every bullet lines up (the normalisation above adds a space in
        # front of numbered lines); "(?!\d)" leaves decimals like "2.5" alone.
        source_content = re.sub(
            r'^[ \t]*\d+\.(?!\d)[ \t]*', '• ', source_content, flags=re.MULTILINE
        )
        cleaned_text = cleaned_text.strip()
        if cleaned_text:
            cleaned_text += "\n\n"
        cleaned_text += "## Sources\n" + source_content

    return cleaned_text.strip()
def get_image_base64(image_path):
    """
    Read an image file and return its contents as a base64 string.

    Failures are reported with ``print`` rather than raised.

    Args:
        image_path (str): Path to the image file

    Returns:
        str: Base64 encoded image, or None if the file is missing or any
            error occurs while reading it
    """
    try:
        # Guard clause: nothing to encode when the path does not exist
        if not os.path.exists(image_path):
            print(f"Image not found: {image_path}")
            return None

        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode()
    except Exception as exc:
        # Best-effort helper: any failure is reported and mapped to None
        print(f"Error loading image: {exc}")
        return None
def format_conversation_history(history, patient_info=None):
    """
    Format the conversation history into a string suitable for LLM processing.

    Messages with empty content are skipped, as are messages whose role is
    neither "user" nor "assistant" (case-insensitive). An assistant message's
    non-empty ``explanation`` is emitted as a REASONING entry after it.
    Missing or ``None`` values for ``role``, ``content`` and ``explanation``
    are treated as empty strings, and a ``None`` history as an empty one.

    Args:
        history (list): List of message dictionaries
        patient_info (dict, optional): Dictionary with patient information

    Returns:
        str: Formatted conversation text for report generation
    """
    # Collect the pieces and join once instead of repeated string +=
    parts = ["# Medical Consultation\n\n"]

    # Add patient info if provided
    if patient_info:
        parts.append("## Patient Information\n")
        parts.append(f"* Name: {patient_info.get('name', '')}\n")
        parts.append(f"* Age: {patient_info.get('age', '')}\n")
        parts.append(f"* Gender: {patient_info.get('gender', '')}\n\n")

    parts.append("## Conversation Transcript\n\n")

    for message in history or []:
        # "or ''" also covers keys that are present but set to None, which
        # dict.get's default alone would not
        role = (message.get("role") or "").strip().lower()
        content = (message.get("content") or "").strip()

        if not content:
            continue  # Skip empty messages

        if role == "user":
            parts.append(f"PATIENT: {content}\n\n")
        elif role == "assistant":
            parts.append(f"ASSISTANT: {content}\n\n")

            # Include explanations which often contain diagnostic reasoning
            explanation = (message.get("explanation") or "").strip()
            if explanation:
                parts.append(f"REASONING: {explanation}\n\n")

    return "".join(parts)
def format_follow_up_questions(questions_text):
    """
    Format follow-up questions text for display.

    A leading "Follow-up Questions" header (plain, ``#``-prefixed or
    ``**bold**``) is removed, every question is renumbered ``1.``, ``2.``, ...
    and each one is guaranteed to end with a question mark. Lines that do not
    start with a number or bullet are appended to the previous question,
    unless that question already ends with ``?``, in which case they start a
    new one. Lines without any letters or digits (empty bullets, rules such
    as ``---``) are ignored.

    Args:
        questions_text (str): Raw follow-up questions text

    Returns:
        str: Formatted follow-up questions, one per line ("" for empty input)
    """
    if not questions_text:
        return ""

    # Clean up any header text; bold markers are accepted as well, matching
    # the header forms the other helpers in this module recognise
    cleaned_text = re.sub(
        r'(?i)^\s*(?:#{1,3}\s*)?\*{0,2}follow[ -]?up questions:?\*{0,2}:?\s*\n',
        '',
        questions_text,
    )

    # A question line starts with "1." / "1)" or a bullet (-, •, *)
    bullet_line = re.compile(r'^\s*(?:\d+[.)]|[-•*])\s*(.*)')

    formatted_lines = []
    for line in cleaned_text.split('\n'):
        bullet = bullet_line.match(line)
        content = (bullet.group(1) if bullet else line).strip()

        # Skip blank lines, empty bullets and separator rules; numbering
        # them would produce bogus entries such as "1. ?"
        if not any(ch.isalnum() for ch in content):
            continue

        starts_new_question = (
            bullet is not None
            or not formatted_lines
            # The previous question is already complete
            or formatted_lines[-1].endswith('?')
        )
        if starts_new_question:
            formatted_lines.append(f"{len(formatted_lines) + 1}. {content}")
        else:
            # Otherwise it's a continuation of the previous question
            formatted_lines[-1] += " " + content

    # Ensure each question ends with a question mark
    return '\n'.join(
        question if question.endswith('?') else question + '?'
        for question in formatted_lines
    )