Spaces:
Sleeping
Sleeping
import streamlit as st | |
import base64 | |
import os | |
from io import BytesIO | |
from pdf2image import convert_from_bytes | |
from dotenv import load_dotenv | |
import json | |
from pydantic import BaseModel, Field, RootModel | |
from typing import List | |
import shutil | |
from moviepy import ImageClip, AudioFileClip, concatenate_videoclips | |
from openai import OpenAI | |
import wave, numpy as np, os | |
from pydub import AudioSegment | |
import io | |
st.set_page_config(page_title="Slide to Video 🎞️") | |
st.title("Slide to Video 🎞️") | |
load_dotenv() | |
API_KEY = os.getenv("API_HUGGINGFACE") | |
BASE_URL = "https://matteoscript-ai.hf.space/v1/" | |
MODEL_NAME = "gemini-2.5-flash" | |
client = OpenAI(api_key=API_KEY, base_url=BASE_URL) | |
if "logged" not in st.session_state: | |
st.session_state.logged = False | |
if st.session_state.logged == False: | |
login_placeholder = st.empty() | |
with login_placeholder.container(): | |
container = st.container(border=True) | |
username = container.text_input('Username') | |
password = container.text_input('Passowrd', type='password') | |
login = container.button(' Login ', type='primary') | |
if not login or username != os.getenv("LOGIN_USER") or password != os.getenv("LOGIN_PASSWORD"): | |
if login: | |
st.error('Password Errata') | |
st.stop() | |
st.session_state.logged = True | |
login_placeholder.empty() | |
class DialogoPagina(BaseModel): | |
"""Contiene il dialogo per una singola pagina.""" | |
page: int = Field(..., description="Il numero della pagina a cui si riferisce il dialogo.") | |
speaker: str = Field(..., description="Battuta del dialogo, pronunciata dallo Speaker.") | |
class DialoghiTTS(BaseModel): | |
"""L'oggetto JSON principale che contiene tutti i dialoghi generati.""" | |
data: List[DialogoPagina] = Field(..., description="Una lista di oggetti, ciascuno contenente il dialogo per una pagina.") | |
class SpeechSegment(BaseModel): | |
speaker: str = Field(..., description="ID dello speaker (es. SPEAKER_00)") | |
start_seconds: float = Field(..., ge=0, description="Secondi di inizio (comprensivi di decimali)") | |
end_seconds: float = Field(..., ge=0, description="Secondi di fine (comprensivi di decimali)") | |
class Speech(RootModel[List[SpeechSegment]]): | |
""" Un modello radice che rappresenta direttamente una lista di SpeechSegment.""" | |
pass | |
def pdf_to_images(pdf_bytes: bytes): | |
"""Converte il PDF in miniature PIL (per anteprima).""" | |
return convert_from_bytes(pdf_bytes, dpi=200) | |
def encode_bytes(file_bytes): | |
"""Codifica i byte di un file in una stringa base64.""" | |
return base64.b64encode(file_bytes).decode("utf-8") | |
def genera_dialoghi_tts(prompt: str, lingua: str, pdf_bytes: bytes, num_pagine: int)-> dict: | |
"""Invia uno o più PDF a OpenAI e restituisce un riassunto.""" | |
prompt_text = f"Per ciascuna delle {num_pagine} pagine del documento, {prompt}. Genera il testo in questa LINGUA: {lingua}. Rispondi solo con JSON conforme con un Array di {num_pagine} oggetti!!!" | |
content = [{"type": "text", "text": prompt_text}] | |
content.append( | |
{ | |
"type": "image_url", | |
"image_url": {"url": f"data:application/pdf;base64,{encode_bytes(pdf_bytes)}"}, | |
} | |
) | |
completion = client.beta.chat.completions.parse( | |
model=MODEL_NAME, | |
messages=[{"role": "user", "content": content}], | |
response_format=DialoghiTTS | |
) | |
return completion.choices[0].message.parsed.model_dump() | |
def generate_text(system_prompt: str, user_request: str): | |
"""Chiama un LLM per chiamata API """ | |
response = client.chat.completions.create( | |
model=MODEL_NAME, | |
temperature=0.2, | |
messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": f"Ecco il testo: {user_request}"}], | |
) | |
return response.choices[0].message.content.strip() | |
# Diarizzazione Audio per Speaker | |
def diarize_by_llm(file_name_audio: str, temp_dir: str, slides_data: list, music: str): | |
"""Suddivide l'audio sulla base dei secondi restituiti dall'LLM.""" | |
original_voice = AudioSegment.from_file(file_name_audio) | |
silence_2s = AudioSegment.silent(duration=2000) # 2000 ms = 2 s | |
voice_with_silence = silence_2s + original_voice + silence_2s | |
print(f"Durata originale: {len(original_voice)/1000:.2f}s | con silenzi: {len(voice_with_silence)/1000:.2f}s") | |
buffer = io.BytesIO() | |
voice_with_silence.export(buffer, format="wav") | |
segments = trascrivi_audio_openai(buffer.getvalue(), slides_data) | |
voice_with_silence.export(file_name_audio, format="wav") | |
if music: | |
add_music(file_name_audio, f"{music}.mp3") | |
full_audio = AudioSegment.from_file(file_name_audio) | |
segs = segments.root | |
for idx in range(1, len(segs)): | |
if idx==1: | |
start_ms = 0 | |
else: | |
start_ms = int(segs[idx - 1].start_seconds * 1000) | |
end_ms = int(segs[idx].start_seconds * 1000) | |
clip = full_audio[start_ms:end_ms] | |
filename = os.path.join(temp_dir, f"slide_{idx}.wav") | |
clip.export(filename, format="wav") | |
print(f"Salvato: {filename} ({start_ms / 1000:.2f}s - {end_ms / 1000:.2f}s)") | |
start_ms = int(segs[-1].start_seconds * 1000) | |
final_clip = full_audio[start_ms:] | |
final_filename = os.path.join(temp_dir, f"slide_{len(segs)}.wav") | |
final_clip.export(final_filename, format="wav") | |
print(f"Salvato: {final_filename} ({start_ms / 1000:.2f}s - {len(full_audio) / 1000:.2f}s)") | |
# Trascrive audio con SECONDI tramite LLM | |
def trascrivi_audio_openai(audio: bytes, slides_data: dict) -> Speech: | |
""" Trascrive AUDIO con secondi per la diarizzazione """ | |
audio_b64 = base64.b64encode(audio).decode() | |
resp = client.beta.chat.completions.parse( | |
model = "gemini-2.5-flash", | |
response_format=Speech, | |
messages=[{ | |
"role": "user", | |
"content": [ | |
{ "type": "text", "text": f"Restituisci un array JSON con esattamente {len(slides_data)} oggetti aventi speaker, start_seconds (decimale), end_seconds (decimale), " | |
f" sulla base del testo delle slides così formattate: {slides_data}"}, | |
{ "type": "input_audio", "input_audio": { "data": audio_b64, "format": "wav"}} | |
] | |
}] | |
) | |
return resp.choices[0].message.parsed | |
# Aggiungi musica di sottofondo | |
def add_music(speech_name, music_name): | |
""" Aggiunge musica di sottofondo alla presentazione """ | |
voice = AudioSegment.from_wav(speech_name) | |
guitar = AudioSegment.from_file(music_name) | |
guitar = guitar - 15 | |
if len(guitar) < len(voice): | |
loops = (len(voice) // len(guitar)) + 1 | |
guitar = guitar * loops | |
guitar = guitar[:len(voice)] | |
final = voice.overlay(guitar) | |
final.export(speech_name, format="wav") | |
print("Creato audio con sottofondo") | |
# Modifica Dialoghi | |
def modifica_dialoghi_con_llm(richiesta_utente: str, dialoghi_attuali: dict) -> DialoghiTTS: | |
""" Usa un LLM per modificare i dialoghi esistenti sulla base di una richiesta utente. """ | |
dialoghi_json_str = json.dumps(dialoghi_attuali, indent=2, ensure_ascii=False) | |
prompt_llm = f""" | |
Sei un assistente editoriale per presentazioni. Il tuo compito è modificare una serie di dialoghi per delle slide in base alla richiesta dell'utente. | |
**Richiesta dell'Utente:** | |
"{richiesta_utente}" | |
---------- | |
**Dialoghi Attuali in formato JSON:** | |
{dialoghi_json_str} | |
---------- | |
**Istruzioni Obbligatorie:** | |
1. Leggi la richiesta dell'utente e modifica i dialoghi nel JSON come richiesto. | |
2. RISPONDI ESCLUSIVAMENTE CON UN OGGETTO JSON VALIDO. | |
3. L'oggetto JSON di risposta deve avere ESATTAMENTE le stesse chiavi (i numeri delle pagine) dell'oggetto JSON che ti ho fornito. Non aggiungere, rimuovere o modificare le chiavi. | |
4. Mantieni la struttura del JSON originale con i testi modificati!. | |
""" | |
completion = client.beta.chat.completions.parse( | |
model=MODEL_NAME, | |
messages=[{"role": "user", "content": prompt_llm},], | |
response_format=DialoghiTTS | |
) | |
try: | |
return completion.choices[0].message.parsed | |
except Exception as e: | |
st.error(f"Errore durante la modifica dei dialoghi con l'LLM: {e}") | |
return {} | |
# ──────────────────────────────────────────────────────────────── | |
# Streamlit UI | |
# ──────────────────────────────────────────────────────────────── | |
# Inizializzazione dello stato della sessione | |
if 'dialoghi' not in st.session_state: | |
st.session_state.dialoghi = None | |
if 'pages_imgs' not in st.session_state: | |
st.session_state.pages_imgs = None | |
if 'video_path' not in st.session_state: | |
st.session_state.video_path = None | |
if 'slides_da_eliminare' not in st.session_state: | |
st.session_state.slides_da_eliminare = [] | |
with st.sidebar: | |
st.header("🔄 Caricamento") | |
uploaded_file = st.file_uploader("Seleziona un file PDF", type=["pdf"]) | |
st.divider() | |
st.header("✍️ Testo") | |
prompt_dialoghi = st.text_area("Prompt di generazione", "Genera un breve dialogo (max 15 parole) adatto ad una presentazione aziendale molto professionale", height=100) | |
lingua = st.selectbox("Lingua", ["Italiano", "Inglese", "Spagnolo", "Polacco", "Tedesco", "Rumeno", "Bresciano"]) | |
base_style_prompt = "Leggi in tono AZIENDALE e professionale per una presentazione molto elegante. Deve essere VELOCE!" | |
if st.button("Genera Testo Dialoghi", type='primary', use_container_width=True) and uploaded_file: | |
st.session_state.video_path = None # Resetta il video precedente | |
st.session_state.slides_da_eliminare = [] # Resetta le slide eliminate | |
pdf_bytes = uploaded_file.getvalue() | |
with st.spinner("Creazione anteprime pagine..."): | |
st.session_state.pages_imgs = pdf_to_images(pdf_bytes) | |
num_pages = len(st.session_state.pages_imgs) | |
with st.spinner("Generazione dialoghi con AI..."): | |
st.session_state.dialoghi = genera_dialoghi_tts(prompt_dialoghi, lingua, pdf_bytes, num_pages) | |
if lingua != "Italiano": | |
with st.spinner("Traduzione stile audio..."): | |
base_style_prompt = generate_text(f"Sei un TRADUTTORE dall'ITALIANO alla lingua {lingua}. Traduci la frase che ti viene assegnata:", base_style_prompt) | |
st.divider() | |
st.subheader("🎤 Voce") | |
voice_info = { | |
"Zephyr": ("Brillante", "female"), | |
"Puck": ("Ritmato", "male"), | |
"Charon": ("Informativa", "male"), | |
"Kore": ("Deciso", "female"), | |
"Fenrir": ("Eccitabile", "male"), | |
"Leda": ("Giovane", "female"), | |
"Orus": ("Aziendale", "male"), | |
"Aoede": ("Arioso", "female"), | |
"Callirrhoe": ("Rilassato", "female"), | |
"Autonoe": ("Brillante", "female"), | |
"Enceladus": ("Respiro", "male"), | |
"Iapetus": ("Sussurrato", "male"), | |
"Umbriel": ("Rilassato", "male"), | |
"Algieba": ("Morbido", "male"), | |
"Despina": ("Morbido", "female"), | |
"Erinome": ("Chiara", "female"), | |
"Algenib": ("Rauco", "male"), | |
"Rasalgethi": ("Informativa", "male"), | |
"Laomedeia": ("Allegro", "female"), | |
"Achernar": ("Soffice", "female"), | |
"Alnilam": ("Aziendale", "male"), | |
"Schedar": ("Neutro", "male"), | |
"Gacrux": ("Per adulti", "female"), | |
"Pulcherrima": ("Avanzato", "female"), | |
"Achird": ("Amichevole", "male"), | |
"Zubenelgenubi": ("Casual", "male"), | |
"Vindemiatrix": ("Delicato", "female"), | |
"Sadachbia": ("Vivace", "male"), | |
"Sadaltager": ("Competente", "male"), | |
"Sulafat": ("Caldo", "female"), | |
} | |
def voice_label(name: str) -> str: | |
style, gender = voice_info[name] | |
symbol = "♀️" if gender == "female" else "♂️" | |
return f"{symbol} {name} - {style}" | |
voice_names = list(voice_info.keys()) | |
speaker1_voice = st.selectbox( | |
"Prima Voce", | |
options=voice_names, | |
index=voice_names.index("Kore"), | |
format_func=voice_label | |
) | |
speaker2_voice = st.selectbox( | |
"Seconda Voce", | |
options=voice_names, | |
index=voice_names.index("Schedar"), | |
format_func=voice_label | |
) | |
style_prompt = st.text_area("Stile", base_style_prompt, height=100) | |
music = st.selectbox("Musica Sottofondo", ["Modern", "Guitar", "Uplifting", "Acoustic", ""]) | |
if st.session_state.dialoghi and st.session_state.pages_imgs: | |
st.subheader("Dialoghi Generati per Slide") | |
st.divider() | |
def elimina_slide(index_da_eliminare): | |
if index_da_eliminare not in st.session_state.slides_da_eliminare: | |
st.session_state.slides_da_eliminare.append(index_da_eliminare) | |
for i, img in enumerate(st.session_state.pages_imgs, 1): | |
if i in st.session_state.slides_da_eliminare: | |
continue | |
with st.container(border=False): | |
col1, col2 = st.columns([0.8, 0.2]) | |
with col1: | |
st.write(f"#### 📄 Slide {i}") | |
with col2: | |
st.button(f"🗑️ Elimina", key=f"delete_{i}", on_click=elimina_slide, args=(i,), use_container_width=True) | |
st.image(img, use_container_width=True) | |
if "data" in st.session_state.dialoghi: | |
dialogo_trovato = next((item.get("speaker", "") for item in st.session_state.dialoghi["data"] if item.get("page") == i), "") | |
st.text_area(f"Dialogo Pagina {i}", value=dialogo_trovato, height=100, key=f"dialogo_{i}", label_visibility="collapsed") | |
st.divider() | |
if st.sidebar.button("Genera Audio & Video", use_container_width=True, type="primary"): | |
with st.spinner("Generazione in corso"): | |
temp_dir = "temp_video_files" | |
if os.path.exists(temp_dir): | |
shutil.rmtree(temp_dir) | |
os.makedirs(temp_dir) | |
video_clips = [] | |
pagine_valide = [(i, img) for i, img in enumerate(st.session_state.pages_imgs, 1) | |
if i not in st.session_state.slides_da_eliminare] | |
num_pagine_valide = len(pagine_valide) | |
content_con_speaker = "" | |
i = 0 | |
slides_data = [] | |
with st.spinner("Generazione audio"): | |
for idx, (page_num, img) in enumerate(pagine_valide): | |
i+=1 | |
dialogo_corrente = st.session_state[f"dialogo_{page_num}"] | |
if i % 2 != 0: | |
content_con_speaker+= f"Speaker 1: {dialogo_corrente}\n\n" | |
else: | |
content_con_speaker+= f"Speaker 2: {dialogo_corrente}\n\n" | |
slide_info = {"numero_slide": i, "testo_slide": dialogo_corrente} | |
slides_data.append(slide_info) | |
file_name_audio = os.path.join(temp_dir, "slides.wav") | |
response = client.audio.speech.create( | |
model="gemini-2.5-flash-preview-tts", | |
input=f"{style_prompt}\n\n{content_con_speaker}", | |
voice=f"{speaker1_voice},{speaker2_voice}", | |
response_format="wav", | |
) | |
response.write_to_file(file_name_audio) | |
diarize_by_llm(file_name_audio, temp_dir, slides_data, music) | |
progress_bar = st.sidebar.progress(0, "Inizio generazione video...") | |
for idx, (page_num, img) in enumerate(pagine_valide): | |
progress_text = f"Elaborazione slide {idx + 1}/{num_pagine_valide} (Pagina originale: {page_num})..." | |
progress_bar.progress(idx / num_pagine_valide, text=progress_text) | |
file_name_audio = os.path.join(temp_dir, f"slide_{idx + 1}.wav") | |
if os.path.exists(file_name_audio): | |
audio_clip = AudioFileClip(file_name_audio) | |
clip = ( | |
ImageClip(np.array(img)) | |
.with_duration(audio_clip.duration) | |
.with_fps(1) | |
.with_audio(audio_clip) | |
) | |
video_clips.append(clip) | |
else: | |
st.warning(f"Audio per slide {page_num} non generato correttamente.") | |
progress_bar.progress(0.9, text="Assemblaggio video finale...") | |
if video_clips: | |
final_video = concatenate_videoclips(video_clips, method="chain") | |
final_video_path = "final_video.mp4" | |
final_video.write_videofile(final_video_path, codec="libx264", audio_codec="aac", fps=1, preset="ultrafast", threads=os.cpu_count(), ffmpeg_params=["-tune", "stillimage", "-movflags", "+faststart"]) | |
st.session_state.video_path = final_video_path | |
else: | |
st.warning("Nessuna slide valida da elaborare. Il video non è stato creato.") | |
#shutil.rmtree(temp_dir) | |
progress_bar.empty() | |
if st.session_state.video_path: | |
st.success("🎉 Video generato con successo!") | |
st.video(st.session_state.video_path) | |
with open(st.session_state.video_path, "rb") as file: | |
st.download_button( | |
label="📥 SCARICA VIDEO", | |
data=file, | |
file_name="presentazione_video.mp4", | |
mime="video/mp4", | |
use_container_width=True, | |
type='primary' | |
) | |
# CHAT INPUT | |
if st.session_state.dialoghi: | |
prompt_modifica = st.chat_input("Come vuoi modificare i dialoghi? (es. 'Rendili più brevi e professionali')") | |
if prompt_modifica: | |
with st.spinner("L'AI sta modificando i dialoghi..."): | |
dialoghi_attuali = {} | |
pagine_visibili = [ | |
i for i in range(1, len(st.session_state.pages_imgs) + 1) | |
if i not in st.session_state.slides_da_eliminare | |
] | |
for page_num in pagine_visibili: | |
key = f"dialogo_{page_num}" | |
if key in st.session_state: | |
dialoghi_attuali[str(page_num)] = st.session_state[key] | |
if not dialoghi_attuali: | |
st.warning("Non ci sono dialoghi da modificare.") | |
else: | |
dialoghi_modificati_obj = modifica_dialoghi_con_llm(prompt_modifica, dialoghi_attuali) | |
if dialoghi_modificati_obj: | |
st.session_state.dialoghi = dialoghi_modificati_obj.model_dump() | |
st.success("Dialoghi aggiornati!") | |
st.rerun() | |
else: | |
st.error("❌ Modifica fallita: L'AI non ha restituito un output valido.") |