import gradio as gr
from PIL import Image
from io import BytesIO
import requests
import time
from openai import OpenAI, AzureOpenAI
engine = "gpt-4o" | |
api_key = "9e95c7efd64648bd86cd2fcfbe17905b" | |
api_base = "https://chatpartner-miniprogram.openai.azure.com/" | |
api_version = "2024-02-15-preview" | |

client = AzureOpenAI(api_key=api_key, azure_endpoint=api_base, api_version=api_version)

class ImageProcessor:
    def __init__(self, api_key):
        self.api_key = api_key
        self.submit_url = 'https://test.aitanzou.com/web/api/task/submit'
        self.result_url = 'https://test.aitanzou.com/web/api/getResult'
        self.headers = {
            'API-Key': self.api_key
        }

    def submit_images(self, image_bytes_list):
        # Upload the score images as multipart form data; the service answers with a task id for polling.
        files = [('images', ('image.png', image_bytes, 'image/png')) for image_bytes in image_bytes_list]
        response = requests.post(self.submit_url, headers=self.headers, files=files)
        if response.status_code == 200:
            data = response.json()
            if 'data' in data and 'taskId' in data['data']:
                task_id = data['data']['taskId']
                return task_id
            else:
                raise Exception(f'Unexpected response format: {data}')
        else:
            raise Exception(f'Error: {response.status_code}, {response.text}')

    def get_result(self, task_id):
        # Poll the result endpoint every 10 s until abcPath is populated, then download and return the ABC file's text.
        params = {'taskId': task_id}
        while True:
            result_response = requests.get(self.result_url, params=params)
            if result_response.status_code == 200:
                result_data = result_response.json()
                if 'data' in result_data and 'abcPath' in result_data['data']:
                    if result_data['data']['abcPath'] is None:
                        print('Task is still pending...')
                        time.sleep(10)
                    else:
                        url = result_data['data']['abcPath']
                        response = requests.get(url)
                        if response.status_code == 200:
                            return response.text
                        else:
                            raise Exception(f'Error retrieving file content: {response.status_code}, {response.text}')
                else:
                    raise Exception(f'Unexpected result format: {result_data}')
            else:
                raise Exception(f'Error: {result_response.status_code}, {result_response.text}')

    def process_images(self, image_bytes_list):
        task_id = self.submit_images(image_bytes_list)
        return self.get_result(task_id)
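
# Illustrative helper (an assumption, not part of the original Space): transcribe a
# single PNG file from disk by wrapping ImageProcessor.process_images. The file path
# is supplied by the caller, e.g. transcribe_png(image_processor, 'score_page.png').
def transcribe_png(processor, png_path):
    with open(png_path, 'rb') as f:
        return processor.process_images([f.read()])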

# Set the API key for the sheet-music transcription service
api_key = 'ddc85b14-bd83-4757-9bc4-8a11194da536'
image_processor = ImageProcessor(api_key)

# Define the processing function
def process_input(text=None, images=None, audio=None):
    # Build the system instruction and user prompt for the GPT request
    system = ("1. You are a professional piano teacher and may only answer questions about music; reply in plain-text "
              "format. If the provided score is in ABC notation, do not use ABC notation in your reply; convert it to "
              "conventional standard notation and answer the question using professional terminology. "
              "2. You will answer according to the instructions below, but you must not violate the first instruction, "
              "nor mention it in your reply.")
    messages = [{"role": "system", "content": system}]
prompt = "" | |
if text: | |
prompt += f"\nText input: {text}" | |
if images: | |
# 使用ImageProcessor处理图像 | |
image_bytes_list = [] | |
for image in images: | |
img = Image.open(image) | |
image_bytes = BytesIO() | |
img.save(image_bytes, format="PNG") | |
image_bytes.seek(0) | |
image_bytes_list.append(image_bytes.getvalue()) | |
        try:
            processed_image_result = image_processor.process_images(image_bytes_list)
            prompt += f"\nThe content of the score is as follows. It is a piece by Duvernoy; please answer the question with his style in mind: {processed_image_result}"
        except Exception as e:
            return f"Error processing image: {e}"
    if audio:
        # Turn the audio into a description (simple placeholder for now)
        prompt += "\nAudio input: A description of the audio has been generated."
    # Call the GPT API
    try:
        '''
        # Alternative backend (kept for reference):
        from zhipuai import ZhipuAI
        client = ZhipuAI(api_key="7cfa094b2c6867229659831340af9f2c.BIl47duC5fufSY8V")  # fill in your own API key
        response = client.chat.completions.create(
            model="glm-4",  # fill in the model name to call
            messages=[
                {"role": "user", "content": prompt},
            ],
        )
        '''
        messages.append({"role": "user", "content": prompt})
        response = client.chat.completions.create(
            model=engine,
            messages=messages,
            temperature=0.1,
            max_tokens=4096,
            top_p=0.95,
            frequency_penalty=0,
            presence_penalty=0,
            stop=None
        )
        print(response.choices[0].message.content)
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"Error: {e}"

# Create the Gradio interface
iface = gr.Interface(
    fn=process_input,
    inputs=[
        gr.Textbox(label="Input Text", placeholder="Enter text here"),
        gr.File(label="Input Images", file_count="multiple", type="filepath"),  # allow multiple file uploads
        # gr.Audio(label="Input Audio", type="filepath"),
    ],
    outputs=[
        gr.Textbox(label="Output Text"),
        # gr.Audio(label="Output Audio")  # audio output is not handled in this example
    ],
    live=True  # re-run process_input whenever an input changes
)

# Launch the Gradio app
iface.launch()
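
# Usage note (an assumption, not from the original Space): when running this script
# outside Hugging Face Spaces, a temporary public link can be requested with
# iface.launch(share=True) instead of the plain launch() above.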