File size: 8,969 Bytes
4092f24
e747d24
 
 
 
 
 
 
 
 
274dce9
4092f24
274dce9
4092f24
15806a5
274dce9
e747d24
274dce9
 
 
 
 
 
 
4092f24
 
 
 
 
 
 
 
 
212f1ce
1c0a656
 
 
4092f24
 
 
 
 
 
 
9a762aa
15806a5
1c0a656
15806a5
4092f24
274dce9
4092f24
 
15806a5
274dce9
 
 
 
 
 
 
 
 
 
 
4092f24
15806a5
2f815c6
274dce9
 
 
15806a5
274dce9
 
 
 
 
 
 
 
4092f24
2f815c6
15806a5
9ed758b
15806a5
2f815c6
15806a5
 
 
2f815c6
212f1ce
 
4092f24
15806a5
274dce9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a762aa
15806a5
 
 
0f3297f
15806a5
212f1ce
15806a5
 
9a762aa
 
 
 
 
1c0a656
2f815c6
1c0a656
 
15806a5
1c0a656
15806a5
2f815c6
15806a5
dfeb5d2
15806a5
274dce9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
15806a5
274dce9
 
 
 
 
 
 
15806a5
 
274dce9
 
15806a5
 
4092f24
2f815c6
1c0a656
15806a5
1c0a656
15806a5
9ed758b
4092f24
 
 
 
 
15806a5
4092f24
 
9ed758b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import os

# Point the HF_HOME environment variable at a directory under /tmp and make
# sure that directory exists. This runs before the third-party imports below
# so the value is already in the environment when they are loaded.
# NOTE(review): presumably the Hugging Face libraries pulled in by `kokoro`
# read HF_HOME to locate their model cache -- confirm.
os.environ["HF_HOME"] = "/tmp/huggingface_cache"
os.makedirs("/tmp/huggingface_cache", exist_ok=True)

# TRANSFORMERS_CACHE is set to the same directory for backwards compatibility
# (presumably for library versions that still consult it -- confirm).
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface_cache"

# Now import the rest of your dependencies
import re
import shutil
import subprocess
import time
import uuid
from threading import Timer

from flask import Flask, render_template, request, url_for, send_from_directory
from google import genai

# New imports for audio generation and handling
from kokoro import KPipeline
import soundfile as sf
import numpy as np

# Flask application object; the route handlers below register against it.
app = Flask(__name__)

# The GenAI client cannot be built without an API key, so refuse to start
# when the GOOGLE_API_KEY environment variable is unset or empty.
API_KEY = os.getenv("GOOGLE_API_KEY")
if not API_KEY:
    raise ValueError("Missing GOOGLE_API_KEY environment variable.")
client = genai.Client(api_key=API_KEY)

# Dedicated directory under /tmp that is passed to Manim as --media_dir.
media_dir = os.path.join("/tmp", "manim_media")
os.makedirs(media_dir, exist_ok=True)

def _extract_fenced_block(text, tag):
    """Return the contents of the first ```<tag> fenced block in ``text``.

    Returns ``None`` when ``text`` contains no such block.
    """
    match = re.search(rf"```{tag}\s*(.*?)\s*```", text, re.DOTALL)
    return match.group(1) if match else None


def _remove_files(paths):
    """Delete every existing file in ``paths``; failures are logged, not raised."""
    for fpath in paths:
        try:
            if os.path.exists(fpath):
                os.remove(fpath)
        except Exception as e:
            app.logger.error("Error removing file %s: %s", fpath, e)


def _run_checked(cmd, label):
    """Run ``cmd`` and raise ``Exception`` carrying its stderr on a non-zero exit.

    ``label`` names the tool ("Manim", "FFmpeg") in the log line and in the
    exception message.
    """
    try:
        subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as cpe:
        app.logger.error("%s error output: %s", label, cpe.stderr)
        raise Exception(f"{label} failed: {cpe.stderr}") from cpe


@app.route("/", methods=["GET", "POST"])
def index():
    """Render the prompt form (GET) or generate a narrated Manim video (POST).

    On POST the ``prompt`` form field is sent to the GenAI model, which is
    asked for a Manim scene (```python block) and a commentary script
    (```script block). The commentary is synthesised to a WAV file with
    Kokoro, the scene is rendered with the ``manim`` CLI, and both are muxed
    with ``ffmpeg`` into a final MP4 that ``result.html`` links to through
    the ``get_video`` route.

    The whole pipeline is attempted up to three times. Files created by a
    failed attempt are deleted immediately; files from the successful attempt
    are deleted by a timer after 10 minutes.

    Returns:
        The rendered ``index.html`` for GET requests or a missing/blank
        prompt, otherwise the rendered ``result.html`` with either
        ``video_url`` or a generic ``error`` message.
    """
    if request.method != "POST":
        return render_template("index.html")

    prompt = request.form.get("prompt")
    # A whitespace-only prompt would otherwise cost three model round-trips.
    if not prompt or not prompt.strip():
        return render_template("index.html")

    max_retries = 3
    # Built lazily inside the try block (so a construction failure is handled
    # like any other attempt failure) and reused across retries.
    audio_pipeline = None

    for attempt in range(1, max_retries + 1):
        created_files = []  # every /tmp file this attempt may have written
        render_dir = None   # Manim's per-script output directory, once known
        try:
            # Call the GenAI API to get the Manim code and commentary script
            ai_response = client.models.generate_content(
                model="gemini-2.0-flash-lite-preview-02-05",
                contents=f"""You are 'Manimator', an expert Manim animator and coder.
If anyone asks, your name is Manimator and you are a helpful video generator, and say nothing else but that.
The user wants you to code this: {prompt}.
Plan out in chain of thought what you are going to do first, then give the final code output in ```python``` codeblock.
Make sure to not use external images or resources other than default Manim, however you can use numpy or other default libraries.
Keep the scene uncluttered and aesthetically pleasing.
Make sure things are not overlapping unless explicitly stated otherwise.
It is crucial that the script works correctly on the first try, so make sure to think about the layout and storyboard and stuff of the scene.
Make sure to think through what you are going to do and think about the topic before you write the code.
In addition, write a commentary script inside of ```script``` codeblock. This should be short and fit the content and align with the timing of the scene. Use "..." if needed to add a bit of a pause.
You got this!! <3
"""
            )

            # Treat a response without text as an empty one so the checks
            # below report a missing block instead of a TypeError.
            response_text = ai_response.text or ""

            code = _extract_fenced_block(response_text, "python")
            if code is None:
                raise Exception("No python code block found in the AI response.")

            script = _extract_fenced_block(response_text, "script")
            if script is None:
                raise Exception("No script block found in the AI response.")

            # Determine the scene class name from the generated code
            scene_match = re.search(r"class\s+(\w+)\(.*Scene.*\):", code)
            scene_name = scene_match.group(1) if scene_match else "MyScene"

            # Randomised names keep concurrent requests from colliding.
            code_stem = f"generated_video_{uuid.uuid4().hex}"
            code_filename = f"{code_stem}.py"
            video_filename = f"output_video_{uuid.uuid4().hex}.mp4"

            # Write the generated code file directly to /tmp. The encoding is
            # explicit because the model output may contain non-ASCII text.
            code_filepath = os.path.join("/tmp", code_filename)
            created_files.append(code_filepath)
            with open(code_filepath, "w", encoding="utf-8") as f:
                f.write(code)

            # === Generate Commentary Audio via Kokoro ===
            if audio_pipeline is None:
                audio_pipeline = KPipeline(lang_code='a')
            # The script is split on one or more newlines; each yielded item's
            # third element is an audio segment.
            audio_segments = [
                audio
                for _, _, audio in audio_pipeline(
                    script, voice='af_heart', speed=1, split_pattern=r'\n+'
                )
            ]
            if not audio_segments:
                raise Exception("No audio segments were generated from the commentary script.")

            # Concatenate all audio segments into one track, written at 24000 Hz
            commentary_audio_filename = f"commentary_{uuid.uuid4().hex}.wav"
            commentary_audio_path = os.path.join("/tmp", commentary_audio_filename)
            created_files.append(commentary_audio_path)
            sf.write(commentary_audio_path, np.concatenate(audio_segments), 24000)

            # === Run Manim to generate the silent video ===
            # NOTE(review): this executes model-generated Python with the
            # server's privileges and without a timeout -- consider sandboxing.
            render_dir = os.path.join(media_dir, "videos", code_stem)
            _run_checked(
                [
                    "manim",
                    "-qm",
                    "--media_dir", media_dir,
                    "-o", video_filename,
                    code_filepath,
                    scene_name,
                ],
                "Manim",
            )

            # Construct the expected output path from Manim.
            video_path_in_media = os.path.join(render_dir, "720p30", video_filename)
            if not os.path.exists(video_path_in_media):
                raise Exception(f"Manim did not produce the expected output file at {video_path_in_media}")

            # Move the video file to /tmp (to serve it from there)
            tmp_video_path = os.path.join("/tmp", video_filename)
            shutil.move(video_path_in_media, tmp_video_path)
            created_files.append(tmp_video_path)

            # === Combine Video with Commentary Audio using FFmpeg ===
            final_video_filename = f"final_video_{uuid.uuid4().hex}.mp4"
            final_video_path = os.path.join("/tmp", final_video_filename)
            created_files.append(final_video_path)
            _run_checked(
                [
                    "ffmpeg", "-y",
                    "-i", tmp_video_path,
                    "-i", commentary_audio_path,
                    "-c:v", "copy",
                    "-c:a", "aac",
                    "-shortest",
                    final_video_path,
                ],
                "FFmpeg",
            )

            # Use the final combined video for display
            video_url = url_for('get_video', filename=final_video_filename)
            response = render_template("result.html", video_url=video_url)

            # Schedule deletion of all temporary files after 10 minutes (600 seconds)
            Timer(600, _remove_files, args=(created_files,)).start()
            return response

        except Exception as e:
            app.logger.error("Attempt %d failed: %s", attempt, e)
            # Nothing from a failed attempt is ever served, so delete it now
            # instead of leaking it in /tmp.
            _remove_files(created_files)
            if attempt < max_retries:
                time.sleep(1)
        finally:
            # The rendered video has been moved out (or the attempt failed),
            # so whatever remains in Manim's per-script directory is unused.
            if render_dir is not None:
                shutil.rmtree(render_dir, ignore_errors=True)

    return render_template("result.html", error="An error occurred. Please try again later.")

@app.route("/video/<filename>")
def get_video(filename):
    """Serve a finished video from /tmp.

    Only names of the form ``final_video_<32 lowercase hex chars>.mp4`` are
    served -- the form ``index`` generates for the combined video. /tmp also
    holds the generated source code and intermediate audio/video files, so
    any other name is answered with a 404 instead of being read from disk.

    Args:
        filename: The file name taken from the URL path.

    Returns:
        The response produced by ``send_from_directory`` for the video file.
    """
    # Imported locally so this handler does not depend on the file-level
    # flask import line being extended.
    from flask import abort

    if not re.fullmatch(r"final_video_[0-9a-f]{32}\.mp4", filename):
        abort(404)
    return send_from_directory("/tmp", filename)

if __name__ == "__main__":
    # Direct execution: start Flask's built-in server on all interfaces,
    # port 7860, with debug mode off.
    app.run(
        host="0.0.0.0",
        port=7860,
        debug=False,
    )