| """
|
| Utility functions for TranscriptorAI
|
| """
|
|
|
| import os
|
| import json
|
| import hashlib
|
| import pickle
|
| from datetime import datetime
|
| from typing import Any, Dict, List, Optional
|
| from pathlib import Path
|
| import logging
|
|
|
|
|
|
|
|
|
|
|
def setup_logging(log_file: str = "transcript_analysis.log", level: str = "INFO"):
    """Configure root logging to write to a file and to the console.

    Args:
        log_file: Path handed to ``logging.FileHandler``.
        level: Name of a standard logging level, case-insensitive (for
            example ``"DEBUG"`` or ``"warning"``), or an integer level.
            An unrecognised name falls back to ``INFO`` instead of raising.

    Returns:
        The logger named after this module (``logging.getLogger(__name__)``).
    """
    if isinstance(level, int):
        resolved_level = level
    else:
        # A bare getattr() raises AttributeError on a typo and would also
        # return non-level attributes, so only integer results are accepted.
        resolved_level = getattr(logging, str(level).upper(), None)
        if not isinstance(resolved_level, int):
            resolved_level = logging.INFO

    logging.basicConfig(
        level=resolved_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)


# Module-wide logger used by the helpers in this file; configured on import.
logger = setup_logging()
|
|
|
|
|
|
|
|
|
|
|
def get_file_hash(file_path: str, algorithm: str = "md5", chunk_size: int = 65536) -> str:
    """Return the hex digest of a file's contents, for use as a cache key.

    The file is read in fixed-size chunks so it never has to fit in memory.

    Args:
        file_path: Path of the file to hash.
        algorithm: Any algorithm name accepted by ``hashlib.new``. The
            default ``"md5"`` is adequate for cache keys, not for security.
        chunk_size: Number of bytes read per iteration.

    Returns:
        The hexadecimal digest string.

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError: If ``algorithm`` is not supported or ``chunk_size`` is
            not positive.
    """
    if chunk_size <= 0:
        # read(0) returns b'' immediately and would hash nothing at all.
        raise ValueError("chunk_size must be a positive integer")

    hasher = hashlib.new(algorithm)
    with open(file_path, 'rb') as f:
        # iter() with a sentinel stops at the first empty read (end of file).
        for chunk in iter(lambda: f.read(chunk_size), b''):
            hasher.update(chunk)
    return hasher.hexdigest()
|
|
|
|
|
def cache_result(key: str, data: Any, cache_dir: str = "./.cache") -> bool:
    """Pickle ``data`` to ``<cache_dir>/<key>.pkl``.

    Args:
        key: Cache key; used verbatim as the file stem.
        data: Any picklable object.
        cache_dir: Directory holding cache files; created if missing.

    Returns:
        True when the entry was written, False on any failure (the error is
        logged and never raised).
    """
    try:
        os.makedirs(cache_dir, exist_ok=True)
        cache_file = os.path.join(cache_dir, f"{key}.pkl")

        # Serialise before touching the disk and publish with an atomic
        # rename, so a failure never leaves a truncated .pkl behind or
        # destroys a previously good entry.
        payload = pickle.dumps(data)
        temp_file = f"{cache_file}.{os.getpid()}.tmp"
        try:
            with open(temp_file, 'wb') as f:
                f.write(payload)
            os.replace(temp_file, cache_file)
        finally:
            # Only present if the write or the rename failed.
            if os.path.exists(temp_file):
                os.remove(temp_file)

        logger.debug(f"Cached result for key: {key}")
        return True
    except Exception as e:
        logger.error(f"Failed to cache result: {e}")
        return False
|
|
|
|
|
def load_cached_result(key: str, cache_dir: str = "./.cache",
                       max_age_seconds: Optional[float] = 7 * 24 * 3600) -> Optional[Any]:
    """Load the object cached under ``key``, if present and not expired.

    Args:
        key: Cache key; the entry is read from ``<cache_dir>/<key>.pkl``.
        cache_dir: Directory holding cache files.
        max_age_seconds: Maximum age, based on the file's modification time,
            for an entry to be considered valid. Defaults to seven days.
            Pass ``None`` to disable expiry.

    Returns:
        The unpickled object, or None when the entry is missing, expired, or
        cannot be read (read errors are logged, never raised).
    """
    try:
        cache_file = os.path.join(cache_dir, f"{key}.pkl")

        if not os.path.exists(cache_file):
            return None

        if max_age_seconds is not None:
            file_age = datetime.now().timestamp() - os.path.getmtime(cache_file)
            if file_age > max_age_seconds:
                logger.debug(f"Cache expired for key: {key}")
                return None

        # NOTE(security): unpickling can execute arbitrary code. This is only
        # safe while cache_dir is writable by trusted code alone.
        with open(cache_file, 'rb') as f:
            data = pickle.load(f)

        logger.debug(f"Loaded cached result for key: {key}")
        return data
    except Exception as e:
        logger.error(f"Failed to load cached result: {e}")
        return None
|
|
|
|
|
def clear_cache(cache_dir: str = "./.cache"):
    """Delete every file directly inside ``cache_dir``.

    Sub-directories are left in place. A missing ``cache_dir`` is a no-op.
    Failures are logged and never raised.

    Args:
        cache_dir: Directory holding cache files.
    """
    try:
        if os.path.exists(cache_dir):
            for name in os.listdir(cache_dir):
                file_path = os.path.join(cache_dir, name)
                # os.remove() cannot delete directories; hitting one used to
                # raise and abort the clear with files still left behind.
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.remove(file_path)
            logger.info(f"Cleared cache directory: {cache_dir}")
    except Exception as e:
        logger.error(f"Failed to clear cache: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
def ensure_directory(path: str) -> str:
    """Create ``path`` (and any missing parents) if needed and return it unchanged."""
    if not os.path.isdir(path):
        # exist_ok keeps this safe if another process creates it meanwhile.
        os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
def get_unique_filename(base_path: str, extension: str = "") -> str:
    """Build a timestamped file name that does not collide with an existing file.

    The result has the form ``<base>_<YYYYmmdd_HHMMSS><ext>``. If that path
    already exists (for example, two calls within the same second), a numeric
    suffix ``_1``, ``_2``, ... is added after the timestamp.

    Args:
        base_path: Path whose stem, and by default extension, is reused.
        extension: Extension to use instead of the one in ``base_path``; it
            is appended verbatim, so include the leading dot.

    Returns:
        A path that did not exist when this function checked.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    base, original_ext = os.path.splitext(base_path)
    ext = extension or original_ext

    candidate = f"{base}_{timestamp}{ext}"
    counter = 1
    # Second-resolution timestamps repeat quickly, so probe for collisions.
    while os.path.exists(candidate):
        candidate = f"{base}_{timestamp}_{counter}{ext}"
        counter += 1
    return candidate
|
|
|
|
|
def get_file_size_mb(file_path: str) -> float:
    """Return the size of ``file_path`` in mebibytes (bytes / 1024 / 1024)."""
    bytes_per_mb = 1024 * 1024
    size_in_bytes = os.path.getsize(file_path)
    return size_in_bytes / bytes_per_mb
|
|
|
|
|
def validate_file(file_path: str, max_size_mb: int = 50, allowed_extensions: Optional[List[str]] = None) -> tuple:
    """Check that a path is an existing regular file of an allowed size and type.

    Args:
        file_path: Path to validate.
        max_size_mb: Largest accepted size, as measured by ``get_file_size_mb``.
        allowed_extensions: Accepted extensions. Matching ignores case, and a
            missing leading dot is tolerated (``"PDF"`` matches ``x.pdf``).
            Defaults to ``['.docx', '.pdf']``.

    Returns:
        A ``(is_valid, message)`` tuple; ``message`` is ``"Valid"`` on success
        and a short reason otherwise.
    """
    if allowed_extensions is None:
        allowed_extensions = ['.docx', '.pdf']

    # The file's own extension is lower-cased below, so normalise the allow
    # list the same way, otherwise entries like 'PDF' or '.PDF' never match.
    normalized_extensions = {
        (entry if not entry or entry.startswith('.') else f'.{entry}').lower()
        for entry in allowed_extensions
    }

    if not os.path.exists(file_path):
        return False, "File does not exist"

    # A directory exists but is not something that can be processed as a file.
    if not os.path.isfile(file_path):
        return False, "Path is not a file"

    if get_file_size_mb(file_path) > max_size_mb:
        return False, f"File exceeds {max_size_mb}MB limit"

    ext = os.path.splitext(file_path)[1].lower()
    if ext not in normalized_extensions:
        return False, f"File type {ext} not supported"

    return True, "Valid"
|
|
|
|
|
|
|
|
|
|
|
|
|
def sanitize_text(text: str) -> str:
    """Strip NUL characters and collapse every whitespace run to a single space."""
    without_nulls = text.replace('\x00', '')
    # split() with no arguments drops leading/trailing whitespace and treats
    # any run of whitespace as one separator.
    tokens = without_nulls.split()
    return ' '.join(tokens).strip()
|
|
|
|
|
def truncate_text(text: str, max_length: int, suffix: str = "...") -> str:
    """Shorten ``text`` so the result never exceeds ``max_length`` characters.

    Args:
        text: Text to shorten.
        max_length: Maximum length of the returned string.
        suffix: Marker appended when text is cut; it counts towards
            ``max_length``.

    Returns:
        ``text`` unchanged when it already fits. Otherwise a prefix of
        ``text`` followed by ``suffix``. When ``max_length`` leaves no room
        for any text, the suffix itself is cut to ``max_length``, and the
        result is empty for ``max_length <= 0``.
    """
    if len(text) <= max_length:
        return text
    if max_length <= 0:
        return ""
    if max_length <= len(suffix):
        # A negative slice bound here would keep almost the whole text and
        # return something longer than max_length.
        return suffix[:max_length]
    return text[:max_length - len(suffix)] + suffix
|
|
|
|
|
def extract_keywords(text: str, top_n: int = 10) -> List[str]:
    """Return the ``top_n`` most frequent non-stop-words in ``text``.

    Words are lower-cased runs of at least three ASCII letters. A small
    built-in stop-word list is excluded. Ties keep first-occurrence order.
    """
    from collections import Counter
    import re

    stop_words = {
        'the', 'and', 'for', 'are', 'but', 'not', 'you', 'with',
        'this', 'that', 'from', 'they', 'have', 'has', 'was', 'were'
    }

    candidates = re.findall(r'\b[a-z]{3,}\b', text.lower())
    frequencies = Counter(token for token in candidates if token not in stop_words)
    ranked = frequencies.most_common(top_n)
    return [keyword for keyword, _ in ranked]
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_statistics(values: List[float]) -> Dict[str, float]:
    """Compute basic descriptive statistics for ``values``.

    Args:
        values: Numbers to summarise.

    Returns:
        An empty dict for empty input. Otherwise a dict with ``mean``,
        ``median``, ``std`` (population standard deviation, numpy's default),
        ``min`` and ``max`` as built-in floats, plus ``count`` as an int.
    """
    if not values:
        return {}

    import numpy as np

    data = np.asarray(values, dtype=float)

    # Convert numpy scalars to built-in floats: np.int64 results (min/max of
    # integer input) are not JSON serialisable and break save_json().
    return {
        "mean": float(np.mean(data)),
        "median": float(np.median(data)),
        "std": float(np.std(data)),
        "min": float(np.min(data)),
        "max": float(np.max(data)),
        "count": len(values)
    }
|
|
|
|
|
def calculate_percentile(values: List[float], percentile: int) -> float:
    """Return the given percentile of ``values`` as a built-in float.

    Args:
        values: Numbers to evaluate.
        percentile: Percentile to compute, passed to ``numpy.percentile``.

    Returns:
        The percentile value, or NaN when ``values`` is empty.
    """
    import numpy as np

    # Return NaN explicitly so empty input behaves the same on every numpy
    # version instead of depending on numpy's own empty-array handling.
    if len(values) == 0:
        return float("nan")
    return float(np.percentile(values, percentile))
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_json(data: Dict, filepath: str, pretty: bool = True) -> bool:
    """Write ``data`` to ``filepath`` as UTF-8 JSON.

    Args:
        data: JSON-serialisable object.
        filepath: Destination file; overwritten on success.
        pretty: Indent with two spaces when True, single-line output otherwise.

    Returns:
        True on success, False on any failure (the error is logged and never
        raised).
    """
    try:
        # Serialise before opening the file: opening with 'w' truncates it,
        # so a non-serialisable payload used to destroy the existing file.
        payload = json.dumps(data, indent=2 if pretty else None, ensure_ascii=False)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)
        logger.debug(f"Saved JSON to: {filepath}")
        return True
    except Exception as e:
        logger.error(f"Failed to save JSON: {e}")
        return False
|
|
|
|
|
def load_json(filepath: str) -> Optional[Dict]:
    """Read and parse a UTF-8 JSON file.

    Returns the parsed content, or None when the file cannot be read or
    parsed (the error is logged and never raised).
    """
    try:
        with open(filepath, mode='r', encoding='utf-8') as handle:
            raw = handle.read()
        parsed = json.loads(raw)
        logger.debug(f"Loaded JSON from: {filepath}")
        return parsed
    except Exception as e:
        logger.error(f"Failed to load JSON: {e}")
        return None
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProgressTracker:
    """Console progress bar for long-running operations.

    Each ``update`` redraws a single line (using a carriage return) showing
    the bar, the percentage, the counts and an estimated time remaining.
    """

    def __init__(self, total: int, description: str = "Processing"):
        """Start tracking.

        Args:
            total: Number of work units expected.
            description: Label printed in front of the bar.
        """
        self.total = total
        self.current = 0
        self.description = description
        self.start_time = datetime.now()

    def update(self, n: int = 1):
        """Advance progress by ``n`` units and redraw the bar.

        The counter is clamped to ``[0, total]`` so a negative or oversized
        step cannot produce a malformed bar.
        """
        self.current = max(0, min(self.current + n, self.total))
        self._print_progress()

    def _print_progress(self):
        """Render the current state on one console line."""
        percentage = (self.current / self.total) * 100 if self.total > 0 else 0
        bar_length = 40
        filled = int(bar_length * self.current / self.total) if self.total > 0 else 0
        bar = '█' * filled + '-' * (bar_length - filled)

        elapsed = (datetime.now() - self.start_time).total_seconds()
        # Linear extrapolation from the average time per completed unit.
        eta = (elapsed / self.current * (self.total - self.current)) if self.current > 0 else 0

        # The line has no trailing newline, so it must be flushed explicitly
        # or buffered stdout would hold the bar back until the final newline.
        print(
            f'\r{self.description}: |{bar}| {percentage:.1f}% ({self.current}/{self.total}) ETA: {eta:.0f}s',
            end='',
            flush=True,
        )

        if self.current >= self.total:
            # Finish the line once complete so later output starts cleanly.
            print()
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_execute(func, *args, default=None, error_msg="Operation failed", **kwargs):
    """Call ``func(*args, **kwargs)`` and return its result.

    If the call raises an ``Exception``, the error is logged with
    ``error_msg`` as prefix and ``default`` is returned instead.
    """
    try:
        outcome = func(*args, **kwargs)
    except Exception as e:
        logger.error(f"{error_msg}: {e}")
        outcome = default
    return outcome
|
|
|
|
|
|
|
|
|
|
|
|
|
def calculate_similarity(text1: str, text2: str) -> float:
    """Return the Jaccard similarity of the two texts' lower-cased word sets.

    Words are whitespace-separated tokens. The score is shared words divided
    by distinct words overall, and 0.0 when either text has no words.
    """
    tokens_a = set(text1.lower().split())
    tokens_b = set(text2.lower().split())

    if not (tokens_a and tokens_b):
        return 0.0

    shared = tokens_a & tokens_b
    combined = tokens_a | tokens_b
    # combined cannot be empty here because both token sets are non-empty.
    return len(shared) / len(combined)
|
|
|
|
|
|
|
|
|
|
|
|
|
def batch_items(items: List, batch_size: int) -> List[List]:
    """Split ``items`` into consecutive slices of at most ``batch_size`` elements.

    Args:
        items: Sequence to split.
        batch_size: Maximum size of each batch; must be at least 1.

    Returns:
        The batches in order; the last one may be shorter. Empty input
        yields an empty list.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    # A negative step makes range() empty, which used to drop every item
    # silently; zero already raised ValueError from range() itself.
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
|
|
|
|
|
def parallel_process(func, items: List, max_workers: int = 4):
    """Apply ``func`` to every item using a thread pool.

    Args:
        func: Callable taking a single item.
        items: Inputs to process.
        max_workers: Size of the thread pool.

    Returns:
        A list with one entry per input, in the same order as ``items``.
        An entry is None when ``func`` raised for that item (the error is
        logged).
    """
    from concurrent.futures import ThreadPoolExecutor

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func, item) for item in items]
        # Collect in submission order so results[i] always belongs to
        # items[i]; completion order made results impossible to match up.
        # All tasks are already running, so waiting in order costs nothing.
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                logger.error(f"Parallel processing error: {e}")
                results.append(None)

    return results
|
|
|
|
|
|
|
|
|
|
|
|
|
def export_to_excel(data: Dict[str, List[Dict]], filepath: str) -> bool:
    """Write each entry of ``data`` to its own sheet of an Excel workbook.

    Keys of ``data`` become sheet names, and each list of row dicts becomes
    that sheet's table (written without the index column). Uses pandas with
    the openpyxl engine.

    Returns True on success, False on any failure (the error is logged and
    never raised).
    """
    try:
        import pandas as pd

        with pd.ExcelWriter(filepath, engine='openpyxl') as writer:
            for sheet_name, rows in data.items():
                frame = pd.DataFrame(rows)
                frame.to_excel(writer, sheet_name=sheet_name, index=False)

        logger.info(f"Exported to Excel: {filepath}")
        return True
    except Exception as e:
        logger.error(f"Failed to export to Excel: {e}")
        return False
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_valid_email(email: str) -> bool:
    """Return True when ``email`` has the basic ``local@domain.tld`` shape.

    This is a syntactic sanity check only, not full RFC validation.
    """
    import re

    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    # fullmatch, not match + '$': '$' also matches just before a trailing
    # newline, so "user@example.com\n" used to be accepted.
    return re.fullmatch(pattern, email) is not None
|
|
|
|
|
def is_valid_url(url: str) -> bool:
    """Return True when ``url`` starts with http:// or https:// and has no
    whitespace, angle brackets or double quotes after the scheme.

    This is a syntactic sanity check only; the URL is not parsed or resolved.
    """
    import re

    pattern = r'https?://[^\s<>"]+'
    # fullmatch, not match + '$': '$' also matches just before a trailing
    # newline, so "http://example.com\n" used to be accepted.
    return re.fullmatch(pattern, url) is not None
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Smoke test for a few helpers; writes its artefacts under ./test_output.
    import time

    print("Testing utilities...")

    # Directory creation
    output_dir = ensure_directory("./test_output")
    print(f"Created test directory: {output_dir}")

    # JSON round trip: what is saved must load back unchanged
    json_path = "./test_output/test.json"
    payload = {"key": "value", "number": 42}
    save_json(payload, json_path)
    round_tripped = load_json(json_path)
    assert round_tripped == payload, "JSON save/load failed"
    print("✓ JSON operations work")

    # Descriptive statistics
    summary = calculate_statistics([1, 2, 3, 4, 5])
    print(f"✓ Statistics: {summary}")

    # Progress bar: ten steps with a short pause so the redraw is visible
    progress = ProgressTracker(10, "Test")
    for _ in range(10):
        time.sleep(0.1)
        progress.update()
    print("✓ Progress tracker works")

    print("\n✓ All utility tests passed!")