Spaces:
Sleeping
Sleeping
File size: 6,508 Bytes
2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b5b7a62 2d8cb17 b8ea041 2d8cb17 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import gradio as gr
import subprocess
import os
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
import pytz # Für Zeitzonen-Management (nicht direkt verwendet, aber gute Praxis)
# --- Konfiguration ---
OUTPUT_DIR = "recordings"
# Erstelle das Verzeichnis, falls es nicht existiert
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Initialisiere den Scheduler
scheduler = BackgroundScheduler()
scheduler.start()
# --- Funktionen für die Aufnahme und Planung ---
def record_radio_stream(stream_url: str, output_filename: str, duration_seconds: int):
"""
Startet die Aufnahme eines Webradio-Streams mit ffmpeg.
"""
full_output_path = os.path.join(OUTPUT_DIR, output_filename)
print(f"🎬 Starte Aufnahme von {stream_url} für {duration_seconds} Sekunden nach {full_output_path}...")
# ffmpeg Kommando:
# -i: Input URL
# -c:a copy: Audio-Codec kopieren (kein Re-Encoding), das spart CPU und Zeit
# -map 0:a: Stelle sicher, dass nur der Audio-Stream gemappt wird
# -t: Dauer der Aufnahme in Sekunden
command = [
"ffmpeg",
"-i", stream_url,
"-c:a", "copy",
"-map", "0:a",
"-t", str(duration_seconds),
full_output_path
]
try:
# Führe den ffmpeg-Befehl aus
# check=True wird eine CalledProcessError erzeugen, wenn ffmpeg fehlschlägt
subprocess.run(command, check=True, capture_output=True)
print(f"✅ Aufnahme abgeschlossen: {full_output_path}")
return full_output_path
except subprocess.CalledProcessError as e:
print(f"❌ Fehler bei der Aufnahme von {stream_url}:")
print(f"Stdout: {e.stdout.decode()}")
print(f"Stderr: {e.stderr.decode()}")
# Lösche unvollständige Datei, falls vorhanden
if os.path.exists(full_output_path):
os.remove(full_output_path)
return None # Signalisiere Fehler
def schedule_recording(stream_url: str, start_datetime_obj: datetime, end_datetime_obj: datetime):
"""
Plant eine Webradio-Aufnahme basierend auf Start- und Endzeit.
Die Zeiten kommen direkt als datetime-Objekte von gr.DateTime.
"""
try:
# Berechne die Dauer der Aufnahme in Sekunden
duration = (end_datetime_obj - start_datetime_obj).total_seconds()
if duration <= 0:
return "❌ Fehler: Die Endzeit muss nach der Startzeit liegen."
# Generiere einen eindeutigen Dateinamen
timestamp = start_datetime_obj.strftime("%Y%m%d_%H%M%S")
output_filename = f"radio_recording_{timestamp}.mp3"
# Füge den Job zum Scheduler hinzu
# 'date' trigger bedeutet, dass der Job zu einem spezifischen Datum und Uhrzeit ausgeführt wird
scheduler.add_job(
record_radio_stream,
'date',
run_date=start_datetime_obj,
args=[stream_url, output_filename, int(duration)] # Dauer als Integer übergeben
)
# Zeige geplante Jobs an (optional, zur Fehlersuche)
# for job in scheduler.get_jobs():
# print(f"Geplanter Job: {job.id} - Nächste Ausführung: {job.next_run_time}")
return f"✅ Aufnahme von **{stream_url}** erfolgreich geplant.\nStart: **{start_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')}** | Ende: **{end_datetime_obj.strftime('%Y-%m-%d %H:%M:%S')}**.\nDatei: **{output_filename}**\nBitte aktualisiere die Dateiliste, nachdem die Aufnahme abgeschlossen ist."
except Exception as e:
return f"❌ Ein unerwarteter Fehler ist aufgetreten: {e}"
def get_recorded_files():
"""
Gibt eine Liste der Pfade zu allen aufgenommenen MP3-Dateien zurück.
"""
files = [os.path.join(OUTPUT_DIR, f) for f in os.listdir(OUTPUT_DIR) if f.endswith(".mp3")]
# Gradio gr.Files erwartet eine Liste von Pfaden.
# Wenn keine Dateien da sind, gibt eine leere Liste zurück.
return files
# --- Gradio UI Definition ---
with gr.Blocks() as demo:
gr.Markdown("# 📻 Webradio Recorder")
gr.Markdown(
"Plane deine Webradio-Aufnahmen! Gib die Stream-URL, Start- und Endzeit an, "
"und die App nimmt den Stream für dich auf. Die fertige Datei kannst du dann herunterladen."
)
with gr.Tab("Aufnahme planen"):
with gr.Row():
stream_url_input = gr.Textbox(
label="Stream URL",
placeholder="z.B. http://stream.laut.fm/meinstream (MP3- oder AAC-Stream)"
)
with gr.Row():
# HIER WURDE GR.DATETIME VERWENDET
start_datetime_input = gr.DateTime(
label="Start Datum & Uhrzeit",
value=datetime.now() + timedelta(minutes=1), # Voreinstellung: 1 Minute in der Zukunft
info="Wähle das Datum und die Uhrzeit, wann die Aufnahme beginnen soll."
)
# HIER WURDE GR.DATETIME VERWENDET
end_datetime_input = gr.DateTime(
label="End Datum & Uhrzeit",
value=datetime.now() + timedelta(minutes=10), # Voreinstellung: 10 Minuten in der Zukunft
info="Wähle das Datum und die Uhrzeit, wann die Aufnahme enden soll."
)
schedule_button = gr.Button("▶️ Aufnahme planen", variant="primary")
schedule_output_message = gr.Textbox(label="Status der Planung", interactive=False)
schedule_button.click(
fn=schedule_recording,
inputs=[stream_url_input, start_datetime_input, end_datetime_input],
outputs=schedule_output_message
)
with gr.Tab("Verfügbare Aufnahmen"):
gr.Markdown("Hier siehst du alle bisher aufgenommenen Dateien.")
refresh_files_button = gr.Button("🔄 Aufnahmen aktualisieren", variant="secondary")
# gr.Files ermöglicht das Herunterladen der Dateien
# HIER WURDE DER PARAMETER 'type' GEÄNDERT
downloadable_files = gr.Files(label="Deine Aufnahmen zum Herunterladen", type="filepath")
# Wenn der "Aktualisieren"-Button geklickt wird, rufe die Funktion auf, um die Dateien zu listen
refresh_files_button.click(
fn=get_recorded_files,
outputs=downloadable_files
)
# Beim Laden der App auch die Dateien einmal anzeigen
demo.load(
fn=get_recorded_files,
outputs=downloadable_files
)
# --- App starten ---
if __name__ == "__main__":
demo.launch() |