import base64
import time

import gradio as gr
import requests

# RunPod API token shared by both serverless endpoints below
token = '5UAYO8UWHNQKT3UUS9H8V360L76MD72DRIUY9QC2'

##############################################################
#################################################
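# SD_call sends a txt2img request to a Stable Diffusion endpoint hosted on RunPod
# serverless, polls the job until it leaves the queue, decodes the base64 image in
# the response, and saves it to disk. Returns the local image path or an error string.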
def SD_call(image_prompt, age, color, hair_color, NSFW):
    positive = "clothes"
    negative = "naked, nsfw, porn"
    serverless_api_id = '3g77weiulabzuk'

    # Define the URL you want to send the request to
    url = f"https://api.runpod.ai/v2/{serverless_api_id}/runsync"

    # Define your custom headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    print("SD_processing")
    if NSFW:
        positive = "naked, nsfw"
        negative = "clothes"
    # Define your data (this could also be a JSON payload)
    data = {
        "input": {
            "api": {
                "method": "POST",
                "endpoint": "/sdapi/v1/txt2img"
            },
            "payload": {
                "override_settings": {
                    "sd_model_checkpoint": "CyberRealistic",
                    "sd_vae": ""
                },
                "override_settings_restore_afterwards": True,
                "refiner_checkpoint": "",
                "refiner_switch_at": 0.8,
                "prompt": f"masterpiece, best quality, 8k, (looking at viewer:1.1), gorgeous, hot, seductive, {age} years old american {color} woman, (eye contact:1.1), beautiful face, hyper detailed, best quality, ultra high res, {hair_color} hair, blue eyes, photorealistic, high resolution, detailed, raw photo, 1girl, {image_prompt}, {positive}",
                "negative_prompt": f"EasyNegative, fat, paintings, sketches, (worst quality:2), (low quality:2), (normal quality:2), lowres, ((monochrome)), ((grayscale)), bad anatomy, text, error, cropped, worst quality, low quality, normal quality, jpeg artifacts, signature, watermark, username, blurry, bad feet, poorly drawn face, bad proportions, gross proportions, ng_deepnegative_v1_75t, badhandsv5-neg, {negative}",
                "seed": -1,
                "batch_size": 1,
                "steps": 30,
                "cfg_scale": 7,
                "width": 520,
                "height": 520,
                "sampler_name": "DPM++ SDE Karras",
                "sampler_index": "DPM++ SDE Karras",
                "restore_faces": False
            }
        }
    }
    # Send the POST request with headers and data
    response = requests.post(url, headers=headers, json=data)

    # Check the response
    if response.status_code == 200:
        response_data = response.json()
        msg_id = response_data['id']
        print("Message ID:", msg_id)

        # Poll the job status until it is no longer queued or running
        while response_data['status'] in ('IN_QUEUE', 'IN_PROGRESS'):
            time.sleep(5)  # Wait for 5 seconds before checking again
            response = requests.get(f"https://api.runpod.ai/v2/{serverless_api_id}/status/{msg_id}", headers=headers)
            try:
                response_data = response.json()
            except Exception as e:
                print("Error decoding JSON:", e)
                print("Response content:", response.text)
                break  # Exit the loop on JSON decoding error

        # Check if the response contains images
        if 'images' in response_data.get('output', {}):
            base64_image = response_data['output']['images'][0]
            image_bytes = base64.b64decode(base64_image)

            # Save the image to a file
            image_path = f"output_image_{msg_id}.png"
            with open(image_path, "wb") as img_file:
                img_file.write(image_bytes)

            print(f"Image downloaded successfully: {image_path}")
            return image_path
        else:
            return "No images found in the response."
    else:
        # Return the error message
        return f"Error: {response.status_code} - {response.text}"

##############################################################
#################################################
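# LLM_call submits the chat message log to a text-generation endpoint on RunPod
# serverless via the asynchronous /run route, then polls /status/{id} every 2 seconds
# until the job reports COMPLETED or FAILED. Returns the generated output, or a
# fallback string if nothing comes back.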
def LLM_call(message_log, temperature):
    serverless_api_id = '4whzcbwuriohqh'

    # Define the URL you want to send the request to
    url = f"https://api.runpod.ai/v2/{serverless_api_id}/run"

    # Define your custom headers
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
        "Content-Type": "application/json"
    }

    # Define your data (this could also be a JSON payload)
    data = {
        "input": {
            "prompt": message_log,
            "max_new_tokens": 4500,
            "temperature": temperature,
            "top_k": 50,
            "top_p": 0.9,
            "repetition_penalty": 1.2,
            "batch_size": 8,
            "stop": ["</s>"]
        }
    }
    # Send the POST request with headers and data
    call = requests.post(url, headers=headers, json=data)
    response_data = call.json()
    msg_id = response_data['id']
    print("Message ID:", msg_id)

    output = "Output not available"

    # Poll the API until the response is ready
    while True:
        # Get the status using the message ID
        response = requests.get(f"https://api.runpod.ai/v2/{serverless_api_id}/status/{msg_id}", headers=headers)
        if response.status_code == 200:
            response_data = response.json()
            status = response_data.get('status')
            if status == 'COMPLETED':
                # Access the 'output' directly from the response
                output = response_data.get('output', 'Output not available')
                print("Response content:", output)
                break  # Exit the loop once the response is ready
            elif status == 'FAILED':
                error_message = response_data.get('error', 'Unknown error')
                print("Request failed. Reason:", error_message)
                break  # Exit the loop if the request failed
        else:
            print("Failed to get status. HTTP status code:", response.status_code)

        # Wait for a short time before polling again (e.g., 2 seconds)
        time.sleep(2)

    return output

def Chat_call(chat, temperature):
    message_log = [
        {"role": "assistant", "content": "I want you to act as a woman named Julie. Use colloquial language. Julie is 25 years old, she is very hot physically, she loves watching football, she loves to talk about sex. Julie loves sex and being fucked. She never refuses anything. You'll answer to a man, call him baby. Write very short answers, 15 words."},
    ]
    new_message = {"role": "user", "content": chat}
    message_log.append(new_message)

    response = LLM_call(message_log, temperature)
    print("From Runpod:", response)
    return response

########################################################
#######################################################
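# greet is the Gradio handler: if an image prompt is provided it routes to SD_call and
# returns the generated image path; otherwise it routes the chat message to Chat_call
# and returns the text reply. The second return value feeds the Image output component.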
def greet(chat, image_prompt, temperature, age, color, hair_color, NSFW):
    if image_prompt.strip():
        image_path = SD_call(image_prompt, age, color, hair_color, NSFW)
        return "Image generated successfully", image_path

    if temperature > 3:
        return "You are too warm, please try again", None
    else:
        text_answer = Chat_call(chat, temperature)
        return text_answer, None

demo = gr.Interface(
    fn=greet,
    inputs=[
        "text",
        gr.Textbox(label="Image", lines=3),
        gr.Slider(label="Text temperature", value=1, minimum=0, maximum=2),
        gr.Slider(label="Age", value=22, minimum=18, maximum=75),
        gr.Dropdown(["asian", "white", "black", "latina"], label="Color", info="Will add more later!"),
        gr.Dropdown(["blond", "brunette", "red", "white", "pink", "black", "blue", "green"], label="Hair color", info="Blond is cool"),
        gr.Checkbox(label="NSFW", info="πππ")
    ],
    outputs=[gr.Textbox(label="Answer", lines=3), gr.Image(label="Generated Image", type="filepath")],
    flagging_options=["blurry", "incorrect", "other"],
)

demo.launch(share=True)
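
# A minimal sketch of running the app locally (assuming the two RunPod endpoints above
# are deployed and the token is still valid):
#
#   pip install gradio requests
#   python app.py
#
# Gradio typically serves the UI on http://127.0.0.1:7860 and, with share=True, also
# prints a temporary public link.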