cloud-drive-ui / app.py
e2dew32's picture
Upload 3 files
030ee81 verified
import os
import streamlit as st
from huggingface_hub import HfApi
import tempfile
import shutil
st.set_page_config(page_title="☁️ HF Cloud Drive", page_icon="☁️", layout="wide")
REPO_ID = "e2dew32/cloud-drive"
REPO_TYPE = "dataset"
@st.cache_resource
def get_api():
return HfApi()
def format_size(size_bytes):
if size_bytes is None:
return "—"
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / 1024 / 1024:.1f} MB"
else:
return f"{size_bytes / 1024 / 1024 / 1024:.2f} GB"
# ---- Sidebar ----
with st.sidebar:
st.header("☁️ HF Cloud Drive")
st.markdown(f"**仓库:** `{REPO_ID}`")
st.markdown("---")
if st.button("🔄 刷新", use_container_width=True):
st.rerun()
# ---- Main ----
st.title("☁️ HF Cloud Drive")
st.caption(f"后端仓库: [{REPO_ID}](https://huggingface.co/datasets/{REPO_ID})")
api = get_api()
# ---- Upload ----
with st.expander("📤 上传文件", expanded=True):
uploaded_files = st.file_uploader("拖拽或选择文件上传", accept_multiple_files=True, label_visibility="collapsed")
if uploaded_files:
if st.button("🚀 开始上传", type="primary", use_container_width=True):
progress_bar = st.progress(0, text="上传中...")
success_count = 0
for i, uploaded in enumerate(uploaded_files):
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded.name)[1]) as tmp:
tmp.write(uploaded.getvalue())
tmp_path = tmp.name
try:
api.upload_file(
path_or_fileobj=tmp_path,
path_in_repo=uploaded.name,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
commit_message=f"Upload {uploaded.name} via Cloud Drive",
)
success_count += 1
except Exception as e:
st.error(f"❌ {uploaded.name}: {e}")
finally:
os.unlink(tmp_path)
progress_bar.progress((i + 1) / len(uploaded_files), text=f"已上传 {i+1}/{len(uploaded_files)}")
if success_count == len(uploaded_files):
st.success(f"✅ 全部上传成功 ({success_count}/{len(uploaded_files)})")
else:
st.warning(f"⚠️ 部分上传成功 ({success_count}/{len(uploaded_files)})")
st.rerun()
# ---- File Browser ----
st.subheader("📁 文件列表")
try:
tree = list(api.list_repo_tree(repo_id=REPO_ID, repo_type=REPO_TYPE, recursive=True))
except Exception as e:
st.error(f"无法获取文件列表: {e}")
st.info("请确保 Hugging Face Token 已配置,且在 Space Settings 中设置了 HF_TOKEN secret。")
tree = []
# Distinguish files vs directories: folders have tree_id, files have blob_id
files = sorted([f for f in tree if hasattr(f, "blob_id")], key=lambda x: x.path)
dirs = sorted([f for f in tree if hasattr(f, "tree_id")], key=lambda x: x.path)
total_size = sum(getattr(f, "size", 0) or 0 for f in files)
st.info(f"📊 {len(files)} 个文件 | {len(dirs)} 个目录 | 总大小: {format_size(total_size)}")
if not files:
st.info("仓库为空,上传一些文件开始使用吧!")
else:
# Directories
for d in dirs:
st.markdown(f"📁 **`{d.path}/`**")
# Files
for f in files:
cols = st.columns([5, 1, 2])
fname = f.path.split("/")[-1]
cols[0].markdown(f"📄 `{f.path}`")
cols[1].text(format_size(getattr(f, "size", None)))
# Download
try:
dl_path = api.hf_hub_download(repo_id=REPO_ID, filename=f.path, repo_type=REPO_TYPE)
with open(dl_path, "rb") as fh:
data = fh.read()
cols[2].download_button(
label="⬇️",
data=data,
file_name=fname,
key=f"dl_{f.path}",
)
except Exception:
cols[2].button("⬇️", disabled=True, key=f"dl_{f.path}")
# ---- Delete ----
st.markdown("---")
st.subheader("🗑️ 删除文件")
if files:
to_delete = st.multiselect("选择要删除的文件", [f.path for f in files])
if to_delete and st.button("确认删除", type="primary"):
for fp in to_delete:
try:
api.delete_file(path_in_repo=fp, repo_id=REPO_ID, repo_type=REPO_TYPE, commit_message=f"Delete {fp}")
st.success(f"✅ 已删除: {fp}")
except Exception as e:
st.error(f"❌ 删除 {fp} 失败: {e}")
st.rerun()
else:
st.info("没有文件可删除")