|
import json |
|
import os |
|
import re |
|
import tempfile |
|
import base64 |
|
from datetime import datetime |
|
import streamlit as st |
|
from reportlab.lib.pagesizes import A4 |
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle |
|
from reportlab.lib.enums import TA_LEFT, TA_CENTER, TA_RIGHT |
|
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image |
|
from reportlab.lib import colors |
|
from reportlab.lib.units import inch, cm |
|
from reportlab.platypus import PageBreak, KeepTogether |
|
import requests |
|
|
|
from model import orchestrator_chat |
|
from utils import format_conversation_history |
|
from session_state import get_full_history |
|
|
|
|
|
|
|
def fetch_current_datetime(): |
|
""" |
|
Fetch the current date and time from a public API. |
|
Returns a dictionary with date, time, and formatted timestamp. |
|
If the API call fails, falls back to system time. |
|
""" |
|
try: |
|
|
|
response = requests.get("http://worldtimeapi.org/api/ip") |
|
if response.status_code == 200: |
|
data = response.json() |
|
datetime_str = data.get('datetime') |
|
if datetime_str: |
|
|
|
dt = datetime.fromisoformat(datetime_str.replace('Z', '+00:00')) |
|
|
|
|
|
formatted_date = dt.strftime("%Y-%m-%d") |
|
formatted_time = dt.strftime("%H:%M:%S") |
|
full_timestamp = dt.strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
return { |
|
"date": formatted_date, |
|
"time": formatted_time, |
|
"timestamp": full_timestamp |
|
} |
|
|
|
|
|
print("API call failed, using system time instead") |
|
now = datetime.now() |
|
return { |
|
"date": now.strftime("%Y-%m-%d"), |
|
"time": now.strftime("%H:%M:%S"), |
|
"timestamp": now.strftime("%Y-%m-%d %H:%M:%S") |
|
} |
|
except Exception as e: |
|
|
|
print(f"Error fetching time from API: {str(e)}") |
|
now = datetime.now() |
|
return { |
|
"date": now.strftime("%Y-%m-%d"), |
|
"time": now.strftime("%H:%M:%S"), |
|
"timestamp": now.strftime("%Y-%m-%d %H:%M:%S") |
|
} |
|
|
|
|
|
|
|
def get_image_base64(image_path): |
|
try: |
|
if os.path.exists(image_path): |
|
with open(image_path, "rb") as img_file: |
|
return base64.b64encode(img_file.read()).decode() |
|
else: |
|
print(f"Image not found: {image_path}") |
|
return None |
|
except Exception as e: |
|
print(f"Error loading image: {e}") |
|
return None |
|
|
|
|
|
|
|
def get_reportlab_image(image_path, width=1.5*inch, height=1.0*inch): |
|
try: |
|
if os.path.exists(image_path): |
|
return Image(image_path, width=width, height=height) |
|
else: |
|
|
|
alt_paths = [ |
|
f"assets/{os.path.basename(image_path)}", |
|
f"assets/Logo/{os.path.basename(image_path)}", |
|
image_path.replace("src/assets", "assets") |
|
] |
|
|
|
for path in alt_paths: |
|
if os.path.exists(path): |
|
return Image(path, width=width, height=height) |
|
|
|
print(f"Image not found: {image_path}") |
|
return None |
|
except Exception as e: |
|
print(f"Error creating image: {str(e)}") |
|
return None |
|
|
|
|
|
def extract_medical_json(conversation_text: str) -> dict: |
|
""" |
|
Extract medical report data from conversation text into structured JSON. |
|
Uses the existing orchestrator_chat model. |
|
""" |
|
system_prompt = """ |
|
You are an expert medical report generator. Extract ALL info from the patient-assistant conversation into this JSON schema: |
|
{ |
|
"patient": {"name":"","age":"","gender":""}, |
|
"visit_date":"YYYY-MM-DD", |
|
"chief_complaint":"", |
|
"history_of_present_illness":"", |
|
"past_medical_history":"", |
|
"medications":"", |
|
"allergies":"", |
|
"examination":"", |
|
"diagnosis":"", |
|
"recommendations":"", |
|
"reasoning":"", |
|
"sources":"" |
|
} |
|
Do NOT invent any data. Return ONLY the JSON object. |
|
""" |
|
|
|
history = [ |
|
{"role": "system", "content": system_prompt} |
|
] |
|
|
|
|
|
result, _, _, _ = orchestrator_chat( |
|
history=history, |
|
query=conversation_text, |
|
use_rag=True, |
|
is_follow_up=False |
|
) |
|
|
|
|
|
try: |
|
|
|
json_match = re.search(r'({[\s\S]*})', result) |
|
if json_match: |
|
json_str = json_match.group(1) |
|
return json.loads(json_str) |
|
else: |
|
|
|
return json.loads(result) |
|
except json.JSONDecodeError: |
|
|
|
st.error("Failed to parse report data") |
|
return { |
|
"patient": {"name":"", "age":"", "gender":""}, |
|
"visit_date": datetime.now().strftime("%Y-%m-%d"), |
|
"chief_complaint": "Error generating report" |
|
} |
|
|
|
|
|
def build_medical_report(data: dict) -> bytes: |
|
""" |
|
Generates a PDF from the extracted JSON and returns PDF bytes. |
|
Modern dark theme version with minimal design. |
|
""" |
|
|
|
fd, temp_path = tempfile.mkstemp(suffix='.pdf') |
|
os.close(fd) |
|
|
|
|
|
doc = SimpleDocTemplate( |
|
temp_path, |
|
pagesize=A4, |
|
rightMargin=1.5*cm, |
|
leftMargin=1.5*cm, |
|
topMargin=1.5*cm, |
|
bottomMargin=2*cm |
|
) |
|
|
|
|
|
background_color = colors.black |
|
text_color = colors.white |
|
accent_color = colors.purple |
|
secondary_color = colors.lavender |
|
highlight_color = colors.mediumorchid |
|
|
|
|
|
styles = getSampleStyleSheet() |
|
|
|
|
|
styles.add(ParagraphStyle( |
|
name='ReportTitle', |
|
fontName='Helvetica-Bold', |
|
fontSize=20, |
|
leading=24, |
|
alignment=TA_CENTER, |
|
spaceAfter=0.3*inch, |
|
textColor=highlight_color |
|
)) |
|
|
|
|
|
styles.add(ParagraphStyle( |
|
name='SectionHeading', |
|
fontName='Helvetica-Bold', |
|
fontSize=14, |
|
leading=16, |
|
spaceAfter=0.1*inch, |
|
alignment=TA_LEFT, |
|
textColor=accent_color |
|
)) |
|
|
|
|
|
styles.add(ParagraphStyle( |
|
name='NormalText', |
|
fontName='Helvetica', |
|
fontSize=10, |
|
leading=12, |
|
spaceAfter=0.1*inch, |
|
alignment=TA_LEFT, |
|
textColor=colors.white |
|
)) |
|
|
|
|
|
styles.add(ParagraphStyle( |
|
name='CompanyInfo', |
|
fontName='Helvetica', |
|
fontSize=9, |
|
leading=11, |
|
alignment=TA_RIGHT, |
|
textColor=secondary_color |
|
)) |
|
|
|
|
|
def add_page_background(canvas, doc): |
|
canvas.saveState() |
|
canvas.setFillColor(background_color) |
|
canvas.rect(0, 0, doc.pagesize[0], doc.pagesize[1], fill=1) |
|
canvas.restoreState() |
|
|
|
|
|
elems = [] |
|
|
|
|
|
logo_path = "src/assets/logo.png" |
|
|
|
|
|
logo_img = get_reportlab_image(logo_path, width=1.0*inch, height=1.0*inch) |
|
|
|
|
|
company_info = Paragraph( |
|
""" |
|
<b><font color="#E0B0FF">Daease</font></b><br/> |
|
Website: <font color="#E0B0FF">http://daease.com/</font><br/> |
|
Email: <font color="#E0B0FF">daease.main@gmail.com</font> |
|
""", |
|
styles['CompanyInfo'] |
|
) |
|
|
|
|
|
if logo_img: |
|
header_table = Table([[logo_img, company_info]], colWidths=[doc.width/2.0]*2) |
|
header_table.setStyle(TableStyle([ |
|
('VALIGN', (0, 0), (-1, -1), 'TOP'), |
|
('ALIGN', (0, 0), (0, 0), 'LEFT'), |
|
('ALIGN', (1, 0), (1, 0), 'RIGHT'), |
|
('BOTTOMPADDING', (0, 0), (-1, -1), 10), |
|
('BACKGROUND', (0, 0), (-1, -1), background_color), |
|
])) |
|
elems.append(header_table) |
|
else: |
|
company_header = Paragraph( |
|
""" |
|
<b><font color="#E0B0FF" size="14">Daease</font></b><br/> |
|
Website: <font color="#E0B0FF">http://daease.com/</font><br/> |
|
Email: <font color="#E0B0FF">daease.main@gmail.com</font> |
|
""", |
|
styles['CompanyInfo'] |
|
) |
|
elems.append(company_header) |
|
|
|
elems.append(Spacer(1, 0.3*inch)) |
|
|
|
|
|
title = Paragraph( |
|
'<font color="#E0B0FF">Daease</font> ' |
|
'<font color="#D8BFD8">Medical</font> ' |
|
'<font color="#DA70D6">Consultation</font> ' |
|
'<font color="#BA55D3">Report</font>', |
|
styles['ReportTitle'] |
|
) |
|
elems.append(title) |
|
elems.append(Spacer(1, 0.2*inch)) |
|
|
|
|
|
separator = Table([['']], colWidths=[doc.width], rowHeights=[1]) |
|
separator.setStyle(TableStyle([ |
|
('LINEABOVE', (0, 0), (-1, -1), 1, accent_color), |
|
('BACKGROUND', (0, 0), (-1, -1), background_color), |
|
])) |
|
elems.append(separator) |
|
elems.append(Spacer(1, 0.3*inch)) |
|
|
|
|
|
patient = data.get('patient', {}) |
|
|
|
|
|
generation_time = data.get('generation_timestamp', datetime.now().strftime("%Y-%m-%d %H:%M:%S")) |
|
|
|
|
|
patient_data = [ |
|
[Paragraph('<font color="#BA55D3"><b>Patient Information</b></font>', styles['SectionHeading']), ''], |
|
[Paragraph('Name:', styles['NormalText']), Paragraph(f'{patient.get("name", "–")}', styles['NormalText'])], |
|
[Paragraph('Age:', styles['NormalText']), Paragraph(f'{patient.get("age", "–")}', styles['NormalText'])], |
|
[Paragraph('Gender:', styles['NormalText']), Paragraph(f'{patient.get("gender", "–")}', styles['NormalText'])], |
|
[Paragraph('Visit Date:', styles['NormalText']), Paragraph(f'{data.get("visit_date", datetime.now().strftime("%Y-%m-%d"))}', styles['NormalText'])], |
|
[Paragraph('Report Generated:', styles['NormalText']), Paragraph(f'{generation_time}', styles['NormalText'])] |
|
] |
|
|
|
patient_table = Table( |
|
patient_data, |
|
colWidths=[doc.width * 0.3, doc.width * 0.7], |
|
rowHeights=[0.4*inch, 0.25*inch, 0.25*inch, 0.25*inch, 0.25*inch, 0.25*inch] |
|
) |
|
|
|
patient_table.setStyle(TableStyle([ |
|
|
|
('SPAN', (0, 0), (1, 0)), |
|
('ALIGN', (0, 0), (0, 0), 'CENTER'), |
|
|
|
|
|
('ALIGN', (0, 1), (0, -1), 'RIGHT'), |
|
('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), |
|
('RIGHTPADDING', (0, 1), (0, -1), 12), |
|
|
|
|
|
('BACKGROUND', (0, 0), (-1, -1), background_color), |
|
('TEXTCOLOR', (1, 1), (1, -1), text_color), |
|
|
|
|
|
('GRID', (0, 1), (1, -1), 0.5, secondary_color), |
|
('BOX', (0, 1), (1, -1), 1, accent_color), |
|
])) |
|
|
|
elems.append(patient_table) |
|
elems.append(Spacer(1, 0.4*inch)) |
|
|
|
|
|
|
|
medical_sections = [ |
|
("Chief Complaint", data.get('chief_complaint')), |
|
("History of Present Illness", data.get('history_of_present_illness')), |
|
("Past Medical History", data.get('past_medical_history')), |
|
("Medications", data.get('medications')), |
|
("Allergies", data.get('allergies')), |
|
("Examination Findings", data.get('examination')), |
|
("Diagnosis", data.get('diagnosis')), |
|
("Recommendations", data.get('recommendations')), |
|
("Reasoning", data.get('reasoning')), |
|
("Sources", data.get('sources')) |
|
] |
|
|
|
|
|
elems.append(Paragraph('<font color="#BA55D3"><b>Medical Information</b></font>', styles['SectionHeading'])) |
|
elems.append(Spacer(1, 0.1*inch)) |
|
|
|
|
|
for title, content in medical_sections: |
|
if not content or str(content).strip() == '': |
|
content = '–' |
|
|
|
|
|
if isinstance(content, list): |
|
content = '\n'.join(map(str, content)) |
|
elif content is not None and not isinstance(content, str): |
|
content = str(content) |
|
|
|
|
|
styled_content = Paragraph(content, styles['NormalText']) |
|
|
|
|
|
section_data = [ |
|
[Paragraph(f'<b>{title}</b>', styles['NormalText']), styled_content] |
|
] |
|
|
|
section_table = Table( |
|
section_data, |
|
colWidths=[doc.width * 0.25, doc.width * 0.75], |
|
rowHeights=None |
|
) |
|
|
|
section_table.setStyle(TableStyle([ |
|
|
|
('VALIGN', (0, 0), (0, 0), 'TOP'), |
|
('ALIGN', (0, 0), (0, 0), 'RIGHT'), |
|
('RIGHTPADDING', (0, 0), (0, 0), 12), |
|
('TOPPADDING', (0, 0), (1, 0), 8), |
|
('BOTTOMPADDING', (0, 0), (1, 0), 8), |
|
|
|
|
|
('BACKGROUND', (0, 0), (-1, -1), background_color), |
|
|
|
|
|
('LINEBELOW', (0, 0), (1, 0), 0.5, secondary_color), |
|
])) |
|
|
|
elems.append(section_table) |
|
|
|
|
|
elems.append(Spacer(1, 0.4*inch)) |
|
|
|
|
|
sig_separator = Table([['']], colWidths=[doc.width], rowHeights=[1]) |
|
sig_separator.setStyle(TableStyle([ |
|
('LINEABOVE', (0, 0), (-1, -1), 0.5, accent_color), |
|
('BACKGROUND', (0, 0), (-1, -1), background_color), |
|
])) |
|
elems.append(sig_separator) |
|
elems.append(Spacer(1, 0.2*inch)) |
|
|
|
|
|
signature_text = Paragraph('<font color="#D8BFD8"><i>Generated by</i></font>', styles['NormalText']) |
|
ai_name = Paragraph('<font color="#BA55D3"><b>Daease AI Medical Assistant</b></font>', styles['NormalText']) |
|
|
|
|
|
signature_img = get_reportlab_image("src/assets/AI Generated Report.png", width=1.0*inch, height=1.0*inch) |
|
|
|
if signature_img: |
|
sig_table = Table( |
|
[[signature_text, ''], |
|
[signature_img, ''], |
|
[ai_name, '']], |
|
colWidths=[doc.width * 0.3, doc.width * 0.7], |
|
rowHeights=[None, None, None] |
|
) |
|
|
|
sig_table.setStyle(TableStyle([ |
|
('ALIGN', (0, 0), (0, -1), 'CENTER'), |
|
('VALIGN', (0, 0), (0, -1), 'MIDDLE'), |
|
('BACKGROUND', (0, 0), (-1, -1), background_color), |
|
])) |
|
elems.append(sig_table) |
|
else: |
|
sig_text = Paragraph( |
|
'<font color="#D8BFD8"><i>Generated by</i></font><br/>' |
|
'<font color="#BA55D3"><b>Daease AI Medical Assistant</b></font>', |
|
styles['NormalText'] |
|
) |
|
elems.append(sig_text) |
|
|
|
|
|
elems.append(Spacer(1, 0.3*inch)) |
|
disclaimer = Paragraph( |
|
'<font color="#A9A9A9"><i>Disclaimer: This report is generated by AI for informational purposes only. ' |
|
'It should not replace professional medical advice, diagnosis, or treatment. ' |
|
'Always consult with a qualified healthcare provider for medical concerns.</i></font>', |
|
ParagraphStyle( |
|
name='Disclaimer', |
|
fontName='Helvetica-Oblique', |
|
fontSize=8, |
|
alignment=TA_CENTER |
|
) |
|
) |
|
elems.append(disclaimer) |
|
|
|
|
|
doc.build(elems, onFirstPage=add_page_background, onLaterPages=add_page_background) |
|
|
|
|
|
with open(temp_path, 'rb') as file: |
|
pdf_bytes = file.read() |
|
|
|
|
|
os.unlink(temp_path) |
|
|
|
return pdf_bytes |
|
|
|
|
|
def generate_and_download_report(): |
|
""" |
|
Generate a medical report from the conversation history. |
|
Shows a form for entering patient info and then allows downloading the report. |
|
""" |
|
|
|
if st.session_state.get('processing', False): |
|
return |
|
|
|
|
|
if st.session_state.report_step == 1: |
|
st.text_input("Patient Name", key="patient_name", |
|
value=st.session_state.patient_info.get("name", "")) |
|
st.text_input("Age", key="patient_age", |
|
value=st.session_state.patient_info.get("age", "")) |
|
|
|
|
|
gender = st.session_state.patient_info.get("gender", "Male") |
|
if gender not in ["Male", "Female", "Other"]: |
|
gender = "Male" |
|
|
|
st.selectbox("Gender", ["Male", "Female", "Other"], key="patient_gender", |
|
index=["Male", "Female", "Other"].index(gender)) |
|
|
|
if st.button("Generate Report"): |
|
|
|
st.session_state.patient_info = { |
|
"name": st.session_state.patient_name, |
|
"age": st.session_state.patient_age, |
|
"gender": st.session_state.patient_gender |
|
} |
|
st.session_state.report_step = 2 |
|
st.rerun() |
|
|
|
|
|
elif st.session_state.report_step == 2: |
|
with st.spinner("Generating report..."): |
|
try: |
|
|
|
full_history = get_full_history() |
|
|
|
|
|
formatted_conversation = format_conversation_history( |
|
full_history, |
|
st.session_state.patient_info |
|
) |
|
|
|
|
|
medical_data = extract_medical_json(formatted_conversation) |
|
|
|
|
|
medical_data["patient"].update(st.session_state.patient_info) |
|
|
|
|
|
datetime_data = fetch_current_datetime() |
|
|
|
|
|
medical_data["visit_date"] = datetime_data["date"] |
|
|
|
|
|
medical_data["generation_timestamp"] = datetime_data["timestamp"] |
|
|
|
|
|
pdf_data = build_medical_report(medical_data) |
|
|
|
|
|
st.session_state.pdf_data = pdf_data |
|
|
|
|
|
st.download_button( |
|
label="Download Report", |
|
data=pdf_data, |
|
file_name=f"medical_report_{st.session_state.patient_info['name'].replace(' ', '_')}.pdf", |
|
mime="application/pdf", |
|
use_container_width=True |
|
) |
|
|
|
|
|
if st.button("Email Report", use_container_width=True): |
|
st.session_state.show_email_form = True |
|
|
|
|
|
if st.session_state.show_email_form: |
|
show_email_form() |
|
|
|
|
|
if st.button("New Report", use_container_width=True): |
|
|
|
st.session_state.report_step = 1 |
|
|
|
if 'pdf_data' in st.session_state: |
|
del st.session_state.pdf_data |
|
|
|
st.session_state.patient_info = {"name": "", "age": "", "gender": ""} |
|
|
|
st.session_state.show_email_form = False |
|
|
|
st.rerun() |
|
|
|
except Exception as e: |
|
st.error(f"Error generating report: {str(e)}") |
|
st.button("Try Again", on_click=lambda: setattr(st.session_state, "report_step", 1)) |
|
|
|
|
|
def show_email_form(): |
|
"""Display the email form for sending reports""" |
|
from sendgrid_service import SendGridService |
|
|
|
st.subheader("Send Report via Email") |
|
|
|
|
|
sendgrid_api_key = os.getenv('SENDGRID_API_KEY') |
|
if not sendgrid_api_key: |
|
st.warning("Email service not configured. Please contact administrator.") |
|
|
|
|
|
email = st.text_input("Recipient Email Address:") |
|
|
|
|
|
if st.button("Send"): |
|
if not email: |
|
st.error("Please enter an email address.") |
|
else: |
|
|
|
sendgrid = SendGridService() |
|
if not sendgrid.validate_email(email): |
|
st.error("Please enter a valid email address.") |
|
else: |
|
|
|
with st.spinner("Sending email... This may take a few seconds."): |
|
success, message = sendgrid.send_report( |
|
email, |
|
st.session_state.pdf_data, |
|
st.session_state.patient_info["name"] |
|
) |
|
|
|
if success: |
|
st.success(message) |
|
|
|
st.session_state.show_email_form = False |
|
st.rerun() |
|
else: |
|
st.error(message) |
|
|
|
|
|
if st.button("Cancel"): |
|
st.session_state.show_email_form = False |
|
st.rerun() |