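# Gradio Space: product marketing email generator.
# Pipeline: the product name and description are first screened with Llama
# Guard 3 on Groq; if judged safe, a fine-tuned Gemma 2 9B GGUF model (run
# locally with llama-cpp-python) streams the email text back to the UI.
# Google TTS helpers are included for optional audio output.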
import os
import uuid
import wave

import gradio as gr
from groq import Groq
from llama_cpp import Llama
from GoogleTTS import GoogleTTS

os.environ["TOKENIZERS_PARALLELISM"] = "false"
# TTS -- SpeechBrain alternative, kept for reference:
# import torchaudio
# from speechbrain.inference.TTS import FastSpeech2
# from speechbrain.inference.TTS import Tacotron2
# from speechbrain.inference.vocoders import HIFIGAN
# fastspeech2 = FastSpeech2.from_hparams(source="speechbrain/tts-fastspeech2-ljspeech", savedir="pretrained_models/tts-fastspeech2-ljspeech")
# tacotron2 = Tacotron2.from_hparams(source="speechbrain/tts-tacotron2-ljspeech", savedir="tmpdir_tts")
# hifi_gan = HIFIGAN.from_hparams(source="speechbrain/tts-hifigan-ljspeech", savedir="pretrained_models/tts-hifigan-ljspeech")

# Google TTS
tts = GoogleTTS()
def text_to_speech(text):
    """Synthesize `text` into a uniquely named WAV file and return its path."""
    # SpeechBrain path, kept for reference:
    # mel_output, mel_length, alignment = tacotron2.encode_text(text)
    # waveforms = hifi_gan.decode_batch(mel_output)  # vocoder: spectrogram -> waveform
    # torchaudio.save(outfile, waveforms.squeeze(1), 22050)  # save the waveform
    outfile = f"{os.path.join(os.getcwd(), str(uuid.uuid4()))}.wav"
    # Truncate to the 5000-character cap used here for GoogleTTS input.
    text_str = text[:5000] if len(text) > 5000 else text
    tts.tts(text_str, outfile)
    return outfile
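# Usage sketch (hypothetical input): text_to_speech("Welcome!") writes
# <cwd>/<uuid4>.wav via GoogleTTS and returns that path, which gr.Audio or
# combine_audio_files below can consume.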
def combine_audio_files(audio_files):
    """Concatenate the given WAV files into sounds.wav and return its path."""
    data = []
    outfile = "sounds.wav"
    for infile in audio_files:
        with wave.open(infile, 'rb') as w:
            data.append((w.getparams(), w.readframes(w.getnframes())))
        # os.remove(infile)  # remove temporary files
    with wave.open(outfile, 'wb') as output:
        output.setparams(data[0][0])  # reuse the first clip's parameters
        for _, frames in data:
            output.writeframes(frames)
    return outfile
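# Usage sketch (hypothetical): merge several synthesized clips into one file:
#     combine_audio_files([text_to_speech(p) for p in paragraphs])
# This raw frame concatenation assumes every clip shares the first file's
# sample rate, sample width, and channel count; mixed formats would need
# resampling first.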
client = Groq(
    api_key=os.getenv("GROQ_API_KEY"),
)
llm = Llama.from_pretrained(
    repo_id="amir22010/fine_tuned_product_marketing_email_gemma_2_9b_q4_k_m",  # custom fine-tuned model
    filename="unsloth.Q4_K_M.gguf",  # model file name
    cache_dir=os.path.abspath(os.getcwd()),
    n_ctx=2048,  # context window size
    n_batch=126,
    verbose=False,
)
# Marketing prompt (Alpaca-style template; the last slot stays blank for generation)
marketing_email_prompt = """Below is a product and description, please write a marketing email for this product.
### Product:
{}
### Description:
{}
### Marketing Email:
{}"""
async def greet(product, description):
    user_request = marketing_email_prompt.format(
        product,      # product
        description,  # description
        "",           # output - leave this blank for generation!
    )
    # Screen the raw user input with Llama Guard before generating anything.
    chat_completion = client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": product + "\n" + description,
            }
        ],
        model="llama-guard-3-8b",
    )
    # Llama Guard answers "safe", or "unsafe" plus the violated category.
    warning_message = chat_completion.choices[0].message.content
    if warning_message != 'safe':
        yield warning_message
    else:
        output = llm.create_chat_completion(
            messages=[
                {"role": "system", "content": "Your go-to Email Marketing Guru - I'm here to help you craft short and concise compelling campaigns, boost conversions, and take your business to the next level."},
                {"role": "user", "content": user_request},
            ],
            max_tokens=2048,
            temperature=0.7,
            stream=True,
        )
        # Stream partial text to the UI as tokens arrive.
        partial_message = ""
        for chunk in output:
            delta = chunk['choices'][0]['delta']
            if 'content' in delta:
                partial_message += delta.get('content', '')
                yield partial_message
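# Sketch (assumption about intended use): the TTS helpers above are defined
# but never called. Presumably the finished email was meant to be voiced once
# streaming ends, e.g.:
#     audio_path = text_to_speech(partial_message)
#     combined = combine_audio_files([audio_path])
# Playing `combined` back would also need an audio output on the Interface.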
demo = gr.Interface(fn=greet, inputs=["text", "text"], outputs="text")
demo.launch()