Spaces:
Running
Running
import logging | |
from flask import request, jsonify, Blueprint | |
from services.horoscope_service import horoscope_service | |
from services.llm_service import llm_service | |
from services.scheduler_service import scheduler_service | |
from services.wordpress_service import wordpress_service | |
from utils.rate_limiter import RateLimiter | |
from models import db, Horoscope, ConsolidatedHoroscope, ScheduledJob, WordPressExport | |
from datetime import datetime, date | |
import json | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Blueprint for the horoscope API; its URL prefix is /api/horoscope.
horoscope_bp = Blueprint('horoscope', __name__, url_prefix='/api/horoscope')

# API-wide rate limiter (10 requests per minute). A single instance is shared
# by every handler in this module that consults it.
api_rate_limiter = RateLimiter(window_size=60, max_requests=10)
def health_check():
    """Report the status of the services behind the horoscope API.

    Returns:
        A JSON response whose ``"status"`` is ``"ok"`` and whose
        ``"services"`` mapping reports the scraper as ``"up"`` and the LLM,
        scheduler and WordPress services as ``"up"`` or ``"down"`` according
        to the truthiness of ``llm_service.api_key``,
        ``scheduler_service.running`` and ``wordpress_service.is_configured``.
    """
    def _state(flag):
        # Map a truthy/falsy service attribute onto its status label.
        return "up" if flag else "down"

    services = {
        "horoscope_scraper": "up",
        "llm": _state(llm_service.api_key),
        "scheduler": _state(scheduler_service.running),
        "wordpress": _state(wordpress_service.is_configured),
    }
    return jsonify({"status": "ok", "services": services})
def scrape_horoscope():
    """Scrape the horoscope for a specific sign.

    Reads a JSON object from the request body with these keys:
        sign: zodiac sign name (required, matched case-insensitively).
        source: key of a scraper in ``horoscope_service.scrapers``
            (optional); when omitted, all sources are scraped.
        date: value passed through to the horoscope service unchanged
            (optional).

    Returns:
        The service result as JSON on success. A 429 response carrying
        ``wait_seconds`` when the shared rate limiter refuses the request.
        A 400 response when the body is missing or not a JSON object, when
        ``sign`` is missing or invalid, or when ``source`` is unknown.
    """
    # Check rate limit
    if not api_rate_limiter.can_proceed():
        return jsonify({
            "error": "Rate limit exceeded",
            "wait_seconds": api_rate_limiter.get_wait_time()
        }), 429

    # Record request for rate limiting
    api_rate_limiter.record_request()

    # silent=True makes a missing or malformed JSON body come back as None
    # instead of aborting inside Flask, so the explicit 400 below is what
    # the client actually receives.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "Missing request data"}), 400
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); reject it instead of crashing.
        return jsonify({"error": "Request body must be a JSON object"}), 400

    sign = data.get('sign')
    source = data.get('source')
    date_str = data.get('date')

    if not sign:
        return jsonify({"error": "Missing 'sign' parameter"}), 400

    # A non-string sign (e.g. a JSON number) cannot be lower-cased; treat it
    # as an invalid sign rather than raising AttributeError.
    valid_signs = horoscope_service.scrapers["astrology.com"].ZODIAC_SIGNS
    if not isinstance(sign, str) or sign.lower() not in valid_signs:
        return jsonify({"error": f"Invalid zodiac sign: {sign}"}), 400

    # If source is specified, check if it's valid
    if source and source not in horoscope_service.scrapers:
        return jsonify({"error": f"Unknown source: {source}"}), 400

    # Scrape from the specified source, or from all sources when none given
    if source:
        result = horoscope_service.scrape_sign(source, sign, date_str)
    else:
        result = horoscope_service.scrape_sign_from_all_sources(sign, date_str)

    return jsonify(result)
def scrape_all_horoscopes():
    """Scrape horoscopes for all signs from all sources.

    The request body is optional. When it is a JSON object, its ``date``
    value is passed to ``horoscope_service.scrape_all_horoscopes``;
    otherwise ``None`` is passed.

    Returns:
        A JSON response ``{"results": ...}`` with the service result, or a
        429 response carrying ``wait_seconds`` when the shared rate limiter
        refuses the request.
    """
    # Check rate limit
    if not api_rate_limiter.can_proceed():
        return jsonify({
            "error": "Rate limit exceeded",
            "wait_seconds": api_rate_limiter.get_wait_time()
        }), 429

    # Record request for rate limiting
    api_rate_limiter.record_request()

    # silent=True keeps the body genuinely optional: without it Flask aborts
    # on a missing/non-JSON body before any default can be applied.
    data = request.get_json(silent=True)
    # Only a JSON object can carry a date; anything else means "no date".
    date_str = data.get('date') if isinstance(data, dict) else None

    # Scrape all horoscopes
    results = horoscope_service.scrape_all_horoscopes(date_str)
    return jsonify({"results": results})
def get_horoscope(sign):
    """Return the horoscope for one zodiac sign.

    Args:
        sign: zodiac sign name, matched case-insensitively against the
            ``ZODIAC_SIGNS`` of the ``"astrology.com"`` scraper.

    Optional ``date`` and ``source`` values are read from the query string
    and handed to ``horoscope_service.get_horoscope`` together with ``sign``.

    Returns:
        The service result as JSON, or a 400 response for an unknown sign.
    """
    known_signs = horoscope_service.scrapers["astrology.com"].ZODIAC_SIGNS
    if sign.lower() not in known_signs:
        return jsonify({"error": f"Invalid zodiac sign: {sign}"}), 400

    query = request.args
    requested_date = query.get('date')
    requested_source = query.get('source')

    payload = horoscope_service.get_horoscope(sign, requested_date, requested_source)
    return jsonify(payload)
def get_all_horoscopes():
    """Return the horoscopes of every sign for one date.

    The optional ``date`` query-string value is forwarded as-is to
    ``horoscope_service.get_horoscopes_for_date``; the service result is
    returned as JSON.
    """
    requested_date = request.args.get('date')
    return jsonify(horoscope_service.get_horoscopes_for_date(requested_date))
def consolidate_horoscope(sign):
    """Consolidate the stored horoscopes for one sign using the LLM.

    Args:
        sign: zodiac sign name, matched case-insensitively.

    The request body is optional. When it is a JSON object, its ``date``
    value (``YYYY-MM-DD``) selects the day; otherwise today's date is used.

    Flow: look up ``Horoscope`` rows for the sign and date, scraping all
    sources once if none exist; return the existing ``ConsolidatedHoroscope``
    if there is one; otherwise ask ``llm_service.consolidate_horoscopes`` for
    a consolidated prediction and store it.

    Returns:
        A JSON response with a ``message`` and the consolidated ``horoscope``
        on success. Error responses: 429 when rate limited, 400 for an
        invalid sign or date, 404 when no horoscopes exist even after
        scraping, 500 when the LLM returns nothing or an error.
    """
    # Check rate limit
    if not api_rate_limiter.can_proceed():
        return jsonify({
            "error": "Rate limit exceeded",
            "wait_seconds": api_rate_limiter.get_wait_time()
        }), 429

    # Record request for rate limiting
    api_rate_limiter.record_request()

    # Check if sign is valid
    sign_key = sign.lower()
    if sign_key not in horoscope_service.scrapers["astrology.com"].ZODIAC_SIGNS:
        return jsonify({"error": f"Invalid zodiac sign: {sign}"}), 400

    # silent=True keeps the body optional; a missing or non-JSON body yields
    # None instead of Flask aborting the request.
    body = request.get_json(silent=True)
    date_str = body.get('date') if isinstance(body, dict) else None

    # Parse date
    if date_str:
        try:
            horoscope_date = datetime.strptime(date_str, '%Y-%m-%d').date()
        except (TypeError, ValueError):
            # TypeError covers a non-string date (e.g. a JSON number).
            return jsonify({"error": f"Invalid date format: {date_str}. Use YYYY-MM-DD."}), 400
    else:
        horoscope_date = date.today()

    def _stored_horoscopes():
        # Fetch every stored horoscope for this sign and date.
        return Horoscope.query.filter_by(sign=sign_key, date=horoscope_date).all()

    horoscopes = _stored_horoscopes()
    if not horoscopes:
        # Try to scrape if no horoscopes found, then check again
        horoscope_service.scrape_sign_from_all_sources(sign, date_str)
        horoscopes = _stored_horoscopes()
        if not horoscopes:
            return jsonify({"error": f"No horoscopes found for {sign} on {horoscope_date}"}), 404

    # Check if already consolidated
    existing = ConsolidatedHoroscope.query.filter_by(
        sign=sign_key,
        date=horoscope_date
    ).first()
    if existing:
        return jsonify({
            "message": f"Horoscope for {sign} on {horoscope_date} already consolidated",
            "horoscope": existing.to_dict()
        })

    # Convert to the format needed by the LLM service and consolidate
    horoscope_data = [h.to_dict() for h in horoscopes]
    consolidated = llm_service.consolidate_horoscopes(horoscope_data)
    if not consolidated or "error" in consolidated:
        # An empty/None result has no .get(); fall back to a generic message
        # instead of raising AttributeError while reporting the failure.
        detail = consolidated.get('error', 'Unknown error') if consolidated else 'Unknown error'
        return jsonify({
            "error": f"Error consolidating horoscopes: {detail}"
        }), 500

    # Create new consolidated horoscope
    new_consolidated = ConsolidatedHoroscope()
    new_consolidated.sign = sign_key
    new_consolidated.date = horoscope_date
    new_consolidated.consolidated_prediction = consolidated.get("consolidated_prediction", "")
    new_consolidated.sources = json.dumps([h.source for h in horoscopes])
    db.session.add(new_consolidated)
    db.session.commit()

    return jsonify({
        "message": f"Consolidated horoscope created for {sign} on {horoscope_date}",
        "horoscope": new_consolidated.to_dict()
    })
def consolidate_all_horoscopes():
    """Consolidate the stored horoscopes of every sign using the LLM.

    The request body is optional. When it is a JSON object, its ``date``
    value (``YYYY-MM-DD``) selects the day; otherwise today's date is used.

    For each sign in the ``"astrology.com"`` scraper's ``ZODIAC_SIGNS`` the
    outcome is recorded in ``results`` with one of these statuses:
        skipped: no ``Horoscope`` rows exist for the sign and date.
        exists: a ``ConsolidatedHoroscope`` already exists.
        error: the LLM returned nothing or an error.
        success: a new ``ConsolidatedHoroscope`` was added.

    Returns:
        A JSON response with a ``message`` and the per-sign ``results``.
        Error responses: 429 when rate limited, 400 for an invalid date.
    """
    # Check rate limit
    if not api_rate_limiter.can_proceed():
        return jsonify({
            "error": "Rate limit exceeded",
            "wait_seconds": api_rate_limiter.get_wait_time()
        }), 429

    # Record request for rate limiting
    api_rate_limiter.record_request()

    # silent=True keeps the body optional; a missing or non-JSON body yields
    # None instead of Flask aborting the request.
    body = request.get_json(silent=True)
    date_str = body.get('date') if isinstance(body, dict) else None

    # Parse date
    if date_str:
        try:
            horoscope_date = datetime.strptime(date_str, '%Y-%m-%d').date()
        except (TypeError, ValueError):
            # TypeError covers a non-string date (e.g. a JSON number).
            return jsonify({"error": f"Invalid date format: {date_str}. Use YYYY-MM-DD."}), 400
    else:
        horoscope_date = date.today()

    # Get all zodiac signs
    signs = horoscope_service.scrapers["astrology.com"].ZODIAC_SIGNS
    results = {}

    for sign in signs:
        # Get horoscopes for the sign and date
        horoscopes = Horoscope.query.filter_by(
            sign=sign,
            date=horoscope_date
        ).all()
        if not horoscopes:
            results[sign] = {"status": "skipped", "message": "No horoscopes found"}
            continue

        # Check if already consolidated
        existing = ConsolidatedHoroscope.query.filter_by(
            sign=sign,
            date=horoscope_date
        ).first()
        if existing:
            results[sign] = {"status": "exists", "message": "Already consolidated"}
            continue

        # Convert to the format needed by the LLM service and consolidate
        horoscope_data = [h.to_dict() for h in horoscopes]
        consolidated = llm_service.consolidate_horoscopes(horoscope_data)
        if not consolidated or "error" in consolidated:
            # An empty/None result has no .get(); report a generic error for
            # this sign and carry on with the remaining signs.
            detail = consolidated.get('error', 'Unknown error') if consolidated else 'Unknown error'
            results[sign] = {
                "status": "error",
                "message": f"Error: {detail}"
            }
            continue

        # Create new consolidated horoscope
        new_consolidated = ConsolidatedHoroscope()
        new_consolidated.sign = sign
        new_consolidated.date = horoscope_date
        new_consolidated.consolidated_prediction = consolidated.get("consolidated_prediction", "")
        new_consolidated.sources = json.dumps([h.source for h in horoscopes])
        db.session.add(new_consolidated)
        results[sign] = {"status": "success", "message": "Consolidated successfully"}

    # Persist every consolidated horoscope added during the loop.
    db.session.commit()

    return jsonify({
        "message": f"Consolidated horoscopes for {horoscope_date}",
        "results": results
    })
def publish_to_wordpress(horoscope_id):
    """Publish a consolidated horoscope to WordPress.

    Args:
        horoscope_id: identifier used to look up the
            ``ConsolidatedHoroscope`` and any existing ``WordPressExport``.

    Returns:
        A JSON response with a ``message`` and the ``export`` record, both
        when a new export is created and when one already exists. Error
        responses: 429 when rate limited, 500 when WordPress is not
        configured or publishing fails, 404 when the horoscope is not found.
    """
    # Check rate limit
    if not api_rate_limiter.can_proceed():
        return jsonify({
            "error": "Rate limit exceeded",
            "wait_seconds": api_rate_limiter.get_wait_time()
        }), 429

    # Record request for rate limiting
    api_rate_limiter.record_request()

    # Check if WordPress is configured
    if not wordpress_service.is_configured:
        return jsonify({"error": "WordPress API not configured"}), 500

    # Get the consolidated horoscope
    horoscope = ConsolidatedHoroscope.query.get(horoscope_id)
    if not horoscope:
        return jsonify({"error": f"Horoscope with ID {horoscope_id} not found"}), 404

    # Check if already published
    existing_export = WordPressExport.query.filter_by(horoscope_id=horoscope_id).first()
    if existing_export:
        return jsonify({
            "message": "Horoscope already published to WordPress",
            "export": existing_export.to_dict()
        })

    # Publish to WordPress
    result = wordpress_service.publish_horoscope(horoscope)
    if not result or not result.get("success", False):
        # An empty/None result has no .get(); fall back to a generic message
        # instead of raising AttributeError while reporting the failure.
        detail = result.get('error', 'Unknown error') if result else 'Unknown error'
        return jsonify({
            "error": f"Error publishing to WordPress: {detail}"
        }), 500

    # Create export record
    export = WordPressExport()
    export.horoscope_id = horoscope_id
    export.wordpress_post_id = result.get("post_id")
    export.wordpress_url = result.get("url")
    export.status = "published"
    db.session.add(export)
    db.session.commit()

    return jsonify({
        "message": "Published horoscope to WordPress",
        "export": export.to_dict()
    })
def get_schedules():
    """Return every job known to the scheduler.

    Returns:
        A JSON response ``{"jobs": ...}`` wrapping the value of
        ``scheduler_service.get_all_jobs()``.
    """
    return jsonify({"jobs": scheduler_service.get_all_jobs()})
def add_schedule():
    """Add a new scheduled job.

    Reads a JSON object from the request body with the required keys
    ``name`` and ``frequency`` and passes both to
    ``scheduler_service.add_job``.

    Returns:
        A JSON ``message`` when the scheduler reports success. A 400
        response when the body is missing, is not a JSON object, or lacks
        ``name`` or ``frequency``. A 500 response when the scheduler reports
        failure.
    """
    # silent=True makes a missing or malformed JSON body come back as None
    # instead of aborting inside Flask, so the explicit 400 below is what
    # the client actually receives.
    data = request.get_json(silent=True)
    if not data:
        return jsonify({"error": "Missing request data"}), 400
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); reject it instead of crashing.
        return jsonify({"error": "Request body must be a JSON object"}), 400

    name = data.get('name')
    frequency = data.get('frequency')
    if not name or not frequency:
        return jsonify({"error": "Missing 'name' or 'frequency' parameter"}), 400

    # Add job
    if scheduler_service.add_job(name, frequency):
        return jsonify({"message": f"Added job '{name}' with frequency '{frequency}'"})
    return jsonify({"error": f"Failed to add job '{name}'"}), 500
def remove_schedule(name):
    """Remove a scheduled job by name.

    Args:
        name: job name handed to ``scheduler_service.remove_job``.

    Returns:
        A JSON ``message`` when the scheduler reports success, otherwise a
        500 response with an ``error``.
    """
    removed = scheduler_service.remove_job(name)
    if not removed:
        return jsonify({"error": f"Failed to remove job '{name}'"}), 500
    return jsonify({"message": f"Removed job '{name}'"})
def test_wordpress():
    """Check the WordPress connection.

    Returns:
        The result of ``wordpress_service.test_connection()`` as JSON. The
        response carries status 500 unless the result's ``"success"`` value
        is truthy.
    """
    outcome = wordpress_service.test_connection()
    succeeded = outcome.get("success", False)
    response = jsonify(outcome)
    if not succeeded:
        return response, 500
    return response
# LLM method for horoscope consolidation
def consolidate_horoscopes(horoscope_data):
    """Consolidate multiple horoscope predictions using the LLM.

    Args:
        horoscope_data: sequence of dicts. The ``sign`` and ``date`` of the
            first entry label the prompt; each entry's ``source`` and
            ``prediction`` are listed in it.

    Returns:
        The JSON object parsed from the model's reply (the prompt asks for a
        ``consolidated_prediction`` key), or ``{"error": ...}`` when no data
        is given, the reply is empty or not a JSON object, or any exception
        is raised while building the prompt or calling the API.
    """
    if not horoscope_data:
        return {"error": "No horoscope data provided"}

    try:
        # Prepare data for LLM
        sign = horoscope_data[0].get("sign", "unknown")
        date_str = horoscope_data[0].get("date", "unknown date")

        # One "SOURCE n (...)" section per entry, joined in a single pass.
        sources_text = "".join(
            f"SOURCE {i} ({data.get('source', 'Unknown Source')}):\n"
            f"Prediction: {data.get('prediction', 'No prediction available')}\n\n"
            for i, data in enumerate(horoscope_data, 1)
        )

        # Create prompt for consolidation
        prompt = f"""
Please analyze and consolidate these daily horoscope predictions for {sign.upper()} for {date_str}.
{sources_text}
Create a single, coherent daily horoscope prediction that synthesizes the information from all sources.
Focus on the common themes and advice while maintaining the mystical and guiding tone typical of horoscopes.
The response should be 2-3 paragraphs long and should NOT mention the sources or that it's a consolidation.
Respond with JSON in this format:
{{
"consolidated_prediction": "The consolidated horoscope text..."
}}
"""

        # Call the chat completions API through the shared LLM client
        response = llm_service.client.chat.completions.create(
            model=llm_service.model_name,
            messages=[
                {"role": "system", "content": "You are an expert astrologer specializing in synthesizing horoscope predictions."},
                {"role": "user", "content": prompt}
            ],
            response_format={"type": "json_object"},
            temperature=0.7
        )

        # Parse the response
        content = response.choices[0].message.content
        if not content:
            # json.loads(None) would raise an opaque TypeError; say what happened.
            return {"error": "Failed to consolidate horoscopes: empty response from LLM"}

        result = json.loads(content)
        if not isinstance(result, dict):
            # Callers use result.get(...), so anything but an object is an error.
            return {"error": "Failed to consolidate horoscopes: response is not a JSON object"}
        return result
    except Exception as e:
        # Broad catch is deliberate: callers expect an error dict, never an
        # exception. logger.exception keeps the traceback in the log.
        logger.exception("Error consolidating horoscopes with LLM: %s", e)
        return {"error": f"Failed to consolidate horoscopes: {str(e)}"}
# Add custom LLM method to llm_service: expose consolidate_horoscopes as an
# attribute of llm_service, which is how the handlers in this module call it
# (llm_service.consolidate_horoscopes(horoscope_data)).
llm_service.consolidate_horoscopes = consolidate_horoscopes