# models/legal_analysis.py
from typing import Dict, Any

from .model_loader import load_model
from .logging_config import logger
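
# analyze_legal_details() runs a zero-shot classifier over a listing's legal
# text, aggregates per-category scores into a 0-100 completeness score, and
# degrades to keyword-based scoring when the model cannot be loaded or the
# classification call fails, so callers always receive the same result schema.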
def analyze_legal_details(legal_text: str) -> Dict[str, Any]:
"""Analyze legal details of a property with comprehensive validation."""
try:
if not legal_text or len(str(legal_text).strip()) < 5:
return {
'assessment': 'insufficient',
'confidence': 0.1, # Small confidence instead of 0
'summary': 'No legal details provided',
'completeness_score': 5, # Minimum score instead of 0
'potential_issues': False,
'legal_metrics': {
'text_length': 0,
'word_count': 0,
'legal_terms_found': 0
},
'reasoning': 'No legal details provided for analysis',
'top_classifications': [],
'document_verification': {},
'compliance_status': {},
'risk_assessment': {}
}
# Try to load the classifier with fallback
try:
classifier = load_model("zero-shot-classification")
except Exception as e:
logger.error(f"Error loading model in legal analysis: {str(e)}")
# Provide fallback scoring based on text content
legal_text_str = str(legal_text)
legal_terms = ['title', 'deed', 'registration', 'tax', 'permit', 'approval', 'certificate', 'compliance', 'legal']
legal_terms_found = sum(1 for term in legal_terms if term in legal_text_str.lower())
fallback_score = min(50, legal_terms_found * 10) # 10 points per legal term, max 50
return {
'assessment': 'basic',
'confidence': 0.3, # Basic confidence
'summary': f'Model loading error, using fallback analysis. Found {legal_terms_found} legal terms.',
'completeness_score': fallback_score,
'potential_issues': False,
'legal_metrics': {
'text_length': len(legal_text_str),
'word_count': len(legal_text_str.split()),
'legal_terms_found': legal_terms_found
},
'reasoning': f'Model loading error: {str(e)}. Using fallback scoring based on legal terms found.',
'top_classifications': [],
'document_verification': {},
'compliance_status': {},
'risk_assessment': {}
}
# Enhanced legal categories with more specific indicators
categories = [
# Title and Ownership
"clear title documentation",
"title verification documents",
"ownership transfer documents",
"inheritance documents",
"gift deed documents",
"power of attorney documents",
# Property Registration
"property registration documents",
"sale deed documents",
"conveyance deed documents",
"development agreement documents",
"joint development agreement documents",
# Tax and Financial
"property tax records",
"tax clearance certificates",
"encumbrance certificates",
"bank loan documents",
"mortgage documents",
# Approvals and Permits
"building permits",
"construction approvals",
"occupation certificates",
"completion certificates",
"environmental clearances",
# Land and Usage
"land use certificates",
"zoning certificates",
"layout approvals",
"master plan compliance",
"land conversion documents",
# Compliance and Legal
"legal compliance certificates",
"no objection certificates",
"fire safety certificates",
"structural stability certificates",
"water and electricity compliance",
# Disputes and Litigation
"property dispute records",
"litigation history",
"court orders",
"settlement agreements",
"pending legal cases"
]
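        # With multi_label=True the zero-shot pipeline scores every category
        # independently in [0, 1] (scores do not sum to 1), so several document
        # types can rank highly for the same text.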
# Create a more detailed context for analysis
legal_context = f"""
Legal Documentation Analysis:
{legal_text}
Please analyze the above legal documentation for:
1. Completeness of legal information
2. Presence of required documents
3. Compliance with regulations
4. Potential legal issues
5. Risk assessment
"""
        # Analyze with the classifier (truncate the context to a safe length)
        try:
            legal_result = classifier(legal_context[:1000], categories, multi_label=True)
        except Exception as e:
            logger.error(f"Error in legal classification: {str(e)}")
            # Fall back to keyword matching, keeping the classifier's
            # {'labels': ..., 'scores': ...} schema so the scoring below still works
            legal_result = simple_legal_analysis(legal_text, categories)
# Calculate legal metrics
legal_metrics = calculate_legal_metrics(legal_result, categories)
# Get top classifications
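        # Both the zero-shot pipeline and the keyword fallback return labels
        # pre-sorted by score in descending order, so [:5] takes the top matches.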
top_classifications = []
for label, score in zip(legal_result['labels'][:5], legal_result['scores'][:5]):
if score > 0.2: # Lower threshold for legal terms
top_classifications.append({
'classification': label,
'confidence': float(score)
})
# Calculate completeness score
positive_categories = [
"clear title documentation", "property registration documents", "sale deed documents",
"property tax records", "building permits", "occupation certificates",
"legal compliance certificates", "no objection certificates"
]
positive_score = sum(score for label, score in zip(legal_result['labels'], legal_result['scores'])
if label in positive_categories)
completeness_score = min(100, int(positive_score * 100))
        # Ensure a minimum score for any non-trivial legal content
        if completeness_score < 10 and len(str(legal_text)) > 20:
            completeness_score = 10  # Minimum 10% for having some legal content
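        # Illustrative example: if 'sale deed documents' scores 0.45 and
        # 'property tax records' scores 0.30, positive_score is 0.75 and
        # completeness_score becomes min(100, int(0.75 * 100)) = 75.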
# Determine assessment
if completeness_score >= 80:
assessment = 'excellent'
confidence = 0.9
elif completeness_score >= 60:
assessment = 'good'
confidence = 0.7
elif completeness_score >= 40:
assessment = 'adequate'
confidence = 0.5
elif completeness_score >= 20:
assessment = 'basic'
confidence = 0.3
else:
assessment = 'basic'
confidence = 0.2
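        # Band summary: >=80 excellent (0.9), >=60 good (0.7), >=40 adequate (0.5),
        # >=20 basic (0.3), below 20 still 'basic' but at confidence 0.2.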
# Generate summary
summary = summarize_text(legal_text)
return {
'assessment': assessment,
'confidence': confidence,
'summary': summary,
'completeness_score': completeness_score,
            # calculate_legal_metrics() has no 'potential_issues' key; derive the
            # flag from the dispute/litigation signal instead
            'potential_issues': legal_metrics.get('disputes_and_litigation', 0) > 0.3,
'legal_metrics': legal_metrics,
'reasoning': f'Legal analysis completed with {completeness_score}% completeness score.',
'top_classifications': top_classifications,
            'document_verification': {
                # Map to the keys calculate_legal_metrics() actually returns
                'title_docs': legal_metrics.get('title_and_ownership', 0),
                'registration_docs': legal_metrics.get('property_registration', 0),
                'tax_docs': legal_metrics.get('tax_and_financial', 0),
                'approval_docs': legal_metrics.get('approvals_and_permits', 0)
            },
            'compliance_status': {
                'overall_compliance': legal_metrics.get('compliance_and_legal', 0),
                'missing_documents': legal_metrics.get('missing_docs', [])
            },
            'risk_assessment': {
                # Simple heuristic: a strong dispute/litigation signal means elevated risk
                'risk_level': 'high' if legal_metrics.get('disputes_and_litigation', 0) > 0.5 else 'low',
                'risk_factors': legal_metrics.get('risk_factors', [])
            }
}
except Exception as e:
logger.error(f"Error in legal analysis: {str(e)}")
# Return reasonable fallback instead of complete failure
return {
'assessment': 'basic',
'confidence': 0.2,
'summary': 'Legal analysis failed due to technical error',
'completeness_score': 10, # Minimum score instead of 0
'potential_issues': False,
'legal_metrics': {
'text_length': len(str(legal_text)) if legal_text else 0,
'word_count': len(str(legal_text).split()) if legal_text else 0,
'legal_terms_found': 0
},
'reasoning': f'Legal analysis error: {str(e)}. Using fallback scoring.',
'top_classifications': [],
'document_verification': {},
'compliance_status': {},
'risk_assessment': {}
}
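
# Example usage (hypothetical input; actual scores depend on the loaded model):
#   result = analyze_legal_details(
#       "Registered sale deed (2019), property tax paid to date, OC issued.")
#   result['assessment']          # e.g. 'adequate'
#   result['completeness_score']  # integer in the 10-100 range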
def calculate_legal_metrics(legal_result, categories):
"""Calculate legal metrics from classification results."""
try:
if not isinstance(legal_result, dict) or 'scores' not in legal_result:
# Return default metrics for fallback
return {
'title_and_ownership': 0.5,
'property_registration': 0.5,
'tax_and_financial': 0.5,
'approvals_and_permits': 0.5,
'land_and_usage': 0.5,
'compliance_and_legal': 0.5,
'disputes_and_litigation': 0.1
}
scores = legal_result.get('scores', [])
labels = legal_result.get('labels', [])
# Create a mapping of labels to scores
label_scores = dict(zip(labels, scores))
return {
'title_and_ownership': sum(label_scores.get(label, 0) for label in
['clear title documentation', 'title verification documents',
'ownership transfer documents', 'inheritance documents']) / 4,
'property_registration': sum(label_scores.get(label, 0) for label in
['property registration documents', 'sale deed documents',
'conveyance deed documents', 'development agreement documents']) / 4,
'tax_and_financial': sum(label_scores.get(label, 0) for label in
['property tax records', 'tax clearance certificates',
'encumbrance certificates', 'bank loan documents']) / 4,
'approvals_and_permits': sum(label_scores.get(label, 0) for label in
['building permits', 'construction approvals',
'occupation certificates', 'completion certificates']) / 4,
'land_and_usage': sum(label_scores.get(label, 0) for label in
['land use certificates', 'zoning certificates',
'layout approvals', 'master plan compliance']) / 4,
'compliance_and_legal': sum(label_scores.get(label, 0) for label in
['legal compliance certificates', 'no objection certificates',
'fire safety certificates', 'structural stability certificates']) / 4,
'disputes_and_litigation': sum(label_scores.get(label, 0) for label in
['property dispute records', 'litigation history',
'court orders', 'pending legal cases']) / 4
}
except Exception as e:
logger.error(f"Error calculating legal metrics: {str(e)}")
return {
'title_and_ownership': 0.5,
'property_registration': 0.5,
'tax_and_financial': 0.5,
'approvals_and_permits': 0.5,
'land_and_usage': 0.5,
'compliance_and_legal': 0.5,
'disputes_and_litigation': 0.1
}
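
# Note: the 0.5 defaults above are deliberately neutral ("unknown") so a failed
# calculation neither rewards nor penalizes a listing, while
# disputes_and_litigation defaults low to avoid falsely flagging disputes.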

def simple_legal_analysis(legal_text, categories):
    """Keyword-based fallback that mimics the zero-shot pipeline's
    {'labels': ..., 'scores': ...} output, sorted by score descending."""
    text_lower = str(legal_text).lower()
# Define keywords for each category
category_keywords = {
"clear title documentation": ["title", "clear", "documentation", "ownership"],
"property registration documents": ["registration", "property", "documents", "registered"],
"property tax records": ["tax", "property", "records", "assessment"],
"building permits": ["permit", "building", "construction", "approval"],
"legal compliance certificates": ["compliance", "legal", "certificate", "approved"],
"property dispute records": ["dispute", "litigation", "court", "case"],
"legitimate listing": ["real", "genuine", "authentic", "verified"]
}
    scores = []
    for category in categories:
        # Use the category's first word when no keyword list is defined
        keywords = category_keywords.get(category, [category.split()[0]])
        score = sum(1 for keyword in keywords if keyword in text_lower) / len(keywords) if keywords else 0.1
        scores.append(min(1.0, score))
    # Rank labels by score (descending) to match the zero-shot pipeline output
    ranked = sorted(zip(categories, scores), key=lambda pair: pair[1], reverse=True)
    return {
        "labels": [label for label, _ in ranked],
        "scores": [score for _, score in ranked]
    }
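
# Note: categories without an explicit keyword list fall back to their first
# word, so a single match scores a full 1.0 for them, while multi-keyword
# categories need several hits to reach the same score.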
def summarize_text(text):
"""Generate summary using model or fallback."""
try:
summarizer = load_model("summarization")
if hasattr(summarizer, 'task_type') and summarizer.task_type == "summarization":
# Using fallback summarizer
result = summarizer(text)
return result[0]['summary_text'] if result else text[:200] + "..."
else:
            # Using the actual model; truncate long inputs as a guard against
            # exceeding the model's context window
            result = summarizer(text[:1024], max_length=130, min_length=30, do_sample=False)
return result[0]['summary_text']
except Exception as e:
        logger.warning(f"Summarization model failed, using extractive fallback: {str(e)}")
        # Simple extractive fallback: keep the first couple of sentences
sentences = text.split('.')
if len(sentences) > 3:
return '. '.join(sentences[:2]) + '.'
else:
return text[:200] + '...' if len(text) > 200 else text