"""
Veo3 API client
"""
| |
|
import base64
import json
import logging
import os
import time
import uuid
from typing import List, Optional, Tuple

import requests

from ..config import API_CONFIG, MIME_TYPES
| |
|
| |
|
class Veo3Client:
    """Client for the Veo3 video-generation HTTP API.

    Every public method reports its outcome through the returned tuple
    instead of raising for I/O or API failures, so callers only need to
    inspect the first element of the result.
    """

    # Diagnostics go through logging (debug level) instead of print().
    _logger = logging.getLogger(__name__)

    def __init__(self, api_key: str):
        """Store the API key and prepare the default JSON request headers.

        Args:
            api_key: Bearer token sent in the ``Authorization`` header.
        """
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    def upload_file(self, file_path: str) -> Tuple[bool, str]:
        """Upload a local file to the KIE AI file storage as a Base64 data URL.

        The file is read fully into memory, Base64-encoded, and posted to
        ``API_CONFIG["FILE_UPLOAD_URL"]`` under the ``images/veo3`` path.

        Args:
            file_path: Path of the local file to upload.

        Returns:
            ``(True, download_url)`` on success, otherwise
            ``(False, error_message)``.
        """
        # Files without an extension are treated as JPEG images.
        file_ext = os.path.splitext(os.path.basename(file_path))[1].lower() or ".jpg"
        # NOTE(review): the original indentation was lost; this assumes every
        # upload gets a random name (not only extension-less files) - confirm.
        file_name = uuid.uuid4().hex + file_ext

        # Read the file before touching configuration or the network so that
        # an unreadable path fails fast.
        try:
            with open(file_path, "rb") as file_handle:
                base64_data = base64.b64encode(file_handle.read()).decode("utf-8")
        except Exception as exc:
            # API boundary: callers expect a tuple, never an exception.
            self._logger.warning("Could not read %r for upload: %s", file_path, exc)
            return False, f"Upload error: {exc}"

        url = API_CONFIG["FILE_UPLOAD_URL"]
        mime_type = MIME_TYPES.get(file_ext, "image/jpeg")
        payload = {
            "base64Data": f"data:{mime_type};base64,{base64_data}",
            "uploadPath": "images/veo3",
            "fileName": file_name,
        }

        try:
            response = requests.post(
                url,
                headers=self.headers,
                json=payload,
                timeout=API_CONFIG["TIMEOUT"],
            )
            if response.status_code != 200:
                self._logger.warning(
                    "Upload failed with HTTP status %s", response.status_code
                )
                return False, "Upload error, please try again later"

            resp_json = response.json()
            self._logger.debug("Upload response: %s", resp_json)
            data = resp_json.get("data")
            if not (resp_json.get("success") and data):
                return False, resp_json.get("msg", "Upload failed")

            download_url = data.get("downloadUrl")
            if not download_url:
                return False, "Upload failed: download URL missing from response"
            return True, download_url
        except Exception as exc:
            # API boundary: callers expect a tuple, never an exception.
            self._logger.warning("Upload request failed: %s", exc)
            return False, f"Upload error: {exc}"

    def create_task(
        self,
        prompt: str,
        image_urls: Optional[List[str]] = None,
        aspect_ratio: str = "16:9",
        watermark: str = "",
        seeds: Optional[int] = None,
        enable_fallback: bool = False,
    ) -> Tuple[int, str]:
        """Create a Veo3 video-generation task.

        Args:
            prompt: Text prompt for the generation.
            image_urls: Optional image URLs; sent only when non-empty.
            aspect_ratio: Value sent as ``aspectRatio``.
            watermark: Optional watermark; sent only when non-empty.
            seeds: Optional seed; sent only when it is a positive number.
            enable_fallback: Value sent as ``enableFallback``.

        Returns:
            ``(200, task_id)`` on success. For a non-200 HTTP response,
            ``(http_status, message)``. For API-level or transport errors,
            ``(500, message)``.
        """
        url = API_CONFIG["VEO3_GENERATE_URL"]
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }

        payload = {
            "prompt": prompt,
            "model": "veo3",
            "aspectRatio": aspect_ratio,
            "enableFallback": enable_fallback,
        }
        # Optional fields are only sent when they carry a meaningful value.
        if image_urls:
            payload["imageUrls"] = image_urls
        if watermark:
            payload["watermark"] = watermark
        if seeds is not None and seeds > 0:
            payload["seeds"] = seeds

        # Log the payload only; the headers contain the bearer token and
        # must never be written to logs or stdout.
        self._logger.debug("create_task payload: %s", payload)

        try:
            # NOTE(review): fixed 30 s timeout, unlike the other calls which
            # use API_CONFIG["TIMEOUT"] - kept as in the original.
            response = requests.post(
                url,
                headers=headers,
                data=json.dumps(payload),
                timeout=30,
            )
            if response.status_code != 200:
                return (
                    response.status_code,
                    f"HTTP {response.status_code}: {response.text}",
                )

            resp_json = response.json()
            self._logger.debug("create_task response: %s", resp_json)
            if resp_json.get("code") != 200:
                return 500, resp_json.get("msg", "API request failed")

            # "data" may be absent or null; treat both as "no task id".
            task_id = (resp_json.get("data") or {}).get("taskId")
            if not task_id:
                return 500, "Task ID not found in response"
            return 200, task_id
        except Exception as exc:
            # API boundary: callers expect a tuple, never an exception.
            self._logger.warning("create_task request failed: %s", exc)
            return 500, f"Request failed: {exc}"

    def get_task_result(self, task_id: str) -> Tuple[int, str]:
        """Poll the task-details endpoint until the task finishes or times out.

        Polls every ``API_CONFIG["POLL_INTERVAL"]`` seconds for at most
        ``API_CONFIG["TASK_TIMEOUT"]`` seconds. Exceptions raised while
        polling (network errors, malformed JSON) are logged and retried.

        Args:
            task_id: Identifier returned by :meth:`create_task`.

        Returns:
            ``(200, video_url)`` when the task succeeded. For a non-200 HTTP
            response, ``(http_status, message)``. ``(500, message)`` when the
            task failed, the API reported an error, the result URL is
            missing, or the timeout elapsed.
        """
        url = API_CONFIG["VEO3_DETAILS_URL"]
        params = {"taskId": task_id}
        headers = {"Authorization": f"Bearer {self.api_key}"}
        poll_interval = API_CONFIG["POLL_INTERVAL"]
        # A monotonic clock keeps the timeout immune to wall-clock changes.
        deadline = time.monotonic() + API_CONFIG["TASK_TIMEOUT"]

        while time.monotonic() < deadline:
            try:
                response = requests.get(
                    url,
                    headers=headers,
                    params=params,
                    timeout=API_CONFIG["TIMEOUT"],
                )
                if response.status_code != 200:
                    return (
                        response.status_code,
                        f"HTTP {response.status_code}: {response.text}",
                    )

                resp_json = response.json()
                self._logger.debug("Task %s status: %s", task_id, resp_json)
                if resp_json.get("code") != 200:
                    return 500, resp_json.get("msg", "API request failed")

                # "data" may be null while the task is still being set up.
                outcome = self._task_outcome(resp_json.get("data") or {})
                if outcome is not None:
                    return outcome
            except Exception as exc:
                # Best-effort polling: record the failure and try again
                # instead of silently discarding it.
                self._logger.warning(
                    "Polling task %s failed, retrying: %s", task_id, exc
                )
            time.sleep(poll_interval)

        return 500, "Task timeout, please try again later"

    @staticmethod
    def _task_outcome(data: dict) -> Optional[Tuple[int, str]]:
        """Translate a task-details ``data`` object into a final result.

        Returns ``None`` while the task is still pending, i.e. when
        ``successFlag`` is neither 1 (success) nor 2/3 (failure).
        """
        success_flag = data.get("successFlag")
        if success_flag == 1:
            result_urls = (data.get("response") or {}).get("resultUrls") or []
            if result_urls:
                return 200, result_urls[0]
            return 500, "Video URL not found in response"
        if success_flag in (2, 3):
            return 500, data.get("errorMessage") or "Task failed"
        return None
| |
|