Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException | |
| from fastapi.responses import JSONResponse | |
| import tempfile | |
| import os | |
| from typing import List | |
| # Import our services | |
| from app.services.text_extractor import extract_text_from_pdf | |
| from app.services.preprocessor import segment_into_clauses | |
| from app.services.risk_analyzer import analyze_clause_with_gemini | |
| from app.services.risk_scorer import calculate_scores, get_risk_definition | |
| from app.schemas import AnalysisReport, AnalyzedClause, RiskFinding | |
| # Create FastAPI app instance | |
| app = FastAPI( | |
| title="Multilingual Legal Contract Analyzer", | |
| description="AI-powered contract analysis for English and Indic languages", | |
| version="1.0.0" | |
| ) | |
| async def analyze_contract(file: UploadFile = File(...)): | |
| """ | |
| Analyze a legal contract PDF and return detailed risk analysis. | |
| Args: | |
| file: PDF file to analyze | |
| Returns: | |
| AnalysisReport with risk analysis and suggestions | |
| """ | |
| # Validate file type | |
| if not file.filename.lower().endswith('.pdf'): | |
| raise HTTPException( | |
| status_code=400, detail="Only PDF files are supported") | |
| # Create temporary file to store uploaded PDF | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as temp_file: | |
| try: | |
| # Write uploaded file to temporary file | |
| content = await file.read() | |
| temp_file.write(content) | |
| temp_file.flush() | |
| # Step 1: Extract text from PDF | |
| print(f"Extracting text from {file.filename}...") | |
| full_text = extract_text_from_pdf(temp_file.name) | |
| if not full_text or len(full_text.strip()) < 50: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Unable to extract meaningful text from PDF. Please ensure the PDF is readable." | |
| ) | |
| # Step 2: Segment text into clauses | |
| print("Segmenting text into clauses...") | |
| clauses = segment_into_clauses(full_text) | |
| if not clauses: | |
| raise HTTPException( | |
| status_code=400, | |
| detail="Unable to identify contract clauses. Please ensure the document is a valid contract." | |
| ) | |
| # Step 3: Analyze each clause with Gemini AI | |
| print(f"Analyzing {len(clauses)} clauses with AI...") | |
| analyzed_clauses = [] | |
| for i, clause_text in enumerate(clauses, 1): | |
| print(f"Analyzing clause {i}/{len(clauses)}...") | |
| # Get AI analysis | |
| ai_result = analyze_clause_with_gemini(clause_text) | |
| # Convert AI results to RiskFinding objects | |
| risks = [] | |
| for risk_data in ai_result.get("risks", []): | |
| risk_id = risk_data.get("risk_id") | |
| if risk_id: | |
| risk_def = get_risk_definition(risk_id) | |
| risk_finding = RiskFinding( | |
| risk_id=risk_id, | |
| description=risk_data.get( | |
| "explanation", risk_def["description"]), | |
| score=risk_def["score"] | |
| ) | |
| risks.append(risk_finding) | |
| # Create AnalyzedClause object | |
| analyzed_clause = AnalyzedClause( | |
| clause_number=i, | |
| # Truncate for response | |
| text=clause_text[:500] + | |
| "..." if len(clause_text) > 500 else clause_text, | |
| risks=risks, | |
| suggestion=ai_result.get("suggestion") | |
| ) | |
| analyzed_clauses.append(analyzed_clause) | |
| # Step 4: Calculate final risk score | |
| print("Calculating final risk score...") | |
| final_score, all_findings = calculate_scores(analyzed_clauses) | |
| # Step 5: Determine contract type and language (basic detection) | |
| contract_type = "General Contract" # Could be enhanced with AI detection | |
| language = "English" # Could be enhanced with language detection | |
| # Create final analysis report | |
| analysis_report = AnalysisReport( | |
| file_name=file.filename, | |
| language=language, | |
| contract_type=contract_type, | |
| final_risk_score=final_score, | |
| clauses=analyzed_clauses | |
| ) | |
| print(f"Analysis complete. Final risk score: {final_score}") | |
| return analysis_report | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| print(f"Error during analysis: {e}") | |
| raise HTTPException( | |
| status_code=500, | |
| detail=f"Analysis failed: {str(e)}" | |
| ) | |
| finally: | |
| # Clean up temporary file | |
| try: | |
| os.unlink(temp_file.name) | |
| except: | |
| pass | |
| async def root(): | |
| """Health check endpoint""" | |
| return {"message": "Multilingual Legal Contract Analyzer API is running"} | |
| async def health_check(): | |
| """Health check endpoint""" | |
| return {"status": "healthy", "service": "contract-analyzer"} | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |