| | |
| | from h2o_wave import main, app, Q, ui, data |
| | from youtube_transcript_api import YouTubeTranscriptApi |
| | from h2ogpte import H2OGPTE |
| | import re |
| | import os |
| | from dotenv import load_dotenv |
| | from collections import Counter |
| | import nltk |
| | from nltk.tokenize import word_tokenize |
| | from nltk.corpus import stopwords |
| | from nltk.sentiment import SentimentIntensityAnalyzer |
| | from textblob import TextBlob |
| | from nltk.tokenize import sent_tokenize |
| | import asyncio |
| | import logging |
| |
|
# Module-wide logging: timestamped, level-tagged messages (INFO and above).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module logger used by TranscriptAnalyzer and helpers below.
logger = logging.getLogger(__name__)
| |
|
class TranscriptAnalyzer:
    """Derives several views of a YouTube transcript: word frequencies,
    per-sentence sentiment, noun-phrase topics and timestamped segments.

    Expects *transcript_list* in youtube_transcript_api's format: a list of
    dicts with 'text', 'start' and 'duration' keys.
    """

    def __init__(self, transcript_list):
        self.transcript = ' '.join(entry['text'] for entry in transcript_list)
        self.transcript_list = transcript_list
        self.sia = SentimentIntensityAnalyzer()
        self.stop_words = set(stopwords.words('english'))
        # Conversational fillers that add noise to frequency/topic counts.
        self.additional_stops = {'um', 'uh', 'like', 'okay', 'right', 'well', 'so'}
        self.stop_words.update(self.additional_stops)

        # Pre-tokenize once; every analysis method reads from these.
        self.sentences = sent_tokenize(self.transcript)
        self.words = word_tokenize(self.transcript.lower())

    def analyze(self):
        """Run all analyses; on any failure, log and return placeholder data
        shaped like the real result so the UI never breaks."""
        try:
            return {
                'word_freq': self._analyze_word_frequency(),
                'sentiments': self._analyze_sentiment(),
                'topics': self._extract_topics(),
                'time_segments': self._create_time_segments()
            }
        except Exception as e:
            logger.error(f"Error in transcript analysis: {e}")
            fallback_segment = {
                'start_time': '0:00',
                'end_time': '0:00',
                'text': 'Analysis not available',
                'sentiment': 0.0,
                'topics': []
            }
            return {
                'word_freq': [('no data', 1)],
                'sentiments': [0.0],
                'topics': [('no topics', 1, ['none'])],
                'time_segments': [fallback_segment]
            }

    def _analyze_word_frequency(self):
        """Top-15 content words after punctuation, number and stop-word filtering."""
        stripped = (token.strip('.,!?()[]{}":;') for token in self.words)
        kept = [
            token for token in stripped
            if token.isalnum()
            and not token.isnumeric()
            and len(token) > 2
            and token not in self.stop_words
        ]
        return Counter(kept).most_common(15)

    def _analyze_sentiment(self):
        """Per-sentence VADER compound scores, in transcript order."""
        scores = []
        for sentence in self.sentences:
            scores.append(self.sia.polarity_scores(sentence)['compound'])
        return scores

    def _extract_topics(self):
        """Ten most frequent multi-word noun phrases.

        Each entry is (phrase, count, related_terms); related_terms is a
        fixed placeholder list — no real related-term extraction is done.
        """
        blob = TextBlob(self.transcript)
        multiword = (p for p in blob.noun_phrases if len(p.split()) >= 2)
        return [
            (phrase, freq, ['related'])
            for phrase, freq in Counter(multiword).most_common(10)
        ]

    def _create_time_segments(self):
        """Group transcript entries five at a time, keeping real YouTube timestamps."""
        chunk = 5
        segments = []
        for offset in range(0, len(self.transcript_list), chunk):
            entries = self.transcript_list[offset:offset + chunk]
            text = ' '.join(e['text'] for e in entries)

            # Segment spans from the first entry's start to the last entry's end.
            begin = entries[0]['start']
            finish = entries[-1]['start'] + entries[-1]['duration']
            begin_min, begin_sec = divmod(int(begin), 60)
            finish_min, finish_sec = divmod(int(finish), 60)

            segments.append({
                'start_time': f"{begin_min}:{begin_sec:02d}",
                'end_time': f"{finish_min}:{finish_sec:02d}",
                'text': text,
                'sentiment': self.sia.polarity_scores(text)['compound'],
                'topics': list(TextBlob(text).noun_phrases)[:3]
            })

        return segments
| |
|
| |
|
| | |
# Ensure the NLTK models used below are present; a failed download is logged
# but non-fatal (analysis will surface the error later if a model is missing).
nltk_dependencies = ['punkt', 'stopwords', 'vader_lexicon']
for dep in nltk_dependencies:
    try:
        nltk.download(dep, quiet=True)
    except Exception as e:
        # Use the module logger (was print) for consistency with the rest of the file.
        logger.error(f"Error downloading {dep}: {e}")
| |
|
| | |
# Pull H2OGPTE connection settings from a local .env file, if present.
load_dotenv()

# NOTE(review): both values are None when the env vars are unset; H2OGPTE
# would then fail at request time rather than here — confirm desired behavior.
h2ogpt_url = os.getenv('H2OGPT_URL')
h2ogpt_api_key = os.getenv('H2OGPT_API_KEY')

# Shared client used for collection setup and chat queries throughout the app.
client = H2OGPTE(
    address=h2ogpt_url,
    api_key=h2ogpt_api_key
)
| |
|
def analyze_transcript(transcript):
    """Analyze a raw transcript string for insights.

    Returns a dict:
        word_freq  -- top-10 (word, count) pairs after stop-word filtering
        sentiments -- up to 50 VADER compound scores (one per long sentence)
        topics     -- top-5 (noun_phrase, count) pairs, multi-word phrases only

    On any failure a placeholder dict with the same keys is returned so
    callers can render without special-casing errors.
    """
    try:
        tokens = word_tokenize(transcript.lower())
        stop_words = set(stopwords.words('english'))
        # Conversational fillers plus very common words not worth charting.
        additional_stops = {'um', 'uh', 'like', 'okay', 'right', 'well', 'so',
                            'and', 'the', 'to', 'of', 'in', 'a', 'is', 'that'}
        stop_words.update(additional_stops)

        words = [word for word in tokens if (
            word.isalnum() and
            not word.isnumeric() and
            len(word) > 2 and
            word not in stop_words
        )]

        word_freq = Counter(words).most_common(10)

        # Only sentences longer than 20 chars: short fragments skew VADER scores.
        sentences = [s.strip() for s in re.split('[.!?]', transcript) if len(s.strip()) > 20]
        sia = SentimentIntensityAnalyzer()
        sentiments = [sia.polarity_scores(sentence)['compound'] for sentence in sentences]

        # Downsample evenly to at most 50 points so the sentiment plot stays readable.
        if len(sentiments) > 50:
            step = len(sentiments) // 50
            sentiments = sentiments[::step][:50]

        blob = TextBlob(transcript)
        noun_phrases = [phrase for phrase in blob.noun_phrases if len(phrase.split()) >= 2]
        topics = Counter(noun_phrases).most_common(5)

        # Guarantee non-empty results so downstream plots always have data.
        if not word_freq:
            word_freq = [('no', 1)]
        if not sentiments:
            sentiments = [0.0]
        if not topics:
            topics = [('no topics found', 1)]

        return {
            'word_freq': word_freq,
            'sentiments': sentiments,
            'topics': topics
        }
    except Exception as e:
        # Log via the module logger (was print) for consistency with
        # TranscriptAnalyzer.analyze.
        logger.error(f"Error in transcript analysis: {e}")
        return {
            'word_freq': [('error', 1)],
            'sentiments': [0.0],
            'topics': [('error', 1)]
        }
| |
|
def create_word_frequency_plot(word_freq):
    """Build the (data, plot) pair for a bar chart of (word, count) tuples."""
    rows = [{'word': w, 'count': c} for w, c in word_freq]
    plot_data = data(fields=['word', 'count'], rows=rows)
    chart = ui.plot([
        ui.mark(type='interval', x='=word', y='=count')
    ])
    return plot_data, chart
| |
|
def create_sentiment_plot(sentiments):
    """Build the (data, plot) pair for a line chart of sentiment scores."""
    rows = [{'index': str(pos), 'sentiment': val} for pos, val in enumerate(sentiments)]
    plot_data = data(fields=['index', 'sentiment'], rows=rows)
    chart = ui.plot([
        ui.mark(type='line', x='=index', y='=sentiment')
    ])
    return plot_data, chart
| |
|
def extract_video_id(url):
    """Return the 11-character YouTube video ID found in *url*, or None.

    Accepts both 'watch?v=ID' and short 'youtu.be/ID' forms; the ID must be
    followed by '?', '/', '&' or the end of the string.
    """
    id_pattern = re.compile(r'(?:v=|\/)([\w-]{11})(?:\?|\/|&|$)')
    found = id_pattern.search(url)
    if found is None:
        return None
    return found.group(1)
| |
|
async def get_transcript(video_id: str):
    """Fetch the transcript for a YouTube video without blocking the event loop.

    The blocking youtube_transcript_api call runs in the default executor.
    Returns the transcript entry list on success, or an 'Error: ...' string
    on failure (callers check for that prefix).
    """
    try:
        # get_event_loop() is deprecated inside coroutines; get_running_loop()
        # is the correct way to reach the current loop from async code.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, YouTubeTranscriptApi.get_transcript, video_id
        )
    except Exception as e:
        # Log via the module logger (was print) for consistency.
        logger.error(f"Error fetching transcript: {e}")
        return f"Error: {str(e)}"
| |
|
async def setup_h2ogpt_collection(transcript, video_id):
    """Create an H2OGPTE collection seeded with the video transcript.

    Returns the new collection id, or an 'Error ...' string on failure
    (callers check for the 'Error' prefix).
    """
    transcript_path = f'transcript_{video_id}.txt'
    try:
        collection_id = client.create_collection(
            name=f'YouTube_Video_{video_id}',
            description='YouTube video transcript for chat interaction'
        )

        # H2OGPTE ingests files, so stage the transcript on disk temporarily.
        with open(transcript_path, 'w', encoding='utf-8') as f:
            f.write(transcript)

        try:
            with open(transcript_path, 'rb') as f:
                upload_id = client.upload(transcript_path, f)
            client.ingest_uploads(collection_id, [upload_id])
        finally:
            # Remove the temp file even when upload/ingest raises; the
            # original version leaked the file on error.
            os.remove(transcript_path)

        return collection_id
    except Exception as e:
        return f"Error setting up H2O GPT: {str(e)}"
| |
|
async def get_gpt_response(collection_id, question):
    """Ask H2OGPTE *question* against *collection_id* and return the answer text.

    Returns an 'Error getting response: ...' string on failure.
    """
    try:
        session_id = client.create_chat_session(collection_id)
        with client.connect(session_id) as chat:
            reply = chat.query(
                question,
                timeout=60,
                rag_config={"rag_type": "rag"}
            )
            return reply.content
    except Exception as e:
        return f"Error getting response: {str(e)}"
| |
|
@app('/chatbot')
async def serve(q: Q):
    """Wave request handler for the /chatbot route.

    On a client's first visit, builds the full page layout. Subsequent
    requests are routed by inspecting q.args / q.events: URL submission,
    chat messages, clear-chat, export-chat and chatbot feedback events.
    """
    if not q.client.initialized:
        q.client.initialized = True

        # --- one-time page layout for this browser client ---
        q.page['header'] = ui.header_card(
            box='1 1 12 1',
            title='YouTube Video Transcript Chatbot & Analysis | xAmplify',
            subtitle='Enter a YouTube URL to analyse and chat about the video content',
            color='primary'
        )

        # URL input plus the two action buttons (fetch / clear).
        q.page['url_form'] = ui.form_card(
            box='1 2 12 1',
            items=[
                ui.inline([
                    ui.textbox(
                        name='video_url',
                        placeholder='Enter YouTube video URL...',
                        width='800px'
                    ),
                    ui.button(
                        name='submit_url',
                        label='Fetch Transcript',
                        primary=True
                    ),
                    ui.button(
                        name='clear_chat',
                        label='Clear Chat',
                        icon='Delete'
                    )
                ])
            ]
        )

        # Status banner; swapped for progress/success/error messages below.
        q.page['status'] = ui.form_card(
            box='1 3 12 1',
            items=[
                ui.text('Please enter a YouTube URL to begin.')
            ]
        )

        # Transcript viewer, populated after a successful fetch.
        q.page['transcript'] = ui.form_card(
            box='1 4 6 4',
            title='Video Transcript',
            items=[
                ui.text('Transcript will appear here...')
            ]
        )

        # Placeholder plots; real rows are assigned after analysis.
        q.page['word_freq'] = ui.plot_card(
            box='1 8 3 4',
            title='Word Frequency Analysis',
            data=data('word count', rows=[('', 0)], pack=True),
            plot=ui.plot([ui.mark(type='interval', x='=word', y='=count')])
        )

        q.page['sentiment'] = ui.plot_card(
            box='4 8 3 4',
            title='Sentiment Flow',
            data=data('index sentiment', rows=[(0, 0)], pack=True),
            plot=ui.plot([ui.mark(type='line', x='=index', y='=sentiment')])
        )

        q.page['topics'] = ui.markdown_card(
            box='7 8 6 4',
            title='Key Topics',
            content='Key topics discussed in the video with their frequency of mention',
        )

        # Chat stays disabled until a transcript collection is ready.
        q.page['chat'] = ui.chatbot_card(
            box='7 4 6 4',
            data=data(fields='content from_user', t='list'),
            name='chatbot',
            events=['feedback'],
            placeholder='Type your question here...',
            disabled=True,
        )

        q.page['feedback'] = ui.form_card(
            box='1 12 12 1',
            items=[
                ui.inline([
                    ui.text_l('Response Feedback'),
                    ui.text(name='feedback_text', content='No feedback yet.'),
                    ui.button(name='export_chat', label='Export Chat', icon='Download')
                ])
            ]
        )

    # --- URL submitted: fetch, analyse and index the transcript ---
    if q.args.submit_url:
        url = q.args.video_url
        video_id = extract_video_id(url)

        if not video_id:
            q.page['status'].items = [
                ui.message_bar(
                    type='error',
                    text='Invalid YouTube URL. Please check and try again.'
                )
            ]
            return

        # Show progress immediately; save() pushes the update to the browser.
        q.page['status'].items = [
            ui.progress(label='Processing video transcript...', value=True)
        ]
        await q.page.save()

        # get_transcript returns an 'Error...' string on failure.
        transcript_list = await get_transcript(video_id)
        if isinstance(transcript_list, str) and transcript_list.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=transcript_list)
            ]
            return

        # Cache the flat transcript text for export and H2OGPTE indexing.
        q.client.transcript = ' '.join([entry['text'] for entry in transcript_list])
        analyzer = TranscriptAnalyzer(transcript_list)
        analysis = analyzer.analyze()

        # Build the segmented transcript view: timestamp header, text,
        # sentiment gauge and topics per segment.
        transcript_items = []
        transcript_items.append(ui.text_xl('Video Transcript'))
        transcript_items.append(ui.separator())

        for segment in analysis['time_segments']:
            transcript_items.append(
                ui.text(
                    f"**[{segment['start_time']} - {segment['end_time']}]**",
                    size='s'
                )
            )

            transcript_items.append(ui.text(segment['text']))

            # Map compound sentiment [-1, 1] to a [0, 1] progress-bar value.
            sentiment_value = (segment['sentiment'] + 1) / 2
            transcript_items.append(
                ui.progress(
                    label='Sentiment',
                    value=sentiment_value,
                    caption=f"{'Positive' if segment['sentiment'] > 0.1 else 'Negative' if segment['sentiment'] < -0.1 else 'Neutral'}"
                )
            )

            if segment['topics']:
                transcript_items.append(
                    ui.text(
                        f"Topics: {', '.join(segment['topics'])}",
                        size='s'
                    )
                )

            transcript_items.append(ui.separator())

        q.page['transcript'].items = transcript_items

        # Replace the placeholder plot data with the real analysis results.
        word_freq_data = [(word, count) for word, count in analysis['word_freq']]
        q.page['word_freq'].data = data('word count', rows=word_freq_data, pack=True)

        sentiment_data = [(i, score) for i, score in enumerate(analysis['sentiments'])]
        q.page['sentiment'].data = data('index sentiment', rows=sentiment_data, pack=True)

        # Topics card as a markdown bullet list.
        topics_md = "## Key Topics Discussed\n" + "\n".join([
            f"- {topic} ({count} mentions)\n Related: {', '.join(related)}"
            for topic, count, related in analysis['topics']
        ])
        q.page['topics'].content = topics_md

        # Index the transcript in H2OGPTE; returns an 'Error...' string on failure.
        collection_id = await setup_h2ogpt_collection(q.client.transcript, video_id)
        if isinstance(collection_id, str) and collection_id.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=collection_id)
            ]
            return

        q.client.collection_id = collection_id

        # Transcript is ready: enable chat and report success.
        q.page['chat'].disabled = False
        q.page['status'].items = [
            ui.message_bar(
                type='success',
                text='Transcript processed successfully! You can now ask questions about the video.'
            )
        ]

    # --- clear chat: reset the chatbot's data buffer ---
    if q.args.clear_chat:
        q.page['chat'].data = data(fields='content from_user', t='list')
        # NOTE(review): items[0][1] assumes the feedback card layout
        # (inline -> second item is the feedback_text) — confirm it stays in sync.
        q.page['feedback'].items[0][1].content = 'Chat history cleared.'

    # --- export chat: dump transcript + history into a copyable text area ---
    if q.args.export_chat:
        if hasattr(q.client, 'transcript'):
            chat_history = []
            # Presumably each buffer entry is (content, from_user) — matches
            # the chatbot_card fields declared above; verify against Wave docs.
            for msg in q.page['chat'].data:
                prefix = "User: " if msg[1] else "Response: "
                chat_history.append(f'{prefix}{msg[0]}')

            chat_history_text = '\n'.join(chat_history)

            export_content = f'''YouTube Video Transcript Chatbot

Transcript:
{q.client.transcript}

Chat History:
{chat_history_text}'''

            q.page['export'] = ui.form_card(
                box='1 13 12 2',
                items=[
                    ui.text_area(
                        name='export_content',
                        label='Chat Export (Copy and save)',
                        value=export_content,
                        height='200px'
                    )
                ]
            )

    # --- chat message: echo the user message, then query H2OGPTE ---
    if q.args.chatbot:
        user_message = f"User: {q.args.chatbot}"
        # Chatbot buffer append: [content, from_user]; True marks a user message.
        q.page['chat'].data += [user_message, True]
        await q.page.save()

        if hasattr(q.client, 'collection_id'):
            response = await get_gpt_response(q.client.collection_id, q.args.chatbot)
            formatted_response = f"Response: {response}"
            q.page['chat'].data += [formatted_response, False]
        else:
            # No transcript indexed yet — prompt the user to fetch one first.
            q.page['chat'].data += ['Response: Please fetch a video transcript first.', False]

    # --- chatbot feedback event: surface it in the feedback card ---
    if q.events.chatbot and q.events.chatbot.feedback:
        feedback = q.events.chatbot.feedback
        q.page['feedback'].items[0][1].content = f'Latest feedback: {feedback.type} on "{feedback.message}"'

    await q.page.save()