| """ | |
| AI Service for Legal Dashboard | |
| ============================= | |
| Advanced AI-powered features for legal document analysis including: | |
| - Intelligent document scoring and classification | |
| - Legal entity extraction and recognition | |
| - Sentiment analysis for legal documents | |
| - Smart search and recommendation engine | |
| - Document similarity analysis | |
| """ | |

import re
import logging
from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

logger = logging.getLogger(__name__)


class AIScoringEngine:
    """
    Advanced AI scoring engine for legal documents.

    Provides intelligent analysis, classification, and recommendations.
    """

    def __init__(self):
        """Initialize the AI scoring engine."""
        # Shared vectorizer for document-similarity analysis; it is fitted
        # lazily on first use in update_document_vector().
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words=None,  # Keep Persian stop words for legal context
            ngram_range=(1, 3)
        )
        self.document_vectors = {}
        self.legal_keywords = self._load_legal_keywords()
        self.entity_patterns = self._load_entity_patterns()
        self.sentiment_indicators = self._load_sentiment_indicators()
        self.classification_categories = self._load_classification_categories()

    def _load_legal_keywords(self) -> Dict[str, List[str]]:
        """Load Persian legal keywords for different categories."""
        return {
            "قانون": [  # laws / legislation
                "قانون", "ماده", "تبصره", "بند", "فصل", "باب", "مصوبه", "تصویب",
                "مجلس", "شورای", "ملی", "اساسی", "مدنی", "جزایی", "تجاری"
            ],
            "قرارداد": [  # contracts
                "قرارداد", "عقد", "مفاد", "طرفین", "متعاهدین", "شرایط", "ماده",
                "بند", "مبلغ", "پرداخت", "تعهد", "مسئولیت", "ضمانت"
            ],
            "احکام": [  # judicial rulings
                "حکم", "رای", "دادگاه", "قاضی", "شعبه", "دعوی", "خواهان",
                "خوانده", "شهادت", "دلیل", "اثبات", "قانونی", "محکوم"
            ],
            "مالی": [  # financial
                "مالیات", "درآمد", "سود", "زیان", "دارایی", "بدهی", "حساب",
                "ترازنامه", "صورت", "مالی", "دریافتی", "پرداختی"
            ],
            "اداری": [  # administrative
                "اداره", "سازمان", "وزارت", "دولت", "مقام", "مسئول", "کارمند",
                "مقررات", "دستورالعمل", "بخشنامه", "آییننامه"
            ]
        }

    def _load_entity_patterns(self) -> Dict[str, str]:
        """Load regex patterns for legal entity extraction."""
        return {
            # Person name: two or more consecutive Persian words. The inner
            # group is non-capturing so re.findall returns the full match
            # rather than only the last repetition.
            "نام_شخص": r"(?:[آ-ی]{2,}\s+){2,}",
            "نام_شرکت": r"(شرکت|موسسه|سازمان|بنیاد)\s+([آ-ی\s]+)",  # company / organization name
            "شماره_قرارداد": r"شماره\s*:?\s*(\d+/\d+/\d+)",  # contract number
            "تاریخ": r"(\d{1,2}/\d{1,2}/\d{2,4})",  # date
            "مبلغ": r"(\d{1,3}(?:,\d{3})*)\s*(ریال|تومان|دلار|یورو)",  # amount + currency
            # National ID and postal code are both plain 10-digit numbers,
            # so these two patterns can match the same spans.
            "شماره_ملی": r"(\d{10})",
            "کد_پستی": r"(\d{10})",
            "شماره_تلفن": r"(\d{2,4}-\d{3,4}-\d{4})"  # phone number
        }

    def _load_sentiment_indicators(self) -> Dict[str, List[str]]:
        """Load Persian sentiment indicators for legal documents."""
        return {
            "positive": [
                "موافق", "تایید", "قبول", "اجازه", "مجوز", "تصویب", "قانونی",
                "مشروع", "صحیح", "درست", "مناسب", "مطلوب", "سودمند"
            ],
            "negative": [
                "مخالف", "رد", "عدم", "ممنوع", "غیرقانونی", "نامشروع",
                "نادرست", "نامناسب", "مضر", "خطرناک"
            ],
            "neutral": [
                "ماده", "بند", "تبصره", "قانون", "مقررات", "شرایط",
                "مفاد", "طرفین", "تاریخ", "مبلغ", "شماره"
            ]
        }

    def _load_classification_categories(self) -> Dict[str, Dict]:
        """Load document classification categories with weights."""
        return {
            "قرارداد": {  # contracts
                "keywords": ["قرارداد", "عقد", "طرفین", "مفاد"],
                "weight": 0.4,
                "patterns": ["طرفین", "متعاهدین", "شرایط"]
            },
            "احکام_قضایی": {  # judicial rulings
                "keywords": ["حکم", "رای", "دادگاه", "قاضی"],
                "weight": 0.35,
                "patterns": ["شعبه", "خواهان", "خوانده"]
            },
            "قوانین": {  # laws
                "keywords": ["قانون", "ماده", "تبصره", "مجلس"],
                "weight": 0.3,
                "patterns": ["مصوبه", "تصویب", "اساسی"]
            },
            "مقررات_اداری": {  # administrative regulations
                "keywords": ["مقررات", "دستورالعمل", "آییننامه"],
                "weight": 0.25,
                "patterns": ["اداره", "سازمان", "وزارت"]
            },
            "اسناد_مالی": {  # financial documents
                "keywords": ["مالی", "حساب", "ترازنامه", "صورت"],
                "weight": 0.2,
                "patterns": ["درآمد", "سود", "زیان"]
            }
        }

    def analyze_document(self, text: str, metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Comprehensive document analysis including scoring, classification, and insights.

        Args:
            text: Document text content
            metadata: Additional document metadata

        Returns:
            Dictionary containing analysis results
        """
        try:
            # Basic text preprocessing
            cleaned_text = self._preprocess_text(text)

            # Perform the individual analyses
            analysis = {
                "basic_metrics": self._calculate_basic_metrics(cleaned_text),
                "classification": self._classify_document(cleaned_text),
                "entities": self._extract_entities(cleaned_text),
                "sentiment": self._analyze_sentiment(cleaned_text),
                "keywords": self._extract_keywords(cleaned_text),
                "quality_score": self._calculate_quality_score(cleaned_text, metadata),
                "recommendations": self._generate_recommendations(cleaned_text, metadata),
                "timestamp": datetime.now().isoformat()
            }

            # Add similarity analysis if we already have indexed documents
            if self.document_vectors:
                analysis["similarity"] = self._find_similar_documents(cleaned_text)

            return analysis
        except Exception as e:
            logger.error(f"Error in document analysis: {e}")
            return {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def _preprocess_text(self, text: str) -> str:
        """Clean and normalize Persian text."""
        # Collapse whitespace
        text = re.sub(r'\s+', ' ', text.strip())
        # Normalize Arabic variants to Persian characters
        text = text.replace('ي', 'ی').replace('ك', 'ک')
        # Drop characters outside the Arabic-script Unicode blocks, digits,
        # whitespace, and a few structural symbols (this also removes Latin
        # letters and the zero-width non-joiner)
        text = re.sub(
            r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s\d\-\.\/]',
            '', text)
        return text

    def _calculate_basic_metrics(self, text: str) -> Dict[str, Any]:
        """Calculate basic document metrics."""
        words = text.split()
        sentences = re.split(r'[.!?؟]', text)
        sentences = [s.strip() for s in sentences if s.strip()]

        return {
            "word_count": len(words),
            "sentence_count": len(sentences),
            "avg_sentence_length": len(words) / len(sentences) if sentences else 0,
            "unique_words": len(set(words)),
            "vocabulary_diversity": len(set(words)) / len(words) if words else 0,
            "legal_terms_count": self._count_legal_terms(text)
        }

    def _count_legal_terms(self, text: str) -> int:
        """Count legal-term occurrences in the document.

        Note: substring counting is used, and terms that appear in several
        keyword categories (e.g. "ماده") are counted once per category.
        """
        count = 0
        for category_terms in self.legal_keywords.values():
            for term in category_terms:
                count += text.count(term)
        return count

    def _classify_document(self, text: str) -> Dict[str, float]:
        """Classify document into legal categories."""
        scores = {}

        for category, config in self.classification_categories.items():
            score = 0
            weight = config["weight"]

            # Each keyword hit contributes the full category weight,
            # each pattern hit half of it; raw scores are capped at 1.0
            for keyword in config["keywords"]:
                if keyword in text:
                    score += weight
            for pattern in config["patterns"]:
                if pattern in text:
                    score += weight * 0.5

            scores[category] = min(score, 1.0)

        # Normalize so the scores sum to 1 across categories
        total_score = sum(scores.values())
        if total_score > 0:
            scores = {k: v / total_score for k, v in scores.items()}

        return scores
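
    # Worked example (illustrative): for a text containing only "قرارداد" and
    # "طرفین", the "قرارداد" category scores 0.4 + 0.4 (keyword hits) + 0.2
    # (pattern hit on "طرفین") = 1.0 before normalization, every other
    # category stays at 0, so the normalized result assigns 1.0 to "قرارداد".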

    def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract legal entities from text."""
        entities = {}
        for entity_type, pattern in self.entity_patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                # Patterns with several capture groups (e.g. "مبلغ") make
                # re.findall return tuples; join them so every value is a string
                entities[entity_type] = list({
                    " ".join(m).strip() if isinstance(m, tuple) else m.strip()
                    for m in matches
                })
        return entities

    def _analyze_sentiment(self, text: str) -> Dict[str, float]:
        """Analyze sentiment of a legal document."""
        sentiment_scores = {"positive": 0, "negative": 0, "neutral": 0}
        total_words = len(text.split())
        if total_words == 0:
            return sentiment_scores

        for sentiment, indicators in self.sentiment_indicators.items():
            count = 0
            for indicator in indicators:
                count += text.count(indicator)
            sentiment_scores[sentiment] = count / total_words

        # Normalize scores so they sum to 1
        total = sum(sentiment_scores.values())
        if total > 0:
            sentiment_scores = {k: v / total for k, v in sentiment_scores.items()}

        return sentiment_scores

    def _extract_keywords(self, text: str) -> List[Tuple[str, float]]:
        """Extract important keywords with TF-IDF scores.

        A throwaway vectorizer is fitted on the single document so that the
        shared similarity vectorizer's vocabulary is not overwritten; with a
        one-document corpus the scores reduce to normalized term frequencies.
        """
        try:
            keyword_vectorizer = TfidfVectorizer(
                max_features=1000,
                stop_words=None,
                ngram_range=(1, 3)
            )
            tfidf_matrix = keyword_vectorizer.fit_transform([text])
            feature_names = keyword_vectorizer.get_feature_names_out()
            scores = tfidf_matrix.toarray()[0]

            # Pair terms with scores, highest first
            keywords = [(feature_names[i], float(scores[i]))
                        for i in range(len(feature_names))]
            keywords.sort(key=lambda x: x[1], reverse=True)
            return keywords[:20]  # Top 20 keywords
        except Exception as e:
            logger.error(f"Error extracting keywords: {e}")
            return []

    def _calculate_quality_score(self, text: str, metadata: Optional[Dict] = None) -> float:
        """Calculate an overall document quality score in [0, 1]."""
        score = 0.0

        # Length factor: 100-2000 words is treated as optimal for legal documents
        word_count = len(text.split())
        if 100 <= word_count <= 2000:
            score += 0.3
        elif word_count > 2000:
            score += 0.2
        else:
            score += 0.1

        # Legal-term density factor
        legal_terms = self._count_legal_terms(text)
        if legal_terms > 0:
            density = legal_terms / word_count
            if 0.01 <= density <= 0.1:
                score += 0.3
            elif density > 0.1:
                score += 0.2
            else:
                score += 0.1

        # Structure factor: presence of article/clause/chapter markers
        structure_indicators = ["ماده", "بند", "تبصره", "فصل", "باب"]
        structure_count = sum(text.count(indicator)
                              for indicator in structure_indicators)
        if structure_count > 0:
            score += 0.2

        # Completeness factor: date, number, signature, seal
        completeness_indicators = ["تاریخ", "شماره", "امضا", "مهر"]
        completeness_count = sum(text.count(indicator)
                                 for indicator in completeness_indicators)
        if completeness_count >= 2:
            score += 0.2

        return min(score, 1.0)
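
    # Worked example (illustrative): a 500-word contract with 20 legal terms
    # (density 0.04), containing "ماده" plus both "تاریخ" and "امضا", scores
    # 0.3 (length) + 0.3 (density) + 0.2 (structure) + 0.2 (completeness) = 1.0.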

    def _generate_recommendations(self, text: str, metadata: Optional[Dict] = None) -> List[str]:
        """Generate intelligent recommendations for the document."""
        recommendations = []

        # Completeness: very short documents likely lack detail
        if len(text.split()) < 100:
            recommendations.append(
                "مستندات کافی نیست. پیشنهاد میشود جزئیات بیشتری اضافه شود.")  # "Insufficient documentation; add more detail."

        # Legal structure (article/clause markers)
        if "ماده" not in text and "بند" not in text:
            recommendations.append(
                "ساختار حقوقی مشخص نیست. پیشنهاد میشود از ساختار ماده و بند استفاده شود.")  # "Legal structure unclear; use article/clause structure."

        # Dates
        if not re.search(r'\d{1,2}/\d{1,2}/\d{2,4}', text):
            recommendations.append(
                "تاریخ مشخص نشده است. پیشنهاد میشود تاریخ مستندات اضافه شود.")  # "No date specified; add document dates."

        # Signature or seal
        if "امضا" not in text and "مهر" not in text:
            recommendations.append(
                "امضا یا مهر مشخص نشده است. پیشنهاد میشود امضا اضافه شود.")  # "No signature or seal; add a signature."

        # Amounts
        if not re.search(r'\d{1,3}(?:,\d{3})*', text):
            recommendations.append(
                "مبالغ مشخص نشده است. پیشنهاد میشود مبالغ دقیق ذکر شود.")  # "No amounts specified; state exact amounts."

        return recommendations

    def _find_similar_documents(self, text: str) -> List[Dict[str, Any]]:
        """Find similar documents using TF-IDF and cosine similarity."""
        try:
            # Project the current document into the shared vector space; this
            # requires that update_document_vector() has fitted the vectorizer
            current_vector = self.vectorizer.transform([text])

            similarities = []
            for doc_id, doc_vector in self.document_vectors.items():
                similarity = cosine_similarity(current_vector, doc_vector)[0][0]
                similarities.append({
                    "document_id": doc_id,
                    "similarity_score": float(similarity),
                    "category": "similar_document"
                })

            # Highest similarity first
            similarities.sort(key=lambda x: x["similarity_score"], reverse=True)
            return similarities[:5]  # Top 5 similar documents
        except Exception as e:
            logger.error(f"Error finding similar documents: {e}")
            return []

    def update_document_vector(self, doc_id: str, text: str):
        """Update the stored document vector used for similarity analysis."""
        try:
            # Fit the shared vectorizer on first use so transform() does not
            # raise NotFittedError; later documents are projected onto the
            # same vocabulary so the stored vectors stay comparable. A fuller
            # implementation would periodically refit on the whole corpus.
            if not hasattr(self.vectorizer, "vocabulary_"):
                self.vectorizer.fit([text])
            vector = self.vectorizer.transform([text])
            self.document_vectors[doc_id] = vector
        except Exception as e:
            logger.error(f"Error updating document vector: {e}")

    def get_ai_insights(self, documents: List[Dict]) -> Dict[str, Any]:
        """Generate AI insights from multiple documents."""
        try:
            insights = {
                "document_trends": self._analyze_trends(documents),
                "common_entities": self._find_common_entities(documents),
                "category_distribution": self._analyze_category_distribution(documents),
                "quality_metrics": self._calculate_overall_quality(documents),
                "recommendations": self._generate_system_recommendations(documents)
            }
            return insights
        except Exception as e:
            logger.error(f"Error generating AI insights: {e}")
            return {"error": str(e)}

    def _analyze_trends(self, documents: List[Dict]) -> Dict[str, Any]:
        """Analyze trends across documents (placeholder)."""
        return {"trend_analysis": "Not implemented yet"}

    def _find_common_entities(self, documents: List[Dict]) -> Dict[str, Any]:
        """Find common entities across documents (placeholder)."""
        return {"common_entities": "Not implemented yet"}

    def _analyze_category_distribution(self, documents: List[Dict]) -> Dict[str, Any]:
        """Analyze distribution of document categories (placeholder)."""
        return {"category_distribution": "Not implemented yet"}

    def _calculate_overall_quality(self, documents: List[Dict]) -> Dict[str, Any]:
        """Calculate overall quality metrics (placeholder)."""
        return {"overall_quality": "Not implemented yet"}

    def _generate_system_recommendations(self, documents: List[Dict]) -> List[str]:
        """Generate system-wide recommendations (placeholder)."""
        return ["سیستم در حال بهبود است"]  # "The system is being improved."