# reddit-scraper / advanced_scraper_ui.py
# Add warning suppression at the very beginning before any other imports
import warnings
warnings.filterwarnings("ignore", message="No secrets files found.*")
import streamlit as st
import pandas as pd
import time
import os
import json
from datetime import datetime
from dotenv import load_dotenv
from enhanced_scraper import EnhancedRedditScraper
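# Assumed interface of EnhancedRedditScraper, inferred from the calls in
# run_search() below (the authoritative definition lives in enhanced_scraper.py):
#   EnhancedRedditScraper(client_id, client_secret, user_agent)
#   .scrape_subreddit(subreddit_name, keywords, limit, sort_by,
#                     include_comments, include_selftext, min_score) -> list of post dicts
#   .search_multiple_subreddits(subreddits, keywords, limit, sort_by,
#                               include_comments, include_selftext, min_score)
#                     -> dict mapping subreddit name to a list of post dicts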
# Disable static file serving to prevent the warning
os.environ['STREAMLIT_SERVER_ENABLE_STATIC_SERVING'] = 'false'
# Note: Page configuration and session state initialization are handled in app.py
# Functions
def initialize_scraper(client_id, client_secret, user_agent):
    """Initialize the scraper with API credentials."""
    try:
        scraper = EnhancedRedditScraper(
            client_id=client_id,
            client_secret=client_secret,
            user_agent=user_agent
        )
        st.session_state.scraper = scraper
        return True
    except Exception as e:
        st.error(f"Failed to initialize scraper: {str(e)}")
        return False

def run_search(subreddits, keywords, limit, sort_by, include_comments,
               include_selftext, min_score):
    """Run the search with the provided parameters."""
    if not st.session_state.scraper:
        st.error("Scraper not initialized. Please set up API credentials first.")
        return False

    try:
        with st.spinner("Scraping Reddit..."):
            if len(subreddits) == 1:
                # Single-subreddit search
                results = st.session_state.scraper.scrape_subreddit(
                    subreddit_name=subreddits[0],
                    keywords=keywords,
                    limit=limit,
                    sort_by=sort_by,
                    include_comments=include_comments,
                    include_selftext=include_selftext,
                    min_score=min_score
                )
                st.session_state.results = {subreddits[0]: results}
            else:
                # Multiple-subreddit search
                results = st.session_state.scraper.search_multiple_subreddits(
                    subreddits=subreddits,
                    keywords=keywords,
                    limit=limit,
                    sort_by=sort_by,
                    include_comments=include_comments,
                    include_selftext=include_selftext,
                    min_score=min_score
                )
                st.session_state.results = results

        # Add to search history
        search_info = {
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'subreddits': subreddits,
            'keywords': keywords,
            'total_results': sum(len(posts) for posts in st.session_state.results.values())
        }
        st.session_state.search_history.append(search_info)
        return True
    except Exception as e:
        st.error(f"Search failed: {str(e)}")
        return False

def filter_results(results, filters):
    """Apply post-search filters to results."""
    filtered = {}
    for subreddit, posts in results.items():
        filtered_posts = []
        for post in posts:
            # Apply score filter
            if post['score'] < filters['min_score']:
                continue
            # Apply date filters if set; st.date_input returns datetime.date,
            # so compare against the date part of the post timestamp
            if filters['date_from'] or filters['date_to']:
                post_date = datetime.strptime(post['created_utc'], '%Y-%m-%d %H:%M:%S').date()
                if filters['date_from'] and post_date < filters['date_from']:
                    continue
                if filters['date_to'] and post_date > filters['date_to']:
                    continue
            # Keep only posts with matching comments if requested
            if filters['show_only_with_comments'] and not post.get('matching_comments'):
                continue
            filtered_posts.append(post)
        filtered[subreddit] = filtered_posts
    return filtered
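
# Post dicts are assumed to carry at least the keys read by the UI below
# (the authoritative schema is whatever EnhancedRedditScraper emits):
#
#   {
#       'title': str, 'author': str, 'score': int, 'num_comments': int,
#       'created_utc': 'YYYY-MM-DD HH:MM:SS', 'permalink': str,
#       'url': str, 'text': str,
#       'matching_comments': [
#           {'author': str, 'score': int, 'created_utc': str, 'body': str},
#           ...
#       ]
#   }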
# Visualization function has been removed

def main():
    # Suppress the "No secrets files found" warning
    warnings.filterwarnings("ignore", message="No secrets files found.*")

    # Ensure session state variables are initialized
    if 'results' not in st.session_state:
        st.session_state['results'] = None
    if 'scraper' not in st.session_state:
        st.session_state['scraper'] = None
    if 'search_history' not in st.session_state:
        st.session_state['search_history'] = []
    if 'filters' not in st.session_state:
        st.session_state['filters'] = {
            'min_score': 0,
            'date_from': None,
            'date_to': None,
            'show_only_with_comments': False
        }

    # Header using Streamlit's native heading components
    st.title("Reddit Scraper")
    st.header("Data Collection Tool")

    # Sidebar for configuration
    with st.sidebar:
        st.header("Configuration")

        # Search parameters
        st.subheader("Search Parameters")

        # Multiple-subreddit input
        subreddits_input = st.text_area("Subreddits (one per line)", value="cuny\ncollegequestions")
        subreddits = [s.strip() for s in subreddits_input.split("\n") if s.strip()]

        # Keywords input
        keywords_input = st.text_area("Keywords (one per line)", value="question\nhelp\nconfused")
        keywords = [k.strip() for k in keywords_input.split("\n") if k.strip()]

        # Other parameters
        limit = st.slider("Number of posts to scan per subreddit", 10, 200, 50)
        sort_by = st.selectbox("Sort posts by", ["hot", "new", "top", "rising"], index=0)
        include_selftext = st.checkbox("Include post content in search", value=True)
        include_comments = st.checkbox("Include comments in search", value=True)
        min_score = st.slider("Minimum score (upvotes)", 0, 1000, 0)

        # Action buttons
        search_col, clear_col = st.columns(2)
        with search_col:
            search_button = st.button("Run Search", type="primary", use_container_width=True)
        with clear_col:
            clear_button = st.button("Clear Results", type="secondary", use_container_width=True)

    # Main interface tabs
    tab1, tab2, tab3, tab4 = st.tabs(["Results", "Export", "History", "API Credentials"])

    # Handle actions
    if clear_button:
        st.session_state.results = None
        st.rerun()

    if search_button:
        if not subreddits:
            st.error("Please enter at least one subreddit to search.")
        elif not keywords:
            st.error("Please enter at least one keyword to search.")
        else:
            success = run_search(
                subreddits=subreddits,
                keywords=keywords,
                limit=limit,
                sort_by=sort_by,
                include_comments=include_comments,
                include_selftext=include_selftext,
                min_score=min_score
            )
            if success:
st.success(f"Search completed! Found results in {len(st.session_state.results)} subreddits.")
    # Tab 1: Results
    with tab1:
        if st.session_state.results:
            # Post-search filters
            st.markdown('<div class="card">', unsafe_allow_html=True)
            st.subheader("Filter Results")
            filter_col1, filter_col2, filter_col3 = st.columns(3)
            with filter_col1:
                st.session_state.filters['min_score'] = st.number_input(
                    "Minimum score", min_value=0, value=st.session_state.filters['min_score'])
            with filter_col2:
                st.session_state.filters['date_from'] = st.date_input(
                    "From date", value=None)
            with filter_col3:
                st.session_state.filters['date_to'] = st.date_input(
                    "To date", value=None)
            st.session_state.filters['show_only_with_comments'] = st.checkbox(
                "Show only posts with matching comments",
                value=st.session_state.filters['show_only_with_comments'])
            apply_filters = st.button("Apply Filters")
            st.markdown('</div>', unsafe_allow_html=True)

            # Apply filters if requested
            if apply_filters:
                filtered_results = filter_results(st.session_state.results, st.session_state.filters)
            else:
                filtered_results = st.session_state.results

            # Show results for each subreddit
            total_posts = sum(len(posts) for posts in filtered_results.values())
            st.subheader(f"Search Results ({total_posts} posts found)")
            for subreddit, posts in filtered_results.items():
                with st.expander(f"r/{subreddit} - {len(posts)} posts", expanded=len(filtered_results) == 1):
                    if posts:
                        # Create a dataframe for easier viewing
                        df = pd.DataFrame([{
                            'Title': p['title'],
                            'Score': p['score'],
                            'Comments': p['num_comments'],
                            'Date': p['created_utc'],
                            'URL': p['permalink']
                        } for p in posts])
                        st.dataframe(df, use_container_width=True)

                        # Show a detailed post view
                        st.subheader("Post Details")
                        # posts is non-empty in this branch, so only the
                        # single-post and multi-post cases need handling
                        if len(posts) == 1:
                            # For a single post, no need for a slider
                            post_index = 0
                            st.info("Displaying the only post found.")
                        else:
                            # For multiple posts, offer a slider
                            post_index = st.slider(f"Select post from r/{subreddit} ({len(posts)} posts)",
                                                   0, len(posts) - 1, 0)
                        post = posts[post_index]

                        # Display post details in a card
                        st.markdown('<div class="card">', unsafe_allow_html=True)
                        st.markdown(f"### {post['title']}")
                        st.markdown(f"**Author:** u/{post['author']} | **Score:** {post['score']} | **Comments:** {post['num_comments']}")
                        st.markdown(f"**Posted on:** {post['created_utc']}")
                        st.markdown(f"**URL:** [{post['url']}]({post['url']})")

                        if post['text']:
                            st.markdown("##### Post Content")
                            with st.container():
                                show_content = st.checkbox("Show full content", key=f"content_{subreddit}_{post_index}")
                                if show_content:
                                    st.text(post['text'])

                        # Show matching comments if available
                        if 'matching_comments' in post and post['matching_comments']:
                            st.markdown(f"##### Matching Comments ({len(post['matching_comments'])})")
                            with st.container():
                                show_comments = st.checkbox("Show comments", value=True, key=f"comments_{subreddit}_{post_index}")
                                if show_comments:
                                    for i, comment in enumerate(post['matching_comments']):
                                        st.markdown(f"**u/{comment['author']}** ({comment['score']} points) - {comment['created_utc']}")
                                        st.text(comment['body'])
                                        if i < len(post['matching_comments']) - 1:
                                            st.divider()
                        st.markdown('</div>', unsafe_allow_html=True)
                    else:
                        st.info(f"No posts found in r/{subreddit} matching the current filters.")
        else:
            st.info("Configure the search parameters and click 'Run Search' to begin.")

            # Show help for first-time users
            with st.expander("Help & Tips"):
                st.markdown("""
                ### Quick Start Guide
                1. Set up your **API credentials** in the API Credentials tab
                2. Enter **subreddits** to search (one per line)
                3. Enter **keywords** to filter posts (one per line)
                4. Adjust settings as needed
                5. Click **Run Search**

                ### Search Tips
                - Use specific keywords for targeted results
                - Search multiple related subreddits for better coverage
                - Enable comment search to find keywords in discussions
                - Export data for external analysis
                """)
    # Tab 2: Export
    with tab2:
        if st.session_state.results:
            st.subheader("Export Results")

            # Apply current filters
            filtered_results = filter_results(st.session_state.results, st.session_state.filters)

            # Format selection
            export_format = st.radio("Export format", ["CSV", "JSON"], horizontal=True)

            # Filename input
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            default_filename = f"reddit_scrape_{timestamp}"
            filename = st.text_input("Filename (without extension)", value=default_filename)

            # Export button
            export_clicked = st.button("Export Data", type="primary")
            if export_clicked:
                try:
                    # Combine all results into a flat list for export
                    all_results = []
                    for subreddit, posts in filtered_results.items():
                        for post in posts:
                            post_copy = post.copy()
                            post_copy['subreddit'] = subreddit
                            all_results.append(post_copy)

                    # Save results based on the selected format
                    if export_format == "CSV":
                        # Convert to a dataframe and save
                        df = pd.DataFrame(all_results)
                        # Serialize nested structures for CSV
                        if 'matching_comments' in df.columns:
                            df['matching_comments'] = df['matching_comments'].apply(
                                lambda x: json.dumps(x) if isinstance(x, list) else ''
                            )
                        csv_file = f"{filename}.csv"
                        df.to_csv(csv_file, index=False)

                        # Create download button
                        with open(csv_file, 'rb') as f:
                            st.download_button(
                                label="Download CSV",
                                data=f,
                                file_name=csv_file,
                                mime="text/csv"
                            )
                        st.success(f"Exported {len(all_results)} posts to {csv_file}")
                    else:  # JSON
                        json_file = f"{filename}.json"
                        with open(json_file, 'w') as f:
                            json.dump(all_results, f, indent=2)

                        # Create download button
                        with open(json_file, 'rb') as f:
                            st.download_button(
                                label="Download JSON",
                                data=f,
                                file_name=json_file,
                                mime="application/json"
                            )
                        st.success(f"Exported {len(all_results)} posts to {json_file}")
                except Exception as e:
                    st.error(f"Export failed: {str(e)}")
        else:
            st.info("Run a search to export results.")
    # Tab 3: History
    with tab3:
        st.subheader("Search History")
        if st.session_state.search_history:
            for i, search in enumerate(reversed(st.session_state.search_history)):
                with st.expander(f"Search #{len(st.session_state.search_history) - i}: {search['timestamp']} ({search['total_results']} results)"):
                    st.markdown(f"**Subreddits:** {', '.join(search['subreddits'])}")
                    st.markdown(f"**Keywords:** {', '.join(search['keywords'])}")
                    st.markdown(f"**Results:** {search['total_results']} posts")
                    st.markdown(f"**Time:** {search['timestamp']}")
        else:
            st.info("No search history yet.")
    # Tab 4: API Credentials
    with tab4:
        # Initialize session state for credentials if they don't exist
        if 'client_id' not in st.session_state:
            st.session_state.client_id = ""
        if 'client_secret' not in st.session_state:
            st.session_state.client_secret = ""
        if 'user_agent' not in st.session_state:
            st.session_state.user_agent = "RedditScraperApp/1.0"

        # In local development, load credentials from a .env file for
        # convenience; skip this in production to avoid credential leakage.
        # SPACE_ID and SYSTEM are set by the Hugging Face Spaces runtime,
        # so their absence indicates a local run.
        is_local_dev = not os.environ.get('SPACE_ID') and not os.environ.get('SYSTEM')
        if is_local_dev:
            load_dotenv()
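            # A local .env file would look like the following; the values are
            # placeholders, only the variable names are used by this app:
            #
            #   REDDIT_CLIENT_ID=your_client_id
            #   REDDIT_CLIENT_SECRET=your_client_secret
            #   REDDIT_USER_AGENT=RedditScraperApp/1.0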
            # Only load from env if session state is empty (first load)
            if not st.session_state.client_id:
                st.session_state.client_id = os.environ.get("REDDIT_CLIENT_ID", "")
            if not st.session_state.client_secret:
                st.session_state.client_secret = os.environ.get("REDDIT_CLIENT_SECRET", "")
            if st.session_state.user_agent == "RedditScraperApp/1.0":
                st.session_state.user_agent = os.environ.get("REDDIT_USER_AGENT", "RedditScraperApp/1.0")

        # Two columns for instructions and input
        cred_col1, cred_col2 = st.columns([1, 1])
        with cred_col1:
            st.markdown("""
            #### Getting Credentials:
            1. Go to the [Reddit Developer Portal](https://www.reddit.com/prefs/apps)
            2. Click "Create App" or "Create Another App"
            3. Fill in the details (name, description)
            4. Select "script" as the application type
            5. Use "http://localhost:8000" as the redirect URI
            6. Click "Create app"
            7. Note the client ID and secret

            #### Privacy Note
            Your credentials are never stored on any servers. For personal copies,
            you can set them as Space secrets.
            """)
        with cred_col2:
            # Use session state for the input values
            client_id = st.text_input("Client ID", value=st.session_state.client_id, key="client_id_input")
            client_secret = st.text_input("Client Secret", value=st.session_state.client_secret, type="password", key="client_secret_input")
            user_agent = st.text_input("User Agent", value=st.session_state.user_agent, key="user_agent_input")

            # Update session state when the inputs change
            st.session_state.client_id = client_id
            st.session_state.client_secret = client_secret
            st.session_state.user_agent = user_agent

            if st.button("Initialize API Connection", type="primary"):
                if initialize_scraper(client_id, client_secret, user_agent):
                    st.success("API connection established!")
                    # Set environment variables for the current session
                    os.environ["REDDIT_CLIENT_ID"] = client_id
                    os.environ["REDDIT_CLIENT_SECRET"] = client_secret
                    os.environ["REDDIT_USER_AGENT"] = user_agent


if __name__ == "__main__":
    main()