| """ |
| MediaTranscriberPro - Hugging Face Space |
| Final Fix for DNS/IPv6 Issues |
| """ |
| |
| |
| |
| import socket |
| import os |
|
|
| |
# Keep a handle on the stock resolver so the wrapper below can delegate to it.
old_getaddrinfo = socket.getaddrinfo


def new_getaddrinfo(*args, **kwargs):
    """Resolve like ``socket.getaddrinfo`` but keep only IPv4 results.

    All arguments are forwarded untouched to the original resolver; entries
    whose address family is not ``socket.AF_INET`` are dropped from the
    returned list.
    """
    return list(
        filter(
            lambda entry: entry[0] == socket.AF_INET,
            old_getaddrinfo(*args, **kwargs),
        )
    )


# Install the wrapper process-wide: every later lookup goes through it.
socket.getaddrinfo = new_getaddrinfo
| |
|
|
| import gradio as gr |
| import logging |
| import tempfile |
| import shutil |
| import subprocess |
| import re |
| import yt_dlp |
| from pathlib import Path |
| from dataclasses import dataclass |
| from typing import Optional, Callable |
|
|
| |
# Process-wide logging: INFO and above, timestamped "time - LEVEL - message".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# File extensions the app treats as media (audio first, then video containers).
# NOTE(review): not referenced by any code visible in this file.
SUPPORTED_MEDIA = {
    ".mp3",
    ".wav",
    ".m4a",
    ".aac",
    ".ogg",
    ".opus",
    ".flac",
    ".mp4",
    ".mkv",
    ".avi",
    ".mov",
    ".webm",
}
|
|
@dataclass
class Result:
    """Outcome of an operation: a success flag plus optional payload fields.

    Only ``success`` is required; the remaining fields default to ``None``
    and are filled in by whoever builds the result.
    """

    # True when the operation completed, False otherwise.
    success: bool
    # Optional textual payload.
    data: Optional[str] = None
    # Optional path of a produced file, as a string.
    file_path: Optional[str] = None
    # Optional human-readable failure description.
    error: Optional[str] = None
|
|
class MediaDownloader:
    """Download the best available audio for a URL with yt-dlp.

    Downloads are written into ``output_dir``, which is created (including
    missing parents) when the downloader is constructed.
    """

    # Suffixes of yt-dlp's in-progress / bookkeeping files; such files are
    # never a finished download and must not be returned to the caller.
    _PARTIAL_SUFFIXES = (".part", ".ytdl")

    def __init__(self, output_dir):
        """Store the download directory and make sure it exists.

        Args:
            output_dir: Target directory, given as ``str`` or ``Path``.
        """
        # Coerce so plain strings work as well as Path objects.
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def download(self, url, progress=None):
        """Download ``url`` and report where the file ended up.

        Args:
            url: Media URL handed to yt-dlp.
            progress: Optional callable invoked as ``progress(fraction, text)``
                before the download starts.

        Returns:
            ``Result(True, file_path=...)`` on success, otherwise
            ``Result(False, error=...)``. Exceptions are logged and converted
            into a failed ``Result`` rather than propagated.
        """
        try:
            # Compare against None instead of testing truthiness: a truthiness
            # test goes through the callback object's __bool__/__len__, which
            # may be falsy (or raise) for a perfectly valid progress tracker.
            if progress is not None:
                progress(0.1, "Initializing download...")

            ydl_opts = {
                'format': 'bestaudio/best',
                'outtmpl': str(self.output_dir / '%(title)s.%(ext)s'),
                'noplaylist': True,
                # Kept for backward compatibility with the original options.
                'force_ipv4': True,
                # Documented API equivalent of the --force-ipv4 CLI flag:
                # bind outgoing connections to the IPv4 wildcard address.
                'source_address': '0.0.0.0',
                # SECURITY NOTE(review): this turns off TLS certificate
                # verification; confirm it is really required here.
                'nocheckcertificate': True,
                'socket_timeout': 30,
                'quiet': True,
                'no_warnings': True,
                'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36',
            }

            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                file_path = Path(ydl.prepare_filename(info))

            if not file_path.exists():
                # The predicted name can differ from what was written; fall
                # back to the newest finished regular file in the directory.
                candidates = [
                    entry
                    for entry in self.output_dir.iterdir()
                    if entry.is_file()
                    and entry.suffix not in self._PARTIAL_SUFFIXES
                ]
                if not candidates:
                    return Result(False, error="Download finished but file not found.")
                file_path = max(candidates, key=lambda entry: entry.stat().st_mtime)

            return Result(True, file_path=str(file_path))

        except Exception as e:
            # Boundary handler: the UI expects a Result, never an exception.
            logger.error("Download Error: %s", e)
            return Result(False, error=str(e))
|
|
class Processor:
    """Turn an uploaded file or a downloadable URL into a transcript and SRT.

    A private temporary directory holds downloads and the generated output
    files. The Whisper model is loaded lazily on the first transcription.
    """

    def __init__(self):
        """Create the working directory and the downloader; defer the model."""
        self.tmp = Path(tempfile.mkdtemp())
        self.downloader = MediaDownloader(self.tmp / "download")
        # Loaded on demand by load_model() so start-up stays cheap.
        self.model = None

    def load_model(self):
        """Load the faster-whisper model once and cache it on the instance."""
        if self.model is None:
            # Imported lazily so the module can be imported without the
            # dependency being loaded up front.
            from faster_whisper import WhisperModel
            self.model = WhisperModel("medium", device="cpu", compute_type="int8")

    @staticmethod
    def _srt_timestamp(seconds):
        """Format a time offset in seconds as an SRT ``HH:MM:SS,mmm`` stamp."""
        # Work in whole milliseconds so rounding cannot yield e.g. "59,1000".
        total_ms = max(0, int(round(float(seconds) * 1000)))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        secs, millis = divmod(remainder, 1000)
        return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"

    def run(self, url, upload, lang, progress=gr.Progress()):
        """Transcribe the given media and write TXT and SRT outputs.

        Args:
            url: Media URL; used only when no file was uploaded.
            upload: Uploaded file, as a path or an object exposing ``.name``.
            lang: Language tag; only the part before the first ``-`` is used.
            progress: Progress callback invoked as ``progress(fraction, text)``.

        Returns:
            A ``(status_message, txt_path, srt_path)`` tuple. Both paths are
            ``None`` when nothing was produced. Errors are reported through
            the status message instead of being raised.
        """
        try:
            url = (url or "").strip()

            if upload:
                # NOTE(review): the upload is presumably either a filesystem
                # path or a temp-file wrapper with ``.name`` depending on the
                # Gradio version -- both shapes are accepted here.
                if isinstance(upload, (str, os.PathLike)):
                    target_file = Path(upload)
                else:
                    target_file = Path(upload.name)
            elif url:
                res = self.downloader.download(url, progress)
                if not res.success:
                    return f"❌ Error: {res.error}", None, None
                target_file = Path(res.file_path)
            else:
                return "Please provide URL or File", None, None

            progress(0.3, "Loading Model...")
            self.load_model()

            progress(0.5, "Transcribing...")
            # No language selected -> pass None so the model auto-detects.
            lang_code = lang.split("-")[0] if lang else None
            segments, _ = self.model.transcribe(
                str(target_file), language=lang_code, beam_size=5
            )

            full_text = []
            srt_content = []
            for i, seg in enumerate(segments, 1):
                text = seg.text.strip()
                full_text.append(text)
                start = self._srt_timestamp(seg.start)
                end = self._srt_timestamp(seg.end)
                srt_content.append(f"{i}\n{start} --> {end}\n{text}\n")

            # Segments are stripped first so joining does not double spaces.
            text_str = " ".join(full_text)
            # Each cue already ends in a newline; the extra one produces the
            # blank separator line between cues.
            srt_str = "\n".join(srt_content)

            out_txt = self.tmp / "transcript.txt"
            out_srt = self.tmp / "subs.srt"
            out_txt.write_text(text_str, encoding="utf-8")
            out_srt.write_text(srt_str, encoding="utf-8")

            return f"✅ Done! ({len(text_str)} chars)", str(out_txt), str(out_srt)

        except Exception as e:
            # UI boundary: log the traceback, then report the failure as text.
            logger.exception("Transcription failed")
            return f"❌ Critical Error: {str(e)}", None, None
|
|
| |
# Single shared processor: the model is loaded once and reused across requests.
proc = Processor()


# UI layout: inputs on top, action button, then status and result files.
with gr.Blocks(title="Transcriber Pro") as demo:
    gr.Markdown(value="## 🎙️ Media Transcriber Pro (IPv4 Fix)")

    with gr.Row():
        url_in = gr.Textbox(label="YouTube URL")
        file_in = gr.File(label="Upload File")

    lang_in = gr.Dropdown(choices=["ar", "en"], value="ar", label="Language")
    btn = gr.Button(value="Transcribe", variant="primary")

    status = gr.Textbox(label="Status")
    with gr.Row():
        f1 = gr.File(label="TXT")
        f2 = gr.File(label="SRT")

    # Wire the button: (url, file, language) in -> (status, txt, srt) out.
    btn.click(
        fn=proc.run,
        inputs=[url_in, file_in, lang_in],
        outputs=[status, f1, f2],
    )


if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    demo.launch(server_name="0.0.0.0", server_port=7860)
|
|