# Spaces: Running
# (Status banner captured from the hosting web page; not part of the program.)
from fastapi import FastAPI, HTTPException, Request | |
from fastapi.responses import HTMLResponse | |
from fastapi.templating import Jinja2Templates | |
from fastapi.staticfiles import StaticFiles | |
from pydantic import HttpUrl, EmailStr | |
from scraper import scrape_page | |
from summarizer import quick_summarize | |
from rich_card_builder import build_rich_card | |
from send_email import send_rcs_email | |
import asyncio | |
from urllib.parse import urlparse | |
import logging | |
import uuid | |
import json | |
import http.client | |
from dotenv import load_dotenv | |
import os | |
import google.generativeai as genai | |
from typing import Optional, List, Dict | |
# Pull settings from a local .env file into the process environment; the
# functions below read their API keys/tokens through os.getenv().
load_dotenv()

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

app = FastAPI(title="Website Scraper API (Enhanced for RCS)")

# Mount static files
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)

# Set up Jinja2 templates
templates = Jinja2Templates(directory="templates")

# In-memory session storage, keyed by session id. Entries live only in this
# process: they are lost on restart and are never evicted.
sessions: Dict[str, Dict] = {}
def _fallback_quick_replies(next_interaction: Optional[str]) -> List[Dict]:
    """Return the static "View Details" quick reply, or [] when there is no target."""
    if not next_interaction:
        return []
    return [
        {
            "type": "postback",
            "title": "View Details",
            "payload": f"goto_{next_interaction}",
            "execute": next_interaction,
        }
    ]


def _strip_code_fence(text: str) -> str:
    """Remove surrounding Markdown backticks and a leading ``json`` tag from *text*."""
    # Stripping the single backtick character is a legitimate use of
    # str.strip(); the language tag is then removed as a real prefix rather
    # than as a set of characters.
    cleaned = text.strip().strip("`").strip()
    if cleaned[:4].lower() == "json":
        cleaned = cleaned[4:].lstrip()
    return cleaned


async def generate_dynamic_buttons(
    title: str,
    description: str,
    url: str,
    next_interaction: Optional[str] = None,
) -> List[Dict]:
    """Generate dynamic quick reply buttons for the next interaction using Gemini 1.5 Pro.

    Args:
        title: Title of the content the buttons should advertise. Blank or
            missing values fall back to "News Summary".
        description: Body text of that content. Blank or missing values fall
            back to a generic sentence.
        url: URL of that content. Blank or missing values fall back to
            "https://example.com".
        next_interaction: Name of the interaction the buttons should execute.

    Returns:
        Up to two postback quick replies built from the model's titles. If the
        API key is missing, every attempt fails, or an unexpected error
        occurs, a single static "View Details" reply is returned when
        ``next_interaction`` is set, otherwise an empty list. This function
        never raises.

    NOTE(review): on the success path with ``next_interaction=None`` the
    replies still carry ``"execute": None`` and a payload containing the
    literal text "None". That matches the existing behaviour; confirm it is
    what the caller wants for the last card.
    """
    try:
        # Validate inputs with defaults
        title = (title or "").strip() or "News Summary"
        description = (description or "").strip() or "Explore news and insights."
        url = (url or "").strip() or "https://example.com"
        logging.info(f"Generating buttons for: title={title}, description={description[:30]}..., url={url}")

        # Get Gemini API key
        api_key = os.getenv("GEMINI_API_KEY")
        if not api_key:
            logging.error("Gemini API key not found. Please set GEMINI_API_KEY in .env file.")
            return _fallback_quick_replies(next_interaction)

        # Configure Gemini client
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel('gemini-1.5-pro')

        # Combine inputs, truncated to avoid exceeding limits
        input_text = f"Title: {title}\nDescription: {description}\nURL: {url}"[:500]

        # Optimized prompt for dynamic, contextually relevant buttons
        prompt = (
            f"Based on the following content, generate up to two concise (3-8 words) quick reply button titles that are action-oriented, engaging, and relevant to the content. Avoid generic terms like 'Show Next' or 'Explore More'. Return the titles as a JSON array of strings.\n\n"
            f"Content:\n{input_text}\n\n"
            f"Example output: [\"Discover New Styles\", \"Shop Now Online\"]\n"
            f"Return only the JSON array, no markdown or extra text."
        )

        # Retry mechanism for API calls. Malformed model output is retried the
        # same way as a transport error, because a fresh generation may parse.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = await model.generate_content_async(prompt)
                raw_content = response.text.strip()
                logging.info(f"Gemini response: {raw_content}")

                # Remove markdown code block markers if present
                raw_content = _strip_code_fence(raw_content)

                # Parse response
                button_titles = json.loads(raw_content)
                if not isinstance(button_titles, list) or not all(isinstance(t, str) for t in button_titles):
                    logging.warning(f"Invalid Gemini response format: {raw_content}")
                    raise ValueError("Response is not a list of strings")

                # Keep only titles of 3-8 words (empty titles have 0 words).
                stripped_titles = (t.strip() for t in button_titles)
                valid_buttons = [t for t in stripped_titles if 3 <= len(t.split()) <= 8]
                if not valid_buttons:
                    logging.warning("No valid button titles in response")
                    raise ValueError("No valid button titles")

                # Create quick replies (at most two)
                quick_replies = [
                    {
                        "type": "postback",
                        "title": button_title,
                        "payload": f"goto_{next_interaction}_{i}",
                        "execute": next_interaction,
                    }
                    for i, button_title in enumerate(valid_buttons[:2])
                ]
                logging.info(f"Generated quick replies: {quick_replies}")
                return quick_replies
            except Exception as e:
                logging.warning(f"Attempt {attempt + 1} failed: {str(e)}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(1)  # Wait before retrying

        # Fallback if all retries fail
        logging.error("All retries failed for button generation")
        return _fallback_quick_replies(next_interaction)
    except Exception as e:
        # Best-effort boundary: button generation must never break the crawl.
        logging.error(f"Error generating dynamic buttons: {str(e)}")
        return _fallback_quick_replies(next_interaction)
def _build_card_interaction(idx: int, card: Dict) -> Dict:
    """Build the interaction that sends rich card number *idx* (1-based)."""
    message = {
        "text": f"{card['title']}\n\n{card['text']}",
        "mediaType": "image",
        # An empty or missing media URL is replaced by a placeholder image.
        "media": card.get("media", "") or "https://example.com/placeholder.jpg",
        "richCard": {
            "cardOrientation": "VERTICAL",
            "mediaHeight": "MEDIUM",
        },
        "buttons": card.get("buttons", []),
        "quickReplies": card.get("quickReplies", []),
    }
    return {
        "name": f"Interaction #{idx}",
        "intents": ["show_content", f"content_{idx}"],
        "actions": [
            {
                "send": {"message": message},
                "type": "RichCard",
                "name": f"Send Rich Card #{idx}",
            }
        ],
    }


def _build_welcome_interaction(domain: str) -> Dict:
    """Build the welcome interaction whose quick reply jumps to "Interaction #1"."""
    welcome_message = {
        "text": f"Welcome to the {domain} RCS Bot! Explore the latest content.",
        "richCard": {
            "cardOrientation": "VERTICAL",
            "mediaHeight": "MEDIUM",
        },
        "quickReplies": [
            {
                "type": "postback",
                "title": "Start Exploring",
                "payload": "start_exploring",
                "execute": "Interaction #1",
            }
        ],
    }
    return {
        "name": "Welcome Interaction",
        "intents": ["start", "welcome"],
        "actions": [
            {
                "send": {"message": welcome_message},
                "type": "RichCard",
                "name": "Send Welcome Message",
            }
        ],
    }


def _post_nativemsg_bot(payload: Dict, api_token: str):
    """POST *payload* to the NativeMSG bots endpoint; return (status, body text).

    This is a blocking call and is meant to be run in a worker thread.
    """
    connection = http.client.HTTPSConnection("api.nativemsg.com", timeout=30)
    try:
        headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        }
        connection.request("POST", "/v1/bots", json.dumps(payload), headers)
        response = connection.getresponse()
        return response.status, response.read().decode('utf-8')
    finally:
        # Always release the socket, including when the request fails.
        connection.close()


async def create_nativemsg_bot(
    rich_cards: List[Dict],
    url: str,
    bot_name: Optional[str],
    api_token: Optional[str],
) -> Dict:
    """Create a bot on NativeMSG with connected interactions based on rich cards.

    Args:
        rich_cards: Cards to publish. Each must contain "title" and "text";
            "media", "buttons" and "quickReplies" are optional.
        url: Source site URL; its host names the bot when ``bot_name`` is
            empty and appears in the welcome text.
        bot_name: Explicit bot name, or a falsy value to derive one from
            ``url``.
        api_token: NativeMSG bearer token.

    Returns:
        The JSON body of the NativeMSG response, decoded.

    Raises:
        HTTPException: status 500 when the token is missing, the request
            fails, or the API answers with a status other than 200.
    """
    try:
        # Validate API token
        if not api_token:
            logging.error("NativeMSG API token not provided and not found in .env file.")
            raise ValueError("NativeMSG API token is required.")

        domain = urlparse(url).netloc
        # Use provided bot name or default to dynamic name
        final_bot_name = bot_name or f"Bot for {domain}"

        # The welcome interaction comes first, followed by one interaction per card.
        interactions = [_build_welcome_interaction(domain)]
        interactions.extend(
            _build_card_interaction(idx, card)
            for idx, card in enumerate(rich_cards, 1)
        )
        payload = {
            "name": final_bot_name,
            "interactions": interactions,
        }
        # Log the payload for debugging
        logging.info(f"NativeMSG bot payload: {json.dumps(payload, indent=2)}")

        # http.client is synchronous; run it in the default executor so the
        # event loop keeps serving other requests while the call is in flight.
        loop = asyncio.get_running_loop()
        status, response_data = await loop.run_in_executor(
            None, _post_nativemsg_bot, payload, api_token
        )
        logging.info(f"NativeMSG bot creation response: Status {status}, Data: {response_data}")
        if status != 200:
            logging.error(f"Failed to create bot: {response_data}")
            raise HTTPException(status_code=500, detail=f"Failed to create bot: {response_data}")
        return json.loads(response_data)
    except HTTPException:
        # Already a well-formed API error; re-wrapping it would garble the detail.
        raise
    except Exception as e:
        logging.error(f"Error creating NativeMSG bot: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to create bot: {str(e)}") from e
async def crawl_website(
    url: HttpUrl,
    email: EmailStr,
    bot_name: Optional[str] = None,
    nativemsg_token: Optional[str] = None
):
    """Crawl a website, generate rich cards with dynamic buttons, create a NativeMSG bot, and send an email with a link to view the RCS.

    Args:
        url: Start page of the crawl; its host is passed to the scraper as
            the base domain.
        email: Recipient of the e-mail containing the view link.
        bot_name: Optional name for the NativeMSG bot.
        nativemsg_token: Optional NativeMSG token; falls back to the
            ``NATIVEMSG_API_TOKEN`` environment variable.

    Returns:
        A dict with "rich_cards", "bot_response" and "view_link".

    Raises:
        HTTPException: status 400 when nothing could be scraped; errors
            raised by bot creation are passed through unchanged; any other
            failure becomes status 500.
    """
    try:
        # Determine API token
        api_token = nativemsg_token or os.getenv("NATIVEMSG_API_TOKEN")

        # Scrape the website
        start_url = str(url)
        base_domain = urlparse(start_url).netloc
        max_pages = 20  # Limited to 20 for demo
        visited = set()
        to_visit = {start_url}
        results = []
        while to_visit and len(visited) < max_pages:
            current_url = to_visit.pop()
            if current_url in visited:
                continue
            visited.add(current_url)
            logging.info(f"Scraping page: {current_url}")
            page_data, new_links = await scrape_page(current_url, visited, base_domain)
            if page_data:
                logging.info(f"Scraped data: {page_data}")
                summary = await quick_summarize(page_data["text"], page_data["url"])
                rich_card = build_rich_card(page_data, summary)
                rich_card["title"] = summary.get("title", "News Summary")
                rich_card["url"] = page_data.get("url", start_url)
                results.append(rich_card)
            # NOTE(review): links are queued whether or not the page yielded
            # content - confirm against the original indentation.
            to_visit.update(new_links or ())
            await asyncio.sleep(0.5)

        if not results:
            logging.error("No rich cards generated from scraping.")
            raise HTTPException(status_code=400, detail="No content scraped from the provided URL.")

        # Generate dynamic quick replies for each rich card. Card i is sent by
        # "Interaction #{i + 1}", so its buttons advertise card i + 1 and jump
        # to "Interaction #{i + 2}". The last card has no successor: its
        # buttons are generated from its own content with no target.
        last_index = len(results) - 1
        for idx, card in enumerate(results):
            if idx < last_index:
                source_card = results[idx + 1]
                next_interaction = f"Interaction #{idx + 2}"
            else:
                source_card = card
                next_interaction = None
            dynamic_quick_replies = await generate_dynamic_buttons(
                title=source_card.get("title", "News Summary"),
                description=source_card.get("text", "Explore news and insights."),
                url=source_card.get("url", ""),
                next_interaction=next_interaction
            )
            # Update the rich card's quickReplies
            card["quickReplies"] = dynamic_quick_replies + [
                {
                    "type": "call",
                    "title": "Contact Support",
                    "payload": "+12345678901"
                }
            ]

        # Create NativeMSG bot with the rich cards
        bot_response = await create_nativemsg_bot(results, start_url, bot_name, api_token)

        # Store the results
        session_id = str(uuid.uuid4())
        sessions[session_id] = {
            "rich_cards": results,
            "bot_response": bot_response
        }
        logging.info(f"Session created with ID: {session_id}, Session data: {sessions[session_id]}")

        # Generate the direct link to view the RCS
        direct_link = f"https://aideveloper1-rcs.hf.space/view-rcs/{session_id}"
        logging.info(f"Generated direct link: {direct_link}")

        # Send email with the direct link
        await send_rcs_email(email, direct_link)
        logging.info(f"Final response: {results}, Bot: {bot_response}, Email sent to: {email}, Session ID: {session_id}")
        return {"rich_cards": results, "bot_response": bot_response, "view_link": direct_link}
    except HTTPException:
        # Keep deliberate API errors (e.g. the 400 above) intact instead of
        # converting them into a generic 500.
        raise
    except Exception as e:
        logging.error(f"Scraping or bot creation failed: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e
async def view_rcs(session_id: str, request: Request):
    """Render the stored rich cards of one session with the rcs_view.html template.

    Raises:
        HTTPException: status 404 when ``session_id`` is not in the
            in-memory session store.
    """
    logging.info(f"Attempting to access session with ID: {session_id}")
    known_ids = list(sessions)
    logging.info(f"Current sessions: {known_ids}")

    try:
        session_entry = sessions[session_id]
    except KeyError:
        logging.error(f"Session ID {session_id} not found in sessions.")
        raise HTTPException(status_code=404, detail="Session not found.") from None

    rich_cards = session_entry["rich_cards"]
    logging.info(f"Retrieved session data for ID {session_id}: {rich_cards}")

    context = {"request": request, "rich_cards": rich_cards}
    return templates.TemplateResponse("rcs_view.html", context)
async def serve_home(request: Request):
    """Render the index.html template as the frontend landing page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8001.
    # uvicorn is imported here so that importing this module does not need it.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8001}
    uvicorn.run(app, **server_options)