| | import os |
| | import time |
| | import shutil |
| | import zipfile |
| | import gradio as gr |
| | from fastapi import FastAPI |
| |
|
| | |
| | BASE_UPLOAD_DIR = "bins" |
| | EXPIRATION_HOURS = 24 |
| |
|
| | if not os.path.exists(BASE_UPLOAD_DIR): |
| | os.makedirs(BASE_UPLOAD_DIR) |
| |
|
| | def clean_old_bins(): |
| | now = time.time() |
| | cutoff = now - (EXPIRATION_HOURS * 3600) |
| | if os.path.exists(BASE_UPLOAD_DIR): |
| | for bin_id in os.listdir(BASE_UPLOAD_DIR): |
| | bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id) |
| | if os.path.getmtime(bin_path) < cutoff: |
| | shutil.rmtree(bin_path) |
| |
|
| | |
| | def upload_to_bin(bin_id, files, progress=gr.Progress()): |
| | clean_old_bins() |
| | if not bin_id or not files: return "⚠️ Thiếu mã Bin hoặc File!", None |
| | bin_id = bin_id.strip() |
| | bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id) |
| | if not os.path.exists(bin_path): os.makedirs(bin_path) |
| | |
| | uploaded_list = [] |
| | for i, file in enumerate(files): |
| | progress((i + 1) / len(files), desc=f"Đang xử lý {i+1}/{len(files)}...") |
| | dest_path = os.path.join(bin_path, os.path.basename(file.name)) |
| | shutil.copy(file.name, dest_path) |
| | uploaded_list.append(dest_path) |
| | os.utime(bin_path, None) |
| | return f"✅ Đã tải lên Bin: {bin_id}", uploaded_list |
| |
|
| | def get_bin_files(bin_id): |
| | clean_old_bins() |
| | if not bin_id: return "⚠️ Nhập mã Bin!", None |
| | bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id.strip()) |
| | if not os.path.exists(bin_path): return "❌ Không tìm thấy Bin.", None |
| | files = [os.path.join(bin_path, f) for f in os.listdir(bin_path) if f != "all_files.zip"] |
| | return f"📂 Bin: {bin_id}", files |
| |
|
| | def download_all_zip(bin_id): |
| | bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id.strip()) |
| | zip_path = os.path.join(bin_path, "all_files.zip") |
| | files = [f for f in os.listdir(bin_path) if f != "all_files.zip"] |
| | with zipfile.ZipFile(zip_path, 'w') as zipf: |
| | for f in files: zipf.write(os.path.join(bin_path, f), f) |
| | return zip_path |
| |
|
| | def delete_bin(bin_id): |
| | bin_path = os.path.join(BASE_UPLOAD_DIR, bin_id.strip()) |
| | if os.path.exists(bin_path): |
| | shutil.rmtree(bin_path) |
| | return "🗑️ Đã xóa Bin.", None |
| | return "❌ Không tìm thấy.", None |
| |
|
| | |
| | with gr.Blocks(theme=gr.themes.Default()) as demo: |
| | gr.Markdown("# 🚀 HF FileBin Pro") |
| | bin_id_input = gr.Textbox(label="Mã Bin (Private ID)") |
| | |
| | with gr.Row(): |
| | with gr.Column(): |
| | file_input = gr.File(label="Upload", file_count="multiple") |
| | btn_upload = gr.Button("📤 Tải lên", variant="primary") |
| | with gr.Column(): |
| | status_msg = gr.Textbox(label="Trạng thái") |
| | file_output = gr.File(label="Danh sách file") |
| | with gr.Row(): |
| | btn_access = gr.Button("🔍 Kiểm tra") |
| | btn_zip = gr.Button("📦 Tải ZIP") |
| | btn_delete = gr.Button("🗑️ Xóa", variant="stop") |
| |
|
| | btn_upload.click(upload_to_bin, [bin_id_input, file_input], [status_msg, file_output]) |
| | btn_access.click(get_bin_files, [bin_id_input], [status_msg, file_output]) |
| | btn_zip.click(download_all_zip, [bin_id_input], [file_output]) |
| | btn_delete.click(delete_bin, [bin_id_input], [status_msg, file_output]) |
| |
|
| | |
| | |
| | app = demo.app |
| |
|
| | @app.get("/health") |
| | def health(): |
| | return {"status": "ok"} |
| |
|
| | if __name__ == "__main__": |
| | |
| | demo.launch(server_name="0.0.0.0", server_port=7860) |
| |
|