Sem-nCG / tests.py
nbansal's picture
Added SemNCG metric
a54024a
raw
history blame
18.1 kB
import statistics
import unittest
from unittest.mock import patch, MagicMock
import numpy as np
import torch
from numpy.testing import assert_almost_equal
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from .encoder_models import SBertEncoder, get_encoder, get_sbert_encoder
from .semncg import RankedGains, compute_cosine_similarity, compute_gain, score_ncg, compute_ncg, _validate_input_format, SemnCG
from .utils import get_gpu, slice_embeddings, is_nested_list_of_type, flatten_list, prep_sentences, tokenize_and_prep_document
class TestUtils(unittest.TestCase):
def test_get_gpu(self):
gpu_count = torch.cuda.device_count()
gpu_available = torch.cuda.is_available()
# Test single boolean input
self.assertEqual(get_gpu(True), 0 if gpu_available else "cpu")
self.assertEqual(get_gpu(False), "cpu")
# Test single string input
self.assertEqual(get_gpu("cpu"), "cpu")
self.assertEqual(get_gpu("gpu"), 0 if gpu_available else "cpu")
self.assertEqual(get_gpu("cuda"), 0 if gpu_available else "cpu")
# Test single integer input
self.assertEqual(get_gpu(0), 0 if gpu_available else "cpu")
self.assertEqual(get_gpu(1), 1 if gpu_available else "cpu")
# Test list input with unique elements
self.assertEqual(get_gpu([True, "cpu", 0]), [0, "cpu"] if gpu_available else ["cpu", "cpu", "cpu"])
# Test list input with duplicate elements
self.assertEqual(get_gpu([0, 0, "gpu"]), 0 if gpu_available else ["cpu", "cpu", "cpu"])
# Test list input with duplicate elements of different types
self.assertEqual(get_gpu([True, 0, "gpu"]), 0 if gpu_available else ["cpu", "cpu", "cpu"])
# Test list input but only one element
self.assertEqual(get_gpu([True]), 0 if gpu_available else "cpu")
# Test list input with all integers
self.assertEqual(get_gpu(list(range(gpu_count))),
list(range(gpu_count)) if gpu_available else gpu_count * ["cpu"])
with self.assertRaises(ValueError):
get_gpu("invalid")
with self.assertRaises(ValueError):
get_gpu(torch.cuda.device_count())
def test_prep_sentences(self):
# Test normal case
self.assertEqual(prep_sentences(["Hello, world!", " This is a test. ", "!!!"]),
['Hello, world!', 'This is a test.'])
# Test case with only punctuations
with self.assertRaises(ValueError):
prep_sentences(["!!!", "..."])
# Test case with empty list
with self.assertRaises(ValueError):
prep_sentences([])
def test_tokenize_and_prep_document(self):
# Test tokenize=True with string input
self.assertEqual(tokenize_and_prep_document("Hello, world! This is a test.", True),
['Hello, world!', 'This is a test.'])
# Test tokenize=False with list of strings input
self.assertEqual(tokenize_and_prep_document(["Hello, world!", "This is a test."], False),
['Hello, world!', 'This is a test.'])
# Test tokenize=True with empty document
with self.assertRaises(ValueError):
tokenize_and_prep_document("!!! ...", True)
def test_slice_embeddings(self):
# Case 1
embeddings = np.random.rand(10, 5)
num_sentences = [3, 2, 5]
expected_output = [embeddings[:3], embeddings[3:5], embeddings[5:]]
self.assertTrue(
all(np.array_equal(a, b) for a, b in zip(slice_embeddings(embeddings, num_sentences),
expected_output))
)
# Case 2
num_sentences_nested = [[2, 1], [3, 4]]
expected_output_nested = [[embeddings[:2], embeddings[2:3]], [embeddings[3:6], embeddings[6:]]]
self.assertTrue(
slice_embeddings(embeddings, num_sentences_nested), expected_output_nested
)
# Case 3
document_sentences_count = [10, 8, 7]
reference_sentences_count = [5, 3, 2]
pred_sentences_count = [2, 2, 1]
all_embeddings = np.random.rand(
sum(document_sentences_count + reference_sentences_count + pred_sentences_count), 5,
)
embeddings = all_embeddings
expected_doc_embeddings = [embeddings[:10], embeddings[10:18], embeddings[18:25]]
embeddings = all_embeddings[25:]
expected_ref_embeddings = [embeddings[:5], embeddings[5:8], embeddings[8:10]]
embeddings = all_embeddings[35:]
expected_pred_embeddings = [embeddings[:2], embeddings[2:4], embeddings[4:5]]
doc_embeddings = slice_embeddings(all_embeddings, document_sentences_count)
ref_embeddings = slice_embeddings(all_embeddings[sum(document_sentences_count):], reference_sentences_count)
pred_embeddings = slice_embeddings(
all_embeddings[sum(document_sentences_count+reference_sentences_count):], pred_sentences_count
)
self.assertTrue(doc_embeddings, expected_doc_embeddings)
self.assertTrue(ref_embeddings, expected_ref_embeddings)
self.assertTrue(pred_embeddings, expected_pred_embeddings)
with self.assertRaises(TypeError):
slice_embeddings(embeddings, "invalid")
def test_is_nested_list_of_type(self):
# Test case: Depth 0, single element matching element_type
self.assertTrue(is_nested_list_of_type("test", str, 0))
# Test case: Depth 0, single element not matching element_type
self.assertFalse(is_nested_list_of_type("test", int, 0))
# Test case: Depth 1, list of elements matching element_type
self.assertTrue(is_nested_list_of_type(["apple", "banana"], str, 1))
# Test case: Depth 1, list of elements not matching element_type
self.assertFalse(is_nested_list_of_type([1, 2, 3], str, 1))
# Test case: Depth 0 (Wrong), list of elements matching element_type
self.assertFalse(is_nested_list_of_type([1, 2, 3], str, 0))
# Depth 2
self.assertTrue(is_nested_list_of_type([[1, 2], [3, 4]], int, 2))
self.assertTrue(is_nested_list_of_type([['1', '2'], ['3', '4']], str, 2))
self.assertFalse(is_nested_list_of_type([[1, 2], ["a", "b"]], int, 2))
# Depth 3
self.assertFalse(is_nested_list_of_type([[[1], [2]], [[3], [4]]], list, 3))
self.assertTrue(is_nested_list_of_type([[[1], [2]], [[3], [4]]], int, 3))
with self.assertRaises(ValueError):
is_nested_list_of_type([1, 2], int, -1)
def test_flatten_list(self):
self.assertEqual(flatten_list([1, [2, 3], [[4], 5]]), [1, 2, 3, 4, 5])
self.assertEqual(flatten_list([]), [])
self.assertEqual(flatten_list([1, 2, 3]), [1, 2, 3])
self.assertEqual(flatten_list([[[[1]]]]), [1])
class TestSBertEncoder(unittest.TestCase):
def setUp(self) -> None:
# Set up a test SentenceTransformer model
self.model_name = "paraphrase-distilroberta-base-v1"
self.sbert_model = get_sbert_encoder(self.model_name)
self.device = "cpu" # For testing on CPU
self.batch_size = 32
self.verbose = False
self.encoder = SBertEncoder(self.sbert_model, self.device, self.batch_size, self.verbose)
def test_encode_single_sentence(self):
sentence = "Hello, world!"
embeddings = self.encoder.encode([sentence])
self.assertEqual(embeddings.shape, (1, 768)) # Adjust shape based on your model's embedding dimension
def test_encode_multiple_sentences(self):
sentences = ["Hello, world!", "This is a test."]
embeddings = self.encoder.encode(sentences)
self.assertEqual(embeddings.shape, (2, 768)) # Adjust shape based on your model's embedding dimension
def test_get_sbert_encoder(self):
model_name = "paraphrase-distilroberta-base-v1"
sbert_model = get_sbert_encoder(model_name)
self.assertIsInstance(sbert_model, SentenceTransformer)
def test_encode_with_gpu(self):
if torch.cuda.is_available():
device = "cuda"
encoder = get_encoder(self.sbert_model, device, self.batch_size, self.verbose)
sentences = ["Hello, world!", "This is a test."]
embeddings = encoder.encode(sentences)
self.assertEqual(embeddings.shape, (2, 768)) # Adjust shape based on your model's embedding dimension
else:
self.skipTest("CUDA not available, skipping GPU test.")
def test_encode_multi_device(self):
if torch.cuda.device_count() < 2:
self.skipTest("Multi-GPU test requires at least 2 GPUs.")
else:
devices = ["cuda:0", "cuda:1"]
encoder = get_encoder(self.sbert_model, devices, self.batch_size, self.verbose)
sentences = ["This is a test sentence.", "Here is another sentence.", "This is a test sentence."]
embeddings = encoder.encode(sentences)
self.assertIsInstance(embeddings, np.ndarray)
self.assertEqual(embeddings.shape[0], 3)
self.assertEqual(embeddings.shape[1], self.encoder.model.get_sentence_embedding_dimension())
class TestGetEncoder(unittest.TestCase):
def setUp(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.batch_size = 8
self.verbose = False
def _base_test(self, model_name):
sbert_model = get_sbert_encoder(model_name)
encoder = get_encoder(sbert_model, self.device, self.batch_size, self.verbose)
# Assert
self.assertIsInstance(encoder, SBertEncoder)
self.assertEqual(encoder.device, self.device)
self.assertEqual(encoder.batch_size, self.batch_size)
self.assertEqual(encoder.verbose, self.verbose)
def test_get_sbert_encoder(self):
model_name = "stsb-roberta-large"
self._base_test(model_name)
def test_sbert_model(self):
model_name = "all-mpnet-base-v2"
self._base_test(model_name)
def test_huggingface_model(self):
"""Test Huggingface models which work with SBert library"""
model_name = "roberta-base"
self._base_test(model_name)
def test_get_encoder_environment_error(self): # This parameter is used when using patch decorator
model_name = "abc" # Wrong model_name
with self.assertRaises(EnvironmentError):
get_sbert_encoder(model_name)
def test_get_encoder_other_exception(self):
model_name = "apple/OpenELM-270M" # This model is not supported by SentenceTransformer lib
with self.assertRaises(RuntimeError):
get_sbert_encoder(model_name)
class TestRankedGainsDataclass(unittest.TestCase):
def test_ranked_gains_dataclass(self):
# Test initialization and attribute access
gt_gains = [("doc1", 0.8), ("doc2", 0.6)]
pred_gains = [("doc2", 0.7), ("doc1", 0.5)]
k = 2
ncg = 0.75
ranked_gains = RankedGains(gt_gains, pred_gains, k, ncg)
self.assertEqual(ranked_gains.gt_gains, gt_gains)
self.assertEqual(ranked_gains.pred_gains, pred_gains)
self.assertEqual(ranked_gains.k, k)
self.assertEqual(ranked_gains.ncg, ncg)
class TestComputeCosineSimilarity(unittest.TestCase):
def test_compute_cosine_similarity(self):
doc_embeds = np.array([[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]])
ref_embeds = np.array([[0.2, 0.3, 0.4], [0.5, 0.6, 0.7]])
# Test compute_cosine_similarity function
similarity_scores = compute_cosine_similarity(doc_embeds, ref_embeds)
print(similarity_scores)
# Example values, change as per actual function output
expected_scores = [0.980, 0.997]
self.assertAlmostEqual(similarity_scores[0], expected_scores[0], places=3)
self.assertAlmostEqual(similarity_scores[1], expected_scores[1], places=3)
class TestComputeGain(unittest.TestCase):
def test_compute_gain(self):
# Test compute_gain function
sim_scores = [0.8, 0.6, 0.7]
gains = compute_gain(sim_scores)
print(gains)
# Example values, change as per actual function output
expected_gains = [(0, 0.5), (2, 0.3333333333333333), (1, 0.16666666666666666)]
self.assertEqual(gains, expected_gains)
class TestScoreNcg(unittest.TestCase):
def test_score_ncg(self):
# Test score_ncg function
model_relevance = [0.8, 0.7, 0.6]
gt_relevance = [1.0, 0.9, 0.8]
ncg_score = score_ncg(model_relevance, gt_relevance)
expected_ncg = 0.778 # Example value, change as per actual function output
self.assertAlmostEqual(ncg_score, expected_ncg, places=3)
class TestComputeNcg(unittest.TestCase):
def test_compute_ncg(self):
# Test compute_ncg function
pred_gains = [(0, 0.8), (2, 0.7), (1, 0.6)]
gt_gains = [(0, 1.0), (1, 0.9), (2, 0.8)]
k = 3
ncg_score = compute_ncg(pred_gains, gt_gains, k)
expected_ncg = 1.0 # TODO: Confirm this with Dr. Santu
self.assertAlmostEqual(ncg_score, expected_ncg, places=6)
class TestValidateInputFormat(unittest.TestCase):
def test_validate_input_format(self):
# Test _validate_input_format function
tokenize_sentences = True
predictions = ["Prediction 1", "Prediction 2"]
references = ["Reference 1", "Reference 2"]
documents = ["Document 1", "Document 2"]
# No exception should be raised for valid input
try:
_validate_input_format(tokenize_sentences, predictions, references, documents)
except ValueError as e:
self.fail(f"_validate_input_format raised ValueError unexpectedly: {str(e)}")
# Test invalid input format
predictions_invalid = [["Sentence 1 in prediction 1.", "Sentence 2 in prediction 1."],
["Sentence 1 in prediction 2.", "Sentence 2 in prediction 2."]]
references_invalid = [["Sentences in reference 1."], ["Sentences in reference 2."]]
documents_invalid = [["Sentence 1 in document 1.", "Sentence 2 in document 1."],
["Sentence 1 in document 2.", "Sentence 2 in document 2."]]
with self.assertRaises(ValueError):
_validate_input_format(tokenize_sentences, predictions_invalid, references, documents)
with self.assertRaises(ValueError):
_validate_input_format(tokenize_sentences, predictions, references_invalid, documents)
with self.assertRaises(ValueError):
_validate_input_format(tokenize_sentences, predictions, references, documents_invalid)
class TestSemnCG(unittest.TestCase):
def setUp(self):
self.model_name = "stsb-distilbert-base"
self.metric = SemnCG(self.model_name)
def _basic_assertion(self, result, debug: bool = False):
self.assertIsInstance(result, tuple)
self.assertEqual(len(result), 2)
self.assertIsInstance(result[0], float)
self.assertTrue(0.0 <= result[0] <= 1.0)
self.assertIsInstance(result[1], list)
if debug:
for ranked_gain in result[1]:
self.assertTrue(isinstance(ranked_gain, RankedGains))
self.assertTrue(0.0 <= ranked_gain.ncg <= 1.0)
else:
for gain in result[1]:
self.assertTrue(isinstance(gain, float))
self.assertTrue(0.0 <= gain <= 1.0)
def test_compute_basic(self):
predictions = ["The cat sat on the mat.", "The quick brown fox jumps over the lazy dog."]
references = ["A cat was sitting on a mat.", "A quick brown fox jumped over a lazy dog."]
documents = ["There was a cat on a mat.", "The quick brown fox jumped over the lazy dog."]
result = self.metric.compute(predictions=predictions, references=references, documents=documents)
self._basic_assertion(result)
def test_compute_with_tokenization(self):
predictions = [["The cat sat on the mat."], ["The quick brown fox jumps over the lazy dog."]]
references = [["A cat was sitting on a mat."], ["A quick brown fox jumped over a lazy dog."]]
documents = [["There was a cat on a mat."], ["The quick brown fox jumped over the lazy dog."]]
result = self.metric.compute(
predictions=predictions, references=references, documents=documents, tokenize_sentences=False
)
self._basic_assertion(result)
def test_compute_with_pre_compute_embeddings(self):
predictions = ["The cat sat on the mat.", "The quick brown fox jumps over the lazy dog."]
references = ["A cat was sitting on a mat.", "A quick brown fox jumped over a lazy dog."]
documents = ["There was a cat on a mat.", "The quick brown fox jumped over the lazy dog."]
result = self.metric.compute(
predictions=predictions, references=references, documents=documents, pre_compute_embeddings=True
)
self._basic_assertion(result)
def test_compute_with_debug(self):
predictions = ["The cat sat on the mat.", "The quick brown fox jumps over the lazy dog."]
references = ["A cat was sitting on a mat.", "A quick brown fox jumped over a lazy dog."]
documents = ["There was a cat on a mat.", "The quick brown fox jumped over the lazy dog."]
result = self.metric.compute(
predictions=predictions, references=references, documents=documents, debug=True
)
self._basic_assertion(result, debug=True)
def test_compute_invalid_input_format(self):
predictions = "The cat sat on the mat."
references = ["A cat was sitting on a mat."]
documents = ["There was a cat on a mat."]
with self.assertRaises(ValueError):
self.metric.compute(predictions=predictions, references=references, documents=documents)
if __name__ == '__main__':
unittest.main(verbosity=2)