#!/usr/bin/env python3
#/* DARNA.HI
# * Copyright (c) 2023 Seapoe1809 <https://github.com/seapoe1809>
# * Copyright (c) 2023 pnmeka <https://github.com/pnmeka>
# *
# *
# * This program is free software: you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation, either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program. If not, see <http://www.gnu.org/licenses/>.
import pytesseract
from pdf2image import convert_from_path
import os, subprocess
from variables import variables
from variables import variables2
import re
from PIL import Image, ImageFile
from datetime import datetime
import json
import fitz  # PyMuPDF
import chromadb
from tqdm import tqdm
#from install_module.Analyze.pdf_sectionreader import *
#from install_module.Analyze.nlp_process import *

ImageFile.LOAD_TRUNCATED_IMAGES = True
HS_path = os.getcwd()
print(HS_path)

folderpath = os.environ.get('FOLDERPATH')
print("folderpath is", folderpath)
if folderpath:
    ocr_files = f"{folderpath}/ocr_files"
else:
    print("Session FOLDERPATH environment variable not set.")

APP_dir = f"{HS_path}/install_module"
ocr_files = f"{folderpath}/ocr_files"
upload_dir = f"{folderpath}/upload"

ip_address = variables.ip_address
age = variables2.age
sex = variables2.sex

try:
    formatted_ignore_words = variables2.ignore_words if hasattr(variables2, 'ignore_words') else None
except NameError:
    formatted_ignore_words = None

# Path to the Tesseract OCR executable (change this if necessary)
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'

ocr_files_dir = f'{ocr_files}/'
output_dir = os.path.join(ocr_files_dir, 'Darna_tesseract')
os.makedirs(output_dir, exist_ok=True)
# Define the patterns to identify and deidentify
# remove anything after keyword
KEYWORDS_REGEX = r'(?i)(?:Name|DOB|Date of birth|Birth|Address|Phone|PATIENT|Patient|MRN|Medical Record Number|APT|House|Street|ST|zip|pin):.*?(\n|$)'
# remove specific words
IGNORE_REGEX = rf'(?i)(?<!\bNO\b[-.,])(?:NO\b[-.]|[Nn][Oo]\b[-.,]|{formatted_ignore_words})'
KEYWORDS_REPLACE = r'\1REDACT'
# NAME_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))(?:[A-Z][a-z]+\s){1,2}(?:[A-Z][a-z]+)(?<!\b[A-Z]{2}\b)\b'
DOB_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))(?:0[1-9]|1[0-2])-(?:0[1-9]|[1-2]\d|3[0-1])-\d{4}\b'
SSN_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))(\d{3})-(\d{4})\b'
EMAIL_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
ZIP_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))([A-Z]{2}) (\d{5})\b'
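
# Illustrative example (not executed; sample input assumed): running
#   re.sub(KEYWORDS_REGEX, KEYWORDS_REPLACE, "Name: John Doe\nBP 120/80")
# replaces the matched "Name: John Doe\n" with the captured newline followed by
# "REDACT", so the identifying line is dropped while the clinical text remains.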
def perform_ocr(image_path):
    # Implementation of the perform_ocr function
    try:
        # Perform OCR using Tesseract
        text = pytesseract.image_to_string(image_path)
        return text
    except pytesseract.TesseractError as e:
        print(f"Error processing image: {image_path}")
        print(f"Error message: {str(e)}")
        return None


def convert_pdf_to_images(file_path):
    # Implementation of the convert_pdf_to_images function
    try:
        # Convert PDF to images using pdf2image library
        images = convert_from_path(file_path)
        return images
    except Exception as e:
        print(f"Error converting PDF to images: {file_path}")
        print(f"Error message: {str(e)}")
        return None
def process_ocr_files(directory, age):
    output_file = os.path.join(directory, 'ocr_results.txt')
    with open(output_file, 'w') as f:
        for root, dirs, files in os.walk(directory):
            # Skip any paths that include the 'tesseract' directory
            if 'tesseract' in root.split(os.sep):
                continue
            for file_name in files:
                # Skip hidden files and non-image/non-PDF files explicitly
                if file_name.startswith('.') or not file_name.lower().endswith(('.pdf', '.jpg', '.jpeg', '.png')):
                    continue
                file_path = os.path.join(root, file_name)
                if os.path.isfile(file_path):
                    if file_name.lower().endswith('.pdf'):
                        images = convert_pdf_to_images(file_path)
                        if images is not None:
                            for i, image in enumerate(images):
                                text = perform_ocr(image)
                                if text:
                                    f.write(f"File: {file_name}, Page: {i+1}\n")
                                    f.write(text)
                                    f.write('\n\n')
                                image.close()
                    else:
                        # perform_ocr accepts image file paths as well as PIL images
                        text = perform_ocr(file_path)
                        if text:
                            f.write(f"File: {file_name}\n")
                            f.write(text)
                            f.write('\n\n')
    print('OCR completed. Results saved in', output_file)
def add_deidentification_tags(text):
    return f'Deidentified Entry | {datetime.now().strftime("%m/%d/%Y")}\n{text}'


def generate_fake_text(match):
    return re.sub(KEYWORDS_REGEX, KEYWORDS_REPLACE, match.group())


def redact_zip_and_words(match):
    words = match.group(1)
    zip_code = match.group(2)
    redacted_words = 'XX ' * min(4, len(words.split()))
    redacted_zip = re.sub(r'\b\d{5}\b', '11111', zip_code)
    return redacted_words + redacted_zip
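
# Illustrative example (not executed; sample value assumed): ZIP_REGEX matches a
# state abbreviation followed by a five-digit ZIP, so
#   re.sub(ZIP_REGEX, redact_zip_and_words, "Dallas TX 75001")
# yields "Dallas XX 11111" -- the two-letter state code becomes "XX " and the ZIP
# is replaced by the placeholder 11111.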
def deidentify_records(ocr_files, formatted_ignore_words):
    try:
        os.makedirs(os.path.dirname(f'{ocr_files}/ocr_results.txt'), exist_ok=True)
        try:
            with open(f'{ocr_files}/ocr_results.txt') as f:
                text = f.read()
        except FileNotFoundError:
            with open(f'{ocr_files}/ocr_results.txt', 'w') as f:
                pass
            text = ""

        # remove specific words passed in by the caller
        IGNORE_REGEX = rf'(?i)(?<!\bNO\b[-.,])(?:NO\b[-.]|[Nn][Oo]\b[-.,]|{formatted_ignore_words})'
        redacted = re.sub(KEYWORDS_REGEX, generate_fake_text, text, flags=re.IGNORECASE)
        redacted = re.sub(IGNORE_REGEX, '', redacted)
        redacted = re.sub(DOB_REGEX, '', redacted)
        redacted = re.sub(SSN_REGEX, '', redacted)
        redacted = re.sub(EMAIL_REGEX, '', redacted)
        redacted = re.sub(ZIP_REGEX, redact_zip_and_words, redacted)
        tagged = add_deidentification_tags(redacted)

        with open(f'{ocr_files}/Darna_tesseract/deidentified_records.txt', 'w') as f:
            f.write(tagged)
        print("Deidentified records printed with user input")
    except Exception as e:
        return f"Error in deidentification process: {str(e)}"
def collate_images(input_dir, output_dir):
    images = []
    for root, dirs, files in os.walk(input_dir):
        # Skip processing files in the 'Darna_tesseract' subdirectory
        if os.path.basename(root) == 'Darna_tesseract':
            continue
        for file in files:
            # Skip all .txt files
            if file.lower().endswith('.txt'):
                continue
            file_path = os.path.join(root, file)
            try:
                if file.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
                    img = Image.open(file_path)
                    img.load()  # read pixel data now so the file handle can be released
                    if img.size[0] > 0 and img.size[1] > 0:  # Check if the image is not empty
                        images.append(img)
                elif file.lower().endswith('.pdf'):
                    pdf_images = convert_pdf_to_images(file_path)
                    if pdf_images is not None:
                        for pdf_img in pdf_images:
                            if pdf_img.size[0] > 0 and pdf_img.size[1] > 0:  # Check if the image is not empty
                                images.append(pdf_img)
                            # No need to close PIL Images created from bytes
            except Exception as e:
                print(f"Error processing image: {file_path}")
                print(f"Error message: {str(e)}")
                continue
    return images
def get_recommendations(age=None, sex=None, ancestry=None, pack_years=None, smoking=None, quit_within_past_15_years=None, overweight_or_obesity=None, cardiovascular_risk=None, cardiovascular_risk_7_5_to_10=None, rh_d_negative=None, pregnant=None, new_mother=None, substance_abuse_risk=None, skin_type=None):
    recommendations = []

    # Set default values when not specified
    if ancestry is None:
        ancestry = "not None"
    if pack_years is None:
        pack_years = 5
    if smoking is None:
        smoking = "not None"
    if quit_within_past_15_years is None:
        quit_within_past_15_years = "not None"
    if overweight_or_obesity is None:
        overweight_or_obesity = "not None"
    if cardiovascular_risk is None:
        cardiovascular_risk = "not None"
    if rh_d_negative is None:
        rh_d_negative = "not None"
    if cardiovascular_risk_7_5_to_10 is None:
        cardiovascular_risk_7_5_to_10 = "not None"
    if substance_abuse_risk is None:
        substance_abuse_risk = "not None"
    if skin_type is None:
        skin_type = "not None"

    # B - Recommended (39)
    if (sex == 'female') and (age is not None) and (age >= 21 and age <= 65):
        recommendations.append("Pap Smear: Cervical Cancer: Screening -- Women aged 21 to 65 years")
    if age is not None and (age >= 50 and age <= 75):
        recommendations.append("Colonoscopy: Colorectal Cancer: Screening -- Adults aged 50 to 75 years")
    if age is not None and (age >= 18):
        recommendations.append("BP: Blood pressure screening in office screening -- Adults aged 18 years and above")
    if sex == 'female' and age is not None and age >= 45:
        recommendations.append("Coronary Risk: Screening women aged 45 and older for lipid disorders if they are at increased risk for coronary heart disease.")
    if sex == 'male' and age is not None and age >= 35:
        recommendations.append("Fasting Lipid: Screening Men aged 35 and older for lipid disorders with fasting lipid profile.")
    if sex == 'female' and (ancestry is not None):
        recommendations.append("BRCA: BRCA-Related Cancer: Risk Assessment, Genetic Counseling, and Genetic Testing -- Women with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    if sex == 'female' and age is not None and age >= 35:
        recommendations.append("Breast Cancer: Medication Use to Reduce Risk -- Women at increased risk for breast cancer aged 35 years or older")
    if (sex == 'female') and age is not None and (age >= 50 and age <= 74):
        recommendations.append("Mammogram: Breast Cancer: Screening -- Women aged 50 to 74 years")
    if (sex == 'female' or (new_mother is not None and new_mother)):
        recommendations.append("Breastfeeding: Primary Care Interventions -- Pregnant women, new mothers, and their children")
    if sex == 'female':
        recommendations.append("Sti screen: Chlamydia and Gonorrhea: Screening -- Sexually active women, including pregnant persons")
    if age is not None and (age >= 45 and age <= 49):
        recommendations.append("Colonoscopy: Colorectal Cancer: Screening -- Adults aged 45 to 49 years")
    if age is not None and (age >= 8 and age <= 18):
        recommendations.append("Anxiety Questionnaire: Anxiety in Children and Adolescents: Screening -- Children and adolescents aged 8 to 18 years")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Aspirin for High Risk: Aspirin Use to Prevent Preeclampsia and Related Morbidity and Mortality: Preventive Medication -- Pregnant persons at high risk for preeclampsia")
    if sex == 'pregnant':
        recommendations.append("Urinalysis: Asymptomatic Bacteriuria in Adults: Screening -- Pregnant persons")
    if sex == 'male' and (ancestry is not None):
        recommendations.append("Brca Gene Test: BRCA-Related Cancer: If screen positive, risk Assessment, Genetic Counseling, and Genetic Testing -- Men with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    if sex == 'male' and age is not None and age >= 65 and (pack_years is not None and pack_years > 0):
        recommendations.append("Ultrasound Doppler Abdomen: Abdominal Aortic Aneurysm: Screening -- Men aged 65 to 75 years who have ever smoked")
    if age is not None and (age >= 12 and age <= 18):
        recommendations.append("Depression Screen Questionnaire: Depression and Suicide Risk in Children and Adolescents: Screening -- Adolescents aged 12 to 18 years")
    if age is not None and (age >= 65):
        recommendations.append("Falls Screen Questionnaire: Falls Prevention in Community-Dwelling Older Adults: Interventions -- Adults 65 years or older")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)) and (age is not None and (age >= 24)):
        recommendations.append("Fasting Blood Glucose: Gestational Diabetes: Screening -- Asymptomatic pregnant persons at 24 weeks of gestation or after")
    if overweight_or_obesity is not None:
        recommendations.append("Bmi screen: If elevated BMI consider Healthy Diet and Physical Activity for Cardiovascular Disease Prevention in Adults With Cardiovascular Risk Factors: Behavioral Counseling Interventions -- Adults with cardiovascular disease risk factors")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Weight Trend: Healthy Weight and Weight Gain In Pregnancy: Behavioral Counseling Interventions -- Pregnant persons")
    if sex == 'female' and (age is not None and (age >= 18)):
        recommendations.append("Hepatitis B Blood Test: Hepatitis B Virus Infection in Adolescents and Adults: Screening -- Adolescents and adults at increased risk for infection")
    if sex == 'male' and (age is not None and (age >= 18 and age <= 79)):
        recommendations.append("Hepatitis C Blood Test: Hepatitis C Virus Infection in Adolescents and Adults: Screening -- Adults aged 18 to 79 years")
    if sex == 'female' and (age is not None and (age >= 14)):
        recommendations.append("Violence Questionnaire screen: Intimate Partner Violence, Elder Abuse, and Abuse of Vulnerable Adults: Screening -- Women of reproductive age")
    if age is not None and (age >= 6 and age <= 60):
        recommendations.append("Tb Screen Test/ Questionnaire: Latent Tuberculosis Infection in Adults: Screening -- Asymptomatic adults at increased risk of latent tuberculosis infection (LTBI)")
    if (sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant))) and (age is not None and (age >= 50 and age <= 80) and (pack_years is not None) and (smoking is not None)):
        recommendations.append("Ct Chest: Lung Cancer screening if you smoked more than 20 pack years: Screening -- Adults aged 50 to 80 years who have a 20 pack-year smoking history and currently smoke or have quit within the past 15 years")
    if age is not None and (age >= 6 and age <= 18):
        recommendations.append("Bmi Screen: Obesity in Children and Adolescents: Screening -- Children and adolescents 6 years and older")
    if sex == 'female' and (age is not None and (age < 65)):
        recommendations.append("Dexa Bone Test: Osteoporosis to Prevent Fractures: Screening -- Postmenopausal women younger than 65 years at increased risk of osteoporosis")
    if sex == 'female' and (age is not None and (age >= 65)):
        recommendations.append("Dexa Bone Test: Osteoporosis to Prevent Fractures: Screening -- Women 65 years and older")
    if (sex == 'pregnant' or (pregnant is not None and pregnant) or (new_mother is not None)):
        recommendations.append("Depression Questionnaire: Perinatal Depression: Preventive Interventions -- Pregnant and postpartum persons")
    if age is not None and (age >= 35 and age <= 70):
        recommendations.append("Fasting Blood Glucose: Prediabetes and Type 2 Diabetes: Screening -- Asymptomatic adults aged 35 to 70 years who have overweight or obesity")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Bp, Questionnaire and Urine test: Preeclampsia: Screening -- Pregnant woman")
    if age is not None and (age < 5):
        recommendations.append("Oral Exam: Prevention of Dental Caries in Children Younger Than 5 Years: Screening and Interventions -- Children younger than 5 years")
    if (sex == 'female' or (pregnant is not None and pregnant)) or (new_mother is not None):
        recommendations.append("Oral Exam: Prevention of Dental Caries in Children Younger Than 5 Years: Screening and Interventions -- Children younger than 5 years")
    if (sex == 'pregnant' or (pregnant is not None and pregnant)) and (rh_d_negative is not None):
        recommendations.append("Rh Blood Test: Rh(D) Incompatibility especially with Rh negative: Screening -- Unsensitized Rh(D)-negative pregnant women")
    if sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant) or (new_mother is not None and new_mother)):
        recommendations.append("Depression Questionnaire: Screening for Depression in Adults -- General adult population")
    if sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant)) or (new_mother is not None):
        recommendations.append("Sti Screen: Sexually Transmitted Infections: Behavioral Counseling -- Sexually active adolescents and adults at increased risk")
    if (age is not None and (age >= 25)) or (new_mother is not None) or (sex == 'male' and (substance_abuse_risk is not None)):
        recommendations.append("Skin Exam: Skin Cancer Prevention: Behavioral Counseling -- Adults, Young adults, adolescents, children, and parents of young children")
    if (age is not None and (age >= 40 and age <= 75)) and (cardiovascular_risk is not None) and (cardiovascular_risk_7_5_to_10 is not None):
        recommendations.append("Heart Disease Questionnaire: Screen for CV risk and consider Statin Use for the Primary Prevention of Cardiovascular Disease in Adults: Preventive Medication -- Adults aged 40 to 75 years who have 1 or more cardiovascular risk factors and an estimated 10-year cardiovascular disease (CVD) risk of 10% or greater")
    if sex == 'female' and (pregnant is not None and pregnant) and (ancestry is not None and ancestry == 'BRCA1/2 gene mutation'):
        recommendations.append("Family History and Brca Test: BRCA-Related Cancer: Risk Assessment, Genetic Counseling, and Genetic Testing -- Women with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    if (age is not None and (age >= 6 and age <= 18)) or (sex == 'pregnant' or (pregnant is not None and pregnant)):
        recommendations.append("Tobacco Questionnaire: Tobacco Use in Children and Adolescents: Primary Care Interventions -- School-aged children and adolescents who have not started to use tobacco")
    if age is not None and (age >= 18) and (substance_abuse_risk is not None):
        recommendations.append("Alcohol Questionnaire: Unhealthy Alcohol Use in Adolescents and Adults: Screening and Behavioral Counseling Interventions -- Adults 18 years or older, including pregnant women")
    if age is not None and (age >= 13):
        recommendations.append("Drug Abuse Questionnaire: Unhealthy Drug Use: Screening -- Adults age 13 years or older")
    if age is not None and (age > 2 and age < 24) and skin_type is not None:
        recommendations.append("Skin Exam: Skin Cancer: Counseling -- Fair-skinned individuals aged 6 months to 24 years with a family history of skin cancer or personal history of skin cancer, or who are at increased risk of skin cancer")

    return recommendations
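
# Illustrative call (not executed; sample values assumed):
#   get_recommendations(age=52, sex='female')
# would include the Pap smear (21-65), colonoscopy (50-75), blood pressure (18+),
# lipid (45+) and mammogram (50-74) items, since the unspecified risk-factor
# arguments default to the sentinel string "not None" and therefore pass the
# "is not None" checks above.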
def generate_recommendations(age=None, sex=None):
    age = f"{age}"
    try:
        age = int(age)
    except ValueError:
        print("Invalid age value. Age must be a valid integer.")
    sex = f"{sex}"
    recommendations = get_recommendations(age, sex)

    # Adding subheading
    # subheading = f"The USPTF recommendations for {age}/{sex} are:"
    subheading = "RECOMMENDATIONS:"
    recommendations_with_subheading = [subheading] + recommendations

    with open(f'{ocr_files}/Darna_tesseract/USPTF_Intent.txt', 'w') as file:
        file.write('\n\n\n'.join(recommendations_with_subheading))

    doc = fitz.open()  # Create a new PDF
    page = doc.new_page()
    text = "\n\n\n".join(recommendations_with_subheading)
    page.insert_text((72, 72), text)
    doc.save(f'{ocr_files}/USPTF.pdf')  # Save the PDF
    doc.close()
#extract data from the updated fhir file
def extract_lforms_data(json_data):
    if isinstance(json_data, str):
        data = json.loads(json_data)
    else:
        data = json_data

    extracted_info = {
        "date_of_birth": None,
        "sex": None,
        "allergies": [],
        "past_medical_history": [],
        "medications": []
    }

    for item in data.get("items", []):
        if item.get("question") == "ABOUT ME":
            for subitem in item.get("items", []):
                if subitem.get("question") == "DATE OF BIRTH":
                    extracted_info["date_of_birth"] = subitem.get("value")
                elif subitem.get("question") == "BIOLOGICAL SEX":
                    extracted_info["sex"] = subitem.get("value", {}).get("text")
        elif item.get("question") == "ALLERGIES":
            for allergy_item in item.get("items", []):
                if allergy_item.get("question") == "Allergies and Other Dangerous Reactions":
                    for subitem in allergy_item.get("items", []):
                        if subitem.get("question") == "Name" and "value" in subitem:
                            extracted_info["allergies"].append(subitem["value"]["text"])
        elif item.get("question") == "PAST MEDICAL HISTORY:":
            for condition_item in item.get("items", []):
                if condition_item.get("question") == "PAST MEDICAL HISTORY" and "value" in condition_item:
                    condition = extract_condition(condition_item)
                    if condition:
                        extracted_info["past_medical_history"].append(condition)
        elif item.get("question") == "MEDICATIONS:":
            medication = {}
            for med_item in item.get("items", []):
                if med_item.get("question") == "MEDICATIONS":
                    medication["name"] = extract_med_value(med_item)
                elif med_item.get("question") == "Strength":
                    medication["strength"] = extract_med_value(med_item)
                elif med_item.get("question") == "Instructions":
                    medication["instructions"] = extract_med_value(med_item)
            if medication:
                extracted_info["medications"].append(medication)

    return extracted_info


def extract_condition(condition_item):
    if isinstance(condition_item.get("value"), dict):
        return condition_item["value"].get("text", "")
    elif isinstance(condition_item.get("value"), str):
        return condition_item["value"]
    return ""


def extract_med_value(med_item):
    if "value" not in med_item:
        return ""
    value = med_item["value"]
    if isinstance(value, str):
        return value
    elif isinstance(value, dict):
        return value.get("text", "")
    return ""
#######
###nlp_process.py functions
import json
import nltk
import re, os
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# Ensure NLTK components are downloaded
#nltk.download('punkt')
#nltk.download('stopwords')


#convert text to lowercase and remove fillers
def normalize_text(text):
    # Convert text to lowercase and strip ':', '-' and spaces
    return re.sub('[: -]', '', text.lower())


def condense_summary_to_tokens(text, token_limit=300):
    tokens = word_tokenize(text)
    # Select the first 'token_limit' tokens
    limited_tokens = tokens[:token_limit]
    # Reconstruct the text from these tokens
    condensed_text = ' '.join(limited_tokens)
    return condensed_text
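
# Illustrative example (not executed): normalize_text("PAST MEDICAL HISTORY:")
# returns "pastmedicalhistory", which is what the load_text_from_json_* helpers
# below compare against their normalized key lists.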
#write all to a json summary file
def wordcloud_summary(keys, texts, directory):
    output_file = f'{directory}/wordcloud_summary.json'
    wordcloud_dir = f'{directory}/wordclouds'
    try:
        with open(output_file, 'r', encoding='utf-8') as file:
            existing_data = json.load(file)
    except FileNotFoundError:
        existing_data = {}

    # Ensure the directories exist
    os.makedirs(os.path.dirname(output_file), exist_ok=True)
    os.makedirs(wordcloud_dir, exist_ok=True)

    for i, key in enumerate(keys):
        if i < len(texts):
            text = texts[i]
            # Check if the text contains any words
            if text.strip():
                existing_data[key] = text
                # Attempt to generate word cloud
                try:
                    # Split the text into words
                    words = text.split()
                    # Check if there are enough words
                    if len(words) > 1:
                        wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)
                        # Save the word cloud
                        plt.figure(figsize=(10, 5))
                        plt.imshow(wordcloud, interpolation='bilinear')
                        plt.axis('off')
                        plt.title(f'Word Cloud for {key}')
                        plt.savefig(f'{wordcloud_dir}/{key}_wordcloud.png')
                        plt.close()
                        print(f"Generated word cloud for key: {key}")
                    else:
                        print(f"Not enough words to generate word cloud for key: {key}")
                except Exception as e:
                    print(f"Error generating word cloud for key {key}: {str(e)}")
            else:
                print(f"Skipping empty text for key: {key}")
        else:
            print(f"No text available for key: {key}")

    with open(output_file, 'w', encoding='utf-8') as file:
        json.dump(existing_data, file, indent=4, ensure_ascii=False)
#generate list of meds from the files
def load_text_from_json_meds(json_file_path, keys):
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)

    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            if isinstance(value, str):
                text.append(value)
            elif isinstance(value, list):
                text.extend(str(item) for item in value if item)
            elif isinstance(value, dict):
                text.extend(str(item) for item in value.values() if item)
            else:
                text.append(str(value))

    combined_text = ' '.join(text)
    combined_text = condense_summary_to_tokens(combined_text, 300)
    return combined_text


#generate a list of past medical history from the files
def load_text_from_json_pmh(json_file_path, keys):
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)

    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            if isinstance(value, str):
                text.append(value)
            elif isinstance(value, list):
                text.extend(str(item) for item in value if item)
            elif isinstance(value, dict):
                text.extend(str(item) for item in value.values() if item)
            else:
                text.append(str(value))

    combined_text = ' '.join(text)
    combined_text = condense_summary_to_tokens(combined_text, 300)
    return combined_text
#generate a list of screening items from the USPTF file
def load_text_from_json_screening(json_file_path, keys):
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)

    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            text.append(value)

    combined_text_screening = ' '.join(text)
    #print(combined_text_screening)
    return combined_text_screening


def load_text_from_json_summary(json_file_path, keys):
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)

    text = []
    for json_key, value in data.items():
        normalized_json_key = normalize_text(json_key)
        if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
            if isinstance(value, str):
                text.append(value)
            elif isinstance(value, list):
                text.extend(str(item) for item in value if item)
            elif isinstance(value, dict):
                text.extend(str(item) for item in value.values() if item)
            else:
                text.append(str(value))

    combined_text = ' '.join(text)
    combined_text = condense_summary_to_tokens(combined_text, 300)
    return combined_text
#iterate json files in directory and call function above
def process_directory_summary(directory, keys):
    combined_texts = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            print(file_path)
            combined_text = load_text_from_json_summary(file_path, keys)
            if combined_text:  # Only add non-empty strings
                combined_texts.append(combined_text)
    # Combine all texts into one
    final_combined_text = ' '.join(combined_texts)
    return final_combined_text


#iterate json files in directory and summarize meds
def process_directory_meds(directory, keys):
    combined_texts = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            print(file_path)
            combined_text = load_text_from_json_meds(file_path, keys)
            combined_texts.append(combined_text)
    # Combine all texts into one
    final_combined_text = ' '.join(combined_texts)
    return final_combined_text


#iterate json files in directory and summarize past medical history
def process_directory_pmh(directory, keys):
    combined_texts = []
    for filename in os.listdir(directory):
        if filename.endswith('.json'):
            file_path = os.path.join(directory, filename)
            print(file_path)
            combined_text = load_text_from_json_pmh(file_path, keys)
            combined_texts.append(combined_text)
    # Combine all texts into one
    final_combined_text = ' '.join(combined_texts)
    return final_combined_text
def preprocess_and_create_wordcloud(text, directory):
    # Tokenize and remove stopwords
    stop_words = set(stopwords.words('english'))
    words = word_tokenize(text)
    filtered_words = [word for word in words if word.isalnum() and word.lower() not in stop_words]

    # Check if there are any words left after filtering
    if not filtered_words:
        print("No words left after preprocessing. Skipping word cloud creation.")
        return

    processed_text = ' '.join(filtered_words)

    # Create the word cloud
    wordcloud = WordCloud(width=800, height=800, background_color='white').generate(processed_text)
    plt.figure(figsize=(8, 8), facecolor=None)
    plt.imshow(wordcloud)
    plt.axis("off")
    plt.tight_layout(pad=0)
    # Display the word cloud
    #plt.show()
    # Save the word cloud image
    plt.savefig(f'{directory}darnahi_ocr.png')
#############
pattern = r"\d+\..+?(\d{4};\d+\(\d+\):\d+–\d+\. DOI: .+?\.|.+?ed\., .+?: .+?; \d{4}\. \d+–\d+\.)"


class Document:
    def __init__(self, page_content, metadata):
        self.page_content = page_content
        self.metadata = metadata


def process_pdf(file_path, chunk_size=350):
    try:
        doc = fitz.open(file_path)
        full_text = ""
        for page in doc:
            text_blocks = page.get_text("dict")["blocks"]
            for block in text_blocks:
                # Text blocks (type 0) carry their text inside lines/spans,
                # not in a top-level 'text' key
                if block.get('type') == 0:
                    for line in block.get('lines', []):
                        for span in line.get('spans', []):
                            text = span.get('text', '').strip()
                            if text:
                                full_text += text + "\n"
        chunks = [full_text[i:i+chunk_size] for i in range(0, len(full_text), chunk_size)]
        return chunks
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return []
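
# Illustrative behaviour (assumed sample): for a PDF whose extracted text is 800
# characters long, process_pdf(path) returns three fixed-width chunks of 350, 350
# and 100 characters; process_json(path) below instead yields one Document per
# heading of a combined_output.json file.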
def process_json(input_file):
    try:
        with open(input_file, 'r', encoding='utf-8') as file:
            existing_data = json.load(file)
    except FileNotFoundError:
        print("File not found.")
        return []

    semantic_snippets = []
    for heading, content in existing_data.items():
        metadata = {'heading': heading, 'file': input_file}
        doc = Document(page_content=content, metadata=metadata)
        semantic_snippets.append(doc)
    return semantic_snippets


def process_files(directory):
    all_semantic_snippets = []
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if filename.endswith('.pdf'):
            snippets = process_pdf(file_path)
            all_semantic_snippets.extend(snippets)
        elif filename.endswith('.json'):
            semantic_snippets = process_json(file_path)
            all_semantic_snippets.extend(semantic_snippets)
    return all_semantic_snippets
def chromadb_embed(directory, collection_name="documents_collection"):
    persist_directory = os.path.join(directory, 'Darna_tesseract', 'chroma_storage')
    os.makedirs(persist_directory, exist_ok=True)

    # Keep the snippets as a list (wrapping it in str() would iterate characters)
    all_semantic_snippets = process_files(directory)

    client = chromadb.PersistentClient(path=persist_directory)
    collection = client.get_or_create_collection(name=collection_name)
    count = collection.count()
    print(f"Collection already contains {count} documents")

    ids = [str(i) for i in range(count, count + len(all_semantic_snippets))]
    for i in tqdm(range(0, len(all_semantic_snippets), 100), desc="Adding documents"):
        batch_snippets = all_semantic_snippets[i:i+100]
        batch_metadatas = []
        for snippet in batch_snippets:
            metadata = {"filename": "summary", "heading": "summary_heading"} if not isinstance(snippet, Document) else snippet.metadata
            batch_metadatas.append(metadata)
        collection.add(ids=ids[i:i+100], documents=[s if isinstance(s, str) else s.page_content for s in batch_snippets], metadatas=batch_metadatas)

    new_count = collection.count()
    print(f"Added {new_count - count} documents")
#######################################
#########pdf_sectionreader.py
import os
import fitz
import pandas as pd
import json
from unidecode import unidecode

global_heading_content_dict = {}  # Global dictionary to accumulate data


def process_pdf_files(directory):
    for filename in os.listdir(directory):
        if filename.endswith('.pdf'):
            file_path = os.path.join(directory, filename)
            with fitz.open(file_path) as doc:
                print(f"Processing {filename}...")
                extract_and_tag_text(doc)
    # Generate and save output after processing all files
    generate_output(global_heading_content_dict, directory)


def extract_and_tag_text(doc):
    block_dict, page_num = {}, 1
    for page in doc:
        file_dict = page.get_text('dict')
        block = file_dict['blocks']
        block_dict[page_num] = block
        page_num += 1

    rows = []
    for page_num, blocks in block_dict.items():
        for block in blocks:
            if block['type'] == 0:
                for line in block['lines']:
                    for span in line['spans']:
                        xmin, ymin, xmax, ymax = list(span['bbox'])
                        font_size = span['size']
                        text = unidecode(span['text'])
                        span_font = span['font']
                        is_upper = text.isupper()
                        is_bold = "bold" in span_font.lower()
                        if text.strip() != "":
                            rows.append((xmin, ymin, xmax, ymax, text, is_upper, is_bold, span_font, font_size))

    span_df = pd.DataFrame(rows, columns=['xmin', 'ymin', 'xmax', 'ymax', 'text', 'is_upper', 'is_bold', 'span_font', 'font_size'])
    common_font_size = span_df['font_size'].mode().iloc[0]
    span_df['tag'] = span_df.apply(assign_tag, axis=1, common_font_size=common_font_size)
    update_global_dict(span_df)
def assign_tag(row, common_font_size):
    if any(char.isdigit() for char in row['text']):
        return 'p'
    elif row['font_size'] > common_font_size and row['is_bold'] and row['is_upper']:
        return 'h1'
    elif row['is_bold'] or row['is_upper'] or row['font_size'] > common_font_size:
        return 'h2'
    else:
        return 'p'
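
# Illustrative tagging (assumed values): with a common body font size of 10,
#   assign_tag({'text': 'PAST MEDICAL HISTORY', 'font_size': 14, 'is_bold': True, 'is_upper': True}, 10)
# returns 'h1'; the same text at size 10 and not bold returns 'h2' (still
# upper-case), and any span containing a digit is always tagged 'p'.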
def update_global_dict(span_df):
    tmp = []
    current_heading = None
    for index, span_row in span_df.iterrows():
        text, tag = span_row.text.strip(), span_row.tag
        if 'h' in tag:
            if current_heading is not None:
                existing_text = global_heading_content_dict.get(current_heading, "")
                global_heading_content_dict[current_heading] = existing_text + '\n'.join(tmp).strip()
            current_heading = text
            tmp = []
        else:
            tmp.append(text)
    if current_heading is not None:
        existing_text = global_heading_content_dict.get(current_heading, "")
        global_heading_content_dict[current_heading] = existing_text + '\n'.join(tmp).strip()
def generate_output(heading_content_dict, directory):
    text_df = pd.DataFrame(list(heading_content_dict.items()), columns=['heading', 'content'])
    #text_df.to_excel(f'{directory}/combined_output.xlsx', index=False, engine='openpyxl')
    json_data = json.dumps(heading_content_dict, indent=4, ensure_ascii=False)
    with open(f'{directory}/Darna_tesseract/combined_output.json', 'w', encoding='utf-8') as f:
        f.write(json_data)
    with open(f'{directory}/combined_output.json', 'w', encoding='utf-8') as f:
        f.write(json_data)


import shutil


def whitelist_directory(directory, whitelist):
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path) and filename not in whitelist:
            try:
                os.remove(file_path)
                print(f"Removed: {file_path}")
            except Exception as e:
                print(f"Error removing {file_path}: {e}")


###########################################
#write files to pdf
def write_text_to_pdf(directory, text):
    doc = fitz.open()  # Create a new PDF
    page = doc.new_page()  # Add a new page
    page.insert_text((72, 72), text)  # Position (x, y) and text
    doc.save(f'{directory}/fhir_data.pdf')  # Save the PDF
    doc.close()
def run_analyzer(age, sex, ocr_files, formatted_ignore_words):
    try:
        # Process OCR files with provided input
        print("Processing OCR files")
        process_ocr_files(ocr_files, age)

        # Create collated file
        collate_images(ocr_files, f"{ocr_files}/Darna_tesseract")

        # Deidentify records
        print("Deidentifying records")
        deidentify_records(ocr_files, formatted_ignore_words)

        # Generate recommendations with provided age and sex
        print("Generating recommendations")
        recommendations = generate_recommendations(age=age, sex=sex)

        # Extract data from FHIR file and create PDF
        directory = ocr_files
        #folderpath is global directory
        with open(f'{folderpath}/summary/chart.json', 'r') as file:
            json_data = json.load(file)
        extracted_info = extract_lforms_data(json.dumps(json_data))
        print(extracted_info)
        json_output = json.dumps(extracted_info, indent=4)
        write_text_to_pdf(directory, str(extracted_info))
        final_directory = f'{directory}/Darna_tesseract/'

        # Process PDF files
        process_pdf_files(directory)

        # Write the JSON output to a file
        with open(f'{directory}/fhir_output.json', 'w', encoding='utf-8') as f:
            f.write(json_output)

        # NLP processing for summary, past medical history, medications, and screening
        json_file_path = f'{directory}/combined_output.json'
        keys_pmh = ['PMH', 'medical', 'past medical history', 'surgical', 'past']
        keys_meds = ['medications', 'MEDICATIONS:', 'medicine', 'meds']
        keys_summary = ['HPI', 'history', 'summary']
        keys_screening = ['RECS', 'RECOMMENDATIONS']

        # Process text data and create word clouds
        text_summary = process_directory_summary(directory, keys_summary)
        preprocess_and_create_wordcloud(text_summary, final_directory)
        text_meds = process_directory_meds(directory, keys_meds)
        text_screening = load_text_from_json_screening(json_file_path, keys_screening)
        text_pmh = process_directory_pmh(directory, keys_pmh)

        # Write processed texts to JSON
        keys = ("darnahi_summary", "darnahi_past_medical_history", "darnahi_medications", "darnahi_screening")
        texts = (text_summary, text_pmh, text_meds, text_screening)
        wordcloud_summary(keys, texts, final_directory)

        # CHROMA embedding
        chromadb_embed(directory)

        # Cleanup OCR files, but leave Darna_tesseract files
        whitelist = ["combined_output.json"]
        whitelist_directory(directory, whitelist)
    except Exception as e:
        print(f"Error during processing: {e}")


##CALL ANALYZER
run_analyzer(age, sex, ocr_files, formatted_ignore_words)
##Take files and add to sqlite db; extract metadata; create AI metadata and an AI function to make the extracted data meaningful
from ollama import AsyncClient  # needed by MetadataGenerator.get_metadata below


class MetadataGenerator:
    def __init__(self, model='gemma3:4b'):
        self.model = model

    async def get_metadata(self, file_path: str, content: str) -> str:
        """Generate metadata for file content using Ollama"""
        messages = [
            {"role": "system", "content": "You are an analyst. Provide a summary and keywords. To the point."},
            {"role": "user", "content": f"File: {os.path.basename(file_path)}\nContent: {content}\n\nProvide only JSON format: {{\"summary\": \"...\", \"keywords\": [\"...\", \"...\"]}}"},
        ]
        try:
            OLLAMA_HOST = os.environ.get('OLLAMA_HOST', 'http://localhost:11434')
            client = AsyncClient(host=OLLAMA_HOST)
            response_stream = await client.chat(model=self.model, messages=messages, stream=True)
            full_response = ""
            async for part in response_stream:
                full_response += part['message']['content']
            return full_response
        except Exception as e:
            return f"{{\"error\": \"{str(e)}\"}}"


metadata_generator = MetadataGenerator()
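
# Illustrative usage (not executed; assumes a running Ollama server and that the
# model named above is available):
#   import asyncio
#   print(asyncio.run(metadata_generator.get_metadata("report.pdf", "sample text")))
# which is expected to print a JSON string like {"summary": "...", "keywords": [...]}.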
""" | |
# Process OCR files with provided input | |
print("process ocr files") | |
process_ocr_files(ocr_files, age) | |
#doesnt work | |
#create collated file | |
collate_images(ocr_files, f"{ocr_files}/Darna_tesseract") | |
# Deidentify records | |
print("debug deidentify records") | |
deidentify_records() | |
# Generate recommendations with provided age and sex | |
print("debug generate records") | |
recommendations = generate_recommendations(age=age, sex=sex) | |
#extract data from fhir file and make pdf | |
directory = ocr_files | |
with open(f'{folderpath}/summary/chart.json', 'r') as file: | |
json_data = json.load(file) | |
# Extract information using function above from fhir document and write to pdf and json file | |
extracted_info = extract_lforms_data(json.dumps(json_data)) | |
print(extracted_info) | |
#extracted_info = extract_info(json_data) | |
json_output = json.dumps(extracted_info, indent=4) | |
#extracted_info = extract_info(json_data) | |
write_text_to_pdf(directory, str(extracted_info)) | |
final_directory= f'{directory}/Darna_tesseract/' | |
#calls the CALL_FILE pdf_sectionreader | |
process_pdf_files(directory) | |
# Write the JSON output to a file and pdf file (2 lines above) | |
with open(f'{directory}/fhir_output.json', 'w', encoding='utf-8') as f: | |
f.write(json_output) | |
#CALL FILE NLP_PROCESS | |
# Usage nlp_process | |
json_file_path = f'{directory}/combined_output.json' | |
#json_file_path = 'processed_data2.json' | |
#keys_summary = ['HPI', 'History of presenting illness', 'History of', 'summary'] | |
keys_pmh = ['PMH', 'medical', 'past medical history', 'surgical', 'past'] #extracts past medical history | |
keys_meds = ['medications', 'MEDICATIONS:', 'medicine', 'meds'] #extracts medications | |
keys_summary = ['HPI', 'history', 'summary'] | |
keys_screening= ['RECS', 'RECOMMENDATIONS'] | |
#call functions and write to wordcloud and creat wordcloud.png file | |
text_summary = process_directory_summary(directory, keys_summary) | |
#creates wordcloud of uploaded files | |
preprocess_and_create_wordcloud(text_summary, final_directory) | |
text_meds = process_directory_meds(directory, keys_meds)#saves to medications in json | |
text_screening = load_text_from_json_screening(json_file_path, keys_screening)#saves to screening in json | |
text_pmh = process_directory_pmh(directory, keys_pmh)#saves to past history in json | |
#write to json using "keys":"texts" | |
keys= ("darnahi_summary", "darnahi_past_medical_history", "darnahi_medications", "darnahi_screening") | |
texts= (text_summary, text_pmh, text_meds, text_screening) | |
wordcloud_summary(keys, texts, final_directory) | |
#CHROMA MINER # Adjust this path to your directory | |
chromadb_embed(directory) | |
#remove files from ocr_files- cleanup but leave Darna_tesseract files | |
subprocess.run(f'find {directory} -maxdepth 1 -type f -exec rm {{}} +', shell=True) | |
""" | |