| | import eventlet |
| | eventlet.monkey_patch() |
| |
|
| | import logging |
| | |
| | logging.getLogger('engineio.server').setLevel(logging.CRITICAL) |
| | logging.getLogger('socketio.server').setLevel(logging.CRITICAL) |
| |
|
| | import os |
| | import fcntl |
| | import json |
| | import uuid |
| | import threading |
| | import time |
| | import base64 |
| | import csv |
| | import io |
| | import requests |
| | import datetime |
| | from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_file |
| | from fpdf import FPDF |
| | from flask_socketio import SocketIO, emit |
| | from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user |
| | from werkzeug.security import generate_password_hash, check_password_hash |
| | import gitlab |
| | import telebot |
| | from telebot import types |
| | import markdown |
| | from dotenv import load_dotenv |
| | import resend |
| |
|
| | |
| | try: |
| | from datasets import Dataset, load_dataset |
| | from huggingface_hub import HfApi |
| | HF_AVAILABLE = True |
| | except ImportError: |
| | HF_AVAILABLE = False |
| | print("[WARNING] huggingface_hub/datasets not available. Using local JSON storage only.") |
| |
|
| | |
| | load_dotenv() |
| |
|
| | |
| | app = Flask(__name__) |
| | app.config['SECRET_KEY'] = os.getenv("SECRET_KEY", "maker-secret-key") |
| | socketio = SocketIO(app, cors_allowed_origins="*") |
| |
|
| | |
| | try: |
| | model_path = os.path.join(app.root_path, 'static', 'models') |
| | if os.path.exists(model_path): |
| | print(f"[DEBUG] Models directory found at: {model_path}") |
| | print(f"[DEBUG] Files: {os.listdir(model_path)}") |
| | else: |
| | print(f"[DEBUG] ERROR: Models directory NOT found at {model_path}") |
| | except Exception as e: |
| | print(f"[DEBUG] Error checking models: {e}") |
| |
|
| | |
| | login_manager = LoginManager() |
| | login_manager.init_app(app) |
| | login_manager.login_view = 'login' |
| | login_manager.login_message = "Por favor, inicia sesión para acceder." |
| | login_manager.login_message_category = "red" |
| |
|
| | |
| | class HFDatasetManager: |
| | """Clase base para manejar persistencia con Hugging Face Datasets.""" |
| | |
| | def __init__(self, dataset_name, data_key, local_filename): |
| | self.dataset_name = os.getenv(dataset_name) |
| | self.data_key = data_key |
| | self.hf_token = os.getenv("HF_TOKEN") |
| | |
| | |
| | hf_vars = [k for k in os.environ.keys() if k.startswith("HF_")] |
| | print(f"[{self.__class__.__name__}] DEBUG - HF Vars detectadas: {hf_vars}") |
| | |
| | self.use_hf = HF_AVAILABLE and self.dataset_name and self.hf_token |
| | |
| | |
| | data_dir = os.getenv("DATA_DIR", "data") |
| | if not os.path.isabs(data_dir): |
| | data_dir = os.path.join(os.getcwd(), data_dir) |
| | os.makedirs(data_dir, exist_ok=True) |
| | self.local_file = os.path.join(data_dir, local_filename) |
| | |
| | print(f"[{self.__class__.__name__}] HF Datasets: {'ENABLED' if self.use_hf else 'DISABLED (using local JSON)'}") |
| | if self.use_hf: |
| | print(f"[{self.__class__.__name__}] Dataset: {self.dataset_name}") |
| | |
| | def _load_from_hf(self): |
| | """Carga datos desde Hugging Face Dataset.""" |
| | try: |
| | dataset = load_dataset(self.dataset_name, split="train", token=self.hf_token) |
| | if len(dataset) > 0: |
| | return dataset[0][self.data_key] |
| | return [] |
| | except Exception as e: |
| | print(f"[HF ERROR] Error loading from dataset: {e}") |
| | return None |
| | |
| | def _load_from_local(self): |
| | """Carga datos desde archivo JSON local.""" |
| | if os.path.exists(self.local_file): |
| | try: |
| | with open(self.local_file, "r") as f: |
| | return json.load(f) |
| | except: |
| | return [] |
| | return [] |
| | |
| | def _save_to_hf(self, data): |
| | """Guarda datos en Hugging Face Dataset.""" |
| | try: |
| | dataset_dict = {self.data_key: [data]} |
| | dataset = Dataset.from_dict(dataset_dict) |
| | dataset.push_to_hub( |
| | self.dataset_name, |
| | token=self.hf_token, |
| | private=True |
| | ) |
| | print(f"[HF SUCCESS] Data saved to {self.dataset_name}") |
| | return True |
| | except Exception as e: |
| | print(f"[HF ERROR] Error saving to dataset: {e}") |
| | return False |
| | |
| | def _save_to_local(self, data): |
| | """Guarda datos en archivo JSON local.""" |
| | try: |
| | with open(self.local_file, "w") as f: |
| | json.dump(data, f, indent=4) |
| | return True |
| | except Exception as e: |
| | print(f"[LOCAL ERROR] Error saving to local file: {e}") |
| | return False |
| |
|
| | class User(UserMixin): |
| | """Clase de usuario compatible con Flask-Login.""" |
| | def __init__(self, id, username): |
| | self.id = id |
| | self.username = username |
| |
|
| | |
| | class UserManager(HFDatasetManager): |
| | """Clase para manejar los usuarios con persistencia en HF Datasets.""" |
| | |
| | def __init__(self): |
| | super().__init__( |
| | dataset_name="HF_DATASET_USERS", |
| | data_key="users", |
| | local_filename="users.json" |
| | ) |
| | self.users = self._load() |
| | |
| | def _load(self): |
| | """Carga los usuarios desde HF Dataset o archivo JSON local.""" |
| | if self.use_hf: |
| | data = self._load_from_hf() |
| | if data: |
| | print(f"[UserManager] Loaded {len(data)} users from HF Dataset") |
| | return data |
| | |
| | if data is not None: |
| | print("[UserManager] HF Dataset is empty, checking local...") |
| | else: |
| | print("[UserManager] Failed to load from HF, checking local...") |
| | |
| | data = self._load_from_local() |
| | print(f"[UserManager] Loaded {len(data)} users from local JSON") |
| | return data |
| |
|
| | def save(self): |
| | """Guarda la lista de usuarios en HF Dataset y/o archivo JSON.""" |
| | |
| | self._save_to_local(self.users) |
| | |
| | |
| | if self.use_hf: |
| | success = self._save_to_hf(self.users) |
| | if success: |
| | print(f"[UserManager] Saved {len(self.users)} users to HF Dataset") |
| | else: |
| | print("[UserManager] Failed to save to HF, data saved locally only") |
| |
|
| | def add_user(self, username, password, email, status="PENDING"): |
| | """Registra un nuevo usuario con email, contraseña hasheada y estado pendiente.""" |
| | if self.get_by_username(username) or self.get_by_email(email): |
| | return False |
| | |
| | user_id = str(uuid.uuid4()) |
| | hashed_pw = generate_password_hash(password) |
| | self.users.append({ |
| | "id": user_id, |
| | "username": username, |
| | "email": email, |
| | "password": hashed_pw, |
| | "status": status, |
| | "reset_token": None, |
| | "reset_expiry": None |
| | }) |
| | self.save() |
| | return True |
| |
|
| | def activate_user(self, username): |
| | """Activa un usuario pendiente.""" |
| | for user in self.users: |
| | if user["username"] == username: |
| | user["status"] = "ACTIVE" |
| | self.save() |
| | return True |
| | return False |
| |
|
| | def delete_user(self, username): |
| | """Elimina un usuario (para rechazar registros).""" |
| | self.users = [u for u in self.users if u["username"] != username] |
| | self.save() |
| | return True |
| |
|
| | def get_by_username(self, username): |
| | """Busca un usuario por su nombre.""" |
| | for user in self.users: |
| | if user["username"] == username: |
| | return user |
| | return None |
| |
|
| | def get_by_email(self, email): |
| | """Busca un usuario por su email.""" |
| | for user in self.users: |
| | if user.get("email") == email: |
| | return user |
| | return None |
| |
|
| | def get_by_id(self, user_id): |
| | """Busca un usuario por su ID.""" |
| | for user in self.users: |
| | if user["id"] == user_id: |
| | return user |
| | return None |
| |
|
| | def generate_reset_token(self, email): |
| | """Genera un token de recuperación de contraseña.""" |
| | user = self.get_by_email(email) |
| | if user: |
| | token = str(uuid.uuid4()) |
| | expiry = (datetime.datetime.now() + datetime.timedelta(hours=1)).isoformat() |
| | user["reset_token"] = token |
| | user["reset_expiry"] = expiry |
| | self.save() |
| | return token |
| | return None |
| |
|
| | def verify_reset_token(self, token): |
| | """Verifica un token de recuperación y retorna el usuario si es válido.""" |
| | for user in self.users: |
| | if user.get("reset_token") == token: |
| | expiry_str = user.get("reset_expiry") |
| | if expiry_str: |
| | expiry = datetime.datetime.fromisoformat(expiry_str) |
| | if datetime.datetime.now() < expiry: |
| | return user |
| | return None |
| | |
| | def verify_user(self, username, password): |
| | """Verifica las credenciales de un usuario.""" |
| | user_data = self.get_by_username(username) |
| | if user_data and check_password_hash(user_data['password'], password): |
| | return User(user_data['id'], username) |
| | return None |
| |
|
| | def update_password(self, user_id, new_password): |
| | """Actualiza la contraseña de un usuario.""" |
| | for user in self.users: |
| | if user["id"] == user_id: |
| | user["password"] = generate_password_hash(new_password) |
| | user["reset_token"] = None |
| | user["reset_expiry"] = None |
| | self.save() |
| | return True |
| | return False |
| |
|
| | |
| | class ClassroomManager(HFDatasetManager): |
| | """Clase para manejar aulas y estudiantes con persistencia.""" |
| | |
| | def __init__(self): |
| | super().__init__( |
| | dataset_name="HF_DATASET_CLASSROOMS", |
| | data_key="classrooms", |
| | local_filename="classrooms.json" |
| | ) |
| | self.classrooms = self._load() |
| | |
| | def _load(self): |
| | """Carga los cursos desde HF Dataset o archivo JSON local.""" |
| | if self.use_hf: |
| | data = self._load_from_hf() |
| | if data: |
| | print(f"[ClassroomManager] Loaded {len(data)} courses from HF Dataset") |
| | return data |
| | |
| | if data is not None: |
| | print("[ClassroomManager] HF Dataset is empty, checking local...") |
| | else: |
| | print("[ClassroomManager] Failed to load from HF, checking local...") |
| | |
| | data = self._load_from_local() |
| | print(f"[ClassroomManager] Loaded {len(data)} courses from local JSON") |
| | return data |
| |
|
| | def save(self): |
| | """Guarda la lista de cursos en HF Dataset y/o archivo JSON.""" |
| | |
| | self._save_to_local(self.classrooms) |
| | |
| | |
| | if self.use_hf: |
| | success = self._save_to_hf(self.classrooms) |
| | if success: |
| | print(f"[ClassroomManager] Saved {len(self.classrooms)} courses to HF Dataset") |
| | else: |
| | print("[ClassroomManager] Failed to save to HF, data saved locally only") |
| |
|
| | def get_courses(self, creator_id=None): |
| | if creator_id: |
| | return [c for c in self.classrooms if c.get('creator_id') == creator_id] |
| | return self.classrooms |
| |
|
| | def get_course(self, course_id): |
| | return next((c for c in self.classrooms if c['id'] == course_id), None) |
| |
|
| | def create_course(self, name, creator_id=None): |
| | course = { |
| | "id": str(uuid.uuid4()), |
| | "name": name, |
| | "creator_id": creator_id, |
| | "students": [] |
| | } |
| | self.classrooms.append(course) |
| | self.save() |
| | return course |
| |
|
| | def add_student(self, course_id, student_name): |
| | course = self.get_course(course_id) |
| | if course: |
| | |
| | if not any(s['name'] == student_name for s in course['students']): |
| | student_id = str(uuid.uuid4()) |
| | course['students'].append({ |
| | "id": student_id, |
| | "name": student_name, |
| | "attendance": [] |
| | }) |
| | self.save() |
| | return True |
| | return False |
| |
|
| | def record_attendance(self, student_identifier): |
| | """Registra asistencia buscando por ID o Nombre.""" |
| | today = datetime.datetime.now().strftime("%Y-%m-%d") |
| | recorded = False |
| | student_info = None |
| | |
| | for course in self.classrooms: |
| | for student in course['students']: |
| | |
| | s_id = student.get('id') |
| | if s_id == student_identifier or student['name'] == student_identifier: |
| | if today not in student['attendance']: |
| | student['attendance'].append(today) |
| | recorded = True |
| | student_info = { |
| | "id": s_id, |
| | "name": student['name'], |
| | "course_id": course['id'] |
| | } |
| | |
| | if recorded: |
| | self.save() |
| | |
| | return student_info |
| |
|
| | def delete_course(self, course_id): |
| | """Elimina un curso por ID.""" |
| | self.classrooms = [c for c in self.classrooms if c['id'] != course_id] |
| | self.save() |
| | return True |
| |
|
| | def delete_student(self, course_id, student_id): |
| | """Elimina un estudiante de un curso.""" |
| | course = self.get_course(course_id) |
| | if course: |
| | course['students'] = [s for s in course['students'] if s.get('id') != student_id] |
| | self.save() |
| | return True |
| | return False |
| |
|
| | user_mgr = UserManager() |
| | classroom_manager = ClassroomManager() |
| |
|
| | @login_manager.user_loader |
| | def load_user(user_id): |
| | """Cargador de usuario para Flask-Login desde el JSON.""" |
| | user_data = user_mgr.get_by_id(user_id) |
| | if user_data and user_data.get('status') == "ACTIVE": |
| | return User(user_data['id'], user_data['username']) |
| | return None |
| |
|
| | |
| | class LoanManager(HFDatasetManager): |
| | """Clase para manejar la persistencia de préstamos en HF Datasets.""" |
| | |
| | def __init__(self): |
| | super().__init__( |
| | dataset_name="HF_DATASET_LOANS", |
| | data_key="loans", |
| | local_filename="prestamos.json" |
| | ) |
| | self.loans = self._load() |
| | |
| | def _load(self): |
| | """Carga los préstamos desde HF Dataset o archivo JSON local.""" |
| | if self.use_hf: |
| | data = self._load_from_hf() |
| | if data: |
| | print(f"[LoanManager] Loaded {len(data)} loans from HF Dataset") |
| | return data |
| | |
| | if data is not None: |
| | print("[LoanManager] HF Dataset is empty, checking local...") |
| | else: |
| | print("[LoanManager] Failed to load from HF, checking local...") |
| | |
| | data = self._load_from_local() |
| | print(f"[LoanManager] Loaded {len(data)} loans from local JSON") |
| | return data |
| |
|
| | def save(self): |
| | """Guarda la lista actual de préstamos en HF Dataset y/o archivo JSON.""" |
| | |
| | self._save_to_local(self.loans) |
| | |
| | |
| | if self.use_hf: |
| | success = self._save_to_hf(self.loans) |
| | if success: |
| | print(f"[LoanManager] Saved {len(self.loans)} loans to HF Dataset") |
| | else: |
| | print("[LoanManager] Failed to save to HF, data saved locally only") |
| |
|
| | def add_loan(self, loan): |
| | """Añade un nuevo préstamo a la lista y guarda en disco.""" |
| | self.loans.append(loan) |
| | self.save() |
| |
|
| | def update_status(self, loan_id, status): |
| | """Actualiza el estado de un préstamo existente.""" |
| | for loan in self.loans: |
| | if loan["id"] == loan_id: |
| | loan["status_loan"] = status |
| | self.save() |
| | return True |
| | return False |
| |
|
| | def get_all(self): |
| | """Retorna todos los préstamos registrados.""" |
| | return self.loans |
| |
|
| | loan_mgr = LoanManager() |
| |
|
| | |
| | class FaceManager(HFDatasetManager): |
| | """Clase para manejar descriptores de rostros para asistencia.""" |
| | |
| | def __init__(self): |
| | super().__init__( |
| | dataset_name="HF_DATASET_FACES", |
| | data_key="faces", |
| | local_filename="faces.json" |
| | ) |
| | self.faces = self._load() |
| | |
| | if isinstance(self.faces, list): |
| | print(f"[FaceManager] Migrating list to dict...") |
| | self.faces = { |
| | "descriptors": self.faces, |
| | "attendance_log": [] |
| | } |
| | self.save() |
| | |
| | def _load(self): |
| | """Carga los rostros desde HF Dataset o archivo JSON local.""" |
| | if self.use_hf: |
| | data = self._load_from_hf() |
| | if data: |
| | print(f"[FaceManager] Loaded data from HF Dataset") |
| | return data |
| | |
| | if data is not None: |
| | print("[FaceManager] HF Dataset is empty, checking local...") |
| | else: |
| | print("[FaceManager] Failed to load from HF, checking local...") |
| | |
| | data = self._load_from_local() |
| | print(f"[FaceManager] Loaded data from local JSON") |
| | return data |
| |
|
| | def save(self): |
| | """Guarda la lista de rostros en HF Dataset y/o archivo JSON.""" |
| | |
| | self._save_to_local(self.faces) |
| | |
| | |
| | if self.use_hf: |
| | success = self._save_to_hf(self.faces) |
| | if success: |
| | print(f"[FaceManager] Saved to HF Dataset") |
| | else: |
| | print("[FaceManager] Failed to save to HF, data saved locally only") |
| |
|
| | def add_face(self, label, descriptor, student_id=None, course_id=None): |
| | if not isinstance(self.faces, dict): |
| | self.faces = {"descriptors": [], "attendance_log": []} |
| | |
| | self.faces["descriptors"].append({ |
| | "label": label, |
| | "descriptor": descriptor, |
| | "student_id": student_id, |
| | "course_id": course_id, |
| | "timestamp": datetime.datetime.now().isoformat() |
| | }) |
| | self.save() |
| |
|
| | def log_attendance(self, student_id, student_name, course_id, status="PRESENT"): |
| | if not isinstance(self.faces, dict): |
| | self.faces = {"descriptors": [], "attendance_log": []} |
| | |
| | now = datetime.datetime.now() |
| | date_str = now.strftime("%Y-%m-%d") |
| | time_str = now.strftime("%H:%M:%S") |
| | |
| | record = { |
| | "student_id": student_id, |
| | "student_name": student_name, |
| | "course_id": course_id, |
| | "date": date_str, |
| | "time": time_str, |
| | "status": status |
| | } |
| | |
| | self.faces["attendance_log"].append(record) |
| | self.save() |
| | |
| | |
| | try: |
| | csv_dir = os.path.join(os.getcwd(), "data", "attendance") |
| | os.makedirs(csv_dir, exist_ok=True) |
| | filename = f"{student_id}_{student_name.replace(' ', '_')}.csv" |
| | filepath = os.path.join(csv_dir, filename) |
| | |
| | file_exists = os.path.isfile(filepath) |
| | with open(filepath, mode='a', newline='', encoding='utf-8') as f: |
| | writer = csv.DictWriter(f, fieldnames=["Fecha", "Hora", "Estado", "Nombre", "Curso ID"]) |
| | if not file_exists: |
| | writer.writeheader() |
| | writer.writerow({ |
| | "Fecha": date_str, |
| | "Hora": time_str, |
| | "Estado": status, |
| | "Nombre": student_name, |
| | "Curso ID": course_id |
| | }) |
| | except Exception as e: |
| | print(f"[CSV ERROR] Error writing attendance CSV: {e}") |
| |
|
| | def get_all(self): |
| | if isinstance(self.faces, dict): |
| | return self.faces.get("descriptors", []) |
| | return self.faces |
| |
|
| | face_mgr = FaceManager() |
| |
|
| | |
| | TG_TOKEN = os.getenv("TELEGRAM_TOKEN") |
| | TG_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID") |
| | try: |
| | if TG_CHAT_ID: TG_CHAT_ID = int(TG_CHAT_ID) |
| | except: |
| | pass |
| |
|
| | |
| | GOOGLE_PROXY_URL = os.getenv("GOOGLE_PROXY_URL") or "https://script.google.com/macros/s/AKfycbz7z1Jb0vsur42GmmqrL3PVXeRkN2WxSojFDIleEDoLOg6MnrmJjb_uuPcQ15CTwyzD/exec" |
| |
|
| | if TG_TOKEN: |
| | if GOOGLE_PROXY_URL: |
| | print("[BOT] Usando Google Proxy URL") |
| | base_url = GOOGLE_PROXY_URL.split('?')[0] |
| | telebot.apihelper.API_URL = base_url + "?path={1}&token={0}" |
| | else: |
| | print("[BOT] Usando conexión directa a Telegram") |
| | |
| | |
| | |
| | telebot.apihelper.CONNECT_TIMEOUT = 90 |
| | telebot.apihelper.READ_TIMEOUT = 90 |
| |
|
| | bot = telebot.TeleBot(TG_TOKEN) if TG_TOKEN else None |
| |
|
| | def escape_md(text): |
| | """Escapa caracteres para Markdown de Telegram.""" |
| | if not text: return "" |
| | for char in ['_', '*', '[', '`']: |
| | text = text.replace(char, f"\\{char}") |
| | return text |
| |
|
| | def mark_as_delivered(loan_id): |
| | """Lógica central para marcar un préstamo como entregado físicamente.""" |
| | loan = None |
| | for l in loan_mgr.get_all(): |
| | if l['id'] == loan_id: |
| | loan = l |
| | break |
| | |
| | if loan and loan['status_loan'] == "ACCEPTED": |
| | |
| | utc_now = datetime.datetime.now(datetime.timezone.utc) |
| | rd_now = utc_now - datetime.timedelta(hours=4) |
| | ahora = rd_now.strftime("%H:%M") |
| | |
| | loan['status_loan'] = "DELIVERED" |
| | loan['delivered_at'] = rd_now.isoformat() |
| | loan_mgr.save() |
| | socketio.emit('notification', {"text": f"{loan['Solicitante']} ha entregado", "color": "blue"}) |
| | return True, loan, ahora |
| | return False, None, None |
| |
|
| | if bot: |
| | @bot.callback_query_handler(func=lambda call: True) |
| | def handle_query(call): |
| | try: |
| | if call.data.startswith("accept_"): |
| | loan_id = call.data.replace("accept_", "") |
| | if loan_mgr.update_status(loan_id, "ACCEPTED"): |
| | bot.answer_callback_query(call.id, "Préstamo Aceptado") |
| | |
| | |
| | markup = types.InlineKeyboardMarkup() |
| | markup.add(types.InlineKeyboardButton("📦 MARCAR ENTREGADO", callback_data=f"deliver_{loan_id}")) |
| | |
| | nuevo_texto = f"✅ *ACEPTADO*\n{escape_md(call.message.text)}" |
| | bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, |
| | reply_markup=markup, parse_mode="Markdown") |
| | socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} ACEPTADO", "color": "green"}) |
| | |
| | elif call.data.startswith("decline_"): |
| | loan_id = call.data.replace("decline_", "") |
| | if loan_mgr.update_status(loan_id, "DECLINED"): |
| | bot.answer_callback_query(call.id, "Préstamo Declinado") |
| | nuevo_texto = f"❌ *DECLINADO*\n{escape_md(call.message.text)}" |
| | bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, parse_mode="Markdown") |
| | socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} DECLINADO", "color": "red"}) |
| | |
| | elif call.data.startswith("deliver_"): |
| | loan_id = call.data.replace("deliver_", "") |
| | success, loan, ahora = mark_as_delivered(loan_id) |
| | if success: |
| | bot.answer_callback_query(call.id, "Entrega Confirmada") |
| | nuevo_texto = f"📦 *ENTREGADO A LAS {ahora}*\n{escape_md(call.message.text)}" |
| | bot.edit_message_text(nuevo_texto, call.message.chat.id, call.message.message_id, parse_mode="Markdown") |
| | else: |
| | bot.answer_callback_query(call.id, "Error: El préstamo no está aceptado o no existe", show_alert=True) |
| | |
| | elif call.data.startswith("approve_user_"): |
| | username = call.data.replace("approve_user_", "") |
| | if user_mgr.activate_user(username): |
| | bot.answer_callback_query(call.id, "Usuario aprobado") |
| | bot.edit_message_text(f"✅ *APROBADO:* El usuario `{username}` ya puede iniciar sesión.", |
| | call.message.chat.id, call.message.message_id, parse_mode="Markdown") |
| | socketio.emit('notification', {"text": f"Usuario {username} APROBADO", "color": "green"}) |
| | |
| | elif call.data.startswith("decline_user_"): |
| | username = call.data.replace("decline_user_", "") |
| | if user_mgr.delete_user(username): |
| | bot.answer_callback_query(call.id, "Usuario rechazado") |
| | bot.edit_message_text(f"❌ *RECHAZADO:* El registro de `{username}` ha sido eliminado.", |
| | call.message.chat.id, call.message.message_id, parse_mode="Markdown") |
| | |
| | except Exception as e: |
| | print(f"Callback Error: {e}") |
| |
|
| | @bot.message_handler(commands=['aceptar', 'declinar', 'entregado', 'status']) |
| | def handle_text_commands(message): |
| | try: |
| | text = message.text.split() |
| | if len(text) < 2: |
| | bot.reply_to(message, "Uso: /aceptar <id>, /declinar <id> o /entregado <id>") |
| | return |
| | cmd = text[0][1:]; loan_id = text[1] |
| | |
| | if cmd == "entregado": |
| | success, loan, ahora = mark_as_delivered(loan_id) |
| | if success: |
| | bot.reply_to(message, f"📦 Entrega confirmada para {loan['Solicitante']} a las {ahora}") |
| | else: |
| | bot.reply_to(message, "No se pudo marcar como entregado. Verifica el ID y el estado.") |
| | else: |
| | status = "ACCEPTED" if cmd == "aceptar" else "DECLINED" |
| | if loan_mgr.update_status(loan_id, status): |
| | emoji = "✅" if status == "ACCEPTED" else "❌" |
| | bot.reply_to(message, f"{emoji} Préstamo {loan_id} actualizado a {status}") |
| | socketio.emit('notification', {"text": f"Préstamo {loan_id[:8]} {status}", "color": "green" if status == "ACCEPTED" else "red"}) |
| | except Exception as e: |
| | print(f"Command Error: {e}") |
| |
|
| | def start_bot_thread(): |
| | if not bot: |
| | return |
| |
|
| | |
| | try: |
| | |
| | lock_file_path = "/tmp/tg_bot.lock" |
| | |
| | |
| | |
| | global _bot_lock_file |
| | _bot_lock_file = open(lock_file_path, "w") |
| | fcntl.flock(_bot_lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB) |
| | except (IOError, OSError): |
| | |
| | print("[BOT] Otra instancia detectada. No se iniciará el bot en este hilo.") |
| | return |
| |
|
| | |
| | time.sleep(5) |
| | |
| | try: |
| | bot.delete_webhook() |
| | print("[BOT] Webhook eliminado.") |
| | except Exception as e: |
| | print(f"[BOT] Error eliminando webhook: {e}") |
| | |
| | print("[BOT] Iniciando infinity_polling...") |
| | |
| | |
| | |
| | bot.infinity_polling(timeout=90, long_polling_timeout=30) |
| |
|
| | if bot: |
| | threading.Thread(target=start_bot_thread, daemon=True).start() |
| |
|
| | |
| |
|
| | @app.route('/') |
| | def index(): |
| | return render_template('index.html', title="MAKER SPACE") |
| |
|
| | @app.route('/login', methods=['GET', 'POST']) |
| | def login(): |
| | if current_user.is_authenticated: |
| | return redirect(url_for('miembros')) |
| | |
| | if request.method == 'POST': |
| | username = request.form.get('username') |
| | password = request.form.get('password') |
| | |
| | user_data = user_mgr.get_by_username(username) |
| | |
| | if user_data: |
| | if user_data.get('status') != "ACTIVE": |
| | flash("Tu cuenta está pendiente de aprobación por un administrador.", "orange") |
| | return render_template('login.html') |
| | |
| | if check_password_hash(user_data['password'], password): |
| | user = User(user_data['id'], user_data['username']) |
| | login_user(user) |
| | flash(f"¡Bienvenido de nuevo, {username}!", "green") |
| | return redirect(url_for('miembros')) |
| | |
| | flash("Usuario o contraseña incorrectos", "red") |
| | |
| | return render_template('login.html', title="Login") |
| |
|
| | @app.route('/register', methods=['GET', 'POST']) |
| | def register(): |
| | if current_user.is_authenticated: |
| | return redirect(url_for('miembros')) |
| |
|
| | if request.method == 'POST': |
| | username = request.form.get('username') |
| | email = request.form.get('email') |
| | password = request.form.get('password') |
| | confirm_password = request.form.get('confirm_password') |
| |
|
| | if password != confirm_password: |
| | flash("Las contraseñas no coinciden", "red") |
| | return render_template('register.html') |
| |
|
| | if user_mgr.add_user(username, password, email, status="PENDING"): |
| | |
| | if bot and TG_CHAT_ID: |
| | try: |
| | markup = types.InlineKeyboardMarkup() |
| | markup.add( |
| | types.InlineKeyboardButton("✅ Aprobar", callback_data=f"approve_user_{username}"), |
| | types.InlineKeyboardButton("❌ Rechazar", callback_data=f"decline_user_{username}") |
| | ) |
| | bot.send_message(TG_CHAT_ID, f"👤 *NUEVO REGISTRO:* `{username}` solicita acceso.", |
| | reply_markup=markup, parse_mode="Markdown") |
| | except Exception as e: |
| | print(f"Error TG Register: {e}") |
| | |
| | flash("Registro enviado. Un administrador debe aprobar tu cuenta.", "blue") |
| | return redirect(url_for('login')) |
| | else: |
| | flash("El nombre de usuario ya existe.", "red") |
| |
|
| | return render_template('register.html', title="Registro") |
| |
|
| | @app.route('/logout') |
| | @login_required |
| | def logout(): |
| | logout_user() |
| | flash("Has cerrado sesión", "blue") |
| | return redirect(url_for('index')) |
| |
|
| | def send_email(to_email, subject, body): |
| | """Envía un correo electrónico usando Resend (100% gratis hasta 100 emails/día).""" |
| | resend_api_key = os.getenv("RESEND_API_KEY") |
| | from_email = os.getenv("FROM_EMAIL", "onboarding@resend.dev") |
| |
|
| | if not resend_api_key: |
| | print("[EMAIL ERROR] Falta RESEND_API_KEY en variables de entorno.") |
| | return False |
| |
|
| | try: |
| | resend.api_key = resend_api_key |
| | |
| | params = { |
| | "from": from_email, |
| | "to": [to_email], |
| | "subject": subject, |
| | "text": body, |
| | } |
| | |
| | email_response = resend.Emails.send(params) |
| | print(f"[EMAIL SUCCESS] Correo enviado a {to_email}: {email_response}") |
| | return True |
| | except Exception as e: |
| | print(f"[EMAIL ERROR] Error enviando correo con Resend: {e}") |
| | return False |
| |
|
| | @app.route('/forgot-password', methods=['GET', 'POST']) |
| | def forgot_password(): |
| | if current_user.is_authenticated: |
| | return redirect(url_for('miembros')) |
| | |
| | if request.method == 'POST': |
| | email = request.form.get('email') |
| | token = user_mgr.generate_reset_token(email) |
| | |
| | if token: |
| | reset_url = url_for('reset_password', token=token, _external=True) |
| | |
| | |
| | subject = "Recuperación de Contraseña - Maker Space" |
| | body = f"Hola,\n\nHemos recibido una solicitud para restablecer tu contraseña. Haz clic en el siguiente enlace para continuar:\n\n{reset_url}\n\nSi no solicitaste este cambio, puedes ignorar este correo.\n\nEste enlace caducará en 1 hora." |
| | |
| | if send_email(email, subject, body): |
| | flash("Se ha enviado un enlace de recuperación a tu correo.", "blue") |
| | else: |
| | |
| | print(f"\n[EMAIL FALLBACK] Para: {email}") |
| | print(f"[EMAIL FALLBACK] Enlace: {reset_url}\n") |
| | flash("Hubo un problema enviando el correo. Contacta al administrador o revisa los logs.", "orange") |
| | else: |
| | |
| | flash("Si el correo está registrado, recibirás un enlace de recuperación.", "blue") |
| | |
| | return redirect(url_for('login')) |
| | |
| | return render_template('forgot_password.html', title="Recuperar Contraseña") |
| |
|
| | @app.route('/reset-password/<token>', methods=['GET', 'POST']) |
| | def reset_password(token): |
| | if current_user.is_authenticated: |
| | return redirect(url_for('miembros')) |
| | |
| | user = user_mgr.verify_reset_token(token) |
| | if not user: |
| | flash("El enlace de recuperación es inválido o ha expirado.", "red") |
| | return redirect(url_for('forgot_password')) |
| | |
| | if request.method == 'POST': |
| | password = request.form.get('password') |
| | confirm_password = request.form.get('confirm_password') |
| | |
| | if password != confirm_password: |
| | flash("Las contraseñas no coinciden", "red") |
| | return render_template('reset_password.html', title="Nueva Contraseña") |
| | |
| | if user_mgr.update_password(user['id'], password): |
| | flash("Contraseña actualizada correctamente. Ya puedes iniciar sesión.", "green") |
| | return redirect(url_for('login')) |
| | else: |
| | flash("Error al actualizar la contraseña.", "red") |
| | |
| | return render_template('reset_password.html', title="Nueva Contraseña") |
| |
|
| | @app.route('/prestamos') |
| | def prestamos(): |
| | loans = loan_mgr.get_all() |
| | return render_template('prestamos.html', title="Préstamos", loans=loans) |
| |
|
| | @app.route('/api/prestamo', methods=['POST']) |
| | def api_prestamo(): |
| | try: |
| | data = request.json |
| | if not data: |
| | return jsonify({"status": "error", "message": "No data received"}), 400 |
| | |
| | solicitante = data.get('solicitante') |
| | fecha = data.get('fecha') |
| | hora_salida = data.get('hora_salida') |
| | hora_retorno = data.get('hora_retorno') |
| | items_list = data.get('items', []) |
| | |
| | if not solicitante: |
| | return jsonify({"status": "error", "message": "Solicitante es requerido"}), 400 |
| | |
| | |
| | items_desc = [] |
| | for it in items_list: |
| | items_desc.append(f"• {it['descripcion']} ({it['cantidad']}) [{it['categoria']}]") |
| | |
| | full_items_string = "\n".join(items_desc) |
| | loan_id = str(uuid.uuid4()) |
| | |
| | new_loan = { |
| | "id": loan_id, |
| | "Solicitante": solicitante, |
| | "fecha": fecha, |
| | "hora": hora_salida, |
| | "devolucion": hora_retorno, |
| | "item": full_items_string, |
| | "status_loan": "PENDING", |
| | "requested_at": datetime.datetime.now().isoformat(), |
| | "timestamp": datetime.datetime.now().isoformat() |
| | } |
| | |
| | loan_mgr.add_loan(new_loan) |
| | |
| | |
| | if bot and TG_CHAT_ID: |
| | try: |
| | msg = f"📦 *NUEVA SOLICITUD DE PRÉSTAMO*\n\n" |
| | msg += f"👤 *Solicitante:* {escape_md(solicitante)}\n" |
| | msg += f"📅 *Fecha:* {escape_md(fecha)}\n" |
| | msg += f"🕒 *Horario:* {hora_salida} - {hora_retorno}\n" |
| | msg += f"🛠 *Herramientas:*\n{escape_md(full_items_string)}" |
| | |
| | markup = types.InlineKeyboardMarkup() |
| | markup.add( |
| | types.InlineKeyboardButton("✅ ACEPTAR", callback_data=f"accept_{loan_id}"), |
| | types.InlineKeyboardButton("❌ DECLINAR", callback_data=f"decline_{loan_id}") |
| | ) |
| | bot.send_message(TG_CHAT_ID, msg, reply_markup=markup, parse_mode="Markdown") |
| | except Exception as tg_e: |
| | print(f"Error TG Loan: {tg_e}") |
| | |
| | |
| | socketio.emit('notification', {"text": f"Nueva solicitud de {solicitante}", "color": "orange"}) |
| | |
| | return jsonify({"status": "success", "loan_id": loan_id}) |
| | except Exception as e: |
| | print(f"API Prestamo Error: {e}") |
| | return jsonify({"status": "error", "message": str(e)}), 500 |
| |
|
| | |
| |
|
| | @app.route('/miembros') |
| | @login_required |
| | def miembros(): |
| | return render_template('miembros.html', title="Acceso Miembros") |
| |
|
| | @app.route('/classroom') |
| | def classroom(): |
| | return render_template('classroom.html', title="Classroom Maker") |
| |
|
| | @app.route('/asistencia') |
| | def asistencia(): |
| | return render_template('asistencia.html', title="Toma de Asistencia") |
| |
|
| | @app.route('/tutoria') |
| | def tutoria(): |
| | return render_template('tutoria.html', title="Tutoría y Guías") |
| |
|
| | @app.route('/guia/registro') |
| | def guia_registro(): |
| | return render_template('guia_registro.html', title="Guía de Registro") |
| |
|
| | @app.route('/guia/herramientas') |
| | def guia_herramientas(): |
| | return render_template('guia_herramientas.html', title="Guía de Herramientas") |
| |
|
| | @app.route('/guia/asistencia') |
| | def guia_asistencia(): |
| | return render_template('guia_asistencia.html', title="Guía de Asistencia") |
| |
|
| | @app.route('/guia/gestion') |
| | def guia_gestion(): |
| | return render_template('guia_gestion.html', title="Guía de Gestión") |
| |
|
| | |
| | |
| | @app.route('/classroom/dashboard') |
| | @login_required |
| | def classroom_dashboard(): |
| | courses = classroom_manager.get_courses(creator_id=current_user.id) |
| | return render_template('classroom_dashboard.html', title="Gestión de Aulas", courses=courses) |
| |
|
| | @app.route('/classroom/create', methods=['POST']) |
| | @login_required |
| | def create_course(): |
| | name = request.form.get('name') |
| | if name: |
| | classroom_manager.create_course(name, creator_id=current_user.id) |
| | flash('Curso creado exitosamente', 'green') |
| | else: |
| | flash('El nombre del curso es requerido', 'red') |
| | return redirect(url_for('classroom_dashboard')) |
| |
|
| | @app.route('/classroom/<course_id>') |
| | @login_required |
| | def course_details(course_id): |
| | course = classroom_manager.get_course(course_id) |
| | if not course or course.get('creator_id') != current_user.id: |
| | flash('Curso no encontrado o no tienes permiso para verlo', 'red') |
| | return redirect(url_for('classroom_dashboard')) |
| | return render_template('course_details.html', title=course['name'], course=course) |
| |
|
| | @app.route('/classroom/<course_id>/add_student', methods=['POST']) |
| | @login_required |
| | def add_student(course_id): |
| | names_raw = request.form.get('student_name') |
| | if names_raw: |
| | |
| | names_list = [n.strip() for n in names_raw.split(',') if n.strip()] |
| | |
| | success_count = 0 |
| | for name in names_list: |
| | if classroom_manager.add_student(course_id, name): |
| | success_count += 1 |
| | |
| | if success_count > 0: |
| | flash(f'{success_count} estudiante(s) agregado(s)', 'green') |
| | if success_count < len(names_list): |
| | flash('Algunos estudiantes no se pudieron agregar (posiblemente duplicados)', 'orange') |
| | |
| | return redirect(url_for('course_details', course_id=course_id)) |
| |
|
| | @app.route('/classroom/<course_id>/delete', methods=['POST']) |
| | @login_required |
| | def delete_course(course_id): |
| | classroom_manager.delete_course(course_id) |
| | flash('Curso eliminado', 'green') |
| | return redirect(url_for('classroom_dashboard')) |
| |
|
| | @app.route('/classroom/<course_id>/delete_student/<student_id>', methods=['POST']) |
| | @login_required |
| | def delete_student(course_id, student_id): |
| | if classroom_manager.delete_student(course_id, student_id): |
| | flash('Estudiante eliminado', 'green') |
| | else: |
| | flash('Error al eliminar estudiante', 'red') |
| | return redirect(url_for('course_details', course_id=course_id)) |
| |
|
| | @app.route('/api/courses') |
| | def api_courses(): |
| | """Devuelve la lista de cursos y sus estudiantes para el frontend.""" |
| | |
| | |
| | if current_user.is_authenticated: |
| | courses = classroom_manager.get_courses(creator_id=current_user.id) |
| | else: |
| | courses = [] |
| | return jsonify(courses) |
| |
|
| | @app.route('/api/faces', methods=['GET', 'POST']) |
| | def api_faces(): |
| | if request.method == 'POST': |
| | data = request.json |
| | label = data.get('label') |
| | descriptor = data.get('descriptor') |
| | student_id = data.get('student_id') |
| | course_id = data.get('course_id') |
| | |
| | if label and descriptor: |
| | face_mgr.add_face(label, descriptor, student_id, course_id) |
| | return jsonify({"status": "success"}) |
| | |
| | |
| | return jsonify(face_mgr.get_all()) |
| |
|
| | @app.route('/api/attendance', methods=['POST']) |
| | def api_attendance(): |
| | data = request.json |
| | label = data.get('label') |
| | |
| | if label: |
| | print(f"[ASISTENCIA] Registrando: {label}") |
| | |
| | info = classroom_manager.record_attendance(label) |
| | if info: |
| | |
| | face_mgr.log_attendance(info['id'], info['name'], info['course_id']) |
| | socketio.emit('notification', {'text': f'Bienvenido/a {label}', 'color': 'green'}) |
| | return jsonify({"status": "success", "message": f"Asistencia registrada para {label}"}) |
| | else: |
| | socketio.emit('notification', {'text': f'Hola {label} (No inscrito)', 'color': 'blue'}) |
| | return jsonify({"status": "warning", "message": "Registrado pero no vinculado a curso"}) |
| |
|
| | return jsonify({"status": "error", "message": "No label provided"}), 400 |
| |
|
| | class AttendancePDF(FPDF): |
| | def header(self): |
| | self.set_font('Arial', 'B', 15) |
| | self.cell(0, 10, 'MAKER SPACE - REPORTE DE ASISTENCIA', 0, 1, 'C') |
| | self.ln(5) |
| |
|
| | def footer(self): |
| | self.set_y(-15) |
| | self.set_font('Arial', 'I', 8) |
| | self.cell(0, 10, f'Página {self.page_no()}', 0, 0, 'C') |
| |
|
| | @app.route('/export/attendance/<student_id>') |
| | @login_required |
| | def export_attendance(student_id): |
| | |
| | my_courses = {c['id'] for c in classroom_manager.get_courses(creator_id=current_user.id)} |
| |
|
| | |
| | all_logs = [] |
| | if isinstance(face_mgr.faces, dict): |
| | all_logs = face_mgr.faces.get("attendance_log", []) |
| | |
| | |
| | student_logs = [log for log in all_logs if log.get('student_id') == student_id and log.get('course_id') in my_courses] |
| | |
| | if not student_logs: |
| | flash("No hay registros de asistencia para este estudiante en tus cursos.", "orange") |
| | return redirect(request.referrer or url_for('classroom_dashboard')) |
| |
|
| | student_name = student_logs[0].get('student_name', 'Estudiante') |
| | |
| | |
| | pdf = AttendancePDF() |
| | pdf.add_page() |
| | pdf.set_font("Arial", size=12) |
| | |
| | |
| | pdf.set_font("Arial", 'B', 12) |
| | pdf.cell(0, 10, f"Estudiante: {student_name}", 0, 1) |
| | pdf.cell(0, 10, f"ID: {student_id}", 0, 1) |
| | pdf.ln(5) |
| | |
| | |
| | pdf.set_fill_color(200, 220, 255) |
| | pdf.set_font("Arial", 'B', 10) |
| | pdf.cell(40, 10, "Fecha", 1, 0, 'C', 1) |
| | pdf.cell(40, 10, "Hora", 1, 0, 'C', 1) |
| | pdf.cell(40, 10, "Estado", 1, 0, 'C', 1) |
| | pdf.cell(70, 10, "Curso ID", 1, 1, 'C', 1) |
| | |
| | pdf.set_font("Arial", size=10) |
| | for log in student_logs: |
| | pdf.cell(40, 10, log.get('date', '-'), 1) |
| | pdf.cell(40, 10, log.get('time', '-'), 1) |
| | pdf.cell(40, 10, log.get('status', 'PRESENTE'), 1) |
| | pdf.cell(70, 10, log.get('course_id', '-'), 1, 1) |
| | |
| | output = io.BytesIO() |
| | pdf_content = pdf.output() |
| | output.write(pdf_content) |
| | output.seek(0) |
| | |
| | filename = f"Asistencia_{student_name.replace(' ', '_')}.pdf" |
| | return send_file(output, as_attachment=True, download_name=filename, mimetype='application/pdf') |
| |
|
| | @app.route('/repos') |
| | def repos(): |
| | GIT_TOKEN = os.getenv("GITLAB_TOKEN") |
| | GIT_GROUP = os.getenv("GITLAB_GROUP_ID") |
| | projects = [] |
| | if GIT_TOKEN and GIT_GROUP: |
| | try: |
| | gl = gitlab.Gitlab("https://gitlab.com", private_token=GIT_TOKEN) |
| | projects = gl.groups.get(GIT_GROUP).projects.list(all=True) |
| | except: pass |
| | return render_template('repos.html', title="Proyectos", projects=projects) |
| |
|
| | @app.route('/ver/<pid>/<pname>') |
| | def ver_repo(pid, pname): |
| | GIT_TOKEN = os.getenv("GITLAB_TOKEN") |
| | readme_html = "<p>README no disponible</p>" |
| | web_url = "#"; download_url = "#" |
| |
|
| | if GIT_TOKEN: |
| | try: |
| | gl = gitlab.Gitlab("https://gitlab.com", private_token=GIT_TOKEN) |
| | project = gl.projects.get(pid) |
| | web_url = project.web_url |
| | download_url = f"https://gitlab.com/api/v4/projects/{pid}/repository/archive.zip?private_token={GIT_TOKEN}" |
| | |
| | for branch in ["main", "master"]: |
| | try: |
| | f = project.files.get(file_path='README.md', ref=branch) |
| | readme_text = base64.b64decode(f.content).decode("utf-8") |
| | readme_html = markdown.markdown(readme_text, extensions=['fenced_code', 'tables']) |
| | break |
| | except: continue |
| | except: pass |
| | |
| | return render_template('ver_repo.html', title=pname, project_name=pname, readme_html=readme_html, web_url=web_url, download_url=download_url) |
| |
|
| | if __name__ == '__main__': |
| | port = int(os.getenv("PORT", 7860)) |
| | socketio.run(app, host="0.0.0.0", port=port, debug=True) |