|
"""
|
|
🛠️ Helper Utilities for CourseCrafter AI
|
|
|
|
Common utility functions and helpers used throughout the application.
|
|
"""
|
|
|
|
import re
|
|
import json
|
|
import hashlib
|
|
import asyncio
|
|
from typing import Any, Dict, List, Optional, Union, Callable
|
|
from datetime import datetime, timedelta
|
|
import logging
|
|
|
|
|
|
def generate_id(prefix: str = "", length: int = 8) -> str:
    """Generate a random hexadecimal ID with an optional prefix.

    Args:
        prefix: Text placed before the ID, separated by ``-``. Omitted when empty.
        length: Number of hexadecimal characters in the random part.

    Returns:
        ``"<prefix>-<hex>"`` when ``prefix`` is non-empty, otherwise ``"<hex>"``.
    """
    # Function-scope import so this block does not depend on a file-level change.
    import uuid

    # Hashing the millisecond timestamp yields the same ID for every call made
    # within one millisecond; random UUIDs do not have that problem.
    hex_chars = uuid.uuid4().hex
    while len(hex_chars) < length:
        # One UUID provides 32 hex characters; chain more for longer requests.
        hex_chars += uuid.uuid4().hex
    unique_id = hex_chars[:length]

    if prefix:
        return f"{prefix}-{unique_id}"
    return unique_id
|
|
|
|
|
|
def clean_text(text: str) -> str:
    """Clean and normalize text content.

    Removes non-whitespace control characters, collapses every run of
    whitespace into a single space, trims both ends, and converts curly
    quotation marks to their plain ASCII equivalents.

    Args:
        text: Raw text; a falsy value yields an empty string.

    Returns:
        The normalized text.
    """
    if not text:
        return ""

    # Strip control characters BEFORE collapsing whitespace; doing it afterwards
    # leaves a double space wherever a control character sat between two spaces.
    # \x0b, \x0c and \x1c-\x1f are left out of this class because `\s` treats
    # them as whitespace, so the next step turns them into a single space.
    text = re.sub(r'[\x00-\x08\x0e-\x1b\x7f]', '', text)

    text = re.sub(r'\s+', ' ', text).strip()

    # Curly quotes are written as escapes so they survive re-encoding of this file.
    text = text.replace('\u201c', '"').replace('\u201d', '"')
    text = text.replace('\u2018', "'").replace('\u2019', "'")

    return text
|
|
|
|
|
|
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate text to at most ``max_length`` characters, ending with ``suffix``.

    Args:
        text: Text to shorten. Falsy values are returned unchanged.
        max_length: Maximum length of the result, suffix included.
        suffix: Marker appended when the text is cut.

    Returns:
        ``text`` itself when it already fits, otherwise a shortened copy.
    """
    if not text or len(text) <= max_length:
        return text

    budget = max_length - len(suffix)
    if budget < 0:
        # The suffix alone is longer than the limit. A negative slice bound would
        # count from the end of the text and overshoot max_length, so hard-cut.
        return text[:max(max_length, 0)]

    truncated = text[:budget]
    last_space = truncated.rfind(' ')

    # Back up to a word boundary only when that keeps most of the allowed length.
    if last_space > max_length * 0.7:
        truncated = truncated[:last_space]

    return truncated + suffix
|
|
|
|
|
|
def extract_keywords(text: str, max_keywords: int = 10) -> List[str]:
    """Return the most frequent non-trivial words in ``text``.

    Words are lower-cased, punctuation is treated as a separator, and words of
    two characters or fewer, or listed as stop words, are ignored. Words with
    equal counts keep the order in which they first appeared.

    Args:
        text: Source text; a falsy value yields an empty list.
        max_keywords: Upper bound on the number of keywords returned.

    Returns:
        Keywords ordered from most to least frequent.
    """
    if not text:
        return []

    stop_words = {
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
        'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have',
        'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should',
        'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'it', 'we',
        'they', 'me', 'him', 'her', 'us', 'them', 'my', 'your', 'his', 'its',
        'our', 'their', 'can', 'may', 'might', 'must', 'shall', 'from', 'up',
        'out', 'down', 'off', 'over', 'under', 'again', 'further', 'then',
        'once', 'here', 'there', 'when', 'where', 'why', 'how', 'all', 'any',
        'both', 'each', 'few', 'more', 'most', 'other', 'some', 'such', 'no',
        'nor', 'not', 'only', 'own', 'same', 'so', 'than', 'too', 'very'
    }

    # Punctuation becomes a space so it splits tokens instead of gluing them.
    tokens = re.sub(r'[^\w\s]', ' ', text.lower()).split()

    counts = {}
    for token in tokens:
        if len(token) <= 2 or token in stop_words:
            continue
        counts[token] = counts.get(token, 0) + 1

    # The sort is stable, so ties stay in first-seen (insertion) order.
    ranked = sorted(counts, key=counts.get, reverse=True)
    return ranked[:max_keywords]
|
|
|
|
|
|
def format_duration(seconds: int) -> str:
    """Format a duration in seconds as a human-readable string.

    Durations under a minute are shown in seconds, durations under an hour in
    minutes and seconds, and longer durations in hours and minutes (leftover
    seconds are dropped once the duration reaches an hour).

    Args:
        seconds: Duration in seconds.

    Returns:
        Text such as ``"45 seconds"``, ``"1 minute 5 seconds"`` or ``"2 hours"``.
    """
    def _unit(value, singular: str) -> str:
        # Use the singular form for exactly one (avoids "1 minutes").
        return f"{value} {singular}" if value == 1 else f"{value} {singular}s"

    if seconds < 60:
        return _unit(seconds, "second")

    if seconds < 3600:
        minutes, remaining_seconds = divmod(seconds, 60)
        if remaining_seconds == 0:
            return _unit(minutes, "minute")
        return f"{_unit(minutes, 'minute')} {_unit(remaining_seconds, 'second')}"

    hours, remainder = divmod(seconds, 3600)
    remaining_minutes = remainder // 60
    if remaining_minutes == 0:
        return _unit(hours, "hour")
    return f"{_unit(hours, 'hour')} {_unit(remaining_minutes, 'minute')}"
|
|
|
|
|
|
def estimate_reading_time(text: str, words_per_minute: int = 200) -> int:
    """Estimate the reading time of ``text`` in whole minutes.

    Args:
        text: Text to measure; words are whitespace-separated tokens.
        words_per_minute: Assumed reading speed. Zero raises ZeroDivisionError
            for text that contains words.

    Returns:
        0 when the text contains no words, otherwise at least 1.
    """
    if not text:
        return 0

    word_count = len(text.split())
    if word_count == 0:
        # Whitespace-only text has nothing to read; treat it like empty text
        # instead of reporting the one-minute minimum.
        return 0

    return max(1, round(word_count / words_per_minute))
|
|
|
|
|
|
def safe_json_loads(json_str: str, default: Any = None) -> Any:
    """Parse a JSON string, returning ``default`` when it cannot be parsed.

    Args:
        json_str: Text to decode.
        default: Value returned when decoding fails or the input has an
            unsupported type (for example ``None``).

    Returns:
        The decoded object, or ``default``.
    """
    try:
        parsed = json.loads(json_str)
    except (json.JSONDecodeError, TypeError):
        return default
    return parsed
|
|
|
|
|
|
def _is_valid_json(candidate: str) -> bool:
    """Return True when ``candidate`` parses as JSON."""
    try:
        json.loads(candidate)
    except json.JSONDecodeError:
        return False
    return True


def extract_json_from_response(response_text: str) -> str:
    """Extract JSON text from an LLM response that may wrap it in markdown.

    Strategies are tried in order, and the first candidate that parses wins:

    1. The contents of a fenced block tagged ``json``.
    2. The contents of an untagged fenced block.
    3. The contents of an inline single-backtick span.
    4. The outermost ``{...}`` or ``[...]`` span, trying first whichever
       opening delimiter appears earlier in the text.

    Args:
        response_text: Raw response text.

    Returns:
        The extracted JSON text, or the stripped input when nothing parses.
        An empty string is returned for falsy input.
    """
    if not response_text:
        return ""

    json_block_patterns = [
        r'```json\s*\n(.*?)\n```',
        r'```\s*\n(.*?)\n```',
        r'`(.*?)`',
    ]

    for pattern in json_block_patterns:
        for match in re.findall(pattern, response_text, re.DOTALL | re.IGNORECASE):
            candidate = match.strip()
            if _is_valid_json(candidate):
                return candidate

    # Fallback: slice from the first opener to the last matching closer.
    # Candidates are ordered by where the opener appears, so an array of objects
    # such as `[{"a": 1}]` is returned whole rather than as its first object.
    spans = []
    for opener, closer in (('{', '}'), ('[', ']')):
        start = response_text.find(opener)
        end = response_text.rfind(closer)
        if start != -1 and end > start:
            spans.append((start, end))

    for start, end in sorted(spans):
        candidate = response_text[start:end + 1]
        if _is_valid_json(candidate):
            return candidate

    # Nothing parsed; hand back the trimmed text so the caller can decide.
    return response_text.strip()
|
|
|
|
|
|
def smart_json_loads(response_text: str, default: Any = None) -> Any:
    """Parse JSON out of an LLM response, tolerating markdown wrapping.

    The candidate JSON text is obtained from ``extract_json_from_response`` and
    then decoded. When decoding fails, diagnostic lines are printed to standard
    output and ``default`` is returned.

    Args:
        response_text: Raw response text; a falsy value yields ``default``.
        default: Value returned when no JSON could be decoded.

    Returns:
        The decoded object, or ``default``.
    """
    if not response_text:
        return default

    extracted = extract_json_from_response(response_text)

    try:
        return json.loads(extracted)
    except json.JSONDecodeError as parse_error:
        # Print sizes and short previews of both texts to help debug the failure.
        diagnostics = (
            f"🔍 JSON parsing failed: {parse_error}",
            f"📝 Original response length: {len(response_text)} chars",
            f"📝 Extracted JSON length: {len(extracted)} chars",
            f"📝 First 200 chars of original: {response_text[:200]}...",
            f"📝 First 200 chars of extracted: {extracted[:200]}...",
        )
        for line in diagnostics:
            print(line)
        return default
    except Exception as unexpected:
        print(f"❌ Unexpected error in JSON parsing: {unexpected}")
        return default
|
|
|
|
|
|
def safe_json_dumps(obj: Any, default: Any = None) -> str:
    """Serialize ``obj`` to pretty-printed JSON, falling back to ``default``.

    Values that JSON cannot represent natively are converted with ``str``.

    Args:
        obj: Object to serialize.
        default: Object serialized instead when ``obj`` cannot be serialized
            (for example non-string-like dict keys or circular references).
            ``None`` means an empty object.

    Returns:
        JSON text for ``obj``, or for the fallback when serialization fails.
    """
    try:
        return json.dumps(obj, default=str, ensure_ascii=False, indent=2)
    except (TypeError, ValueError):
        # Test for None explicitly: `default or {}` would discard legitimate
        # falsy fallbacks such as [] or 0.
        fallback = {} if default is None else default
        # default=str keeps the fallback itself from raising on odd values.
        return json.dumps(fallback, default=str)
|
|
|
|
|
|
def merge_dicts(*dicts: Dict[str, Any]) -> Dict[str, Any]:
    """Combine dictionaries into a new one; later arguments win on key clashes.

    Arguments that are not dictionaries are skipped. The inputs are not modified.

    Returns:
        A new dictionary holding the merged key/value pairs.
    """
    return {
        key: value
        for mapping in dicts
        if isinstance(mapping, dict)
        for key, value in mapping.items()
    }
|
|
|
|
|
|
def flatten_list(nested_list: List[Any]) -> List[Any]:
    """Flatten arbitrarily nested lists into a single flat list.

    Only ``list`` instances are expanded; other values (tuples, strings, dicts)
    are kept as single items. Item order is preserved.

    Args:
        nested_list: Iterable whose items may themselves be lists.

    Returns:
        A new flat list.
    """
    result = []
    # An explicit stack of iterators replaces recursion, so very deep nesting
    # cannot hit the interpreter's recursion limit.
    pending = [iter(nested_list)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                # Descend into the child now; the parent iterator keeps its
                # position and resumes once the child is exhausted.
                pending.append(iter(item))
                break
            result.append(item)
        else:
            pending.pop()
    return result
|
|
|
|
|
|
def chunk_list(lst: List[Any], chunk_size: int) -> List[List[Any]]:
    """Split a list into consecutive chunks of at most ``chunk_size`` items.

    Args:
        lst: Sequence to split.
        chunk_size: Maximum number of items per chunk; must be positive.

    Returns:
        The chunks in order; the last one may be shorter.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        # A negative step would make range() empty and silently return [],
        # dropping every item; zero would raise an opaque range() error.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]
|
|
|
|
|
|
def deduplicate_list(lst: List[Any], key_func: Optional[Callable] = None) -> List[Any]:
    """Return the items of ``lst`` without duplicates, keeping first occurrences.

    Args:
        lst: Items to filter.
        key_func: Optional function computing the identity used for comparison.
            Without it the items themselves are compared. Identities are kept
            in a set, so they must be hashable.

    Returns:
        A new list in the original order.
    """
    seen_keys = set()
    unique_items = []
    for element in lst:
        marker = element if key_func is None else key_func(element)
        if marker in seen_keys:
            continue
        seen_keys.add(marker)
        unique_items.append(element)
    return unique_items
|
|
|
|
|
|
def retry_async(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0):
    """Decorator factory that retries an async function with exponential backoff.

    Args:
        max_attempts: Total number of calls made before giving up (at least 1).
        delay: Seconds to wait after the first failure.
        backoff: Factor applied to the wait after each further failure.

    Returns:
        A decorator for coroutine functions. The wrapped coroutine returns the
        first successful result, or re-raises the exception from the final
        attempt.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
    """
    # Function-scope import so this block does not depend on a file-level change.
    import functools

    if max_attempts < 1:
        # With no attempts there is no exception to re-raise; the old code ended
        # in `raise None`, which surfaces as a confusing TypeError at call time.
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        async def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        logging.error(f"All {max_attempts} attempts failed. Last error: {str(e)}")
                        raise
                    logging.warning(f"Attempt {attempt} failed: {str(e)}. Retrying in {current_delay}s...")
                    await asyncio.sleep(current_delay)
                    current_delay *= backoff

        return wrapper
    return decorator
|
|
|
|
|
|
def validate_email(email: str) -> bool:
    """Check whether ``email`` looks like an email address.

    This is a format check only: a local part, ``@``, a domain, and a
    top-level domain of at least two letters.

    Args:
        email: Candidate address; non-string or empty values are rejected.

    Returns:
        True when the whole string matches the expected format.
    """
    if not email or not isinstance(email, str):
        return False

    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch, not match with `$`: `$` also matches just before a trailing
    # newline, which would accept "user@example.com\n".
    return re.fullmatch(pattern, email) is not None
|
|
|
|
|
|
def sanitize_filename(filename: str, max_length: int = 100) -> str:
    """Sanitize a filename for cross-platform compatibility.

    Replaces the characters ``< > : " / \\ | ? *`` with underscores, strips
    leading and trailing dots and spaces, and shortens the result to
    ``max_length`` characters, keeping the extension when it fits.

    Args:
        filename: Proposed file name.
        max_length: Maximum length of the result.

    Returns:
        The sanitized name, or ``"untitled"`` when nothing usable remains.
    """
    if not filename:
        return "untitled"

    filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
    filename = filename.strip('. ')

    if len(filename) > max_length:
        name, dot, ext = filename.rpartition('.')
        if dot and len(ext) + 1 < max_length:
            # Shorten the stem and keep ".ext" intact.
            filename = name[:max_length - len(ext) - 1] + '.' + ext
        else:
            # No extension, or one too long to keep: a negative stem length
            # would slice from the wrong end and overshoot max_length, so
            # hard-cut the whole name instead.
            filename = filename[:max(max_length, 0)]

    if not filename:
        filename = "untitled"

    return filename
|
|
|
|
|
|
def calculate_similarity(text1: str, text2: str) -> float:
    """Compute the Jaccard similarity of the word sets of two texts.

    Words are whitespace-separated tokens compared case-insensitively.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        Shared words divided by total distinct words, or 0.0 when either text
        is empty or no words are found.
    """
    if not (text1 and text2):
        return 0.0

    tokens_a = set(text1.lower().split())
    tokens_b = set(text2.lower().split())

    combined = tokens_a | tokens_b
    if not combined:
        return 0.0

    shared = tokens_a & tokens_b
    return len(shared) / len(combined)
|
|
|
|
|
|
def format_file_size(size_bytes: int) -> str:
    """Format a byte count as a human-readable string using 1024-based units.

    Args:
        size_bytes: Size in bytes. Negative values (for example a size
            difference) are scaled by magnitude and keep their sign.

    Returns:
        ``"0 B"`` for zero, otherwise one decimal place plus a unit from B to
        TB, e.g. ``"1.5 KB"``. Values beyond the TB range stay in TB.
    """
    if size_bytes == 0:
        return "0 B"

    size_names = ["B", "KB", "MB", "GB", "TB"]

    # Scale the magnitude; comparing a negative number against 1024 would never
    # advance the unit and left values like -2048 displayed in bytes.
    magnitude = abs(float(size_bytes))
    unit_index = 0
    while magnitude >= 1024.0 and unit_index < len(size_names) - 1:
        magnitude /= 1024.0
        unit_index += 1

    sign = "-" if size_bytes < 0 else ""
    return f"{sign}{magnitude:.1f} {size_names[unit_index]}"
|
|
|
|
|
|
def create_progress_callback(total_steps: int, callback_func: Optional[Callable] = None):
    """Create a stateful progress-tracking function.

    Args:
        total_steps: Number of steps that make up the whole task.
        callback_func: Optional function called on every update as
            ``callback_func(progress, step_name, current_step, total_steps)``.

    Returns:
        A function ``update_progress(step_name="", increment=1)`` that advances
        the step counter and returns the completed fraction, capped at 1.0.
    """
    current_step = 0

    def update_progress(step_name: str = "", increment: int = 1):
        nonlocal current_step
        current_step += increment

        if total_steps > 0:
            progress = min(current_step / total_steps, 1.0)
        else:
            # A task with no steps is already complete; this also avoids
            # dividing by zero.
            progress = 1.0

        if callback_func:
            callback_func(progress, step_name, current_step, total_steps)

        return progress

    return update_progress
|
|
|
|
|
|
def debounce(wait_time: float):
    """Decorator factory that rate-limits calls to an async function.

    The first call runs immediately. Later calls run only when at least
    ``wait_time`` seconds (event-loop clock) have passed since the last call
    that ran; calls arriving sooner are skipped and return ``None``.

    Args:
        wait_time: Minimum number of seconds between executed calls.

    Returns:
        A decorator for coroutine functions.
    """
    # Function-scope import so this block does not depend on a file-level change.
    import functools

    def decorator(func: Callable) -> Callable:
        # None means "never called". A 0.0 sentinel would suppress the first
        # call whenever the loop clock reads less than wait_time.
        last_called = None

        @functools.wraps(func)  # keep the wrapped function's name and docstring
        async def wrapper(*args, **kwargs):
            nonlocal last_called
            # get_running_loop() is the supported way to reach the loop from
            # inside a coroutine.
            now = asyncio.get_running_loop().time()
            if last_called is None or now - last_called >= wait_time:
                last_called = now
                return await func(*args, **kwargs)
            return None

        return wrapper
    return decorator
|
|
|
|
|
|
class Timer:
    """Context manager that measures and prints how long a block took.

    On exit it prints ``"<name> completed in <seconds> seconds"`` and keeps the
    start and end timestamps on the instance. Exceptions raised inside the
    block are not suppressed.
    """

    def __init__(self, name: str = "Operation"):
        # Timestamps stay None until the context is entered / exited.
        self.start_time = None
        self.end_time = None
        self.name = name

    def __enter__(self):
        self.start_time = datetime.now()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = datetime.now()
        elapsed_seconds = (self.end_time - self.start_time).total_seconds()
        print(f"{self.name} completed in {elapsed_seconds:.2f} seconds")

    @property
    def duration(self) -> Optional[timedelta]:
        """Elapsed time between entry and exit, or None if not yet finished."""
        if not (self.start_time and self.end_time):
            return None
        return self.end_time - self.start_time
|
|
|
|
|
|
class RateLimiter:
    """Sliding-window rate limiter for async API calls.

    Allows at most ``max_calls`` acquisitions within any ``time_window``
    seconds, measured with ``datetime.now()``.
    """

    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        # Timestamps of the acquisitions still inside the current window.
        self.calls = []

    async def acquire(self):
        """Wait if necessary to respect the rate limit, then record the call."""
        now = datetime.now()

        # Forget calls that have aged out of the window.
        self.calls = [call_time for call_time in self.calls
                      if (now - call_time).total_seconds() < self.time_window]

        if len(self.calls) >= self.max_calls:
            oldest_call = min(self.calls)
            wait_time = self.time_window - (now - oldest_call).total_seconds()
            if wait_time > 0:
                await asyncio.sleep(wait_time)
                # Record when the call actually proceeds. Storing the pre-sleep
                # time makes this call expire too early and lets the next one
                # through before a full window has passed.
                now = datetime.now()

        self.calls.append(now)
|
|
|
|
|
|
def get_nested_value(data: Dict[str, Any], key_path: str, default: Any = None) -> Any:
    """Look up a value in nested dictionaries using a dot-separated path.

    Args:
        data: Outermost dictionary.
        key_path: Keys joined with ``.``, e.g. ``"a.b.c"``.
        default: Value returned when a key is missing or an intermediate value
            cannot be indexed with the key.

    Returns:
        The value found at the path, or ``default``.
    """
    node = data
    for part in key_path.split('.'):
        try:
            node = node[part]
        except (KeyError, TypeError):
            return default
    return node
|
|
|
|
|
|
def set_nested_value(data: Dict[str, Any], key_path: str, value: Any) -> None:
    """Assign a value inside nested dictionaries using a dot-separated path.

    Missing intermediate dictionaries are created. An intermediate value that
    is not a dictionary is replaced by a new empty dictionary. ``data`` is
    modified in place.

    Args:
        data: Outermost dictionary.
        key_path: Keys joined with ``.``, e.g. ``"a.b.c"``.
        value: Value stored under the final key.
    """
    *parent_keys, leaf_key = key_path.split('.')

    node = data
    for parent in parent_keys:
        if not (parent in node and isinstance(node[parent], dict)):
            node[parent] = {}
        node = node[parent]

    node[leaf_key] = value