|
|
|
import pytest |
|
import logging |
|
import hashlib |
|
from unittest.mock import patch, MagicMock, ANY |
|
import requests |
|
|
|
from ankigen_core.utils import ( |
|
get_logger, |
|
ResponseCache, |
|
fetch_webpage_text, |
|
setup_logging, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
def test_get_logger_returns_logger_instance():
    """get_logger() must hand back a standard ``logging.Logger`` object."""
    assert isinstance(get_logger(), logging.Logger)
|
|
|
|
|
def test_get_logger_is_singleton():
    """Two consecutive get_logger() calls must yield the very same object."""
    first, second = get_logger(), get_logger()
    assert first is second
|
|
|
|
|
def test_setup_logging_configures_handlers(capsys):
    """Check that a freshly obtained logger has handlers and emits INFO output.

    The module-level ``utils._logger_instance`` is cleared before calling
    ``get_logger()`` (presumably forcing it through its initialization path)
    and is put back in a ``finally`` block, so a failing assertion cannot
    leak the cleared/replaced instance into other tests.

    Args:
        capsys: pytest fixture used to capture what the logger writes to
            stdout.
    """
    from ankigen_core import utils

    original_logger_instance = utils._logger_instance
    utils._logger_instance = None
    try:
        logger = get_logger()

        # At least one handler must have been attached.
        assert len(logger.handlers) >= 1

        # An INFO record must reach captured stdout.
        test_message = "Test INFO message for logging"
        logger.info(test_message)
        captured = capsys.readouterr()
        assert test_message in captured.out
    finally:
        # Always restore the shared logger, even when an assertion fails.
        utils._logger_instance = original_logger_instance
|
|
|
|
|
|
|
|
|
|
|
def test_response_cache_set_and_get():
    """Store two entries in a ResponseCache and read each one back unchanged."""
    cache = ResponseCache(maxsize=2)
    entries = [
        ("What is Python?", "gpt-test", {"answer": "A programming language"}),
        ("What is Java?", "gpt-test", {"answer": "Another programming language"}),
    ]

    # Write both entries first, then read them back in the same order.
    for prompt, model, response in entries:
        cache.set(prompt, model, response)

    for prompt, model, response in entries:
        assert cache.get(prompt, model) == response
|
|
|
|
|
def test_response_cache_get_non_existent():
    """Looking up a key that was never stored must yield None."""
    empty_cache = ResponseCache()
    assert empty_cache.get("NonExistentPrompt", "test-model") is None
|
|
|
|
|
def test_response_cache_key_creation_indirectly():
    """The same prompt stored under two models must give two separate entries."""
    cache = ResponseCache(maxsize=5)
    prompt = "Key test prompt 1"
    response_by_model = {
        "model-a": "Response for model A",
        "model-b": "Response for model B",
    }

    for model, response in response_by_model.items():
        cache.set(prompt, model, response)

    for model, response in response_by_model.items():
        assert cache.get(prompt, model) == response

    # Model A's entry must not have been overwritten by model B's response.
    assert cache.get(prompt, "model-a") != response_by_model["model-b"]
|
|
|
|
|
def test_response_cache_lru_eviction_simple():
    """Exercise ResponseCache instances built with maxsize=1 using two keys each.

    NOTE(review): despite "eviction" in the name, the last assertion expects
    the first entry to remain retrievable after a second one was stored and
    read -- presumably only a lookup layer is size-bounded; confirm against
    ResponseCache.
    """
    single_slot = ResponseCache(maxsize=1)

    single_slot.set("Prompt One", "m1", "Resp One")
    assert single_slot.get("Prompt One", "m1") == "Resp One"

    # A second, different key goes into the same one-slot cache.
    single_slot.set("Prompt Two", "m2", "Resp Two")
    assert single_slot.get("Prompt Two", "m2") == "Resp Two"

    # Fresh cache: store two keys, read the newer one, then the older one.
    second_cache = ResponseCache(maxsize=1)
    second_cache.set("p1", "m", "r1")
    second_cache.set("p2", "m", "r2")

    second_cache.get("p2", "m")
    older_entry = second_cache.get("p1", "m")

    assert older_entry == "r1"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success(mock_requests_get):
    """Test successful webpage fetching and text extraction.

    ``requests.get`` is patched to return a canned HTML page; the test then
    checks the request arguments and the exact lines of extracted text.

    Args:
        mock_requests_get: mock injected by ``@patch`` in place of
            ``ankigen_core.utils.requests.get``.
    """
    mock_response = MagicMock()
    mock_response.text = """
    <html>
        <head><title>Test Page</title></head>
        <body>
            <header>Ignore this</header>
            <script>console.log("ignore scripts");</script>
            <main>
                <h1>Main Title</h1>
                <p>This is the first paragraph.</p>
                <p>Second paragraph with extra spaces.</p>
                <div>Div content</div>
            </main>
            <footer>Ignore footer too</footer>
        </body>
    </html>
    """
    mock_response.raise_for_status = MagicMock()
    mock_requests_get.return_value = mock_response

    url = "http://example.com/test"
    extracted_text = fetch_webpage_text(url)

    # Headers are strings, so they are compared by plain dict equality;
    # pytest.approx is a numeric-tolerance helper and does not belong here.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )
    mock_response.raise_for_status.assert_called_once()

    expected_lines = [
        "Main Title",
        "This is the first paragraph.",
        "Second paragraph with extra spaces.",
        "Div content",
    ]
    actual_lines = extracted_text.split("\n")

    assert len(actual_lines) == len(
        expected_lines
    ), f"Expected {len(expected_lines)} lines, got {len(actual_lines)}"

    # Lengths are equal at this point, so zip() pairs every line.
    for i, (actual_line, expected_line) in enumerate(
        zip(actual_lines, expected_lines)
    ):
        assert (
            actual_line == expected_line
        ), f"Line {i + 1} mismatch: Expected '{expected_line}', Got '{actual_line}'"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_network_error(mock_requests_get):
    """Test handling of network errors during webpage fetching.

    ``requests.get`` is made to raise ``RequestException``; the test expects
    ``fetch_webpage_text`` to surface a ``ConnectionError`` whose message
    contains the original error text.

    Args:
        mock_requests_get: mock injected by ``@patch`` in place of
            ``ankigen_core.utils.requests.get``.
    """
    mock_requests_get.side_effect = requests.exceptions.RequestException(
        "Test Network Error"
    )

    url = "http://example.com/network-error"

    with pytest.raises(ConnectionError, match="Test Network Error"):
        fetch_webpage_text(url)

    # Headers are strings, so they are compared by plain dict equality;
    # pytest.approx is a numeric-tolerance helper and does not belong here.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )
|
|
|
|
|
|
|
@patch("ankigen_core.utils.BeautifulSoup")
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_parsing_error(mock_requests_get, mock_beautiful_soup):
    """Test handling of HTML parsing errors (simulated by BeautifulSoup raising).

    The HTTP call succeeds, but the patched ``BeautifulSoup`` constructor
    raises; the test expects ``fetch_webpage_text`` to raise ``RuntimeError``.

    Args:
        mock_requests_get: mock replacing ``ankigen_core.utils.requests.get``.
        mock_beautiful_soup: mock replacing ``ankigen_core.utils.BeautifulSoup``.
    """
    mock_response = MagicMock()
    mock_response.text = "<html><body>Invalid HTML?</body></html>"
    mock_response.raise_for_status = MagicMock()
    mock_requests_get.return_value = mock_response

    # Every attempt to construct the parser fails.
    mock_beautiful_soup.side_effect = Exception("Test Parsing Error")

    url = "http://example.com/parsing-error"

    with pytest.raises(RuntimeError, match="Failed to parse HTML content"):
        fetch_webpage_text(url)

    # Headers are strings, so they are compared by plain dict equality;
    # pytest.approx is a numeric-tolerance helper and does not belong here.
    mock_requests_get.assert_called_once_with(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
        timeout=15,
    )

    # The parser constructor must have been attempted at least once.
    assert mock_beautiful_soup.call_count > 0
|
|
|
|
|
def test_fetch_webpage_text_empty_content():
    """A page whose body holds only a <script> must produce an empty string."""
    script_only_page = MagicMock()
    script_only_page.text = "<html><body><script>only script</script></body></html>"
    script_only_page.raise_for_status = MagicMock()

    with patch("ankigen_core.utils.requests.get", return_value=script_only_page):
        result = fetch_webpage_text("http://example.com/empty")

    assert result == ""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_setup_logging_initialization():
    """Test that setup_logging initializes and returns a logger.

    Expects a ``logging.Logger`` named ``"ankigen"`` with exactly two
    handlers. The module-level ``utils._logger_instance`` is cleared in a
    ``finally`` block so the reset happens even when an assertion fails.
    """
    from ankigen_core import utils

    try:
        logger = setup_logging()
        assert isinstance(logger, logging.Logger)
        assert logger.name == "ankigen"
        assert len(logger.handlers) == 2
    finally:
        # Reset the shared instance regardless of the test outcome.
        utils._logger_instance = None
|
|
|
|
|
def test_setup_logging_singleton():
    """Test that setup_logging returns the same logger instance if called again.

    The module-level ``utils._logger_instance`` is cleared in a ``finally``
    block so the reset happens even when the assertion fails.
    """
    from ankigen_core import utils

    try:
        logger1 = setup_logging()
        logger2 = setup_logging()
        assert logger1 is logger2
    finally:
        # Reset the shared instance regardless of the test outcome.
        utils._logger_instance = None
|
|
|
|
|
def test_get_logger_flow():
    """Test get_logger populates the module-level instance, then reuses it.

    Starts from a cleared ``utils._logger_instance``, checks that the first
    ``get_logger()`` call stores the logger it returns there, and that a
    second call returns that same object. The instance is cleared again in a
    ``finally`` block so a failing assertion cannot leak state.
    """
    from ankigen_core import utils

    utils._logger_instance = None
    try:
        # First call: the module-level slot must now hold the returned logger.
        logger1 = get_logger()
        assert utils._logger_instance is not None
        assert logger1 is utils._logger_instance

        # Second call: the existing instance must be reused.
        logger2 = get_logger()
        assert logger2 is logger1
    finally:
        # Reset the shared instance regardless of the test outcome.
        utils._logger_instance = None
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def cache():
    """Provide a ResponseCache constructed with ``maxsize=2``."""
    two_slot_cache = ResponseCache(maxsize=2)
    return two_slot_cache
|
|
|
|
|
def test_response_cache_get_miss(cache):
    """A lookup for a key that was never set must return None."""
    assert cache.get("non_existent_prompt", "model") is None
|
|
|
|
|
def test_response_cache_lru_eviction(cache):
    """Drive the two-slot cache with three keys and inspect its LRU statistics.

    The call sequence is order-sensitive: the hit/miss counters read from
    ``cache._lru_cached_get.cache_info()`` depend on every preceding ``get``.

    NOTE(review): the final assertion expects "p2" to remain retrievable
    although three keys went through a maxsize=2 cache -- presumably only
    the ``_lru_cached_get`` layer is bounded; confirm against ResponseCache.
    """
    for prompt, model, response in (("p1", "m1", "r1"), ("p2", "m2", "r2")):
        cache.set(prompt, model, response)

    # Touch p1 before introducing a third key.
    cache.get("p1", "m1")

    cache.set("p3", "m3", "r3")

    assert cache.get("p1", "m1") == "r1"
    assert cache.get("p3", "m3") == "r3"

    # Read all three keys once more, in insertion order.
    for prompt, model in (("p1", "m1"), ("p2", "m2"), ("p3", "m3")):
        cache.get(prompt, model)

    stats = cache._lru_cached_get.cache_info()
    assert stats.hits >= 1
    assert stats.misses >= 1
    assert stats.currsize == 2

    assert cache.get("p2", "m2") == "r2"
|
|
|
|
|
|
|
|
|
def test_response_cache_create_key(cache):
    """_create_key must equal the MD5 hex digest of "<model>:<prompt>" in UTF-8."""
    prompt, model = "test prompt", "test_model"
    digest = hashlib.md5(f"{model}:{prompt}".encode("utf-8"))
    assert cache._create_key(prompt, model) == digest.hexdigest()
|
|
|
|
|
|
|
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_main_tag(mock_requests_get):
    """Text inside a <main> element is returned and one request is issued."""
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html><body><main> Main content here. </main></body></html>",
    )

    extracted = fetch_webpage_text("http://example.com")

    assert "Main content here." in extracted
    # The header contents are not under test here, hence ANY.
    mock_requests_get.assert_called_once_with(
        "http://example.com", headers=ANY, timeout=15
    )
|
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_success_article_tag(mock_requests_get):
    """Text inside an <article> element shows up in the extracted output."""
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html><body><article> Article content. </article></body></html>",
    )

    extracted = fetch_webpage_text("http://example.com")

    assert "Article content." in extracted
|
|
|
|
|
@patch("ankigen_core.utils.requests.get") |
|
def test_fetch_webpage_text_success_body_fallback(mock_requests_get): |
|
mock_response = MagicMock() |
|
mock_response.status_code = 200 |
|
mock_response.text = ( |
|
"<html><body> Body content only. <script>junk</script> </body></html>" |
|
) |
|
mock_requests_get.return_value = mock_response |
|
text = fetch_webpage_text("http://example.com") |
|
assert "Body content only." in text |
|
assert "junk" not in text |
|
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_no_meaningful_text(mock_requests_get):
    """An empty <main> element must result in an empty string."""
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html><body><main></main></body></html>",
    )

    extracted = fetch_webpage_text("http://example.com")

    assert extracted == ""
|
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_http_error(mock_requests_get):
    """An HTTPError from raise_for_status must surface as ConnectionError."""
    not_found = MagicMock()
    not_found.status_code = 404
    # raise_for_status() is what reports the 404 to the caller.
    http_error = requests.exceptions.HTTPError(
        "Client Error: Not Found for url", response=not_found
    )
    not_found.raise_for_status.side_effect = http_error
    mock_requests_get.return_value = not_found

    expected_message = "Could not fetch URL: Client Error: Not Found for url"
    with pytest.raises(ConnectionError, match=expected_message):
        fetch_webpage_text("http://example.com")
|
|
|
|
|
@patch("ankigen_core.utils.BeautifulSoup")
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_bs_init_error(mock_requests_get, mock_beautiful_soup):
    """A failing BeautifulSoup constructor must surface as RuntimeError."""
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html></html>",
    )
    mock_beautiful_soup.side_effect = Exception("BS failed")

    expected_message = "Failed to parse HTML content for http://example.com."
    with pytest.raises(RuntimeError, match=expected_message):
        fetch_webpage_text("http://example.com")
|
|
|
|
|
@patch("ankigen_core.utils.requests.get")
def test_fetch_webpage_text_lxml_fallback(mock_requests_get):
    """When the "lxml" parser raises ImportError, "html.parser" is used instead.

    The patched BeautifulSoup constructor fails for "lxml" and delegates to
    the real BeautifulSoup for "html.parser"; the test checks the extracted
    text, the logged warning, and that both parsers were requested.
    """
    mock_requests_get.return_value = MagicMock(
        status_code=200,
        text="<html><body><main>LXML Test</main></body></html>",
    )

    with patch("ankigen_core.utils.BeautifulSoup") as mock_bs_constructor:

        def bs_side_effect(text, parser_type):
            # Simulate a missing lxml installation.
            if parser_type == "lxml":
                raise ImportError("lxml not found")
            if parser_type != "html.parser":
                raise ValueError(f"Unexpected parser: {parser_type}")

            from bs4 import BeautifulSoup as RealBeautifulSoup

            return RealBeautifulSoup(text, "html.parser")

        mock_bs_constructor.side_effect = bs_side_effect

        app_logger = get_logger()
        with patch.object(app_logger, "warning") as mock_logger_warning:
            extracted = fetch_webpage_text("http://example.com/lxmltest")
            assert "LXML Test" in extracted
            mock_logger_warning.assert_any_call(
                "lxml not found, using html.parser instead."
            )

        # The second positional argument of each constructor call is the parser.
        requested_parsers = [
            recorded[0][1] for recorded in mock_bs_constructor.call_args_list
        ]
        assert "lxml" in requested_parsers
        assert "html.parser" in requested_parsers
|
|