# Preprocessing script for the n2c2/i2b2 2011 coreference challenge data.
# (Header lines from the original web capture — "Spaces", "Runtime error",
# file size, commit hash, line-number gutter — removed so the file parses.)
import functools
import random
import shutil
import tarfile
from collections import defaultdict
from pathlib import Path
from typing import List, Optional

import spacy
import typer
from spacy.language import Language
from spacy.tokenizer import Tokenizer
from spacy.tokens import Doc, DocBin, Span
from spacy.util import (
    compile_infix_regex,
    compile_prefix_regex,
    compile_suffix_regex,
    filter_spans,
)
from wasabi import msg
random.seed(42)
def main(
    input_dir: Path = typer.Argument(..., exists=True),
    output_dir: Path = typer.Argument(...),
    beth_train_tar_name: str = "i2b2_Beth_Train_Release.tar.gz",
    partners_train_tar_name: str = "i2b2_Partners_Train_Release.tar.gz",
    test_zip_name: str = "Task_1C.zip",
    merge_docs: bool = True,
):
    """Extract and preprocess raw n2c2 2011 Challenge data into spaCy DocBin format.

    input_dir (Path): Input directory with raw downloads from Harvard DBMI Portal.
    output_dir (Path): Output directory to save spaCy .docbin files to.
    beth_train_tar_name (str): Filename of downloaded tarfile for Beth Training Data.
    partners_train_tar_name (str): Filename of downloaded tarfile for Partners Training Data.
    test_zip_name (str): Filename of downloaded tarfile for n2c2 Test Data.
    merge_docs (bool): If False, create spaCy docs for each line of each medical record.
    """
    # Debug log that records gold-vs-extracted entity mismatches.
    # NOTE(review): hard-coded absolute path — should become a CLI option.
    diff_log = r'/notebooks/Clinical_NER/difference.txt'

    def log_section(section: Path) -> None:
        # Append a section header (the directory stem) to the mismatch log.
        with open(diff_log, 'a') as fp:
            fp.write(str(section.stem) + '\n')

    # Archive extraction was disabled in the original script (the tarfile /
    # shutil.unpack_archive calls were commented out); the archives named by
    # beth_train_tar_name / partners_train_tar_name / test_zip_name are
    # assumed to be already extracted under input_dir.
    msg.info("Extracting raw data.")

    # Convert each split to spaCy Doc objects, logging a header per split.
    # Insertion order matters: it fixes both the log order and the
    # train-doc concatenation order used below.
    msg.info("Converting to spaCy Doc objects.")
    split_dirs = {
        "beth_train": input_dir / "Beth_Train",
        "partners_train": input_dir / "Partners_Train",
        "beth_test": input_dir / "Task_1C/i2b2_Test/i2b2_Beth_Test",
        "partners_test": input_dir / "Task_1C/i2b2_Test/i2b2_Partners_Test",
    }
    split_docs = {}
    for name, path in split_dirs.items():
        log_section(path)
        split_docs[name] = docs_from_many_clinical_records(path, merge_docs=merge_docs)

    train_docs = split_docs["beth_train"] + split_docs["partners_train"]
    test_docs = split_docs["beth_test"] + split_docs["partners_test"]

    # Deterministic 80/20 train/dev split (random is seeded at module level).
    random.shuffle(train_docs)
    split_idx = int(len(train_docs) * 0.8)
    train_docs, dev_docs = train_docs[:split_idx], train_docs[split_idx:]
    msg.good(f"Num Train Docs: {len(train_docs)}")
    msg.good(f"Num Dev Docs: {len(dev_docs)}")
    msg.good(f"Num Test Docs: {len(test_docs)}")
    with msg.loading(f"Saving docs to: {output_dir}..."):
        DocBin(docs=train_docs).to_disk(output_dir / "train.spacy")
        DocBin(docs=dev_docs).to_disk(output_dir / "dev.spacy")
        DocBin(docs=test_docs).to_disk(output_dir / "test.spacy")
    msg.good("Done.")
def docs_from_clinical_record(
    lines: List[str], annotations: List[str], nlp: Language, merge_docs: bool = False
) -> List[Doc]:
    """Create spaCy docs from a single annotated medical record in the n2c2 2011 format.

    lines (List[str]): Text of the clinical record as a list separated by newlines.
    annotations (List[str]): Raw entity annotations in the n2c2 2011 format
        (``c="text" line:token line:token||t="label"``).
    nlp (Language): spaCy Language object. Defaults to spacy.blank("en").
    merge_docs (bool): If True: merge all lines into a single spaCy doc so
        there is only 1 element in the output array.
        If False: create a spaCy doc for each line in the original record.
    RETURNS (List[Doc]): List of spaCy Doc objects with entity spans set.

    Side effect: appends mismatch information to a hard-coded debug log, and
    mutates ``nlp.Defaults.prefixes`` and ``nlp.tokenizer`` (visible to every
    other user of ``nlp``/its class).
    """
    # Characters that must not act as prefix-splitters for clinical text;
    # drop every default prefix pattern containing any of them.
    # NOTE(review): this mutates the class-level Defaults on every call —
    # the filter is idempotent, but the side effect is global.
    forbidden = (':', '#', '+', '(', ')', '*', "'", '%', '_', ';', '>', ',',
                 '&', '"', '<')
    nlp.Defaults.prefixes = [
        pattern for pattern in nlp.Defaults.prefixes
        if not any(ch in pattern for ch in forbidden)
    ]
    # Reuse the filtered prefix patterns as infix patterns, plus "-~".
    # NOTE(review): r"[-]~" matches the literal two-char sequence "-~"; if a
    # character class "[-~]" was intended, this is a latent bug — confirm
    # before changing.
    infix_re = spacy.util.compile_infix_regex(nlp.Defaults.prefixes + [r"[-]~"])
    # Install a tokenizer that only splits on whitespace and these infixes,
    # so token indices line up with the whitespace-based n2c2 offsets.
    nlp.tokenizer = Tokenizer(nlp.vocab, infix_finditer=infix_re.finditer)

    docs = []
    spans_by_line = defaultdict(list)  # 1-based line no. (int) -> (start, end, label)
    entities = {}  # 1-based line no. (str) -> gold entity texts from the .con file
    for row in annotations:
        parts = row.split("||")
        text_info = parts[0]
        type_info = parts[1]
        # Offsets have the form "line:token"; the concept text is quoted.
        offset_start, offset_end = text_info.split(" ")[-2:]
        start_line, word_start = offset_start.split(":")
        end_line, word_end = offset_end.split(":")
        label = type_info.split('"')[-2]
        if start_line != end_line:
            # Multi-line annotations happen very infrequently (only about
            # 10 times in total), so we just skip them.
            continue
        spans_by_line[int(start_line)].append(
            (int(word_start), int(word_end), label)
        )
        entities.setdefault(start_line, []).append(text_info.split('"')[1])

    extracted_entities = {}
    for i, line in enumerate(lines):
        n = i + 1  # annotation line numbers are 1-based
        # NOTE(review): the original applied line.replace(" ", " ") here — a
        # no-op (space replaced by space), removed. A double-to-single space
        # collapse would shift token offsets, so it was NOT "fixed" to that.
        doc = nlp.make_doc(line)
        if n in spans_by_line:
            spans = [
                Span(doc, start, end + 1, label=label)
                for (start, end, label) in spans_by_line[n]
            ]
            # Keep only spans that are non-empty and carry no leading or
            # trailing whitespace (i.e. offsets landed on real tokens).
            spans = [s for s in spans if s.text.strip() and s.text.strip() == s.text]
            doc.ents = filter_spans(spans)
            extracted_entities[str(n)] = [s.text for s in spans]
        docs.append(doc)

    # Compare gold entity strings against what survived span filtering and
    # record mismatches for offline inspection.
    difference = []
    for key in entities:
        if key in extracted_entities:
            # NOTE(review): this logs only when EVERY zipped gold/extracted
            # pair differs (and map() truncates to the shorter list). If
            # "any mismatch" was intended this is a latent bug — behavior
            # kept as-is.
            all_pairs_differ = functools.reduce(
                lambda acc, cur: acc and cur,
                map(
                    lambda gold, got: gold.lower() != got.lower(),
                    entities[key],
                    extracted_entities[key],
                ),
                True,
            )
            if all_pairs_differ:
                difference = difference + [key] + entities[key] + extracted_entities[key]
        else:
            difference = difference + [key + " Key not present"] + entities[key]
    # NOTE(review): hard-coded absolute log path.
    with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
        fp.write('\n'.join(difference))
    return [Doc.from_docs(docs)] if merge_docs else docs
def docs_from_many_clinical_records(
    base_path: Path, nlp: Optional[Language] = None, merge_docs: bool = True
) -> List[Doc]:
    """Convert raw n2c2 annotated clinical records into a list of
    spaCy Doc objects to be ready to be used in training.

    base_path (Path): Root path to the raw data (expects ``concepts/*.txt.con``
        and ``docs/*.txt`` subdirectories).
    nlp (Optional[Language]): spaCy Language object. Defaults to a fresh
        ``spacy.blank("en")`` created per call.
    merge_docs (bool): If True: merge all lines into a single spaCy doc so
        there is only 1 element in the output array.
        If False: create a spaCy doc for each line in the original record.
    RETURNS (List[Doc]): List of spaCy Doc objects with entity spans set.
    """
    # BUG FIX: the original default `nlp: Language = spacy.blank("en")` was
    # evaluated once at import time, so every caller shared one Language
    # object whose tokenizer/Defaults get mutated by
    # docs_from_clinical_record. Use a None sentinel and build a fresh
    # pipeline per call instead.
    if nlp is None:
        nlp = spacy.blank("en")
    all_docs = []
    concept_paths = sorted((base_path / "concepts").glob("*.txt.con"))
    document_paths = sorted((base_path / "docs").glob("*.txt"))
    # NOTE(review): pairing relies on sorted filename order matching between
    # the concepts/ and docs/ directories (zip truncates to the shorter
    # list) — verify the two directories correspond 1:1 upstream.
    for con_path, doc_path in zip(concept_paths, document_paths):
        # NOTE(review): hard-coded absolute log path.
        with open(r'/notebooks/Clinical_NER/difference.txt', 'a') as fp:
            fp.write('\n' + str(con_path.stem))
        # read_text() closes the file handle (the original's
        # `path.open().read()` leaked it until garbage collection).
        annotations = con_path.read_text().splitlines()
        lines = doc_path.read_text().splitlines()
        all_docs += docs_from_clinical_record(
            lines, annotations, nlp, merge_docs=merge_docs
        )
    return all_docs
# Script entry point: let typer build the CLI from main()'s signature.
if __name__ == "__main__":
    typer.run(main)