Spaces:
Sleeping
Sleeping
import subprocess | |
from supabase import create_client, Client | |
from imgGen import generateTransparentImage | |
import sys | |
sys.path.append('./TripoSR') | |
import TripoSR.obj_gen as obj_gen | |
import os | |
# from dotenv import load_dotenv | |
import time | |
# load_dotenv() | |
# --- Backend configuration ---------------------------------------------------
# Supabase credentials come from the environment. os.environ.get yields None
# for a variable that is not set, and that value is handed to create_client
# unchanged.
url: str = os.environ.get("SUPABASE_URL")
key: str = os.environ.get("SUPABASE_KEY")
supabase: Client = create_client(url, key)

# Firebase settings: the database URL and the storage bucket name are read
# from the environment, the service-account key from the local file
# 'firekey.json'.
# NOTE(review): `credentials`, `firebase_admin`, `db` and `storage` are not
# imported in the visible part of this file -- confirm the firebase_admin
# imports exist, otherwise the lines below raise NameError at start-up.
fire_url = os.environ.get("FIRE_URL")
fire_bucket = os.environ.get("FIRE_BUCKET")
cred = credentials.Certificate('firekey.json')
firebase_admin.initialize_app(cred, {'storageBucket': fire_bucket})
ref = db.reference('/Tasks', url=fire_url)
bucket = storage.bucket()

# Backend switch, evaluated once at start-up: Supabase is used when the local
# day of the month is greater than 15, Firebase otherwise.
IsSuperbase = time.localtime().tm_mday > 15
class Task:
    """One queue entry stored under the Firebase ``/Tasks`` reference.

    The instance keeps the entry's key together with its ``text``, ``status``
    and ``result`` fields. The status methods write the entry back through
    the module-level ``ref``; ``save_to_storage`` uploads through the
    module-level ``bucket``.
    """

    def __init__(self, tasks):
        """Initialize the task with the first task in the list of tasks.

        Args:
            tasks: Non-empty sequence of ``(key, fields)`` pairs, where
                ``fields`` is a mapping holding ``'text'`` and ``'status'``
                and, optionally, ``'result'``.

        Raises:
            IndexError: If ``tasks`` is empty.
            KeyError: If ``'text'`` or ``'status'`` is missing.
        """
        self.key = tasks[0][0]
        fields = tasks[0][1]
        self.text = fields['text']
        self.status = fields['status']
        # An entry that has not been processed yet may have no 'result'
        # field; treat that like the empty result the status writers store.
        self.result = fields.get('result', '')

    def __str__(self):
        return f"Task: {self.text}, Status: {self.status}, Result: {self.result}, Key: {self.key}"

    def _write_state(self, status, result=''):
        """Overwrite this task's database entry with ``status`` and ``result``."""
        ref.child(self.key).set({
            "text": self.text,
            "status": status,
            'result': result
        })

    def processing(self):
        """Mark the task as 'processing' with an empty result."""
        self._write_state('processing')
        print('Processing')

    def complete(self, result):
        """Mark the task as 'complete' and store ``result`` with it."""
        self._write_state('complete', result)
        print('Complete')

    def error(self):
        """Mark the task as 'error' with an empty result."""
        self._write_state('error')
        print('Error')

    def save_to_storage(self, binary_file_path, key):
        """Upload a local file as ``<key>.obj`` and record its public URL.

        On success the task is marked complete with the blob's public URL;
        on any failure the error is printed and the task is marked as
        errored. The local file is removed in both cases.

        Args:
            binary_file_path: Path of the local file to upload.
            key: Blob name without the ``.obj`` suffix.

        Returns:
            None.
        """
        try:
            blob = bucket.blob(key + '.obj')
            # upload but get download url as well
            blob.upload_from_filename(binary_file_path)
            blob.make_public()
            self.complete(blob.public_url)
        except Exception as e:
            print(f"Error saving file to storage: {e}")
            self.error()
        finally:
            # Clean up the file that was actually uploaded, on success and on
            # failure alike. This runs outside the try body above so that a
            # failed delete cannot turn a completed task into an errored one.
            try:
                os.remove(binary_file_path)
            except FileNotFoundError:
                pass  # nothing was generated, so there is nothing to remove
            except OSError as e:
                print(f"Error removing local file {binary_file_path}: {e}")
        return None
def check_queue():
    """Fetch the next pending task from the active backend.

    With ``IsSuperbase`` set, the Supabase ``Tasks`` table is queried for rows
    whose status is ``pending``; otherwise the Firebase reference is queried
    for the first entry whose ``status`` child equals ``pending``.

    Returns:
        The first pending Supabase row (as found in the response's ``data``),
        or a ``Task`` built from the Firebase entry. ``None`` when nothing is
        pending or when the query fails (the error is printed).
    """
    try:
        if IsSuperbase:
            pending = supabase.table("Tasks").select("*").eq("status", "pending").execute()
            # An empty queue is the normal idle state, not an error: return
            # None quietly instead of asserting (which also vanishes under -O).
            return pending.data[0] if pending.data else None

        response = ref.order_by_child('status').equal_to('pending').limit_to_first(1).get()
        if not response:
            # No matching entry: there is nothing to call .items() on.
            return None
        tasks = list(response.items())  # Convert tasks to a list
        print(tasks)
        task = Task(tasks)
        print(task)
        return task
    except Exception as e:
        print(f"Error checking queue: {e}")
        return None
def generate_image(text):
    """Turn ``text`` into an image by calling ``generateTransparentImage``.

    Returns:
        Whatever ``generateTransparentImage`` returns, or ``None`` when it
        raises (the error is printed, not propagated).
    """
    try:
        return generateTransparentImage(text)
    except Exception as e:
        print(f"Error generating image: {e}")
    return None
def create_obj_file(img, task_id):
    """Generate ``task_<task_id>.obj`` in the working directory from ``img``.

    The mesh is produced by ``obj_gen.generate_obj_from_image``. A failure is
    printed rather than raised.

    Args:
        img: Image passed straight through to the generator.
        task_id: Task identifier used to build the output file name.

    Returns:
        None.
    """
    obj_path = 'task_' + str(task_id) + '.obj'
    try:
        obj_gen.generate_obj_from_image(img, obj_path)
    except Exception as e:
        print(f"Error creating obj file: {e}")
        # Only Supabase-backed tasks have a row in the Supabase table. When
        # the Firebase backend is active, task_id is a Firebase key, so
        # writing it to Supabase would target the wrong store and a rejected
        # request would escape this handler.
        if IsSuperbase:
            supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
def send_back_to_supabase(task_id):
    """Upload ``task_<task_id>.obj`` to Supabase storage and record the result.

    The file is uploaded to the ``Results`` storage bucket and the task row is
    updated to ``complete`` with the file's public URL. If the file is missing
    or any step fails, the row is updated to ``error`` instead. The local file
    is removed once an upload has been attempted, whether it succeeded or not.

    Args:
        task_id: Value of the ``id`` column of the task row; also used to
            build the file name.

    Returns:
        None.
    """
    obj_name = 'task_' + str(task_id) + '.obj'

    # check that a file was created
    if not os.path.exists(obj_name):
        print(f"Error: No file was created for task {task_id}")
        # Record the failure so the task does not stay in 'processing'.
        supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
        return

    try:
        with open(obj_name, 'rb') as file:
            data = file.read()
        results = supabase.storage.from_('Results')
        results.upload(obj_name, data)
        public_url = results.get_public_url(obj_name)
        supabase.table("Tasks").update({"status": "complete","result":public_url}).eq("id", task_id).execute()
    except Exception as e:
        print(f"Error sending file back to Supabase: {e}")
        supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
    finally:
        # Remove the local file on success and on failure. Doing it here,
        # outside the try body, keeps a failed delete from overwriting a
        # 'complete' status with 'error'.
        try:
            os.remove(obj_name)
        except OSError as e:
            print(f"Error removing local file {obj_name}: {e}")
def _process_task(task):
    """Run one task through image generation, mesh generation and upload.

    Args:
        task: A Supabase row (mapping with ``'id'`` and ``'text'``) when
            ``IsSuperbase`` is set, otherwise a ``Task`` instance.
    """
    if IsSuperbase:
        task_id = task['id']
        supabase.table("Tasks").update({"status": "processing"}).eq("id", task_id).execute()
        print(f"Processing task {task_id}")
        img = generate_image(task["text"])
    else:
        task_id = task.key
        task.processing()
        img = generate_image(task.text)

    # generate_image signals failure with None. Compare against None rather
    # than relying on truthiness, which is not meaningful for every image
    # type (array-like objects may refuse to be evaluated as a boolean).
    if img is None:
        print(f"Error generating image for task {task_id}")
        if IsSuperbase:
            supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
        else:
            task.error()
        return

    print(f"Image generated for task {task_id}")
    create_obj_file(img, task_id)
    if IsSuperbase:
        send_back_to_supabase(task_id)
    else:
        task.save_to_storage('task_' + str(task_id) + '.obj', task_id)
    print(f"Task {task_id} completed")


def worker():
    """Poll the queue forever, processing one pending task per iteration.

    Each iteration fetches a task with ``check_queue`` and processes it, or
    reports that the queue is empty. The loop never returns.
    """
    while True:
        try:
            task = check_queue()
            if task:
                _process_task(task)
            else:
                print("No pending tasks in the queue")
        except Exception as e:
            # A failing backend call must not end the polling loop; report it
            # and carry on with the next check.
            print(f"Error processing task: {e}")
        time.sleep(2)  # Add a 2 second delay between checks
# Script entry point: start the polling loop only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    worker()