# mesa-react/backend/app/api/visualizacao.py
# Guilherme Silberfarb Costa
# multiplas alteracoes
# 9e7c650
from __future__ import annotations
import os
from typing import Any
from fastapi import APIRouter, File, Form, Request, UploadFile
from fastapi.responses import FileResponse
from pydantic import BaseModel
from app.services import auth_service, elaboracao_service, visualizacao_service
from app.services.audit_log_service import log_event
from app.services.session_store import session_store
# Router collecting the endpoints declared in this module; every route path
# below is registered under the "/api/visualizacao" prefix.
router = APIRouter(prefix="/api/visualizacao", tags=["visualizacao"])
class SessionPayload(BaseModel):
    """Base request body carrying the identifier of the session to operate on."""

    # Handlers in this module pass this value to ``session_store.get``.
    session_id: str
class ExibirPayload(SessionPayload):
    """Request body of ``POST /exibir``."""

    # Optional; forwarded unchanged as the keyword argument of the same name
    # to ``visualizacao_service.exibir_modelo``.
    trabalhos_tecnicos_modelos_modo: str | None = None
class MapaPayload(SessionPayload):
    """Request body of ``POST /map/update``."""

    # Optional; passed positionally to ``visualizacao_service.atualizar_mapa``.
    variavel_mapa: str | None = None
    # Optional; forwarded unchanged as the keyword argument of the same name.
    trabalhos_tecnicos_modelos_modo: str | None = None
class SecaoPayload(SessionPayload):
    """Request body of ``POST /section``."""

    # Required; passed positionally to
    # ``visualizacao_service.carregar_secao_modelo``.
    secao: str
    # Optional; forwarded unchanged as the keyword argument of the same name.
    trabalhos_tecnicos_modelos_modo: str | None = None
class MapaPopupPayload(SessionPayload):
    """Request body of ``POST /map/popup``."""

    # Required; passed to ``visualizacao_service.carregar_popup_ponto_mapa``.
    row_id: int
class AvaliacaoPayload(SessionPayload):
    """Request body of ``POST /evaluation/calculate``.

    All fields are passed positionally, in declaration order, to
    ``visualizacao_service.calcular_avaliacao`` after the session.
    """

    # Mapping from string keys to arbitrary values; the expected keys are not
    # visible in this module.
    valores_x: dict[str, Any]
    indice_base: str | None = None
    # NOTE(review): presumably the latitude/longitude of the subject being
    # evaluated -- confirm against the service.
    avaliando_lat: float | None = None
    avaliando_lon: float | None = None
class AvaliacaoKnnDetalhesPayload(SessionPayload):
    """Request body of ``POST /evaluation/knn-details``.

    All fields are passed positionally, in declaration order, to
    ``visualizacao_service.detalhes_knn_avaliacao`` after the session.
    """

    # Mapping from string keys to arbitrary values; the expected keys are not
    # visible in this module.
    valores_x: dict[str, Any]
    # NOTE(review): presumably the latitude/longitude of the subject being
    # evaluated -- confirm against the service.
    avaliando_lat: float | None = None
    avaliando_lon: float | None = None
class AvaliacaoDeletePayload(SessionPayload):
    """Request body of ``POST /evaluation/delete``."""

    # Both fields are optional and are passed positionally, in this order, to
    # ``visualizacao_service.excluir_avaliacao``.
    indice: str | None = None
    indice_base: str | None = None
class AvaliacaoBasePayload(SessionPayload):
    """Request body of ``POST /evaluation/base``."""

    # Optional; passed to ``visualizacao_service.atualizar_base``.
    indice_base: str | None = None
class RepositorioModeloPayload(SessionPayload):
    """Request body of ``POST /repositorio-carregar``."""

    # Required; passed to ``visualizacao_service.carregar_modelo_repositorio``
    # and recorded in the audit-log details of that endpoint.
    modelo_id: str
@router.post("/upload")
async def upload_file(
    session_id: str = Form(...),
    file: UploadFile = File(...),
) -> dict[str, Any]:
    """Persist an uploaded file for a session and load it as a model.

    The upload is read fully into memory, handed together with the
    client-supplied filename to ``elaboracao_service.save_uploaded_file``,
    and the resulting path is given to
    ``visualizacao_service.carregar_modelo``.

    Args:
        session_id: Form field identifying the session.
        file: The multipart file part.

    Returns:
        Whatever ``visualizacao_service.carregar_modelo`` returns.
    """
    sessao = session_store.get(session_id)
    dados = await file.read()
    destino = elaboracao_service.save_uploaded_file(sessao, file.filename, dados)
    resultado = visualizacao_service.carregar_modelo(sessao, destino)
    return resultado
@router.get("/repositorio-modelos")
def repositorio_modelos(request: Request) -> dict[str, Any]:
    """List the repository models and record the access in the audit log.

    Args:
        request: Incoming request; used to resolve the user via
            ``auth_service.require_user`` and passed on to ``log_event``.

    Returns:
        The mapping produced by
        ``visualizacao_service.listar_modelos_repositorio``.
    """
    usuario = auth_service.require_user(request)
    modelos = visualizacao_service.listar_modelos_repositorio()
    detalhes = {"total_modelos": modelos.get("total_modelos")}
    log_event(
        "repositorio",
        "listar_modelos_visualizacao",
        user=usuario,
        request=request,
        details=detalhes,
    )
    return modelos
@router.post("/repositorio-carregar")
def repositorio_carregar(payload: RepositorioModeloPayload, request: Request) -> dict[str, Any]:
    """Load a repository model into the session and audit-log the action.

    The session is looked up first, then the user is resolved through
    ``auth_service.require_user``; only afterwards is the model loaded.

    Args:
        payload: Carries ``session_id`` and the ``modelo_id`` to load.
        request: Incoming request, used for user resolution and logging.

    Returns:
        Whatever ``visualizacao_service.carregar_modelo_repositorio`` returns.
    """
    sessao = session_store.get(payload.session_id)
    usuario = auth_service.require_user(request)
    carregado = visualizacao_service.carregar_modelo_repositorio(sessao, payload.modelo_id)
    detalhes = {"modelo_id": payload.modelo_id}
    log_event(
        "repositorio",
        "carregar_modelo_visualizacao",
        user=usuario,
        session_id=payload.session_id,
        request=request,
        details=detalhes,
    )
    return carregado
@router.post("/exibir")
def exibir(payload: ExibirPayload, request: Request) -> dict[str, Any]:
    """Return the result of ``visualizacao_service.exibir_modelo`` for a session.

    Args:
        payload: Carries ``session_id`` and the optional
            ``trabalhos_tecnicos_modelos_modo``.
        request: Incoming request; supplies the API base URL (trailing
            slashes stripped) and ``request.state.auth_token`` when that
            attribute exists, otherwise ``None``.
    """
    sessao = session_store.get(payload.session_id)
    base_url = str(request.base_url).rstrip("/")
    token = getattr(request.state, "auth_token", None)
    return visualizacao_service.exibir_modelo(
        sessao,
        trabalhos_tecnicos_modelos_modo=payload.trabalhos_tecnicos_modelos_modo,
        api_base_url=base_url,
        popup_auth_token=token,
    )
@router.post("/section")
def section(payload: SecaoPayload, request: Request) -> dict[str, Any]:
    """Return one section via ``visualizacao_service.carregar_secao_modelo``.

    Args:
        payload: Carries ``session_id``, the requested ``secao`` and the
            optional ``trabalhos_tecnicos_modelos_modo``.
        request: Incoming request; supplies the API base URL (trailing
            slashes stripped) and ``request.state.auth_token`` when that
            attribute exists, otherwise ``None``.
    """
    sessao = session_store.get(payload.session_id)
    base_url = str(request.base_url).rstrip("/")
    token = getattr(request.state, "auth_token", None)
    return visualizacao_service.carregar_secao_modelo(
        sessao,
        payload.secao,
        trabalhos_tecnicos_modelos_modo=payload.trabalhos_tecnicos_modelos_modo,
        api_base_url=base_url,
        popup_auth_token=token,
    )
@router.post("/evaluation/context")
def evaluation_context(payload: SessionPayload) -> dict[str, Any]:
    """Return ``visualizacao_service.exibir_contexto_avaliacao`` for the session.

    Args:
        payload: Carries the ``session_id`` to look up in ``session_store``.
    """
    sessao = session_store.get(payload.session_id)
    contexto = visualizacao_service.exibir_contexto_avaliacao(sessao)
    return contexto
@router.post("/map/update")
def map_update(payload: MapaPayload, request: Request) -> dict[str, Any]:
    """Return the result of ``visualizacao_service.atualizar_mapa`` for a session.

    Args:
        payload: Carries ``session_id``, the optional ``variavel_mapa`` and
            the optional ``trabalhos_tecnicos_modelos_modo``.
        request: Incoming request; supplies the API base URL (trailing
            slashes stripped) and ``request.state.auth_token`` when that
            attribute exists, otherwise ``None``.
    """
    sessao = session_store.get(payload.session_id)
    base_url = str(request.base_url).rstrip("/")
    token = getattr(request.state, "auth_token", None)
    return visualizacao_service.atualizar_mapa(
        sessao,
        payload.variavel_mapa,
        trabalhos_tecnicos_modelos_modo=payload.trabalhos_tecnicos_modelos_modo,
        api_base_url=base_url,
        popup_auth_token=token,
    )
@router.post("/map/popup")
def map_popup(payload: MapaPopupPayload) -> dict[str, Any]:
    """Return ``visualizacao_service.carregar_popup_ponto_mapa`` for one row.

    Args:
        payload: Carries ``session_id`` and the integer ``row_id`` that is
            forwarded to the service.
    """
    sessao = session_store.get(payload.session_id)
    popup = visualizacao_service.carregar_popup_ponto_mapa(sessao, payload.row_id)
    return popup
@router.post("/evaluation/fields")
def evaluation_fields(payload: SessionPayload) -> dict[str, Any]:
    """Return the session's evaluation fields under the ``"campos"`` key.

    Args:
        payload: Carries the ``session_id`` to look up in ``session_store``.

    Returns:
        A single-key mapping whose value is the result of
        ``visualizacao_service.campos_avaliacao``.
    """
    sessao = session_store.get(payload.session_id)
    campos = visualizacao_service.campos_avaliacao(sessao)
    return {"campos": campos}
@router.post("/evaluation/calculate")
def evaluation_calculate(payload: AvaliacaoPayload, request: Request) -> dict[str, Any]:
    """Run ``visualizacao_service.calcular_avaliacao`` and audit-log the call.

    The audit entry records how many items the response holds under
    ``"avaliacoes"``; a missing or falsy value is counted as zero.

    Args:
        payload: Carries ``session_id``, ``valores_x``, ``indice_base`` and
            the optional ``avaliando_lat`` / ``avaliando_lon``.
        request: Incoming request, used for user resolution and logging.

    Returns:
        The service response, unchanged.
    """
    sessao = session_store.get(payload.session_id)
    usuario = auth_service.require_user(request)
    resultado = visualizacao_service.calcular_avaliacao(
        sessao,
        payload.valores_x,
        payload.indice_base,
        payload.avaliando_lat,
        payload.avaliando_lon,
    )
    avaliacoes = resultado.get("avaliacoes") or []
    log_event(
        "visualizacao",
        "avaliacao_calcular",
        user=usuario,
        session_id=payload.session_id,
        request=request,
        details={"total_avaliacoes": len(avaliacoes)},
    )
    return resultado
@router.post("/evaluation/knn-details")
def evaluation_knn_details(payload: AvaliacaoKnnDetalhesPayload, request: Request) -> dict[str, Any]:
    """Run ``visualizacao_service.detalhes_knn_avaliacao`` and audit-log the call.

    Args:
        payload: Carries ``session_id``, ``valores_x`` and the optional
            ``avaliando_lat`` / ``avaliando_lon``.
        request: Incoming request, used for user resolution and logging.

    Returns:
        The service response, unchanged.
    """
    sessao = session_store.get(payload.session_id)
    usuario = auth_service.require_user(request)
    detalhes_knn = visualizacao_service.detalhes_knn_avaliacao(
        sessao,
        payload.valores_x,
        payload.avaliando_lat,
        payload.avaliando_lon,
    )
    # No ``details`` are attached to this audit entry.
    log_event(
        "visualizacao",
        "avaliacao_knn_detalhes",
        user=usuario,
        session_id=payload.session_id,
        request=request,
    )
    return detalhes_knn
@router.post("/evaluation/clear")
def evaluation_clear(payload: SessionPayload) -> dict[str, Any]:
    """Return ``visualizacao_service.limpar_avaliacoes`` for the session.

    Args:
        payload: Carries the ``session_id`` to look up in ``session_store``.
    """
    sessao = session_store.get(payload.session_id)
    resultado = visualizacao_service.limpar_avaliacoes(sessao)
    return resultado
@router.post("/evaluation/delete")
def evaluation_delete(payload: AvaliacaoDeletePayload) -> dict[str, Any]:
    """Return ``visualizacao_service.excluir_avaliacao`` for the session.

    Args:
        payload: Carries ``session_id`` plus the optional ``indice`` and
            ``indice_base``, forwarded to the service in that order.
    """
    sessao = session_store.get(payload.session_id)
    resultado = visualizacao_service.excluir_avaliacao(
        sessao,
        payload.indice,
        payload.indice_base,
    )
    return resultado
@router.post("/evaluation/base")
def evaluation_base(payload: AvaliacaoBasePayload) -> dict[str, Any]:
    """Return ``visualizacao_service.atualizar_base`` for the session.

    Args:
        payload: Carries ``session_id`` and the optional ``indice_base``
            forwarded to the service.
    """
    sessao = session_store.get(payload.session_id)
    resultado = visualizacao_service.atualizar_base(sessao, payload.indice_base)
    return resultado
@router.get("/evaluation/export")
def evaluation_export(session_id: str) -> FileResponse:
    """Send the file produced by ``visualizacao_service.exportar_avaliacoes``.

    Args:
        session_id: Query parameter identifying the session.

    Returns:
        A ``FileResponse`` with the xlsx media type whose download name is the
        base name of the exported path.
    """
    sessao = session_store.get(session_id)
    arquivo = visualizacao_service.exportar_avaliacoes(sessao)
    nome_download = os.path.basename(arquivo)
    return FileResponse(
        path=arquivo,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        filename=nome_download,
    )
@router.get("/equation/export")
def equation_export(session_id: str, mode: str = "excel") -> FileResponse:
    """Send the file produced by ``visualizacao_service.exportar_equacao``.

    Args:
        session_id: Query parameter identifying the session.
        mode: Query parameter forwarded to the service; defaults to
            ``"excel"``.

    Returns:
        A ``FileResponse`` for the path returned by the service, using the
        download name the service returned alongside it.
    """
    sessao = session_store.get(session_id)
    exportado = visualizacao_service.exportar_equacao(sessao, mode)
    arquivo, nome_download = exportado
    # NOTE(review): the media type is always xlsx regardless of ``mode`` --
    # confirm that every mode yields a spreadsheet.
    return FileResponse(
        path=arquivo,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        filename=nome_download,
    )
@router.post("/clear")
def clear(payload: SessionPayload) -> dict[str, Any]:
    """Return ``visualizacao_service.limpar_tudo_visualizacao`` for the session.

    Args:
        payload: Carries the ``session_id`` to look up in ``session_store``.
    """
    sessao = session_store.get(payload.session_id)
    resultado = visualizacao_service.limpar_tudo_visualizacao(sessao)
    return resultado