| """ | |
| Database Service for Legal Dashboard | |
| ================================== | |
| SQLite database management for legal documents with AI scoring. | |
| """ | |
| import sqlite3 | |
| import json | |
| import logging | |
| from typing import List, Dict, Optional, Any | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| import uuid | |
| logger = logging.getLogger(__name__) | |
class DatabaseManager:
    """SQLite-backed store for legal documents, AI feedback and metrics.

    Owns a single ``sqlite3`` connection created in ``__init__``.
    NOTE(review): a plain sqlite3 connection is not thread-safe; callers are
    assumed to use one instance per thread — confirm against call sites.
    """

    def __init__(self, db_path: str = "legal_documents.db"):
        """Open (or create) the database at *db_path* and ensure tables exist.

        Raises:
            sqlite3.Error: if the database cannot be opened or initialized.
        """
        self.db_path = db_path
        self.connection: Optional[sqlite3.Connection] = None
        self._init_database()

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Validate *name* and return it double-quoted for use in SQL.

        Column names originate from caller-supplied dict keys and cannot be
        bound as parameters, so they are restricted to Python identifiers
        (blocking SQL injection) and quoted (handling reserved words such
        as "references").

        Raises:
            ValueError: if *name* is not a plain identifier.
        """
        if not name.isidentifier():
            raise ValueError(f"Invalid column name: {name!r}")
        return f'"{name}"'

    @staticmethod
    def _row_to_doc(row: sqlite3.Row) -> Dict:
        """Convert a documents row into a dict, decoding JSON list fields."""
        doc = dict(row)
        for field in ('keywords', 'references'):
            if doc.get(field):
                try:
                    doc[field] = json.loads(doc[field])
                except (json.JSONDecodeError, TypeError):
                    # Corrupt or legacy values degrade to an empty list.
                    doc[field] = []
        return doc

    def _init_database(self):
        """Create the connection and all tables if they do not exist."""
        try:
            self.connection = sqlite3.connect(self.db_path)
            # Row factory lets results be addressed by column name.
            self.connection.row_factory = sqlite3.Row
            cursor = self.connection.cursor()
            # Documents table.  "references" is a reserved word in SQLite;
            # unquoted it is a syntax error, so it must be quoted here and
            # anywhere it is interpolated into SQL.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    document_number TEXT,
                    publication_date TEXT,
                    source TEXT,
                    full_text TEXT,
                    url TEXT,
                    extracted_at TEXT,
                    source_credibility REAL DEFAULT 0.0,
                    document_quality REAL DEFAULT 0.0,
                    final_score REAL DEFAULT 0.0,
                    category TEXT,
                    status TEXT DEFAULT 'pending',
                    ai_confidence REAL DEFAULT 0.0,
                    user_feedback TEXT,
                    keywords TEXT,
                    "references" TEXT,
                    recency_score REAL DEFAULT 0.0,
                    ocr_confidence REAL DEFAULT 0.0,
                    language TEXT DEFAULT 'fa',
                    file_path TEXT,
                    file_size INTEGER,
                    processing_time REAL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # AI training feedback, linked to documents.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS ai_training_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    document_id TEXT,
                    feedback_type TEXT,
                    feedback_score REAL,
                    feedback_text TEXT,
                    expected_score REAL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (document_id) REFERENCES documents (id)
                )
            """)
            # Free-form system metrics.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS system_metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    metric_name TEXT,
                    metric_value REAL,
                    metric_data TEXT,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            self.connection.commit()
            logger.info("Database initialized successfully")
        except sqlite3.Error as e:
            logger.error(f"Database initialization failed: {e}")
            raise

    def is_connected(self) -> bool:
        """Return True if the connection is open and usable."""
        if self.connection is None:
            return False
        try:
            self.connection.execute("SELECT 1")
            return True
        except sqlite3.Error:
            # A closed connection raises ProgrammingError (a sqlite3.Error).
            return False

    def insert_document(self, document_data: Dict[str, Any]) -> str:
        """Insert (or replace) a document and return its id.

        A uuid4 id is generated when none is supplied.  List-valued
        ``keywords`` / ``references`` entries are JSON-encoded in place.

        Raises:
            sqlite3.Error: on database failure.
            ValueError: if a key is not a valid column identifier.
        """
        try:
            cursor = self.connection.cursor()
            if 'id' not in document_data:
                document_data['id'] = str(uuid.uuid4())
            for field in ('keywords', 'references'):
                if isinstance(document_data.get(field), list):
                    document_data[field] = json.dumps(document_data[field])
            # Identifiers cannot be bound as parameters: validate and quote.
            columns = ', '.join(self._quote_identifier(k)
                                for k in document_data)
            placeholders = ', '.join('?' for _ in document_data)
            sql = f"INSERT OR REPLACE INTO documents ({columns}) VALUES ({placeholders})"
            cursor.execute(sql, list(document_data.values()))
            self.connection.commit()
            logger.info(f"Document inserted: {document_data['id']}")
            return document_data['id']
        except (sqlite3.Error, ValueError) as e:
            logger.error(f"Error inserting document: {e}")
            raise

    def get_documents(self, limit: int = 100, offset: int = 0,
                      category: Optional[str] = None, status: Optional[str] = None,
                      min_score: Optional[float] = None, max_score: Optional[float] = None,
                      source: Optional[str] = None) -> List[Dict]:
        """Return documents matching the optional filters, newest first.

        Returns an empty list on database errors (best-effort contract).
        """
        try:
            cursor = self.connection.cursor()
            # Build the WHERE clause incrementally; every value is bound
            # as a parameter, never interpolated.
            query = "SELECT * FROM documents WHERE 1=1"
            params: List[Any] = []
            if category:
                query += " AND category = ?"
                params.append(category)
            if status:
                query += " AND status = ?"
                params.append(status)
            if min_score is not None:
                query += " AND final_score >= ?"
                params.append(min_score)
            if max_score is not None:
                query += " AND final_score <= ?"
                params.append(max_score)
            if source:
                query += " AND source = ?"
                params.append(source)
            query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
            params.extend([limit, offset])
            cursor.execute(query, params)
            return [self._row_to_doc(row) for row in cursor.fetchall()]
        except sqlite3.Error as e:
            logger.error(f"Error getting documents: {e}")
            return []

    def get_document_by_id(self, document_id: str) -> Optional[Dict]:
        """Return one document by id, or None if missing or on error."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(
                "SELECT * FROM documents WHERE id = ?", (document_id,))
            row = cursor.fetchone()
            return self._row_to_doc(row) if row else None
        except sqlite3.Error as e:
            logger.error(f"Error getting document {document_id}: {e}")
            return None

    def update_document(self, document_id: str, updates: Dict[str, Any]) -> bool:
        """Apply *updates* to a document; return True on success.

        ``updated_at`` is stamped automatically; list-valued ``keywords`` /
        ``references`` are JSON-encoded in place.  Note: returns True even
        when no row matched *document_id* (preserved original behavior).
        """
        try:
            cursor = self.connection.cursor()
            for field in ('keywords', 'references'):
                if isinstance(updates.get(field), list):
                    updates[field] = json.dumps(updates[field])
            updates['updated_at'] = datetime.now().isoformat()
            # Quote column names (see _quote_identifier) — "references"
            # is a reserved word and keys are caller-supplied.
            set_clause = ', '.join(
                f"{self._quote_identifier(k)} = ?" for k in updates)
            values = list(updates.values()) + [document_id]
            cursor.execute(
                f"UPDATE documents SET {set_clause} WHERE id = ?", values)
            self.connection.commit()
            logger.info(f"Document updated: {document_id}")
            return True
        except (sqlite3.Error, ValueError) as e:
            logger.error(f"Error updating document {document_id}: {e}")
            return False

    def delete_document(self, document_id: str) -> bool:
        """Delete a document by id; return True on success."""
        try:
            cursor = self.connection.cursor()
            cursor.execute(
                "DELETE FROM documents WHERE id = ?", (document_id,))
            self.connection.commit()
            logger.info(f"Document deleted: {document_id}")
            return True
        except sqlite3.Error as e:
            logger.error(f"Error deleting document {document_id}: {e}")
            return False

    def get_dashboard_summary(self) -> Dict:
        """Return aggregate dashboard statistics; zeroed values on error."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT COUNT(*) FROM documents")
            total_documents = cursor.fetchone()[0]
            # created_at defaults to CURRENT_TIMESTAMP (UTC), so compare
            # against SQLite's own UTC date.  The old code compared the
            # *local* date and relied on the deprecated date adapter.
            cursor.execute(
                "SELECT COUNT(*) FROM documents WHERE DATE(created_at) = DATE('now')")
            processed_today = cursor.fetchone()[0]
            # Average over scored documents only; AVG is NULL on no rows.
            cursor.execute(
                "SELECT AVG(final_score) FROM documents WHERE final_score > 0")
            avg_score = cursor.fetchone()[0] or 0.0
            cursor.execute("""
                SELECT category, COUNT(*) as count
                FROM documents
                WHERE category IS NOT NULL
                GROUP BY category
                ORDER BY count DESC
                LIMIT 5
            """)
            top_categories = [dict(row) for row in cursor.fetchall()]
            cursor.execute("""
                SELECT id, title, status, created_at
                FROM documents
                ORDER BY created_at DESC
                LIMIT 10
            """)
            recent_activity = [dict(row) for row in cursor.fetchall()]
            return {
                "total_documents": total_documents,
                "processed_today": processed_today,
                "average_score": round(avg_score, 2),
                "top_categories": top_categories,
                "recent_activity": recent_activity
            }
        except sqlite3.Error as e:
            logger.error(f"Error getting dashboard summary: {e}")
            return {
                "total_documents": 0,
                "processed_today": 0,
                "average_score": 0.0,
                "top_categories": [],
                "recent_activity": []
            }

    def add_ai_feedback(self, document_id: str, feedback_type: str,
                        feedback_score: float, feedback_text: str = "") -> bool:
        """Record one AI-training feedback row; return True on success."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("""
                INSERT INTO ai_training_data
                (document_id, feedback_type, feedback_score, feedback_text)
                VALUES (?, ?, ?, ?)
            """, (document_id, feedback_type, feedback_score, feedback_text))
            self.connection.commit()
            logger.info(f"AI feedback added for document {document_id}")
            return True
        except sqlite3.Error as e:
            logger.error(f"Error adding AI feedback: {e}")
            return False

    def get_ai_training_stats(self) -> Dict:
        """Return feedback counts and averages; zeroed values on error."""
        try:
            cursor = self.connection.cursor()
            cursor.execute("SELECT COUNT(*) FROM ai_training_data")
            total_feedback = cursor.fetchone()[0]
            cursor.execute("SELECT AVG(feedback_score) FROM ai_training_data")
            avg_feedback = cursor.fetchone()[0] or 0.0
            cursor.execute("""
                SELECT feedback_type, COUNT(*) as count, AVG(feedback_score) as avg_score
                FROM ai_training_data
                GROUP BY feedback_type
            """)
            feedback_by_type = [dict(row) for row in cursor.fetchall()]
            return {
                "total_feedback": total_feedback,
                "average_feedback_score": round(avg_feedback, 2),
                "feedback_by_type": feedback_by_type
            }
        except sqlite3.Error as e:
            logger.error(f"Error getting AI training stats: {e}")
            return {
                "total_feedback": 0,
                "average_feedback_score": 0.0,
                "feedback_by_type": []
            }

    def close(self):
        """Close the connection; safe to call more than once."""
        if self.connection:
            self.connection.close()
            # Drop the reference so is_connected() reports False cleanly.
            self.connection = None
            logger.info("Database connection closed")