Spaces:
Running
Running
File size: 5,657 Bytes
873b70f 6e35819 3a7a44c ea4284c 6e35819 ea4284c 6e35819 3a7a44c 6e35819 ea4284c 6e35819 873b70f 3a7a44c 873b70f 3a7a44c ea4284c 873b70f ea4284c 873b70f 312213e 3a7a44c 312213e 3a7a44c 312213e ea4284c 312213e 3a7a44c 312213e 3a7a44c 5cca310 312213e 3a7a44c 312213e 3a7a44c 312213e 3a7a44c 5cca310 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import pandas as pd
import pytest
from data_access import calculate_cumulative_statistics_for_all_questions, get_metadata, get_run_ids, \
get_async_connection, get_questions
from data_access import get_unified_sources
@pytest.mark.asyncio
async def test_get_questions():
    """Fetching questions for run 2 against baseline finder run 1 yields 10 rows."""
    run_id, baseline_finder_run_id = 2, 1
    async with get_async_connection() as conn:
        questions = await get_questions(conn, run_id, baseline_finder_run_id)
        assert len(questions) == 10
@pytest.mark.asyncio
async def test_get_unified_sources():
    """get_unified_sources returns a non-empty result list plus a stats DataFrame.

    Uses fixed test-database ids (question_id=2, source_finder_run_id=2,
    ranker_id=1 — positional; confirm against the data_access signature).
    """
    async with get_async_connection() as conn:
        results, stats = await get_unified_sources(conn, 2, 2, 1)
        assert results is not None
        assert stats is not None
        # The fixture data is expected to produce more than 4 unified rows.
        assert len(results) > 4, "Results should contain more than 4 rows"
        # Stats DataFrame must not be empty.
        assert stats.shape[0] > 0, "Stats DataFrame should contain at least one row"
        # Sanity-check a known stats column.
        assert "overlap_count" in stats.columns, "Stats should contain overlap_count"
@pytest.mark.asyncio
async def test_calculate_cumulative_statistics_for_all_questions():
    """Cumulative stats come back as a one-row DataFrame with the expected schema."""
    finder_run_id = 2
    ranker_id = 1
    async with get_async_connection() as conn:
        # Gather every question id available for this run, then aggregate.
        questions = await get_questions(conn, finder_run_id, ranker_id)
        ids = [q['id'] for q in questions]
        result = await calculate_cumulative_statistics_for_all_questions(conn, ids, finder_run_id, ranker_id)

    # Structural checks: a single-row pandas DataFrame.
    assert isinstance(result, pd.DataFrame), "Result should be a pandas DataFrame"
    assert result.shape[0] == 1, "Result should have one row"

    # Every aggregate column the function promises must be present.
    expected_columns = (
        "total_questions_analyzed",
        "total_baseline_sources",
        "total_found_sources",
        "total_overlap_count",
        "overall_overlap_percentage",
        "total_high_ranked_baseline_sources",
        "total_high_ranked_found_sources",
        "total_high_ranked_overlap_count",
        "overall_high_ranked_overlap_percentage",
        "avg_baseline_sources_per_question",
        "avg_found_sources_per_question",
    )
    for column in expected_columns:
        assert column in result.columns, f"Column {column} should be in result DataFrame"

    row = result.iloc[0]
    # Counts can never be negative.
    assert row["total_questions_analyzed"] >= 0, "Should have zero or more questions analyzed"
    assert row["total_baseline_sources"] >= 0, "Should have zero or more baseline sources"
    assert row["total_found_sources"] >= 0, "Should have zero or more found sources"
    # Percentages must lie in [0, 100].
    assert 0 <= row["overall_overlap_percentage"] <= 100, "Overlap percentage should be between 0 and 100"
    assert 0 <= row["overall_high_ranked_overlap_percentage"] <= 100, "High ranked overlap percentage should be between 0 and 100"
@pytest.mark.asyncio
async def test_get_metadata_none_returned():
    """get_metadata returns an empty dict when no metadata exists for the ids.

    source_finder_run_id=1 / question_id=1 is a combination known to have no
    metadata in the test database.
    """
    source_finder_run_id = 1
    question_id = 1
    async with get_async_connection() as conn:
        result = await get_metadata(conn, question_id, source_finder_run_id)
        # Bug fix: the function returns {}, not "" — message updated to match.
        assert result == {}, "Should return empty dict when no metadata is found"
@pytest.mark.asyncio
async def test_get_metadata():
    """Metadata is returned for a run/question pair known to have some."""
    run_id_with_metadata = 4
    question_id = 1
    async with get_async_connection() as conn:
        metadata = await get_metadata(conn, question_id, run_id_with_metadata)
        assert metadata is not None, "Should return metadata when it exists"
@pytest.mark.asyncio
async def test_get_run_ids():
    """get_run_ids returns a populated dict for known ids and {} for unknown ones."""
    question_id = 2  # exists in the test database
    source_finder_id = 2  # exists in the test database
    async with get_async_connection() as conn:
        # Known question: expect a non-empty mapping of run ids.
        run_ids = await get_run_ids(conn, source_finder_id, question_id)
        assert isinstance(run_ids, dict), "Result should be a dictionary"
        assert len(run_ids) > 0, "Should return at least one run ID"

        # Unknown question: still a dict, but empty.
        missing = await get_run_ids(conn, source_finder_id, 9999)
        assert isinstance(missing, dict), "Should return an empty dictionary for non-existent question"
        assert len(missing) == 0, "Should return empty dictionary for non-existent question"
@pytest.mark.asyncio
async def test_get_run_ids_no_question_id():
    """Omitting question_id still yields a non-empty dict of run ids."""
    finder_id = 2  # a source finder ID present in the test database
    async with get_async_connection() as conn:
        run_ids = await get_run_ids(conn, finder_id)
        assert isinstance(run_ids, dict), "Result should be a dictionary"
        assert len(run_ids) > 0, "Should return at least one run ID"
|