gliner_multi / GLiNER /modules /run_evaluation.py
Tom Aarsen
Add cloned GLiNER repository
914502f
import glob
import json
import os
import os
import torch
from tqdm import tqdm
import random
def open_content(path):
    """Load the train/dev/test/labels JSON files stored directly in ``path``.

    Every ``*.json`` file in the directory is assigned to a split by its file
    name: the first of "train", "dev", "test", "labels" (checked in that
    order) that occurs in the name decides where the content goes.

    Args:
        path: Directory holding the dataset's JSON files.

    Returns:
        A ``(train, dev, test, labels)`` tuple with the decoded JSON content
        of each file. An entry is ``None`` when no matching file was found.
    """
    # Insertion order doubles as the matching precedence (train > dev > ...).
    contents = {"train": None, "dev": None, "test": None, "labels": None}

    # sorted() keeps the outcome deterministic if several files match a split.
    for json_path in sorted(glob.glob(os.path.join(path, "*.json"))):
        # Match on the file name only: a parent directory such as
        # "train_data/" or "dev/" must not capture every file of the dataset.
        file_name = os.path.basename(json_path)
        for split in contents:
            if split in file_name:
                with open(json_path, "r", encoding="utf-8") as f:
                    contents[split] = json.load(f)
                break

    return contents["train"], contents["dev"], contents["test"], contents["labels"]
def process(data):
    """Convert a character-annotated example into a word-level NER sample.

    Args:
        data: Mapping with a "sentence" string and an "entities" list. Each
            entity provides "pos" as ``(start_char, end_char)`` (end is
            exclusive) and a "type".

    Returns:
        A dict with "tokenized_text" (the whitespace-split words) and "ner",
        a list of ``(start_word, end_word, type)`` tuples using inclusive
        word indices. A boundary that does not coincide with a word boundary
        is reported as ``None``.
    """
    sentence = data['sentence']
    words = sentence.split()

    # Record the real character span of every word. Rebuilding offsets as
    # "word length + 1" is only right when words are separated by exactly one
    # space; split() also collapses runs of whitespace and leading blanks.
    word_spans = []
    cursor = 0
    for word in words:
        # Everything between cursor and the next word is whitespace, so the
        # first hit is always the word itself.
        word_start = sentence.index(word, cursor)
        cursor = word_start + len(word)
        word_spans.append((word_start, cursor))

    entities = []  # List of entities (start, end, type)
    for entity in data['entities']:
        start_char, end_char = entity['pos']
        start_word = None
        end_word = None
        for i, (word_start, word_end) in enumerate(word_spans):
            if word_start == start_char:
                start_word = i
            if word_end == end_char:
                end_word = i
                break  # the entity ends here; later words are irrelevant
        entities.append((start_word, end_word, entity['type']))

    return {
        "tokenized_text": words,
        "ner": entities,
    }
# create dataset
def create_dataset(path):
    """Load a dataset directory and convert every split to word-level samples.

    Args:
        path: Directory passed on to ``open_content``.

    Returns:
        ``(train_dataset, dev_dataset, test_dataset, labels)`` where the three
        datasets are lists of ``process`` outputs and ``labels`` is returned
        exactly as loaded.
    """
    raw_train, raw_dev, raw_test, labels = open_content(path)

    # Convert the splits one after the other: train, then dev, then test.
    train_dataset = [process(example) for example in raw_train]
    dev_dataset = [process(example) for example in raw_dev]
    test_dataset = [process(example) for example in raw_test]

    return train_dataset, dev_dataset, test_dataset, labels
@torch.no_grad()
def get_for_one_path(path, model):
    """Evaluate ``model`` on the test split of the dataset stored at ``path``.

    Args:
        path: Dataset directory; its last component is used as the dataset
            name.
        model: Object exposing ``evaluate(test_dataset, flat_ner=...,
            threshold=..., batch_size=..., entity_types=...)`` that returns
            a ``(results, f1)`` pair.

    Returns:
        ``(data_name, results, f1)``.
    """
    # load the dataset; only the test split and the label set are needed
    _, _, test_dataset, entity_types = create_dataset(path)

    # normpath drops a trailing separator (which would otherwise yield an
    # empty name) and basename honours the platform's path separator.
    data_name = os.path.basename(os.path.normpath(path))

    # datasets whose name contains one of these markers are evaluated with
    # flat_ner=False, every other dataset with flat_ner=True
    flat_ner = not any(marker in data_name for marker in ("ACE", "GENIA", "Corpus"))

    # evaluate the model
    results, f1 = model.evaluate(test_dataset, flat_ner=flat_ner, threshold=0.5, batch_size=12,
                                 entity_types=entity_types)
    return data_name, results, f1
def _format_results_table(scores):
    """Render ``{dataset: f1}`` as aligned percentage rows plus an average row."""
    rows = [f"{name:20}: {score:.1%}" for name, score in scores.items()]
    if scores:
        average = sum(scores.values()) / len(scores)
        rows.append(f"{'Average':20}: {average:.1%}")
    else:
        # No dataset fell into this group: there is nothing to average, and
        # dividing by zero here would discard a finished evaluation run.
        rows.append(f"{'Average':20}: n/a")
    return "\n".join(rows)


def get_for_all_path(model, steps, log_dir, data_paths):
    """Evaluate ``model`` on every dataset under ``data_paths`` and log results.

    Per-dataset results are appended to ``<log_dir>/results.txt``; two summary
    tables (zero-shot benchmark datasets and all remaining datasets) are
    appended to ``<log_dir>/tables.txt``.

    Args:
        model: Model to evaluate; it is switched to eval mode.
        steps: Step identifier written into both log files.
        log_dir: Existing directory that receives the log files.
        data_paths: Directory whose entries are dataset directories. Entries
            whose name contains "sample_" are skipped.
    """
    all_paths = sorted(glob.glob(f"{data_paths}/*"))

    # re-apply the device the parameters currently live on
    device = next(model.parameters()).device
    model.to(device)
    # set the model to eval mode
    model.eval()

    # log the results
    save_path = os.path.join(log_dir, "results.txt")
    with open(save_path, "a") as f:
        f.write("##############################################\n")
        # write step
        f.write("step: " + str(steps) + "\n")

    zero_shot_benc = ["mit-movie", "mit-restaurant", "CrossNER_AI", "CrossNER_literature", "CrossNER_music",
                      "CrossNER_politics", "CrossNER_science"]
    zero_shot_benc_results = {}
    all_results = {}  # datasets that are not part of the zero-shot benchmark

    for p in tqdm(all_paths):
        # Test the entry name only, so that a data root whose own path
        # contains "sample_" does not filter out every dataset.
        if "sample_" in os.path.basename(os.path.normpath(p)):
            continue

        data_name, results, f1 = get_for_one_path(p, model)

        # write to file
        with open(save_path, "a") as f:
            f.write(data_name + "\n")
            f.write(str(results) + "\n")

        if data_name in zero_shot_benc:
            zero_shot_benc_results[data_name] = f1
        else:
            all_results[data_name] = f1

    table_bench_all = _format_results_table(all_results)
    table_bench_zeroshot = _format_results_table(zero_shot_benc_results)

    # write to file
    save_path_table = os.path.join(log_dir, "tables.txt")
    with open(save_path_table, "a") as f:
        f.write("##############################################\n")
        f.write("step: " + str(steps) + "\n")
        f.write("Table for all datasets except crossNER\n")
        f.write(table_bench_all + "\n\n")
        f.write("Table for zero-shot benchmark\n")
        f.write(table_bench_zeroshot + "\n")
        f.write("##############################################\n\n")
def sample_train_data(data_paths, sample_size=10000):
    """Collect a shuffled training subset from every non-excluded dataset.

    Args:
        data_paths: Directory whose entries are dataset directories.
        sample_size: Maximum number of training samples kept per dataset.

    Returns:
        One flat list of training samples; each sample carries its dataset's
        labels under the "label" key.
    """
    # datasets left out of the sample (zero-shot benchmark sets and ACE 2004)
    excluded_names = ["CrossNER_AI", "CrossNER_literature", "CrossNER_music",
                      "CrossNER_politics", "CrossNER_science", "ACE 2004"]

    dataset_dirs = sorted(glob.glob(f"{data_paths}/*"))
    collected = []

    for dataset_dir in tqdm(dataset_dirs):
        if not any(name in dataset_dir for name in excluded_names):
            train_samples, _, _, label_set = create_dataset(dataset_dir)

            # attach the dataset's labels to each training sample
            for sample in train_samples:
                sample["label"] = label_set

            # shuffle first so the kept prefix is a random subset
            random.shuffle(train_samples)
            collected.extend(train_samples[:sample_size])

    return collected