Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify, send_file | |
| from flask_cors import CORS | |
| import os, io, jwt, hashlib, math, datetime | |
| from datetime import timedelta | |
| from functools import wraps | |
| from firebase_admin import credentials, db, initialize_app | |
| from openpyxl import Workbook | |
| import cv2 | |
| import numpy as np | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Konfigurasi JWT | |
| app.config['JWT_SECRET'] = 'super-secret-key-2025' | |
| app.config['JWT_EXPIRATION'] = timedelta(hours=6) | |
| # === FIREBASE === | |
| cred = credentials.Certificate("serviceAccountKey.json") | |
| initialize_app(cred, { | |
| 'databaseURL': 'https://absensiwajah-4b6e8-default-rtdb.asia-southeast1.firebasedatabase.app' | |
| }) | |
| # === Helper === | |
| def hash_password(password): | |
| return hashlib.sha256(password.encode()).hexdigest() | |
| def calculate_distance(lat1, lon1, lat2, lon2): | |
| """ | |
| Calculate distance between two coordinates in meters | |
| using Haversine formula | |
| """ | |
| # Convert degrees to radians | |
| lat1_rad = math.radians(lat1) | |
| lon1_rad = math.radians(lon1) | |
| lat2_rad = math.radians(lat2) | |
| lon2_rad = math.radians(lon2) | |
| # Haversine formula | |
| dlon = lon2_rad - lon1_rad | |
| dlat = lat2_rad - lat1_rad | |
| a = math.sin(dlat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon/2)**2 | |
| c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) | |
| # Earth radius in meters | |
| R = 6371000 | |
| distance = R * c | |
| return distance | |
| def get_wib_time(): | |
| """Get current time in WIB (UTC+7)""" | |
| utc_now = datetime.datetime.utcnow() | |
| wib_time = utc_now + timedelta(hours=7) | |
| return wib_time | |
| def token_required(f): | |
| def decorated(*args, **kwargs): | |
| token = None | |
| if 'Authorization' in request.headers: | |
| token = request.headers['Authorization'].split(" ")[1] | |
| if not token: | |
| return jsonify({'error': 'Token hilang!'}), 401 | |
| try: | |
| data = jwt.decode(token, app.config['JWT_SECRET'], algorithms=["HS256"]) | |
| request.user = data | |
| except Exception as e: | |
| return jsonify({'error': 'Token tidak valid', 'detail': str(e)}), 401 | |
| return f(*args, **kwargs) | |
| return decorated | |
| # === ROUTES === | |
| def home(): | |
| return jsonify({'message': 'API Absensi Aktif ✅'}) | |
| # === ✅ USER REGISTER (MODIFIED - SIMPAN FACE ENCODING) === | |
| def register(): | |
| try: | |
| # Ambil data dari FormData | |
| file = request.files.get('file') | |
| name = request.form.get('name') | |
| user_id = request.form.get('user_id') | |
| password = request.form.get('password') | |
| if not all([file, name, user_id, password]): | |
| return jsonify({'success': False, 'error': 'Data tidak lengkap'}), 400 | |
| # Simpan file sementara untuk processing | |
| temp_path = f"temp_register_{user_id}.jpg" | |
| file.save(temp_path) | |
| try: | |
| # === FACE ENCODING PROCESS === | |
| from deepface import DeepFace | |
| # Extract face embedding/encoding | |
| embedding_objs = DeepFace.represent( | |
| img_path=temp_path, | |
| model_name="VGG-Face", | |
| detector_backend="opencv", | |
| enforce_detection=False | |
| ) | |
| if not embedding_objs: | |
| return jsonify({'success': False, 'error': 'Tidak ada wajah terdeteksi'}), 400 | |
| # Ambil embedding pertama (wajah utama) | |
| face_embedding = embedding_objs[0]["embedding"] # Ini array of numbers | |
| # Cek user_id sudah ada atau belum | |
| ref = db.reference('users') | |
| users = ref.get() or {} | |
| for uid, u in users.items(): | |
| if u.get('user_id') == user_id: | |
| return jsonify({'success': False, 'error': 'User sudah ada'}), 400 | |
| # Simpan data user + face encoding ke Firebase | |
| new_user = { | |
| 'name': name, | |
| 'user_id': user_id, | |
| 'password': hash_password(password), | |
| 'face_encoding': face_embedding, # ⬅️ SIMPAN ENCODING, BUKAN FOTO | |
| 'registered_at': get_wib_time().isoformat() # ⬅️ WIB TIME | |
| } | |
| ref.push(new_user) | |
| print(f"✅ User {user_id} berhasil diregistrasi dengan face encoding") | |
| return jsonify({'success': True, 'user': {'name': name, 'user_id': user_id}}), 200 | |
| except Exception as e: | |
| print("❌ Error face encoding:", str(e)) | |
| return jsonify({'success': False, 'error': f'Gagal memproses wajah: {str(e)}'}), 500 | |
| finally: | |
| # Hapus file temporary | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| except Exception as e: | |
| print("❌ Error register:", str(e)) | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| # === USER LOGIN === | |
| def user_login(): | |
| try: | |
| # Bisa handle JSON dan FormData | |
| data = request.get_json(silent=True) or request.form.to_dict() | |
| # Tangkap berbagai kemungkinan nama key | |
| user_id = data.get('user_id') or data.get('id') or data.get('username') or data.get('userId') | |
| password_raw = data.get('password') or data.get('pass') or data.get('pwd') | |
| if not user_id or not password_raw: | |
| return jsonify({'success': False, 'error': 'Data tidak lengkap (user_id/password hilang)'}), 400 | |
| password = hash_password(password_raw) | |
| users = db.reference('users').get() or {} | |
| for uid, u in users.items(): | |
| if u.get('user_id') == user_id and u.get('password') == password: | |
| # Jangan kirim face encoding ke frontend untuk keamanan | |
| user_data = {k: v for k, v in u.items() if k != 'face_encoding'} | |
| return jsonify({'success': True, 'user': user_data}), 200 | |
| return jsonify({'success': False, 'error': 'Login gagal, user atau password salah'}), 401 | |
| except Exception as e: | |
| print("❌ Error login:", str(e)) | |
| return jsonify({'success': False, 'error': str(e)}), 500 | |
| # === ADMIN LOGIN === | |
| def admin_login(): | |
| data = request.get_json() | |
| username = data.get('username') | |
| password = data.get('password') | |
| if username == 'admin' and password == 'admin123': | |
| token = jwt.encode({ | |
| 'username': username, | |
| 'role': 'admin', | |
| 'exp': datetime.datetime.utcnow() + app.config['JWT_EXPIRATION'] | |
| }, app.config['JWT_SECRET'], algorithm='HS256') | |
| return jsonify({'success': True, 'token': token, 'role': 'admin'}) | |
| return jsonify({'error': 'Akses ditolak'}), 401 | |
| # === VERIFY TOKEN === | |
| def verify_token(): | |
| data = request.get_json() | |
| token = data.get('token') | |
| try: | |
| jwt.decode(token, app.config['JWT_SECRET'], algorithms=["HS256"]) | |
| return jsonify({'success': True, 'authenticated': True}) | |
| except: | |
| return jsonify({'success': False, 'authenticated': False}), 401 | |
| # === ABSENSI (MODIFIED - PAKAI FACE ENCODING) === | |
| def attendance(): | |
| try: | |
| file = request.files.get("file") | |
| if not file: | |
| return jsonify({"success": False, "error": "Gambar tidak ditemukan"}), 400 | |
| # Simpan file sementara | |
| temp_path = "temp_attendance.jpg" | |
| file.save(temp_path) | |
| latitude = request.form.get("latitude", type=float) | |
| longitude = request.form.get("longitude", type=float) | |
| # Inisialisasi referensi database | |
| users_ref = db.reference('users') | |
| attendance_ref = db.reference('attendance') | |
| settings_ref = db.reference('location_settings') | |
| # === CEK LOKASI === | |
| loc_settings = settings_ref.get() or {} | |
| location_status = {"verified": True, "message": "Lokasi tidak diatur"} | |
| if loc_settings and loc_settings.get("enabled"): | |
| if not latitude or not longitude: | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return jsonify({"success": False, "error": "Lokasi GPS tidak terdeteksi"}), 400 | |
| dist = calculate_distance( | |
| latitude, longitude, | |
| loc_settings["latitude"], | |
| loc_settings["longitude"] | |
| ) | |
| verified = dist <= loc_settings["radius"] | |
| location_status = { | |
| "verified": verified, | |
| "message": f"Jarak {round(dist)}m dari {loc_settings.get('location_name', 'lokasi')}", | |
| "distance": round(dist) | |
| } | |
| if not verified: | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return jsonify({ | |
| "success": False, | |
| "error": f"Lokasi tidak valid! Maksimal jarak {loc_settings['radius']}m" | |
| }), 400 | |
| # === FACE RECOGNITION DENGAN ENCODING === | |
| best_match, best_score = None, 0 | |
| users = users_ref.get() or {} | |
| print(f"🔍 Scan wajah untuk {len(users)} user...") | |
| try: | |
| from deepface import DeepFace | |
| # Extract encoding dari foto absen | |
| login_embeddings = DeepFace.represent( | |
| img_path=temp_path, | |
| model_name="VGG-Face", | |
| detector_backend="opencv", | |
| enforce_detection=False | |
| ) | |
| if not login_embeddings: | |
| return jsonify({"success": False, "error": "Tidak ada wajah terdeteksi di foto absen"}), 400 | |
| login_embedding = login_embeddings[0]["embedding"] | |
| # Compare dengan semua user yang terdaftar | |
| for uid, user in users.items(): | |
| if 'face_encoding' not in user: | |
| continue # Skip user tanpa encoding | |
| stored_encoding = user['face_encoding'] | |
| # Hitung similarity (cosine similarity) | |
| from numpy import dot | |
| from numpy.linalg import norm | |
| A = np.array(stored_encoding) | |
| B = np.array(login_embedding) | |
| cosine_similarity = dot(A, B) / (norm(A) * norm(B)) | |
| # Jika similarity tinggi dan lebih baik dari sebelumnya | |
| if cosine_similarity > 0.6 and cosine_similarity > best_score: # Threshold 0.6 | |
| best_score = cosine_similarity | |
| best_match = { | |
| "user_id": user["user_id"], | |
| "name": user["name"], | |
| "similarity": cosine_similarity | |
| } | |
| print(f"✅ Cocok dengan {user['user_id']}: {cosine_similarity:.3f}") | |
| except ImportError: | |
| print("❌ DeepFace tidak tersedia") | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return jsonify({"success": False, "error": "Sistem face recognition error"}), 500 | |
| # HASIL | |
| if best_match: | |
| print(f"🎉 BERHASIL: {best_match['name']} (Score: {best_score:.3f})") | |
| # PAKAI WIB TIME | |
| wib_time = get_wib_time() | |
| attendance_data = { | |
| "user_id": best_match["user_id"], | |
| "name": best_match["name"], | |
| "timestamp": wib_time.isoformat(), # ⬅️ WIB TIME | |
| "similarity": round(best_score, 3), | |
| "location_verified": location_status["verified"], | |
| "location_message": location_status["message"], | |
| "status": "present", | |
| "latitude": latitude, | |
| "longitude": longitude | |
| } | |
| attendance_ref.push(attendance_data) | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return jsonify({ | |
| "success": True, | |
| "recognized_user": { | |
| "user_id": best_match["user_id"], | |
| "name": best_match["name"] | |
| }, | |
| "similarity": round(best_score, 3), | |
| "location": location_status, | |
| "timestamp": wib_time.strftime("%Y-%m-%d %H:%M:%S WIB") # ⬅️ FORMAT WIB | |
| }) | |
| else: | |
| print("❌ Tidak ada wajah yang cocok") | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return jsonify({"success": False, "error": "Wajah tidak dikenali"}), 400 | |
| except Exception as e: | |
| print("❌ Error:", str(e)) | |
| if 'temp_path' in locals() and os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| return jsonify({"success": False, "error": "Gagal memproses wajah"}), 500 | |
| # === RIWAYAT USER === | |
| def attendance_records(): | |
| records = db.reference('attendance').get() | |
| if not records: | |
| return jsonify({'success': True, 'records': []}) | |
| # Convert UTC timestamps to WIB for display | |
| records_list = [] | |
| for record in records.values(): | |
| record_data = record.copy() | |
| # Jika timestamp dalam format ISO, convert ke WIB | |
| if 'timestamp' in record_data: | |
| try: | |
| utc_time = datetime.datetime.fromisoformat(record_data['timestamp'].replace('Z', '+00:00')) | |
| wib_time = utc_time + timedelta(hours=7) | |
| record_data['timestamp_display'] = wib_time.strftime("%Y-%m-%d %H:%M:%S WIB") | |
| except: | |
| record_data['timestamp_display'] = record_data['timestamp'] | |
| records_list.append(record_data) | |
| return jsonify({'success': True, 'records': records_list}) | |
| # === DASHBOARD === | |
| def admin_dashboard(): | |
| users = db.reference('users').get() or {} | |
| attendance = db.reference('attendance').get() or {} | |
| stats = { | |
| 'totalUsers': len(users), | |
| 'totalTransactions': len(attendance), | |
| 'averageScore': sum(r.get('similarity', 0) for r in attendance.values()) / max(1, len(attendance)), | |
| 'activeMonths': len(set(r['timestamp'][:7] for r in attendance.values())), | |
| 'location_enabled': True, | |
| 'invalid_location_today': 0 | |
| } | |
| return jsonify({'success': True, 'statistics': stats}) | |
| # === USERS LIST === | |
| def get_users(): | |
| users = db.reference('users').get() or {} | |
| # Jangan kirim face encoding ke frontend | |
| safe_users = {} | |
| for uid, user_data in users.items(): | |
| safe_users[uid] = {k: v for k, v in user_data.items() if k != 'face_encoding'} | |
| return jsonify({'success': True, 'users': safe_users}) | |
| # === DELETE USER === | |
| def delete_user(user_id): | |
| users_ref = db.reference('users') | |
| users = users_ref.get() or {} | |
| for uid, u in users.items(): | |
| if u.get('user_id') == user_id: | |
| users_ref.child(uid).delete() | |
| return jsonify({'success': True}) | |
| return jsonify({'error': 'User tidak ditemukan'}), 404 | |
| # === LOCATION SETTINGS === | |
| def location_settings(): | |
| ref = db.reference('location_settings') | |
| if request.method == 'GET': | |
| data = ref.get() or { | |
| 'enabled': False, | |
| 'latitude': -6.2088, | |
| 'longitude': 106.8456, | |
| 'radius': 100, | |
| 'location_name': 'Kantor Pusat' | |
| } | |
| return jsonify({'success': True, 'settings': data}) | |
| else: | |
| new_data = request.get_json() | |
| ref.set(new_data) | |
| return jsonify({'success': True, 'settings': new_data}) | |
| # === MONTHLY RECORDS === | |
| def monthly_records(): | |
| month = request.args.get('month') | |
| attendance = db.reference('attendance').get() or {} | |
| filtered = [r for r in attendance.values() if r['timestamp'].startswith(month)] | |
| return jsonify({'success': True, 'records': filtered}) | |
| # === EXPORT EXCEL === | |
| def export_excel(): | |
| month = request.args.get('month') | |
| attendance = db.reference('attendance').get() or {} | |
| wb = Workbook() | |
| ws = wb.active | |
| ws.title = "Absensi" | |
| ws.append(["Nama", "User ID", "Tanggal", "Waktu", "Kemiripan", "Lokasi Valid", "Status"]) | |
| for r in attendance.values(): | |
| if not month or r['timestamp'].startswith(month): | |
| # Convert UTC to WIB for Excel export | |
| try: | |
| utc_time = datetime.datetime.fromisoformat(r['timestamp'].replace('Z', '+00:00')) | |
| wib_time = utc_time + timedelta(hours=7) | |
| date_str = wib_time.strftime("%Y-%m-%d") | |
| time_str = wib_time.strftime("%H:%M:%S WIB") | |
| except: | |
| date_str = r['timestamp'][:10] | |
| time_str = r['timestamp'][11:19] | |
| ws.append([ | |
| r.get('name'), r.get('user_id'), | |
| date_str, time_str, | |
| f"{r.get('similarity', 0)*100:.1f}%", | |
| "Ya" if r.get('location_verified') else "Tidak", | |
| r.get('status') | |
| ]) | |
| file_stream = io.BytesIO() | |
| wb.save(file_stream) | |
| file_stream.seek(0) | |
| return send_file(file_stream, as_attachment=True, download_name=f"laporan-{month}.xlsx") | |
| # === AVAILABLE MONTHS === | |
| def available_months(): | |
| attendance = db.reference('attendance').get() or {} | |
| months = sorted(set(r['timestamp'][:7] for r in attendance.values())) | |
| return jsonify({'success': True, 'months': months}) | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860) |