#!/usr/bin/env python3
#/* DARNA.HI
# * Copyright (c) 2023 Seapoe1809 <https://github.com/seapoe1809>
# * Copyright (c) 2023 pnmeka <https://github.com/pnmeka>
# *
# *
# * This program is free software: you can redistribute it and/or modify
# * it under the terms of the GNU General Public License as published by
# * the Free Software Foundation, either version 3 of the License, or
# * (at your option) any later version.
# *
# * This program is distributed in the hope that it will be useful,
# * but WITHOUT ANY WARRANTY; without even the implied warranty of
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# * GNU General Public License for more details.
# *
# * You should have received a copy of the GNU General Public License
# * along with this program. If not, see <http://www.gnu.org/licenses/>.
# */
import pytesseract
from pdf2image import convert_from_path
import os, subprocess
from variables import variables
from variables import variables2
import re
from PIL import Image, ImageFile
from datetime import datetime
import json
import fitz # PyMuPDF
import chromadb
from tqdm import tqdm
#from install_module.Analyze.pdf_sectionreader import *
#from install_module.Analyze.nlp_process import *
ImageFile.LOAD_TRUNCATED_IMAGES = True
HS_path = os.getcwd()
print(HS_path)
folderpath = os.environ.get('FOLDERPATH')
print("folderpath is", folderpath)
if not folderpath:
    print("Session FOLDERPATH environment variable not set.")
APP_dir = f"{HS_path}/install_module"
ocr_files = f"{folderpath}/ocr_files"
upload_dir = f"{folderpath}/upload"
ip_address = variables.ip_address
age = variables2.age
sex = variables2.sex
formatted_ignore_words = getattr(variables2, 'ignore_words', None)
# Path to the Tesseract OCR executable (change this if necessary)
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'
ocr_files_dir = f'{ocr_files}/'
output_dir = os.path.join(ocr_files_dir, 'Darna_tesseract')
os.makedirs(output_dir, exist_ok=True)
# Define the patterns to identify and deidentify
# remove anything after keyword
KEYWORDS_REGEX = r'(?i)(?:Name|DOB|Date of birth|Birth|Address|Phone|PATIENT|Patient|MRN|Medical Record Number|APT|House|Street|ST|zip|pin):.*?(\n|$)'
# remove specific words (recomputed inside deidentify_records with the caller's ignore list)
IGNORE_REGEX = rf'(?i)(?<!\bNO\b[-.,])(?:NO\b[-.]|[Nn][Oo]\b[-.,]|{formatted_ignore_words})'
KEYWORDS_REPLACE = r'\1REDACT'
# NAME_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))(?:[A-Z][a-z]+\s){1,2}(?:[A-Z][a-z]+)(?<!\b[A-Z]{2}\b)\b'
DOB_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))(?:0[1-9]|1[0-2])-(?:0[1-9]|[1-2]\d|3[0-1])-\d{4}\b'
SSN_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))(\d{3})-(\d{4})\b'  # Note: matches 3-4 digit pairs (e.g. truncated IDs), not the full 3-2-4 SSN format
EMAIL_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
ZIP_REGEX = r'\b(?!(?:NO\b|NO\b[-.]|[Nn][Oo]\b[-.,]))([A-Z]{2}) (\d{5})\b'
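# Illustrative behavior of the date and e-mail patterns above (a sketch;
# comments only, not executed):
#   re.sub(DOB_REGEX, '', "seen 01-02-1980 today")    -> "seen  today"
#   re.sub(EMAIL_REGEX, '', "mail: jane@example.com")  -> "mail: "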
def perform_ocr(image_path):
    """Run Tesseract OCR on an image path or PIL Image and return the extracted text."""
    try:
        # Perform OCR using Tesseract
text = pytesseract.image_to_string(image_path)
return text
except pytesseract.TesseractError as e:
print(f"Error processing image: {image_path}")
print(f"Error message: {str(e)}")
return None
def convert_pdf_to_images(file_path):
    """Convert each page of a PDF into a PIL image via pdf2image."""
try:
# Convert PDF to images using pdf2image library
images = convert_from_path(file_path)
return images
except Exception as e:
print(f"Error converting PDF to images: {file_path}")
print(f"Error message: {str(e)}")
return None
def process_ocr_files(directory, age):
    output_file = os.path.join(directory, 'ocr_results.txt')  # Aggregate OCR results for the whole directory
with open(output_file, 'w') as f:
for root, dirs, files in os.walk(directory):
# Skip any paths that include the 'tesseract' directory
if 'tesseract' in root.split(os.sep):
continue
for file_name in files:
# Skip hidden files and non-image/non-PDF files explicitly
if file_name.startswith('.') or not file_name.lower().endswith(('.pdf', '.jpg', '.jpeg', '.png')):
continue
file_path = os.path.join(root, file_name)
if os.path.isfile(file_path):
if file_name.lower().endswith('.pdf'):
images = convert_pdf_to_images(file_path)
if images is not None:
for i, image in enumerate(images):
text = perform_ocr(image)
if text:
f.write(f"File: {file_name}, Page: {i+1}\n")
f.write(text)
f.write('\n\n')
image.close()
else:
                        # pytesseract.image_to_string accepts a file path directly
text = perform_ocr(file_path)
if text:
f.write(f"File: {file_name}\n")
f.write(text)
f.write('\n\n')
print('OCR completed. Results saved in', output_file)
def add_deidentification_tags(text):
return f'Deidentified Entry | {datetime.now().strftime("%m/%d/%Y")}\n{text}'
def generate_fake_text(match):
return re.sub(KEYWORDS_REGEX, KEYWORDS_REPLACE, match.group())
def redact_zip_and_words(match):
words = match.group(1)
zip_code = match.group(2)
redacted_words = 'XX ' * min(4, len(words.split()))
redacted_zip = re.sub(r'\b\d{5}\b', '11111', zip_code)
return redacted_words + redacted_zip
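# A sketch of the ZIP redaction above (comments only, not executed): for the
# hypothetical input "moved to CA 94110", group(1) is "CA" and group(2) is
# "94110", so
#   re.sub(ZIP_REGEX, redact_zip_and_words, "moved to CA 94110")
# yields "moved to XX 11111".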
def deidentify_records(ocr_files, formatted_ignore_words):
try:
os.makedirs(os.path.dirname(f'{ocr_files}/ocr_results.txt'), exist_ok=True)
try:
with open(f'{ocr_files}/ocr_results.txt') as f:
text = f.read()
except FileNotFoundError:
with open(f'{ocr_files}/ocr_results.txt', 'w') as f:
pass
text = ""
# remove specific words
        IGNORE_REGEX = rf'(?i)(?<!\bNO\b[-.,])(?:NO\b[-.]|[Nn][Oo]\b[-.,]|{formatted_ignore_words})'
redacted = re.sub(KEYWORDS_REGEX, generate_fake_text, text, flags=re.IGNORECASE)
redacted = re.sub(IGNORE_REGEX, '', redacted)
redacted = re.sub(DOB_REGEX, '', redacted)
redacted = re.sub(SSN_REGEX, '', redacted)
redacted = re.sub(EMAIL_REGEX, '', redacted)
redacted = re.sub(ZIP_REGEX, redact_zip_and_words, redacted)
tagged = add_deidentification_tags(redacted)
with open(f'{ocr_files}/Darna_tesseract/deidentified_records.txt', 'w') as f:
f.write(tagged)
print("Deidentified records printed with user input")
except Exception as e:
return f"Error in deidentification process: {str(e)}"
def collate_images(input_dir, output_dir):
    # Note: output_dir is currently unused; this pass only loads and validates images
    images = []
    for root, dirs, files in os.walk(input_dir):
        # Skip files in the 'Darna_tesseract' output subdirectory
        if os.path.basename(root) == 'Darna_tesseract':
            continue
for file in files:
# Skip all .txt files
if file.lower().endswith('.txt'):
continue
file_path = os.path.join(root, file)
            try:
                if file.lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
                    with Image.open(file_path) as img:
                        if img.size[0] > 0 and img.size[1] > 0:  # Check if the image is not empty
                            images.append(img.copy())  # Copy before the handle closes; appending and then closing left unusable images
                elif file.lower().endswith('.pdf'):  # lower() already normalizes the extension
pdf_images = convert_pdf_to_images(file_path)
if pdf_images is not None:
for pdf_img in pdf_images:
if pdf_img.size[0] > 0 and pdf_img.size[1] > 0: # Check if the image is not empty
images.append(pdf_img)
# No need to close PIL Images created from bytes
            except Exception as e:
                print(f"Error processing image: {file_path}")
                print(f"Error message: {str(e)}")
                continue
    return images  # Callers currently ignore the return value; no collated file is written yet
def get_recommendations(age=None, sex=None, ancestry=None, pack_years=None, smoking=None, quit_within_past_15_years=None, overweight_or_obesity=None, cardiovascular_risk=None, cardiovascular_risk_7_5_to_10=None, rh_d_negative=None, pregnant=None, new_mother=None, substance_abuse_risk=None, skin_type=None):
recommendations = []
    # Unspecified flags default to the placeholder string "not None": a truthy
    # sentinel so the `is not None` checks below treat them as present
if ancestry is None:
ancestry = "not None"
if pack_years is None:
pack_years = 5
if smoking is None:
smoking = "not None"
if quit_within_past_15_years is None:
quit_within_past_15_years = "not None"
if overweight_or_obesity is None:
overweight_or_obesity = "not None"
if cardiovascular_risk is None:
cardiovascular_risk = "not None"
if rh_d_negative is None:
rh_d_negative = "not None"
if cardiovascular_risk_7_5_to_10 is None:
cardiovascular_risk_7_5_to_10 = "not None"
if substance_abuse_risk is None:
substance_abuse_risk = "not None"
if skin_type is None:
skin_type = "not None"
# B - Recommended (39)
if (sex == 'female') and (age is not None) and (age >= 21 and age <= 65):
recommendations.append("Pap Smear: Cervical Cancer: Screening -- Women aged 21 to 65 years")
if age is not None and (age >= 50 and age <= 75):
recommendations.append("Colonoscopy: Colorectal Cancer: Screening -- Adults aged 50 to 75 years")
if age is not None and (age >= 18):
recommendations.append("BP: Blood pressure screening in office screening -- Adults aged 18 years and above")
    if sex == 'female' and age is not None and age >= 45:
recommendations.append("Coronary Risk: Screening women aged 45 and older for lipid disorders if they are at increased risk for coronary heart disease.")
    if sex == 'male' and age is not None and age >= 35:
recommendations.append("Fasting Lipid: Screening Men aged 35 and older for lipid disorders with fasting lipid profile.")
if sex == 'female' and (ancestry is not None):
recommendations.append("BRCA: BRCA-Related Cancer: Risk Assessment, Genetic Counseling, and Genetic Testing -- Women with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    if sex == 'female' and age is not None and age >= 35:
recommendations.append("Breast Cancer: Medication Use to Reduce Risk -- Women at increased risk for breast cancer aged 35 years or older")
if (sex == 'female') and age is not None and (age >= 50 and age <= 74):
recommendations.append("Mammogram: Breast Cancer: Screening -- Women aged 50 to 74 years")
if (sex == 'female' or (new_mother is not None and new_mother)):
recommendations.append("Breastfeeding: Primary Care Interventions -- Pregnant women, new mothers, and their children")
if sex == 'female':
recommendations.append("Sti screen: Chlamydia and Gonorrhea: Screening -- Sexually active women, including pregnant persons")
if age is not None and (age >= 45 and age <= 49):
recommendations.append("Colonoscopy: Colorectal Cancer: Screening -- Adults aged 45 to 49 years")
if age is not None and (age >= 8 and age <= 18):
recommendations.append("Anxiety Questionnaire: Anxiety in Children and Adolescents: Screening -- Children and adolescents aged 8 to 18 years")
if (sex == 'pregnant' or (pregnant is not None and pregnant)):
recommendations.append("Aspirin for High Risk: Aspirin Use to Prevent Preeclampsia and Related Morbidity and Mortality: Preventive Medication -- Pregnant persons at high risk for preeclampsia")
if sex == 'pregnant':
recommendations.append("Urinalysis: Asymptomatic Bacteriuria in Adults: Screening -- Pregnant persons")
if sex == 'male' and (ancestry is not None):
recommendations.append("Brca Gene Test: BRCA-Related Cancer: If screen positive, risk Assessment, Genetic Counseling, and Genetic Testing -- Men with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
    if sex == 'male' and age is not None and (65 <= age <= 75) and (pack_years is not None and pack_years > 0):  # recommendation text specifies ages 65 to 75
recommendations.append("Ultrasound Doppler Abdomen: Abdominal Aortic Aneurysm: Screening -- Men aged 65 to 75 years who have ever smoked")
if age is not None and (age >= 12 and age <= 18):
recommendations.append("Depression Screen Questionnaire: Depression and Suicide Risk in Children and Adolescents: Screening -- Adolescents aged 12 to 18 years")
if age is not None and (age >= 65):
recommendations.append("Falls Screen Questionnaire: Falls Prevention in Community-Dwelling Older Adults: Interventions -- Adults 65 years or older")
if (sex == 'pregnant' or (pregnant is not None and pregnant)) and (age is not None and (age >= 24)):
recommendations.append("Fasting Blood Glucose: Gestational Diabetes: Screening -- Asymptomatic pregnant persons at 24 weeks of gestation or after")
if overweight_or_obesity is not None:
recommendations.append("Bmi screen: If elevated BMI consider Healthy Diet and Physical Activity for Cardiovascular Disease Prevention in Adults With Cardiovascular Risk Factors: Behavioral Counseling Interventions -- Adults with cardiovascular disease risk factors")
if (sex == 'pregnant' or (pregnant is not None and pregnant)):
recommendations.append("Weight Trend: Healthy Weight and Weight Gain In Pregnancy: Behavioral Counseling Interventions -- Pregnant persons")
if sex == 'female' and (age is not None and (age >= 18)):
recommendations.append("Hepatitis B Blood Test: Hepatitis B Virus Infection in Adolescents and Adults: Screening -- Adolescents and adults at increased risk for infection")
if sex == 'male' and (age is not None and (age >= 18 and age <= 79)):
recommendations.append("Hepatitis C Blood Test: Hepatitis C Virus Infection in Adolescents and Adults: Screening -- Adults aged 18 to 79 years")
if sex == 'female' and (age is not None and (age >= 14)):
recommendations.append("Violence Questionnaire screen: Intimate Partner Violence, Elder Abuse, and Abuse of Vulnerable Adults: Screening -- Women of reproductive age")
if age is not None and (age >= 6 and age <= 60):
recommendations.append("Tb Screen Test/ Questionnaire: Latent Tuberculosis Infection in Adults: Screening -- Asymptomatic adults at increased risk of latent tuberculosis infection (LTBI)")
if (sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant))) and (age is not None and (age >= 50 and age <= 80) and (pack_years is not None) and (smoking is not None)):
recommendations.append("Ct Chest: Lung Cancer screening if you smoked more that 20 pack years: Screening -- Adults aged 50 to 80 years who have a 20 pack-year smoking history and currently smoke or have quit within the past 15 years")
if age is not None and (age >= 6 and age <= 18):
recommendations.append("Bmi Screen: Obesity in Children and Adolescents: Screening -- Children and adolescents 6 years and older")
if sex == 'female' and (age is not None and (age < 65)):
recommendations.append("Dexa Bone Test: Osteoporosis to Prevent Fractures: Screening -- Postmenopausal women younger than 65 years at increased risk of osteoporosis")
if sex == 'female' and (age is not None and (age >= 65)):
recommendations.append("Dexa Bone Test: Osteoporosis to Prevent Fractures: Screening -- Women 65 years and older")
if (sex == 'pregnant' or (pregnant is not None and pregnant) or (new_mother is not None)):
recommendations.append("Depression Questionnaire: Perinatal Depression: Preventive Interventions -- Pregnant and postpartum persons")
if age is not None and (age >= 35 and age <= 70):
recommendations.append("Fasting Blood Glucose: Prediabetes and Type 2 Diabetes: Screening -- Asymptomatic adults aged 35 to 70 years who have overweight or obesity")
if (sex == 'pregnant' or (pregnant is not None and pregnant)):
recommendations.append("Bp, Questionnaire and Urine test: Preeclampsia: Screening -- Pregnant woman")
if age is not None and (age < 5):
recommendations.append("Oral Exam: Prevention of Dental Caries in Children Younger Than 5 Years: Screening and Interventions -- Children younger than 5 years")
if (sex == 'female' or (pregnant is not None and pregnant)) or (new_mother is not None):
recommendations.append("Oral Exam: Prevention of Dental Caries in Children Younger Than 5 Years: Screening and Interventions -- Children younger than 5 years")
if (sex == 'pregnant' or (pregnant is not None and pregnant)) and (rh_d_negative is not None):
recommendations.append("Rh Blood Test: Rh(D) Incompatibility especially with Rh negative: Screening -- Unsensitized Rh(D)-negative pregnant women")
if sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant) or (new_mother is not None and new_mother)):
recommendations.append("Depression Questionnaire: Screening for Depression in Adults -- General adult population")
if sex == 'male' or (sex == 'female' and (pregnant is not None and pregnant)) or (new_mother is not None):
recommendations.append("Sti Screen: Sexually Transmitted Infections: Behavioral Counseling -- Sexually active adolescents and adults at increased risk")
if (age is not None and (age >= 25)) or (new_mother is not None) or (sex == 'male' and (substance_abuse_risk is not None)):
recommendations.append("Skin Exam: Skin Cancer Prevention: Behavioral Counseling -- Adults, Young adults, adolescents, children, and parents of young children")
if (age is not None and (age >= 40 and age <= 75)) and (cardiovascular_risk is not None) and (cardiovascular_risk_7_5_to_10 is not None):
recommendations.append("Heart Disease Questionnaire: Screen for CV risk and consider Statin Use for the Primary Prevention of Cardiovascular Disease in Adults: Preventive Medication -- Adults aged 40 to 75 years who have 1 or more cardiovascular risk factors and an estimated 10-year cardiovascular disease (CVD) risk of 10% or greater")
if sex == 'female' and (pregnant is not None and pregnant) and (ancestry is not None and ancestry == 'BRCA1/2 gene mutation'):
recommendations.append("Family History and Brca Test: BRCA-Related Cancer: Risk Assessment, Genetic Counseling, and Genetic Testing -- Women with a personal or family history of breast, ovarian, tubal, or peritoneal cancer or an ancestry associated with BRCA1/2 gene mutation")
if (age is not None and (age >= 6 and age <= 18)) or (sex == 'pregnant' or (pregnant is not None and pregnant)):
recommendations.append("Tobacco Questionnaire: Tobacco Use in Children and Adolescents: Primary Care Interventions -- School-aged children and adolescents who have not started to use tobacco")
if age is not None and (age >= 18) and (substance_abuse_risk is not None):
recommendations.append("Alcohol Questionnaire: Unhealthy Alcohol Use in Adolescents and Adults: Screening and Behavioral Counseling Interventions -- Adults 18 years or older, including pregnant women")
if age is not None and (age >= 13):
recommendations.append("Drug Abuse Questionnaire: Unhealthy Drug Use: Screening -- Adults age 13 years or older")
if age is not None and (age > 2 and age < 24) and skin_type is not None:
recommendations.append("Skin Exam: Skin Cancer: Counseling -- Fair-skinned individuals aged 6 months to 24 years with a family history of skin cancer or personal history of skin cancer, or who are at increased risk of skin cancer")
return recommendations
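# Illustrative call (a sketch, not executed): a 66-year-old male who has smoked
# would match, among others, the colonoscopy (50-75), blood-pressure (18+),
# fasting-lipid (male 35+) and AAA-ultrasound (male 65-75, ever smoked) rules:
#   get_recommendations(age=66, sex='male', pack_years=30, smoking=True)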
def generate_recommendations(age=None, sex=None):
    try:
        age = int(age)
    except (TypeError, ValueError):
        print("Invalid age value. Age must be a valid integer.")
        age = None
    sex = f"{sex}"
    recommendations = get_recommendations(age, sex)
    # Add a subheading above the list
    subheading = "RECOMMENDATIONS:"
    recommendations_with_subheading = [subheading] + recommendations
with open(f'{ocr_files}/Darna_tesseract/USPTF_Intent.txt', 'w') as file:
file.write('\n\n\n'.join(recommendations_with_subheading))
doc = fitz.open() # Create a new PDF
page = doc.new_page()
text = "\n\n\n".join(recommendations_with_subheading)
page.insert_text((72, 72), text)
doc.save(f'{ocr_files}/USPTF.pdf') # Save the PDF
doc.close()
#extract data from the updated fhir file
def extract_lforms_data(json_data):
if isinstance(json_data, str):
data = json.loads(json_data)
else:
data = json_data
extracted_info = {
"date_of_birth": None,
"sex": None,
"allergies": [],
"past_medical_history": [],
"medications": []
}
for item in data.get("items", []):
if item.get("question") == "ABOUT ME":
for subitem in item.get("items", []):
if subitem.get("question") == "DATE OF BIRTH":
extracted_info["date_of_birth"] = subitem.get("value")
elif subitem.get("question") == "BIOLOGICAL SEX":
extracted_info["sex"] = subitem.get("value", {}).get("text")
elif item.get("question") == "ALLERGIES":
for allergy_item in item.get("items", []):
if allergy_item.get("question") == "Allergies and Other Dangerous Reactions":
for subitem in allergy_item.get("items", []):
if subitem.get("question") == "Name" and "value" in subitem:
extracted_info["allergies"].append(subitem["value"]["text"])
elif item.get("question") == "PAST MEDICAL HISTORY:":
for condition_item in item.get("items", []):
if condition_item.get("question") == "PAST MEDICAL HISTORY" and "value" in condition_item:
condition = extract_condition(condition_item)
if condition:
extracted_info["past_medical_history"].append(condition)
elif item.get("question") == "MEDICATIONS:":
medication = {}
for med_item in item.get("items", []):
if med_item.get("question") == "MEDICATIONS":
medication["name"] = extract_med_value(med_item)
elif med_item.get("question") == "Strength":
medication["strength"] = extract_med_value(med_item)
elif med_item.get("question") == "Instructions":
medication["instructions"] = extract_med_value(med_item)
if medication:
extracted_info["medications"].append(medication)
return extracted_info
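# Expected input shape (a hypothetical, trimmed lforms-style payload):
#   {"items": [{"question": "ABOUT ME",
#               "items": [{"question": "DATE OF BIRTH", "value": "1980-01-02"},
#                         {"question": "BIOLOGICAL SEX", "value": {"text": "female"}}]}]}
# would yield {"date_of_birth": "1980-01-02", "sex": "female", "allergies": [], ...}.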
def extract_condition(condition_item):
if isinstance(condition_item.get("value"), dict):
return condition_item["value"].get("text", "")
elif isinstance(condition_item.get("value"), str):
return condition_item["value"]
return ""
def extract_med_value(med_item):
if "value" not in med_item:
return ""
value = med_item["value"]
if isinstance(value, str):
return value
elif isinstance(value, dict):
return value.get("text", "")
return ""
#######
###nlp_process.py functions
import json
import nltk
import re, os
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
# Ensure NLTK components are downloaded
#nltk.download('punkt')
#nltk.download('stopwords')
#convert text to lowercase and remove fillers
def normalize_text(text):
# Convert text to lowercase and remove ':' and '-'
return re.sub('[: -]', '', text.lower())
def condense_summary_to_tokens(text, token_limit=300):
tokens = word_tokenize(text)
# Select the first 'token_limit' tokens
limited_tokens = tokens[:token_limit]
# Reconstruct the text from these tokens
condensed_text = ' '.join(limited_tokens)
return condensed_text
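# Quick sketch of the truncation (comments only, not executed):
#   condense_summary_to_tokens("one two three four", token_limit=2) -> "one two"
# Note: word_tokenize splits punctuation into separate tokens, so the limit
# counts tokens, not words.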
#write all to a json summary file
def wordcloud_summary(keys, texts, directory):
output_file = f'{directory}/wordcloud_summary.json'
wordcloud_dir = f'{directory}/wordclouds'
try:
with open(output_file, 'r', encoding='utf-8') as file:
existing_data = json.load(file)
except FileNotFoundError:
existing_data = {}
# Ensure the directories exist
os.makedirs(os.path.dirname(output_file), exist_ok=True)
os.makedirs(wordcloud_dir, exist_ok=True)
for i, key in enumerate(keys):
if i < len(texts):
text = texts[i]
# Check if the text contains any words
if text.strip():
existing_data[key] = text
# Attempt to generate word cloud
try:
# Split the text into words
words = text.split()
# Check if there are enough words
if len(words) > 1:
wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)
# Save the word cloud
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title(f'Word Cloud for {key}')
plt.savefig(f'{wordcloud_dir}/{key}_wordcloud.png')
plt.close()
print(f"Generated word cloud for key: {key}")
else:
print(f"Not enough words to generate word cloud for key: {key}")
except Exception as e:
print(f"Error generating word cloud for key {key}: {str(e)}")
else:
print(f"Skipping empty text for key: {key}")
else:
print(f"No text available for key: {key}")
with open(output_file, 'w', encoding='utf-8') as file:
json.dump(existing_data, file, indent=4, ensure_ascii=False)
#generate list of meds from the files
def load_text_from_json_meds(json_file_path, keys):
normalized_keys = [normalize_text(key) for key in keys]
with open(json_file_path, 'r') as file:
data = json.load(file)
text = []
for json_key, value in data.items():
normalized_json_key = normalize_text(json_key)
if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
if isinstance(value, str):
text.append(value)
elif isinstance(value, list):
text.extend(str(item) for item in value if item)
elif isinstance(value, dict):
text.extend(str(item) for item in value.values() if item)
else:
text.append(str(value))
combined_text = ' '.join(text)
combined_text = condense_summary_to_tokens(combined_text, 300)
return combined_text
#generate a list of past medical history from the files
def load_text_from_json_pmh(json_file_path, keys):
normalized_keys = [normalize_text(key) for key in keys]
with open(json_file_path, 'r') as file:
data = json.load(file)
text = []
for json_key, value in data.items():
normalized_json_key = normalize_text(json_key)
if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
if isinstance(value, str):
text.append(value)
elif isinstance(value, list):
text.extend(str(item) for item in value if item)
elif isinstance(value, dict):
text.extend(str(item) for item in value.values() if item)
else:
text.append(str(value))
combined_text = ' '.join(text)
combined_text = condense_summary_to_tokens(combined_text, 300)
return combined_text
#generate a list of screening items from the USPTF file
def load_text_from_json_screening(json_file_path, keys):
normalized_keys = [normalize_text(key) for key in keys]
with open(json_file_path, 'r') as file:
data = json.load(file)
text = []
for json_key, value in data.items():
normalized_json_key = normalize_text(json_key)
if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
text.append(value)
combined_text_screening=' '.join(text)
#print (combined_text_screening)
return combined_text_screening
def load_text_from_json_summary(json_file_path, keys):
normalized_keys = [normalize_text(key) for key in keys]
with open(json_file_path, 'r') as file:
data = json.load(file)
text = []
for json_key, value in data.items():
normalized_json_key = normalize_text(json_key)
if any(normalized_key in normalized_json_key for normalized_key in normalized_keys):
if isinstance(value, str):
text.append(value)
elif isinstance(value, list):
text.extend(str(item) for item in value if item)
elif isinstance(value, dict):
text.extend(str(item) for item in value.values() if item)
else:
text.append(str(value))
combined_text = ' '.join(text)
combined_text = condense_summary_to_tokens(combined_text, 300)
return combined_text
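# The meds/pmh/summary loaders above are identical except in name; a possible
# consolidation (a sketch, not wired in -- the per-topic names are kept above
# for readability):
def load_text_from_json_by_keys(json_file_path, keys, token_limit=300):
    normalized_keys = [normalize_text(key) for key in keys]
    with open(json_file_path, 'r') as file:
        data = json.load(file)
    text = []
    for json_key, value in data.items():
        if any(nk in normalize_text(json_key) for nk in normalized_keys):
            if isinstance(value, str):
                text.append(value)
            elif isinstance(value, (list, dict)):
                items = value.values() if isinstance(value, dict) else value
                text.extend(str(item) for item in items if item)
            else:
                text.append(str(value))
    return condense_summary_to_tokens(' '.join(text), token_limit)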
#iterate json files in directory and call function above
def process_directory_summary(directory, keys):
combined_texts = []
for filename in os.listdir(directory):
if filename.endswith('.json'):
file_path = os.path.join(directory, filename)
print(file_path)
combined_text = load_text_from_json_summary(file_path, keys)
if combined_text: # Only add non-empty strings
combined_texts.append(combined_text)
# Combine all texts into one
final_combined_text = ' '.join(combined_texts)
return final_combined_text
#iterate json files in directory and summarize meds
def process_directory_meds(directory, keys):
combined_texts = []
for filename in os.listdir(directory):
if filename.endswith('.json'):
file_path = os.path.join(directory, filename)
print(file_path)
combined_text = load_text_from_json_meds(file_path, keys)
combined_texts.append(combined_text)
# Combine all texts into one
final_combined_text = ' '.join(combined_texts)
return final_combined_text
#iterate json files in directory and summarize past medical
def process_directory_pmh(directory, keys):
combined_texts = []
for filename in os.listdir(directory):
if filename.endswith('.json'):
file_path = os.path.join(directory, filename)
print(file_path)
combined_text = load_text_from_json_pmh(file_path, keys)
combined_texts.append(combined_text)
# Combine all texts into one
final_combined_text = ' '.join(combined_texts)
return final_combined_text
def preprocess_and_create_wordcloud(text, directory):
# Tokenize and remove stopwords
stop_words = set(stopwords.words('english'))
words = word_tokenize(text)
filtered_words = [word for word in words if word.isalnum() and word.lower() not in stop_words]
# Check if there are any words left after filtering
if not filtered_words:
print("No words left after preprocessing. Skipping word cloud creation.")
return
processed_text = ' '.join(filtered_words)
# Create and display the word cloud
wordcloud = WordCloud(width=800, height=800, background_color='white').generate(processed_text)
    plt.figure(figsize=(8, 8), facecolor=None)
    plt.imshow(wordcloud)
    plt.axis("off")
    plt.tight_layout(pad=0)
# Display the word cloud
#plt.show()
# Save the word cloud image
plt.savefig(f'{directory}darnahi_ocr.png')
#############
pattern = r"\d+\..+?(\d{4};\d+\(\d+\):\d+–\d+\. DOI: .+?\.|.+?ed\., .+?: .+?; \d{4}\. \d+–\d+\.)"  # citation/reference pattern (currently unused)
class Document:
def __init__(self, page_content, metadata):
self.page_content = page_content
self.metadata = metadata
def process_pdf(file_path, chunk_size=350):
    try:
        doc = fitz.open(file_path)
        full_text = ""
        for page in doc:
            text_blocks = page.get_text("dict")["blocks"]
            for block in text_blocks:
                # PyMuPDF text blocks (type 0) carry 'lines' of 'spans';
                # there is no top-level 'text' key on a block
                if block.get('type') == 0:
                    for line in block['lines']:
                        for span in line['spans']:
                            text = span['text'].strip()
                            if text:
                                full_text += text + "\n"
        chunks = [full_text[i:i+chunk_size] for i in range(0, len(full_text), chunk_size)]
        return chunks
except Exception as e:
print(f"An error occurred: {str(e)}")
return []
def process_json(input_file):
try:
with open(input_file, 'r', encoding='utf-8') as file:
existing_data = json.load(file)
except FileNotFoundError:
print("File not found.")
return []
semantic_snippets = []
for heading, content in existing_data.items():
metadata = {'heading': heading, 'file': input_file}
doc = Document(page_content=content, metadata=metadata)
semantic_snippets.append(doc)
return semantic_snippets
def process_files(directory):
all_semantic_snippets = []
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if filename.endswith('.pdf'):
snippets = process_pdf(file_path)
all_semantic_snippets.extend(snippets)
elif filename.endswith('.json'):
semantic_snippets = process_json(file_path)
all_semantic_snippets.extend(semantic_snippets)
return all_semantic_snippets
def chromadb_embed(directory, collection_name="documents_collection"):
persist_directory = os.path.join(directory, 'Darna_tesseract', 'chroma_storage')
os.makedirs(persist_directory, exist_ok=True)
    all_semantic_snippets = process_files(directory)  # keep the list itself; wrapping it in str() made the loop below iterate characters
client = chromadb.PersistentClient(path=persist_directory)
collection = client.get_or_create_collection(name=collection_name)
count = collection.count()
print(f"Collection already contains {count} documents")
ids = [str(i) for i in range(count, count + len(all_semantic_snippets))]
for i in tqdm(range(0, len(all_semantic_snippets), 100), desc="Adding documents"):
batch_snippets = all_semantic_snippets[i:i+100]
batch_metadatas = []
for snippet in batch_snippets:
metadata = {"filename": "summary", "heading": "summary_heading"} if not isinstance(snippet, Document) else snippet.metadata
batch_metadatas.append(metadata)
collection.add(ids=ids[i:i+100], documents=[s if isinstance(s, str) else s.page_content for s in batch_snippets], metadatas=batch_metadatas)
new_count = collection.count()
print(f"Added {new_count - count} documents")
#######################################
#########pdf_sectionreader.py
import os
import fitz
import pandas as pd
import json
from unidecode import unidecode
global_heading_content_dict = {} # Global dictionary to accumulate data
def process_pdf_files(directory):
for filename in os.listdir(directory):
if filename.endswith('.pdf'):
file_path = os.path.join(directory, filename)
with fitz.open(file_path) as doc:
print(f"Processing {filename}...")
extract_and_tag_text(doc)
# Generate and save output after processing all files
generate_output(global_heading_content_dict, directory)
def extract_and_tag_text(doc):
block_dict, page_num = {}, 1
for page in doc:
file_dict = page.get_text('dict')
block = file_dict['blocks']
block_dict[page_num] = block
page_num += 1
rows = []
for page_num, blocks in block_dict.items():
for block in blocks:
if block['type'] == 0:
for line in block['lines']:
for span in line['spans']:
xmin, ymin, xmax, ymax = list(span['bbox'])
font_size = span['size']
text = unidecode(span['text'])
span_font = span['font']
is_upper = text.isupper()
is_bold = "bold" in span_font.lower()
if text.strip() != "":
rows.append((xmin, ymin, xmax, ymax, text, is_upper, is_bold, span_font, font_size))
span_df = pd.DataFrame(rows, columns=['xmin', 'ymin', 'xmax', 'ymax', 'text', 'is_upper', 'is_bold', 'span_font', 'font_size'])
common_font_size = span_df['font_size'].mode().iloc[0]
span_df['tag'] = span_df.apply(assign_tag, axis=1, common_font_size=common_font_size)
update_global_dict(span_df)
def assign_tag(row, common_font_size):
if any(char.isdigit() for char in row['text']):
return 'p'
elif row['font_size'] > common_font_size and row['is_bold'] and row['is_upper']:
return 'h1'
elif row['is_bold'] or row['is_upper'] or row['font_size'] > common_font_size:
return 'h2'
else:
return 'p'
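# Example (comments only): with a modal font size of 10, a bold uppercase span
# at size 14 tags as 'h1'; a bold mixed-case span at size 10 tags as 'h2'; any
# span containing a digit is forced to 'p' regardless of styling.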
def update_global_dict(span_df):
tmp = []
current_heading = None
for index, span_row in span_df.iterrows():
text, tag = span_row.text.strip(), span_row.tag
if 'h' in tag:
if current_heading is not None:
existing_text = global_heading_content_dict.get(current_heading, "")
global_heading_content_dict[current_heading] = existing_text + '\n'.join(tmp).strip()
current_heading = text
tmp = []
else:
tmp.append(text)
if current_heading is not None:
existing_text = global_heading_content_dict.get(current_heading, "")
global_heading_content_dict[current_heading] = existing_text + '\n'.join(tmp).strip()
def generate_output(heading_content_dict, directory):
    # Excel export disabled; the JSON dump below is the canonical output
    #text_df = pd.DataFrame(list(heading_content_dict.items()), columns=['heading', 'content'])
    #text_df.to_excel(f'{directory}/combined_output.xlsx', index=False, engine='openpyxl')
json_data = json.dumps(heading_content_dict, indent=4, ensure_ascii=False)
with open(f'{directory}/Darna_tesseract/combined_output.json', 'w', encoding='utf-8') as f:
f.write(json_data)
with open(f'{directory}/combined_output.json', 'w', encoding='utf-8') as f:
f.write(json_data)
def whitelist_directory(directory, whitelist):
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path) and filename not in whitelist:
try:
os.remove(file_path)
print(f"Removed: {file_path}")
except Exception as e:
print(f"Error removing {file_path}: {e}")
###########################################
#write files to pdf
def write_text_to_pdf(directory, text):
doc = fitz.open() # Create a new PDF
page = doc.new_page() # Add a new page
page.insert_text((72, 72), text) # Position (x, y) and text
doc.save(f'{directory}/fhir_data.pdf') # Save the PDF
doc.close()
def run_analyzer(age, sex, ocr_files, formatted_ignore_words):
try:
# Process OCR files with provided input
print("Processing OCR files")
process_ocr_files(ocr_files, age)
# Create collated file
collate_images(ocr_files, f"{ocr_files}/Darna_tesseract")
# Deidentify records
print("Deidentifying records")
deidentify_records(ocr_files, formatted_ignore_words)
# Generate recommendations with provided age and sex
print("Generating recommendations")
recommendations = generate_recommendations(age=age, sex=sex)
# Extract data from FHIR file and create PDF
directory = ocr_files
#folderpath is global directory
with open(f'{folderpath}/summary/chart.json', 'r') as file:
json_data = json.load(file)
extracted_info = extract_lforms_data(json.dumps(json_data))
print(extracted_info)
json_output = json.dumps(extracted_info, indent=4)
write_text_to_pdf(directory, str(extracted_info))
final_directory = f'{directory}/Darna_tesseract/'
# Process PDF files
process_pdf_files(directory)
# Write the JSON output to a file
with open(f'{directory}/fhir_output.json', 'w', encoding='utf-8') as f:
f.write(json_output)
# NLP Processing for summary, past medical history, medications, and screening
json_file_path = f'{directory}/combined_output.json'
keys_pmh = ['PMH', 'medical', 'past medical history', 'surgical', 'past']
keys_meds = ['medications', 'MEDICATIONS:', 'medicine', 'meds']
keys_summary = ['HPI', 'history', 'summary']
keys_screening = ['RECS', 'RECOMMENDATIONS']
# Process text data and create word clouds
text_summary = process_directory_summary(directory, keys_summary)
preprocess_and_create_wordcloud(text_summary, final_directory)
text_meds = process_directory_meds(directory, keys_meds)
text_screening = load_text_from_json_screening(json_file_path, keys_screening)
text_pmh = process_directory_pmh(directory, keys_pmh)
# Write processed texts to JSON
keys = ("darnahi_summary", "darnahi_past_medical_history", "darnahi_medications", "darnahi_screening")
texts = (text_summary, text_pmh, text_meds, text_screening)
wordcloud_summary(keys, texts, final_directory)
# CHROMA embedding
chromadb_embed(directory)
# Cleanup OCR files, but leave Darna_tesseract files
whitelist = ["combined_output.json"]
whitelist_directory(directory, whitelist)
except Exception as e:
print(f"Error during processing: {e}")
##CALL ANALYZER
run_analyzer(age, sex, ocr_files, formatted_ignore_words)
##Take files and add to sqlite db; extract metadata; create AI metadata and add an AI function to make the extracted data meaningful
from ollama import AsyncClient  # AsyncClient is used below but was never imported

class MetadataGenerator:
    def __init__(self, model='gemma3:4b'):
        self.model = model
async def get_metadata(self, file_path: str, content: str) -> str:
"""Generate metadata for file content using Ollama"""
messages = [
{"role": "system", "content": "You are an analyst. Provide a summary and keywords. To the point."},
{"role": "user", "content": f"File: {os.path.basename(file_path)}\nContent: {content}\n\nProvide only JSON format: {{\"summary\": \"...\", \"keywords\": [\"...\", \"...\"]}}"},
]
try:
OLLAMA_HOST = os.environ.get('OLLAMA_HOST', 'http://localhost:11434')
client = AsyncClient(host=OLLAMA_HOST)
response_stream = await client.chat(model=self.model, messages=messages, stream=True)
full_response = ""
async for part in response_stream:
full_response += part['message']['content']
return full_response
except Exception as e:
return f"{{\"error\": \"{str(e)}\"}}"
metadata_generator = MetadataGenerator()
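# Illustrative invocation (a sketch; needs a reachable Ollama server and is
# therefore left commented out):
#   import asyncio
#   print(asyncio.run(metadata_generator.get_metadata("notes.txt", "sample content")))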
"""
# Process OCR files with provided input
print("process ocr files")
process_ocr_files(ocr_files, age)
#doesnt work
#create collated file
collate_images(ocr_files, f"{ocr_files}/Darna_tesseract")
# Deidentify records
print("debug deidentify records")
deidentify_records()
# Generate recommendations with provided age and sex
print("debug generate records")
recommendations = generate_recommendations(age=age, sex=sex)
#extract data from fhir file and make pdf
directory = ocr_files
with open(f'{folderpath}/summary/chart.json', 'r') as file:
json_data = json.load(file)
# Extract information using function above from fhir document and write to pdf and json file
extracted_info = extract_lforms_data(json.dumps(json_data))
print(extracted_info)
#extracted_info = extract_info(json_data)
json_output = json.dumps(extracted_info, indent=4)
#extracted_info = extract_info(json_data)
write_text_to_pdf(directory, str(extracted_info))
final_directory= f'{directory}/Darna_tesseract/'
#calls the CALL_FILE pdf_sectionreader
process_pdf_files(directory)
# Write the JSON output to a file and pdf file (2 lines above)
with open(f'{directory}/fhir_output.json', 'w', encoding='utf-8') as f:
f.write(json_output)
#CALL FILE NLP_PROCESS
# Usage nlp_process
json_file_path = f'{directory}/combined_output.json'
#json_file_path = 'processed_data2.json'
#keys_summary = ['HPI', 'History of presenting illness', 'History of', 'summary']
keys_pmh = ['PMH', 'medical', 'past medical history', 'surgical', 'past'] #extracts past medical history
keys_meds = ['medications', 'MEDICATIONS:', 'medicine', 'meds'] #extracts medications
keys_summary = ['HPI', 'history', 'summary']
keys_screening= ['RECS', 'RECOMMENDATIONS']
#call functions and write to wordcloud and creat wordcloud.png file
text_summary = process_directory_summary(directory, keys_summary)
#creates wordcloud of uploaded files
preprocess_and_create_wordcloud(text_summary, final_directory)
text_meds = process_directory_meds(directory, keys_meds)#saves to medications in json
text_screening = load_text_from_json_screening(json_file_path, keys_screening)#saves to screening in json
text_pmh = process_directory_pmh(directory, keys_pmh)#saves to past history in json
#write to json using "keys":"texts"
keys= ("darnahi_summary", "darnahi_past_medical_history", "darnahi_medications", "darnahi_screening")
texts= (text_summary, text_pmh, text_meds, text_screening)
wordcloud_summary(keys, texts, final_directory)
#CHROMA MINER # Adjust this path to your directory
chromadb_embed(directory)
#remove files from ocr_files- cleanup but leave Darna_tesseract files
subprocess.run(f'find {directory} -maxdepth 1 -type f -exec rm {{}} +', shell=True)
"""