Omnibus's picture
Update dl.py
91b7ed6 verified
import os
import sys
import uuid
import gradio as gr
import requests
import json
from zipfile import ZipFile
import shutil
from pathlib import Path
from huggingface_hub import (create_repo,get_full_repo_name,upload_file,CommitOperationAdd,HfApi,snapshot_download)
from PIL import Image
# Random UUID generated once, when this module is imported.
uid = uuid.uuid4()
# Disabled environment-variable lookups, kept for reference.
#token = os.environ['HF_TOKEN']
#token_self = os.environ['HF_TOKEN']
#o=os.environ['P']
def show_s(name, token=""):
    """Build a Gradio update listing the Spaces owned by ``name``.

    Args:
        name: Hub user or organisation whose Spaces are listed.
        token: Access token handed to ``HfApi``.

    Returns:
        A ``gr.update`` labelled "Spaces" whose choices are an empty string
        followed by each Space id with its ``owner/`` prefix removed.
    """
    client = HfApi(token=token)
    # The first choice is always the empty string.
    space_names = [""]
    space_names.extend(
        entry.id.split("/", 1)[1] for entry in client.list_spaces(author=name)
    )
    return gr.update(label="Spaces", choices=list(space_names), interactive=True)
def show_f(repo, name, token=""):
    """Download every file of the Space ``repo/name`` and zip the result.

    Each file listed by the Hub is fetched from its ``raw/main`` URL and
    written, as bytes, under a local directory called ``name`` using the same
    relative path it has in the repository. Parent directories are created at
    any depth, and file names without an extension are supported.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name; also used as the local directory and zip base name.
        token: Access token handed to ``HfApi``.

    Returns:
        A 4-tuple: a ``gr.update`` listing the repository files, the list of
        local paths (with ``{name}.zip`` appended) twice, and the object
        returned by ``api.space_info``.
    """
    api = HfApi(token=token)
    repo_id = f'{repo}/{name}'
    f_ist = api.list_repo_files(repo_id=repo_id, repo_type="space")
    print(f_ist)
    file_list = []
    os.makedirs(name, exist_ok=True)
    for d_app in f_ist:
        # Mirror the repository path locally instead of splitting it by hand:
        # this works for any nesting depth and for names without a ".".
        local_path = f'{name}/{d_app}'
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        r = requests.get(f'https://huggingface.co/spaces/{repo}/{name}/raw/main/{d_app}')
        print(d_app)
        with open(local_path, 'wb') as fh:
            fh.write(r.content)
        file_list.append(local_path)
    with ZipFile(f"{name}.zip", "w") as zipObj:
        for repo_file in f_ist:
            zipObj.write(f'{name}/{repo_file}')
    file_list.append(f'{name}.zip')
    s_info = api.space_info(repo_id, files_metadata=True)
    print(f's_info:: {s_info}')
    info_json = api.get_space_runtime(repo_id)
    print(info_json.raw)
    return (
        gr.update(label="Files", choices=list(f_ist), interactive=True),
        file_list,
        file_list,
        s_info,
    )
def show_f2(repo, name, token=""):
    """Download the files of the Space ``repo/name`` and zip them.

    Files are fetched from their ``raw/main`` URLs and stored as bytes under
    a local directory called ``name``. Parent directories are created as
    needed and file names without an extension are accepted.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name; also used as the local directory and zip base name.
        token: Access token handed to ``HfApi``.

    Returns:
        A 2-tuple: a ``gr.update`` listing the repository files, and a list
        holding one ``Path`` per downloaded file followed by the zip file
        name as a string.
    """
    api = HfApi(token=token)
    f_ist = api.list_repo_files(repo_id=f'{repo}/{name}', repo_type="space")
    print(f_ist)
    file_list = []
    os.makedirs(name, exist_ok=True)
    for d_app in f_ist:
        r = requests.get(f'https://huggingface.co/spaces/{repo}/{name}/raw/main/{d_app}')
        local_path = Path(f'{name}/{d_app}')
        # Nested repository paths need their directories before writing.
        local_path.parent.mkdir(parents=True, exist_ok=True)
        # Bytes, not decoded text, so binary files survive the round trip.
        local_path.write_bytes(r.content)
        file_list.append(local_path)
    with ZipFile(f"{name}.zip", "w") as zipObj:
        for repo_file in f_ist:
            zipObj.write(f'{name}/{repo_file}')
    file_list.append(f'{name}.zip')
    # gr.update matches the other helpers in this module.
    return (
        gr.update(label="Files", choices=list(f_ist), interactive=True),
        file_list,
    )
def show_file_content(repo, name, file, token=""):
    """Return the text of one file from the Space ``repo/name``.

    The previous version appended to an ``html_text`` local that was never
    initialised, which raised ``UnboundLocalError`` on every call; that
    unused statement is gone.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name.
        file: Path of the file inside the repository.
        token: Accepted for signature compatibility; not used here.

    Returns:
        The response body of the file's ``raw/main`` URL, decoded as text.
    """
    url = f'https://huggingface.co/spaces/{repo}/{name}/raw/main/{file}'
    print(url)
    r = requests.get(url)
    return r.text
def show_f_cont(repo, name, file, token=""):
    """Render one file of the Space ``repo/name`` as an HTML snippet.

    Media files (by extension) are embedded with an ``<object>`` tag that
    points at the ``resolve/main`` URL. Any other file is downloaded from its
    ``raw/main`` URL and shown inside a ``<pre>`` element.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name.
        file: Path of the file inside the repository.
        token: Accepted for signature compatibility; not used here.

    Returns:
        A string containing a small HTML document.
    """
    import html  # local import: the module's import block is left untouched

    media_suffixes = (".png", ".jpg", ".gif", ".webm", ".mp4")
    parts = ['<html>\n<body>\n<div>\n']
    # Match on the real suffix (case-insensitively) rather than a substring,
    # so exactly one element is emitted and "notes.png.txt" is not an image.
    if file.lower().endswith(media_suffixes):
        media_url = f'https://huggingface.co/spaces/{repo}/{name}/resolve/main/{file}'
        parts.append(
            f'<object data="{html.escape(media_url, quote=True)}" '
            'style="width:100%;font-size:small;"></object>'
        )
    else:
        raw_url = f'https://huggingface.co/spaces/{repo}/{name}/raw/main/{file}'
        print(raw_url)
        r = requests.get(raw_url)
        # File content is remote, untrusted text: escape it so markup in the
        # file is displayed rather than interpreted by the browser.
        parts.append(
            f'<pre style="text-wrap:pretty;">{html.escape(r.text, quote=False)}</pre>\n'
        )
    parts.append('</div>\n</body>\n</html>')
    return "".join(parts)
def show_f_frame(repo, name, file, token=""):
    """Return iframe markup pointing at a file's ``blob/main`` page.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name.
        file: Path of the file inside the repository.
        token: Accepted for signature compatibility; not used here.

    Returns:
        An HTML string with a full-width, 1200px-high iframe.
    """
    blob_page = f'https://huggingface.co/spaces/{repo}/{name}/blob/main/{file}'
    return f"""<div><iframe src='{blob_page}' height='1200px' width='100%'></iframe>"""
def show_f_frame2(repo, name, file, token=""):
    """Return iframe markup pointing at the Space's own page.

    Args:
        repo: Owner (user or organisation) of the Space.
        name: Space name.
        file: Accepted for signature compatibility; not used here.
        token: Accepted for signature compatibility; not used here.

    Returns:
        An HTML string with a full-width, 1200px-high iframe.
    """
    space_page = f'https://huggingface.co/spaces/{repo}/{name}'
    return f"""<div><iframe src='{space_page}' height='1200px' width='100%'></iframe>"""
def show_all(author, token=""):
    """Download and zip every Space owned by ``author``, one at a time.

    For each Space, every listed file is fetched from its ``raw/main`` URL
    into a local directory named after the Space, then that directory's files
    are packed into ``{space}.zip``. After all Spaces are processed, the
    per-Space archives are packed into ``{author}.zip``.

    Downloading stays best-effort: a file that cannot be written or archived
    is reported with ``print`` and skipped. Nested paths and names without an
    extension are now stored instead of being silently dropped, and content
    is written as bytes.

    Args:
        author: Hub user or organisation whose Spaces are downloaded.
        token: Access token handed to ``HfApi``.

    Yields:
        The same growing list of archive names: once after each Space, and
        once more after ``{author}.zip`` has been appended.
    """
    api = HfApi(token=token)
    file_list = []
    for space in api.list_spaces(author=author):
        space_ea = space.id.split("/", 1)[1]
        f_ist = api.list_repo_files(repo_id=f'{author}/{space_ea}', repo_type="space")
        os.makedirs(space_ea, exist_ok=True)
        for d_app in f_ist:
            r = requests.get(f'https://huggingface.co/spaces/{author}/{space_ea}/raw/main/{d_app}')
            local_path = f'{space_ea}/{d_app}'
            try:
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                with open(local_path, 'wb') as fh:
                    fh.write(r.content)
            except OSError as err:
                # Keep going with the remaining files, but say what was lost.
                print(f'Skipping {local_path}: {err}')
        with ZipFile(f"{space_ea}.zip", "w") as zipObj:
            for repo_file in f_ist:
                try:
                    zipObj.write(f'{space_ea}/{repo_file}')
                except (OSError, ValueError) as err:
                    print(f'Not archived {space_ea}/{repo_file}: {err}')
        file_list.append(f'{space_ea}.zip')
        yield file_list
    with ZipFile(f"{author}.zip", "w") as zipObj:
        for archive in file_list:
            try:
                zipObj.write(f'{archive}')
            except (OSError, ValueError) as err:
                print(f'Not archived {archive}: {err}')
    file_list.append(f'{author}.zip')
    yield file_list
def show_all_z(author, token=""):
    """Snapshot and zip every Space owned by ``author``, one at a time.

    Each Space is fetched with ``snapshot_download`` and archived to
    ``{space}.zip``; afterwards the per-Space archives are packed into
    ``{author}.zip``.

    Changes from the previous version: the token is now forwarded to
    ``snapshot_download`` (an empty token falls back to the library default),
    the unused ``list_repo_files`` call and the unused empty local directory
    are gone, and archive failures are reported instead of silently ignored.

    Args:
        author: Hub user or organisation whose Spaces are downloaded.
        token: Access token used for listing and downloading.

    Yields:
        The same growing list of archive names: once after each Space, and
        once more after ``{author}.zip`` has been appended.
    """
    api = HfApi(token=token)
    file_list = []
    for space in api.list_spaces(author=author):
        space_ea = space.id.split("/", 1)[1]
        snapshot_dir = snapshot_download(
            repo_id=f'{author}/{space_ea}',
            repo_type="space",
            token=token or None,
        )
        shutil.make_archive(f"{space_ea}", 'zip', snapshot_dir)
        file_list.append(f'{space_ea}.zip')
        yield file_list
    with ZipFile(f"{author}.zip", "w") as zipObj:
        for archive in file_list:
            try:
                zipObj.write(f'{archive}')
            except (OSError, ValueError) as err:
                print(f'Not archived {archive}: {err}')
    file_list.append(f'{author}.zip')
    yield file_list
def dl_checked_fn(author, checklist, token=""):
    """Snapshot and zip the selected Spaces of ``author``.

    Each selected Space is fetched with ``snapshot_download`` and archived to
    ``{space}.zip``; afterwards the per-Space archives are packed into
    ``{author}.zip``.

    Changes from the previous version: the token is now forwarded to
    ``snapshot_download`` (an empty token falls back to the library default),
    the unused ``list_repo_files`` call and the unused empty local directory
    are gone, a ``None`` checklist is treated as empty, and archive failures
    are reported instead of silently ignored.

    Args:
        author: Hub user or organisation that owns the Spaces.
        checklist: Iterable of Space names to download; ``None`` means none.
        token: Access token used for downloading.

    Yields:
        The same growing list of archive names: once after each Space, and
        once more after ``{author}.zip`` has been appended.
    """
    file_list = []
    for space_ea in checklist or []:
        snapshot_dir = snapshot_download(
            repo_id=f'{author}/{space_ea}',
            repo_type="space",
            token=token or None,
        )
        shutil.make_archive(f"{space_ea}", 'zip', snapshot_dir)
        file_list.append(f'{space_ea}.zip')
        yield file_list
    with ZipFile(f"{author}.zip", "w") as zipObj:
        for archive in file_list:
            try:
                zipObj.write(f'{archive}')
            except (OSError, ValueError) as err:
                print(f'Not archived {archive}: {err}')
    file_list.append(f'{author}.zip')
    yield file_list
def get_space_runtime(author, checklist, token=""):
    """Return the runtime stage of each selected Space.

    The runtime object is read by attribute (``.stage``), the same way the
    rest of this module reads ``.raw``, instead of being subscripted like a
    dictionary.

    Args:
        author: Hub user or organisation that owns the Spaces.
        checklist: Iterable of Space names; ``None`` is treated as empty.
        token: Access token handed to ``HfApi``.

    Returns:
        A list with one ``stage`` value per entry of ``checklist``, in order.
    """
    api = HfApi(token=token)
    return [
        api.get_space_runtime(f'{author}/{space}').stage
        for space in (checklist or [])
    ]
def delete_checked(confirm_val, author, checklist, token=""):
    """Delete the selected Spaces after an explicit confirmation.

    Nothing is deleted unless ``confirm_val`` is exactly ``"CONFIRM"``.

    Every yield now has the same two-element shape. Previously a failed
    deletion yielded the bare exception object, which did not match the
    (message, update) pair produced by every other path.

    Args:
        confirm_val: Confirmation text typed by the user.
        author: Hub user or organisation that owns the Spaces.
        checklist: Iterable of Space names; ``None`` is treated as empty.
        token: Access token handed to ``HfApi``.

    Yields:
        ``(log, gr.update(visible=False))`` where ``log`` is the growing list
        of per-Space result lines, or the string ``"Not Deleting"`` when the
        confirmation text does not match.
    """
    if confirm_val != "CONFIRM":
        yield "Not Deleting", gr.update(visible=False)
        return
    api = HfApi(token=token)
    delete_list = []
    for space_ea in checklist or []:
        try:
            api.delete_repo(repo_id=f'{author}/{space_ea}', repo_type='space')
        except Exception as e:
            # UI boundary: report the failure and continue with the rest.
            delete_list.append(f'Failed:: {space_ea} ({e})\n')
        else:
            delete_list.append(f'Deleted:: {space_ea}\n')
        yield delete_list, gr.update(visible=False)
def checkp(p):
    """Check a submitted passphrase and toggle two UI components on success.

    The expected value is read from the ``P`` environment variable at call
    time. The earlier version compared against a module-level ``o`` whose
    only assignment is commented out, so every call raised ``NameError``.
    When ``P`` is not set, no input is accepted.

    Args:
        p: Passphrase entered by the user.

    Returns:
        ``(gr.update(visible=False), gr.update(visible=True))`` on a match,
        otherwise ``(None, None)``.
    """
    expected = os.environ.get('P')
    if expected is not None and p == expected:
        return gr.update(visible=False), gr.update(visible=True)
    return None, None
def update_checked_message(inp):
    """Format the given items as an HTML fragment, one per line.

    Args:
        inp: Iterable of items to display.

    Returns:
        A string in which every item is preceded by ``<br>``; an empty
        string when ``inp`` has no items.
    """
    return "".join(f'<br>{item}' for item in inp)
def ru_sure_fn():
    """Return a Gradio update that makes its target component visible."""
    reveal = gr.update(visible=True)
    return reveal