| """ | |
| Utilities to ingest uploaded legal documents into persistent storage. | |
| """ | |
from __future__ import annotations

import hashlib
import re
from dataclasses import dataclass
from datetime import date, datetime
from io import BytesIO
from pathlib import Path
from typing import BinaryIO, Dict, Optional

from django.conf import settings
from django.core.files.base import ContentFile
from django.db import transaction
from django.utils import timezone

from hue_portal.core.etl.legal_document_loader import load_legal_document
from hue_portal.core.models import (
    IngestionJob,
    LegalDocument,
    LegalDocumentImage,
    LegalSection,
)
from hue_portal.core.tasks import process_ingestion_job


@dataclass
class LegalIngestionResult:
    """Outcome of a single document ingestion."""

    document: LegalDocument
    created: bool
    sections_count: int
    images_count: int


def _parse_date(value: Optional[str | date]) -> Optional[date]:
    if isinstance(value, date):
        return value
    if not value:
        return None
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            continue
    return None
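
# Usage sketch for _parse_date (the values below are illustrative, not taken
# from the codebase):
#   _parse_date("2024-01-15")       -> date(2024, 1, 15)
#   _parse_date("15/01/2024")       -> date(2024, 1, 15)
#   _parse_date(date(2024, 1, 15))  -> date(2024, 1, 15)  (passed through)
#   _parse_date("15.01.2024")       -> None  (unsupported format)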


def _sha256(data: bytes) -> str:
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()


def _normalize_text(text: str) -> str:
    # Strip all whitespace and lowercase so the content checksum ignores
    # layout-only differences between otherwise identical documents.
    cleaned = re.sub(r"\s+", "", text or "")
    return cleaned.lower()
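
# Sketch of the duplicate-detection flow built on these two helpers (the
# strings are invented examples, not project fixtures):
#   text_a = "Quyết định  số 01"   # extra internal spacing
#   text_b = "Quyết  định số 01"
#   _sha256(_normalize_text(text_a).encode("utf-8")) \
#       == _sha256(_normalize_text(text_b).encode("utf-8"))   # -> True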


DOC_TYPE_KEYWORDS = {
    "decision": ["quyết định"],
    "circular": ["thông tư"],
    "guideline": ["hướng dẫn"],
    "plan": ["kế hoạch"],
}


def _auto_fill_metadata(
    *, text: str, title: str, issued_by: str, issued_at: Optional[date], doc_type: str
) -> tuple[str, str, Optional[date], str]:
    head = (text or "")[:2000]
    if not issued_by:
        match = re.search(
            r"(BỘ\s+[A-ZÂĂÊÔƠƯ\s]+|ỦY BAN\s+NHÂN DÂN\s+[^\n]+)", head, re.IGNORECASE
        )
        if match:
            issued_by = match.group(0).strip()
    if not issued_at:
        # Numeric dates first (dd/mm/yyyy or dd-mm-yyyy), then the spelled-out
        # Vietnamese form "ngày <d> tháng <m> năm <y>".
        match = re.search(r"(\d{1,2})[\/\-](\d{1,2})[\/\-](\d{4})", head)
        if match:
            day, month, year = match.groups()
            issued_at = _parse_date(f"{year}-{int(month):02d}-{int(day):02d}")
        else:
            match = re.search(
                r"ngày\s+(\d{1,2})\s+tháng\s+(\d{1,2})\s+năm\s+(\d{4})",
                head,
                re.IGNORECASE,
            )
            if match:
                day, month, year = match.groups()
                issued_at = _parse_date(f"{year}-{int(month):02d}-{int(day):02d}")
    if doc_type == "other":
        lower = head.lower()
        for dtype, keywords in DOC_TYPE_KEYWORDS.items():
            if any(keyword in lower for keyword in keywords):
                doc_type = dtype
                break
    # Re-derive the title when it is missing or still just the doc-type keyword.
    if not title or title == (
        DOC_TYPE_KEYWORDS.get(doc_type, [title])[0] if doc_type != "other" else ""
    ):
        match = re.search(
            r"(QUYẾT ĐỊNH|THÔNG TƯ|HƯỚNG DẪN|KẾ HOẠCH)[^\n]+", head, re.IGNORECASE
        )
        if match:
            title = match.group(0).strip().title()
    return title, issued_by, issued_at, doc_type
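
# Behaviour sketch on a typical document header (the input string is an
# invented example, not project data):
#   head = ("ỦY BAN NHÂN DÂN THÀNH PHỐ HUẾ\n"
#           "QUYẾT ĐỊNH số 12/QĐ-UBND ngày 05 tháng 3 năm 2024")
#   _auto_fill_metadata(text=head, title="", issued_by="",
#                       issued_at=None, doc_type="other")
#   -> title     ~ "Quyết Định Số 12/Qđ-Ubnd ..."  (first heading line, title-cased)
#      issued_by = "ỦY BAN NHÂN DÂN THÀNH PHỐ HUẾ"
#      issued_at = date(2024, 3, 5)
#      doc_type  = "decision"                      (first keyword match wins)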


def ingest_uploaded_document(
    *,
    file_obj: BinaryIO,
    filename: str,
    metadata: Dict,
) -> LegalIngestionResult:
    """
    Ingest an uploaded PDF/DOCX file, storing the raw file, sections, and extracted images.

    Args:
        file_obj: Binary file-like object positioned at the start.
        filename: Original filename.
        metadata: Dict containing code, title, doc_type, summary, issued_by,
            issued_at, source_url, and an optional ``metadata`` dict of extra fields.
    """
    code = (metadata.get("code") or "").strip()
    if not code:
        raise ValueError("Document code is required.")
    title = metadata.get("title") or code
    doc_type = metadata.get("doc_type", "other")
    issued_at = _parse_date(metadata.get("issued_at"))
    summary = metadata.get("summary", "")
    issued_by = metadata.get("issued_by", "")
    source_url = metadata.get("source_url", "")
    extra_metadata = metadata.get("metadata") or {}

    file_bytes = file_obj.read()
    if hasattr(file_obj, "seek"):
        file_obj.seek(0)
    checksum = _sha256(file_bytes)
    mime_type = metadata.get("mime_type") or getattr(file_obj, "content_type", "")
    size = len(file_bytes)
    extracted = load_legal_document(BytesIO(file_bytes), filename=filename)
    title, issued_by, issued_at, doc_type = _auto_fill_metadata(
        text=extracted.text,
        title=title,
        issued_by=issued_by,
        issued_at=issued_at,
        doc_type=doc_type,
    )

    # Reject content-level duplicates registered under a different code.
    normalized_text = _normalize_text(extracted.text)
    content_checksum = _sha256(normalized_text.encode("utf-8"))
    duplicate = (
        LegalDocument.objects.filter(content_checksum=content_checksum)
        .exclude(code=code)
        .first()
    )
    if duplicate:
        raise ValueError(f"Content duplicates an existing document: {duplicate.code}")
    with transaction.atomic():
        doc, created = LegalDocument.objects.get_or_create(
            code=code,
            defaults={
                "title": title,
                "doc_type": doc_type,
                "summary": summary,
                "issued_by": issued_by,
                "issued_at": issued_at,
                "source_url": source_url,
                "metadata": extra_metadata,
            },
        )
        # Update metadata even if the document already existed (keep latest info).
        doc.title = title
        doc.doc_type = doc_type
        doc.summary = summary
        doc.issued_by = issued_by
        doc.issued_at = issued_at
        doc.source_url = source_url
        doc.metadata = extra_metadata
        doc.page_count = extracted.page_count
        doc.raw_text = extracted.text
        doc.raw_text_ocr = extracted.ocr_text or ""
        doc.file_checksum = checksum
        doc.content_checksum = content_checksum
        doc.file_size = size
        doc.mime_type = mime_type
        doc.original_filename = filename
        doc.updated_at = timezone.now()

        # Save the binary file.
        content = ContentFile(file_bytes)
        storage_name = f"{code}/{filename}"
        doc.uploaded_file.save(storage_name, content, save=False)
        doc.source_file = doc.uploaded_file.name
        doc.save()
        # Replace sections.
        doc.sections.all().delete()
        sections = []
        for idx, section in enumerate(extracted.sections, start=1):
            sections.append(
                LegalSection(
                    document=doc,
                    section_code=section.code,
                    section_title=section.title,
                    level=section.level,
                    order=idx,
                    content=section.content,
                    excerpt=section.content[:400],
                    page_start=section.page_start,
                    page_end=section.page_end,
                    is_ocr=section.is_ocr,
                    metadata=section.metadata or {},
                )
            )
        LegalSection.objects.bulk_create(sections, batch_size=200)
        # Replace images.
        doc.images.all().delete()
        images = []
        for idx, image in enumerate(extracted.images, start=1):
            image_content = ContentFile(image.data)
            image_name = f"{code}/img_{idx}.{image.extension}"
            img_instance = LegalDocumentImage(
                document=doc,
                page_number=image.page_number,
                description=image.description,
                width=image.width,
                height=image.height,
                checksum=_sha256(image.data),
            )
            img_instance.image.save(image_name, image_content, save=False)
            images.append(img_instance)
        LegalDocumentImage.objects.bulk_create(images, batch_size=100)
    return LegalIngestionResult(
        document=doc,
        created=created,
        sections_count=len(sections),
        images_count=len(images),
    )
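
# Minimal usage sketch (hypothetical caller; the file path and metadata values
# are invented for illustration):
#   with open("/tmp/qd-12-2024.pdf", "rb") as fh:
#       result = ingest_uploaded_document(
#           file_obj=fh,
#           filename="qd-12-2024.pdf",
#           metadata={"code": "12/QD-UBND", "doc_type": "other"},
#       )
#   result.created         -> True on first ingest, False on re-upload
#   result.sections_count  -> number of LegalSection rows written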


def enqueue_ingestion_job(*, file_obj, filename: str, metadata: Dict) -> IngestionJob:
    """
    Persist the uploaded file to a temporary job folder and enqueue Celery processing.
    """
    job = IngestionJob.objects.create(
        code=metadata.get("code", ""),
        filename=filename,
        metadata=metadata,
        status=IngestionJob.STATUS_PENDING,
    )
    temp_dir = Path(settings.MEDIA_ROOT) / "ingestion_jobs" / str(job.id)
    temp_dir.mkdir(parents=True, exist_ok=True)
    temp_path = temp_dir / filename
    if hasattr(file_obj, "seek"):
        file_obj.seek(0)
    if hasattr(file_obj, "chunks"):
        # Django UploadedFile: stream in chunks to avoid holding large files in memory.
        with temp_path.open("wb") as dest:
            for chunk in file_obj.chunks():
                dest.write(chunk)
    else:
        data = file_obj.read()
        with temp_path.open("wb") as dest:
            dest.write(data)
    job.storage_path = str(temp_path)
    job.save(update_fields=["storage_path"])
    process_ingestion_job.delay(str(job.id))
    return job
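
# Sketch of how a Django view might hand an upload off to the queue (the view
# name, form field, and response shape are assumptions for illustration):
#   from django.http import JsonResponse
#
#   def upload_view(request):
#       upload = request.FILES["document"]
#       job = enqueue_ingestion_job(
#           file_obj=upload,
#           filename=upload.name,
#           metadata={"code": request.POST.get("code", "")},
#       )
#       return JsonResponse({"job_id": str(job.id), "status": job.status})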