Spaces:
Sleeping
Sleeping
File size: 1,481 Bytes
b7f86d8 0cf3992 40d6c37 99fdf25 0cf3992 b7f86d8 0cf3992 b7f86d8 0cf3992 40d6c37 49af23d b7f86d8 40d6c37 b7f86d8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
from youtube_transcript_api import YouTubeTranscriptApi
from core.state import AgenticState
import asyncio
from loguru import logger
@logger.catch
async def node_2_transcript_extraction(state: AgenticState) -> AgenticState:
    """Node 2: pull the YouTube transcript for ``state.video_id`` into the state.

    The transcript is fetched with youtube-transcript-api, requesting the
    languages ``"en"`` and ``"de"``. Each snippet's text is stripped, embedded
    newlines are replaced by spaces, and the pieces are joined with single
    spaces.

    On success ``state.raw_transcript_text`` holds the joined text and
    ``state.transcript_source`` is ``"youtube_transcript_api"``. If anything
    raises, the error is logged, ``raw_transcript_text`` is set to ``""`` and
    ``transcript_source`` to ``"failed"``.

    Args:
        state: Pipeline state; only ``video_id`` is read here.

    Returns:
        The same ``state`` object, mutated in place.
    """
    logger.info("🚀 Node 2: Transcript Extraction started...")
    target_id = state.video_id

    def _fetch(vid):
        # Synchronous network call; executed off the event loop below.
        return YouTubeTranscriptApi().fetch(vid, languages=["en", "de"])

    try:
        fetched = await asyncio.to_thread(_fetch, target_id)

        # Flatten every subtitle snippet to a single clean line of text.
        cleaned = [piece.text.strip().replace("\n", " ") for piece in fetched]
        joined = " ".join(cleaned)

        state.raw_transcript_text = joined
        state.transcript_source = "youtube_transcript_api"

        logger.info("✅ Node 2 Complete")
        logger.info(
            "✅ Node 2: Transcript extracted | chars={char_count}",
            char_count=len(joined),
        )
    except Exception as err:
        # Best-effort node: record the failure on the state instead of raising.
        logger.opt(exception=err).error("Transcript extraction failed.")
        state.raw_transcript_text = ""
        state.transcript_source = "failed"

    return state
|