# lumapi2 / app.py
# Source: Hugging Face Space "seawolf2357/lumapi2", commit 7bb7551 ("Update app.py")
import gradio as gr
import os
from lumaai import AsyncLumaAI
import asyncio
import aiohttp
from transformers import pipeline
# Initialize the Korean -> English translation pipeline (Helsinki-NLP Opus-MT).
# NOTE: the model is downloaded/loaded at import time, so startup blocks here.
translator = pipeline("translation", model="Helsinki-NLP/opus-mt-ko-en")
async def get_camera_motions():
    """Fetch the camera motions supported by the Luma AI API.

    Returns:
        A list of motion names as plain strings, or an empty list when the
        API call fails for any reason.

    Raises:
        gr.Error: If the LMGEN_KEY environment variable is missing.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY environment variable is not set.")
    client = AsyncLumaAI(auth_token=api_key)
    try:
        available = await client.generations.camera_motion.list()
        # Coerce every entry to text so a Gradio dropdown can display it.
        return [str(entry) for entry in available]
    except Exception as e:
        print(f"Error fetching camera motions: {str(e)}")
        return []
async def generate_video(client, prompt, loop=False, aspect_ratio="16:9", camera_motion=None, extend_id=None, reverse_extend_id=None, interpolate_ids=None, progress=gr.Progress()):
    """Create a Luma AI video generation, poll it to completion, and download
    the resulting MP4 to the working directory.

    Args:
        client: An authenticated AsyncLumaAI client.
        prompt: Text prompt describing the video.
        loop: Whether the generated video should loop.
        aspect_ratio: Ratio string, possibly followed by a description
            (e.g. "16:9 (widescreen)"); only the leading token is sent.
        camera_motion: Optional camera-motion keyword appended to the prompt.
        extend_id: Generation id to extend forward from (becomes frame0).
        reverse_extend_id: Generation id to extend backward to (becomes frame1).
        interpolate_ids: Two-element sequence of generation ids to
            interpolate between (frame0 and frame1).
        progress: Gradio progress tracker.

    Returns:
        Tuple of (downloaded file name, generation id).

    Raises:
        Exception: If the generation fails or the video download is rejected.
    """
    # Keep only the ratio itself ("16:9 (widescreen)" -> "16:9").
    aspect_ratio = aspect_ratio.split()[0]

    generation_params = {
        "prompt": prompt,
        "loop": loop,
        "aspect_ratio": aspect_ratio
    }
    if camera_motion:
        generation_params["prompt"] += f" {camera_motion}"

    # At most one keyframe mode applies per request, in priority order:
    # forward extend > reverse extend > interpolation.
    if extend_id:
        generation_params["keyframes"] = {
            "frame0": {"type": "generation", "id": extend_id}
        }
    elif reverse_extend_id:
        generation_params["keyframes"] = {
            "frame1": {"type": "generation", "id": reverse_extend_id}
        }
    elif interpolate_ids:
        generation_params["keyframes"] = {
            "frame0": {"type": "generation", "id": interpolate_ids[0]},
            "frame1": {"type": "generation", "id": interpolate_ids[1]}
        }

    progress(0, desc="๋น„๋””์˜ค ์ƒ์„ฑ ์‹œ์ž‘ ์ค‘...")
    generation = await client.generations.create(**generation_params)
    progress(0.1, desc="๋น„๋””์˜ค ์ƒ์„ฑ ์‹œ์ž‘๋จ. ์™„๋ฃŒ ๋Œ€๊ธฐ ์ค‘...")

    # Poll every 5s until the job completes or fails; the progress bar ramps
    # from 0.1 to 0.9 assuming a typical ~60 second generation time.
    start_time = asyncio.get_event_loop().time()
    while True:
        status = await client.generations.get(id=generation.id)
        if status.state == "completed":
            break
        if status.state == "failed":
            raise Exception("๋น„๋””์˜ค ์ƒ์„ฑ ์‹คํŒจ")
        elapsed_time = asyncio.get_event_loop().time() - start_time
        progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
        progress(progress_value, desc="๋น„๋””์˜ค ์ƒ์„ฑ ์ค‘...")
        await asyncio.sleep(5)

    progress(0.9, desc="์ƒ์„ฑ๋œ ๋น„๋””์˜ค ๋‹ค์šด๋กœ๋“œ ์ค‘...")
    video_url = status.assets.video
    file_name = f"luma_ai_generated_{generation.id}.mp4"
    async with aiohttp.ClientSession() as session:
        async with session.get(video_url) as resp:
            # BUG FIX: the original only wrote the file when resp.status was
            # 200 and otherwise fell through to `return file_name, ...` with
            # file_name unbound, raising UnboundLocalError and masking the
            # real HTTP failure. Fail loudly with the status code instead.
            if resp.status != 200:
                raise Exception(f"Video download failed with HTTP status {resp.status}")
            with open(file_name, 'wb') as fd:
                # Stream in 1 KiB chunks to avoid holding the video in memory.
                while True:
                    chunk = await resp.content.read(1024)
                    if not chunk:
                        break
                    fd.write(chunk)

    progress(1.0, desc="๋น„๋””์˜ค ์ƒ์„ฑ ์™„๋ฃŒ!")
    return file_name, generation.id
async def translate_prompt(prompt):
    """Translate a (Korean) prompt to English via the module-level pipeline.

    Best-effort: if the translation pipeline raises for any reason, the
    original prompt is returned unchanged so generation can still proceed.
    """
    try:
        result = translator(prompt, max_length=512)
        return result[0]['translation_text']
    except Exception as e:
        print(f"๋ฒˆ์—ญ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}")
        return prompt
async def text_to_video(prompt, loop, aspect_ratio, camera_motion, extend_id, reverse_extend_id, interpolate_id1, interpolate_id2, progress=gr.Progress()):
    """Gradio handler: generate a video from a text prompt.

    The prompt is machine-translated to English before being sent to the
    Luma AI API.

    Returns:
        (video_path, generation_id, "") on success, or
        (None, None, error_message) on failure.

    Raises:
        gr.Error: If the LMGEN_KEY environment variable is missing.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY ํ™˜๊ฒฝ ๋ณ€์ˆ˜๊ฐ€ ์„ค์ •๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
    client = AsyncLumaAI(auth_token=api_key)
    try:
        # Translate to English; falls back to the original text on failure.
        translated_prompt = await translate_prompt(prompt)
        print(f"์›๋ณธ ํ”„๋กฌํ”„ํŠธ: {prompt}")
        print(f"๋ฒˆ์—ญ๋œ ํ”„๋กฌํ”„ํŠธ: {translated_prompt}")

        # Keep only the ratio itself ("16:9 (...)" -> "16:9").
        aspect_ratio = aspect_ratio.split()[0]

        # Interpolation requires both endpoint ids; otherwise leave it unset.
        if interpolate_id1 and interpolate_id2:
            interpolate_ids = [interpolate_id1, interpolate_id2]
        else:
            interpolate_ids = None

        video_path, video_id = await generate_video(
            client, translated_prompt, loop, aspect_ratio, camera_motion,
            extend_id, reverse_extend_id, interpolate_ids, progress
        )
        return video_path, video_id, ""
    except Exception as e:
        return None, None, f"์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}"
async def image_to_video(prompt, image_url, loop, aspect_ratio, camera_motion, progress=gr.Progress()):
    """Gradio handler: animate a still image into a video via Luma AI.

    The image at `image_url` becomes the frame0 keyframe; the (translated)
    prompt describes the desired motion.

    Args:
        prompt: Text description; machine-translated to English before use.
        image_url: Web URL of the source image.
        loop: Whether the generated video should loop.
        aspect_ratio: Ratio string, possibly followed by a description;
            only the leading token is sent.
        camera_motion: Optional camera-motion keyword appended to the prompt.
        progress: Gradio progress tracker.

    Returns:
        (file_name, generation_id, "") on success, or
        (None, None, error_message) on failure.

    Raises:
        gr.Error: If the LMGEN_KEY environment variable is missing.
    """
    api_key = os.getenv("LMGEN_KEY")
    if not api_key:
        raise gr.Error("LMGEN_KEY ํ™˜๊ฒฝ ๋ณ€์ˆ˜๊ฐ€ ์„ค์ •๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค.")
    client = AsyncLumaAI(auth_token=api_key)
    try:
        # Translate to English; falls back to the original text on failure.
        translated_prompt = await translate_prompt(prompt)
        print(f"์›๋ณธ ํ”„๋กฌํ”„ํŠธ: {prompt}")
        print(f"๋ฒˆ์—ญ๋œ ํ”„๋กฌํ”„ํŠธ: {translated_prompt}")

        # Keep only the ratio itself ("16:9 (...)" -> "16:9").
        aspect_ratio = aspect_ratio.split()[0]

        generation_params = {
            "prompt": translated_prompt + (f" {camera_motion}" if camera_motion else ""),
            "loop": loop,
            "aspect_ratio": aspect_ratio,
            "keyframes": {
                "frame0": {
                    "type": "image",
                    "url": image_url
                }
            }
        }

        progress(0, desc="์ด๋ฏธ์ง€๋กœ๋ถ€ํ„ฐ ๋น„๋””์˜ค ์ƒ์„ฑ ์‹œ์ž‘ ์ค‘...")
        generation = await client.generations.create(**generation_params)
        progress(0.1, desc="๋น„๋””์˜ค ์ƒ์„ฑ ์‹œ์ž‘๋จ. ์™„๋ฃŒ ๋Œ€๊ธฐ ์ค‘...")

        # Poll every 5s until the job completes or fails; progress ramps
        # from 0.1 to 0.9 assuming a typical ~60 second generation time.
        start_time = asyncio.get_event_loop().time()
        while True:
            status = await client.generations.get(id=generation.id)
            if status.state == "completed":
                break
            if status.state == "failed":
                raise Exception("๋น„๋””์˜ค ์ƒ์„ฑ ์‹คํŒจ")
            elapsed_time = asyncio.get_event_loop().time() - start_time
            progress_value = min(0.1 + (elapsed_time / 60) * 0.8, 0.9)
            progress(progress_value, desc="๋น„๋””์˜ค ์ƒ์„ฑ ์ค‘...")
            await asyncio.sleep(5)

        progress(0.9, desc="์ƒ์„ฑ๋œ ๋น„๋””์˜ค ๋‹ค์šด๋กœ๋“œ ์ค‘...")
        video_url = status.assets.video
        file_name = f"luma_ai_generated_{generation.id}.mp4"
        async with aiohttp.ClientSession() as session:
            async with session.get(video_url) as resp:
                # BUG FIX: a non-200 response previously left file_name
                # unbound and surfaced as an UnboundLocalError in the error
                # box; raise a clear error with the HTTP status instead.
                if resp.status != 200:
                    raise Exception(f"Video download failed with HTTP status {resp.status}")
                with open(file_name, 'wb') as fd:
                    # Stream in 1 KiB chunks to keep memory flat.
                    while True:
                        chunk = await resp.content.read(1024)
                        if not chunk:
                            break
                        fd.write(chunk)

        progress(1.0, desc="๋น„๋””์˜ค ์ƒ์„ฑ ์™„๋ฃŒ!")
        return file_name, generation.id, ""
    except Exception as e:
        return None, None, f"์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}"
# Hide the Gradio footer in the rendered UI.
css = """
footer {
visibility: hidden;
}
"""
with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css) as demo:
    # ----- Tab 1: text -> video ------------------------------------------
    with gr.Tab("ํ…์ŠคํŠธ๋กœ ๋น„๋””์˜ค ๋งŒ๋“ค๊ธฐ"):
        prompt = gr.Textbox(label="ํ”„๋กฌํ”„ํŠธ (๋น„๋””์˜ค์— ๋Œ€ํ•œ ์„ค๋ช…์„ ์ž…๋ ฅํ•˜์„ธ์š”)")
        generate_btn = gr.Button("๋น„๋””์˜ค ์ƒ์„ฑ")
        video_output = gr.Video(label="์ƒ์„ฑ๋œ ๋น„๋””์˜ค")
        video_id_output = gr.Textbox(label="์ƒ์„ฑ๋œ ๋น„๋””์˜ค ID", visible=True)
        error_output = gr.Textbox(label="์˜ค๋ฅ˜ ๋ฉ”์‹œ์ง€", visible=True)

        with gr.Accordion("๊ณ ๊ธ‰ ์˜ต์…˜", open=False):
            loop = gr.Checkbox(label="๋ฃจํ”„ (๋น„๋””์˜ค๋ฅผ ๋ฐ˜๋ณต ์žฌ์ƒํ• ์ง€ ์„ค์ •)", value=False)
            aspect_ratio = gr.Dropdown(
                label="ํ™”๋ฉด ๋น„์œจ",
                choices=["16:9 (์™€์ด๋“œ์Šคํฌ๋ฆฐ)", "1:1 (์ •์‚ฌ๊ฐํ˜•)", "9:16 (์„ธ๋กœ ์˜์ƒ)", "4:3 (ํ‘œ์ค€)", "3:4 (์„ธ๋กœ ํ‘œ์ค€)", "21:9 (์šธํŠธ๋ผ์™€์ด๋“œ)", "9:21 (์„ธ๋กœ ์šธํŠธ๋ผ์™€์ด๋“œ)"],
                value="16:9 (์™€์ด๋“œ์Šคํฌ๋ฆฐ)"
            )
            # Choices are populated asynchronously by update_camera_motions below.
            camera_motion = gr.Dropdown(label="์นด๋ฉ”๋ผ ๋ชจ์…˜ (์นด๋ฉ”๋ผ ์›€์ง์ž„ ํšจ๊ณผ ์„ ํƒ)")
            extend_id = gr.Textbox(label="ํ™•์žฅํ•  ๋น„๋””์˜ค ID (๊ธฐ์กด ๋น„๋””์˜ค๋ฅผ ์ด์–ด์„œ ์ƒ์„ฑํ•  ๋•Œ ์ž…๋ ฅ)")
            reverse_extend_id = gr.Textbox(label="์—ญ๋ฐฉํ–ฅ ํ™•์žฅํ•  ๋น„๋””์˜ค ID (๊ธฐ์กด ๋น„๋””์˜ค์˜ ์•ž๋ถ€๋ถ„์„ ์ƒ์„ฑํ•  ๋•Œ ์ž…๋ ฅ)")
            with gr.Row():
                interpolate_id1 = gr.Textbox(label="๋ณด๊ฐ„ ๋น„๋””์˜ค ID 1 (๋‘ ๋น„๋””์˜ค ์‚ฌ์ด๋ฅผ ๋ณด๊ฐ„ํ•  ๋•Œ ์ฒซ ๋ฒˆ์งธ ๋น„๋””์˜ค ID)")
                interpolate_id2 = gr.Textbox(label="๋ณด๊ฐ„ ๋น„๋””์˜ค ID 2 (๋‘ ๋น„๋””์˜ค ์‚ฌ์ด๋ฅผ ๋ณด๊ฐ„ํ•  ๋•Œ ๋‘ ๋ฒˆ์งธ ๋น„๋””์˜ค ID)")

        generate_btn.click(
            text_to_video,
            inputs=[prompt, loop, aspect_ratio, camera_motion, extend_id, reverse_extend_id, interpolate_id1, interpolate_id2],
            outputs=[video_output, video_id_output, error_output]
        )

    # ----- Tab 2: image -> video -----------------------------------------
    with gr.Tab("์ด๋ฏธ์ง€๋กœ ๋น„๋””์˜ค ๋งŒ๋“ค๊ธฐ"):
        img_prompt = gr.Textbox(label="ํ”„๋กฌํ”„ํŠธ (์ด๋ฏธ์ง€๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ ์ƒ์„ฑํ•  ๋น„๋””์˜ค์— ๋Œ€ํ•œ ์„ค๋ช…)")
        img_url = gr.Textbox(label="์ด๋ฏธ์ง€ URL (๋ณ€ํ™˜ํ•˜๊ณ ์ž ํ•˜๋Š” ์ด๋ฏธ์ง€์˜ ์›น ์ฃผ์†Œ)")
        img_generate_btn = gr.Button("์ด๋ฏธ์ง€๋กœ ๋น„๋””์˜ค ์ƒ์„ฑ")
        img_video_output = gr.Video(label="์ƒ์„ฑ๋œ ๋น„๋””์˜ค")
        img_video_id_output = gr.Textbox(label="์ƒ์„ฑ๋œ ๋น„๋””์˜ค ID", visible=True)
        img_error_output = gr.Textbox(label="์˜ค๋ฅ˜ ๋ฉ”์‹œ์ง€", visible=True)

        with gr.Accordion("๊ณ ๊ธ‰ ์˜ต์…˜", open=False):
            img_loop = gr.Checkbox(label="๋ฃจํ”„ (๋น„๋””์˜ค๋ฅผ ๋ฐ˜๋ณต ์žฌ์ƒํ• ์ง€ ์„ค์ •)", value=False)
            img_aspect_ratio = gr.Dropdown(
                label="ํ™”๋ฉด ๋น„์œจ",
                choices=["16:9 (์™€์ด๋“œ์Šคํฌ๋ฆฐ)", "1:1 (์ •์‚ฌ๊ฐํ˜•)", "9:16 (์„ธ๋กœ ์˜์ƒ)", "4:3 (ํ‘œ์ค€)", "3:4 (์„ธ๋กœ ํ‘œ์ค€)", "21:9 (์šธํŠธ๋ผ์™€์ด๋“œ)", "9:21 (์„ธ๋กœ ์šธํŠธ๋ผ์™€์ด๋“œ)"],
                value="16:9 (์™€์ด๋“œ์Šคํฌ๋ฆฐ)"
            )
            img_camera_motion = gr.Dropdown(label="์นด๋ฉ”๋ผ ๋ชจ์…˜ (์นด๋ฉ”๋ผ ์›€์ง์ž„ ํšจ๊ณผ ์„ ํƒ)")

        img_generate_btn.click(
            image_to_video,
            inputs=[img_prompt, img_url, img_loop, img_aspect_ratio, img_camera_motion],
            outputs=[img_video_output, img_video_id_output, img_error_output]
        )

    async def update_camera_motions():
        """Populate both camera-motion dropdowns from the Luma AI API."""
        try:
            motions = await get_camera_motions()
            return gr.update(choices=motions), gr.update(choices=motions)
        except Exception as e:
            print(f"์นด๋ฉ”๋ผ ๋ชจ์…˜ ์—…๋ฐ์ดํŠธ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {str(e)}")
            return gr.update(choices=[]), gr.update(choices=[])

    # Fetch the motion list once when the page loads.
    demo.load(update_camera_motions, outputs=[camera_motion, img_camera_motion])

# NOTE(review): authentication credentials are hard-coded in source; they
# should be read from environment variables or Space secrets instead.
demo.queue().launch(auth=("gini", "pick"))