gapura-ai / api /main.py
gapura-dev's picture
Improve forecast accuracy and grounded AI responses
74b006b
"""
Gapura AI Analysis API
FastAPI server for regression and NLP analysis of irregularity reports
Uses real trained models from ai-model/models/
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, field_validator
from pydantic_core import ValidationError
from typing import List, Optional, Dict, Any, Tuple
from collections import Counter
import os
import json
import logging
import hashlib
from datetime import datetime
import numpy as np
import pickle
import pandas as pd
import sys
import re
from urllib.parse import unquote
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from data.cache_service import get_cache, CacheService
from data.nlp_service import NLPModelService
from data.shap_service import get_shap_explainer
from data.anomaly_service import get_anomaly_detector
from data.remote_client import get_remote_client
from services.data_fetcher import DataFetcher
from services.orchestrator import get_orchestrator
from services.fallback import classify_severity as fallback_classify_severity
from services.fallback import classify_multi_label as fallback_classify_multi_label
from services import analytics as analytics_service
from services import reports as reports_service
from services import case_reasoner as cbr_service
from services import data_intelligence as di_service
# Setup logging: INFO-level root configuration plus a module-scoped logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Canonical severity labels and their ordinal rank (a higher rank is compared
# as "more severe" by the context guard further down).
VALID_SEVERITIES = {"Low", "Medium", "High", "Critical"}
SEVERITY_ORDER = {"Low": 1, "Medium": 2, "High": 3, "Critical": 4}
# FastAPI application object; the handlers and routes below register on it.
app = FastAPI(
title="Gapura AI Analysis API",
description="AI-powered analysis for irregularity reports",
version="1.0.0",
)
# CORS middleware: every origin, method and header is accepted.
# NOTE(review): wildcard origins are combined with allow_credentials=True.
# The FastAPI CORS documentation asks for explicit origins when credentials
# are enabled - confirm this permissive setup is intended for production.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.exception_handler(ValidationError)
async def validation_exception_handler(request: Request, exc: ValidationError):
    """Turn an unhandled pydantic ``ValidationError`` into a 422 JSON response.

    Args:
        request: The request being processed (unused, required by FastAPI).
        exc: The validation error raised while handling the request.

    Returns:
        A ``JSONResponse`` with status 422 holding a fixed ``detail`` message,
        the structured error list and the same list as a JSON string.
    """
    # exc.errors() can embed raw exception objects (under "ctx") that the JSON
    # encoder rejects; exc.json() is already serialised, so decode that instead.
    serialized_errors = exc.json()
    return JSONResponse(
        status_code=422,
        content={
            "detail": "Validation error",
            "errors": json.loads(serialized_errors),
            "body": serialized_errors,
        },
    )
def _normalize_header_name(name: str) -> str:
return name.strip().replace(" ", "_").replace("/", "_").upper()
def _filter_by_esklasi(
rows: List[Dict[str, Any]], pattern: Optional[str]
) -> List[Dict[str, Any]]:
"""DEPRECATED: ESKLASI_DIVISI column no longer exists in Google Sheets.
Returns rows unchanged for backward compatibility."""
return rows
def _rc_category_remote(
    root_text: str, report_text: str, area: Optional[str], category: Optional[str]
) -> str:
    """Resolve a root-cause category, trying the remote client first.

    Args:
        root_text: Root-cause text of the record.
        report_text: Report text of the record.
        area: Optional area value forwarded to the classifiers.
        category: Optional category value forwarded to the classifiers.

    Returns:
        The first non-empty of ``primary_category`` / ``label`` / ``category``
        from the remote result when the remote client is enabled and answers
        with a dict; otherwise the local service's ``primary_category``, or
        ``"Unknown"`` when that is empty.
    """
    try:
        rc = get_remote_client()
        if rc and rc.enabled:
            res = rc.root_cause_classify(root_text, report_text, area, category)
            if isinstance(res, dict):
                lbl = (
                    res.get("primary_category")
                    or res.get("label")
                    or res.get("category")
                )
                if lbl:
                    return str(lbl)
    except Exception as exc:
        # Best effort: the remote path is optional, so record the failure and
        # fall through to the local classifier instead of hiding it.
        logger.warning(
            "Remote root-cause classification failed, using local service: %s",
            exc,
        )
    # Imported lazily, only when the local fallback is actually needed.
    from data.root_cause_service import get_root_cause_service

    svc = get_root_cause_service()
    r = svc.classify(root_text, report_text, {"area": area, "category": category})
    return str(r.get("primary_category") or "Unknown")
def _subcat_label_remote(
    report: str,
    area: Optional[str],
    issue_type: Optional[str],
    root_cause: Optional[str],
) -> str:
    """Resolve a sub-category label, trying the remote client first.

    Args:
        report: Report text of the record.
        area: Optional area value forwarded to the classifiers.
        issue_type: Optional issue type forwarded to the classifiers.
        root_cause: Optional root-cause text forwarded to the classifiers.

    Returns:
        The remote result's ``subcategory`` (or ``label``) when the remote
        client is enabled and answers with a dict; otherwise the local
        classifier's ``subcategory``, or ``"Unknown"`` when that is empty.
    """
    try:
        rc = get_remote_client()
        if rc and rc.enabled:
            res = rc.subcategory_classify(report, area, issue_type, root_cause)
            if isinstance(res, dict):
                lbl = res.get("subcategory") or res.get("label")
                if lbl:
                    return str(lbl)
    except Exception as exc:
        # Best effort: the remote path is optional, so record the failure and
        # fall through to the local classifier instead of hiding it.
        logger.warning(
            "Remote subcategory classification failed, using local classifier: %s",
            exc,
        )
    # Imported lazily, only when the local fallback is actually needed.
    from data.subcategory_service import get_subcategory_classifier

    clf = get_subcategory_classifier()
    sc = clf.classify(report, area, issue_type, root_cause)
    return str(sc.get("subcategory") or "Unknown")
def _normalize_severity_label(value: Any) -> Optional[str]:
raw = str(value or "").strip()
if not raw or raw in {"-", "#N/A", "#REF!"}:
return None
aliases = {
"low": "Low",
"minor": "Low",
"medium": "Medium",
"moderate": "Medium",
"high": "High",
"critical": "Critical",
"severe": "Critical",
}
return aliases.get(raw.lower())
def _record_text(record: Dict[str, Any]) -> str:
return " ".join(
str(record.get(key) or "")
for key in ("Report", "Root_Caused", "Action_Taken")
if record.get(key)
).strip()
def _sheet_severity(record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the severity already present on the record, if any.

    The keys ``Severity_Level``, ``Severity`` and ``severity`` are tried in
    that order; the first one that normalises to a known label wins and is
    reported with confidence 1.0 and source ``"sheet"``. ``None`` otherwise.
    """
    raw_values = (
        record.get(column) for column in ("Severity_Level", "Severity", "severity")
    )
    for raw in raw_values:
        normalized = _normalize_severity_label(raw)
        if normalized:
            return {"severity": normalized, "confidence": 1.0, "source": "sheet"}
    return None
def _apply_context_guard(record: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
    """Downgrade an over-stated severity when the record text says "minor".

    Works on a shallow copy of ``result``. The label is normalised (defaulting
    to ``"Medium"``); a ``Critical`` label on a "minor" record without any
    hard-critical keyword becomes ``Medium``, and so does anything ranked
    above ``Medium`` on a CLOSED "minor" record. The applied rule is recorded
    under ``context_adjustment``. Confidence is coerced to a float rounded to
    two decimals, with falsy values replaced by 0.7.
    """
    hard_critical_terms = (
        "fatal",
        "kecelakaan",
        "accident",
        "cedera",
        "injury",
        "kebakaran",
        "fire",
        "darurat",
        "emergency",
        "membahayakan",
    )
    adjusted = dict(result)
    severity = _normalize_severity_label(adjusted.get("severity")) or "Medium"
    haystack = _record_text(record).lower()
    is_closed = str(record.get("Status") or "").strip().upper() == "CLOSED"
    # "kategori minor" contains "minor", so one substring test covers both.
    if "minor" in haystack:
        has_hard_critical = any(term in haystack for term in hard_critical_terms)
        if severity == "Critical" and not has_hard_critical:
            severity = "Medium"
            adjusted["context_adjustment"] = "minor_case_downgrade"
        if is_closed and SEVERITY_ORDER.get(severity, 2) > 2:
            severity = "Medium"
            adjusted["context_adjustment"] = "closed_minor_case_downgrade"
    adjusted["severity"] = severity
    adjusted["confidence"] = round(float(adjusted.get("confidence") or 0.7), 2)
    return adjusted
def _classify_record_severity(record: Dict[str, Any]) -> Dict[str, Any]:
    """Determine the severity of one record.

    Order of precedence: a severity already on the sheet row, then a fixed
    Low/0.5 answer for records without text, then the NLP service attached to
    ``model_service``, and finally the rule-based fallback. Model and fallback
    answers pass through ``_apply_context_guard``.
    """
    from_sheet = _sheet_severity(record)
    if from_sheet:
        return from_sheet

    narrative = _record_text(record)
    if not narrative:
        return {"severity": "Low", "confidence": 0.5, "source": "empty_text"}

    try:
        nlp = getattr(model_service, "nlp_service", None)
        if nlp is not None:
            prediction = nlp.classify_severity([narrative])[0]
            if getattr(nlp, "remote", None) and nlp.remote.enabled:
                origin = "deep_learning"
            else:
                origin = getattr(nlp, "version", "trained_nlp")
            model_answer = {
                "severity": prediction.get("severity", "Medium"),
                "confidence": prediction.get("confidence", 0.7),
                "source": origin,
            }
            return _apply_context_guard(record, model_answer)
    except Exception as exc:
        logger.warning("NLP severity failed, falling back: %s", exc)

    # Reached when no NLP service is attached or the NLP path raised.
    rule_answer = fallback_classify_severity([narrative])[0]
    rule_answer["source"] = "rule_fallback"
    return _apply_context_guard(record, rule_answer)
def _classify_records_severity(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Determine the severity of many records with one batched model call.

    Rows that already carry a sheet severity keep it. Rows without any text
    get the same fixed Low/0.5/``empty_text`` answer as the single-record
    path instead of being sent to the model. The remaining rows are
    classified in one batch by the NLP service, or by the rule-based fallback
    when the service is missing or raises; those answers pass through
    ``_apply_context_guard``.

    Args:
        records: Row dicts to classify.

    Returns:
        One result dict per input record, in input order. A slot that no
        classifier filled is reported as Low/0.5 with source ``"unknown"``.
    """
    results: List[Optional[Dict[str, Any]]] = [None] * len(records)
    pending: List[Tuple[int, Dict[str, Any], str]] = []
    for idx, record in enumerate(records):
        sheet_result = _sheet_severity(record)
        if sheet_result:
            results[idx] = sheet_result
            continue
        text = _record_text(record)
        if not text:
            # Keep parity with _classify_record_severity: nothing to classify.
            results[idx] = {"severity": "Low", "confidence": 0.5, "source": "empty_text"}
            continue
        pending.append((idx, record, text))

    if pending:
        texts = [text for _, _, text in pending]
        try:
            nlp = getattr(model_service, "nlp_service", None)
            if nlp is None:
                raise RuntimeError("NLP service unavailable")
            preds = nlp.classify_severity(texts)
            if getattr(nlp, "remote", None) and nlp.remote.enabled:
                source = "deep_learning"
            else:
                source = getattr(nlp, "version", "trained_nlp")
            for (idx, record, _), pred in zip(pending, preds):
                results[idx] = _apply_context_guard(
                    record,
                    {
                        "severity": pred.get("severity", "Medium"),
                        "confidence": pred.get("confidence", 0.7),
                        "source": source,
                    },
                )
        except Exception as exc:
            logger.warning("Batch NLP severity failed, falling back: %s", exc)
            # The fallback overwrites any slots the failed NLP pass had filled.
            preds = fallback_classify_severity(texts)
            for (idx, record, _), pred in zip(pending, preds):
                pred["source"] = "rule_fallback"
                results[idx] = _apply_context_guard(record, pred)

    return [r or {"severity": "Low", "confidence": 0.5, "source": "unknown"} for r in results]
from schemas.models import (
ReportCategoryEnum,
AreaEnum,
StatusEnum,
IrregularityReport,
AnalysisOptions,
AnalysisRequest,
ShapExplanation,
AnomalyResult,
RegressionPrediction,
RegressionResult,
ClassificationResult,
Entity,
EntityResult,
SummaryResult,
SentimentResult,
NLPResult,
TrendData,
TrendResult,
Metadata,
AnalysisResponse,
AnalyzeAllResponse,
ActionCategorySummary,
ActionSummaryResponse,
CategorySummaryResponse,
)
from enum import Enum
from datetime import date as date_type
# ============== Model Service (thin wrapper) ==============
class ModelService:
    """Thin wrapper around model metadata and the inference orchestrator.

    Construction records which regression artifact is available and attaches
    the NLP service. The inference methods forward to the object returned by
    ``get_orchestrator()`` and convert its raw dicts into schema objects.
    """

    def __init__(self):
        """Set default metadata, then load the regression model and NLP service."""
        self.model_loaded = False
        self.regression_version = "2.0.0-delegated"
        self.nlp_version = "2.0.0-delegated"
        self.model_metrics = {}
        self.nlp_service = None
        self.model_file_exists = False
        # Defined up front so both attributes exist even when no model is found.
        self.model_path = None
        self.regression_onnx_session = None
        self.remote = get_remote_client()
        # NOTE(review): this used to branch on DL_REMOTE_ONLY / remote.enabled,
        # but both branches loaded the regression model, so the flag had no
        # effect. That behaviour is preserved; confirm whether remote-only mode
        # was meant to skip the local load.
        self._load_regression_model()
        self._load_nlp_service()

    def _load_nlp_service(self):
        """Instantiate ``NLPModelService`` and copy its version; log on failure."""
        try:
            self.nlp_service = NLPModelService()
            self.nlp_version = self.nlp_service.version
            logger.info(f"NLP service loaded (version: {self.nlp_version})")
        except Exception as e:
            logger.warning(f"Failed to load NLP service: {e}")

    def _load_regression_model(self):
        """Locate a regression artifact and record its metadata.

        An ONNX file is searched in ``../models/regression`` relative to this
        module and in ``/app/models/regression``. When one exists and
        ``REGRESSION_USE_ONNX`` is ``1``/``true``/``yes`` (the default), an
        onnxruntime session is created and a metrics JSON next to it is read.
        Otherwise ``resolution_predictor_latest.pkl`` in the module-relative
        directory is unpickled to obtain its ``metrics`` entry. Any failure is
        logged as a warning and leaves the defaults in place.
        """
        try:
            base_dir = os.path.join(
                os.path.dirname(__file__), "..", "models", "regression"
            )
            app_dir = os.path.join("/app", "models", "regression")
            onnx_candidates = [
                os.path.join(base_dir, "resolution_predictor.onnx"),
                os.path.join(app_dir, "resolution_predictor.onnx"),
            ]
            onnx_path = next((p for p in onnx_candidates if os.path.exists(p)), None)
            prefer_onnx = os.getenv("REGRESSION_USE_ONNX", "1").lower() in {
                "1",
                "true",
                "yes",
            }
            if prefer_onnx and onnx_path:
                import onnxruntime as ort

                sess_options = ort.SessionOptions()
                sess_options.intra_op_num_threads = int(os.getenv("ONNX_THREADS", "1"))
                sess_options.graph_optimization_level = (
                    ort.GraphOptimizationLevel.ORT_ENABLE_ALL
                )
                self.regression_onnx_session = ort.InferenceSession(
                    onnx_path, sess_options
                )
                self.model_loaded = True
                self.regression_version = "2.0.0-onnx"
                self.model_path = onnx_path
                self.model_file_exists = True
                metrics_dir = os.path.dirname(onnx_path)
                try:
                    # The first metrics file that exists wins.
                    for mf in [
                        "regression_metrics.json",
                        "resolution_predictor_latest_metrics.json",
                    ]:
                        mp = os.path.join(metrics_dir, mf)
                        if os.path.exists(mp):
                            with open(mp, "r", encoding="utf-8") as _f:
                                self.model_metrics = json.load(_f)
                            break
                except Exception as metrics_exc:
                    # Metrics are optional; keep the model but record the cause.
                    logger.warning(
                        "Regression metrics could not be read: %s", metrics_exc
                    )
                logger.info(f"Regression ONNX model loaded from {onnx_path}")
            else:
                pkl_path = os.path.join(base_dir, "resolution_predictor_latest.pkl")
                if os.path.exists(pkl_path):
                    self.model_path = os.path.abspath(pkl_path)
                    self.model_file_exists = True
                    with open(self.model_path, "rb") as f:
                        # NOTE(review): pickle.load can execute arbitrary code;
                        # only acceptable while this artifact is trusted.
                        model_data = pickle.load(f)
                    if isinstance(model_data, dict):
                        self.model_metrics = model_data.get("metrics", {})
                    self.model_loaded = True
                    self.regression_version = "1.0.0-trained"
        except Exception as e:
            logger.warning(f"Regression model load failed: {e}")

    def predict_regression(self, data: List[Dict]) -> List[RegressionPrediction]:
        """Run regression through the orchestrator and wrap each raw result.

        Both camelCase and snake_case keys are accepted for the predicted
        days, confidence interval and SHAP explanation. A missing, non-sequence
        or too-short confidence interval falls back to ``[0.1, 0.5]``.

        Args:
            data: Report dicts forwarded unchanged to the orchestrator.

        Returns:
            One ``RegressionPrediction`` per raw orchestrator result.
        """
        orch = get_orchestrator()
        raw = orch.predict_regression(data)
        predictions = []
        for r in raw:
            ci = (
                r.get("confidenceInterval")
                or r.get("confidence_interval")
                or [0.1, 0.5]
            )
            # Two bounds are indexed below, so a shorter sequence is unusable.
            if not isinstance(ci, (list, tuple)) or len(ci) < 2:
                ci = [0.1, 0.5]
            shap_info = (
                r.get("shapExplanation") or r.get("shap_explanation") or r.get("shap")
            )
            shap_exp = None
            if isinstance(shap_info, dict):
                shap_exp = ShapExplanation(
                    baseValue=shap_info.get("baseValue", 0),
                    predictionExplained=bool(
                        shap_info.get("predictionExplained", True)
                    ),
                    topFactors=shap_info.get("topFactors", []),
                    explanation=shap_info.get("explanation", ""),
                )
            an = r.get("anomalyDetection") or r.get("anomaly")
            anomaly_res = None
            if isinstance(an, dict):
                anomaly_res = AnomalyResult(
                    isAnomaly=bool(an.get("isAnomaly", False)),
                    anomalyScore=float(an.get("anomalyScore", 0.0)),
                    anomalies=an.get("anomalies", []),
                )
            predictions.append(
                RegressionPrediction(
                    reportId=r.get("reportId", "row_0"),
                    predictedDays=float(
                        r.get("predictedDays") or r.get("predicted_days") or 0.0
                    ),
                    confidenceInterval=(float(ci[0]), float(ci[1])),
                    featureImportance=r.get("featureImportance", {}),
                    hasUnknownCategories=r.get("hasUnknownCategories", True),
                    shapExplanation=shap_exp,
                    anomalyDetection=anomaly_res,
                )
            )
        return predictions

    def classify_text(self, data: List[Dict]) -> List[ClassificationResult]:
        """Classify reports via the orchestrator; every listed key is required."""
        orch = get_orchestrator()
        raw = orch.classify_texts(data)
        return [
            ClassificationResult(
                reportId=r["reportId"],
                severity=r["severity"],
                severityConfidence=r["severityConfidence"],
                areaType=r["areaType"],
                issueType=r["issueType"],
                issueTypeConfidence=r["issueTypeConfidence"],
            )
            for r in raw
        ]

    def extract_entities(self, data: List[Dict]) -> List[EntityResult]:
        """Extract entities via the orchestrator; a missing list means none."""
        orch = get_orchestrator()
        raw = orch.extract_entities(data)
        return [
            EntityResult(
                reportId=r["reportId"],
                entities=[Entity(**e) for e in r.get("entities", [])],
            )
            for r in raw
        ]

    def generate_summary(self, data: List[Dict]) -> List[SummaryResult]:
        """Summarise reports via the orchestrator."""
        orch = get_orchestrator()
        raw = orch.generate_summaries(data)
        return [
            SummaryResult(
                reportId=r["reportId"],
                executiveSummary=r["executiveSummary"],
                keyPoints=r["keyPoints"],
            )
            for r in raw
        ]

    def analyze_sentiment(self, data: List[Dict]) -> List[SentimentResult]:
        """Score urgency and sentiment via the orchestrator."""
        orch = get_orchestrator()
        raw = orch.analyze_sentiment(data)
        return [
            SentimentResult(
                reportId=r["reportId"],
                urgencyScore=r["urgencyScore"],
                sentiment=r["sentiment"],
                keywords=r["keywords"],
            )
            for r in raw
        ]

    def _classify_severity_fallback(self, texts: List[str]) -> List[Dict]:
        """Delegate to the rule-based severity classifier."""
        return fallback_classify_severity(texts)
# Initialize model service
# Module-level singleton created at import time; the severity helpers and the
# endpoints in this module look it up by this global name.
model_service = ModelService()
# ============== API Endpoints ==============
@app.get("/")
async def root():
    """API health check"""
    # Three-state regression flag: loaded > file present > nothing found.
    if model_service.model_loaded:
        regression_state = "loaded"
    elif getattr(model_service, "model_file_exists", False):
        regression_state = "available"
    else:
        regression_state = "unavailable"

    model_summary = {
        "regression": regression_state,
        "nlp": model_service.nlp_version,
    }
    return {
        "status": "healthy",
        "service": "Gapura AI Analysis API",
        "version": "1.0.0",
        "models": model_summary,
        "timestamp": datetime.now().isoformat(),
    }
@app.get("/api/ai/gse/ranking")
async def gse_ranking(
    entity: str = "branch",
    limit: int = 10,
    bypass_cache: bool = False,
    confidence_threshold: float = 0.0,
):
    """Rank GSE records by group and bucket them into SDA/SDM/Maintenance.

    Args:
        entity: Grouping dimension; one of ``branch``, ``airline``, ``area``.
        limit: Maximum number of groups in ``top`` (negative values give none).
        bypass_cache: Forwarded to ``fetch_gse_data``.
        confidence_threshold: Minimum confidence for a precomputed root-cause
            label to be used; below it the label is re-derived.

    Returns:
        A dict with the record total, the top groups with counts and
        percentages, and the high-level SDA/SDM/Maintenance distribution.

    Raises:
        HTTPException: 400 when ``entity`` is not one of the accepted values.
    """
    # Validate before fetching so a bad request never pays for the data load.
    if entity not in {"branch", "airline", "area"}:
        raise HTTPException(
            status_code=400, detail="Invalid 'entity'. Use branch|airline|area"
        )

    from api.gse_helpers import fetch_gse_data, get_group_name

    gse_rows_with_idx, mt, _ = fetch_gse_data(
        entity=entity,
        confidence_threshold=confidence_threshold,
        bypass_cache=bypass_cache,
    )
    counts: Dict[str, int] = {}
    high_counts = {"SDA": 0, "SDM": 0, "Maintenance": 0}
    for r, idx in gse_rows_with_idx:
        grp = get_group_name(r, entity)
        counts[grp] = counts.get(grp, 0) + 1

        complaint_category = r.get("Irregularity_Complain_Category") or r.get(
            "Location_of_Incident"
        )
        report_text = str(r.get("Report") or "")
        root_text = str(r.get("Root_Caused") or "")

        # Prefer the precomputed label when it is confident enough.
        rc_cat = "Unknown"
        if idx < len(mt) and isinstance(mt[idx], dict):
            rc_pred = mt[idx].get("root_cause") or {}
            rc_lbl = str(rc_pred.get("label") or "Unknown")
            rc_conf = float(rc_pred.get("confidence") or 0.0)
            rc_cat = rc_lbl if rc_conf >= confidence_threshold else "Unknown"
        if rc_cat == "Unknown":
            rc_cat = _rc_category_remote(
                root_text, report_text, r.get("Area"), complaint_category
            )
        sub = _subcat_label_remote(
            report_text, r.get("Area"), complaint_category, root_text
        )

        # First matching rule wins; rows matching none are not bucketed.
        if sub == "The Availability of GSE":
            high_counts["SDA"] += 1
        elif sub == "Cleanliness of GSE" or rc_cat == "Equipment Failure":
            high_counts["Maintenance"] += 1
        elif rc_cat in {"Staff Competency", "Training Gap"} or sub in {
            "Officer Competencies",
            "Qualified Competencies (Apron)",
        }:
            high_counts["SDM"] += 1

    total = sum(counts.values())
    record_count = len(gse_rows_with_idx)
    top = sorted(counts.items(), key=lambda x: -x[1])[: max(0, int(limit))]
    top_list = [
        {
            "name": name,
            "count": cnt,
            "percentage": round((cnt / total * 100) if total else 0.0, 1),
        }
        for name, cnt in top
    ]
    dist_high = {
        k: {
            "count": high_counts[k],
            "percentage": round(
                (high_counts[k] / record_count * 100) if record_count else 0.0,
                1,
            ),
        }
        for k in ["SDA", "SDM", "Maintenance"]
    }
    return {
        "status": "success",
        "entity": entity,
        "filters": {"confidence_threshold": confidence_threshold},
        "total_gse_records": record_count,
        "top": top_list,
        "distribution_high_level": dist_high,
    }
@app.get("/api/ai/gse/serviceability")
async def gse_serviceability(
    bypass_cache: bool = False,
    confidence_threshold: float = 0.0,
    sample_n: int = 0,
    entity_filter: Optional[str] = None,
    entity_value: Optional[str] = None,
):
    """Summarise the serviceability status of GSE records.

    Args:
        bypass_cache: Forwarded to ``fetch_gse_data``.
        confidence_threshold: Minimum confidence for a precomputed root-cause
            label to be used; below it the label is re-derived.
        sample_n: Number of sample rows returned per status (0 gives none).
        entity_filter: Forwarded to ``fetch_gse_data``.
        entity_value: Forwarded to ``fetch_gse_data``.

    Returns:
        A dict with the overall status distribution, per-equipment counts
        (the 50 largest) and the highest-confidence samples per status.
    """
    from api.gse_helpers import (
        fetch_gse_data,
        detect_equipment,
        classify_serviceability,
    )

    statuses = (
        "Serviceable",
        "Unserviceable",
        "Needs Maintenance",
        "Unavailable",
        "Unknown",
    )
    gse_rows_with_idx, mt, _ = fetch_gse_data(
        entity_filter=entity_filter,
        entity_value=entity_value,
        confidence_threshold=confidence_threshold,
        bypass_cache=bypass_cache,
    )
    status_counts = dict.fromkeys(statuses, 0)
    equipment_status: Dict[str, Dict[str, int]] = {}
    samples_map: Dict[str, List[Dict[str, Any]]] = {s: [] for s in statuses}

    for r, idx in gse_rows_with_idx:
        report_text = str(r.get("Report") or "")
        root_text = str(r.get("Root_Caused") or "")
        t = " ".join([report_text, root_text, str(r.get("Action_Taken") or "")])
        eq = detect_equipment(t)

        # Prefer the precomputed label when it is confident enough.
        rc_cat = "Unknown"
        conf_score = 0.0
        if idx < len(mt) and isinstance(mt[idx], dict):
            rc_pred = mt[idx].get("root_cause") or {}
            rc_lbl = str(rc_pred.get("label") or "Unknown")
            conf_score = float(rc_pred.get("confidence") or 0.0)
            rc_cat = rc_lbl if conf_score >= confidence_threshold else "Unknown"
        if rc_cat == "Unknown":
            rc_cat = _rc_category_remote(
                root_text,
                report_text,
                r.get("Area"),
                r.get("Irregularity_Complain_Category")
                or r.get("Location_of_Incident"),
            )

        status = classify_serviceability(t, rc_cat)
        if status not in status_counts:
            # A label outside the known set would raise KeyError below.
            logger.warning(
                "Unexpected serviceability status %r; counting as Unknown", status
            )
            status = "Unknown"
        status_counts[status] += 1
        per_equipment = equipment_status.setdefault(eq, dict.fromkeys(statuses, 0))
        per_equipment[status] += 1

        samples_map[status].append(
            {
                "sheet": r.get("_sheet_name"),
                "branch": r.get("Branch") or r.get("Cabang"),
                "airline": r.get("Airlines") or r.get("Airline"),
                "area": r.get("Area"),
                "equipment": eq,
                "status": status,
                "root_cause_category": rc_cat,
                "confidence": round(conf_score, 3),
                "report_preview": report_text[:160].strip(),
                "root_cause_preview": root_text[:160].strip(),
            }
        )

    total = len(gse_rows_with_idx)
    dist = {
        k: {
            "count": status_counts[k],
            "percentage": round((status_counts[k] / total * 100) if total else 0.0, 1),
        }
        for k in statuses
    }
    equipment_list = [
        {"equipment": eq, "counts": cnts, "total": sum(cnts.values())}
        for eq, cnts in equipment_status.items()
    ]
    equipment_list = sorted(equipment_list, key=lambda x: -x["total"])[:50]
    top_samples = {
        k: sorted(v, key=lambda x: -x["confidence"])[: max(0, int(sample_n))]
        for k, v in samples_map.items()
    }
    return {
        "status": "success",
        "filters": {"confidence_threshold": confidence_threshold},
        "total_gse_records": total,
        "serviceability_distribution": dist,
        "equipment_status": equipment_list,
        "top_samples": top_samples,
    }
@app.get("/api/ai/gse/irregularities")
async def gse_irregularities(
    bypass_cache: bool = False,
    confidence_threshold: float = 0.0,
    top_n: int = 50,
    entity_filter: Optional[str] = None,
    entity_value: Optional[str] = None,
):
    """Tag GSE records with an irregularity pattern and summarise the tags.

    Args:
        bypass_cache: Forwarded to ``fetch_gse_data``.
        confidence_threshold: Minimum confidence for a precomputed root-cause
            label to be used; below it the label is re-derived.
        top_n: Maximum number of cases returned (negative values give none).
        entity_filter: Forwarded to ``fetch_gse_data``.
        entity_value: Forwarded to ``fetch_gse_data``.

    Returns:
        A dict with the case total, the tag distribution ordered by
        descending count, and up to ``top_n`` cases ordered by tag.
    """
    from api.gse_helpers import fetch_gse_data

    gse_rows_with_idx, mt, _ = fetch_gse_data(
        entity_filter=entity_filter,
        entity_value=entity_value,
        confidence_threshold=confidence_threshold,
        bypass_cache=bypass_cache,
    )
    # NOTE(review): keywords are matched as substrings of the lower-cased
    # text, so short ones such as "old" or "late" also hit inside longer
    # words - confirm that is acceptable.
    delays_kw = {"delay", "terlambat", "late", "tunda"}
    manual_kw = {"manual", "manual handling", "tanpa alat", "tanpa bantuan alat"}
    aging_kw = {"usia mesin", "aging", "old", "tua"}
    cases: List[Dict[str, Any]] = []

    for r, idx in gse_rows_with_idx:
        report_text = str(r.get("Report") or "")
        root_text = str(r.get("Root_Caused") or "")
        txt = (report_text + " " + root_text).lower()

        # Prefer the precomputed label when it is confident enough.
        rc_cat = "Unknown"
        if idx < len(mt) and isinstance(mt[idx], dict):
            rc_pred = mt[idx].get("root_cause") or {}
            rc_lbl = str(rc_pred.get("label") or "Unknown")
            rc_conf = float(rc_pred.get("confidence") or 0.0)
            rc_cat = rc_lbl if rc_conf >= confidence_threshold else "Unknown"
        if rc_cat == "Unknown":
            rc_cat = _rc_category_remote(
                root_text,
                report_text,
                r.get("Area"),
                r.get("Irregularity_Complain_Category")
                or r.get("Location_of_Incident"),
            )

        # First matching rule wins; everything else stays a generic case.
        tag = "GSE Case"
        if rc_cat == "Resource/Manpower" and any(k in txt for k in delays_kw):
            tag = "Kekurangan SDM menyebabkan delay"
        elif rc_cat == "Resource/Manpower" and any(k in txt for k in manual_kw):
            tag = "Kekurangan SDA Equipment menyebabkan handling manual"
        elif (
            rc_cat == "Equipment Failure"
            and (
                "loading" in txt
                or "unloading" in txt
                or "muat" in txt
                or "bongkar" in txt
            )
            and (
                ("macet" in txt)
                or ("tidak bisa" in txt)
                or any(k in txt for k in aging_kw)
            )
        ):
            tag = "Irreg GSE macet saat loading/unloading perlu perbaikan"

        cases.append(
            {
                "sheet": r.get("_sheet_name"),
                "branch": r.get("Branch") or r.get("Cabang"),
                "airline": r.get("Airlines") or r.get("Airline"),
                "area": r.get("Area"),
                "rc_category": rc_cat,
                "tag": tag,
                "report_preview": (report_text[:180]).strip(),
                "root_cause_preview": (root_text[:180]).strip(),
            }
        )

    tag_counts: Dict[str, int] = {}
    for c in cases:
        tag_counts[c["tag"]] = tag_counts.get(c["tag"], 0) + 1
    total = len(cases)
    dist_tags = {
        k: {
            "count": tag_counts[k],
            "percentage": round((tag_counts[k] / total * 100) if total else 0.0, 1),
        }
        for k in sorted(tag_counts, key=lambda x: -tag_counts[x])
    }
    top_cases = sorted(cases, key=lambda x: x["tag"])[: max(0, int(top_n))]
    return {
        "status": "success",
        "filters": {"confidence_threshold": confidence_threshold},
        "total_gse_irregularity_cases": total,
        "distribution_by_tag": dist_tags,
        "cases": top_cases,
    }
@app.get("/health")
async def health_check():
    """Detailed health check"""
    cache_health = get_cache().health_check()

    # The NLP backend is inferred from markers in the version string; the
    # first group with a matching marker decides the status.
    version_text = (model_service.nlp_version or "").lower()
    backend_markers = (
        ("remote", ("remote",)),
        ("onnx", ("onnx",)),
        ("bert", ("hf", "bert", "indobert", "distilbert")),
        ("tfidf", ("tfidf", "tf-idf")),
    )
    nlp_status = next(
        (
            backend
            for backend, markers in backend_markers
            if any(marker in version_text for marker in markers)
        ),
        "rule_based",
    )

    # Any one of these attributes being truthy counts as "loaded".
    reg_loaded = any(
        getattr(model_service, attribute, None)
        for attribute in (
            "model_loaded",
            "regression_onnx_session",
            "regression_model",
            "model_file_exists",
        )
    )

    regression_section = {
        "version": model_service.regression_version,
        "loaded": reg_loaded,
        "metrics": model_service.model_metrics if reg_loaded else None,
    }
    nlp_section = {
        "version": model_service.nlp_version,
        "status": nlp_status,
    }
    return {
        "status": "healthy",
        "models": {"regression": regression_section, "nlp": nlp_section},
        "cache": cache_health,
        "timestamp": datetime.now().isoformat(),
    }
def _first_present_metric(metrics: Dict[str, Any], *keys: str) -> Optional[float]:
    """Return the first metric among *keys* that is not None, rounded to 3 dp."""
    for key in keys:
        value = metrics.get(key)
        # Explicit None test: a legitimate 0.0 must not be skipped.
        if value is not None:
            return round(value, 3)
    return None


def _most_common_issues(issues: List[Any], limit: int = 3) -> List[Any]:
    """Return up to *limit* distinct issues, most frequent first."""
    return [issue for issue, _ in Counter(issues).most_common(limit)]


@app.post("/api/ai/analyze", response_model=AnalysisResponse)
async def analyze_reports(request: AnalysisRequest):
    """
    Analyze irregularity reports using AI models

    Only the directly uploaded ``request.data`` array is read here. Depending
    on ``request.options`` the response carries regression predictions, NLP
    results and trend aggregates per airline, hub and category.

    Raises:
        HTTPException: 400 when ``request.data`` is empty, 500 on any other
            failure during analysis.
    """
    start_time = datetime.now()
    try:
        # Use direct data
        if not request.data:
            raise HTTPException(
                status_code=400,
                detail="No data provided. Either sheetId or data must be specified.",
            )
        # Convert IrregularityReport objects to dicts
        data = [report.model_dump(exclude_none=True) for report in request.data]
        total_records = len(data)
        logger.info(f"Analyzing {total_records} records...")
        # Initialize response
        response = AnalysisResponse(
            metadata=Metadata(
                totalRecords=total_records,
                processingTime=0.0,
                modelVersions={
                    "regression": model_service.regression_version,
                    "nlp": model_service.nlp_version,
                },
            )
        )
        # Kept so the trend section can reuse the predictions if they exist.
        predictions = None

        # Regression Analysis
        if request.options.predictResolutionTime:
            logger.info("Running regression analysis...")
            predictions = model_service.predict_regression(data)
            loaded = bool(model_service.model_loaded)
            mm = model_service.model_metrics or {}
            metrics = {
                "mae": _first_present_metric(mm, "test_mae", "mae"),
                "rmse": _first_present_metric(mm, "test_rmse", "rmse"),
                "r2": _first_present_metric(mm, "test_r2", "r2"),
                "model_loaded": loaded,
                "note": "Using trained model"
                if loaded
                else "Model not available - using fallback predictions",
            }
            response.regression = RegressionResult(
                predictions=predictions,
                modelMetrics=metrics,
            )

        # NLP Analysis
        if any(
            [
                request.options.classifySeverity,
                request.options.extractEntities,
                request.options.generateSummary,
            ]
        ):
            logger.info("Running NLP analysis...")
            classifications = []
            entities = []
            summaries = []
            if request.options.classifySeverity:
                classifications = model_service.classify_text(data)
            if request.options.extractEntities:
                entities = model_service.extract_entities(data)
            if request.options.generateSummary:
                summaries = model_service.generate_summary(data)
            # Sentiment runs whenever any NLP option is enabled.
            sentiment = model_service.analyze_sentiment(data)
            response.nlp = NLPResult(
                classifications=classifications,
                entities=entities,
                summaries=summaries,
                sentiment=sentiment,
            )

        # Trend Analysis
        if request.options.analyzeTrends:
            logger.info("Running trend analysis...")
            # Average resolution days are grounded in model predictions, not
            # invented: reuse the ones computed above or request them now.
            if predictions is None:
                try:
                    predictions = model_service.predict_regression(data)
                except Exception as exc:
                    logger.warning(
                        "Trend analysis could not obtain resolution predictions: %s",
                        exc,
                    )
                    predictions = []
            # Positional pairing is only trusted with one prediction per row.
            if len(predictions) == len(data):
                predicted_days = [
                    getattr(p, "predictedDays", None) for p in predictions
                ]
            else:
                predicted_days = [None] * len(data)

            by_airline: Dict[Any, Dict[str, Any]] = {}
            by_hub: Dict[Any, Dict[str, Any]] = {}
            by_category: Dict[Any, int] = {}
            for item, days in zip(data, predicted_days):
                airline = item.get("Airlines", "Unknown")
                hub = item.get("HUB", "Unknown")
                category = item.get("Irregularity_Complain_Category", "Unknown")
                for key, groups in ((airline, by_airline), (hub, by_hub)):
                    group = groups.setdefault(
                        key, {"count": 0, "issues": [], "days": []}
                    )
                    group["count"] += 1
                    group["issues"].append(category)
                    if days is not None:
                        group["days"].append(days)
                by_category[category] = by_category.get(category, 0) + 1

            def _to_trend(group: Dict[str, Any]) -> TrendData:
                # 0.0 signals "no prediction available" for the group.
                days_seen = group["days"]
                average = (
                    round(sum(days_seen) / len(days_seen), 2) if days_seen else 0.0
                )
                return TrendData(
                    count=group["count"],
                    avgResolutionDays=average,
                    topIssues=_most_common_issues(group["issues"]),
                )

            response.trends = TrendResult(
                byAirline={k: _to_trend(v) for k, v in by_airline.items()},
                byHub={k: _to_trend(v) for k, v in by_hub.items()},
                byCategory={
                    k: {"count": v, "trend": "stable"} for k, v in by_category.items()
                },
                timeSeries=[],
            )

        # Calculate processing time
        processing_time = (datetime.now() - start_time).total_seconds() * 1000
        response.metadata.processingTime = round(processing_time, 2)
        logger.info(f"Analysis completed in {processing_time:.2f}ms")
        return response
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Analysis error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/ai/predict-single")
async def predict_single(report: IrregularityReport):
    """
    Predict for a single report in real-time

    Runs regression, classification, entity extraction, summarisation and
    sentiment on the one report and returns the first result of each, plus
    whether the regression model is loaded.

    Raises:
        HTTPException: Re-raised unchanged when raised while predicting;
            any other failure is reported as a 500.
    """
    try:
        report_dict = report.model_dump(exclude_none=True)
        predictions = model_service.predict_regression([report_dict])
        classifications = model_service.classify_text([report_dict])
        entities = model_service.extract_entities([report_dict])
        summaries = model_service.generate_summary([report_dict])
        sentiment = model_service.analyze_sentiment([report_dict])
        return {
            "prediction": predictions[0],
            "classification": classifications[0],
            "entities": entities[0],
            "summary": summaries[0],
            "sentiment": sentiment[0],
            "modelLoaded": model_service.model_loaded,
        }
    except HTTPException:
        # Keep deliberate HTTP errors intact, as analyze_reports does.
        raise
    except Exception as e:
        logger.error(f"Single prediction error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/ai/train")
async def train_models(background_tasks: BackgroundTasks, force: bool = False):
    """
    Queue a model retraining run as a background task.

    Args:
        force: If True, force training regardless of conditions
    """
    from scripts.scheduled_training import TrainingScheduler

    def _retrain_in_background():
        # Executed by the background task runner after the response is sent.
        outcome = TrainingScheduler().run_training(force=force)
        logger.info(f"Training completed: {outcome}")

    background_tasks.add_task(_retrain_in_background)
    acknowledgement = {
        "status": "training_queued",
        "message": "Model retraining has been started in the background",
        "force": force,
        "timestamp": datetime.now().isoformat(),
    }
    return acknowledgement
@app.get("/api/ai/train/status")
async def training_status():
    """Report the scheduler's training status under a success envelope."""
    from scripts.scheduled_training import TrainingScheduler

    current = TrainingScheduler().get_status()
    envelope = {
        "status": "success",
        "data": current,
        "timestamp": datetime.now().isoformat(),
    }
    return envelope
@app.get("/api/ai/model-info")
async def model_info():
    """Describe the regression and NLP models currently served."""
    regression_ready = model_service.model_loaded
    regression_section = {
        "version": model_service.regression_version,
        "type": "GradientBoostingRegressor",
        "status": "loaded" if regression_ready else "unavailable",
        "last_trained": "2025-01-15",
        # Metrics are only exposed while the regression model is loaded.
        "metrics": model_service.model_metrics if regression_ready else None,
    }
    nlp_section = {
        "version": model_service.nlp_version,
        "type": "Rule-based + Keyword extraction",
        "status": "active",
        "tasks": ["classification", "ner", "summarization", "sentiment"],
        "note": "Full ML NLP models coming soon",
    }
    return {"regression": regression_section, "nlp": nlp_section}
@app.post("/api/ai/cache/invalidate")
async def invalidate_cache(sheet_name: Optional[str] = None):
    """Drop cached sheet data, for one sheet name or for all sheets."""
    # Choose the key pattern and the confirmation text, then delete once.
    if sheet_name:
        key_pattern = f"sheets:*{sheet_name}*"
        confirmation = f"Invalidated cache for sheet: {sheet_name}"
    else:
        key_pattern = "sheets:*"
        confirmation = "Invalidated all sheets cache"

    removed = get_cache().delete_pattern(key_pattern)
    return {
        "status": "success",
        "message": confirmation,
        "keys_deleted": removed,
    }
@app.get("/api/ai/cache/status")
async def cache_status():
    """Return the cache service's own health report unchanged."""
    report = get_cache().health_check()
    return report
@app.get("/api/ai/analyze-all", response_model=AnalyzeAllResponse)
async def analyze_all_sheets(
    bypass_cache: bool = False,
    include_regression: bool = True,
    include_nlp: bool = True,
    include_trends: bool = True,
    max_rows_per_sheet: int = 10000,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """
    Analyze ALL rows from all Google Sheets

    Fetches data from both NON CARGO and CGO sheets, analyzes each row,
    and returns comprehensive results.

    Args:
        bypass_cache: Skip cache and fetch fresh data
        include_regression: Include regression predictions
        include_nlp: Include NLP analysis (severity, entities, summary)
        include_trends: Accepted for API compatibility; not read by this handler
        max_rows_per_sheet: Maximum rows to process per sheet
        esklasi_regex: Pattern forwarded to the data fetcher to filter rows

    Raises:
        HTTPException: 404 when no rows were fetched, 500 on any other failure.
    """
    start_time = datetime.now()

    def _item_at(outputs, idx):
        """Return outputs[idx], or None when a model returned fewer items than rows."""
        return outputs[idx] if idx < len(outputs) else None

    try:
        fetcher = DataFetcher(bypass_cache=bypass_cache)
        all_data = fetcher.fetch_all(
            max_rows=max_rows_per_sheet,
            esklasi_regex=esklasi_regex,
        )
        nc = sum(1 for r in all_data if r.get("_sheet_name") == "NON CARGO")
        cg = sum(1 for r in all_data if r.get("_sheet_name") == "CGO")
        sheet_info = {
            "NON CARGO": {"rows_fetched": nc, "status": "success"},
            "CGO": {"rows_fetched": cg, "status": "success"},
        }
        total_records = len(all_data)
        if total_records == 0:
            raise HTTPException(status_code=404, detail="No data found in any sheet")
        logger.info(f"Analyzing {total_records} total records...")

        results = []
        batch_size = 100
        for i in range(0, total_records, batch_size):
            batch = all_data[i : i + batch_size]
            if include_regression:
                regression_preds = model_service.predict_regression(batch)
            else:
                regression_preds = [None] * len(batch)
            if include_nlp:
                classifications = model_service.classify_text(batch)
                entities = model_service.extract_entities(batch)
                summaries = model_service.generate_summary(batch)
                sentiments = model_service.analyze_sentiment(batch)
            else:
                classifications = [None] * len(batch)
                entities = [None] * len(batch)
                summaries = [None] * len(batch)
                sentiments = [None] * len(batch)

            # Order matters: it fixes the key order of each result dict.
            nlp_outputs = (
                ("classification", classifications),
                ("entities", entities),
                ("summary", summaries),
                ("sentiment", sentiments),
            )
            for j, row in enumerate(batch):
                result = {
                    "rowId": row.get("_row_id", f"row_{i + j}"),
                    # Rows are counted by "_sheet_name" above, so fall back to
                    # that tag when "_source_sheet" is absent.
                    "sourceSheet": row.get("_source_sheet")
                    or row.get("_sheet_name")
                    or "Unknown",
                    "originalData": {
                        "date": row.get("Date_of_Event"),
                        "airline": row.get("Airlines"),
                        "flightNumber": row.get("Flight_Number"),
                        "branch": row.get("Branch"),
                        "hub": row.get("HUB"),
                        "route": row.get("Route"),
                        "category": row.get("Report_Category"),
                        "issueType": row.get("Irregularity_Complain_Category"),
                        "report": row.get("Report"),
                        "status": row.get("Status"),
                    },
                }
                pred = _item_at(regression_preds, j)
                if pred:
                    result["prediction"] = {
                        "predictedDays": pred.predictedDays,
                        "confidenceInterval": pred.confidenceInterval,
                        "hasUnknownCategories": pred.hasUnknownCategories,
                        "shapExplanation": pred.shapExplanation.model_dump()
                        if pred.shapExplanation
                        else None,
                        "anomalyDetection": pred.anomalyDetection.model_dump()
                        if pred.anomalyDetection
                        else None,
                    }
                for key, outputs in nlp_outputs:
                    item = _item_at(outputs, j)
                    if item:
                        result[key] = item.model_dump()
                results.append(result)

        summary = {
            "totalRecords": total_records,
            "sheetsProcessed": len(
                [s for s in sheet_info.values() if s["status"] == "success"]
            ),
            "regressionEnabled": include_regression,
            "nlpEnabled": include_nlp,
        }
        if include_nlp and results:
            severity_counts = {}
            for r in results:
                # Rows without a classification are counted as "Unknown".
                sev = r.get("classification", {}).get("severity", "Unknown")
                severity_counts[sev] = severity_counts.get(sev, 0) + 1
            summary["severityDistribution"] = severity_counts
        if include_regression and results:
            predictions = [
                r["prediction"]["predictedDays"] for r in results if r.get("prediction")
            ]
            if predictions:
                summary["predictionStats"] = {
                    "min": round(min(predictions), 2),
                    "max": round(max(predictions), 2),
                    "mean": round(sum(predictions) / len(predictions), 2),
                }

        processing_time = (datetime.now() - start_time).total_seconds()
        return AnalyzeAllResponse(
            status="success",
            metadata={
                "totalRecords": total_records,
                "processingTimeSeconds": round(processing_time, 2),
                "recordsPerSecond": round(total_records / processing_time, 2)
                if processing_time > 0
                else 0,
                "modelVersions": {
                    "regression": model_service.regression_version,
                    "nlp": model_service.nlp_version,
                },
            },
            sheets=sheet_info,
            results=results,
            summary=summary,
            timestamp=datetime.now().isoformat(),
        )
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the 404 above) pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Analyze all error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e
# ============== Risk Scoring Endpoints ==============
@app.get("/api/ai/risk/summary")
async def risk_summary(
    bypass_cache: bool = False,
    max_rows_per_sheet: int = 10000,
    fast: bool = True,
    branch: Optional[str] = None,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Recalculate risk scores from fetched rows and return the risk summary.

    All query parameters are forwarded to ``DataFetcher.fetch_for_risk``;
    ``esklasi_regex`` is additionally passed to the risk calculation.
    """
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_for_risk(
        max_rows=max_rows_per_sheet,
        fast=fast,
        branch=branch,
        esklasi_regex=esklasi_regex,
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)
    return scorer.get_risk_summary()
@app.get("/api/ai/risk/airlines")
async def airline_risks(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Recalculate risk scores over all fetched rows and return ``get_all_airline_risks()``."""
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)
    return scorer.get_all_airline_risks()
@app.get("/api/ai/risk/airlines/{airline_name}")
async def airline_risk(
    airline_name: str,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Return risk data and recommendations for a single airline.

    Raises:
        HTTPException: 404 when the risk service has no data for the airline.
    """
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)

    risk_data = scorer.get_airline_risk(airline_name)
    if risk_data:
        return {
            "airline": airline_name,
            "risk_data": risk_data,
            "recommendations": scorer.get_risk_recommendations(
                "airline", airline_name
            ),
        }
    raise HTTPException(status_code=404, detail=f"Airline '{airline_name}' not found")
@app.get("/api/ai/risk/branches")
async def branch_risks(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Recalculate risk scores over all fetched rows and return ``get_all_branch_risks()``."""
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)
    return scorer.get_all_branch_risks()
@app.get("/api/ai/risk/hubs")
async def hub_risks(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Recalculate risk scores over all fetched rows and return ``get_all_hub_risks()``."""
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)
    return scorer.get_all_hub_risks()
@app.get("/api/ai/risk/routes")
async def route_risks(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Recalculate risk scores over all fetched rows and return ``get_all_route_risks()``."""
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)
    return scorer.get_all_route_risks()
@app.get("/api/ai/risk/routes/{route_name}")
async def route_risk(
    route_name: str,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Return risk data and recommendations for a single route.

    Raises:
        HTTPException: 404 when the risk service has no data for the route.
    """
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)

    risk_data = scorer.get_route_risk(route_name)
    if risk_data:
        return {
            "route": route_name,
            "risk_data": risk_data,
            "recommendations": scorer.get_risk_recommendations("route", route_name),
        }
    raise HTTPException(status_code=404, detail=f"Route '{route_name}' not found")
@app.get("/api/ai/risk/categories")
async def category_risks(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Recalculate risk scores over all fetched rows and return ``get_all_category_risks()``."""
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)
    return scorer.get_all_category_risks()
@app.get("/api/ai/risk/categories/{category_name:path}")
async def category_risk(
    category_name: str,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Return risk data and recommendations for a single category.

    The path segment is URL-unquoted and stripped before lookup; the ``:path``
    converter lets the name contain ``/``.

    Raises:
        HTTPException: 404 when the risk service has no data for the category.
    """
    from data.risk_service import get_risk_service

    category_name = unquote(category_name).strip()
    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)

    risk_data = scorer.get_category_risk(category_name)
    if risk_data:
        return {
            "category": category_name,
            "risk_data": risk_data,
            "recommendations": scorer.get_risk_recommendations(
                "category", category_name
            ),
        }
    raise HTTPException(
        status_code=404, detail=f"Category '{category_name}' not found"
    )
@app.get("/api/ai/gse")
async def gse_risk(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Aggregate category risk entries whose name mentions GSE.

    When no category name contains "GSE", the response instead counts raw rows
    whose category or report text mentions "gse" and reports a fixed
    ``"Medium"`` risk level. Otherwise the issue counts are summed and the
    risk scores averaged across the matching categories.
    """
    from data.risk_service import get_risk_service

    all_data = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    scorer.calculate_all_risk_scores(all_data, esklasi_regex=esklasi_regex)

    gse_cats = {}
    for name, stats in scorer.get_all_category_risks().items():
        if "GSE" in name.upper() or "gse" in name.lower():
            gse_cats[name] = stats

    if not gse_cats:
        # No dedicated category: count rows that mention GSE in free text.
        mention_count = 0
        for row in all_data:
            category_text = str(row.get("Irregularity_Complain_Category", "")).lower()
            if "gse" in category_text or "gse" in str(row.get("Report", "")).lower():
                mention_count += 1
        return {
            "category": "GSE",
            "risk_data": {"total_issues": mention_count, "risk_level": "Medium"},
            "gse_categories": [],
            "recommendations": [],
        }

    # gse_cats is non-empty from here on, so the average is always defined.
    total_issues = sum(stats.get("total_issues", 0) for stats in gse_cats.values())
    avg_risk = sum(stats.get("risk_score", 0) for stats in gse_cats.values()) / len(
        gse_cats
    )
    if avg_risk > 75:
        risk_level = "Critical"
    elif avg_risk > 50:
        risk_level = "High"
    else:
        risk_level = "Medium"

    sub_categories = list(gse_cats.keys())
    risk_section = {
        "total_issues": total_issues,
        "risk_score": round(avg_risk, 2),
        "risk_level": risk_level,
        "gse_sub_categories": sub_categories,
    }
    # Recommendations are requested for the first matching category only.
    recommendations = scorer.get_risk_recommendations("category", sub_categories[0])
    return {
        "category": "GSE",
        "risk_data": risk_section,
        "recommendations": recommendations,
    }
@app.get("/api/ai/gse/issues/top")
async def gse_top_issue(
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    sample_n: int = 0,
    confidence_threshold: float = 0.0,
):
    """Summarise GSE-related records by high-level bucket, subcategory and root cause.

    A record counts as GSE-related when its category mentions GSE / ground
    support equipment, or when its report, root-cause or action text contains
    one of the equipment keywords.

    Args:
        bypass_cache: Passed to ``DataFetcher``.
        esklasi_regex: Pattern forwarded to ``fetch_all`` to filter rows.
        sample_n: Accepted for API compatibility; not read by this handler.
        confidence_threshold: Accepted for API compatibility; not read by this
            handler.

    Returns:
        A dict with the GSE record count plus, for each of the three groupings,
        the distribution (count and percentage of GSE records) and its top entry.
    """
    fetcher = DataFetcher(bypass_cache=bypass_cache)
    all_data = fetcher.fetch_all(esklasi_regex=esklasi_regex)

    # Built once per request instead of once per row.
    eq_kw = (
        "gse",
        "ground support equipment",
        "forklift",
        "belt loader",
        "beltloader",
        "gpu",
        "tow tractor",
        "air start",
        "conveyor",
        "loader",
        "belt",
    )

    def _is_gse(r: Dict[str, Any]) -> bool:
        """Return True when the row's category or free text mentions GSE equipment."""
        cat = str(r.get("Irregularity_Complain_Category") or "").lower()
        if ("gse" in cat) or ("ground support equipment" in cat):
            return True
        txt = " ".join(
            [
                str(r.get("Report") or ""),
                str(r.get("Root_Caused") or ""),
                str(r.get("Action_Taken") or ""),
            ]
        ).lower()
        return any(k in txt for k in eq_kw)

    # Labels come solely from the subcategory / root-cause helpers below, so
    # no NLP inference over the full data set is needed here.
    gse_rows = [r for r in all_data if _is_gse(r)]
    total = len(gse_rows)

    counts: Dict[str, int] = {"SDA": 0, "SDM": 0, "Maintenance": 0}
    subcat_counts: Dict[str, int] = {}
    rc_counts: Dict[str, int] = {}
    for r in gse_rows:
        sub = (
            _subcat_label_remote(
                str(r.get("Report") or ""),
                r.get("Area"),
                r.get("Irregularity_Complain_Category"),
                str(r.get("Root_Caused") or ""),
            )
            or "Unknown"
        )
        subcat_counts[sub] = subcat_counts.get(sub, 0) + 1
        rc_cat = _rc_category_remote(
            str(r.get("Root_Caused") or ""),
            str(r.get("Report") or ""),
            r.get("Area"),
            r.get("Irregularity_Complain_Category"),
        )
        rc_counts[rc_cat] = rc_counts.get(rc_cat, 0) + 1
        # A record lands in at most one high-level bucket; availability wins
        # over maintenance, which wins over staffing.
        if sub == "The Availability of GSE":
            counts["SDA"] += 1
        elif sub == "Cleanliness of GSE" or rc_cat == "Equipment Failure":
            counts["Maintenance"] += 1
        elif rc_cat in {"Staff Competency", "Training Gap"}:
            counts["SDM"] += 1

    def _share(count: int) -> Dict[str, Any]:
        """Return the count with its percentage of all GSE records (0.0 when none)."""
        return {
            "count": count,
            "percentage": round((count / total * 100) if total else 0.0, 1),
        }

    def _top(dist: Dict[str, Dict[str, Any]]):
        """Return the (label, stats) pair with the highest count; ties keep dict order."""
        ranked = sorted(dist.items(), key=lambda x: -x[1]["count"])
        return ranked[0] if ranked else ("Unknown", {"count": 0, "percentage": 0.0})

    dist_high = {k: _share(counts[k]) for k in ["SDA", "SDM", "Maintenance"]}
    dist_sub = {k: _share(subcat_counts[k]) for k in sorted(subcat_counts.keys())}
    dist_rc = {k: _share(rc_counts[k]) for k in sorted(rc_counts.keys())}

    top_high_item = _top(dist_high)
    top_sub_item = _top(dist_sub)
    top_rc_item = _top(dist_rc)
    return {
        "status": "success",
        "total_gse_records": total,
        "top": {
            "category": top_high_item[0],
            "count": top_high_item[1]["count"],
            "percentage": top_high_item[1]["percentage"],
        },
        "distribution": dist_high,
        "top_subcategory": {
            "subcategory": top_sub_item[0],
            "count": top_sub_item[1]["count"],
            "percentage": top_sub_item[1]["percentage"],
        },
        "distribution_subcategory": dist_sub,
        "top_root_cause": {
            "category": top_rc_item[0],
            "count": top_rc_item[1]["count"],
            "percentage": top_rc_item[1]["percentage"],
        },
        "distribution_root_cause": dist_rc,
    }
@app.post("/api/ai/risk/calculate")
async def calculate_risk_scores(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Recalculate all risk scores and report how many rows were processed.

    Returns:
        A dict with the status, the number of fetched rows and the risk
        service's summary.
    """
    from data.risk_service import get_risk_service

    rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    scorer = get_risk_service()
    # The return value is not needed; the summary is read back from the service.
    scorer.calculate_all_risk_scores(rows, esklasi_regex=esklasi_regex)
    response = {"status": "success", "records_processed": len(rows)}
    response["risk_summary"] = scorer.get_risk_summary()
    return response
@app.get("/api/ai/sheets/debug")
async def sheets_debug(
    bypass_cache: bool = False,
    max_rows_per_sheet: int = 5000,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Return diagnostics about the NON CARGO and CGO sheets.

    The response contains the raw and normalized header rows, row counts,
    how many rows match ``esklasi_regex``, free-text presence statistics,
    the most frequent values of selected columns, GSE keyword match counts
    and the matched records themselves.

    Raises:
        HTTPException: 500 when ``GOOGLE_SHEET_ID`` is not configured.
    """
    from data.sheets_service import GoogleSheetsService

    cache = get_cache() if not bypass_cache else None
    sheets_service = GoogleSheetsService(cache=cache)
    spreadsheet_id = os.getenv("GOOGLE_SHEET_ID")
    if not spreadsheet_id:
        raise HTTPException(status_code=500, detail="GOOGLE_SHEET_ID not configured")

    header_ranges = ["NON CARGO!1:1", "CGO!1:1"]
    resp = (
        sheets_service.service.spreadsheets()
        .values()
        .batchGet(
            spreadsheetId=spreadsheet_id,
            ranges=header_ranges,
            valueRenderOption="UNFORMATTED_VALUE",
        )
        .execute()
    )
    headers_map: Dict[str, Any] = {}
    for vr in resp.get("valueRanges", []):
        rng = vr.get("range", "")
        sheet_name = rng.split("!")[0].replace("'", "")
        values = vr.get("values")
        raw = values[0] if values else []
        # UNFORMATTED_VALUE can yield numbers or booleans for header cells, so
        # coerce to str before normalizing instead of assuming strings.
        normalized = [
            str(h).strip().replace(" ", "_").replace("/", "_") for h in raw
        ]
        headers_map[sheet_name] = {"raw": raw, "normalized": normalized}

    non_cargo = sheets_service.fetch_sheet_data(
        spreadsheet_id,
        "NON CARGO",
        f"A1:AZ{max_rows_per_sheet}",
        bypass_cache=bypass_cache,
    )
    cargo = sheets_service.fetch_sheet_data(
        spreadsheet_id, "CGO", f"A1:AZ{max_rows_per_sheet}", bypass_cache=bypass_cache
    )
    for row in non_cargo:
        row["_sheet_name"] = "NON CARGO"
    for row in cargo:
        row["_sheet_name"] = "CGO"
    all_data = non_cargo + cargo
    filtered = _filter_by_esklasi(all_data, esklasi_regex)

    text_fields = ("Report", "Report_Description", "Root_Caused", "Root_Cause", "Action_Taken")

    def _has_text(row: Dict[str, Any]) -> bool:
        """Return True when any free-text field holds a non-blank value."""
        for field in text_fields:
            value = row.get(field)
            # A missing/None cell must not be stringified to "None" and then
            # counted as text.
            if value is not None and str(value).strip():
                return True
        return False

    def _get_norm(row: Dict[str, Any], target_norm: str) -> str:
        """Return the value of the first column whose normalized name matches, else ""."""
        for k in row.keys():
            if _normalize_header_name(k) == target_norm:
                return str(row.get(k) or "")
        return ""

    def _unique_values(rows: List[Dict[str, Any]], target_norm: str) -> Dict[str, int]:
        """Count non-empty values of a column and keep the 50 most frequent."""
        counter: Dict[str, int] = {}
        for r in rows:
            v = _get_norm(r, target_norm)
            if v:
                counter[v] = counter.get(v, 0) + 1
        return dict(sorted(counter.items(), key=lambda x: -x[1])[:50])

    def _text_stats(rows: List[Dict[str, Any]]) -> Dict[str, int]:
        """Split the rows into those with and without any free text."""
        with_text = sum(1 for r in rows if _has_text(r))
        return {"hasText": with_text, "emptyText": len(rows) - with_text}

    def _column_profile(rows: List[Dict[str, Any]]) -> Dict[str, Dict[str, int]]:
        """Collect the value frequencies of the inspected columns for one sheet."""
        return {
            "ESKLASI_DIVISI": _unique_values(rows, "ESKLASI_DIVISI"),
            "Irregularity_Complain_Category": _unique_values(
                rows, "IRREGULARITY_COMPLAIN_CATEGORY"
            ),
            "Area": _unique_values(rows, "AREA"),
            "Airlines": _unique_values(rows, "AIRLINES"),
            "Branch": _unique_values(rows, "BRANCH"),
        }

    text_stats = {
        "NON CARGO": _text_stats(non_cargo),
        "CGO": _text_stats(cargo),
    }
    unique_values = {
        "NON CARGO": _column_profile(non_cargo),
        "CGO": _column_profile(cargo),
    }

    eq_kw = {
        "gse",
        "ground support equipment",
        "forklift",
        "belt loader",
        "beltloader",
        "gpu",
        "tow tractor",
        "air start",
        "conveyor",
        "loader",
        "belt",
    }

    def _contains_gse_kw(row: Dict[str, Any]) -> bool:
        """Return True when the row's text or category mentions a GSE keyword."""
        t = " ".join(
            [
                str(row.get("Report", "") or ""),
                str(row.get("Root_Caused", "") or ""),
                str(row.get("Action_Taken", "") or ""),
            ]
        ).lower()
        return any(k in t for k in eq_kw) or (
            "gse" in str(row.get("Irregularity_Complain_Category", "")).lower()
        )

    gse_non_cargo = sum(1 for r in non_cargo if _contains_gse_kw(r))
    gse_cargo = sum(1 for r in cargo if _contains_gse_kw(r))
    gse_kw_counts = {
        "NON CARGO": gse_non_cargo,
        "CGO": gse_cargo,
        # all_data is exactly non_cargo + cargo, so the total is the sum.
        "total": gse_non_cargo + gse_cargo,
    }
    return {
        "headers": headers_map,
        "counts": {
            "NON CARGO": len(non_cargo),
            "CGO": len(cargo),
            "total": len(all_data),
        },
        "regex": {"pattern": esklasi_regex, "matches": len(filtered)},
        "textStats": text_stats,
        "uniqueValues": unique_values,
        "gseKeywordMatches": gse_kw_counts,
        "matchedRecords": filtered,
    }
# ============== Subcategory Classification Endpoints ==============
@app.post("/api/ai/subcategory")
async def classify_subcategory(
    report: str,
    area: Optional[str] = None,
    issue_type: Optional[str] = None,
    root_cause: Optional[str] = None,
):
    """Classify report into subcategory

    The remote client is tried first when it is enabled; if it fails or
    returns an empty result, the local subcategory classifier is used.
    """
    try:
        rc = get_remote_client()
        if rc and rc.enabled:
            res = rc.subcategory_classify(report, area, issue_type, root_cause)
            if res:
                return res
    except Exception as exc:
        # Best-effort remote call: record the failure, then fall back locally.
        logger.warning(
            "Remote subcategory classification failed; using local classifier: %s",
            exc,
        )
    from data.subcategory_service import get_subcategory_classifier

    classifier = get_subcategory_classifier()
    return classifier.classify(report, area, issue_type, root_cause)
@app.get("/api/ai/subcategory/categories")
async def get_subcategories(area: Optional[str] = None):
    """Return the subcategory classifier's available categories, optionally for one area."""
    from data.subcategory_service import get_subcategory_classifier

    available = get_subcategory_classifier().get_available_categories(area)
    return available
# ============== Action Recommendation Endpoints ==============
def _similar_pool_from_hits(similar_reports):
    """Convert remote similarity hits into ``similar_pool`` entries for the action service."""

    def _first(meta, *keys):
        """Return the first truthy ``meta[key]``, else the last looked-up value."""
        value = None
        for key in keys:
            value = meta.get(key)
            if value:
                return value
        return value

    pool = []
    for hit in similar_reports or []:
        meta = hit.get("meta") or {}
        rid = _first(meta, "Row ID", "row_id", "ROW_ID", "RowId")
        if not rid:
            # No explicit row id: derive a short, stable one from the hit's
            # identifying fields.
            base_parts = [
                str(meta.get("Date of Event") or ""),
                str(_first(meta, "Airlines", "airline", "MASKAPAI (VLOOKUP)") or ""),
                str(_first(meta, "Flight Number", "flight_number") or ""),
                str(meta.get("Route") or ""),
                str(_first(meta, "Branch", "branch", "KODE CABANG (VLOOKUP)") or ""),
                str(_first(meta, "_sheet_name", "sheet_name") or ""),
                str(hit.get("text") or ""),
            ]
            if any(part.strip() for part in base_parts):
                base_str = "|".join(base_parts).strip()
                rid = hashlib.sha1(base_str.encode("utf-8")).hexdigest()[:10]
            else:
                # The joined string always contains the "|" separators, so test
                # the parts themselves to detect a hit with no usable data.
                rid = None
        pool.append(
            {
                "similarity": float(hit.get("score", 0.0)),
                "report_metadata": {
                    "row_id": rid,
                    "airline": _first(meta, "Airlines", "airline", "MASKAPAI (VLOOKUP)"),
                    "area": _first(meta, "Area", "area"),
                    "branch": _first(meta, "Branch", "branch", "KODE CABANG (VLOOKUP)"),
                    "status": _first(meta, "Status", "status"),
                    "category": _first(
                        meta,
                        "Irregularity/Complain Category",
                        "Irregularity_Complain_Category",
                        "Report Category",
                        "category",
                    ),
                    "action_taken": _first(meta, "Action Taken", "action_taken"),
                    "gapura_action": _first(
                        meta, "Gapura KPS Action Taken", "gapura_action"
                    ),
                },
            }
        )
    return pool


@app.post("/api/ai/action/recommend")
async def recommend_actions(
    report: str,
    issue_type: str,
    severity: str = "Medium",
    area: Optional[str] = None,
    airline: Optional[str] = None,
    top_n: int = 5,
):
    """Get action recommendations for an issue

    The local action service always produces the base recommendations. When
    the remote client is enabled, its similarity hits are passed to the local
    service as ``similar_pool`` and its own recommendations are merged in
    (de-duplicated by action text, sorted by confidence, cut to ``top_n``).
    Remote failures are logged and otherwise ignored.
    """
    rc = None
    try:
        rc = get_remote_client()
    except Exception as exc:
        logger.warning("Remote client unavailable for action recommendation: %s", exc)
        rc = None
    from data.action_service import get_action_service

    action_service = get_action_service()
    remote_res = None
    similar_pool = None
    if rc and rc.enabled:
        try:
            similar_payload = {"text": report, "top_k": 30, "threshold": 0.2}
            sim_res = (
                rc._post("/similarity/search", similar_payload, cache_ttl=60) or {}
            )
            similar_pool = _similar_pool_from_hits(sim_res.get("similar_reports"))
        except Exception as exc:
            logger.warning("Remote similarity search failed: %s", exc)
            similar_pool = None
        try:
            remote_res = rc.action_recommend(
                report, issue_type, severity, area, airline, top_n
            )
        except Exception as exc:
            logger.warning("Remote action recommendation failed: %s", exc)
            remote_res = None

    res = action_service.recommend(
        report=report,
        issue_type=issue_type,
        severity=severity,
        area=area,
        airline=airline,
        branch=None,
        top_n=top_n,
        similar_pool=similar_pool,
    )
    if remote_res and isinstance(remote_res, list):
        merged = list(res.get("recommendations", []))
        for r in remote_res:
            if not isinstance(r, dict):
                # Skip malformed remote entries instead of failing the request.
                continue
            act = (r.get("action") or "").strip()
            if not act:
                continue
            if any(m.get("action") == act for m in merged):
                continue
            merged.append(
                {
                    "action": act,
                    "priority": r.get("priority") or "MEDIUM",
                    "source": "dl-space",
                    "rationale": r.get("rationale") or "rekomendasi dari DL Space",
                    "confidence": float(r.get("score") or r.get("confidence") or 0.7),
                }
            )
        # Treat a missing or None confidence as 0.0 so sorting cannot raise.
        merged.sort(key=lambda x: -(x.get("confidence") or 0.0))
        res["recommendations"] = merged[:top_n]
    return res
@app.post("/api/ai/action/train")
async def train_action_recommender(
    bypass_cache: bool = False,
    background_tasks: BackgroundTasks = None,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Train the action service on all fetched rows.

    Training runs inline before the response is built; ``background_tasks``
    is accepted but not used by this handler.
    """
    from data.action_service import get_action_service

    training_rows = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    get_action_service().train_from_data(training_rows)
    return {"status": "success", "records_processed": len(training_rows)}
# ============== Action Summary Endpoint ==============
@app.get("/api/ai/action-summary", response_model=ActionSummaryResponse)
async def get_action_summary(
    bypass_cache: bool = False,
    branch: Optional[str] = None,
    top_n_per_category: int = 5,
    include_closed: bool = True,
    max_rows_per_sheet: int = 5000,
    fast: bool = True,
    approximate_avg_days: bool = True,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """
    Get action recommendations summary aggregated by category.

    Fetches all rows from both sheets (NON CARGO & CGO),
    analyzes each row, and aggregates recommended actions by category.

    Args:
        bypass_cache: Skip both the response cache and the data cache.
        branch: Optional branch filter forwarded to the data fetcher.
        top_n_per_category: Number of recommended actions kept per category.
        include_closed: Forwarded to the data fetcher.
        max_rows_per_sheet: Maximum rows fetched per sheet.
        fast: Use the fallback severity classifier and skip the regression model.
        approximate_avg_days: When no model prediction is available, estimate
            resolution days from the severity.
        esklasi_regex: Pattern forwarded to the data fetcher to filter rows.

    Returns:
        An ``ActionSummaryResponse``, or the cached dict form of a previous one.
    """
    from data.action_service import get_action_service

    start_time = datetime.now()
    cache = get_cache() if not bypass_cache else None
    cache_ttl = int(os.getenv("ACTION_SUMMARY_TTL", "600"))
    cache_key_parts = [
        "action_summary",
        str(branch or ""),
        "closed" if include_closed else "open_only",
        str(top_n_per_category),
        str(max_rows_per_sheet),
        "fast" if fast else "full",
        "approx" if approximate_avg_days else "no_approx",
        # The filter decides which rows are summarised, so it must be part of
        # the key; otherwise differently filtered requests share one entry.
        str(esklasi_regex or ""),
    ]
    cache_key = "as:" + hashlib.md5(":".join(cache_key_parts).encode()).hexdigest()
    if cache:
        cached = cache.get(cache_key)
        if cached:
            return cached

    fetcher = DataFetcher(bypass_cache=bypass_cache)
    all_data = fetcher.fetch_for_action_summary(
        max_rows=max_rows_per_sheet,
        fast=fast,
        branch=branch,
        include_closed=include_closed,
        esklasi_regex=esklasi_regex,
    )
    total_records = len(all_data)
    if total_records == 0:
        return ActionSummaryResponse(
            status="success",
            totalRecords=0,
            categories={},
            overallSummary={},
            topCategoriesByCount=[],
            topCategoriesByRisk=[],
            globalRecommendations=[],
            timestamp=datetime.now().isoformat(),
        )

    def _fallback_severities(rows):
        """Classify severity with the fallback classifier on report + root-cause text."""
        return model_service._classify_severity_fallback(
            [
                (r.get("Report", "") or "") + " " + (r.get("Root_Caused", "") or "")
                for r in rows
            ]
        )

    action_service = get_action_service()
    categories_agg: Dict[str, Dict[str, Any]] = {}
    all_severities = Counter()
    all_actions: List[Dict[str, Any]] = []
    total_open = 0
    total_closed = 0
    total_high_priority = 0
    total_resolution_days = []
    use_regression = (not fast) and model_service.model_loaded
    # Used when no model prediction is available for a record.
    sev_map = {
        "Low": 1.2,
        "Medium": 2.2,
        "High": 3.0,
        "Critical": 4.0,
    }
    batch_size = 200
    for i in range(0, total_records, batch_size):
        batch = all_data[i : i + batch_size]
        predictions = []
        if use_regression:
            try:
                predictions = model_service.predict_regression(batch)
            except Exception as exc:
                # Regression is optional here: stop using it for the remaining
                # batches and continue with approximations (if enabled).
                logger.warning("Regression prediction failed; disabling it: %s", exc)
                predictions = []
                use_regression = False
        if fast:
            classifications = _fallback_severities(batch)
        else:
            try:
                classifications = model_service.classify_text(batch)
            except Exception as exc:
                logger.warning(
                    "Text classification failed; using fallback severities: %s", exc
                )
                classifications = _fallback_severities(batch)

        for j, record in enumerate(batch):
            category = record.get("Irregularity_Complain_Category") or "Unknown"
            if category == "#N/A":
                category = "Unknown"

            if j < len(classifications):
                sev_obj = classifications[j]
                if isinstance(sev_obj, dict):
                    severity = sev_obj.get("severity", "Low")
                else:
                    severity = getattr(sev_obj, "severity", "Low")
            else:
                severity = "Low"

            predicted_days = 0.0
            if predictions and j < len(predictions):
                pred_obj = predictions[j]
                if isinstance(pred_obj, RegressionPrediction):
                    predicted_days = pred_obj.predictedDays
                elif isinstance(pred_obj, dict):
                    predicted_days = pred_obj.get("predictedDays", 0.0)
            elif approximate_avg_days:
                predicted_days = sev_map.get(severity, 2.0)

            status = (record.get("Status") or "").strip().lower()
            if category not in categories_agg:
                categories_agg[category] = {
                    "count": 0,
                    "severities": Counter(),
                    "actions": [],
                    "resolution_days": [],
                    "hubs": Counter(),
                    "airlines": Counter(),
                    "open_count": 0,
                    "closed_count": 0,
                    "high_priority_count": 0,
                    "records": [],
                }
            cat_data = categories_agg[category]
            cat_data["count"] += 1
            cat_data["severities"][severity] += 1
            all_severities[severity] += 1
            if use_regression or approximate_avg_days:
                cat_data["resolution_days"].append(predicted_days)
                total_resolution_days.append(predicted_days)

            hub = record.get("HUB") or "Unknown"
            airline = record.get("Airlines") or "Unknown"
            cat_data["hubs"][hub] += 1
            cat_data["airlines"][airline] += 1

            # Anything that is not explicitly "closed" counts as open.
            if status == "closed":
                cat_data["closed_count"] += 1
                total_closed += 1
            else:
                cat_data["open_count"] += 1
                total_open += 1
            if severity in ("Critical", "High"):
                cat_data["high_priority_count"] += 1
                total_high_priority += 1
            # Keep only a small sample of records per category.
            if len(cat_data["records"]) < 20:
                cat_data["records"].append(record)

    for category, cat_data in categories_agg.items():
        records = cat_data["records"]
        if not records:
            cat_data["actions"] = []
            cat_data["effectiveness"] = 0.5
            continue
        # Recommendations are derived from the first sampled record of the
        # category together with the category's most common severity.
        sample_record = records[0]
        report = sample_record.get("Report", "") or ""
        area = (sample_record.get("Area", "") or "").replace(" Area", "")
        airline = sample_record.get("Airlines", "")
        branch_val = sample_record.get("Branch", "")
        severity_counts = cat_data["severities"]
        dominant_severity = (
            severity_counts.most_common(1)[0][0] if severity_counts else "Medium"
        )
        recs = action_service.recommend(
            report=report,
            issue_type=category,
            severity=dominant_severity,
            area=area if area else None,
            airline=airline if airline else None,
            branch=branch_val if branch_val else None,
            top_n=top_n_per_category,
        )
        cat_data["actions"] = recs.get("recommendations", [])
        cat_data["effectiveness"] = recs.get("effectiveness_score", 0.5)

    category_summaries: Dict[str, ActionCategorySummary] = {}
    for category, cat_data in categories_agg.items():
        avg_days = None
        if cat_data["resolution_days"]:
            avg_days = round(
                sum(cat_data["resolution_days"]) / len(cat_data["resolution_days"]), 2
            )
        top_actions = [
            {
                "action": action.get("action", ""),
                "priority": action.get("priority", "MEDIUM"),
                "source": action.get("source", "template"),
                "rationale": action.get("rationale", ""),
                "confidence": action.get("confidence", 0.5),
            }
            for action in cat_data["actions"][:top_n_per_category]
        ]
        category_summaries[category] = ActionCategorySummary(
            count=cat_data["count"],
            severityDistribution=dict(cat_data["severities"]),
            topActions=top_actions,
            avgResolutionDays=avg_days,
            topHubs=[h for h, _ in cat_data["hubs"].most_common(5)],
            topAirlines=[a for a, _ in cat_data["airlines"].most_common(5)],
            effectivenessScore=round(cat_data["effectiveness"], 3),
            openCount=cat_data["open_count"],
            closedCount=cat_data["closed_count"],
            highPriorityCount=cat_data["high_priority_count"],
        )
        for action in cat_data["actions"]:
            # Tag each action with its category for the global ranking below.
            action["category"] = category
            all_actions.append(action)

    overall_avg_days = None
    if total_resolution_days:
        overall_avg_days = round(
            sum(total_resolution_days) / len(total_resolution_days), 2
        )
    # Report where the averages came from. Approximation also applies in
    # non-fast mode whenever the regression model is unavailable or failed.
    if use_regression:
        avg_days_source = "model"
    elif approximate_avg_days:
        avg_days_source = "approx"
    else:
        avg_days_source = None
    overall_summary = {
        "totalRecords": total_records,
        "openCount": total_open,
        "closedCount": total_closed,
        "highPriorityCount": total_high_priority,
        "severityDistribution": dict(all_severities),
        "avgResolutionDays": overall_avg_days,
        "categoriesCount": len(categories_agg),
        "avgDaysSource": avg_days_source,
    }

    top_by_count = sorted(
        [
            {"category": k, "count": v.count, "highPriority": v.highPriorityCount}
            for k, v in category_summaries.items()
        ],
        key=lambda x: -x["count"],
    )[:10]
    top_by_risk = sorted(
        [
            {
                "category": k,
                "riskScore": v.highPriorityCount / max(v.count, 1),
                "count": v.count,
            }
            for k, v in category_summaries.items()
        ],
        key=lambda x: (-x["riskScore"], -x["count"]),
    )[:10]

    global_recs = []
    action_priority_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
    all_actions.sort(
        key=lambda x: (
            action_priority_order.get(x.get("priority", "LOW"), 2),
            -x.get("confidence", 0),
        )
    )
    seen_actions = set()
    for action in all_actions:
        if len(global_recs) >= 10:
            break
        # Actions sharing their first 50 characters are treated as duplicates.
        key = action.get("action", "")[:50]
        if key in seen_actions:
            continue
        seen_actions.add(key)
        global_recs.append(
            {
                "action": action.get("action", ""),
                "priority": action.get("priority", "MEDIUM"),
                "category": action.get("category", "Unknown"),
                "rationale": action.get("rationale", ""),
                "confidence": action.get("confidence", 0.5),
            }
        )

    processing_time = (datetime.now() - start_time).total_seconds()
    logger.info(
        f"Action summary completed in {processing_time:.2f}s for {total_records} records"
    )
    resp = ActionSummaryResponse(
        status="success",
        totalRecords=total_records,
        categories=category_summaries,
        overallSummary=overall_summary,
        topCategoriesByCount=top_by_count,
        topCategoriesByRisk=top_by_risk,
        globalRecommendations=global_recs,
        timestamp=datetime.now().isoformat(),
    )
    if cache:
        try:
            cache.set(cache_key, resp.model_dump(), cache_ttl)
        except Exception as exc:
            # Caching is best-effort; the response is still returned.
            logger.warning("Could not cache action summary: %s", exc)
    return resp
# ============== Advanced NER Endpoints ==============
@app.post("/api/ai/ner/extract")
async def extract_entities(text: str):
    """Extract entities from a piece of text.

    The remote client is tried first when it reports itself as enabled. If it
    is disabled, or the remote call raises, the local advanced NER service is
    used instead.

    Args:
        text: Text to analyse.

    Returns:
        A dict with ``entities`` and ``summary`` keys. The remote path always
        returns an empty ``summary``.
    """
    try:
        rc = get_remote_client()
        if rc and rc.enabled:
            res = rc.ner([text]) or []
            ents = res[0] if res else []
            return {"entities": ents, "summary": {}}
    except Exception as exc:
        # The remote call is best-effort: record why it failed instead of
        # hiding it, then fall through to the local service.
        logger.warning("Remote NER failed; falling back to local NER: %s", exc)

    from data.advanced_ner_service import get_advanced_ner

    ner = get_advanced_ner()
    return {"entities": ner.extract(text), "summary": ner.extract_summary(text)}
# ============== Similarity Endpoints ==============
@app.post("/api/ai/similar")
async def find_similar_reports(
    text: str,
    top_k: int = 5,
    threshold: float = 0.3,
):
    """Find reports similar to the given text.

    The remote client is tried first when enabled and its answer is used only
    if it contains a ``similar_reports`` key. Otherwise, or when the remote
    call raises, the local similarity service handles the query.

    Args:
        text: Query text.
        top_k: Forwarded to the similarity backend.
        threshold: Forwarded to the similarity backend.

    Returns:
        A dict with ``query_preview`` (first 100 characters of ``text``) and
        ``similar_reports``.
    """
    try:
        rc = get_remote_client()
        if rc and rc.enabled:
            res = rc.similarity_search(text, top_k, threshold) or {}
            if "similar_reports" in res:
                return {
                    "query_preview": text[:100],
                    "similar_reports": res.get("similar_reports"),
                }
    except Exception as exc:
        # Best-effort remote call: log the failure rather than swallowing it
        # silently, then fall back to the local index.
        logger.warning(
            "Remote similarity search failed; falling back to local service: %s",
            exc,
        )

    from data.similarity_service import get_similarity_service

    similarity_service = get_similarity_service()
    similar = similarity_service.find_similar(text, top_k, threshold)
    return {"query_preview": text[:100], "similar_reports": similar}
@app.post("/api/ai/similar/build-index")
async def build_similarity_index(
    bypass_cache: bool = False, max_rows_per_sheet: int = 1000
):
    """Rebuild the similarity index from freshly fetched records.

    Records are fetched through ``DataFetcher`` in fast mode and handed to the
    similarity service with ``method="tfidf"``.

    Returns:
        A dict with ``status`` and the number of records passed to the index.
    """
    from data.similarity_service import get_similarity_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, fast=True
    )
    index_service = get_similarity_service()
    index_service.build_index(records, method="tfidf")
    return {"status": "success", "records_indexed": len(records)}
# ============== Forecasting Endpoints ==============
@app.get("/api/ai/forecast/issues")
async def forecast_issues(
    periods: int = 4, bypass_cache: bool = False, max_rows_per_sheet: int = 5000
):
    """Forecast issue volume for the next ``periods`` periods.

    Calls ``ensure_data`` on the forecast service before asking it for the
    forecast, and returns the service result unchanged.
    """
    from data.forecast_service import get_forecast_service

    service = get_forecast_service()
    service.ensure_data(max_rows=max_rows_per_sheet, bypass_cache=bypass_cache)
    return service.forecast_issues(periods)
@app.get("/api/ai/forecast/trends")
async def predict_trends(
    bypass_cache: bool = False, max_rows_per_sheet: int = 5000
):
    """Predict category trends.

    Calls ``ensure_data`` on the forecast service, then returns the result of
    ``predict_category_trends`` unchanged.
    """
    from data.forecast_service import get_forecast_service

    service = get_forecast_service()
    service.ensure_data(max_rows=max_rows_per_sheet, bypass_cache=bypass_cache)
    return service.predict_category_trends()
@app.get("/api/ai/forecast/seasonal")
async def get_seasonal_patterns(
    bypass_cache: bool = False, max_rows_per_sheet: int = 5000
):
    """Return seasonal patterns from the forecast service.

    Calls ``ensure_data`` on the forecast service first, then returns the
    result of ``get_seasonal_patterns`` unchanged.
    """
    from data.forecast_service import get_forecast_service

    service = get_forecast_service()
    service.ensure_data(max_rows=max_rows_per_sheet, bypass_cache=bypass_cache)
    return service.get_seasonal_patterns()
@app.post("/api/ai/forecast/build")
async def build_forecast_data(
    bypass_cache: bool = False, max_rows_per_sheet: int = 5000
):
    """Force the forecast service to (re)load its data.

    ``ensure_data`` is called with ``force=True``; its return value is
    reported as ``records_processed`` next to the service's forecast summary.
    """
    from data.forecast_service import get_forecast_service

    service = get_forecast_service()
    processed = service.ensure_data(
        max_rows=max_rows_per_sheet, bypass_cache=bypass_cache, force=True
    )
    response = {"status": "success", "records_processed": processed}
    response["forecast_summary"] = service.get_forecast_summary()
    return response
# ============== Report Generation Endpoints ==============
@app.post("/api/ai/report/generate")
async def generate_report(
    row_id: str, bypass_cache: bool = False, max_rows_per_sheet: int = 2000
):
    """Generate formal, executive and structured reports for one record.

    The record is looked up by ``row_id``; its severity is classified, risk
    scores are recalculated from freshly fetched data, and the report
    generator produces the three report variants.

    Raises:
        HTTPException: 404 when no record matches ``row_id``.
    """
    from data.report_generator_service import get_report_generator
    from data.risk_service import get_risk_service

    data_fetcher = DataFetcher(bypass_cache=bypass_cache)
    record = data_fetcher.find_record(row_id, max_rows=max_rows_per_sheet)
    if not record:
        raise HTTPException(status_code=404, detail=f"Record '{row_id}' not found")

    # NOTE(review): this value is not read anywhere below in this handler.
    report_text = _record_text(record)

    sev = _classify_record_severity(record)
    analysis = {
        "severity": sev.get("severity", "Medium"),
        "severityConfidence": sev.get("confidence", 0.7),
        "severitySource": sev.get("source", "unknown"),
        "issueType": record.get("Irregularity_Complain_Category", ""),
    }
    if sev.get("context_adjustment"):
        analysis["contextAdjustment"] = sev["context_adjustment"]

    # Risk scores are recomputed from the reference rows before the airline
    # lookup; an unknown airline yields an empty risk dict.
    risk_svc = get_risk_service()
    airline_name = record.get("Airlines", "")
    reference_rows = data_fetcher.fetch_all(max_rows=max_rows_per_sheet, fast=True)
    risk_svc.calculate_all_risk_scores(reference_rows)
    airline_risk = risk_svc.get_airline_risk(airline_name) or {}

    generator = get_report_generator()
    formal = generator.generate_incident_report(record, analysis, airline_risk)
    executive = generator.generate_executive_summary(record, analysis)
    structured = generator.generate_json_report(record, analysis, airline_risk)

    source_info = {
        "type": "live_google_sheets",
        "max_rows_per_sheet": max_rows_per_sheet,
        "reference_rows": len(reference_rows),
    }
    return {
        "row_id": row_id,
        "data_source": source_info,
        "formal_report": formal,
        "executive_summary": executive,
        "structured_report": structured,
    }
# ============== Dashboard Endpoints ==============
@app.get("/api/ai/dashboard/summary")
async def dashboard_summary(
    bypass_cache: bool = False, max_rows_per_sheet: int = 5000
):
    """Build the dashboard summary payload.

    Combines record counts, severity/category/airline distributions, the risk
    summary, the forecast summary and model status into one dict. The payload
    is cached for 300 seconds under a key that includes
    ``max_rows_per_sheet``.

    Caching is best-effort: a missing cache object or a failing cache call is
    logged and the summary is computed and returned anyway.

    Args:
        bypass_cache: Skip both the cache read and the cache write, and
            forward the flag to the data fetcher and forecast service.
        max_rows_per_sheet: Row limit forwarded to the data fetcher and the
            forecast service.

    Returns:
        The summary payload dict (or the cached copy of it).
    """
    from data.risk_service import get_risk_service
    from data.forecast_service import get_forecast_service

    cache = get_cache()
    cache_key = f"dashboard:summary:{max_rows_per_sheet}"
    # Guard against an unavailable cache so the endpoint degrades to
    # "compute every time" instead of failing.
    use_cache = cache is not None and not bypass_cache
    if use_cache:
        try:
            cached = cache.get(cache_key)
        except Exception as exc:
            logger.warning("Dashboard cache read failed: %s", exc)
            cached = None
        if cached is not None:
            return cached

    fetcher = DataFetcher(bypass_cache=bypass_cache)
    all_data = fetcher.fetch_all(max_rows=max_rows_per_sheet, fast=True)

    risk_service = get_risk_service()
    risk_service.calculate_all_risk_scores(all_data)
    risk_summary = risk_service.get_risk_summary()

    forecast_service = get_forecast_service()
    forecast_service.ensure_data(
        max_rows=max_rows_per_sheet, bypass_cache=bypass_cache
    )
    forecast_summary = forecast_service.get_forecast_summary()

    severity_dist = Counter()
    severity_source_dist = Counter()
    category_dist = Counter()
    airline_dist = Counter()
    severity_results = _classify_records_severity(all_data)
    severity_confidences = []
    for record, severity_result in zip(all_data, severity_results):
        sev = severity_result.get("severity", "Low")
        severity_dist[sev] += 1
        severity_source_dist[severity_result.get("source", "unknown")] += 1
        # A missing or None confidence counts as 0.0 in the average.
        severity_confidences.append(float(severity_result.get("confidence") or 0.0))
        category_dist[record.get("Irregularity_Complain_Category", "Unknown")] += 1
        airline_dist[record.get("Airlines", "Unknown")] += 1

    avg_confidence = (
        round(sum(severity_confidences) / len(severity_confidences), 3)
        if severity_confidences
        else 0.0
    )

    payload = {
        "total_records": len(all_data),
        "data_source": {
            "type": "live_google_sheets",
            "max_rows_per_sheet": max_rows_per_sheet,
            "analysis_engine": "deep_learning_or_trained_nlp_with_sheet_overrides",
        },
        "sheets": {
            "non_cargo": sum(
                1 for r in all_data if r.get("_sheet_name") == "NON CARGO"
            ),
            "cargo": sum(1 for r in all_data if r.get("_sheet_name") == "CGO"),
        },
        "severity_distribution": dict(severity_dist),
        "severity_source_distribution": dict(severity_source_dist),
        "severity_average_confidence": avg_confidence,
        "category_distribution": dict(category_dist.most_common(10)),
        "top_airlines": dict(airline_dist.most_common(10)),
        "risk_summary": risk_summary,
        "forecast_summary": forecast_summary,
        "model_status": {
            "regression": model_service.model_loaded,
            "nlp": model_service.nlp_service is not None,
        },
        "last_updated": datetime.now().isoformat(),
    }

    if use_cache:
        try:
            cache.set(cache_key, payload, 300)
        except Exception as exc:
            # A failed cache write must not discard an already computed payload.
            logger.warning("Dashboard cache write failed: %s", exc)
    return payload
# ============== Seasonality Endpoints ==============
@app.get("/api/ai/seasonality/summary")
async def seasonality_summary(category_type: Optional[str] = None):
    """Return the seasonality summary and patterns.

    Args:
        category_type: "landside_airside", "cgo", or None for both.
    """
    from data.seasonality_service import get_seasonality_service

    return get_seasonality_service().get_seasonality_summary(category_type)
@app.get("/api/ai/seasonality/forecast")
async def seasonality_forecast(
    category_type: Optional[str] = None,
    periods: int = 4,
    granularity: str = "weekly",
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Rebuild seasonality patterns from fetched data and return a forecast.

    ``esklasi_regex`` is forwarded to ``DataFetcher.fetch_all``;
    ``category_type``, ``periods`` and ``granularity`` are forwarded to the
    seasonality service's ``forecast``.
    """
    from data.seasonality_service import get_seasonality_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    seasonality = get_seasonality_service()
    seasonality.build_patterns(records)
    return seasonality.forecast(category_type, periods, granularity)
@app.get("/api/ai/seasonality/peaks")
async def seasonality_peaks(
    category_type: Optional[str] = None,
    threshold: float = 1.2,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    bypass_cache: bool = False,
):
    """Rebuild seasonality patterns from fetched data and return peak periods.

    ``esklasi_regex`` is forwarded to ``DataFetcher.fetch_all``;
    ``category_type`` and ``threshold`` are forwarded to ``get_peak_periods``.
    """
    from data.seasonality_service import get_seasonality_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    seasonality = get_seasonality_service()
    seasonality.build_patterns(records)
    return seasonality.get_peak_periods(category_type, threshold)
@app.post("/api/ai/seasonality/build")
async def build_seasonality_patterns(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Build seasonality patterns from fetched data.

    Returns:
        A dict with ``status``, the number of fetched records and whatever
        ``build_patterns`` returned.
    """
    from data.seasonality_service import get_seasonality_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    patterns = get_seasonality_service().build_patterns(records)
    return {
        "status": "success",
        "records_processed": len(records),
        "patterns": patterns,
    }
# ============== Root Cause Endpoints ==============
@app.post("/api/ai/root-cause/classify")
async def classify_root_cause(
    root_cause: str,
    report: Optional[str] = None,
    area: Optional[str] = None,
    category: Optional[str] = None,
):
    """Classify a root cause text into categories.

    Categories: Equipment Failure, Staff Competency, Process/Procedure,
    Communication, External Factors, Documentation, Training Gap,
    Resource/Manpower

    The remote client is tried first when enabled; a non-empty remote result
    is returned with ``primary_category`` set from ``label`` (or the existing
    ``primary_category``, or "Unknown"). If the remote client is disabled,
    returns nothing, or raises, the local root cause service is used.

    Args:
        root_cause: Root cause text to classify.
        report: Optional report text; an empty string is passed to the local
            service when omitted.
        area: Optional area, passed as classification context.
        category: Optional category, passed as classification context.

    Returns:
        The classification result from the remote client or local service.
    """
    try:
        rc = get_remote_client()
        if rc and rc.enabled:
            res = rc.root_cause_classify(root_cause, report, area, category)
            if res:
                label = res.get("label") or res.get("primary_category") or "Unknown"
                res["primary_category"] = label
                return res
    except Exception as exc:
        # Best-effort remote call: log the failure instead of swallowing it
        # silently, then fall back to the local classifier.
        logger.warning(
            "Remote root cause classification failed; using local service: %s",
            exc,
        )

    from data.root_cause_service import get_root_cause_service

    service = get_root_cause_service()
    context = {"area": area, "category": category}
    return service.classify(root_cause, report or "", context)
@app.post("/api/ai/root-cause/classify-batch")
async def classify_root_cause_batch(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Classify root causes for all fetched records.

    Returns:
        A dict with ``status``, the number of fetched records, the first 100
        classifications, and the count of results whose ``primary_category``
        is not "Unknown".
    """
    from data.root_cause_service import get_root_cause_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    outcomes = get_root_cause_service().classify_batch(records)
    return {
        "status": "success",
        "records_processed": len(records),
        "classifications": outcomes[:100],
        "total_classified": sum(
            1 for item in outcomes if item["primary_category"] != "Unknown"
        ),
    }
@app.get("/api/ai/root-cause/categories")
async def get_root_cause_categories():
    """Return the categories reported by the root cause service."""
    from data.root_cause_service import get_root_cause_service

    return get_root_cause_service().get_categories()
@app.get("/api/ai/root-cause/stats")
async def get_root_cause_stats(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Return root cause statistics computed over the fetched records."""
    from data.root_cause_service import get_root_cause_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    return get_root_cause_service().get_statistics(records)
@app.post("/api/ai/root-cause/train")
async def train_root_cause_classifier(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Train the root cause classifier on the fetched records.

    Returns whatever the service's ``train_from_data`` returns.
    """
    from data.root_cause_service import get_root_cause_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    return get_root_cause_service().train_from_data(records)
# ============== Category Summarization Endpoints ==============
@app.get("/api/ai/summarize", response_model=CategorySummaryResponse)
async def summarize_by_category(
    category: str = "all",
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
):
    """Summarize fetched records for one category type.

    Args:
        category: One of "non_cargo", "cgo" or "all" (case-insensitive).
        bypass_cache: Forwarded to ``DataFetcher``.
        esklasi_regex: Forwarded to ``DataFetcher.fetch_all``.

    Raises:
        HTTPException: 400 when ``category`` is not one of the valid values.
    """
    from data.category_summarization_service import get_category_summarization_service

    valid_categories = ["non_cargo", "cgo", "all"]
    category_key = category.lower()
    if category_key not in valid_categories:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid category. Must be one of: {valid_categories}",
        )

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    summarizer = get_category_summarization_service()
    return CategorySummaryResponse(
        status="success",
        category_type=category_key,
        summary=summarizer.summarize_category(records, category_key),
        timestamp=datetime.now().isoformat(),
    )
@app.get("/api/ai/summarize/non-cargo")
async def summarize_non_cargo(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Shortcut for ``summarize_by_category`` with ``category="non_cargo"``."""
    response = await summarize_by_category(
        category="non_cargo",
        bypass_cache=bypass_cache,
        esklasi_regex=esklasi_regex,
    )
    return response
@app.get("/api/ai/summarize/cgo")
async def summarize_cgo(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Shortcut for ``summarize_by_category`` with ``category="cgo"``."""
    response = await summarize_by_category(
        category="cgo",
        bypass_cache=bypass_cache,
        esklasi_regex=esklasi_regex,
    )
    return response
@app.get("/api/ai/summarize/compare")
async def compare_categories(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Return the combined summary by delegating with ``category="all"``."""
    response = await summarize_by_category(
        category="all",
        bypass_cache=bypass_cache,
        esklasi_regex=esklasi_regex,
    )
    return response
# ============== Branch Analytics Endpoints ==============
@app.get("/api/ai/branch/summary")
async def branch_analytics_summary(
    category_type: Optional[str] = None,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Recalculate branch metrics from fetched data and return the summary."""
    from data.branch_analytics_service import get_branch_analytics_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    branch_service = get_branch_analytics_service()
    branch_service.calculate_branch_metrics(records, category_type)
    return branch_service.get_summary(category_type)
@app.get("/api/ai/branch/ranking")
async def branch_ranking(
    category_type: Optional[str] = None,
    sort_by: str = "risk_score",
    limit: int = 20,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Recalculate branch metrics from fetched data and return the ranking.

    ``category_type``, ``sort_by`` and ``limit`` are forwarded to the branch
    analytics service's ``get_ranking``.
    """
    from data.branch_analytics_service import get_branch_analytics_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    branch_service = get_branch_analytics_service()
    branch_service.calculate_branch_metrics(records, category_type)
    return branch_service.get_ranking(category_type, sort_by, limit)
@app.get("/api/ai/branch/comparison")
async def branch_comparison(
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Recalculate branch metrics without a category filter and compare them."""
    from data.branch_analytics_service import get_branch_analytics_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    branch_service = get_branch_analytics_service()
    branch_service.calculate_branch_metrics(records, None)
    return branch_service.get_comparison()
@app.get("/api/ai/branch/{branch_name}")
async def get_branch_metrics(
    branch_name: str,
    category_type: Optional[str] = None,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Recalculate branch metrics and return those for a single branch.

    Raises:
        HTTPException: 404 when the service returns nothing for
            ``branch_name``.
    """
    from data.branch_analytics_service import get_branch_analytics_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    branch_service = get_branch_analytics_service()
    branch_service.calculate_branch_metrics(records, category_type)

    metrics = branch_service.get_branch(branch_name, category_type)
    if metrics:
        return metrics
    raise HTTPException(status_code=404, detail=f"Branch '{branch_name}' not found")
@app.post("/api/ai/branch/calculate")
async def calculate_branch_metrics(
    bypass_cache: bool = False, esklasi_regex: Optional[str] = "OT|OP|UQ|HT"
):
    """Calculate branch metrics from fetched data.

    Returns:
        A dict with ``status``, the number of fetched records and whatever
        the service's ``calculate_branch_metrics`` returned.
    """
    from data.branch_analytics_service import get_branch_analytics_service

    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        esklasi_regex=esklasi_regex
    )
    metrics = get_branch_analytics_service().calculate_branch_metrics(records)
    return {
        "status": "success",
        "records_processed": len(records),
        "metrics": metrics,
    }
# ============== AI Analytics Endpoints ==============
@app.get("/api/ai/analytics/correlations")
async def analytics_correlations(
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Run the analytics service's correlation analysis on fetched records."""
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    return analytics_service.correlation_analysis(records)
@app.get("/api/ai/analytics/spikes")
async def analytics_spikes(
    window_days: int = 14,
    threshold: float = 2.0,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Run spike detection on fetched records.

    ``window_days`` and ``threshold`` are forwarded to the analytics
    service's ``spike_detection``.
    """
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    return analytics_service.spike_detection(records, window_days, threshold)
@app.get("/api/ai/analytics/heatmap")
async def analytics_heatmap(
    dimension_x: str = "day_of_week",
    dimension_y: str = "hub",
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Build heatmap data over two dimensions of the fetched records.

    ``dimension_x`` and ``dimension_y`` are forwarded to the analytics
    service's ``heatmap_data``.
    """
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    return analytics_service.heatmap_data(records, dimension_x, dimension_y)
@app.get("/api/ai/analytics/sla")
async def analytics_sla(
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Compute SLA metrics over the fetched records via the analytics service."""
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    return analytics_service.sla_metrics(records)
# ============== AI Reports Endpoints ==============
@app.get("/api/ai/reports/briefing")
async def reports_briefing(
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Generate an executive briefing from the fetched records.

    ``esklasi_regex`` is used for the fetch and also passed on to the reports
    service.
    """
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    return reports_service.generate_executive_briefing(records, esklasi_regex)
@app.get("/api/ai/reports/scorecard")
async def reports_scorecard(
    entity_type: str = "airline",
    entity_name: Optional[str] = None,
    limit: int = 0,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Generate a risk scorecard from the fetched records.

    ``entity_type``, ``entity_name`` and ``limit`` are forwarded to the
    reports service's ``generate_risk_scorecard``.
    """
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    scorecard = reports_service.generate_risk_scorecard(
        records, entity_type, entity_name, limit=limit
    )
    return scorecard
# ============== AI CBR Endpoints ==============
@app.post("/api/ai/cbr/recommend")
async def cbr_recommend(request: AnalysisRequest):
    """Recommend actions for a report based on historical records.

    Only the first entry of ``request.data`` is used as the target report.

    Args:
        request: Analysis request; ``request.data`` must contain at least one
            report and ``request.options.bypassCache`` controls the fetch.

    Returns:
        The result of ``cbr_service.recommend_from_history``.

    Raises:
        HTTPException: 400 when ``request.data`` is empty or missing.
    """
    # Validate first so an empty request fails fast without paying for a
    # full data fetch.
    if not request.data:
        raise HTTPException(status_code=400, detail="Provide a report in request.data")
    target = request.data[0].model_dump()

    fetcher = DataFetcher(bypass_cache=request.options.bypassCache)
    all_data = fetcher.fetch_all()
    return cbr_service.recommend_from_history(target, all_data)
@app.get("/api/ai/cbr/patterns")
async def cbr_patterns(
    category: Optional[str] = None,
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Track resolution patterns over the fetched records.

    ``category`` is forwarded to ``cbr_service.track_resolution_patterns``.
    """
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    return cbr_service.track_resolution_patterns(records, category)
# ============== AI Data Intelligence Endpoints ==============
@app.get("/api/ai/data/quality")
async def data_quality(
    bypass_cache: bool = False,
    esklasi_regex: Optional[str] = "OT|OP|UQ|HT",
    max_rows_per_sheet: int = 5000,
):
    """Score the data quality of the fetched records."""
    records = DataFetcher(bypass_cache=bypass_cache).fetch_all(
        max_rows=max_rows_per_sheet, esklasi_regex=esklasi_regex
    )
    return di_service.score_data_quality(records)
@app.post("/api/ai/data/autofill")
async def data_autofill(request: AnalysisRequest):
    """Autofill missing fields for supplied reports or for all fetched data.

    When ``request.data`` is non-empty, only those reports are processed and
    nothing is fetched; otherwise every record returned by the data fetcher
    is processed.
    """
    if not request.data:
        # No reports supplied: operate on the full fetched dataset instead.
        fetcher = DataFetcher(bypass_cache=request.options.bypassCache)
        return di_service.autofill_missing_fields(fetcher.fetch_all())

    supplied = [item.model_dump() for item in request.data]
    return di_service.autofill_missing_fields(supplied)
# ============== AI Multi-Label Classification ==============
@app.post("/api/ai/classify/multi-label")
async def classify_multi_label(request: AnalysisRequest):
    """Run the fallback multi-label classifier over the supplied reports.

    Each report is turned into one text made of its ``Report`` and
    ``Root_Caused`` fields joined by a space; missing values become empty
    strings.

    Raises:
        HTTPException: 400 when ``request.data`` is empty or missing.
    """
    if not request.data:
        raise HTTPException(status_code=400, detail="Provide reports in request.data")

    texts = []
    for item in request.data:
        report_part = str(item.Report or "")
        cause_part = str(item.Root_Caused or "")
        texts.append(report_part + " " + cause_part)

    labels = fallback_classify_multi_label(texts)
    return {"status": "success", "total": len(labels), "classifications": labels}
# ============== Main ==============
if __name__ == "__main__":
    import uvicorn

    # Serve on all interfaces; API_PORT overrides the default port 8000.
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("API_PORT", 8000)))