AmitGarage's picture
Upload 8 files
3ab8bd6
from collections import defaultdict
import random
from typing import List
import tarfile
import shutil
import typer
from pathlib import Path
import spacy
from spacy.language import Language
from spacy.tokens import Doc, DocBin, Span
from spacy.util import filter_spans
from wasabi import msg
from spacy.tokenizer import Tokenizer
from spacy.util import compile_prefix_regex, compile_infix_regex, compile_suffix_regex
import functools
random.seed(42)
def main(
input_dir: Path = typer.Argument(..., exists=True),
output_dir: Path = typer.Argument(...),
beth_train_tar_name: str = "i2b2_Beth_Train_Release.tar.gz",
partners_train_tar_name: str = "i2b2_Partners_Train_Release.tar.gz",
test_zip_name: str = "Task_1C.zip",
merge_docs: bool = True,
):
"""Extract and preprocess raw n2c2 2011 Challenge data into spaCy DocBin format.
input_dir (Path): Input directory with raw downloads from Harvard DBMI Portal.
output_dir (Path): Output directory to save spaCy .docbin files to.
beth_train_tar_name (str): Filename of downloaded tarfile for Beth Training Data.
partners_train_tar_name (str): Filename of downloaded tarfile for Partners Training Data.
test_zip_name (str): Filename of downloaded tarfile for n2c2 Test Data.
merge_docs (bool): If False, create spaCy docs for each line of each medical record
"""
# Unpack compressed data files
msg.info("Extracting raw data.")
beth_train_tar_path = input_dir / beth_train_tar_name
partners_train_tar_path = input_dir / partners_train_tar_name
test_zip_path = input_dir / test_zip_name
#for path in [beth_train_tar_path, partners_train_tar_path]:
# if path.name.endswith("tar.gz"):
# msg.text(f"Extracting {path}")
# tar = tarfile.open(path, "r:gz")
# tar.extractall(path.parent)
# tar.close()
#shutil.unpack_archive(test_zip_path, input_dir / test_zip_name.replace(".zip", ""))
# preprocess data
msg.info("Converting to spaCy Doc objects.")
with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
fp.write(str((input_dir / "Beth_Train").stem)+'\n')
beth_train_docs = docs_from_many_clinical_records(
input_dir / "Beth_Train", merge_docs=merge_docs
)
with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
fp.write(str((input_dir / "Partners_Train").stem)+'\n')
partners_train_docs = docs_from_many_clinical_records(
input_dir / "Partners_Train", merge_docs=merge_docs
)
train_docs = beth_train_docs + partners_train_docs
with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
fp.write(str((input_dir / "Task_1C/i2b2_Test/i2b2_Beth_Test").stem)+'\n')
beth_test_docs = docs_from_many_clinical_records(
input_dir / "Task_1C/i2b2_Test/i2b2_Beth_Test", merge_docs=merge_docs
)
with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
fp.write(str((input_dir / "Task_1C/i2b2_Test/i2b2_Partners_Test").stem)+'\n')
partners_test_docs = docs_from_many_clinical_records(
input_dir / "Task_1C/i2b2_Test/i2b2_Partners_Test", merge_docs=merge_docs
)
test_docs = beth_test_docs + partners_test_docs
random.shuffle(train_docs)
split_idx = int(len(train_docs) * 0.8)
train_docs, dev_docs = train_docs[:split_idx], train_docs[split_idx:]
msg.good(f"Num Train Docs: {len(train_docs)}")
msg.good(f"Num Dev Docs: {len(dev_docs)}")
msg.good(f"Num Test Docs: {len(test_docs)}")
with msg.loading(f"Saving docs to: {output_dir}..."):
DocBin(docs=train_docs).to_disk(output_dir / "train.spacy")
DocBin(docs=dev_docs).to_disk(output_dir / "dev.spacy")
DocBin(docs=test_docs).to_disk(output_dir / "test.spacy")
msg.good("Done.")
def docs_from_clinical_record(
lines: List[str], annotations: List[str], nlp: Language, merge_docs: bool = False
) -> List[Doc]:
"""Create spaCy docs from a single annotated medical record in the n2c2 2011 format
lines (List[str]): Text of the clinical record as a list separated by newlines
annotations (List[str]): Raw entity annotations in the n2c2 2011 format
nlp (Language): spaCy Language object. Defaults to spacy.blank("en").
merge_docs (bool): If True: merge all lines into a single spaCy doc so
there is only 1 element in the output array.
If False: create a spaCy doc for each line in the original record
RETURNS (List[Doc]): List of spaCy Doc objects with entity spans set
"""
difference = []
docs = []
spans_by_line = defaultdict(list)
nlp.Defaults.prefixes = [signs for signs in nlp.Defaults.prefixes if ':' not in signs and '#' not in signs and '+' not in signs and '(' not in signs and ')' not in signs and '*' not in signs and "'" not in signs and "%" not in signs and "_" not in signs and ";" not in signs and ">" not in signs and "," not in signs and "&" not in signs and '"' not in signs and "<" not in signs ]
infixes = nlp.Defaults.prefixes + [r"[-]~"]
infix_re = spacy.util.compile_infix_regex(infixes)
def custom_tokenizer(nlp):
return Tokenizer(nlp.vocab, infix_finditer=infix_re.finditer)
nlp.tokenizer = custom_tokenizer(nlp)
entities = {}
for row in annotations:
row = row.split("||")
text_info = row[0]
type_info = row[1]
offset_start = text_info.split(" ")[-2]
offset_end = text_info.split(" ")[-1]
start_line, word_start = offset_start.split(":")
end_line, word_end = offset_end.split(":")
label = type_info.split('"')[-2]
if start_line != end_line:
# This happens very infrequently (only about 10 times in total)
# so we just skip these annotations
continue
else:
spans_by_line[int(start_line)].append(
(int(word_start), int(word_end), label)
)
if start_line in entities :
entities[start_line].append(text_info.split('"')[1])
else :
entities[start_line] = [text_info.split('"')[1]]
extracted_entities = {}
for i, line in enumerate(lines):
n = i + 1
line = line.replace(" "," ")
doc = nlp.make_doc(line)
if n in spans_by_line:
ents = [
Span(doc, start, end + 1, label=label)
for (start, end, label) in spans_by_line[n]
]
ents = [
e for e in ents if bool(e.text.strip()) and e.text.strip() == e.text
]
doc.ents = filter_spans(ents)
extracted_entities[str(n)] = [ e.text for e in ents if bool(e.text.strip()) and e.text.strip() == e.text ]
docs.append(doc)
for key , value in entities.items() :
if key in extracted_entities :
if functools.reduce(lambda x, y : x and y, map(lambda p, q: p.lower() != q.lower(),entities[key],extracted_entities[key]), True):
difference = difference+[key]+entities[key]+extracted_entities[key]
else :
difference = difference+[key+" Key not present"]+entities[key]
with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
fp.write('\n'.join(difference))
return [Doc.from_docs(docs)] if merge_docs else docs
def docs_from_many_clinical_records(
base_path: Path, nlp: Language = spacy.blank("en"), merge_docs: bool = True
) -> List[Doc]:
"""Convert raw n2c2 annotated clinical records into a list of
spaCy Doc objects to be ready to be used in training
base_path (Path): Root path to the raw data
nlp (Language): spaCy Language object. Defaults to spacy.blank("en").
merge_docs (bool): If True: merge all lines into a single spaCy doc so
there is only 1 element in the output array.
If False: create a spaCy doc for each line in the original record
RETURNS (List[Doc]): List of spaCy Doc objects with entity spans set
"""
all_docs = []
concept_paths = sorted((base_path / "concepts").glob("*.txt.con"))
document_paths = sorted((base_path / "docs").glob("*.txt"))
for con_path, doc_path in zip(concept_paths, document_paths):
with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
fp.write('\n'+str(con_path.stem))
annotations = con_path.open().read().splitlines()
lines = doc_path.open().read().splitlines()
docs = docs_from_clinical_record(lines, annotations, nlp, merge_docs=merge_docs)
all_docs += docs
return all_docs
if __name__ == "__main__":
typer.run(main)