# engine/normalize.py
"""
Normalize — canonical schema builder + entity linking keys.
Reads raw files from the input directory, detects file types, parses
them into ``NormalizedRecord`` instances with deterministic row IDs
and entity-linking fields.
"""
from __future__ import annotations
import csv
import hashlib
import io
import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional
from engine.io_contract import FileType, InputFile, InputSpec, NormalizedRecord
logger = logging.getLogger("engine.normalize")
# ---------------------------------------------------------------------------
# File-type detection
# ---------------------------------------------------------------------------
# Lowercase file extension -> FileType.  Anything not listed falls through
# to ``FileType.UNKNOWN`` in ``detect_file_type``.
_EXT_MAP: Dict[str, FileType] = {
    ".csv": FileType.CSV,
    ".json": FileType.JSON,
    ".txt": FileType.TXT,
    ".html": FileType.HTML,
    ".htm": FileType.HTML,
    ".log": FileType.LOG,
}


def detect_file_type(path: Path) -> FileType:
    """Map *path*'s extension (case-insensitive) to a ``FileType``.

    Unrecognized extensions yield ``FileType.UNKNOWN``.
    """
    suffix = path.suffix.lower()
    if suffix in _EXT_MAP:
        return _EXT_MAP[suffix]
    return FileType.UNKNOWN
def _file_sha256(path: Path) -> str:
"""Compute SHA-256 hex digest of a file."""
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
# ---------------------------------------------------------------------------
# Build InputSpec from a directory
# ---------------------------------------------------------------------------
def build_input_spec(input_dir: Path) -> InputSpec:
    """
    Scan *input_dir* for supported files and return an ``InputSpec``.

    *input_dir* may also be a single file, in which case the spec holds just
    that file and the spec's directory becomes the file's parent.  In
    directory mode, hidden (dot-prefixed) entries are skipped and entries are
    visited in sorted order so the result is deterministic.
    """
    root = Path(input_dir)
    discovered: List[InputFile] = []

    def _describe(p: Path) -> InputFile:
        # One InputFile per path: type from extension, size + hash from disk.
        return InputFile(
            path=p,
            file_type=detect_file_type(p),
            size_bytes=p.stat().st_size,
            sha256=_file_sha256(p),
        )

    if root.is_file():
        # Single file mode
        discovered.append(_describe(root))
        root = root.parent
    else:
        for entry in sorted(root.iterdir()):
            if entry.is_file() and not entry.name.startswith("."):
                discovered.append(_describe(entry))
    logger.info("InputSpec: %d files from %s", len(discovered), root)
    return InputSpec(input_dir=root, files=discovered)
# ---------------------------------------------------------------------------
# Deterministic row ID
# ---------------------------------------------------------------------------
def _make_row_id(source_file: str, index: int, content_hash: str) -> str:
"""
Deterministic row ID = first 12 hex chars of SHA-256(source_file + index + content).
"""
raw = f"{source_file}:{index}:{content_hash}"
return hashlib.sha256(raw.encode()).hexdigest()[:12]
# ---------------------------------------------------------------------------
# Entity extraction helpers (lightweight, no ML)
# ---------------------------------------------------------------------------
_EMAIL_RE = re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+")
_IP_RE = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
_PHONE_RE = re.compile(r"\b\+?1?\d{9,15}\b")
_DOMAIN_RE = re.compile(r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b")
_HASH_RE = re.compile(r"\b[a-fA-F0-9]{32,64}\b")
def _extract_entities(text: str) -> Dict[str, Optional[str]]:
"""Extract first occurrence of common entity types from text."""
email_m = _EMAIL_RE.search(text)
ip_m = _IP_RE.search(text)
phone_m = _PHONE_RE.search(text)
domain_m = _DOMAIN_RE.search(text)
hash_m = _HASH_RE.search(text)
return {
"entity_email": email_m.group(0) if email_m else None,
"entity_ip": ip_m.group(0) if ip_m else None,
"entity_phone": phone_m.group(0) if phone_m else None,
"entity_domain": domain_m.group(0) if domain_m else None,
"entity_hash": hash_m.group(0) if hash_m else None,
}
# ---------------------------------------------------------------------------
# Parsers per file type
# ---------------------------------------------------------------------------
def _parse_csv(path: Path) -> List[NormalizedRecord]:
    """Parse a CSV file into one ``NormalizedRecord`` per data row.

    The header row supplies column names (``csv.DictReader``).  Each row is
    serialized to JSON for the record's ``raw_text`` and scanned for entity
    patterns; well-known column names (``name``/``phone``/``email`` and
    capitalized / ``entity_``-prefixed variants) take precedence over the
    regex-extracted entities.
    """
    records: List[NormalizedRecord] = []
    with open(path, newline="", encoding="utf-8", errors="replace") as f:
        reader = csv.DictReader(f)
        for idx, row in enumerate(reader):
            text = json.dumps(row, ensure_ascii=False)
            content_hash = hashlib.sha256(text.encode()).hexdigest()[:16]
            entities = _extract_entities(text)
            # Prefer explicit columns under their common spellings.
            name = row.get("name") or row.get("Name") or row.get("entity_name")
            phone = row.get("phone") or row.get("Phone") or row.get("entity_phone")
            email = row.get("email") or row.get("Email") or row.get("entity_email")
            records.append(NormalizedRecord(
                row_id=_make_row_id(str(path), idx, content_hash),
                source_file=str(path.name),
                source_type=FileType.CSV,
                # FIX: the old fallback `entities.get("entity_name")` looked
                # up a key _extract_entities never produces, so it was always
                # None.  `name or None` yields the identical result
                # (empty string -> None) without the misleading dead lookup.
                entity_name=name or None,
                entity_phone=phone or entities.get("entity_phone"),
                entity_email=email or entities.get("entity_email"),
                entity_ip=entities.get("entity_ip"),
                entity_domain=entities.get("entity_domain"),
                entity_hash=entities.get("entity_hash"),
                raw_text=text,
                extra=dict(row),
            ))
    return records
def _parse_json(path: Path) -> List[NormalizedRecord]:
    """Parse a JSON file into ``NormalizedRecord`` instances.

    A top-level list yields one record per element; any other top-level value
    yields a single record.  Non-dict elements are wrapped as
    ``{"value": element}`` for the record's ``extra`` payload.
    """
    with open(path, encoding="utf-8", errors="replace") as f:
        data = json.load(f)
    items = data if isinstance(data, list) else [data]

    records: List[NormalizedRecord] = []
    for idx, item in enumerate(items):
        if isinstance(item, dict):
            text = json.dumps(item, ensure_ascii=False)
            extra: Dict[str, Any] = item
        else:
            text = str(item)
            extra = {"value": item}
        digest = hashlib.sha256(text.encode()).hexdigest()[:16]
        entities = _extract_entities(text)
        records.append(NormalizedRecord(
            row_id=_make_row_id(str(path), idx, digest),
            source_file=str(path.name),
            source_type=FileType.JSON,
            # `extra` is always a dict here, so the get() is safe.
            entity_name=extra.get("name"),
            entity_email=entities.get("entity_email"),
            entity_ip=entities.get("entity_ip"),
            entity_phone=entities.get("entity_phone"),
            entity_domain=entities.get("entity_domain"),
            entity_hash=entities.get("entity_hash"),
            raw_text=text,
            extra=extra,
        ))
    return records
def _parse_text(path: Path, file_type: FileType = FileType.TXT) -> List[NormalizedRecord]:
    """Parse a plain text / HTML / log file — one record per non-empty line.

    Row IDs are derived from the physical (0-based) line number, so blank
    lines are skipped without renumbering the lines that follow them.
    """
    records: List[NormalizedRecord] = []
    with open(path, encoding="utf-8", errors="replace") as f:
        for lineno, raw_line in enumerate(f):
            stripped = raw_line.strip()
            if not stripped:
                continue
            digest = hashlib.sha256(stripped.encode()).hexdigest()[:16]
            entities = _extract_entities(stripped)
            records.append(NormalizedRecord(
                row_id=_make_row_id(str(path), lineno, digest),
                source_file=str(path.name),
                source_type=file_type,
                entity_email=entities.get("entity_email"),
                entity_ip=entities.get("entity_ip"),
                entity_phone=entities.get("entity_phone"),
                entity_domain=entities.get("entity_domain"),
                entity_hash=entities.get("entity_hash"),
                raw_text=stripped,
            ))
    return records
# ---------------------------------------------------------------------------
# Main normalization entry point
# ---------------------------------------------------------------------------
def normalize_files(input_spec: InputSpec) -> List[NormalizedRecord]:
    """
    Parse all files in *input_spec* and return a flat list of
    ``NormalizedRecord`` instances.

    Dispatches on each file's detected type: CSV and JSON get structured
    parsers; TXT, LOG, and HTML are parsed line-by-line (no tag stripping
    for HTML); unknown types fall back to a best-effort text parse.  A file
    that fails to parse is logged with its traceback and skipped rather than
    aborting the whole batch.
    """
    all_records: List[NormalizedRecord] = []
    for f in input_spec.files:
        try:
            if f.file_type == FileType.CSV:
                recs = _parse_csv(f.path)
            elif f.file_type == FileType.JSON:
                recs = _parse_json(f.path)
            elif f.file_type in (FileType.TXT, FileType.LOG, FileType.HTML):
                # Collapsed the former separate HTML branch: it passed
                # FileType.HTML explicitly, which equals f.file_type here.
                recs = _parse_text(f.path, f.file_type)
            else:
                recs = _parse_text(f.path, FileType.UNKNOWN)
            logger.info("Parsed %d records from %s (%s)", len(recs), f.path.name, f.file_type.value)
            all_records.extend(recs)
        except Exception:
            # FIX: logger.exception preserves the traceback that the old
            # logger.error("...: %s", exc) discarded.
            logger.exception("Failed to parse %s", f.path)
    logger.info("Total normalized records: %d", len(all_records))
    return all_records