PierreHanna's picture
Update app.py
4176bf3 verified
from huggingface_hub import hf_hub_download
import os
import time
import gradio as gr
import yt_dlp
import sys
import uuid
import traceback
import tensorflow
import csv
import json
embed_html1 = '<iframe width="560" height="315" src="https://www.youtube.com/embed/'
embed_html2 = '" title="YouTube video player" frameborder="0" allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>'
# NO GPU ?
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
# tensorflow.config.threading.set_intra_op_parallelism_threads(8)
# tensorflow.config.threading.set_inter_op_parallelism_threads(8)
#print(f"Available GPUs {tensorflow.config.list_physical_devices('GPU')}")
#
python_path = hf_hub_download(
repo_id=os.environ["REPO_ID"],
repo_type="space",
filename=os.environ["MODEL_FILE"],
use_auth_token=os.environ["TOKEN"],
)
# print("PATH : ", python_path)
sys.path.append(os.environ["PRIVATE_DIR"])
from models import *
max_results = 100
max_output = 50
# global (faster)
ind = get_index()
ind_filenames = get_audio_names()
catalog = get_catalog()
url_dict = get_durl_myma()
dict_catalog = get_dict_catalog()
fixation_id_to_file_name = {}
for file_name, infos in dict_catalog.items():
# we want only main versions
if infos["Parent fixation id"].strip():
continue
fixation_id_to_file_name[infos["Fixation id"].strip()] = file_name
child_to_parent_filename = {}
count = count_failed = 0
for file_name, infos in dict_catalog.items():
if not infos["Parent fixation id"].strip():
continue
count += 1
try:
child_to_parent_filename[file_name] = fixation_id_to_file_name[
infos["Parent fixation id"].strip()
]
except Exception as e:
print(f"No parent for {file_name} : {e}")
count_failed += 1
print(f"{count_failed} tracks have no parent / {count} tracks")
parent_file_names = set(list(fixation_id_to_file_name.values()))
file_name_to_url = {}
for file_url in url_dict.values():
file_name = os.path.splitext(os.path.basename(file_url))[0]
if file_name not in parent_file_names:
continue
file_name_to_url[file_name] = file_url
parent_file_names = []
fixation_id_to_file_name = []
with open("fixation_id_to_file_name.json", "w") as w, open(
"child_to_parent_filename.json", "w"
) as w2, open("file_name_to_url.json", "w") as w3:
json.dump(fixation_id_to_file_name, w)
json.dump(child_to_parent_filename, w2)
json.dump(file_name_to_url, w3)
parent_file_names = []
fixation_id_to_file_name = {}
def download_audio(id_video):
id = id_video.split("?v=")[-1][:11]
file_name = f"youtube_video_{id}.mp3"
with yt_dlp.YoutubeDL(
{"extract_audio": True, "format": "bestaudio", "outtmpl": file_name}
) as video:
video.download(id_video)
embed_html_all = embed_html1 + id + embed_html2
return file_name, embed_html_all
def process_url(input_path):
# setup the client
# try :
file_name, embed_html_all = download_audio(input_path)
return process(file_name, embed_html_all)
# except:
# return "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", ""
def process_file(input_path):
return process(input_path, "")[1:]
def process(file_name, embed_html_all):
"""try :"""
emb, _ = get_embed(file_name)
try:
os.remove(file_name)
except:
print(traceback.format_exc())
# => global
timestart = time.time()
D, I = do_search(emb, ind)
print("search time :", time.time() - timestart)
print("DEBUG DISTANCES : ", D)
tops = get_topN(I, ind_filenames, url_dict, catalog, max_results)
formated = [{"f": "Choose a result to play", "t": ""}]
output_csv = f"{file_name}_results.csv"
with open(output_csv, "w") as w:
writer = csv.writer(w)
header = False
already = set()
for position, top in enumerate(tops):
if len(formated) / 2 >= max_output:
break
file = os.path.splitext(os.path.basename(top))[0]
try:
file = child_to_parent_filename[file]
top = file_name_to_url[file]
except KeyError:
pass
if file in already:
continue
already.add(file)
file_name = file
if file in dict_catalog:
if not header:
writer.writerow(list(dict_catalog[file].keys()))
header = True
writer.writerow(dict_catalog[file].values())
file_name = dict_catalog[file]["Track name"]
try:
file_name += " - " + dict_catalog[file]["Composer1 full name"]
except:
pass
try:
file_name += " - " + dict_catalog[file]["Album name"]
except:
pass
else:
writer.writerow([file, "no metadata provided"])
formated.append({"f": f"{position+1} - {file_name}", "t": top})
return (embed_html_all, output_csv, formated)
"""except:
return embed_html_all, "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", "", "Erreur Input", ""
"""
with gr.Blocks() as demo:
with gr.Row():
with gr.Column():
with gr.Row():
# gr.HTML(embed_html)
html = gr.HTML()
with gr.Row():
with gr.Column():
audio_url_input = gr.Textbox(
placeholder="YouTube video URL", label="YouTube video URL"
)
analyze_url_btn = gr.Button("Search from URL")
with gr.Row():
with gr.Column():
audio_input_file = gr.Audio(type="filepath", label="Audio Input")
analyze_file_btn = gr.Button("Search from file")
with gr.Row():
with gr.Column():
csv_results = gr.File(label="Results as CSV")
with gr.Row():
with gr.Column():
results = gr.JSON(visible=False)
select_results = gr.Dropdown(label="Results", choices=[])
audio_player = gr.Audio(None, label="Results player")
@select_results.select(inputs=select_results, outputs=audio_player)
def change_audio(value):
if value:
return gr.Audio(value, label="Results player")
return gr.Audio(None, label="Results player")
@results.change(
inputs=results,
outputs=select_results,
)
def update_select(json_results):
try:
print("change dropdown")
return gr.Dropdown(
label="Results",
choices=[(k["f"], k["t"]) for k in json_results],
value=None,
)
except:
return gr.Dropdown(
choices=[],
label="Results",
)
@audio_input_file.change(
outputs=[
audio_url_input,
results,
select_results,
csv_results,
audio_player,
html,
]
)
def cleanup_on_file():
print("cleanup on file change")
return (
gr.Textbox(
placeholder="YouTube video URL", label="YouTube video URL"
),
gr.JSON([{"f": "Choose a result to play", "t": ""}], visible=False),
gr.Dropdown(label="Results", choices=[]),
gr.File(None, label="Results as CSV"),
gr.Audio(None, label="Results player"),
gr.HTML(""),
)
@audio_url_input.change(
outputs=[
audio_input_file,
results,
select_results,
csv_results,
audio_player,
html,
]
)
def cleanup_on_url():
print("cleanup on url change")
return (
gr.Audio(None, type="filepath", label="Audio Input"),
gr.JSON([{"f": "Choose a result to play", "t": ""}], visible=False),
gr.Dropdown(choices=[], label="Results"),
gr.File(None, label="Results as CSV"),
gr.Audio(None, label="Results player"),
gr.HTML(""),
)
analyze_url_btn.click(
process_url,
inputs=[audio_url_input],
outputs=[html, csv_results, results],
)
gr.Examples(
examples=[
"https://www.youtube.com/watch?v=aNzCDt2eidg",
"https://www.youtube.com/watch?v=NBE-uBgtINg",
"https://www.youtube.com/watch?v=5NV6Rdv1a3I",
"https://www.youtube.com/watch?v=OiC1rgCPmUQ", #
"https://www.youtube.com/watch?v=dRX0wDNK6S4", #
"https://www.youtube.com/watch?v=Guzu9aAeDIc",
],
inputs=[audio_url_input],
outputs=[html, csv_results, results],
fn=process_url,
cache_examples=False,
examples_per_page=20,
run_on_click=True,
)
analyze_file_btn.click(
process_file,
inputs=[audio_input_file],
outputs=[csv_results, results],
)
demo.launch(debug=False)