|
import os |
|
import zipfile |
|
from configuration import get_aida_yago_tsv_file_path, get_resources_dir |
|
|
|
TRAIN_START_LINE = "-DOCSTART- (1 EU)" |
|
TESTA_START_LINE = "-DOCSTART- (947testa CRICKET)" |
|
TESTB_START_LINE = "-DOCSTART- (1163testb SOCCER)" |
|
|
|
CANONICAL_REDIRECTS = None |
|
|
|
|
|
class AnnotationRecord:
    """One token line of the AIDA-YAGO TSV file.

    Lines with tabs are tokens that are part of a mention:

    - column 1 is the token
    - column 2 is either B (beginning of a mention) or I (continuation of a mention)
    - column 3 is the full mention used to find entity candidates
    - column 4 is the corresponding YAGO2 entity (in YAGO encoding, i.e. unicode
      characters are backslash encoded and spaces are replaced by underscores,
      see also the tools on the YAGO2 website), OR --NME--, denoting that there
      is no matching entity in YAGO2 for this particular mention, or that we are
      missing the connection between the mention string and the YAGO2 entity.
    - column 5 is the corresponding Wikipedia URL of the entity (added for
      convenience when evaluating against a Wikipedia based method)
    - column 6 is the corresponding Wikipedia ID of the entity (the ID refers to
      the dump used for annotation, 2010-08-17)
    - column 7 is the corresponding Freebase mid, if there is one
    """

    # Attribute names in TSV column order; used to map split columns to fields.
    _FIELDS = ("token", "begin_inside_tag", "full_mention", "yago_entity",
               "wikipedia_url", "wikipedia_id", "freebase_mid")

    def __init__(self, line):
        data_columns = line.split('\t')
        # Default every field to None, then fill in whichever columns exist.
        for name in self._FIELDS:
            setattr(self, name, None)
        self.candidates = None  # set later via set_candidates()
        # zip() stops at the shorter sequence, so missing columns stay None.
        for name, value in zip(self._FIELDS, data_columns):
            setattr(self, name, value)

    def set_candidates(self, candidate_record):
        """Attach the PPRforNED candidate record matching this token's mention."""
        self.candidates = candidate_record
        # One more token of the candidate's mention has now been consumed.
        self.candidates.non_considered_word_count -= 1

    def __str__(self):
        # Join non-empty fields with "|". Using str.join also fixes an
        # IndexError the previous implementation hit (res[-1]) when every
        # field was empty or None.
        parts = [getattr(self, name) for name in self._FIELDS]
        return "|".join(p for p in parts if p)
|
|
|
|
|
class Document:
    """A single annotated document: groups of annotation records.

    Blank TSV lines delimit groups; records accumulate in
    ``current_annotation`` until the group is flushed into ``annotations``.
    """

    def __init__(self, document_id):
        # Sequential id of this document within the dataset.
        self.document_id = document_id
        # Completed annotation groups (each a list of AnnotationRecord).
        self.annotations = []
        # Records accumulated since the last blank line.
        self.current_annotation = []

    def add_annotation(self, line, candidates):
        """Parse one TSV line; an empty line closes the current group."""
        if not line:
            self.flush_current_annotation()
            return
        record = AnnotationRecord(line)
        # Find the first candidate record whose original mention text matches
        # this token's mention and which still has unmatched tokens left.
        matching = next(
            (cand for cand in candidates
             if cand.non_considered_word_count >= 1
             and cand.orig_text == record.full_mention),
            None,
        )
        if matching is not None:
            record.set_candidates(matching)
        self.current_annotation.append(record)

    def flush_current_annotation(self):
        """Move the accumulated records into ``annotations`` and reset."""
        self.annotations.append(self.current_annotation)
        self.current_annotation = []
|
|
|
|
|
class Candidate:
    """One CANDIDATE line of a PPRforNED candidate file, parsed into fields."""

    # Column prefix (text before the first ':') -> (attribute name, converter).
    # NOTE: 'links' is deliberately kept as the raw string, matching the
    # original parsing behavior (only the counts are converted to int).
    _COLUMNS = {
        "id": ("id", str),
        "inCount": ("in_count", int),
        "outCount": ("out_count", int),
        "links": ("links", str),
        "url": ("url", str),
        "name": ("name", str),
        "normalName": ("normal_name", str),
        "normalWikiTitle": ("normal_wiki_title", str),
        "predictedType": ("predicted_type", str),
    }

    def __init__(self, candidate_line):
        self.id = ""
        self.in_count = 0
        self.out_count = 0
        self.links = 0
        self.url = ""
        self.name = ""
        self.normal_name = ""
        self.normal_wiki_title = ""
        self.predicted_type = ""
        for column in candidate_line.split('\t'):
            # Skip the leading marker and any blank columns.
            if column == 'CANDIDATE' or not column.strip():
                continue
            # Split on the FIRST ':' only, so values such as URLs keep theirs.
            key, sep, value = column.partition(':')
            if not sep or key not in self._COLUMNS:
                raise ValueError(f"Undefined PPRforNED CANDIDATE column: {column}")
            attr, convert = self._COLUMNS[key]
            setattr(self, attr, convert(value))

    def __str__(self):
        wiki_page = self.url.replace('http://en.wikipedia.org/wiki/', '')
        return f"id: {self.id}\twiki_page: {wiki_page}"
|
|
|
|
|
class CandidateRecord:
    """The ENTITY header line of a PPRforNED candidate block, plus its candidates.

    Holds the mention text and bookkeeping used to match the record against
    AnnotationRecord tokens during dataset loading.
    """

    def __init__(self, entity_header):
        self.candidates = []
        self.text = ""
        self.normal_name = ""
        self.predicted_type = ""
        self.q = False
        self.qid = ""
        self.docid = -1
        self.orig_text = ""
        # Tokens of the original mention not yet matched to an
        # AnnotationRecord (decremented by AnnotationRecord.set_candidates).
        self.non_considered_word_count = 0
        self.url = ""
        for item in entity_header.split('\t'):
            # Skip the ENTITY marker and blank columns (consistent with
            # the Candidate parser).
            if item == 'ENTITY' or not item.strip():
                continue
            elif item.startswith('text:'):
                self.text = item[5:]
            elif item.startswith('normalName:'):
                self.normal_name = item[11:]
            elif item.startswith('predictedType:'):
                self.predicted_type = item[14:]
            elif item.startswith('q:'):
                # BUGFIX: bool(item[2:]) was True for ANY non-empty value,
                # including the literal string "false". Parse explicitly.
                self.q = item[2:].strip().lower() == 'true'
            elif item.startswith('qid:'):
                self.qid = item[4:]
            elif item.startswith('docId:'):
                # PPRforNED doc ids are 1-based; store a 0-based id.
                self.docid = int(item[6:]) - 1
            elif item.startswith('origText:'):
                self.orig_text = item[9:]
                self.non_considered_word_count = len(self.orig_text.split())
            elif item.startswith('url:'):
                self.url = item[4:]
            else:
                raise ValueError(f"Undefined PPRforNED column: {item}")

    def add_candidate(self, candidate_line):
        """Parse a CANDIDATE line and append it to this record."""
        self.candidates.append(Candidate(candidate_line))

    def __str__(self):
        cnds = '\n\t'.join(str(x) for x in self.candidates)
        return f"doc_id: {self.docid}\toriginal_text: {self.orig_text}\tcandidates:\n\t{cnds}"
|
|
|
|
|
def get_candidates(ppr_for_ned_candidates_zip, last_document_id):
    """Read and parse the PPRforNED candidate block for one document.

    The zip archive stores one member per document, named by its 1-based id.
    Returns a list of CandidateRecord objects, each with its CANDIDATE lines
    attached.
    """
    member_name = str(last_document_id + 1)
    raw_text = ppr_for_ned_candidates_zip.read(member_name).decode("utf-8")
    records = []
    for line in raw_text.split("\n"):
        if not line.strip():
            continue
        if line.startswith("ENTITY"):
            records.append(CandidateRecord(line))
        elif line.startswith("CANDIDATE"):
            # A CANDIDATE line must always follow its ENTITY header.
            assert len(records)
            records[-1].add_candidate(line)
        else:
            raise ValueError("This must be unreachable!")
    return records
|
|
|
class AIDADataset:
    """AIDA-CoNLL dataset loader with PPRforNED entity candidates.

    After construction, ``self.dataset`` maps the split names "train",
    "testa" and "testb" to lists of Document objects.
    """

    def __init__(self):
        super().__init__()
        self.dataset = None
        self.data_path = str(get_aida_yago_tsv_file_path().absolute())
        assert os.path.exists(self.data_path), f"The passed dataset address: {self.data_path} does not exist"
        self.load_dataset()

    def load_dataset(self):
        """Parse the AIDA-YAGO TSV file and the PPRforNED candidate zip.

        Documents are delimited by "-DOCSTART-" lines; the three special
        DOCSTART lines mark the start of the train/testa/testb splits.
        """
        annotations = [[], [], []]   # one bucket per split: train, testa, testb
        current_document = None
        current_document_candidates = None
        data_split_id = -1           # index into `annotations`; bumped at each split start
        last_document_id = 0         # 0-based doc id; zip members are named id + 1
        # Context managers guarantee both handles are closed even if parsing
        # raises (the previous version leaked the ZipFile on error).
        with zipfile.ZipFile(get_resources_dir() / "data" / "PPRforNED.zip", "r") as ppr_zip, \
                open(self.data_path, "r", encoding="utf-8") as data_file:
            for line in data_file:
                line = line.strip()
                if line.startswith("-DOCSTART-"):
                    # Close out the previous document before starting a new one.
                    if current_document:
                        annotations[data_split_id].append(current_document)
                        last_document_id += 1
                    if line in (TRAIN_START_LINE, TESTA_START_LINE, TESTB_START_LINE):
                        data_split_id += 1
                    current_document = Document(last_document_id)
                    current_document_candidates = get_candidates(ppr_zip, last_document_id)
                else:
                    current_document.add_annotation(line, current_document_candidates)
            # The file does not end with a DOCSTART line, so flush the last document.
            if current_document:
                annotations[data_split_id].append(current_document)
        self.dataset = {"train": annotations[0], "testa": annotations[1], "testb": annotations[2]}
|
|