Spaces:
Runtime error
Runtime error
from huggingface_hub import hf_hub_download | |
import os | |
import time | |
import gradio as gr | |
import yt_dlp | |
import sys | |
import uuid | |
import traceback | |
import tensorflow | |
import csv | |
embed_html1 = '<iframe width="560" height="315" src="https://www.youtube.com/embed/' | |
embed_html2 = '" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>' | |
# NO GPU | |
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1' | |
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
# tensorflow.config.threading.set_intra_op_parallelism_threads(8) | |
# tensorflow.config.threading.set_inter_op_parallelism_threads(8) | |
print(tensorflow.config.list_physical_devices("GPU")) | |
# | |
python_path = hf_hub_download( | |
repo_id=os.environ["REPO_ID"], | |
repo_type="space", | |
filename=os.environ["MODEL_FILE"], | |
use_auth_token=os.environ["TOKEN"], | |
) | |
# print("PATH : ", python_path) | |
sys.path.append(os.environ["PRIVATE_DIR"]) | |
from models import * | |
max_results = 100 | |
max_output = 50 | |
# global (faster) | |
ind = get_index() | |
ind_filenames = get_audio_names() | |
catalog = get_catalog() | |
url_dict = get_durl_myma() | |
dict_catalog = get_dict_catalog() | |
def download_audio(id_video): | |
id = id_video.split("?v=")[-1][:11] | |
file_name = f"youtube_video_{id}.mp3" | |
with yt_dlp.YoutubeDL( | |
{"extract_audio": True, "format": "bestaudio", "outtmpl": file_name} | |
) as video: | |
video.download(id_video) | |
embed_html_all = embed_html1 + id + embed_html2 | |
return file_name, embed_html_all | |
def process_url(input_path): | |
# setup the client | |
# try : | |
file_name, embed_html_all = download_audio(input_path) | |
return process(file_name, embed_html_all) | |
# except: | |
# return "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "" | |
def process_file(input_path): | |
return process(input_path, "")[1:] | |
def process(file_name, embed_html_all): | |
"""try :""" | |
emb, _ = get_embed(file_name) | |
try: | |
os.remove(file_name) | |
except: | |
print(traceback.format_exc()) | |
# => global | |
timestart = time.time() | |
_, I = do_search(emb, ind) | |
print("search time :", time.time() - timestart) | |
tops = get_topN(I, ind_filenames, url_dict, catalog, max_results) | |
formated = [{"f": "Choose a result to play", "t": ""}] | |
output_csv = f"{file_name}_results.csv" | |
with open(output_csv, "w") as w: | |
writer = csv.writer(w) | |
header = False | |
for position, top in enumerate(tops): | |
if len(formated) / 2 >= max_output: | |
break | |
file = os.path.splitext(os.path.basename(top))[0] | |
if file in dict_catalog: | |
if not header: | |
writer.writerow(list(dict_catalog[file].keys())) | |
header = True | |
writer.writerow(dict_catalog[file].values()) | |
else: | |
writer.writerow([file, "no metadata provided"]) | |
formated.append({"f": f"{position+1} - {file}", "t": top}) | |
return (embed_html_all, output_csv, formated) | |
"""except: | |
return embed_html_all, "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "" | |
""" | |
with gr.Blocks() as demo: | |
with gr.Row(): | |
with gr.Column(): | |
with gr.Row(): | |
# gr.HTML(embed_html) | |
html = gr.HTML() | |
with gr.Row(): | |
with gr.Column(): | |
audio_url_input = gr.Textbox( | |
placeholder="YouTube video URL", label="YouTube video URL" | |
) | |
analyze_url_btn = gr.Button("Search from URL") | |
with gr.Row(): | |
with gr.Column(): | |
audio_input_file = gr.Audio(type="filepath", label="Audio Input") | |
analyze_file_btn = gr.Button("Search from file") | |
with gr.Row(): | |
with gr.Column(): | |
csv_results = gr.File(label="Results as CSV") | |
with gr.Row(): | |
with gr.Column(): | |
results = gr.JSON(visible=False) | |
select_results = gr.Dropdown(label="Results", choices=[]) | |
audio_player = gr.Audio(None, label="Results player") | |
def change_audio(value): | |
if value: | |
return gr.Audio(value, label="Results player") | |
return gr.Audio(None, label="Results player") | |
def update_select(json_results): | |
try: | |
print("change dropdown") | |
return gr.Dropdown( | |
label="Results", | |
choices=[(k["f"], k["t"]) for k in json_results], | |
value=None, | |
) | |
except: | |
return gr.Dropdown( | |
choices=[], | |
label="Results", | |
) | |
def cleanup_on_file(): | |
print("cleanup on file change") | |
return ( | |
gr.Textbox( | |
placeholder="YouTube video URL", label="YouTube video URL" | |
), | |
gr.JSON([{"f": "Choose a result to play", "t": ""}], visible=False), | |
gr.Dropdown(label="Results", choices=[]), | |
gr.File(None, label="Results as CSV"), | |
gr.Audio(None, label="Results player"), | |
gr.HTML(""), | |
) | |
def cleanup_on_url(): | |
print("cleanup on url change") | |
return ( | |
gr.Audio(None, type="filepath", label="Audio Input"), | |
gr.JSON([{"f": "Choose a result to play", "t": ""}], visible=False), | |
gr.Dropdown(choices=[], label="Results"), | |
gr.File(None, label="Results as CSV"), | |
gr.Audio(None, label="Results player"), | |
gr.HTML(""), | |
) | |
analyze_url_btn.click( | |
process_url, | |
inputs=[audio_url_input], | |
outputs=[html, csv_results, results], | |
) | |
gr.Examples( | |
examples=[ | |
"https://www.youtube.com/watch?v=aNzCDt2eidg", | |
"https://www.youtube.com/watch?v=NBE-uBgtINg", | |
"https://www.youtube.com/watch?v=5NV6Rdv1a3I", | |
"https://www.youtube.com/watch?v=OiC1rgCPmUQ", # | |
"https://www.youtube.com/watch?v=dRX0wDNK6S4", # | |
"https://www.youtube.com/watch?v=Guzu9aAeDIc", | |
], | |
inputs=[audio_url_input], | |
outputs=[html, csv_results, results], | |
fn=process_url, | |
cache_examples=False, | |
examples_per_page=20, | |
run_on_click=True, | |
) | |
analyze_file_btn.click( | |
process_file, | |
inputs=[audio_input_file], | |
outputs=[csv_results, results], | |
) | |
demo.launch(debug=False) | |