import gradio as gr
import openai
import numpy as np
import time
import base64
import ffmpeg
from sentence_transformers import SentenceTransformer
from audio2numpy import open_audio
import httpx
import json
import os
import requests
import urllib
import pydub
from os import path
from pydub import AudioSegment
import re
MUBERT_LICENSE = os.environ.get('MUBERT_LICENSE')
MUBERT_TOKEN = os.environ.get('MUBERT_TOKEN')
#img_to_text = gr.Blocks.load(name="spaces/pharma/CLIP-Interrogator")
img_to_text = gr.Blocks.load(name="spaces/fffiloni/CLIP-Interrogator-2")
from share_btn import community_icon_html, loading_icon_html, share_js
from utils import get_tags_for_prompts, get_mubert_tags_embeddings
minilm = SentenceTransformer('all-MiniLM-L6-v2')
mubert_tags_embeddings = get_mubert_tags_embeddings(minilm)
##————————————————————————————————————
MUBERT_LICENSE = os.environ.get('MUBERT_LICENSE')
MUBERT_TOKEN = os.environ.get('MUBERT_TOKEN')
##————————————————————————————————————
def get_pat_token():
r = httpx.post('https://api-b2b.mubert.com/v2/GetServiceAccess',
json={
"method": "GetServiceAccess",
"params": {
"email":"mail@mail.com",
"phone":"+11234567890",
"license": MUBERT_LICENSE,
"token": MUBERT_TOKEN,
}
})
rdata = json.loads(r.text)
assert rdata['status'] == 1, "probably incorrect e-mail"
pat = rdata['data']['pat']
#print(f"pat: {pat}")
return pat
def get_music(pat, prompt, track_duration, gen_intensity, gen_mode):
if len(prompt) > 200:
prompt = prompt[:200]
r = httpx.post('https://api-b2b.mubert.com/v2/TTMRecordTrack',
json={
"method": "TTMRecordTrack",
"params":
{
"text": prompt,
"pat": pat,
"mode":gen_mode,
"duration":track_duration,
"intensity": gen_intensity,
"format": "wav"
}
})
rdata = json.loads(r.text)
#print(f"rdata: {rdata}")
assert rdata['status'] == 1, rdata['error']['text']
track = rdata['data']['tasks'][0]['download_link']
print(track)
local_file_path = "sample.wav"
# Download the MP3 file from the URL
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7; rv:93.0) Gecko/20100101 Firefox/93.0'}
retries = 3
delay = 5 # in seconds
while retries > 0:
response = requests.get(track, headers=headers)
if response.status_code == 200:
break
retries -= 1
time.sleep(delay)
response = requests.get(track, headers=headers)
print(f"{response}")
# Save the downloaded content to a local file
with open(local_file_path, 'wb') as f:
f.write(response.content)
return "sample.wav", track
def get_results(text_prompt,track_duration,gen_intensity,gen_mode):
pat_token = get_pat_token()
music = get_music(pat_token, text_prompt, track_duration, gen_intensity, gen_mode)
return pat_token, music[0], music[1]
def get_prompts(uploaded_image, track_duration, gen_intensity, gen_mode, openai_api_key):
print("calling clip interrogator")
#prompt = img_to_text(uploaded_image, "ViT-L (best for Stable Diffusion 1.*)", "fast", fn_index=1)[0]
prompt = img_to_text(uploaded_image, 'best', 4, fn_index=1)[0]
print(prompt)
clean_prompt = clean_text(prompt)
print(f"prompt cleaned: {clean_prompt}")
musical_prompt = 'You did not use any OpenAI API key to pimp your result :)'
if openai_api_key is not None:
gpt_adaptation = try_api(prompt, openai_api_key)
if gpt_adaptation[0] != "oups":
musical_prompt = gpt_adaptation[0]
print(f"musical adapt: {musical_prompt}")
music_result = get_results(musical_prompt, track_duration, gen_intensity, gen_mode)
else:
music_result = get_results(clean_prompt, track_duration, gen_intensity, gen_mode)
else:
music_result = get_results(clean_prompt, track_duration, gen_intensity, gen_mode)
show_prompts = f"""
CLIP Interrogator Caption: '{prompt}'
—
OpenAI Musical Adaptation: '{musical_prompt}'
—
Audio file link: {music_result[2]}
"""
#wave_file = convert_mp3_to_wav(music_result[1])
time.sleep(1)
return gr.Textbox.update(value=show_prompts, visible=True), music_result[1], gr.update(visible=True), gr.update(visible=True), gr.update(visible=True)
def try_api(message, openai_api_key):
try:
response = call_api(message, openai_api_key)
return response, "no error"
except openai.error.Timeout as e:
#Handle timeout error, e.g. retry or log
#print(f"OpenAI API request timed out: {e}")
return "oups", f"OpenAI API request timed out:
{e}"
except openai.error.APIError as e:
#Handle API error, e.g. retry or log
#print(f"OpenAI API returned an API Error: {e}")
return "oups", f"OpenAI API returned an API Error:
{e}"
except openai.error.APIConnectionError as e:
#Handle connection error, e.g. check network or log
#print(f"OpenAI API request failed to connect: {e}")
return "oups", f"OpenAI API request failed to connect:
{e}"
except openai.error.InvalidRequestError as e:
#Handle invalid request error, e.g. validate parameters or log
#print(f"OpenAI API request was invalid: {e}")
return "oups", f"OpenAI API request was invalid:
{e}"
except openai.error.AuthenticationError as e:
#Handle authentication error, e.g. check credentials or log
#print(f"OpenAI API request was not authorized: {e}")
return "oups", f"OpenAI API request was not authorized:
{e}"
except openai.error.PermissionError as e:
#Handle permission error, e.g. check scope or log
#print(f"OpenAI API request was not permitted: {e}")
return "oups", f"OpenAI API request was not permitted:
{e}"
except openai.error.RateLimitError as e:
#Handle rate limit error, e.g. wait or log
#print(f"OpenAI API request exceeded rate limit: {e}")
return "oups", f"OpenAI API request exceeded rate limit:
{e}"
def call_api(message, openai_api_key):
instruction = "Convert in less than 200 characters this image caption to a very concise musical description with musical terms, as if you wanted to describe a musical ambiance, stricly in English"
print("starting open ai")
augmented_prompt = f"{instruction}: '{message}'."
openai.api_key = openai_api_key
response = openai.Completion.create(
model="text-davinci-003",
prompt=augmented_prompt,
temperature=0.5,
max_tokens=2048,
top_p=1,
frequency_penalty=0,
presence_penalty=0.6
)
#print(response)
#return str(response.choices[0].text).split("\n",2)[2]
return str(response.choices[0].text).lstrip('\n')
def get_track_by_tags(tags, pat, duration, gen_intensity, gen_mode, maxit=20):
r = httpx.post('https://api-b2b.mubert.com/v2/RecordTrackTTM',
json={
"method": "RecordTrackTTM",
"params": {
"pat": pat,
"duration": duration,
"format": "wav",
"intensity":gen_intensity,
"tags": tags,
"mode": gen_mode
}
})
rdata = json.loads(r.text)
print(rdata)
#assert rdata['status'] == 1, rdata['error']['text']
trackurl = rdata['data']['tasks'][0]
print('Generating track ', end='')
for i in range(maxit):
r = httpx.get(trackurl)
if r.status_code == 200:
return trackurl
time.sleep(1)
def generate_track_by_prompt(pat, prompt, duration, gen_intensity, gen_mode):
try:
_, tags = get_tags_for_prompts(minilm, mubert_tags_embeddings, prompt)[0]
result = get_track_by_tags(tags, pat, int(duration), gen_intensity, gen_mode)
print(result)
return result, ",".join(tags), "Success"
except Exception as e:
return None, "", str(e)
def convert_mp3_to_wav(mp3_filepath):
wave_file="file.wav"
sound = AudioSegment.from_mp3(mp3_filepath)
sound.export(wave_file, format="wav")
return wave_file
def remove_emoji(text):
emoji_pattern = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
"]+", flags=re.UNICODE)
return emoji_pattern.sub(r'', text)
def remove_nonalphanumeric(text):
return re.sub(r'[^a-zA-Z0-9\s]', '', text)
def clean_text(text):
clean_text = remove_nonalphanumeric(text)
clean_text = remove_emoji(clean_text)
clean_text = re.sub(r'\d+', '', clean_text) # Remove any number
return clean_text
article = """
You may also like:
Sends an image in to CLIP Interrogator to generate a text prompt which is then run through Mubert text-to-music to generate music from the input image!