import random
import shutil
import tarfile
from collections import defaultdict
from pathlib import Path
from typing import List

import spacy
import typer
from spacy.language import Language
from spacy.tokenizer import Tokenizer
from spacy.tokens import Doc, DocBin, Span
from spacy.util import compile_infix_regex, filter_spans
from wasabi import msg

random.seed(42)

# Log file for annotated-vs-extracted entity mismatches (hardcoded notebook path).
DIFF_LOG_PATH = "/notebooks/Clinical_NER/difference.txt"

def main(
    input_dir: Path = typer.Argument(..., exists=True),
    output_dir: Path = typer.Argument(...),
    beth_train_tar_name: str = "i2b2_Beth_Train_Release.tar.gz",
    partners_train_tar_name: str = "i2b2_Partners_Train_Release.tar.gz",
    test_zip_name: str = "Task_1C.zip",
    merge_docs: bool = True,
):
    """Extract and preprocess raw n2c2 2011 Challenge data into spaCy DocBin format.

    input_dir (Path): Input directory with raw downloads from the Harvard DBMI Portal.
    output_dir (Path): Output directory to save spaCy DocBin (.spacy) files to.
    beth_train_tar_name (str): Filename of the downloaded tarfile with the Beth training data.
    partners_train_tar_name (str): Filename of the downloaded tarfile with the Partners training data.
    test_zip_name (str): Filename of the downloaded zipfile with the n2c2 test data.
    merge_docs (bool): If False, create a spaCy doc for each line of each medical record.
    """
    # Unpack compressed data files (extraction is currently commented out,
    # e.g. for reruns where the archives are already unpacked).
    msg.info("Extracting raw data.")
    beth_train_tar_path = input_dir / beth_train_tar_name
    partners_train_tar_path = input_dir / partners_train_tar_name
    test_zip_path = input_dir / test_zip_name
    # for path in [beth_train_tar_path, partners_train_tar_path]:
    #     if path.name.endswith("tar.gz"):
    #         msg.text(f"Extracting {path}")
    #         tar = tarfile.open(path, "r:gz")
    #         tar.extractall(path.parent)
    #         tar.close()
    # shutil.unpack_archive(test_zip_path, input_dir / test_zip_name.replace(".zip", ""))
    # Preprocess data
    msg.info("Converting to spaCy Doc objects.")
    with open(DIFF_LOG_PATH, "a") as fp:
        fp.write(str((input_dir / "Beth_Train").stem) + "\n")
    beth_train_docs = docs_from_many_clinical_records(
        input_dir / "Beth_Train", merge_docs=merge_docs
    )
    with open(DIFF_LOG_PATH, "a") as fp:
        fp.write(str((input_dir / "Partners_Train").stem) + "\n")
    partners_train_docs = docs_from_many_clinical_records(
        input_dir / "Partners_Train", merge_docs=merge_docs
    )
    train_docs = beth_train_docs + partners_train_docs
    with open(DIFF_LOG_PATH, "a") as fp:
        fp.write(str((input_dir / "Task_1C/i2b2_Test/i2b2_Beth_Test").stem) + "\n")
    beth_test_docs = docs_from_many_clinical_records(
        input_dir / "Task_1C/i2b2_Test/i2b2_Beth_Test", merge_docs=merge_docs
    )
    with open(DIFF_LOG_PATH, "a") as fp:
        fp.write(str((input_dir / "Task_1C/i2b2_Test/i2b2_Partners_Test").stem) + "\n")
    partners_test_docs = docs_from_many_clinical_records(
        input_dir / "Task_1C/i2b2_Test/i2b2_Partners_Test", merge_docs=merge_docs
    )
    test_docs = beth_test_docs + partners_test_docs
    # Shuffle and hold out 20% of the training records as a dev set.
    random.shuffle(train_docs)
    split_idx = int(len(train_docs) * 0.8)
    train_docs, dev_docs = train_docs[:split_idx], train_docs[split_idx:]
    msg.good(f"Num Train Docs: {len(train_docs)}")
    msg.good(f"Num Dev Docs: {len(dev_docs)}")
    msg.good(f"Num Test Docs: {len(test_docs)}")
    with msg.loading(f"Saving docs to: {output_dir}..."):
        DocBin(docs=train_docs).to_disk(output_dir / "train.spacy")
        DocBin(docs=dev_docs).to_disk(output_dir / "dev.spacy")
        DocBin(docs=test_docs).to_disk(output_dir / "test.spacy")
    msg.good("Done.")

def docs_from_clinical_record(
    lines: List[str], annotations: List[str], nlp: Language, merge_docs: bool = False
) -> List[Doc]:
    """Create spaCy docs from a single annotated medical record in the n2c2 2011 format.

    lines (List[str]): Text of the clinical record, split on newlines.
    annotations (List[str]): Raw entity annotations in the n2c2 2011 format.
    nlp (Language): spaCy Language object.
    merge_docs (bool): If True, merge all lines into a single spaCy doc so
        there is only 1 element in the output list.
        If False, create a spaCy doc for each line in the original record.
    RETURNS (List[Doc]): List of spaCy Doc objects with entity spans set.
    """
    difference = []
    docs = []
    spans_by_line = defaultdict(list)
    # Drop default prefix patterns containing punctuation that appears inside
    # tokens in these records.
    excluded_chars = set(':#+()*\'%_;>,&"<')
    nlp.Defaults.prefixes = [
        p for p in nlp.Defaults.prefixes
        if not any(char in p for char in excluded_chars)
    ]
    # Reuse the filtered prefix patterns as infixes, plus one custom pattern.
    infixes = nlp.Defaults.prefixes + [r"[-]~"]
    infix_re = compile_infix_regex(infixes)

    def custom_tokenizer(nlp):
        return Tokenizer(nlp.vocab, infix_finditer=infix_re.finditer)

    nlp.tokenizer = custom_tokenizer(nlp)
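    # Note: a Tokenizer built with only infix_finditer splits on whitespace
    # plus the infix patterns, with no prefix/suffix handling. Illustrative
    # (hypothetical) example: the custom pattern r"[-]~" would split a chunk
    # like "dose-~qd" into ["dose", "-~", "qd"].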
    entities = {}
    for row in annotations:
        row = row.split("||")
        text_info = row[0]
        type_info = row[1]
        offset_start = text_info.split(" ")[-2]
        offset_end = text_info.split(" ")[-1]
        start_line, word_start = offset_start.split(":")
        end_line, word_end = offset_end.split(":")
        label = type_info.split('"')[-2]
        if start_line != end_line:
            # Entities spanning multiple lines occur very infrequently
            # (only about 10 times in total), so we just skip them.
            continue
        spans_by_line[int(start_line)].append(
            (int(word_start), int(word_end), label)
        )
        # Also record the annotated entity text per line for later comparison
        # against what is actually extracted.
        entities.setdefault(start_line, []).append(text_info.split('"')[1])
    extracted_entities = {}
    for i, line in enumerate(lines):
        n = i + 1  # concept offsets use 1-indexed line numbers
        # Assumption: collapse double spaces so whitespace-delimited word
        # offsets line up with token indices.
        line = line.replace("  ", " ")
        doc = nlp.make_doc(line)
        if n in spans_by_line:
            ents = [
                Span(doc, start, end + 1, label=label)
                for (start, end, label) in spans_by_line[n]
            ]
            # Keep only spans that are non-empty and carry no surrounding whitespace.
            ents = [e for e in ents if e.text.strip() and e.text.strip() == e.text]
            doc.ents = filter_spans(ents)
            extracted_entities[str(n)] = [e.text for e in ents]
        docs.append(doc)
    # Compare annotated entity texts against the extracted ones and log
    # mismatched lines.
    for key in entities:
        if key in extracted_entities:
            # Log the line only if every annotated/extracted pair differs
            # (case-insensitively).
            if all(
                p.lower() != q.lower()
                for p, q in zip(entities[key], extracted_entities[key])
            ):
                difference = difference + [key] + entities[key] + extracted_entities[key]
        else:
            difference = difference + [key + " Key not present"] + entities[key]
    with open(DIFF_LOG_PATH, "a") as fp:
        fp.write("\n".join(difference))
    return [Doc.from_docs(docs)] if merge_docs else docs

def docs_from_many_clinical_records(
    base_path: Path, nlp: Language = spacy.blank("en"), merge_docs: bool = True
) -> List[Doc]:
    """Convert raw n2c2 annotated clinical records into a list of
    spaCy Doc objects ready to be used in training.

    base_path (Path): Root path to the raw data.
    nlp (Language): spaCy Language object. Defaults to spacy.blank("en").
    merge_docs (bool): If True, merge all lines into a single spaCy doc so
        there is only 1 element in the output list.
        If False, create a spaCy doc for each line in the original record.
    RETURNS (List[Doc]): List of spaCy Doc objects with entity spans set.
    """
    all_docs = []
    concept_paths = sorted((base_path / "concepts").glob("*.txt.con"))
    document_paths = sorted((base_path / "docs").glob("*.txt"))
    for con_path, doc_path in zip(concept_paths, document_paths):
        with open(DIFF_LOG_PATH, "a") as fp:
            fp.write("\n" + str(con_path.stem))
        annotations = con_path.open().read().splitlines()
        lines = doc_path.open().read().splitlines()
        docs = docs_from_clinical_record(lines, annotations, nlp, merge_docs=merge_docs)
        all_docs += docs
    return all_docs
if __name__ == "__main__": | |
typer.run(main) | |