# waveTest / app.py
# Author: Ankur Mahanta — commit eebfeee: "Fix f-string syntax in export content"
# Status note: chat is working + key topics working
from h2o_wave import main, app, Q, ui, data
from youtube_transcript_api import YouTubeTranscriptApi
from h2ogpte import H2OGPTE
import re
import os
from dotenv import load_dotenv
from collections import Counter
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from textblob import TextBlob
from nltk.tokenize import sent_tokenize
import asyncio
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class TranscriptAnalyzer:
    """Derive insights from a YouTube transcript.

    The transcript is the list of dicts returned by youtube_transcript_api,
    each with 'text', 'start', and 'duration' keys. Provides word-frequency,
    per-sentence sentiment, noun-phrase topics, and timestamped segments.
    """

    def __init__(self, transcript_list):
        # Flatten the per-caption entries into one string for whole-text passes.
        self.transcript = ' '.join(entry['text'] for entry in transcript_list)
        self.transcript_list = transcript_list
        self.sia = SentimentIntensityAnalyzer()
        # English stop words plus common spoken filler tokens.
        self.stop_words = set(stopwords.words('english'))
        self.additional_stops = {'um', 'uh', 'like', 'okay', 'right', 'well', 'so'}
        self.stop_words.update(self.additional_stops)
        self.sentences = sent_tokenize(self.transcript)
        self.words = word_tokenize(self.transcript.lower())

    def analyze(self):
        """Run every analysis pass; return placeholder data on any failure."""
        try:
            return {
                'word_freq': self._analyze_word_frequency(),
                'sentiments': self._analyze_sentiment(),
                'topics': self._extract_topics(),
                'time_segments': self._create_time_segments()
            }
        except Exception as e:
            logger.error(f"Error in transcript analysis: {e}")
            # Fallback shapes mirror the success case so the UI code
            # downstream never has to special-case a failed analysis.
            return {
                'word_freq': [('no data', 1)],
                'sentiments': [0.0],
                'topics': [('no topics', 1, ['none'])],
                'time_segments': [{
                    'start_time': '0:00',
                    'end_time': '0:00',
                    'text': 'Analysis not available',
                    'sentiment': 0.0,
                    'topics': []
                }]
            }

    def _analyze_word_frequency(self):
        """Return the 15 most common meaningful words as (word, count) pairs."""
        stripped = (w.strip('.,!?()[]{}":;') for w in self.words)
        meaningful = [
            w for w in stripped
            if w.isalnum() and not w.isnumeric() and len(w) > 2 and w not in self.stop_words
        ]
        return Counter(meaningful).most_common(15)

    def _analyze_sentiment(self):
        """Return the VADER compound score for each sentence, in order."""
        scores = []
        for sentence in self.sentences:
            scores.append(self.sia.polarity_scores(sentence)['compound'])
        return scores

    def _extract_topics(self):
        """Return the top 10 multi-word noun phrases as (topic, count, related) tuples."""
        blob = TextBlob(self.transcript)
        multiword = (p for p in blob.noun_phrases if len(p.split()) >= 2)
        counts = Counter(multiword)
        # 'related' is a placeholder tag; related-term extraction is not implemented.
        return [(topic, n, ['related']) for topic, n in counts.most_common(10)]

    def _create_time_segments(self):
        """Group transcript entries into 5-entry segments with real timestamps."""
        chunk = 5  # transcript entries per segment
        segments = []
        for offset in range(0, len(self.transcript_list), chunk):
            entries = self.transcript_list[offset:offset + chunk]
            text = ' '.join(e['text'] for e in entries)
            # A segment spans from the first entry's start to the last
            # entry's start plus its duration.
            begin = int(entries[0]['start'])
            finish = int(entries[-1]['start'] + entries[-1]['duration'])
            blob = TextBlob(text)
            segments.append({
                'start_time': f"{begin // 60}:{begin % 60:02d}",
                'end_time': f"{finish // 60}:{finish % 60:02d}",
                'text': text,
                'sentiment': self.sia.polarity_scores(text)['compound'],
                'topics': list(blob.noun_phrases)[:3]
            })
        return segments
# Download required NLTK corpora up front; failures are logged but non-fatal
# so the app can still start (analysis then degrades gracefully).
nltk_dependencies = ['punkt', 'stopwords', 'vader_lexicon']
for dep in nltk_dependencies:
    try:
        nltk.download(dep, quiet=True)
    except Exception as e:
        # Use the module logger (defined above) instead of print for
        # consistency with the rest of the file.
        logger.error(f"Error downloading {dep}: {e}")

# Load environment variables from a local .env file, if present.
load_dotenv()

# Initialize the H2O GPT Enterprise client from environment configuration.
# NOTE(review): no validation that these env vars are set — H2OGPTE receives
# None values if they are missing; confirm the deployment always sets them.
h2ogpt_url = os.getenv('H2OGPT_URL')
h2ogpt_api_key = os.getenv('H2OGPT_API_KEY')
client = H2OGPTE(
    address=h2ogpt_url,
    api_key=h2ogpt_api_key
)
def analyze_transcript(transcript):
    """Analyze a raw transcript string for insights.

    Args:
        transcript: the full transcript as one string.

    Returns a dict with:
        word_freq:  top-10 (word, count) pairs after stop-word filtering
        sentiments: up to 50 VADER compound scores, one per long sentence
        topics:     top-5 (noun_phrase, count) pairs (multi-word phrases only)

    Never raises: on any failure, placeholder data is returned instead so
    callers can render something.
    """
    try:
        # --- Word frequency -------------------------------------------------
        tokens = word_tokenize(transcript.lower())
        stop_words = set(stopwords.words('english'))
        # Spoken fillers plus very common function words seen in transcripts.
        additional_stops = {'um', 'uh', 'like', 'okay', 'right', 'well', 'so', 'and', 'the', 'to', 'of', 'in', 'a', 'is', 'that'}
        stop_words.update(additional_stops)
        # Keep alphanumeric, non-numeric words longer than 2 characters.
        words = [word for word in tokens if (
            word.isalnum() and
            not word.isnumeric() and
            len(word) > 2 and
            word not in stop_words
        )]
        word_freq = Counter(words).most_common(10)

        # --- Sentiment ------------------------------------------------------
        # Split on sentence punctuation; ignore fragments of <= 20 chars.
        sentences = [s.strip() for s in re.split('[.!?]', transcript) if len(s.strip()) > 20]
        sia = SentimentIntensityAnalyzer()
        sentiments = [sia.polarity_scores(sentence)['compound'] for sentence in sentences]
        # Sample down to at most 50 scores for a representative overview.
        if len(sentiments) > 50:
            step = len(sentiments) // 50
            sentiments = sentiments[::step][:50]

        # --- Topics ---------------------------------------------------------
        blob = TextBlob(transcript)
        # Only multi-word phrases; single nouns are too noisy.
        noun_phrases = [phrase for phrase in blob.noun_phrases if len(phrase.split()) >= 2]
        topics = Counter(noun_phrases).most_common(5)

        # Guarantee non-empty results for the plotting code downstream.
        if not word_freq:
            word_freq = [('no', 1)]
        if not sentiments:
            sentiments = [0.0]
        if not topics:
            topics = [('no topics found', 1)]
        return {
            'word_freq': word_freq,
            'sentiments': sentiments,
            'topics': topics
        }
    except Exception as e:
        # Log via the module logger rather than print, matching the rest
        # of the file's error handling.
        logger.error(f"Error in transcript analysis: {e}")
        return {
            'word_freq': [('error', 1)],
            'sentiments': [0.0],
            'topics': [('error', 1)]
        }
def create_word_frequency_plot(word_freq):
    """Build the (data, plot) pair for a word-frequency bar chart.

    word_freq is a list of (word, count) pairs, as produced by Counter.
    """
    rows = [{'word': w, 'count': n} for w, n in word_freq]
    plot_data = data(fields=['word', 'count'], rows=rows)
    bar_mark = ui.mark(type='interval', x='=word', y='=count')
    return plot_data, ui.plot([bar_mark])
def create_sentiment_plot(sentiments):
    """Build the (data, plot) pair for a sentiment line chart.

    sentiments is a sequence of compound scores; the x-axis is their index.
    """
    rows = [{'index': str(i), 'sentiment': s} for i, s in enumerate(sentiments)]
    plot_data = data(fields=['index', 'sentiment'], rows=rows)
    line_mark = ui.mark(type='line', x='=index', y='=sentiment')
    return plot_data, ui.plot([line_mark])
def extract_video_id(url):
    """Extract the 11-character YouTube video ID from a URL.

    Supports both ``watch?v=ID`` and short ``youtu.be/ID`` forms.
    Returns None for a missing/empty URL or when no ID is found.
    """
    if not url:
        # serve() passes q.args.video_url straight in, which can be None or
        # empty; re.search would raise TypeError on None.
        return None
    pattern = r'(?:v=|\/)([\w-]{11})(?:\?|\/|&|$)'
    match = re.search(pattern, url)
    return match.group(1) if match else None
async def get_transcript(video_id: str):
    """Fetch the transcript for a YouTube video without blocking the event loop.

    Returns the transcript list on success, or an ``"Error: ..."`` string on
    failure (callers check for the string prefix rather than catching).
    """
    try:
        # Run the blocking network call in the default executor. Use
        # get_running_loop(): asyncio.get_event_loop() inside a coroutine is
        # deprecated and can misbehave under modern asyncio.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, YouTubeTranscriptApi.get_transcript, video_id
        )
    except Exception as e:
        # Module logger instead of print, for consistency with the file.
        logger.error(f"Error fetching transcript: {e}")
        return f"Error: {str(e)}"
async def setup_h2ogpt_collection(transcript, video_id):
    """Create an H2O GPT collection and ingest the transcript into it.

    Returns the new collection id, or an ``"Error ..."`` string on failure
    (callers check for the string prefix rather than catching).
    """
    temp_path = f'transcript_{video_id}.txt'
    try:
        collection_id = client.create_collection(
            name=f'YouTube_Video_{video_id}',
            description='YouTube video transcript for chat interaction'
        )
        # The client ingests uploaded files, so stage the transcript in a
        # temporary text file next to the app.
        with open(temp_path, 'w', encoding='utf-8') as f:
            f.write(transcript)
        try:
            with open(temp_path, 'rb') as f:
                upload_id = client.upload(temp_path, f)
            client.ingest_uploads(collection_id, [upload_id])
        finally:
            # Remove the temp file even when upload/ingest fails — the
            # original only cleaned up on the success path, leaking the file.
            if os.path.exists(temp_path):
                os.remove(temp_path)
        return collection_id
    except Exception as e:
        return f"Error setting up H2O GPT: {str(e)}"
async def get_gpt_response(collection_id, question):
    """Ask H2O GPT a question against the given collection.

    Opens a fresh chat session per call and returns the answer text, or an
    ``"Error ..."`` string on failure (callers check the prefix).
    """
    try:
        chat_session_id = client.create_chat_session(collection_id)
        with client.connect(chat_session_id) as session:
            answer = session.query(
                question,
                timeout=60,
                rag_config={"rag_type": "rag"}
            )
            return answer.content
    except Exception as e:
        return f"Error getting response: {str(e)}"
@app('/chatbot')
async def serve(q: Q):
    """Wave request handler for the /chatbot route.

    First visit per browser client builds the page layout; subsequent
    requests route on q.args / q.events: URL submission, chat messages,
    clear, export, and chatbot feedback events.
    """
    # One-time page layout setup for this browser client.
    if not q.client.initialized:
        q.client.initialized = True
        # Header
        q.page['header'] = ui.header_card(
            box='1 1 12 1',
            title='YouTube Video Transcript Chatbot & Analysis | xAmplify',
            subtitle='Enter a YouTube URL to analyse and chat about the video content',
            color='primary'
        )
        # URL input form
        q.page['url_form'] = ui.form_card(
            box='1 2 12 1',
            items=[
                ui.inline([
                    ui.textbox(
                        name='video_url',
                        # label='YouTube URL',
                        placeholder='Enter YouTube video URL...',
                        width='800px'
                    ),
                    ui.button(
                        name='submit_url',
                        label='Fetch Transcript',
                        primary=True
                    ),
                    ui.button(
                        name='clear_chat',
                        label='Clear Chat',
                        icon='Delete'
                    )
                ])
            ]
        )
        # Status card
        q.page['status'] = ui.form_card(
            box='1 3 12 1',
            items=[
                ui.text('Please enter a YouTube URL to begin.')
            ]
        )
        # Left column - Transcript and Analysis
        q.page['transcript'] = ui.form_card(
            box='1 4 6 4',
            title='Video Transcript',
            items=[
                ui.text('Transcript will appear here...')
            ]
        )
        # Initialize plots with dummy data
        q.page['word_freq'] = ui.plot_card(
            box='1 8 3 4',
            title='Word Frequency Analysis',
            # caption='Frequency of significant terms identified in the video content',
            data=data('word count', rows=[('', 0)], pack=True),
            plot=ui.plot([ui.mark(type='interval', x='=word', y='=count')])
        )
        q.page['sentiment'] = ui.plot_card(
            box='4 8 3 4',
            title='Sentiment Flow',
            # caption='Emotional tone progression throughout the video',
            data=data('index sentiment', rows=[(0, 0)], pack=True),
            plot=ui.plot([ui.mark(type='line', x='=index', y='=sentiment')])
        )
        # Key topics
        q.page['topics'] = ui.markdown_card(
            box='7 8 6 4',
            title='Key Topics',
            content='Key topics discussed in the video with their frequency of mention',
        )
        # Right column - Chat interface (disabled until a transcript is loaded)
        q.page['chat'] = ui.chatbot_card(
            box='7 4 6 4',
            data=data(fields='content from_user', t='list'),
            name='chatbot',
            events=['feedback'],
            placeholder='Type your question here...',
            disabled=True,
        )
        # Feedback card
        q.page['feedback'] = ui.form_card(
            box='1 12 12 1',
            items=[
                ui.inline([
                    ui.text_l('Response Feedback'),
                    ui.text(name='feedback_text', content='No feedback yet.'),
                    ui.button(name='export_chat', label='Export Chat', icon='Download')
                ])
            ]
        )
    # Handle URL submission
    if q.args.submit_url:
        url = q.args.video_url
        video_id = extract_video_id(url)
        if not video_id:
            q.page['status'].items = [
                ui.message_bar(
                    type='error',
                    text='Invalid YouTube URL. Please check and try again.'
                )
            ]
            return
        # Update status to processing
        q.page['status'].items = [
            ui.progress(label='Processing video transcript...', value=True)
        ]
        await q.page.save()
        # Get and process transcript; failures come back as "Error..." strings.
        transcript_list = await get_transcript(video_id)
        if isinstance(transcript_list, str) and transcript_list.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=transcript_list)
            ]
            return
        # Store transcript (flattened to one string) and analyze
        q.client.transcript = ' '.join([entry['text'] for entry in transcript_list])
        analyzer = TranscriptAnalyzer(transcript_list)
        analysis = analyzer.analyze()
        # Update transcript display with time segments
        transcript_items = []
        transcript_items.append(ui.text_xl('Video Transcript'))
        transcript_items.append(ui.separator())
        for segment in analysis['time_segments']:
            # Add timestamp header with markdown for bold text
            transcript_items.append(
                ui.text(
                    f"**[{segment['start_time']} - {segment['end_time']}]**",
                    size='s'
                )
            )
            # Add segment text
            transcript_items.append(ui.text(segment['text']))
            # Add sentiment indicator rendered as a progress bar
            sentiment_value = (segment['sentiment'] + 1) / 2  # Convert from [-1,1] to [0,1]
            transcript_items.append(
                ui.progress(
                    label='Sentiment',
                    value=sentiment_value,
                    caption=f"{'Positive' if segment['sentiment'] > 0.1 else 'Negative' if segment['sentiment'] < -0.1 else 'Neutral'}"
                )
            )
            # Add segment topics if available
            if segment['topics']:
                transcript_items.append(
                    ui.text(
                        f"Topics: {', '.join(segment['topics'])}",
                        size='s'
                    )
                )
            # Add separator between segments
            transcript_items.append(ui.separator())
        q.page['transcript'].items = transcript_items
        # Update analysis visualizations
        word_freq_data = [(word, count) for word, count in analysis['word_freq']]
        q.page['word_freq'].data = data('word count', rows=word_freq_data, pack=True)
        # Update sentiment plot
        sentiment_data = [(i, score) for i, score in enumerate(analysis['sentiments'])]
        q.page['sentiment'].data = data('index sentiment', rows=sentiment_data, pack=True)
        # Update topics markdown card (topics are (topic, count, related) tuples)
        topics_md = "## Key Topics Discussed\n" + "\n".join([
            f"- {topic} ({count} mentions)\n Related: {', '.join(related)}"
            for topic, count, related in analysis['topics']
        ])
        q.page['topics'].content = topics_md
        # Setup H2O GPT collection; failures come back as "Error..." strings.
        collection_id = await setup_h2ogpt_collection(q.client.transcript, video_id)
        if isinstance(collection_id, str) and collection_id.startswith('Error'):
            q.page['status'].items = [
                ui.message_bar(type='error', text=collection_id)
            ]
            return
        # Store collection ID for later chat queries
        q.client.collection_id = collection_id
        # Enable chat and update status
        q.page['chat'].disabled = False
        q.page['status'].items = [
            ui.message_bar(
                type='success',
                text='Transcript processed successfully! You can now ask questions about the video.'
            )
        ]
    # Handle chat clear
    if q.args.clear_chat:
        q.page['chat'].data = data(fields='content from_user', t='list')
        # NOTE(review): items[0][1] reaches into the ui.inline's children to
        # find the feedback_text component — fragile positional indexing;
        # confirm it matches the inline layout built above.
        q.page['feedback'].items[0][1].content = 'Chat history cleared.'
    # Handle chat export
    if q.args.export_chat:
        if hasattr(q.client, 'transcript'):
            chat_history = []
            # NOTE(review): assumes iterating the Wave chat data buffer yields
            # (content, from_user) rows — verify against the Wave data API.
            for msg in q.page['chat'].data:
                # Updated to use User: and Response: prefixes
                prefix = "User: " if msg[1] else "Response: "
                chat_history.append(f'{prefix}{msg[0]}')
            # Create formatted chat history text
            chat_history_text = '\n'.join(chat_history)
            # Create full export content
            export_content = f'''YouTube Video Transcript Chatbot
Transcript:
{q.client.transcript}
Chat History:
{chat_history_text}'''
            # Render the export in a copy-paste text area below the feedback row
            q.page['export'] = ui.form_card(
                box='1 13 12 2',
                items=[
                    ui.text_area(
                        name='export_content',
                        label='Chat Export (Copy and save)',
                        value=export_content,
                        height='200px'
                    )
                ]
            )
    # Handle chat messages
    if q.args.chatbot:
        # Add user message with "User:" prefix
        user_message = f"User: {q.args.chatbot}"
        q.page['chat'].data += [user_message, True]
        # Save so the user's message appears before the (slow) GPT call returns
        await q.page.save()
        if hasattr(q.client, 'collection_id'):
            response = await get_gpt_response(q.client.collection_id, q.args.chatbot)
            # Add response with "Response:" prefix
            formatted_response = f"Response: {response}"
            q.page['chat'].data += [formatted_response, False]
        else:
            # Add error message with "Response:" prefix
            q.page['chat'].data += ['Response: Please fetch a video transcript first.', False]
    # Handle feedback events emitted by the chatbot card
    if q.events.chatbot and q.events.chatbot.feedback:
        feedback = q.events.chatbot.feedback
        # NOTE(review): same fragile items[0][1] indexing as the clear handler.
        q.page['feedback'].items[0][1].content = f'Latest feedback: {feedback.type} on "{feedback.message}"'
    await q.page.save()