# test111 / scripts / liveportrait.py
# killbill007's picture
# Upload 1398 files
# 35cdf61 verified
import copy
import glob
import json
import os
import urllib.parse
import urllib.request
import uuid

import websocket
class LP:
    """Run the workflow stored in ``./scripts/workflow/lp.json`` on a server.

    The workflow is submitted over HTTP (``/prompt``), completion is awaited
    on a WebSocket (``/ws``), and the resulting filename is read back from
    ``/history/<prompt_id>``. Generated files are then looked up under
    ``<cwd>/ComfyUI/output``.
    """

    def __init__(self, listen='127.0.0.1', port='6000'):
        """Load the workflow template and remember the server address.

        Args:
            listen: Host name or IP address of the workflow server.
            port: Port of the workflow server.

        Raises:
            OSError: If the workflow file cannot be opened.
            json.JSONDecodeError: If the workflow file is not valid JSON.
        """
        work_path = './scripts/workflow/lp.json'
        with open(work_path, "r", encoding="utf-8") as f:
            self.lp_workflow = json.load(f)
        self.server_address = f'{listen}:{port}'

    def queue_prompt(self, prompt, client_id):
        """POST *prompt* to ``/prompt`` and return the decoded JSON reply.

        Args:
            prompt: Workflow dictionary to execute.
            client_id: Identifier sent along with the prompt.

        Returns:
            The parsed JSON body of the server's response.
        """
        payload = json.dumps({"prompt": prompt, "client_id": client_id}).encode('utf-8')
        req = urllib.request.Request("http://{}/prompt".format(self.server_address), data=payload)
        # The context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req) as response:
            return json.loads(response.read())

    def get_history(self, prompt_id):
        """Fetch ``/history/<prompt_id>`` and return the decoded JSON reply."""
        url = "http://{}/history/{}".format(self.server_address, prompt_id)
        with urllib.request.urlopen(url) as response:
            return json.loads(response.read())

    def _build_workflow(self, client_id, image_base64, video_path):
        """Return a private copy of the template filled with the request inputs.

        A deep copy is required: the values are written into nested
        ``inputs`` dictionaries, so a shallow copy would modify the shared
        template and leak one request's inputs into the next.
        """
        workflow = copy.deepcopy(self.lp_workflow)
        workflow["168"]['inputs']['filename_prefix'] = client_id
        workflow['209']['inputs']['base64_string'] = image_base64
        workflow['202']['inputs']['video'] = video_path
        return workflow

    def LP(self, client_id, image_base64, video_path):
        """Execute the workflow for one image/video pair.

        Args:
            client_id: Used as WebSocket client id and as output filename prefix.
            image_base64: Base64 string stored in node ``209``.
            video_path: Video path stored in node ``202``.

        Returns:
            ``[output_filename, matching_paths]`` on success, where
            ``output_filename`` may be ``None`` if ``get_output`` failed, or
            ``None`` if connecting or any other step raised. Errors are
            printed, never propagated.
        """
        try:
            current_workflow = self._build_workflow(client_id, image_base64, video_path)

            # Try connecting to the WebSocket server
            try:
                ws = websocket.WebSocket()
                ws.connect("ws://{}/ws?clientId={}".format(self.server_address, client_id))
            except Exception as e:
                print(f"Error connecting to WebSocket server: {e}")
                return None

            # Try getting the output from the WebSocket
            try:
                video_output = self.get_output(ws, current_workflow, client_id)
            except Exception as e:
                print(f"Error during execution of workflow: {e}")
                return None
            finally:
                # Always release the socket, whether or not the run succeeded.
                ws.close()

            # Search for files generated with the client_id prefix
            try:
                folder_path = os.getcwd() + '/ComfyUI/output'
                paths = glob.glob(os.path.join(folder_path, client_id + '*'))
            except Exception as e:
                print(f"Error searching for output files: {e}")
                return None

            return [video_output, paths]
        except Exception as e:
            # Catch any unforeseen errors
            print(f"An unexpected error occurred: {e}")
            return None

    def get_output(self, ws, prompt, client_id):
        """Queue *prompt*, wait for it to finish and return the output filename.

        Args:
            ws: Connected WebSocket on which progress messages arrive.
            prompt: Workflow dictionary to execute.
            client_id: Identifier sent along with the prompt.

        Returns:
            The filename of the first ``gifs`` entry of node ``168`` in the
            prompt's history, or ``None`` if anything raised (the error is
            printed).
        """
        try:
            prompt_id = self.queue_prompt(prompt, client_id)['prompt_id']
            while True:
                out = ws.recv()
                if not isinstance(out, str):
                    # Non-text frames carry no status information here.
                    continue
                message = json.loads(out)
                if message['type'] != 'executing':
                    continue
                data = message['data']
                # An "executing" message with no node for our prompt id is
                # treated as the completion signal.
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    break

            history = self.get_history(prompt_id)[prompt_id]
            return history['outputs']['168']['gifs'][0]['filename']
        except Exception as e:
            # Handle errors during WebSocket communication or workflow processing
            print(f"Error in get_output function: {e}")
            return None
class FM:
    """Run the workflow stored in ``./scripts/workflow/face.json`` on a server.

    The workflow is submitted over HTTP (``/prompt``), completion is awaited
    on a WebSocket (``/ws``), and the resulting filename is read back from
    ``/history/<prompt_id>``. Generated files are then looked up under
    ``<cwd>/ComfyUI/output``.
    """

    # Allowed (min, max) for each adjustable input of node '14'; values
    # outside the range are clamped to the nearest bound.
    _PARAM_RANGES = {
        'rotate_pitch': (-20, 20),
        'rotate_yaw': (-20, 20),
        'rotate_roll': (-20, 20),
        'blink': (-20, 5),
        'eyebrow': (-10, 15),
        'wink': (0, 25),
        'pupil_x': (-15, 15),
        'pupil_y': (-15, 15),
        'aaa': (-30, 120),
        'eee': (-20, 15),
        'woo': (-20, 15),
        'smile': (-0.30, 1.30),
        'src_ratio': (0.0, 1),
    }

    def __init__(self, listen='127.0.0.1', port='6000'):
        """Load the workflow template and remember the server address.

        Args:
            listen: Host name or IP address of the workflow server.
            port: Port of the workflow server.

        Raises:
            OSError: If the workflow file cannot be opened.
            json.JSONDecodeError: If the workflow file is not valid JSON.
        """
        work_path = './scripts/workflow/face.json'
        with open(work_path, "r", encoding="utf-8") as f:
            self.lp_workflow = json.load(f)
        self.server_address = f'{listen}:{port}'

    def queue_prompt(self, prompt, client_id):
        """POST *prompt* to ``/prompt`` and return the decoded JSON reply.

        Args:
            prompt: Workflow dictionary to execute.
            client_id: Identifier sent along with the prompt.

        Returns:
            The parsed JSON body of the server's response.
        """
        payload = json.dumps({"prompt": prompt, "client_id": client_id}).encode('utf-8')
        req = urllib.request.Request("http://{}/prompt".format(self.server_address), data=payload)
        # The context manager closes the HTTP response instead of leaking it.
        with urllib.request.urlopen(req) as response:
            return json.loads(response.read())

    def get_history(self, prompt_id):
        """Fetch ``/history/<prompt_id>`` and return the decoded JSON reply."""
        url = "http://{}/history/{}".format(self.server_address, prompt_id)
        with urllib.request.urlopen(url) as response:
            return json.loads(response.read())

    def _build_workflow(self, client_id, image_base64, **expression):
        """Return a private copy of the template filled with the request inputs.

        ``expression`` must contain one value for every key of
        ``_PARAM_RANGES``; each is clamped to its range before being stored
        in node ``14``. A deep copy is required because the values are
        written into nested ``inputs`` dictionaries; a shallow copy would
        modify the shared template and leak settings between requests.
        """
        workflow = copy.deepcopy(self.lp_workflow)
        workflow['37']['inputs']['base64_string'] = image_base64
        workflow['45']['inputs']['filename_prefix'] = client_id

        node_inputs = workflow['14']['inputs']
        for name, (low, high) in self._PARAM_RANGES.items():
            node_inputs[name] = self.validate_range(expression[name], low, high)
        node_inputs['crop_factor'] = 2.5
        node_inputs['sample_ratio'] = 1.20
        node_inputs['sample_parts'] = 'All'
        return workflow

    def FM(self, client_id, image_base64, rotate_pitch=0, rotate_yaw=0, rotate_roll=0,
           blink=0, eyebrow=0, wink=0, pupil_x=0, pupil_y=0, aaa=0, eee=0, woo=0, smile=0, src_ratio=1):
        """Execute the workflow for one image with the given adjustments.

        Args:
            client_id: Used as WebSocket client id and as output filename prefix.
            image_base64: Base64 string stored in node ``37``.
            rotate_pitch, rotate_yaw, rotate_roll, blink, eyebrow, wink,
            pupil_x, pupil_y, aaa, eee, woo, smile, src_ratio: Numeric
                inputs of node ``14``; each is clamped to the range listed
                in ``_PARAM_RANGES``.

        Returns:
            ``[output_filename, matching_paths]`` on success, where
            ``output_filename`` may be ``None`` if ``get_output`` failed, or
            ``None`` if connecting or any other step raised. Errors are
            printed, never propagated.
        """
        try:
            # NOTE: src_ratio is passed through as itself; it was previously
            # overwritten with the clamped value of `smile`.
            current_workflow = self._build_workflow(
                client_id, image_base64,
                rotate_pitch=rotate_pitch, rotate_yaw=rotate_yaw, rotate_roll=rotate_roll,
                blink=blink, eyebrow=eyebrow, wink=wink,
                pupil_x=pupil_x, pupil_y=pupil_y,
                aaa=aaa, eee=eee, woo=woo,
                smile=smile, src_ratio=src_ratio,
            )

            # Try connecting to the WebSocket server
            try:
                ws = websocket.WebSocket()
                ws.connect("ws://{}/ws?clientId={}".format(self.server_address, client_id))
            except Exception as e:
                print(f"Error connecting to WebSocket server: {e}")
                return None

            # Try getting the output from the WebSocket
            try:
                image_output = self.get_output(ws, current_workflow, client_id)
            except Exception as e:
                print(f"Error during execution of workflow: {e}")
                return None
            finally:
                # Always release the socket, whether or not the run succeeded.
                ws.close()

            # Search for files generated with the client_id prefix
            try:
                folder_path = os.getcwd() + '/ComfyUI/output'
                paths = glob.glob(os.path.join(folder_path, client_id + '*'))
            except Exception as e:
                print(f"Error searching for output files: {e}")
                return None

            return [image_output, paths]
        except Exception as e:
            # Catch any unforeseen errors
            print(f"An unexpected error occurred: {e}")
            return None

    def get_output(self, ws, prompt, client_id):
        """Queue *prompt*, wait for it to finish and return the output filename.

        Args:
            ws: Connected WebSocket on which progress messages arrive.
            prompt: Workflow dictionary to execute.
            client_id: Identifier sent along with the prompt.

        Returns:
            The filename of the first ``images`` entry of node ``45`` in the
            prompt's history, or ``None`` if anything raised (the error is
            printed).
        """
        try:
            prompt_id = self.queue_prompt(prompt, client_id)['prompt_id']
            while True:
                out = ws.recv()
                if not isinstance(out, str):
                    # Non-text frames carry no status information here.
                    continue
                message = json.loads(out)
                if message['type'] != 'executing':
                    continue
                data = message['data']
                # An "executing" message with no node for our prompt id is
                # treated as the completion signal.
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    break

            history = self.get_history(prompt_id)[prompt_id]
            return history['outputs']['45']['images'][0]['filename']
        except Exception as e:
            # Handle errors during WebSocket communication or workflow processing
            print(f"Error in get_output function: {e}")
            return None

    def validate_range(self, value, min_value, max_value):
        """Clamp *value* to the closed interval ``[min_value, max_value]``."""
        if value < min_value:
            return min_value
        if value > max_value:
            return max_value
        return value