Hoghoghi / app /services /database_service.py
Really-amin's picture
Upload 46 files
922c3ba verified
raw
history blame
14.3 kB
"""
Database Service for Legal Dashboard
==================================
SQLite database management for legal documents with AI scoring.
"""
import sqlite3
import json
import logging
from typing import List, Dict, Optional, Any
from datetime import datetime, timedelta
from pathlib import Path
import uuid
logger = logging.getLogger(__name__)


class DatabaseManager:
    """SQLite-backed store for legal documents, AI feedback and metrics.

    A single ``sqlite3`` connection is opened in the constructor and kept on
    ``self.connection`` until :meth:`close` is called. Rows are returned as
    plain dictionaries; the ``keywords`` and ``references`` columns are stored
    as JSON text and decoded back to Python lists on read.
    """

    # Columns whose Python value may be a list and is persisted as JSON text.
    _JSON_LIST_FIELDS = ("keywords", "references")

    def __init__(self, db_path: str = "legal_documents.db"):
        """Open (or create) the database at *db_path* and ensure the schema.

        Raises:
            Exception: whatever ``sqlite3`` raised while connecting or while
                creating the tables (logged, then re-raised).
        """
        self.db_path = db_path
        self.connection = None
        self._init_database()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Quoting is required because ``references`` is a reserved word in
        SQLite, and it also stops caller-supplied dictionary keys from being
        interpreted as SQL when they are placed in a column list.
        """
        if not isinstance(name, str):
            raise TypeError(
                f"column name must be str, got {type(name).__name__}")
        return '"' + name.replace('"', '""') + '"'

    @classmethod
    def _encode_json_fields(cls, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of *data* with list-valued JSON columns
        serialised to JSON strings. The caller's dictionary is not modified.
        """
        encoded = dict(data)
        for field in cls._JSON_LIST_FIELDS:
            if isinstance(encoded.get(field), list):
                encoded[field] = json.dumps(encoded[field])
        return encoded

    @classmethod
    def _row_to_document(cls, row) -> Dict[str, Any]:
        """Convert a ``sqlite3.Row`` into a dict, decoding the JSON columns.

        A non-empty value that cannot be decoded is replaced by an empty list
        (best effort); empty/NULL values are left as stored.
        """
        doc = dict(row)
        for field in cls._JSON_LIST_FIELDS:
            raw = doc.get(field)
            if raw:
                try:
                    doc[field] = json.loads(raw)
                except (TypeError, ValueError):
                    # json.JSONDecodeError is a ValueError subclass.
                    doc[field] = []
        return doc

    def _init_database(self):
        """Initialize database and create tables"""
        try:
            self.connection = sqlite3.connect(self.db_path)
            self.connection.row_factory = sqlite3.Row
            # The connection context manager commits on success and rolls
            # back if any statement raises.
            with self.connection:
                # Documents table. "references" must be quoted: REFERENCES is
                # a reserved SQLite keyword and is a syntax error unquoted.
                self.connection.execute("""
                    CREATE TABLE IF NOT EXISTS documents (
                        id TEXT PRIMARY KEY,
                        title TEXT NOT NULL,
                        document_number TEXT,
                        publication_date TEXT,
                        source TEXT,
                        full_text TEXT,
                        url TEXT,
                        extracted_at TEXT,
                        source_credibility REAL DEFAULT 0.0,
                        document_quality REAL DEFAULT 0.0,
                        final_score REAL DEFAULT 0.0,
                        category TEXT,
                        status TEXT DEFAULT 'pending',
                        ai_confidence REAL DEFAULT 0.0,
                        user_feedback TEXT,
                        keywords TEXT,
                        "references" TEXT,
                        recency_score REAL DEFAULT 0.0,
                        ocr_confidence REAL DEFAULT 0.0,
                        language TEXT DEFAULT 'fa',
                        file_path TEXT,
                        file_size INTEGER,
                        processing_time REAL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                # AI training data table
                self.connection.execute("""
                    CREATE TABLE IF NOT EXISTS ai_training_data (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        document_id TEXT,
                        feedback_type TEXT,
                        feedback_score REAL,
                        feedback_text TEXT,
                        expected_score REAL,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (document_id) REFERENCES documents (id)
                    )
                """)
                # System metrics table
                self.connection.execute("""
                    CREATE TABLE IF NOT EXISTS system_metrics (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        metric_name TEXT,
                        metric_value REAL,
                        metric_data TEXT,
                        recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
            logger.info("Database initialized successfully")
        except Exception as e:
            logger.error(f"Database initialization failed: {e}")
            # Do not leak a half-initialised connection.
            if self.connection is not None:
                self.connection.close()
                self.connection = None
            raise

    # ------------------------------------------------------------------
    # Connection state
    # ------------------------------------------------------------------
    def is_connected(self) -> bool:
        """Check if database is connected"""
        if self.connection is None:
            return False
        try:
            self.connection.execute("SELECT 1")
        except sqlite3.Error:
            # Includes ProgrammingError raised by a closed connection.
            return False
        return True

    # ------------------------------------------------------------------
    # Document CRUD
    # ------------------------------------------------------------------
    def insert_document(self, document_data: Dict[str, Any]) -> str:
        """Insert a new document (or replace the one with the same ``id``).

        Args:
            document_data: Mapping of column name to value. ``keywords`` and
                ``references`` may be lists; they are stored as JSON text.
                If ``id`` is absent a random UUID4 string is generated.
                The mapping itself is not modified.

        Returns:
            The id of the stored document.

        Raises:
            Exception: any error raised while building or executing the
                statement (logged, then re-raised), e.g.
                ``sqlite3.OperationalError`` for an unknown column.
        """
        try:
            record = self._encode_json_fields(document_data)
            # Generate ID if not provided
            if 'id' not in record:
                record['id'] = str(uuid.uuid4())

            columns = ', '.join(self._quote_identifier(c) for c in record)
            placeholders = ', '.join('?' for _ in record)
            sql = f"INSERT OR REPLACE INTO documents ({columns}) VALUES ({placeholders})"

            with self.connection:
                self.connection.execute(sql, list(record.values()))

            logger.info(f"Document inserted: {record['id']}")
            return record['id']
        except Exception as e:
            logger.error(f"Error inserting document: {e}")
            raise

    def get_documents(self, limit: int = 100, offset: int = 0,
                      category: Optional[str] = None, status: Optional[str] = None,
                      min_score: Optional[float] = None, max_score: Optional[float] = None,
                      source: Optional[str] = None) -> List[Dict]:
        """Get documents with filters, newest (``created_at``) first.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.
            category, status, source: Exact-match filters; ignored when falsy.
            min_score, max_score: Inclusive bounds on ``final_score``;
                ignored when ``None``.

        Returns:
            A list of document dicts, or an empty list if the query failed
            (the error is logged).
        """
        try:
            query = "SELECT * FROM documents WHERE 1=1"
            params: List[Any] = []

            if category:
                query += " AND category = ?"
                params.append(category)
            if status:
                query += " AND status = ?"
                params.append(status)
            if min_score is not None:
                query += " AND final_score >= ?"
                params.append(min_score)
            if max_score is not None:
                query += " AND final_score <= ?"
                params.append(max_score)
            if source:
                query += " AND source = ?"
                params.append(source)

            query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
            params.extend([limit, offset])

            rows = self.connection.execute(query, params).fetchall()
            return [self._row_to_document(row) for row in rows]
        except Exception as e:
            logger.error(f"Error getting documents: {e}")
            return []

    def get_document_by_id(self, document_id: str) -> Optional[Dict]:
        """Get a single document by ID.

        Returns:
            The document dict, or ``None`` when no row matches or the query
            failed (the error is logged).
        """
        try:
            row = self.connection.execute(
                "SELECT * FROM documents WHERE id = ?", (document_id,)
            ).fetchone()
            if row is None:
                return None
            return self._row_to_document(row)
        except Exception as e:
            logger.error(f"Error getting document {document_id}: {e}")
            return None

    def update_document(self, document_id: str, updates: Dict[str, Any]) -> bool:
        """Update columns of a document and refresh its ``updated_at``.

        Args:
            document_id: Id of the row to update.
            updates: Mapping of column name to new value. List values for
                ``keywords``/``references`` are stored as JSON text. The
                mapping itself is not modified.

        Returns:
            ``True`` if the statement executed (even when no row matched),
            ``False`` if it failed (the error is logged and rolled back).
        """
        try:
            changes = self._encode_json_fields(updates)
            # Add updated_at timestamp
            changes['updated_at'] = datetime.now().isoformat()

            set_clause = ', '.join(
                f"{self._quote_identifier(c)} = ?" for c in changes)
            sql = f"UPDATE documents SET {set_clause} WHERE id = ?"

            with self.connection:
                self.connection.execute(
                    sql, [*changes.values(), document_id])

            logger.info(f"Document updated: {document_id}")
            return True
        except Exception as e:
            logger.error(f"Error updating document {document_id}: {e}")
            return False

    def delete_document(self, document_id: str) -> bool:
        """Delete a document.

        Returns:
            ``True`` if the statement executed (even when no row matched),
            ``False`` if it failed (the error is logged and rolled back).
        """
        try:
            with self.connection:
                self.connection.execute(
                    "DELETE FROM documents WHERE id = ?", (document_id,))
            logger.info(f"Document deleted: {document_id}")
            return True
        except Exception as e:
            logger.error(f"Error deleting document {document_id}: {e}")
            return False

    # ------------------------------------------------------------------
    # Aggregates
    # ------------------------------------------------------------------
    def get_dashboard_summary(self) -> Dict:
        """Get dashboard summary statistics.

        Returns:
            A dict with ``total_documents``, ``processed_today``,
            ``average_score`` (rounded to 2 decimals, only scores > 0),
            ``top_categories`` (up to 5) and ``recent_activity`` (up to 10).
            On failure the same keys are returned with zero/empty values and
            the error is logged.
        """
        try:
            cursor = self.connection.cursor()

            # Total documents
            cursor.execute("SELECT COUNT(*) FROM documents")
            total_documents = cursor.fetchone()[0]

            # Documents processed today. created_at defaults to
            # CURRENT_TIMESTAMP, which SQLite records in UTC, so "today" is
            # evaluated by SQLite in UTC as well instead of binding a
            # local-time Python date.
            cursor.execute(
                "SELECT COUNT(*) FROM documents "
                "WHERE DATE(created_at) = DATE('now')")
            processed_today = cursor.fetchone()[0]

            # Average score
            cursor.execute(
                "SELECT AVG(final_score) FROM documents WHERE final_score > 0")
            avg_score = cursor.fetchone()[0] or 0.0

            # Top categories
            cursor.execute("""
                SELECT category, COUNT(*) as count
                FROM documents
                WHERE category IS NOT NULL
                GROUP BY category
                ORDER BY count DESC
                LIMIT 5
            """)
            top_categories = [dict(row) for row in cursor.fetchall()]

            # Recent activity
            cursor.execute("""
                SELECT id, title, status, created_at
                FROM documents
                ORDER BY created_at DESC
                LIMIT 10
            """)
            recent_activity = [dict(row) for row in cursor.fetchall()]

            return {
                "total_documents": total_documents,
                "processed_today": processed_today,
                "average_score": round(avg_score, 2),
                "top_categories": top_categories,
                "recent_activity": recent_activity
            }
        except Exception as e:
            logger.error(f"Error getting dashboard summary: {e}")
            return {
                "total_documents": 0,
                "processed_today": 0,
                "average_score": 0.0,
                "top_categories": [],
                "recent_activity": []
            }

    def add_ai_feedback(self, document_id: str, feedback_type: str,
                        feedback_score: float, feedback_text: str = "") -> bool:
        """Add AI training feedback for a document.

        Returns:
            ``True`` on success, ``False`` if the insert failed (the error is
            logged and rolled back).
        """
        try:
            with self.connection:
                self.connection.execute("""
                    INSERT INTO ai_training_data
                    (document_id, feedback_type, feedback_score, feedback_text)
                    VALUES (?, ?, ?, ?)
                """, (document_id, feedback_type, feedback_score, feedback_text))
            logger.info(f"AI feedback added for document {document_id}")
            return True
        except Exception as e:
            logger.error(f"Error adding AI feedback: {e}")
            return False

    def get_ai_training_stats(self) -> Dict:
        """Get AI training statistics.

        Returns:
            A dict with ``total_feedback``, ``average_feedback_score``
            (rounded to 2 decimals) and ``feedback_by_type`` (one entry per
            feedback type with its count and average score). On failure the
            same keys are returned with zero/empty values and the error is
            logged.
        """
        try:
            cursor = self.connection.cursor()

            # Total feedback entries
            cursor.execute("SELECT COUNT(*) FROM ai_training_data")
            total_feedback = cursor.fetchone()[0]

            # Average feedback score
            cursor.execute("SELECT AVG(feedback_score) FROM ai_training_data")
            avg_feedback = cursor.fetchone()[0] or 0.0

            # Feedback by type
            cursor.execute("""
                SELECT feedback_type, COUNT(*) as count, AVG(feedback_score) as avg_score
                FROM ai_training_data
                GROUP BY feedback_type
            """)
            feedback_by_type = [dict(row) for row in cursor.fetchall()]

            return {
                "total_feedback": total_feedback,
                "average_feedback_score": round(avg_feedback, 2),
                "feedback_by_type": feedback_by_type
            }
        except Exception as e:
            logger.error(f"Error getting AI training stats: {e}")
            return {
                "total_feedback": 0,
                "average_feedback_score": 0.0,
                "feedback_by_type": []
            }

    def close(self):
        """Close database connection"""
        if self.connection:
            self.connection.close()
            logger.info("Database connection closed")