"""

AI Service for Legal Dashboard

=============================



Advanced AI-powered features for legal document analysis including:

- Intelligent document scoring and classification

- Legal entity extraction and recognition

- Sentiment analysis for legal documents

- Smart search and recommendation engine

- Document similarity analysis

"""

import re
import json
import logging
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

logger = logging.getLogger(__name__)


class AIScoringEngine:
    """

    Advanced AI scoring engine for legal documents

    Provides intelligent analysis, classification, and recommendations

    """

    def __init__(self):
        """Initialize the AI scoring engine"""
        self.vectorizer = TfidfVectorizer(
            max_features=1000,
            stop_words=None,  # Keep Persian stop words for legal context
            ngram_range=(1, 3)
        )
        self.document_texts = {}    # doc_id -> raw text, kept so the corpus can be refitted
        self.document_vectors = {}  # doc_id -> TF-IDF row vector over the shared vocabulary
        self.legal_keywords = self._load_legal_keywords()
        self.entity_patterns = self._load_entity_patterns()
        self.sentiment_indicators = self._load_sentiment_indicators()
        self.classification_categories = self._load_classification_categories()

    def _load_legal_keywords(self) -> Dict[str, List[str]]:
        """Load Persian legal keywords for different categories"""
        return {
            "قانون": [
                "قانون", "ماده", "تبصره", "بند", "فصل", "باب", "مصوبه", "تصویب",
                "مجلس", "شورای", "ملی", "اساسی", "مدنی", "جزایی", "تجاری"
            ],
            "قرارداد": [
                "قرارداد", "عقد", "مفاد", "طرفین", "متعاهدین", "شرایط", "ماده",
                "بند", "مبلغ", "پرداخت", "تعهد", "مسئولیت", "ضمانت"
            ],
            "احکام": [
                "حکم", "رای", "دادگاه", "قاضی", "شعبه", "دعوی", "خواهان",
                "خوانده", "شهادت", "دلیل", "اثبات", "قانونی", "محکوم"
            ],
            "مالی": [
                "مالیات", "درآمد", "سود", "زیان", "دارایی", "بدهی", "حساب",
                "ترازنامه", "صورت", "مالی", "دریافتی", "پرداختی"
            ],
            "اداری": [
                "اداره", "سازمان", "وزارت", "دولت", "مقام", "مسئول", "کارمند",
                "مقررات", "دستورالعمل", "بخشنامه", "آیین‌نامه"
            ]
        }

    def _load_entity_patterns(self) -> Dict[str, str]:
        """Load regex patterns for legal entity extraction"""
        # Groups are non-capturing so re.findall returns the full matched
        # string for each entity (capturing groups would make findall return
        # only the group contents, or tuples of them)
        return {
            "نام_شخص": r"(?:[آ-ی]{2,}\s+){2,}",
            "نام_شرکت": r"(?:شرکت|موسسه|سازمان|بنیاد)\s+[آ-ی\s]+",
            "شماره_قرارداد": r"شماره\s*:?\s*\d+/\d+/\d+",
            "تاریخ": r"\d{1,2}/\d{1,2}/\d{2,4}",
            "مبلغ": r"\d{1,3}(?:,\d{3})*\s*(?:ریال|تومان|دلار|یورو)",
            # National IDs and postal codes are both 10 digits, so these two
            # patterns can match the same spans
            "شماره_ملی": r"\b\d{10}\b",
            "کد_پستی": r"\b\d{10}\b",
            "شماره_تلفن": r"\d{2,4}-\d{3,4}-\d{4}"
        }

    def _load_sentiment_indicators(self) -> Dict[str, List[str]]:
        """Load Persian sentiment indicators for legal documents"""
        return {
            "positive": [
                "موافق", "تایید", "قبول", "اجازه", "مجوز", "تصویب", "قانونی",
                "مشروع", "صحیح", "درست", "مناسب", "مطلوب", "سودمند"
            ],
            "negative": [
                "مخالف", "رد", "عدم", "ممنوع", "غیرقانونی", "نامشروع",
                "نادرست", "نامناسب", "مضر", "خطرناک", "ممنوع"
            ],
            "neutral": [
                "ماده", "بند", "تبصره", "قانون", "مقررات", "شرایط",
                "مفاد", "طرفین", "تاریخ", "مبلغ", "شماره"
            ]
        }

    def _load_classification_categories(self) -> Dict[str, Dict]:
        """Load document classification categories with weights"""
        return {
            "قرارداد": {
                "keywords": ["قرارداد", "عقد", "طرفین", "مفاد"],
                "weight": 0.4,
                "patterns": ["طرفین", "متعاهدین", "شرایط"]
            },
            "احکام_قضایی": {
                "keywords": ["حکم", "رای", "دادگاه", "قاضی"],
                "weight": 0.35,
                "patterns": ["شعبه", "خواهان", "خوانده"]
            },
            "قوانین": {
                "keywords": ["قانون", "ماده", "تبصره", "مجلس"],
                "weight": 0.3,
                "patterns": ["مصوبه", "تصویب", "اساسی"]
            },
            "مقررات_اداری": {
                "keywords": ["مقررات", "دستورالعمل", "آیین‌نامه"],
                "weight": 0.25,
                "patterns": ["اداره", "سازمان", "وزارت"]
            },
            "اسناد_مالی": {
                "keywords": ["مالی", "حساب", "ترازنامه", "صورت"],
                "weight": 0.2,
                "patterns": ["درآمد", "سود", "زیان"]
            }
        }

    def analyze_document(self, text: str, metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """
        Comprehensive document analysis including scoring, classification, and insights.

        Args:
            text: Document text content
            metadata: Additional document metadata

        Returns:
            Dictionary containing analysis results
        """
        try:
            # Basic text preprocessing
            cleaned_text = self._preprocess_text(text)

            # Perform various analyses
            analysis = {
                "basic_metrics": self._calculate_basic_metrics(cleaned_text),
                "classification": self._classify_document(cleaned_text),
                "entities": self._extract_entities(cleaned_text),
                "sentiment": self._analyze_sentiment(cleaned_text),
                "keywords": self._extract_keywords(cleaned_text),
                "quality_score": self._calculate_quality_score(cleaned_text, metadata),
                "recommendations": self._generate_recommendations(cleaned_text, metadata),
                "timestamp": datetime.now().isoformat()
            }

            # Add similarity analysis if we have existing documents
            if self.document_vectors:
                analysis["similarity"] = self._find_similar_documents(
                    cleaned_text)

            return analysis

        except Exception as e:
            logger.error(f"Error in document analysis: {e}")
            return {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }

    def _preprocess_text(self, text: str) -> str:
        """Clean and normalize Persian text"""
        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text.strip())

        # Normalize Persian characters
        text = text.replace('ي', 'ی').replace('ك', 'ک')

        # Remove common noise characters
        text = re.sub(
            r'[^\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF\s\d\-\.\/]', '', text)

        return text

    def _calculate_basic_metrics(self, text: str) -> Dict[str, Any]:
        """Calculate basic document metrics"""
        words = text.split()
        sentences = re.split(r'[.!?؟]', text)
        sentences = [s.strip() for s in sentences if s.strip()]

        return {
            "word_count": len(words),
            "sentence_count": len(sentences),
            "avg_sentence_length": len(words) / len(sentences) if sentences else 0,
            "unique_words": len(set(words)),
            "vocabulary_diversity": len(set(words)) / len(words) if words else 0,
            "legal_terms_count": self._count_legal_terms(text)
        }

    def _count_legal_terms(self, text: str) -> int:
        """Count legal terms in the document"""
        count = 0
        for category_terms in self.legal_keywords.values():
            for term in category_terms:
                count += text.count(term)
        return count

    def _classify_document(self, text: str) -> Dict[str, float]:
        """Classify document into legal categories"""
        scores = {}

        for category, config in self.classification_categories.items():
            score = 0
            weight = config["weight"]

            # Keyword matching
            for keyword in config["keywords"]:
                if keyword in text:
                    score += weight

            # Pattern matching
            for pattern in config["patterns"]:
                if pattern in text:
                    score += weight * 0.5

            scores[category] = min(score, 1.0)

        # Normalize scores
        total_score = sum(scores.values())
        if total_score > 0:
            scores = {k: v/total_score for k, v in scores.items()}

        return scores

    def _extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract legal entities from text"""
        entities = {}

        for entity_type, pattern in self.entity_patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                entities[entity_type] = list(set(matches))

        return entities

    def _analyze_sentiment(self, text: str) -> Dict[str, float]:
        """Analyze sentiment of legal document"""
        sentiment_scores = {"positive": 0, "negative": 0, "neutral": 0}
        total_words = len(text.split())

        if total_words == 0:
            return sentiment_scores

        for sentiment, indicators in self.sentiment_indicators.items():
            count = 0
            for indicator in indicators:
                count += text.count(indicator)
            sentiment_scores[sentiment] = count / total_words

        # Normalize scores
        total = sum(sentiment_scores.values())
        if total > 0:
            sentiment_scores = {k: v / total for k, v in sentiment_scores.items()}

        return sentiment_scores

    def _extract_keywords(self, text: str) -> List[Tuple[str, float]]:
        """Extract important keywords with TF-IDF scores"""
        try:
            # Use a throwaway vectorizer here: self.vectorizer is fitted on the
            # stored corpus for similarity search, and refitting it on a single
            # document would silently invalidate every stored document vector
            local_vectorizer = TfidfVectorizer(
                max_features=1000,
                stop_words=None,
                ngram_range=(1, 3)
            )
            tfidf_matrix = local_vectorizer.fit_transform([text])
            feature_names = local_vectorizer.get_feature_names_out()

            # Get TF-IDF scores for the single document
            scores = tfidf_matrix.toarray()[0]

            # Pair each term with its score and keep the highest-scoring terms
            keywords = [(feature_names[i], float(scores[i]))
                        for i in range(len(feature_names))]
            keywords.sort(key=lambda x: x[1], reverse=True)
            return keywords[:20]  # Return top 20 keywords

        except Exception as e:
            logger.error(f"Error extracting keywords: {e}")
            return []

    def _calculate_quality_score(self, text: str, metadata: Optional[Dict] = None) -> float:
        """Calculate overall document quality score"""
        score = 0.0

        # Text length factor (optimal length for legal documents)
        word_count = len(text.split())
        if 100 <= word_count <= 2000:
            score += 0.3
        elif word_count > 2000:
            score += 0.2
        else:
            score += 0.1

        # Legal terms density
        legal_terms = self._count_legal_terms(text)
        if legal_terms > 0:
            density = legal_terms / word_count
            if 0.01 <= density <= 0.1:
                score += 0.3
            elif density > 0.1:
                score += 0.2
            else:
                score += 0.1

        # Structure factor (presence of legal document structure)
        structure_indicators = ["ماده", "بند", "تبصره", "فصل", "باب"]
        structure_count = sum(text.count(indicator)
                              for indicator in structure_indicators)
        if structure_count > 0:
            score += 0.2

        # Completeness factor
        completeness_indicators = ["تاریخ", "شماره", "امضا", "مهر"]
        completeness_count = sum(text.count(indicator)
                                 for indicator in completeness_indicators)
        if completeness_count >= 2:
            score += 0.2

        return min(score, 1.0)

    def _generate_recommendations(self, text: str, metadata: Optional[Dict] = None) -> List[str]:
        """Generate intelligent recommendations for the document"""
        recommendations = []

        # Check document completeness
        if len(text.split()) < 100:
            recommendations.append(
                "مستندات کافی نیست. پیشنهاد می‌شود جزئیات بیشتری اضافه شود.")

        # Check for legal structure
        if "ماده" not in text and "بند" not in text:
            recommendations.append(
                "ساختار حقوقی مشخص نیست. پیشنهاد می‌شود از ساختار ماده و بند استفاده شود.")

        # Check for dates and numbers
        if not re.search(r'\d{1,2}/\d{1,2}/\d{2,4}', text):
            recommendations.append(
                "تاریخ مشخص نشده است. پیشنهاد می‌شود تاریخ مستندات اضافه شود.")

        # Check for signatures
        if "امضا" not in text and "مهر" not in text:
            recommendations.append(
                "امضا یا مهر مشخص نشده است. پیشنهاد می‌شود امضا اضافه شود.")

        # Check for amounts
        if not re.search(r'\d{1,3}(?:,\d{3})*', text):
            recommendations.append(
                "مبالغ مشخص نشده است. پیشنهاد می‌شود مبالغ دقیق ذکر شود.")

        return recommendations

    def _find_similar_documents(self, text: str) -> List[Dict[str, Any]]:
        """Find similar documents using TF-IDF and cosine similarity"""
        try:
            # Vectorize current document
            current_vector = self.vectorizer.transform([text])

            similarities = []
            for doc_id, doc_vector in self.document_vectors.items():
                similarity = cosine_similarity(
                    current_vector, doc_vector)[0][0]
                similarities.append({
                    "document_id": doc_id,
                    "similarity_score": float(similarity),
                    "category": "similar_document"
                })

            # Sort by similarity and return top matches
            similarities.sort(
                key=lambda x: x["similarity_score"], reverse=True)
            return similarities[:5]  # Return top 5 similar documents

        except Exception as e:
            logger.error(f"Error finding similar documents: {e}")
            return []

    def update_document_vector(self, doc_id: str, text: str):
        """Register a document and refit vectors for similarity analysis"""
        try:
            self.document_texts[doc_id] = text
            # Refit on the full stored corpus so every stored vector and every
            # later transform() call share a single fitted vocabulary
            matrix = self.vectorizer.fit_transform(list(self.document_texts.values()))
            self.document_vectors = {
                known_id: matrix[i] for i, known_id in enumerate(self.document_texts)
            }
        except Exception as e:
            logger.error(f"Error updating document vector: {e}")

    def get_ai_insights(self, documents: List[Dict]) -> Dict[str, Any]:
        """Generate AI insights from multiple documents"""
        try:
            insights = {
                "document_trends": self._analyze_trends(documents),
                "common_entities": self._find_common_entities(documents),
                "category_distribution": self._analyze_category_distribution(documents),
                "quality_metrics": self._calculate_overall_quality(documents),
                "recommendations": self._generate_system_recommendations(documents)
            }
            return insights
        except Exception as e:
            logger.error(f"Error generating AI insights: {e}")
            return {"error": str(e)}

    def _analyze_trends(self, documents: List[Dict]) -> Dict[str, Any]:
        """Analyze trends across documents"""
        # Implementation for trend analysis
        return {"trend_analysis": "Not implemented yet"}

    def _find_common_entities(self, documents: List[Dict]) -> Dict[str, Any]:
        """Find common entities across documents"""
        # Implementation for common entity analysis
        return {"common_entities": "Not implemented yet"}

    def _analyze_category_distribution(self, documents: List[Dict]) -> Dict[str, Any]:
        """Analyze distribution of document categories"""
        # Implementation for category distribution
        return {"category_distribution": "Not implemented yet"}

    def _calculate_overall_quality(self, documents: List[Dict]) -> Dict[str, Any]:
        """Calculate overall quality metrics"""
        # Implementation for overall quality calculation
        return {"overall_quality": "Not implemented yet"}

    def _generate_system_recommendations(self, documents: List[Dict]) -> List[str]:
        """Generate system-wide recommendations"""
        # Implementation for system recommendations
        return ["سیستم در حال بهبود است"]