|
|
import websocket |
|
|
import uuid |
|
|
import json |
|
|
import urllib.request |
|
|
import urllib.parse |
|
|
|
|
|
import json |
|
|
|
|
|
import glob |
|
|
import os |
|
|
|
|
|
class LP:
    """Client that runs the ``lp.json`` workflow on a remote workflow server.

    The workflow template is loaded once at construction time. Each call to
    :meth:`LP` fills in a private copy of that template, queues it over HTTP,
    waits on a WebSocket until the server reports that the prompt finished,
    and then collects the output files whose names start with the client id.

    NOTE(review): the endpoints (``/prompt``, ``/history``, ``/ws``) and the
    ``ComfyUI/output`` folder suggest a ComfyUI server -- confirm.
    """

    def __init__(self, listen='127.0.0.1', port='6000'):
        """Load the workflow template and remember the server address.

        Args:
            listen: Host name or IP address of the workflow server.
            port: Port of the workflow server.

        Raises:
            OSError: If ``./scripts/workflow/lp.json`` cannot be opened.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        # The path is relative, so it is resolved against the current
        # working directory of the process.
        work_path = './scripts/workflow/lp.json'
        with open(work_path, "r", encoding="utf-8") as f:
            self.lp_workflow = json.load(f)
        self.server_address = f'{listen}:{port}'

    def queue_prompt(self, prompt, client_id):
        """POST ``prompt`` to ``/prompt`` and return the decoded JSON reply.

        Args:
            prompt: Workflow dictionary to submit.
            client_id: Identifier sent along with the prompt.

        Returns:
            The server's JSON response, decoded into Python objects.
        """
        payload = json.dumps({"prompt": prompt, "client_id": client_id}).encode('utf-8')
        req = urllib.request.Request("http://{}/prompt".format(self.server_address), data=payload)
        # The context manager releases the HTTP connection even if decoding fails.
        with urllib.request.urlopen(req) as response:
            return json.loads(response.read())

    def get_history(self, prompt_id):
        """GET ``/history/<prompt_id>`` and return the decoded JSON reply."""
        with urllib.request.urlopen("http://{}/history/{}".format(self.server_address, prompt_id)) as response:
            return json.loads(response.read())

    def _build_workflow(self, client_id, image_base64, video_path):
        """Return a filled-in deep copy of the workflow template."""
        import copy  # function-scope import: the file's import block does not provide it

        # A deep copy is required: the values below live in nested dicts, and a
        # shallow copy would write them into the shared template.
        workflow = copy.deepcopy(self.lp_workflow)
        workflow['168']['inputs']['filename_prefix'] = client_id
        workflow['209']['inputs']['base64_string'] = image_base64
        workflow['202']['inputs']['video'] = video_path
        return workflow

    def _find_outputs(self, client_id):
        """Return the paths under ``<cwd>/ComfyUI/output`` that start with ``client_id``."""
        folder_path = os.getcwd() + '/ComfyUI/output'
        # Escape the prefix so glob metacharacters in the id are matched literally.
        return glob.glob(os.path.join(folder_path, glob.escape(client_id) + '*'))

    @staticmethod
    def _close_quietly(ws):
        """Close ``ws``; a failure while closing is reported but never raised."""
        try:
            ws.close()
        except Exception as e:
            print(f"Error closing WebSocket: {e}")

    def LP(self, client_id, image_base64, video_path):
        """Run the workflow once and return its output.

        Args:
            client_id: Used as the output filename prefix, as the WebSocket
                ``clientId`` and as the prefix searched for in the output folder.
            image_base64: Value written to node ``209``'s ``base64_string`` input.
            video_path: Value written to node ``202``'s ``video`` input.

        Returns:
            ``[output_filename, matching_paths]`` on success, or ``None`` when
            any step fails (the error is printed). ``output_filename`` itself
            is ``None`` when :meth:`get_output` failed, because that method
            reports its own errors instead of raising.
        """
        try:
            current_workflow = self._build_workflow(client_id, image_base64, video_path)

            try:
                ws = websocket.WebSocket()
                ws.connect("ws://{}/ws?clientId={}".format(self.server_address, client_id))
            except Exception as e:
                print(f"Error connecting to WebSocket server: {e}")
                return None

            try:
                video_output = self.get_output(ws, current_workflow, client_id)
            except Exception as e:
                print(f"Error during execution of workflow: {e}")
                return None
            finally:
                # The socket is only needed while waiting for completion.
                self._close_quietly(ws)

            try:
                paths = self._find_outputs(client_id)
            except Exception as e:
                print(f"Error searching for output files: {e}")
                return None

            return [video_output, paths]

        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None

    def get_output(self, ws, prompt, client_id):
        """Queue ``prompt``, wait for it to finish and return the output filename.

        Args:
            ws: Connected WebSocket whose ``recv()`` yields server messages.
            prompt: Workflow dictionary to submit.
            client_id: Identifier sent along with the prompt.

        Returns:
            The ``filename`` of the first entry under
            ``outputs['168']['gifs']`` in the prompt's history, or ``None`` if
            anything goes wrong (the error is printed, not raised).
        """
        try:
            prompt_id = self.queue_prompt(prompt, client_id)['prompt_id']

            while True:
                out = ws.recv()
                if not isinstance(out, str):
                    # Non-text frames carry no status information for us.
                    continue
                message = json.loads(out)
                if message['type'] != 'executing':
                    continue
                data = message['data']
                # An 'executing' message with no node for our prompt id is
                # treated as the end of the run.
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    break

            history = self.get_history(prompt_id)[prompt_id]
            return history['outputs']['168']['gifs'][0]['filename']

        except Exception as e:
            print(f"Error in get_output function: {e}")
            return None
|
|
|
|
|
class FM:
    """Client that runs the ``face.json`` workflow on a remote workflow server.

    The workflow template is loaded once at construction time. Each call to
    :meth:`FM` fills in a private copy of that template, queues it over HTTP,
    waits on a WebSocket until the server reports that the prompt finished,
    and then collects the output files whose names start with the client id.

    NOTE(review): the endpoints (``/prompt``, ``/history``, ``/ws``) and the
    ``ComfyUI/output`` folder suggest a ComfyUI server -- confirm.
    """

    # Allowed (minimum, maximum) for every adjustable input of node '14'.
    # Values outside a range are clamped to the nearest bound. The order is
    # the order in which the inputs are written into the workflow.
    _EXPRESSION_RANGES = {
        'rotate_pitch': (-20, 20),
        'rotate_yaw': (-20, 20),
        'rotate_roll': (-20, 20),
        'blink': (-20, 5),
        'eyebrow': (-10, 15),
        'wink': (0, 25),
        'pupil_x': (-15, 15),
        'pupil_y': (-15, 15),
        'aaa': (-30, 120),
        'eee': (-20, 15),
        'woo': (-20, 15),
        'smile': (-0.30, 1.30),
        'src_ratio': (0.0, 1),
    }

    def __init__(self, listen='127.0.0.1', port='6000'):
        """Load the workflow template and remember the server address.

        Args:
            listen: Host name or IP address of the workflow server.
            port: Port of the workflow server.

        Raises:
            OSError: If ``./scripts/workflow/face.json`` cannot be opened.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        # The path is relative, so it is resolved against the current
        # working directory of the process.
        work_path = './scripts/workflow/face.json'
        with open(work_path, "r", encoding="utf-8") as f:
            self.lp_workflow = json.load(f)
        self.server_address = f'{listen}:{port}'

    def queue_prompt(self, prompt, client_id):
        """POST ``prompt`` to ``/prompt`` and return the decoded JSON reply.

        Args:
            prompt: Workflow dictionary to submit.
            client_id: Identifier sent along with the prompt.

        Returns:
            The server's JSON response, decoded into Python objects.
        """
        payload = json.dumps({"prompt": prompt, "client_id": client_id}).encode('utf-8')
        req = urllib.request.Request("http://{}/prompt".format(self.server_address), data=payload)
        # The context manager releases the HTTP connection even if decoding fails.
        with urllib.request.urlopen(req) as response:
            return json.loads(response.read())

    def get_history(self, prompt_id):
        """GET ``/history/<prompt_id>`` and return the decoded JSON reply."""
        with urllib.request.urlopen("http://{}/history/{}".format(self.server_address, prompt_id)) as response:
            return json.loads(response.read())

    def _build_workflow(self, client_id, image_base64, expression):
        """Return a filled-in deep copy of the workflow template.

        Args:
            client_id: Value for node ``45``'s ``filename_prefix`` input.
            image_base64: Value for node ``37``'s ``base64_string`` input.
            expression: Mapping with one value for every key of
                ``_EXPRESSION_RANGES``; each value is clamped to its range.
        """
        import copy  # function-scope import: the file's import block does not provide it

        # A deep copy is required: the values below live in nested dicts, and a
        # shallow copy would write them into the shared template.
        workflow = copy.deepcopy(self.lp_workflow)
        workflow['37']['inputs']['base64_string'] = image_base64
        workflow['45']['inputs']['filename_prefix'] = client_id

        editor_inputs = workflow['14']['inputs']
        for name, (low, high) in self._EXPRESSION_RANGES.items():
            editor_inputs[name] = self.validate_range(expression[name], low, high)

        # These inputs are fixed and not exposed to callers.
        editor_inputs['crop_factor'] = 2.5
        editor_inputs['sample_ratio'] = 1.20
        editor_inputs['sample_parts'] = 'All'
        return workflow

    def _find_outputs(self, client_id):
        """Return the paths under ``<cwd>/ComfyUI/output`` that start with ``client_id``."""
        folder_path = os.getcwd() + '/ComfyUI/output'
        # Escape the prefix so glob metacharacters in the id are matched literally.
        return glob.glob(os.path.join(folder_path, glob.escape(client_id) + '*'))

    @staticmethod
    def _close_quietly(ws):
        """Close ``ws``; a failure while closing is reported but never raised."""
        try:
            ws.close()
        except Exception as e:
            print(f"Error closing WebSocket: {e}")

    def FM(self, client_id, image_base64, rotate_pitch=0, rotate_yaw=0, rotate_roll=0,
           blink=0, eyebrow=0, wink=0, pupil_x=0, pupil_y=0, aaa=0, eee=0, woo=0, smile=0, src_ratio=1):
        """Run the workflow once and return its output.

        Args:
            client_id: Used as the output filename prefix, as the WebSocket
                ``clientId`` and as the prefix searched for in the output folder.
            image_base64: Value written to node ``37``'s ``base64_string`` input.
            rotate_pitch, rotate_yaw, rotate_roll, blink, eyebrow, wink,
            pupil_x, pupil_y, aaa, eee, woo, smile, src_ratio: Values written
                to the same-named inputs of node ``14`` after being clamped to
                the ranges listed in ``_EXPRESSION_RANGES``.

        Returns:
            ``[output_filename, matching_paths]`` on success, or ``None`` when
            any step fails (the error is printed). ``output_filename`` itself
            is ``None`` when :meth:`get_output` failed, because that method
            reports its own errors instead of raising.
        """
        try:
            expression = {
                'rotate_pitch': rotate_pitch,
                'rotate_yaw': rotate_yaw,
                'rotate_roll': rotate_roll,
                'blink': blink,
                'eyebrow': eyebrow,
                'wink': wink,
                'pupil_x': pupil_x,
                'pupil_y': pupil_y,
                'aaa': aaa,
                'eee': eee,
                'woo': woo,
                'smile': smile,
                'src_ratio': src_ratio,
            }
            current_workflow = self._build_workflow(client_id, image_base64, expression)

            try:
                ws = websocket.WebSocket()
                ws.connect("ws://{}/ws?clientId={}".format(self.server_address, client_id))
            except Exception as e:
                print(f"Error connecting to WebSocket server: {e}")
                return None

            try:
                output_file = self.get_output(ws, current_workflow, client_id)
            except Exception as e:
                print(f"Error during execution of workflow: {e}")
                return None
            finally:
                # The socket is only needed while waiting for completion.
                self._close_quietly(ws)

            try:
                paths = self._find_outputs(client_id)
            except Exception as e:
                print(f"Error searching for output files: {e}")
                return None

            return [output_file, paths]

        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            return None

    def get_output(self, ws, prompt, client_id):
        """Queue ``prompt``, wait for it to finish and return the output filename.

        Args:
            ws: Connected WebSocket whose ``recv()`` yields server messages.
            prompt: Workflow dictionary to submit.
            client_id: Identifier sent along with the prompt.

        Returns:
            The ``filename`` of the first entry under
            ``outputs['45']['images']`` in the prompt's history, or ``None``
            if anything goes wrong (the error is printed, not raised).
        """
        try:
            prompt_id = self.queue_prompt(prompt, client_id)['prompt_id']

            while True:
                out = ws.recv()
                if not isinstance(out, str):
                    # Non-text frames carry no status information for us.
                    continue
                message = json.loads(out)
                if message['type'] != 'executing':
                    continue
                data = message['data']
                # An 'executing' message with no node for our prompt id is
                # treated as the end of the run.
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    break

            history = self.get_history(prompt_id)[prompt_id]
            return history['outputs']['45']['images'][0]['filename']

        except Exception as e:
            print(f"Error in get_output function: {e}")
            return None

    def validate_range(self, value, min_value, max_value):
        """Clamp ``value`` to the closed interval ``[min_value, max_value]``.

        Returns ``min_value`` when ``value`` is below it, ``max_value`` when
        ``value`` is above it, and ``value`` unchanged otherwise.
        """
        if value < min_value:
            return min_value
        elif value > max_value:
            return max_value
        return value