# pip install python-dotenv — needed so os.getenv() can read variables from .env.
# load_dotenv() must run before the API keys are read below.
import openai
import os
from dotenv import load_dotenv
import httpx
import asyncio
import aiometer
from functools import partial
from elevenlabs import generate, save, set_api_key
from base64 import b64decode
import re
from moviepy.editor import (
    ImageClip,
    AudioFileClip,
    CompositeVideoClip,
    concatenate_videoclips,
    concatenate_audioclips,
    TextClip,
    CompositeAudioClip,
)
from random import choice
from uuid import uuid4

TIMEOUT = 300      # seconds allowed per chat-completion HTTP request
RATE_LIMIT = 0.15  # requests per second (~9 requests per minute)

# Load environment variables from the .env file, then configure both clients.
load_dotenv()
set_api_key(os.getenv("elevenlabs_api_key"))
openai.api_key = os.getenv("openai_api_key")


class ChatCompletion:
    """Wrapper around the OpenAI chat-completion API.

    Provides a synchronous single-request entry point, an async path,
    and a rate-limited batch entry point, while accumulating the total
    token usage across calls in ``self.total_tokens``.
    """

    def __init__(self, temperature=0.8):
        # BUG FIX: the original assignment had a trailing comma, which made
        # self.model a 1-tuple ("gpt-3.5-turbo",) instead of a string; the
        # attribute was then silently ignored (call sites hardcoded the model).
        self.model = "gpt-3.5-turbo"
        self.temperature = temperature
        self.total_tokens = 0  # running total of tokens billed across all calls

    def single_response(self, hist):
        """Send one message history synchronously.

        Args:
            hist: list of {"role": ..., "content": ...} message dicts.

        Returns:
            The assistant reply text, or -1 if the API returned an error
            payload (no "usage" key present).
        """
        response = openai.ChatCompletion.create(model=self.model, messages=hist)
        try:
            tokens = response["usage"]["total_tokens"]
        except KeyError:
            # An error payload has no "usage" key; surface the API message.
            print("Error: " + str(response["error"]["message"]))
            return -1
        print("Tokens used: " + str(tokens))
        self.total_tokens += tokens
        return response["choices"][0]["message"]["content"]

    async def _async_response(self, payload):
        """POST ``payload`` to the chat-completions endpoint; return the raw response."""
        async with httpx.AsyncClient() as client:
            return await client.post(
                url="https://api.openai.com/v1/chat/completions",
                json=payload,
                headers={
                    # BUG FIX: the original sent "content_type", which is not a
                    # valid HTTP header name; the correct name is "Content-Type".
                    "Content-Type": "application/json",
                    "Authorization": f"Bearer {openai.api_key}",
                },
                timeout=TIMEOUT,
            )

    async def _request(self, hist):
        """Async variant of single_response; returns reply text or -1 on error."""
        response = await self._async_response({
            "model": self.model,
            "messages": hist,
        })
        data = response.json()  # parse once instead of re-parsing per access
        try:
            tokens = data["usage"]["total_tokens"]
        except KeyError:
            print("Error: " + str(data["error"]["message"]))
            return -1
        print("Tokens used: " + str(tokens))
        self.total_tokens += tokens
        return data["choices"][0]["message"]["content"]

    async def _multi_response(self, hists):
        """Run one request per history, throttled to RATE_LIMIT requests/second."""
        return await aiometer.run_all(
            [partial(self._request, hist) for hist in hists],
            max_per_second=RATE_LIMIT,
        )

    def multi_response(self, hists):
        """Synchronous entry point for the rate-limited batch request path."""
        return asyncio.run(self._multi_response(hists))

    def safety_check(self, message):
        """Return True if ``message`` passes the basic length check (<= 2000 chars).

        NOTE: an LLM-based appropriateness check previously lived here and is
        intentionally disabled; only the length guard is active.
        """
        return len(message) <= 2000

    def decide_gender(self, message):
        """Pick a gender at random for the named character.

        NOTE: an LLM-based gender-selection prompt previously lived here and
        is intentionally disabled in favor of a random choice.
        """
        return choice(["male", "female"])

    def generate_image(self, desc, speaker):
        """Generate a 256x256 image for ``desc`` and save it as ``{speaker}.png``.

        Returns the written file name.
        """
        response = openai.Image.create(
            prompt=desc,
            n=1,
            size="256x256",
            response_format="b64_json",
        )
        image_b64 = response["data"][0]["b64_json"]
        file_name = f"{speaker}.png"
        with open(file_name, "wb") as img:
            img.write(b64decode(image_b64))
        return file_name


def _str_check(message):
    """Return True iff ``message`` contains only letters, digits, and whitespace."""
    return not re.search(r"[^A-Za-z\s0-9]", message)


def generate_audio(speaker, message):
    """Synthesize ``message`` in ``speaker``'s voice via ElevenLabs.

    Returns the saved file name, or -1 if synthesis failed.
    NOTE(review): the file is named .wav but ElevenLabs typically returns
    MP3 data — confirm downstream consumers accept this.
    """
    try:
        audio = generate(
            text=message,
            voice=speaker,
            model="eleven_monolingual_v1",
        )
    except Exception as e:
        print("Error:" + str(e))
        return -1
    file_name = speaker + str(uuid4()) + ".wav"
    save(audio, file_name)
    return file_name


def get_user_name(chat, user_name):
    """Validate a user-supplied name.

    Returns the name if it passes both checks, -1 if it fails the safety
    check, or -2 if it contains characters outside [A-Za-z0-9] and whitespace.
    """
    if not chat.safety_check(f"My name is {user_name}"):
        print("Inappropriate name.")
        return -1
    if not _str_check(user_name):
        print("Invalid name.")
        return -2
    return user_name


def generate_video(triples, output_path):
    """Assemble a slideshow video from (text, audio_path, image_path) triples.

    Each image is shown for the duration of its audio clip; the clips are
    concatenated and written to ``output_path`` at 24 fps. The per-segment
    audio files are deleted afterwards.
    """
    video_clips = []
    audio_clips = []
    for _, audio_path, image_path in triples:
        audio = AudioFileClip(audio_path)
        image = ImageClip(image_path).set_duration(audio.duration)
        # A TextClip overlay was previously composited here and is disabled.
        video = CompositeVideoClip([image]).set_audio(audio)
        video_clips.append(video)
        audio_clips.append(audio)
    final_video = concatenate_videoclips(video_clips, method="compose")
    # Re-attach a single concatenated audio track to keep A/V in sync.
    final_audio = concatenate_audioclips(audio_clips)
    final_video = final_video.set_audio(final_audio)
    final_video.write_videofile(output_path, fps=24, verbose=False, logger=None)
    # FIX: release file handles held by moviepy before deleting the sources.
    final_video.close()
    for clip in video_clips:
        clip.close()
    for clip in audio_clips:
        clip.close()
    for _, audio_path, _ in triples:
        os.remove(audio_path)