"""Gradio app that removes personal information from clinical PDF reports.

Text is extracted page by page with pdfminer, and every page is sent twice
through a locally loaded GGUF model together with a de-identification prompt.
"""

import html
import os
import re
import subprocess

# Third-party imports are kept in their original relative order.
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer
from tqdm import tqdm
import gradio as gr
import accelerate
import spaces
from huggingface_hub import hf_hub_download, login
from llama_cpp import Llama

login(token=os.getenv('HF_TOKEN'))

# subprocess.run('pip install llama-cpp-python==0.2.75 --extra-index-url https://abetlen.github.io/llama-cpp-python/whl/cu124', shell=True)
# subprocess.run('pip install llama-cpp-agent==0.2.10', shell=True)

repo_id = "srijaydeshpande/Deid-Fine-Tuned"
model_id = "deid_finetuned.Q4_K_M.gguf"

# Fetch the model weights into ./models at start-up.
hf_hub_download(
    repo_id=repo_id,
    filename=model_id,
    local_dir="./models"
)

# hf_hub_download(
#     repo_id="QuantFactory/Meta-Llama-3-8B-Instruct-GGUF",
#     filename="Meta-Llama-3-8B-Instruct.Q8_0.gguf",
#     local_dir = "./models"
# )

# hf_hub_download(
#     repo_id="bartowski/Meta-Llama-3-70B-Instruct-GGUF",
#     filename="Meta-Llama-3-70B-Instruct-Q3_K_M.gguf",
#     local_dir = "./models"
# )

# Message returned to the UI whenever the upload cannot be processed.
INVALID_PDF_MESSAGE = 'Please provide a valid PDF'

# Patterns are compiled once; raw strings avoid invalid-escape warnings.
_NEWLINE_RUN = re.compile(r'\n+')
_WHITESPACE_RUN = re.compile(r'\s+')
# A line whose last character matches this is joined to the next with a space.
_LINE_CONTINUES = re.compile(r'[\w\d\,\-]')

# NOTE(review): item "4." appears twice in the prompt. The wording is left
# untouched because it is model input and changing it may change the output.
DEID_PROMPT = (
    "In the following text, perform the following actions: "
    "1. Replace only the calendar dates with term [date]. "
    "Example: if input is 'Date of birth: 15/5/1959 calculated BP (Systolic 158.00 mm, Diastolic 124.95 mm)' "
    "output should be 'Date of birth: [date] calculated BP (Systolic 158.00 mm, Diastolic 124.95 mm)' "
    "2. Replace location or address, such as '3970 Longview Drive, CV36HE' with term [address]. "
    "Replace complete GP address, such as 'Phanton Medical Centre, Birmingham, CV36HE' with term [address]. "
    "It is important that all addresses are completely replaced with [address]. "
    "3. Replace any person name with term [name]. "
    "It is important that all person names are replaced with term [name]. "
    "Remove any gender terms 'male' or 'female' if exists. "
    "4. Replace the nhs number and the case note number with term [ID]. "
    "Replace Hospital number with [ID]. "
    "4. Replace age of person with [age]. "
    "It is important that all age numbers are completely replaced with [age]."
)


def process_document(pdf_path):
    """Extract the text of every page of a PDF.

    Args:
        pdf_path: PDF location, passed straight to pdfminer's extract_pages.

    Returns:
        dict mapping each page's ``pageid`` to its normalised text.
    """
    return {
        page.pageid: process_page(page)
        for page in tqdm(extract_pages(pdf_path))
    }


def process_page(extracted_page):
    """Return the text of one page, ordered from the top of the page down.

    Layout elements are sorted by descending ``y1``; only text containers
    contribute, and runs of newlines are collapsed to a single newline.
    """
    # Iterate the page itself instead of reaching into its private _objs list.
    elements = sorted(extracted_page, key=lambda el: el.y1, reverse=True)
    text_parts = [
        extract_text_and_normalize(el)
        for el in elements
        if isinstance(el, LTTextContainer)
    ]
    return _NEWLINE_RUN.sub('\n', ''.join(text_parts))


def extract_text_and_normalize(element):
    """Normalise the text of a single layout element.

    Each line is stripped and its internal whitespace collapsed. A line ending
    in a word character, comma or hyphen is followed by a space so it flows
    into the next line; any other ending, and every blank line, yields a
    newline.
    """
    pieces = []
    for raw_line in element.get_text().split('\n'):
        line = raw_line.strip()
        if not line:
            pieces.append('\n')
            continue
        line = _WHITESPACE_RUN.sub(' ', line)
        separator = ' ' if _LINE_CONTINUES.search(line[-1]) else '\n'
        pieces.append(line + separator)
    return ''.join(pieces)


def txt_to_html(text):
    """Render plain text as an HTML document with one paragraph per line.

    NOTE(review): the markup inside this function's string literals appears to
    have been lost from the source; the <html>/<body>/<p> wrapper used here is
    a reconstruction - confirm against the intended output.
    """
    # Escape the text so characters such as '<' or '&' cannot break the markup.
    paragraphs = ''.join(
        "<p>{}</p>".format(html.escape(line.strip()))
        for line in text.split('\n')
    )
    return "<html><body>" + paragraphs + "</body></html>"


def _trim_to_source_span(output, source_text, tail_words):
    """Cut model chatter that precedes or follows the echoed document text.

    The reply is trimmed to start at the first three words of ``source_text``
    and to end right after its last ``tail_words`` words. The tail cut is only
    applied when the match lies beyond half the source length, so an early
    coincidental match cannot truncate the document.
    """
    words = source_text.split()

    head = ' '.join(words[:3])
    start = output.find(head)
    if start != -1:
        output = output[start:].strip()

    tail = ' '.join(words[-tail_words:])
    end = output.rfind(tail)
    if end != -1 and end > len(source_text) / 2:
        # Keep the matched words themselves; only what follows is dropped.
        output = output[:end + len(tail)].strip()
    return output


def _run_deid_pass(llm, text, maxtokens, temperature, top_probability):
    """Send one de-identification request and return the reply text."""
    # NOTE(review): the message uses "from"/"value" keys rather than
    # "role"/"content"; presumably this matches the fine-tuned model's chat
    # template - confirm before changing.
    response = llm.create_chat_completion(
        messages=[
            {"from": "user", "value": DEID_PROMPT + ' Text: ' + text},
        ],
        max_tokens=maxtokens,
        temperature=temperature,
        top_p=top_probability,
    )
    return response['choices'][0]['message']['content']


def deidentify_doc(llm, pdftext, maxtokens, temperature, top_probability):
    """De-identify one page of text with two consecutive model passes.

    Args:
        llm: loaded ``Llama`` model.
        pdftext: text of the page.
        maxtokens: generation limit per pass.
        temperature: sampling temperature.
        top_probability: nucleus-sampling threshold, forwarded as ``top_p``.

    Returns:
        The second-pass reply, trimmed to the span of the original text.
    """
    first_pass = _run_deid_pass(
        llm, pdftext, maxtokens, temperature, top_probability)
    # The trailing words are kept, so the second pass sees the whole page.
    first_pass = _trim_to_source_span(first_pass, pdftext, tail_words=3)

    second_pass = _run_deid_pass(
        llm, first_pass, maxtokens, temperature, top_probability)
    return _trim_to_source_span(second_pass, pdftext, tail_words=2)


def _is_pdf_path(path):
    """Return True when ``path`` is non-empty and ends in .pdf (any case)."""
    return bool(path) and os.path.splitext(path)[1].lower() == '.pdf'


@spaces.GPU(duration=80)
def pdf_to_text(files, maxtokens=2048, temperature=0, top_probability=0.95):
    """De-identify the uploaded PDF(s) and return the anonymised text.

    Args:
        files: a single file path, or a list/tuple of paths.
        maxtokens: generation limit per model pass.
        temperature: sampling temperature.
        top_probability: nucleus-sampling threshold.

    Returns:
        The anonymised text of all pages, each followed by three newlines, or
        an error message when the input is missing or not a PDF.
    """
    paths = list(files) if isinstance(files, (list, tuple)) else [files]
    # Validate before loading the model so bad input fails fast.
    if not paths or not all(_is_pdf_path(path) for path in paths):
        return INVALID_PDF_MESSAGE

    llm = Llama(
        model_path="models/" + model_id,
        flash_attn=True,
        n_gpu_layers=81,
        n_batch=1024,
        n_ctx=8192,
    )

    anonymized_parts = []
    for path in paths:
        for pdftext in process_document(path).values():
            # Pages without text give the model nothing to de-identify.
            if not pdftext.strip():
                continue
            anonymized_parts.append(
                deidentify_doc(llm, pdftext, maxtokens, temperature,
                               top_probability)
                + '\n\n\n'
            )
    return ''.join(anonymized_parts)


# NOTE: the stylesheet and components below are defined but not passed to the
# interface, which takes a single file and uses pdf_to_text's defaults.
css = ".gradio-container {background: 'logo.png'}"
temp_slider = gr.Slider(minimum=0, maximum=2, value=0.9, label="Temperature Value")
prob_slider = gr.Slider(minimum=0, maximum=1, value=0.95, label="Max Probability Value")
max_tokens = gr.Number(value=600, label="Max Tokens")
input_folder = gr.File(file_count='multiple')
input_folder_text = gr.Textbox(label='Enter output folder path')
output_text = gr.Textbox()
output_path_component = gr.File(label="Select Output Path")

iface = gr.Interface(
    fn=pdf_to_text,
    inputs=['file'],
    outputs="text",
    title='Histofy EndoDeID (Endoscopy Report De-Identification)',
    description="This application assists to remove personal information from the uploaded clinical report",
    theme=gr.themes.Soft(),
)
iface.launch()