import os from oss_utils import ossService import requests import random import json import time from diffusers.utils import load_image from PIL import Image, ImageDraw, ImageFont import numpy as np # oss config BUCKET = os.environ.get('BUCKET', '') ENDPOINT = os.environ.get('ENDPOINT', '') PREFIX = os.environ.get('OSS_DIR_PREFIX', '') AK = os.environ.get('AK', '') SK = os.environ.get('SK', '') DASHONE_SERVICE_ID = os.environ.get('SERVICE_ID', '') URL = os.environ.get('URL', '') GET_URL = os.environ.get('GET_URL', '') OUTPUT_PATH = './output' os.makedirs(OUTPUT_PATH, exist_ok=True) oss_service = ossService(AK, SK, ENDPOINT, BUCKET, PREFIX) def async_request_and_query(data, request_id): url = URL headers = {'Content-Type': 'application/json'} # 1.发起一个异步请求 print('Start sending request') response = requests.post(url, headers=headers, data=json.dumps(data)) if response.status_code != requests.codes.ok: response.raise_for_status() response_json = json.loads(response.content.decode("utf-8")) print('Finish sending request') # 2.异步查询结果 is_running = True running_print_count = 0 sign_oss_path = None task_id = response_json['header']['task_id'] get_url = GET_URL get_header = headers get_data = {"header": {"request_id":request_id,"service_id":DASHONE_SERVICE_ID,"task_id":f"{task_id}"}} print('Start querying results') while is_running: response2 = requests.post(get_url, headers=get_header, data=json.dumps(get_data)) if response2.status_code != requests.codes.ok: response2.raise_for_status() response2_json = json.loads(response2.content.decode("utf-8")) task_status = response2_json['header']['task_status'] if task_status == 'SUCCESS': sign_oss_path = response2_json['payload']['output']['res'] break elif task_status in ['FAILED', 'ERROR'] or running_print_count >= 120: raise ValueError(f'Task Failed') else: time.sleep(1) running_print_count += 1 continue print('Task succeeded!') return sign_oss_path # def add_transparent_watermark(pil_image, watermark_text, position, opacity, font_path, font_size): # # 加载字体 # font = ImageFont.truetype(font_path, font_size) # # 创建一个半透明的水印图层 # watermark_layer = Image.new("RGBA", pil_image.size) # draw = ImageDraw.Draw(watermark_layer) # # 文本颜色和透明度 # text_color = (255, 255, 255, opacity) # 白色文本 # outline_color = (0, 0, 0, opacity) # 黑色轮廓 # # 获取文本尺寸 # text_width = draw.textlength(watermark_text, font=font) # text_height = text_width // 5 # # 计算水印位置 # img_width, img_height = pil_image.size # x = img_width - text_width - position[0] # y = img_height - text_height - position[1] # outline_range = 1 # 轮廓的粗细 # for adj in range(-outline_range, outline_range+1): # for ord in range(-outline_range, outline_range+1): # if adj != 0 or ord != 0: # 避免中心位置,那是真正的文本 # draw.text((x+adj, y+ord), watermark_text, font=font, fill=outline_color) # # 将文本绘制到水印层上 # draw.text((x, y), watermark_text, font=font, fill=text_color) # # 将水印层叠加到原始图像上 # pil_image_with_watermark = Image.alpha_composite(pil_image.convert("RGBA"), watermark_layer) # # 返回添加了水印的图像 # return pil_image_with_watermark def inference(input_image, upscale): # process alpha channel alpha_channel = input_image.split()[-1] input_image = input_image.convert('RGB') local_save_path = os.path.join(OUTPUT_PATH, 'tmp.png') local_output_save_path = os.path.join(OUTPUT_PATH, 'out.png') input_image.save(local_save_path) # generate image url oss_key = os.path.join(PREFIX, 'tmp.png') _, image_url = oss_service.uploadOssFile(oss_key, local_save_path) # rm local file if os.path.isfile(local_save_path): os.remove(local_save_path) data = {} data_header = {} data_payload = {} data_input = {} data_para = {} data_header['request_id'] = "".join(random.sample("0123456789abcdefghijklmnopqrstuvwxyz", 10)) data_header['service_id'] = DASHONE_SERVICE_ID data_input['image_url'] = image_url data_para['upscale'] = upscale data_para['platform'] = 'modelscope' data_payload['input'] = data_input data_payload['parameters'] = data_para data['header'] = data_header data['payload'] = data_payload try: output_url = async_request_and_query(data=data, request_id=data_header['request_id']) download_status = oss_service.downloadFile(output_url, local_output_save_path) if not download_status: raise ValueError(f'Download output image failed') except: output_image = load_image('./error.png') return [output_image], 'The input image format or resolution does not meet the requirements. Please change the image or resize it and try again.' output_image = load_image(local_output_save_path) if os.path.isfile(local_output_save_path): os.remove(local_output_save_path) out_width, out_height = output_image.size # add alpha channel output_alpha_channel = alpha_channel.resize(output_image.size, resample=Image.LANCZOS) # merge output_image_alpha = Image.merge("RGBA", (*output_image.split(), output_alpha_channel)) # new_width = out_width // 4 # new_height = out_height // 4 # resized_image = np.array(input_image.resize((new_width, new_height))) # np_output_image = np.array(output_image) # np_output_image[0:new_height, 0:new_width] = resized_image.copy() # new_output_image = Image.fromarray(np_output_image) # new_output_image = add_transparent_watermark( # pil_image=new_output_image, # watermark_text="追影-放大镜", # position=(5, 5), # opacity=200, # font_path="AlibabaPuHuiTi-3-45-Light.ttf", # 例如:"Arial", "Helvetica", "Times New Roman" # font_size= out_width // 30 # ) org_width, org_height = input_image.size if max(org_width, org_height) > 1920 or min(org_width, org_height) > 1080: msg = 'The input image size has exceeded the optimal range. You can consider scaling the input image for better generation effect' else: msg = 'Task succeeded' return [output_image_alpha], msg