# smallville / worker.py
# (Repository page header captured with the file: last commit "Update worker.py",
#  491765d, 5.74 kB.)
import subprocess
from supabase import create_client, Client
from imgGen import generateTransparentImage
import sys
sys.path.append('./TripoSR')
import TripoSR.obj_gen as obj_gen
import os
# from dotenv import load_dotenv
import time
# load_dotenv()
# --- Supabase backend -------------------------------------------------------
# Client credentials are read from the SUPABASE_URL / SUPABASE_KEY environment
# variables.
url: str = os.getenv("SUPABASE_URL")
key: str = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(url, key)

# --- Firebase backend -------------------------------------------------------
# Database URL and storage bucket come from FIRE_URL / FIRE_BUCKET; the service
# account certificate is loaded from 'firekey.json'.
# NOTE(review): `credentials`, `firebase_admin`, `db` and `storage` are used
# here, but no import for them is visible in this file. If they are not
# provided elsewhere, `import firebase_admin` and
# `from firebase_admin import credentials, db, storage` are needed -- confirm.
fire_url = os.getenv("FIRE_URL")
fire_bucket = os.getenv("FIRE_BUCKET")
cred = credentials.Certificate('firekey.json')
firebase_admin.initialize_app(cred, {'storageBucket': fire_bucket})
ref = db.reference('/Tasks', url=fire_url)
bucket = storage.bucket()

# --- Backend selection ------------------------------------------------------
# Evaluated once at import time: the Supabase code paths are used when the
# local day of the month is greater than 15, the Firebase ones otherwise.
IsSuperbase = time.localtime().tm_mday > 15
class Task:
    """A queued job backed by a child record of the module-level ``ref``.

    The instance keeps a snapshot of the record's key, text, status and
    result. The status helpers overwrite the stored record through ``ref``;
    they do not update the snapshot attributes on the instance.
    """

    def __init__(self, tasks):
        """Initialize the task with the first task in the list of tasks.

        Args:
            tasks: Non-empty sequence of ``(key, payload)`` pairs. ``payload``
                is a mapping with ``'text'`` and ``'status'`` entries and an
                optional ``'result'`` entry.

        Raises:
            IndexError: If ``tasks`` is empty.
            KeyError: If the payload has no ``'text'`` or ``'status'`` entry.
        """
        first = tasks[0]
        payload = first[1]
        self.key = first[0]
        self.text = payload['text']
        self.status = payload['status']
        # A record that has no 'result' entry yet is treated as having an
        # empty result instead of aborting construction with a KeyError.
        self.result = payload.get('result', '')

    def __str__(self):
        return f"Task: {self.text}, Status: {self.status}, Result: {self.result}, Key: {self.key}"

    def _write_record(self, status, result):
        """Overwrite the stored record with the task text, *status* and *result*."""
        ref.child(self.key).set({
            "text": self.text,
            "status": status,
            'result': result,
        })

    def processing(self):
        """Mark the stored record as being processed (result cleared)."""
        self._write_record('processing', '')
        print('Processing')

    def complete(self, result):
        """Mark the stored record as complete and store *result* with it."""
        self._write_record('complete', result)
        print('Complete')

    def error(self):
        """Mark the stored record as failed (result cleared)."""
        self._write_record('error', '')
        print('Error')

    def save_to_storage(self, binary_file_path, key):
        """Upload a generated file, publish it and complete the task.

        The file is uploaded to the module-level ``bucket`` as ``<key>.obj``,
        made public, and its public URL is stored as the task result. After a
        successful upload the local file is deleted.

        Args:
            binary_file_path: Path of the local file to upload.
            key: Name (without extension) of the blob to create.

        Returns:
            None. Upload failures are printed and recorded on the task via
            ``error()`` rather than raised.
        """
        try:
            blob = bucket.blob(key + '.obj')
            blob.upload_from_filename(binary_file_path)
            blob.make_public()
            self.complete(blob.public_url)
        except Exception as e:
            print(f"Error saving file to storage: {e}")
            self.error()
            return None

        # Cleanup is kept outside the upload handler: a failure to delete the
        # local copy must not overwrite the 'complete' status written above.
        # The path that was uploaded is the one removed.
        try:
            os.remove(binary_file_path)
        except OSError as e:
            print(f"Error removing local file {binary_file_path}: {e}")
        return None
def check_queue():
    """Return the next pending task, or ``None`` when nothing is pending.

    When ``IsSuperbase`` is true, the result is the first row (a dict) of the
    Supabase ``Tasks`` table whose status is ``pending``. Otherwise it is a
    ``Task`` built from the first pending child of the Firebase reference
    ``ref``.

    Returns:
        The pending task, or ``None`` if the queue is empty or the query
        raised. Query errors are printed, not propagated.
    """
    if IsSuperbase:
        try:
            tasks = supabase.table("Tasks").select("*").eq("status", "pending").execute()
            # An empty queue is the normal idle state, so it is reported as
            # None directly instead of going through a failed assertion and
            # the error handler below.
            if tasks.data:
                return tasks.data[0]
            return None
        except Exception as e:
            print(f"Error checking queue: {e}")
            return None
    else:
        try:
            response = ref.order_by_child('status').equal_to('pending').limit_to_first(1).get()
            # NOTE(review): the query is expected to yield None or an empty
            # mapping when no child matches -- confirm against the SDK. Either
            # way there is nothing to iterate, so treat it as an empty queue.
            if not response:
                return None
            tasks = list(response.items())  # (key, payload) pairs
            print(tasks)
            task = Task(tasks)
            print(task)
            return task
        except Exception as e:
            print(f"Error checking queue: {e}")
            return None
def generate_image(text):
    """Produce an image for *text* with ``generateTransparentImage``.

    Returns:
        Whatever ``generateTransparentImage`` returns, or ``None`` if it
        raised. The exception is printed, not propagated.
    """
    try:
        return generateTransparentImage(text)
    except Exception as err:
        print(f"Error generating image: {err}")
    return None
def create_obj_file(img, task_id):
    """Write ``task_<task_id>.obj`` generated from *img*.

    Generation is delegated to ``obj_gen.generate_obj_from_image``. A failure
    is printed and not re-raised, so the caller detects it by the output file
    being absent.

    Args:
        img: Image object passed through to the generator.
        task_id: Identifier used to build the output file name.
    """
    try:
        obj_gen.generate_obj_from_image(img, 'task_' + str(task_id) + '.obj')
    except Exception as e:
        print(f"Error creating obj file: {e}")
        # Only Supabase-mode tasks have a row in the Supabase table. In
        # Firebase mode task_id is a Firebase key, so the table is left alone.
        if IsSuperbase:
            supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
def _mark_supabase_task_error(task_id):
    """Best-effort: set the Supabase row for *task_id* to status ``error``."""
    try:
        supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
    except Exception as e:
        # Reported rather than raised so one failed status write does not
        # take the caller down with it.
        print(f"Error marking task {task_id} as failed: {e}")


def send_back_to_supabase(task_id):
    """Upload ``task_<task_id>.obj`` to Supabase and complete the task row.

    The file is uploaded to the ``Results`` storage bucket and its public URL
    is stored on the row together with status ``complete``. The local file is
    deleted after a successful upload. If the file is missing or the upload
    fails, the row is set to status ``error``.

    Args:
        task_id: Row id of the task; also used to build the file name.
    """
    obj_name = 'task_' + str(task_id) + '.obj'

    # check that a file was created
    if not os.path.exists(obj_name):
        print(f"Error: No file was created for task {task_id}")
        # Without this the row would stay in 'processing' indefinitely.
        _mark_supabase_task_error(task_id)
        return

    try:
        with open(obj_name, 'rb') as file:
            data = file.read()
        supabase.storage.from_('Results').upload(obj_name, data)
        public_url = supabase.storage.from_('Results').get_public_url(obj_name)
        supabase.table("Tasks").update({"status": "complete", "result": public_url}).eq("id", task_id).execute()
    except Exception as e:
        print(f"Error sending file back to Supabase: {e}")
        _mark_supabase_task_error(task_id)
        return

    # Cleanup is kept outside the upload handler: failing to delete the local
    # copy must not turn an already-completed task into an error.
    try:
        os.remove(obj_name)
    except OSError as e:
        print(f"Error removing local file {obj_name}: {e}")
def _process_task(task):
    """Run one task through image generation, OBJ creation and upload.

    ``task`` is a Supabase row dict when ``IsSuperbase`` is true, otherwise a
    ``Task`` instance.
    """
    # Mark the task as in progress and generate its image.
    if IsSuperbase:
        task_id = task['id']
        supabase.table("Tasks").update({"status": "processing"}).eq("id", task_id).execute()
        print(f"Processing task {task_id}")
        prompt = task["text"]
    else:
        task_id = task.key
        task.processing()
        prompt = task.text
    img = generate_image(prompt)

    # No usable image: record the failure on the active backend and stop.
    if not img:
        print(f"Error generating image for task {task_id}")
        if IsSuperbase:
            supabase.table("Tasks").update({"status": "error"}).eq("id", task_id).execute()
        else:
            task.error()
        return

    print(f"Image generated for task {task_id}")
    create_obj_file(img, task_id)
    if IsSuperbase:
        send_back_to_supabase(task_id)
    else:
        task.save_to_storage('task_'+str(task_id)+'.obj', task_id)
    print(f"Task {task_id} completed")


def worker():
    """Poll the queue forever, processing one pending task per iteration.

    Each iteration fetches a task with ``check_queue``, processes it if there
    is one, and then sleeps for two seconds before the next check.
    """
    while True:
        pending = check_queue()
        if pending:
            _process_task(pending)
        else:
            print("No pending tasks in the queue")
        time.sleep(2)  # Add a 2 second delay between checks


# Start the polling loop only when the file is executed as a script.
if __name__ == "__main__":
    worker()