|
|
|
|
|
|
|
|
import streamlit as st
|
|
|
import os
|
|
|
import shutil
|
|
|
import time
|
|
|
from datetime import datetime, timedelta
|
|
|
from sqlalchemy import text
|
|
|
|
|
|
from banco import get_engine, SessionLocal, db_info
|
|
|
from utils_permissoes import verificar_permissao
|
|
|
from utils_auditoria import registrar_log
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
from db_router import current_db_choice, bank_label
|
|
|
_HAS_ROUTER = True
|
|
|
except Exception:
|
|
|
_HAS_ROUTER = False
|
|
|
def current_db_choice() -> str:
|
|
|
return "prod"
|
|
|
def bank_label(choice: str) -> str:
|
|
|
return {"prod": "Banco 1 (Produção)", "test": "Banco 2 (Teste)", "treinamento": "Banco 3 (Treinamento)"}\
|
|
|
.get(choice, choice)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _engine():
|
|
|
"""Retorna o engine do banco ATIVO (de acordo com a escolha no login)."""
|
|
|
return get_engine()
|
|
|
|
|
|
def _dialeto():
|
|
|
try:
|
|
|
return _engine().url.get_backend_name()
|
|
|
except Exception:
|
|
|
return "desconhecido"
|
|
|
|
|
|
def _sqlite_version():
|
|
|
if _dialeto() != "sqlite":
|
|
|
return None
|
|
|
try:
|
|
|
with _engine().begin() as conn:
|
|
|
return conn.execute(text("select sqlite_version()")).scalar()
|
|
|
except Exception:
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _db_file_path():
|
|
|
|
|
|
try:
|
|
|
eng = _engine()
|
|
|
return eng.url.database if eng.url.get_backend_name() == "sqlite" else None
|
|
|
except Exception:
|
|
|
return None
|
|
|
|
|
|
def _sqlite_stats():
|
|
|
|
|
|
db_path = _db_file_path()
|
|
|
if not db_path or not os.path.exists(db_path):
|
|
|
return None
|
|
|
|
|
|
size_bytes = os.path.getsize(db_path)
|
|
|
dir_path = os.path.dirname(os.path.abspath(db_path)) or "."
|
|
|
total, used, free = shutil.disk_usage(dir_path)
|
|
|
|
|
|
with _engine().begin() as conn:
|
|
|
page_count = conn.execute(text("PRAGMA page_count")).scalar()
|
|
|
page_size = conn.execute(text("PRAGMA page_size")).scalar()
|
|
|
|
|
|
return {
|
|
|
"db_path": db_path,
|
|
|
"size_bytes": size_bytes,
|
|
|
"page_count": page_count,
|
|
|
"page_size": page_size,
|
|
|
"calc_bytes": (page_count or 0) * (page_size or 0),
|
|
|
"disk_total": total,
|
|
|
"disk_free": free,
|
|
|
"disk_used": used,
|
|
|
"sqlite_version": _sqlite_version(),
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _ensure_dir(path: str):
|
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
|
|
def _fmt_bytes(b: int) -> str:
|
|
|
|
|
|
for unit in ["B","KB","MB","GB","TB"]:
|
|
|
if b < 1024.0:
|
|
|
return f"{b:,.2f} {unit}".replace(",", ".")
|
|
|
b /= 1024.0
|
|
|
return f"{b:,.2f} PB".replace(",", ".")
|
|
|
|
|
|
def _listar_backups(backup_dir: str, base_name: str):
|
|
|
"""Lista backups para o banco atual. Formato: base_name-YYYYMMDD-HHMMSS.db (ou .zip futuramente)"""
|
|
|
if not os.path.isdir(backup_dir):
|
|
|
return []
|
|
|
files = []
|
|
|
for f in os.listdir(backup_dir):
|
|
|
if f.startswith(base_name + "-") and f.endswith(".db"):
|
|
|
full = os.path.join(backup_dir, f)
|
|
|
files.append((f, full, os.path.getmtime(full)))
|
|
|
return sorted(files, key=lambda x: x[2], reverse=True)
|
|
|
|
|
|
def _executar_backup(backup_dir: str):
|
|
|
"""Copia o .db para backups/ com timestamp. Registra auditoria."""
|
|
|
db_path = _db_file_path()
|
|
|
if not db_path or not os.path.exists(db_path):
|
|
|
st.error("Arquivo de banco SQLite não encontrado.")
|
|
|
return False
|
|
|
|
|
|
_ensure_dir(backup_dir)
|
|
|
base_name = os.path.splitext(os.path.basename(db_path))[0]
|
|
|
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
|
|
dest = os.path.join(backup_dir, f"{base_name}-{stamp}.db")
|
|
|
|
|
|
try:
|
|
|
shutil.copyfile(db_path, dest)
|
|
|
registrar_log(st.session_state.get("usuario"), f"BACKUP criado: {os.path.basename(dest)}", "schema", None)
|
|
|
st.success(f"✅ Backup criado: {dest}")
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao criar backup: {e}")
|
|
|
return False
|
|
|
|
|
|
def _limpar_antigos(backup_dir: str, base_name: str, manter: int):
|
|
|
"""Remove backups antigos, mantendo N mais recentes. Registra auditoria."""
|
|
|
lst = _listar_backups(backup_dir, base_name)
|
|
|
if len(lst) <= manter:
|
|
|
st.info("Nada para remover: já dentro da retenção.")
|
|
|
return 0
|
|
|
remover = lst[manter:]
|
|
|
count = 0
|
|
|
for _, full, _ in remover:
|
|
|
try:
|
|
|
os.remove(full)
|
|
|
count += 1
|
|
|
except Exception as e:
|
|
|
st.error(f"Erro ao remover {full}: {e}")
|
|
|
if count > 0:
|
|
|
registrar_log(st.session_state.get("usuario"), f"CLEAN backups antigos: {count} removidos (retain={manter})", "schema", None)
|
|
|
st.success(f"🧹 {count} backup(s) antigo(s) removido(s).")
|
|
|
return count
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
st.title("🗄️ Monitor e Backup do Banco")
|
|
|
|
|
|
|
|
|
if st.session_state.get("perfil") != "admin":
|
|
|
st.error("⛔ Acesso restrito ao administrador.")
|
|
|
return
|
|
|
|
|
|
|
|
|
try:
|
|
|
amb = current_db_choice()
|
|
|
st.caption(f"🧭 Ambiente: {bank_label(amb)}")
|
|
|
except Exception:
|
|
|
pass
|
|
|
try:
|
|
|
info = db_info()
|
|
|
st.caption(f"🔗 Engine URL: {info.get('url')}")
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
dial = _dialeto()
|
|
|
st.caption(f"Dialeto do banco: **{dial}**")
|
|
|
|
|
|
|
|
|
stats = _sqlite_stats() if dial == "sqlite" else None
|
|
|
|
|
|
|
|
|
if dial != "sqlite":
|
|
|
st.info("Este monitor está otimizado para SQLite. Para PostgreSQL/MySQL, configure backup via ferramenta da plataforma (pg_dump/mysqldump) e agendamento externo.")
|
|
|
st.stop()
|
|
|
|
|
|
if not stats:
|
|
|
st.error("Banco SQLite não encontrado ou inacessível. Verifique o arquivo do banco ativo.")
|
|
|
return
|
|
|
|
|
|
|
|
|
st.subheader("📊 Estatísticas")
|
|
|
colA, colB, colC = st.columns(3)
|
|
|
with colA:
|
|
|
st.metric("Arquivo", os.path.basename(stats["db_path"]))
|
|
|
st.metric("Tamanho do banco (arquivo)", _fmt_bytes(stats["size_bytes"]))
|
|
|
with colB:
|
|
|
st.metric("Páginas (PRAGMA)", f'{stats["page_count"]} × {stats["page_size"]} B')
|
|
|
st.metric("Cálculo (page_count×page_size)", _fmt_bytes(stats["calc_bytes"]))
|
|
|
with colC:
|
|
|
st.metric("Espaço livre no disco", _fmt_bytes(stats["disk_free"]))
|
|
|
st.metric("SQLite version", stats["sqlite_version"] or "—")
|
|
|
|
|
|
st.divider()
|
|
|
|
|
|
|
|
|
st.subheader("🎯 Capacidade & Ocupação")
|
|
|
capacidade_gb = st.number_input("Capacidade alvo (GB) — alerta quando ultrapassar", min_value=0.1, value=1.0, step=0.1)
|
|
|
ocupacao_perc = min(100.0, (stats["size_bytes"] / (capacidade_gb * 1024**3)) * 100.0) if capacidade_gb > 0 else 0.0
|
|
|
|
|
|
st.progress(min(1.0, ocupacao_perc / 100.0))
|
|
|
st.caption(f"Ocupação estimada: **{ocupacao_perc:,.2f}%** de {capacidade_gb} GB")
|
|
|
|
|
|
if ocupacao_perc >= 80.0:
|
|
|
st.warning("⚠️ Ocupação acima de 80%. Considere backup/arquivamento.")
|
|
|
|
|
|
st.divider()
|
|
|
|
|
|
|
|
|
st.subheader("🗓️ Planejamento de Backup")
|
|
|
backup_dir = st.text_input("Pasta de backups", value="backups")
|
|
|
_ensure_dir(backup_dir)
|
|
|
base_name = os.path.splitext(os.path.basename(stats["db_path"]))[0]
|
|
|
backups = _listar_backups(backup_dir, base_name)
|
|
|
|
|
|
|
|
|
ultimo_backup_dt = datetime.fromtimestamp(backups[0][2]) if backups else None
|
|
|
freq_dias = st.number_input("Frequência (dias)", min_value=1, value=7)
|
|
|
retencao = st.number_input("Retenção máx. de backups (arquivos)", min_value=1, value=10)
|
|
|
proximo_backup_dt = (ultimo_backup_dt + timedelta(days=freq_dias)) if ultimo_backup_dt else (datetime.now() + timedelta(days=freq_dias))
|
|
|
|
|
|
col1, col2, col3 = st.columns(3)
|
|
|
with col1:
|
|
|
st.metric("Último backup", ultimo_backup_dt.strftime("%d/%m/%Y %H:%M:%S") if ultimo_backup_dt else "—")
|
|
|
with col2:
|
|
|
st.metric("Próximo previsto", proximo_backup_dt.strftime("%d/%m/%Y %H:%M:%S"))
|
|
|
with col3:
|
|
|
st.metric("Backups atuais", len(backups))
|
|
|
|
|
|
|
|
|
if ultimo_backup_dt and datetime.now() >= proximo_backup_dt:
|
|
|
st.warning("⏰ Backup previsto já venceu. Execute agora para manter o plano.")
|
|
|
|
|
|
|
|
|
st.subheader("⚙️ Ações")
|
|
|
colX, colY, colZ = st.columns(3)
|
|
|
with colX:
|
|
|
if st.button("💾 Backup agora"):
|
|
|
if _executar_backup(backup_dir):
|
|
|
st.rerun()
|
|
|
with colY:
|
|
|
if st.button("🧹 Limpar antigos (manter retenção)"):
|
|
|
_limpar_antigos(backup_dir, base_name, int(retencao))
|
|
|
st.rerun()
|
|
|
with colZ:
|
|
|
|
|
|
if backups:
|
|
|
st.write("Últimos backups:")
|
|
|
for f, full, mtime in backups[:5]:
|
|
|
dt = datetime.fromtimestamp(mtime).strftime("%d/%m/%Y %H:%M:%S")
|
|
|
st.caption(f"• {f} ({dt})")
|
|
|
|
|
|
|
|
|
registrar_log(st.session_state.get("usuario"), "MONITOR DB", "schema", None)
|
|
|
|
|
|
|