| | |
| | """ |
| | Data Persistence Verification Script |
| | |
| | This script verifies data persistence across the Knowledge Assistant RAG application, |
| | including database integrity, vector store consistency, and backup validation. |
| | """ |
| |
|
| | import os |
| | import sys |
| | import json |
| | import sqlite3 |
| | import asyncio |
| | import logging |
| | import argparse |
| | from datetime import datetime, timedelta |
| | from pathlib import Path |
| | from typing import Dict, List, Any, Optional |
| |
|
| | |
| | sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src')) |
| |
|
| | from core.database import User, DocumentMetadata |
| | from core.vector_store import get_qdrant_client |
| | from core.backup import backup_manager |
| |
|
| | |
| | logging.basicConfig( |
| | level=logging.INFO, |
| | format='%(asctime)s - %(levelname)s - %(message)s' |
| | ) |
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class DataPersistenceVerifier:
    """Comprehensive data persistence verification system.

    Runs read-only health checks against each persistence layer of the
    application — the SQLite database, the Qdrant vector store, the backup
    set, and the local file system — and reports each check as a uniform
    dict::

        {"test": str,
         "status": "passed" | "warning" | "failed" | "unknown",
         "details": dict,
         "errors": list[str]}
    """

    def __init__(self, database_path: str = "knowledge_assistant.db"):
        """
        Args:
            database_path: Path to the application's SQLite database file.
        """
        self.database_path = database_path
        # Placeholder for accumulating results; not populated by the
        # current checks (callers use the returned dicts instead).
        self.verification_results = {}

    def verify_database_integrity(self) -> Dict[str, Any]:
        """Verify SQLite database integrity.

        Checks, in order: PRAGMA integrity_check, presence of the expected
        tables, row counts, orphaned documents (rows whose user_id matches
        no user), and duplicate file hashes.

        Returns:
            Result dict. "failed" on hard errors (missing file, corrupt
            database, missing tables); "warning" when only soft issues
            (orphans / duplicate hashes) were found; "passed" otherwise.
        """
        logger.info("Verifying database integrity...")

        result = {
            "test": "database_integrity",
            "status": "unknown",
            "details": {},
            "errors": []
        }

        try:
            if not os.path.exists(self.database_path):
                result["status"] = "failed"
                result["errors"].append(f"Database file not found: {self.database_path}")
                return result

            conn = sqlite3.connect(self.database_path)
            # try/finally (instead of explicit close() calls on every exit
            # path) guarantees the connection is released even when a query
            # raises; the original leaked it on the exception path.
            try:
                cursor = conn.cursor()

                # Low-level corruption scan performed by SQLite itself;
                # returns the single row ("ok",) on a healthy database.
                cursor.execute("PRAGMA integrity_check")
                integrity_result = cursor.fetchone()[0]

                if integrity_result == "ok":
                    result["details"]["integrity_check"] = "passed"
                else:
                    result["status"] = "failed"
                    result["errors"].append(f"Database integrity check failed: {integrity_result}")
                    return result

                # Schema check: every expected table must exist.
                cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
                tables = [row[0] for row in cursor.fetchall()]

                expected_tables = ["users", "documents", "alembic_version"]
                missing_tables = [table for table in expected_tables if table not in tables]

                if missing_tables:
                    result["status"] = "failed"
                    result["errors"].append(f"Missing tables: {missing_tables}")
                    return result

                result["details"]["tables"] = tables

                # Row counts, reported for operator visibility.
                cursor.execute("SELECT COUNT(*) FROM users")
                user_count = cursor.fetchone()[0]

                cursor.execute("SELECT COUNT(*) FROM documents")
                doc_count = cursor.fetchone()[0]

                result["details"]["user_count"] = user_count
                result["details"]["document_count"] = doc_count

                # Referential integrity: documents whose owning user row
                # no longer exists.
                cursor.execute("""
                    SELECT COUNT(*) FROM documents d
                    LEFT JOIN users u ON d.user_id = u.id
                    WHERE u.id IS NULL
                """)
                orphaned_docs = cursor.fetchone()[0]

                if orphaned_docs > 0:
                    result["errors"].append(f"Found {orphaned_docs} orphaned documents")

                result["details"]["orphaned_documents"] = orphaned_docs

                # Duplicate uploads: the same file hash stored more than once.
                cursor.execute("""
                    SELECT file_hash, COUNT(*) as count
                    FROM documents
                    WHERE file_hash IS NOT NULL
                    GROUP BY file_hash
                    HAVING COUNT(*) > 1
                """)
                duplicate_hashes = cursor.fetchall()

                if duplicate_hashes:
                    result["details"]["duplicate_file_hashes"] = len(duplicate_hashes)
                    result["errors"].append(f"Found {len(duplicate_hashes)} duplicate file hashes")
                else:
                    result["details"]["duplicate_file_hashes"] = 0
            finally:
                conn.close()

            # Orphans / duplicates are soft issues -> warning, not failure.
            if not result["errors"]:
                result["status"] = "passed"
            else:
                result["status"] = "warning"

            logger.info(f"Database integrity check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Database verification failed: {str(e)}")
            logger.error(f"Database verification error: {str(e)}")

        return result

    async def verify_vector_store_consistency(self) -> Dict[str, Any]:
        """Verify Qdrant vector store consistency.

        Inspects every collection (point count plus a scrolled sample of up
        to 10 points whose payloads/vectors are sanity-checked), then
        cross-checks the number of "user_*" collections against the number
        of distinct users owning documents in the SQLite database.

        Returns:
            Result dict. "failed" only when the vector store itself is
            unreachable; per-collection problems and count mismatches
            produce "warning".
        """
        logger.info("Verifying vector store consistency...")

        result = {
            "test": "vector_store_consistency",
            "status": "unknown",
            "details": {},
            "errors": []
        }

        try:
            client = get_qdrant_client()

            collections_info = client.get_collections()
            collections = collections_info.collections

            result["details"]["total_collections"] = len(collections)
            result["details"]["collections"] = []

            total_points = 0

            for collection in collections:
                collection_name = collection.name

                try:
                    collection_info = client.get_collection(collection_name)
                    points_count = collection_info.points_count

                    # Scroll a small sample to validate stored points
                    # without reading the whole collection.
                    sample_points, _ = client.scroll(
                        collection_name=collection_name,
                        limit=10,
                        with_payload=True,
                        with_vectors=True
                    )

                    collection_details = {
                        "name": collection_name,
                        "points_count": points_count,
                        "sample_points": len(sample_points),
                        "status": "healthy"
                    }

                    # Only the first sampled point is inspected — a spot
                    # check, not an exhaustive validation.
                    if sample_points:
                        first_point = sample_points[0]

                        if not hasattr(first_point, 'payload') or not first_point.payload:
                            collection_details["status"] = "warning"
                            result["errors"].append(f"Collection {collection_name}: Points missing payload")

                        if not hasattr(first_point, 'vector') or not first_point.vector:
                            collection_details["status"] = "warning"
                            result["errors"].append(f"Collection {collection_name}: Points missing vectors")

                        # Per-user collections are expected to carry these
                        # payload fields.
                        if collection_name.startswith("user_") and first_point.payload:
                            required_fields = ["text", "source", "user_id"]
                            missing_fields = [field for field in required_fields if field not in first_point.payload]

                            if missing_fields:
                                collection_details["status"] = "warning"
                                result["errors"].append(f"Collection {collection_name}: Missing payload fields: {missing_fields}")

                    result["details"]["collections"].append(collection_details)
                    total_points += points_count

                except Exception as e:
                    # Record the per-collection failure and keep scanning
                    # the remaining collections.
                    collection_details = {
                        "name": collection_name,
                        "status": "error",
                        "error": str(e)
                    }
                    result["details"]["collections"].append(collection_details)
                    result["errors"].append(f"Collection {collection_name}: {str(e)}")

            result["details"]["total_points"] = total_points

            # Cross-check against SQLite: one "user_*" collection is
            # expected per user that owns documents.
            if os.path.exists(self.database_path):
                conn = sqlite3.connect(self.database_path)
                try:
                    cursor = conn.cursor()

                    cursor.execute("SELECT COUNT(DISTINCT user_id) FROM documents")
                    db_users_with_docs = cursor.fetchone()[0]
                finally:
                    # Release the connection even if the query raises;
                    # the original leaked it on that path.
                    conn.close()

                user_collections = [c for c in collections if c.name.startswith("user_")]
                qdrant_user_collections = len(user_collections)

                result["details"]["db_users_with_documents"] = db_users_with_docs
                result["details"]["qdrant_user_collections"] = qdrant_user_collections

                if db_users_with_docs != qdrant_user_collections:
                    result["errors"].append(
                        f"Mismatch between database users with documents ({db_users_with_docs}) "
                        f"and Qdrant user collections ({qdrant_user_collections})"
                    )

            if not result["errors"]:
                result["status"] = "passed"
            else:
                result["status"] = "warning"

            logger.info(f"Vector store consistency check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Vector store verification failed: {str(e)}")
            logger.error(f"Vector store verification error: {str(e)}")

        return result

    async def verify_backup_integrity(self) -> Dict[str, Any]:
        """Verify backup integrity and completeness.

        Runs the backup manager's integrity check on every known backup and
        flags a stale backup set (latest backup older than 7 days).

        Returns:
            Result dict. "warning" when no backups exist, some (but not
            all) backups fail, or the latest backup is stale; "failed" when
            every backup fails or the backup manager is unreachable.
        """
        logger.info("Verifying backup integrity...")

        result = {
            "test": "backup_integrity",
            "status": "unknown",
            "details": {},
            "errors": []
        }

        try:
            backups = await backup_manager.list_backups()

            result["details"]["total_backups"] = len(backups)
            result["details"]["backups"] = []

            if not backups:
                result["status"] = "warning"
                result["errors"].append("No backups found")
                return result

            verified_count = 0
            failed_count = 0

            for backup in backups:
                backup_details = {
                    "backup_id": backup.backup_id,
                    "timestamp": backup.timestamp.isoformat(),
                    "file_size_bytes": backup.file_size_bytes,
                    "status": backup.status
                }

                try:
                    is_valid = await backup_manager.verify_backup_integrity(backup.backup_id)

                    if is_valid:
                        backup_details["integrity"] = "valid"
                        verified_count += 1
                    else:
                        backup_details["integrity"] = "invalid"
                        failed_count += 1
                        result["errors"].append(f"Backup {backup.backup_id} failed integrity check")

                except Exception as e:
                    # An error during verification counts as a failure but
                    # does not stop the loop.
                    backup_details["integrity"] = "error"
                    backup_details["error"] = str(e)
                    failed_count += 1
                    result["errors"].append(f"Backup {backup.backup_id} verification error: {str(e)}")

                result["details"]["backups"].append(backup_details)

            result["details"]["verified_backups"] = verified_count
            result["details"]["failed_backups"] = failed_count

            # Staleness check. NOTE(review): utcnow() is naive — this
            # assumes backup.timestamp is also naive UTC; confirm against
            # the backup manager before changing either side.
            latest_backup = max(backups, key=lambda b: b.timestamp)
            days_since_backup = (datetime.utcnow() - latest_backup.timestamp).days

            result["details"]["days_since_latest_backup"] = days_since_backup

            if days_since_backup > 7:
                result["errors"].append(f"Latest backup is {days_since_backup} days old")

            if failed_count == 0 and not result["errors"]:
                result["status"] = "passed"
            elif failed_count < len(backups):
                result["status"] = "warning"
            else:
                result["status"] = "failed"

            logger.info(f"Backup integrity check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"Backup verification failed: {str(e)}")
            logger.error(f"Backup verification error: {str(e)}")

        return result

    def verify_file_system_integrity(self) -> Dict[str, Any]:
        """Verify file system integrity and permissions.

        Checks existence/readability/writability of the critical paths
        (application root, database file, uploads, backups, source tree)
        and reports disk usage, flagging low free space (<5 GB warning,
        <1 GB critical).

        Returns:
            Result dict; missing paths and low disk space are reported as
            "warning" rather than hard failures.
        """
        logger.info("Verifying file system integrity...")

        result = {
            "test": "file_system_integrity",
            "status": "unknown",
            "details": {},
            "errors": []
        }

        try:
            # Paths are relative to the process working directory, which is
            # assumed to be the application root.
            critical_paths = [
                {"path": ".", "type": "directory", "name": "application_root"},
                {"path": self.database_path, "type": "file", "name": "database"},
                {"path": "uploads", "type": "directory", "name": "uploads_directory"},
                {"path": "backups", "type": "directory", "name": "backups_directory"},
                {"path": "src", "type": "directory", "name": "source_code"},
            ]

            result["details"]["path_checks"] = []

            for path_info in critical_paths:
                path = path_info["path"]
                path_type = path_info["type"]
                name = path_info["name"]

                check_result = {
                    "name": name,
                    "path": path,
                    "type": path_type,
                    "exists": False,
                    "readable": False,
                    "writable": False
                }

                if os.path.exists(path):
                    check_result["exists"] = True

                    check_result["readable"] = os.access(path, os.R_OK)
                    check_result["writable"] = os.access(path, os.W_OK)

                    if path_type == "directory":
                        # For directories, X bit means "traversable".
                        check_result["executable"] = os.access(path, os.X_OK)

                    if path_type == "file":
                        check_result["size_bytes"] = os.path.getsize(path)
                    elif path_type == "directory":
                        try:
                            # Count regular files directly in the directory
                            # (non-recursive).
                            file_count = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
                            check_result["file_count"] = file_count
                        except PermissionError:
                            check_result["file_count"] = "permission_denied"
                else:
                    result["errors"].append(f"Critical path missing: {path} ({name})")

                result["details"]["path_checks"].append(check_result)

            # Disk usage for the volume holding the working directory.
            try:
                import shutil
                total, used, free = shutil.disk_usage(".")

                result["details"]["disk_usage"] = {
                    "total_bytes": total,
                    "used_bytes": used,
                    "free_bytes": free,
                    "free_gb": free / (1024**3),
                    "usage_percent": (used / total) * 100
                }

                free_gb = free / (1024**3)
                if free_gb < 1.0:
                    result["errors"].append(f"Critical: Only {free_gb:.2f} GB free disk space")
                elif free_gb < 5.0:
                    result["errors"].append(f"Warning: Only {free_gb:.2f} GB free disk space")

            except Exception as e:
                result["errors"].append(f"Could not check disk usage: {str(e)}")

            if not result["errors"]:
                result["status"] = "passed"
            else:
                result["status"] = "warning"

            logger.info(f"File system integrity check: {result['status']}")

        except Exception as e:
            result["status"] = "failed"
            result["errors"].append(f"File system verification failed: {str(e)}")
            logger.error(f"File system verification error: {str(e)}")

        return result

    async def run_comprehensive_verification(self) -> Dict[str, Any]:
        """Run all verification tests and aggregate their results.

        Returns:
            Summary dict with timestamp, duration, per-test results, pass/
            warning/fail counts, and an overall_status that is the worst
            individual status ("failed" > "warning" > "passed").
        """
        logger.info("Starting comprehensive data persistence verification...")

        start_time = datetime.utcnow()

        # NOTE: the checks execute while this list is being built (each
        # element is a call / await), so duration below is measured
        # correctly even though it is computed in the next statement.
        tests = [
            self.verify_database_integrity(),
            await self.verify_vector_store_consistency(),
            await self.verify_backup_integrity(),
            self.verify_file_system_integrity()
        ]

        verification_results = {
            "timestamp": start_time.isoformat(),
            "duration_seconds": (datetime.utcnow() - start_time).total_seconds(),
            "overall_status": "unknown",
            "tests": tests,
            "summary": {
                "total_tests": len(tests),
                "passed": 0,
                "warnings": 0,
                "failed": 0
            }
        }

        # Tally statuses ("unknown" is intentionally not counted).
        for test in tests:
            if test["status"] == "passed":
                verification_results["summary"]["passed"] += 1
            elif test["status"] == "warning":
                verification_results["summary"]["warnings"] += 1
            elif test["status"] == "failed":
                verification_results["summary"]["failed"] += 1

        # Overall status is the worst individual outcome.
        if verification_results["summary"]["failed"] > 0:
            verification_results["overall_status"] = "failed"
        elif verification_results["summary"]["warnings"] > 0:
            verification_results["overall_status"] = "warning"
        else:
            verification_results["overall_status"] = "passed"

        logger.info(f"Verification completed: {verification_results['overall_status']}")

        return verification_results
| |
|
| |
|
def main():
    """Command-line entry point.

    Parses arguments, runs the full verification suite, emits the results
    as JSON (to --output or stdout), and exits with 0 on pass, 2 on
    warnings, 1 on failure or unexpected error.
    """
    parser = argparse.ArgumentParser(description="Data Persistence Verification Tool")
    parser.add_argument("--database", default="knowledge_assistant.db", help="Database file path")
    parser.add_argument("--output", help="Output file for results (JSON)")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")

    options = parser.parse_args()

    if options.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    verifier = DataPersistenceVerifier(options.database)

    try:
        results = asyncio.run(verifier.run_comprehensive_verification())

        rendered = json.dumps(results, indent=2)
        if options.output:
            with open(options.output, 'w') as fh:
                fh.write(rendered)
            print(f"Results saved to: {options.output}")
        else:
            print(rendered)

        # Map overall status to the process exit code (0 = passed).
        # sys.exit raises SystemExit, which the except below does not catch.
        sys.exit({"failed": 1, "warning": 2}.get(results["overall_status"], 0))

    except Exception as e:
        logger.error(f"Verification failed: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()