import subprocess from supabase import create_client, Client from imgGen import generateTransparentImage import sys sys.path.append('./TripoSR') import TripoSR.obj_gen as obj_gen import os # from dotenv import load_dotenv import time # load_dotenv() url: str = os.environ.get("SUPABASE_URL") key: str = os.environ.get("SUPABASE_KEY") supabase: Client = create_client(url, key) fire_url = os.environ.get("FIRE_URL") fire_bucket = os.environ.get("FIRE_BUCKET") cred = credentials.Certificate('firekey.json') firebase_admin.initialize_app(cred, {'storageBucket': fire_bucket}) ref = db.reference('/Tasks',url=fire_url) bucket = storage.bucket() IsSuperbase = False if time.localtime().tm_mday > 15: IsSuperbase = True class Task: def __init__(self,tasks): """Initialize the task with the first task in the list of tasks""" self.key = tasks[0][0] self.text = tasks[0][1]['text'] self.status = tasks[0][1]['status'] self.result = tasks[0][1]['result'] def __str__(self): return f"Task: {self.text}, Status: {self.status}, Result: {self.result}, Key: {self.key}" def processing(self): ref.child(self.key).set({ "text": self.text, "status": 'processing', 'result': '' }) print('Processing') def complete(self, result): ref.child(self.key).set({ "text": self.text, "status": 'complete', 'result': result }) print('Complete') def error(self): ref.child(self.key).set({ "text": self.text, "status": 'error', 'result': '' }) print('Error') def save_to_storage(self, binary_file_path, key): try: blob = bucket.blob(key+'.obj') #upload but get download url as well blob.upload_from_filename(binary_file_path) blob.make_public() self.complete(blob.public_url) os.remove('task_'+str(self.key)+'.obj') except Exception as e: print(f"Error saving file to storage: {e}") self.error() return None def check_queue(): if IsSuperbase: try: tasks = supabase.table("Tasks").select("*").eq("status", "pending").execute() assert len(tasks.data) > 0 if len(tasks.data) > 0: return tasks.data[0] else: return None except Exception as e: print(f"Error checking queue: {e}") return None else: try: response = ref.order_by_child('status').equal_to('pending').limit_to_first(1).get() tasks = list(response.items()) # Convert tasks to a list print(tasks) if len(tasks) > 0: task = Task(tasks) print(task) return task else: return None except Exception as e: print(f"Error checking queue: {e}") return None def generate_image(text): try: img = generateTransparentImage(text) return img except Exception as e: print(f"Error generating image: {e}") return None def create_obj_file(img, task_id): try: obj_gen.generate_obj_from_image(img, 'task_'+str(task_id)+'.obj') except Exception as e: print(f"Error creating obj file: {e}") supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute() def send_back_to_supabase(task_id): # check that a file was created if os.path.exists('task_'+str(task_id)+'.obj'): try: with open('task_'+str(task_id)+'.obj', 'rb') as file: data = file.read() supabase.storage.from_('Results').upload('task_'+str(task_id)+'.obj', data) public_url = supabase.storage.from_('Results').get_public_url('task_'+str(task_id)+'.obj') supabase.table("Tasks").update({"status": "complete","result":public_url}).eq("id", task_id).execute() os.remove('task_'+str(task_id)+'.obj') except Exception as e: print(f"Error sending file back to Supabase: {e}") supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute() else: print(f"Error: No file was created for task {task_id}") def worker(): while True: task = check_queue() if task: if IsSuperbase: task_id = task['id'] supabase.table("Tasks").update({"status": "processing"}).eq("id", task_id).execute() print(f"Processing task {task_id}") img = generate_image(task["text"]) else: task_id = task.key task.processing() img = generate_image(task.text) if img: print(f"Image generated for task {task_id}") create_obj_file(img,task_id) if IsSuperbase: send_back_to_supabase(task_id) else: task.save_to_storage('task_'+str(task_id)+'.obj', task_id) print(f"Task {task_id} completed") else: print(f"Error generating image for task {task_id}") if IsSuperbase: supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute() else: task.error() else: print("No pending tasks in the queue") time.sleep(2) # Add a 2 second delay between checks if __name__ == "__main__": worker()