IOI-RUN / db_monitor.py
Roudrigus's picture
Upload 82 files
0f0ef8d verified
# db_monitor.py
import streamlit as st
import os
import shutil
import time
from datetime import datetime, timedelta
from sqlalchemy import text
# ✅ Use sempre o engine do BANCO ATIVO (em vez de um engine fixo)
from banco import get_engine, SessionLocal, db_info
from utils_permissoes import verificar_permissao
from utils_auditoria import registrar_log
# ===============================
# MONITOR & BACKUP DO BANCO
# ===============================
# Objetivo:
# - Mostrar estatísticas do banco (tamanho, páginas, espaço em disco)
# - Definir limiar/capacidade alvo e exibir ocupação (%)
# - Planejar backup (frequência em dias) e retenção (N arquivos)
# - Executar backup e limpar antigos com confirmação
# - Acesso restrito por perfil admin
#
# Observações:
# - Em SQLite: usa PRAGMA page_count/page_size + arquivo .db
# - Em outros dialetos: exibe dialeto e recomenda backup externo
# - Pasta padrão de backup: ./backups (pode alterar na UI)
# - Auditoria: registrar_log(usuario, acao="BACKUP/CLEAN/MONITOR", tabela="schema")
# (Opcional) rótulo amigável do ambiente atual (Produção/Teste/Treinamento)
try:
from db_router import current_db_choice, bank_label
_HAS_ROUTER = True
except Exception:
_HAS_ROUTER = False
def current_db_choice() -> str:
return "prod"
def bank_label(choice: str) -> str:
return {"prod": "Banco 1 (Produção)", "test": "Banco 2 (Teste)", "treinamento": "Banco 3 (Treinamento)"}\
.get(choice, choice)
# -------------------------
# Auxiliares de dialeto
# -------------------------
def _engine():
"""Retorna o engine do banco ATIVO (de acordo com a escolha no login)."""
return get_engine()
def _dialeto():
try:
return _engine().url.get_backend_name()
except Exception:
return "desconhecido"
def _sqlite_version():
if _dialeto() != "sqlite":
return None
try:
with _engine().begin() as conn:
return conn.execute(text("select sqlite_version()")).scalar()
except Exception:
return None
# -------------------------
# Info do banco
# -------------------------
def _db_file_path():
# Para SQLite, engine.url.database aponta para o arquivo .db
try:
eng = _engine()
return eng.url.database if eng.url.get_backend_name() == "sqlite" else None
except Exception:
return None
def _sqlite_stats():
# Retorna dict com stats do SQLite
db_path = _db_file_path()
if not db_path or not os.path.exists(db_path):
return None
size_bytes = os.path.getsize(db_path)
dir_path = os.path.dirname(os.path.abspath(db_path)) or "."
total, used, free = shutil.disk_usage(dir_path)
with _engine().begin() as conn:
page_count = conn.execute(text("PRAGMA page_count")).scalar()
page_size = conn.execute(text("PRAGMA page_size")).scalar()
return {
"db_path": db_path,
"size_bytes": size_bytes,
"page_count": page_count,
"page_size": page_size,
"calc_bytes": (page_count or 0) * (page_size or 0),
"disk_total": total,
"disk_free": free,
"disk_used": used,
"sqlite_version": _sqlite_version(),
}
# -------------------------
# Backup
# -------------------------
def _ensure_dir(path: str):
os.makedirs(path, exist_ok=True)
def _fmt_bytes(b: int) -> str:
# Formata bytes em unidades legíveis
for unit in ["B","KB","MB","GB","TB"]:
if b < 1024.0:
return f"{b:,.2f} {unit}".replace(",", ".")
b /= 1024.0
return f"{b:,.2f} PB".replace(",", ".")
def _listar_backups(backup_dir: str, base_name: str):
"""Lista backups para o banco atual. Formato: base_name-YYYYMMDD-HHMMSS.db (ou .zip futuramente)"""
if not os.path.isdir(backup_dir):
return []
files = []
for f in os.listdir(backup_dir):
if f.startswith(base_name + "-") and f.endswith(".db"):
full = os.path.join(backup_dir, f)
files.append((f, full, os.path.getmtime(full)))
return sorted(files, key=lambda x: x[2], reverse=True) # ordem decrescente
def _executar_backup(backup_dir: str):
"""Copia o .db para backups/ com timestamp. Registra auditoria."""
db_path = _db_file_path()
if not db_path or not os.path.exists(db_path):
st.error("Arquivo de banco SQLite não encontrado.")
return False
_ensure_dir(backup_dir)
base_name = os.path.splitext(os.path.basename(db_path))[0]
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
dest = os.path.join(backup_dir, f"{base_name}-{stamp}.db")
try:
shutil.copyfile(db_path, dest)
registrar_log(st.session_state.get("usuario"), f"BACKUP criado: {os.path.basename(dest)}", "schema", None)
st.success(f"✅ Backup criado: {dest}")
return True
except Exception as e:
st.error(f"Erro ao criar backup: {e}")
return False
def _limpar_antigos(backup_dir: str, base_name: str, manter: int):
"""Remove backups antigos, mantendo N mais recentes. Registra auditoria."""
lst = _listar_backups(backup_dir, base_name)
if len(lst) <= manter:
st.info("Nada para remover: já dentro da retenção.")
return 0
remover = lst[manter:]
count = 0
for _, full, _ in remover:
try:
os.remove(full)
count += 1
except Exception as e:
st.error(f"Erro ao remover {full}: {e}")
if count > 0:
registrar_log(st.session_state.get("usuario"), f"CLEAN backups antigos: {count} removidos (retain={manter})", "schema", None)
st.success(f"🧹 {count} backup(s) antigo(s) removido(s).")
return count
# -------------------------
# UI principal
# -------------------------
def main():
st.title("🗄️ Monitor e Backup do Banco")
# 🔐 Proteção: apenas admin
if st.session_state.get("perfil") != "admin":
st.error("⛔ Acesso restrito ao administrador.")
return
# Badge/URL do banco ativo (opcional)
try:
amb = current_db_choice()
st.caption(f"🧭 Ambiente: {bank_label(amb)}")
except Exception:
pass
try:
info = db_info()
st.caption(f"🔗 Engine URL: {info.get('url')}")
except Exception:
pass
dial = _dialeto()
st.caption(f"Dialeto do banco: **{dial}**")
# Estatísticas
stats = _sqlite_stats() if dial == "sqlite" else None
# Se não for SQLite, exibe recomendações
if dial != "sqlite":
st.info("Este monitor está otimizado para SQLite. Para PostgreSQL/MySQL, configure backup via ferramenta da plataforma (pg_dump/mysqldump) e agendamento externo.")
st.stop()
if not stats:
st.error("Banco SQLite não encontrado ou inacessível. Verifique o arquivo do banco ativo.")
return
# Painel de estatísticas
st.subheader("📊 Estatísticas")
colA, colB, colC = st.columns(3)
with colA:
st.metric("Arquivo", os.path.basename(stats["db_path"]))
st.metric("Tamanho do banco (arquivo)", _fmt_bytes(stats["size_bytes"]))
with colB:
st.metric("Páginas (PRAGMA)", f'{stats["page_count"]} × {stats["page_size"]} B')
st.metric("Cálculo (page_count×page_size)", _fmt_bytes(stats["calc_bytes"]))
with colC:
st.metric("Espaço livre no disco", _fmt_bytes(stats["disk_free"]))
st.metric("SQLite version", stats["sqlite_version"] or "—")
st.divider()
# Capacidade alvo e ocupação
st.subheader("🎯 Capacidade & Ocupação")
capacidade_gb = st.number_input("Capacidade alvo (GB) — alerta quando ultrapassar", min_value=0.1, value=1.0, step=0.1)
ocupacao_perc = min(100.0, (stats["size_bytes"] / (capacidade_gb * 1024**3)) * 100.0) if capacidade_gb > 0 else 0.0
st.progress(min(1.0, ocupacao_perc / 100.0))
st.caption(f"Ocupação estimada: **{ocupacao_perc:,.2f}%** de {capacidade_gb} GB")
if ocupacao_perc >= 80.0:
st.warning("⚠️ Ocupação acima de 80%. Considere backup/arquivamento.")
st.divider()
# Planejamento de backup
st.subheader("🗓️ Planejamento de Backup")
backup_dir = st.text_input("Pasta de backups", value="backups")
_ensure_dir(backup_dir) # garante a pasta
base_name = os.path.splitext(os.path.basename(stats["db_path"]))[0]
backups = _listar_backups(backup_dir, base_name)
# Último e próximo
ultimo_backup_dt = datetime.fromtimestamp(backups[0][2]) if backups else None
freq_dias = st.number_input("Frequência (dias)", min_value=1, value=7)
retencao = st.number_input("Retenção máx. de backups (arquivos)", min_value=1, value=10)
proximo_backup_dt = (ultimo_backup_dt + timedelta(days=freq_dias)) if ultimo_backup_dt else (datetime.now() + timedelta(days=freq_dias))
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Último backup", ultimo_backup_dt.strftime("%d/%m/%Y %H:%M:%S") if ultimo_backup_dt else "—")
with col2:
st.metric("Próximo previsto", proximo_backup_dt.strftime("%d/%m/%Y %H:%M:%S"))
with col3:
st.metric("Backups atuais", len(backups))
# Aviso se vencido
if ultimo_backup_dt and datetime.now() >= proximo_backup_dt:
st.warning("⏰ Backup previsto já venceu. Execute agora para manter o plano.")
# Ações
st.subheader("⚙️ Ações")
colX, colY, colZ = st.columns(3)
with colX:
if st.button("💾 Backup agora"):
if _executar_backup(backup_dir):
st.rerun()
with colY:
if st.button("🧹 Limpar antigos (manter retenção)"):
_limpar_antigos(backup_dir, base_name, int(retencao))
st.rerun()
with colZ:
# Apenas mostra lista dos últimos backups
if backups:
st.write("Últimos backups:")
for f, full, mtime in backups[:5]:
dt = datetime.fromtimestamp(mtime).strftime("%d/%m/%Y %H:%M:%S")
st.caption(f"• {f} ({dt})")
# Auditoria de visualização (opcional)
registrar_log(st.session_state.get("usuario"), "MONITOR DB", "schema", None)