last_edit / server /api.py
Moharek
Deploy Moharek GEO Platform
a74b879
import os
import json
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pathlib import Path
import datetime
import sqlite3
from typing import Optional, List
from dotenv import load_dotenv
load_dotenv(override=True)
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
print(" GEO Platform API starting...")
print(f" Working directory: {os.getcwd()}")
print(f" Python version: {os.sys.version}")
# lazily import heavy pipeline to avoid startup failures when optional deps
# (like spaCy) are not installed. import inside handlers that need it.
run_pipeline = None
try:
from server import ai_analysis
print(" ai_analysis loaded (from server)")
except Exception as e:
print(f" ai_analysis failed: {e}")
ai_analysis = None
try:
from server import geo_services
print(" geo_services loaded")
except Exception as e:
print(f" geo_services failed: {e}")
geo_services = None
try:
from server import ai_visibility
print(" ai_visibility loaded")
except Exception as e:
print(f" ai_visibility failed: {e}")
ai_visibility = None
try:
from server import job_queue
print(" job_queue loaded")
except Exception as e:
print(f" job_queue failed: {e}")
job_queue = None
try:
from server import keyword_engine
print(" keyword_engine loaded")
except Exception as e:
print(f" keyword_engine failed: {e}")
keyword_engine = None
try:
from server import users as user_mgmt
print(" users loaded")
except Exception as e:
print(f" users failed: {e}")
user_mgmt = None
try:
from server import search_intel
print(" search_intel loaded")
except Exception as e:
print(f" search_intel failed: {e}")
search_intel = None
try:
from server import payments as _payments
except Exception:
_payments = None
try:
from server import onboarding
except Exception:
onboarding = None
from fastapi import WebSocket
import asyncio
OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', str(Path(__file__).resolve().parent.parent / 'output')))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
print(f" Output directory: {OUTPUT_DIR}")
app = FastAPI(title='GEO Platform API')
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
# Rate limiting
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
print(" FastAPI app created with rate limiting and CORS")
# Serve frontend static files
frontend_dir = Path(__file__).resolve().parent.parent / 'frontend'
if frontend_dir.exists():
app.mount('/static', StaticFiles(directory=str(frontend_dir)), name='static')
print(f" Frontend mounted: {frontend_dir}")
else:
print(f" Frontend directory not found: {frontend_dir}")
class CrawlRequest(BaseModel):
url: str
org_name: str
org_url: str
max_pages: int = 3
runs: int = 1
api_keys: Optional[dict] = None
class RecommendationRequest(BaseModel):
api_keys: Optional[dict] = None
job_id: Optional[int] = None
extra_context: Optional[dict] = None
class AnalysisRequest(BaseModel):
api_keys: Optional[dict] = None
job_id: Optional[int] = None
class SimulationRequest(BaseModel):
content: str
brand: str
api_keys: Optional[dict] = None
@app.post('/api/crawl')
async def api_crawl(req: CrawlRequest):
global run_pipeline
if run_pipeline is None:
try:
from src.main import run_pipeline as _rp
run_pipeline = _rp
except Exception as e:
return JSONResponse({'ok': False, 'error': 'pipeline not available: ' + str(e)}, status_code=500)
# runs: perform multiple runs and average scores, snapshot first run
runs = max(1, req.runs or 1)
run_results = []
timestamps = []
scores = []
breakdowns = []
audit_objs = []
try:
for i in range(runs):
ts = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
subdir = OUTPUT_DIR / f"run-{ts}-{i+1}"
res = run_pipeline(req.url, req.org_name, req.org_url, max_pages=req.max_pages, output_dir=subdir)
# load created audit
audit_path = Path(res['audit_path'])
with open(audit_path, 'r', encoding='utf-8') as f:
audit_obj = json.load(f)
audit_objs.append(audit_obj)
# Infer brand name if generic for visibility check
org_name_for_visibility = req.org_name
if not org_name_for_visibility or org_name_for_visibility == 'Company':
pages_for_inference = audit_obj.get('pages', [])
inferred = ai_analysis.infer_brand_name(pages_for_inference)
if inferred and inferred != 'Company':
org_name_for_visibility = inferred
# Update audit_obj with inferred name for consistency
audit_obj['org_name'] = inferred
# run AI visibility (Perplexity) for this run and attach; fallback to OpenAI visibility
queries = [f"What is {org_name_for_visibility}?", f"Best services for {org_name_for_visibility}", f"Why should I buy from {org_name_for_visibility}?"]
perf = ai_visibility.check_perplexity(org_name_for_visibility, queries)
if not perf.get('enabled') and perf.get('reason') == 'PERPLEXITY_KEY not set':
# try OpenAI fallback
try:
perf = ai_visibility.check_openai_visibility(org_name_for_visibility, queries)
perf['fallback'] = 'openai'
except Exception:
pass
# 🔷 ADDED: Deep Sentiment & Context Analysis (The "Zaher" Gap)
try:
sentiment = geo_services.sentiment_analysis(org_name_for_visibility, queries, api_keys=req.api_keys)
perf['sentiment_analysis'] = sentiment
except Exception:
pass
audit_obj['ai_visibility'] = perf
# overwrite audit with ai visibility included
with open(audit_path, 'w', encoding='utf-8') as f:
json.dump(audit_obj, f, ensure_ascii=False, indent=2)
# compute geo score for this run
score = ai_analysis.compute_geo_score(audit_obj.get('pages', []), audit=audit_obj, ai_visibility=perf)
scores.append(score['score'])
breakdowns.append(score['breakdown'])
run_results.append({ 'ts': ts, 'audit_path': str(audit_path), 'geo_score': score })
timestamps.append(ts)
# snapshot first run pages for deterministic reference
snap_ts = timestamps[0]
snap_src = Path(run_results[0]['audit_path'])
snap_dst = OUTPUT_DIR / f"snapshot-{snap_ts}.json"
with open(snap_src, 'r', encoding='utf-8') as fsrc, open(snap_dst, 'w', encoding='utf-8') as fdst:
fdst.write(fsrc.read())
# Update the main audit/analysis files so UI endpoints (`/api/results`) read latest data
try:
# copy snapshot to output/audit.json
audit_main = OUTPUT_DIR / 'audit.json'
with open(snap_dst, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m:
m.write(s.read())
# generate and save aggregated analysis (OpenAI/Groq) and geo_score based on snapshot
try:
with open(audit_main, 'r', encoding='utf-8') as f:
audit_obj = json.load(f)
pages = audit_obj.get('pages', [])
analysis = ai_analysis.analyze_pages(pages)
geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=audit_obj.get('ai_visibility'))
analysis_out = { 'analysis': analysis, 'geo_score': geo_score }
with open(OUTPUT_DIR / 'analysis.json', 'w', encoding='utf-8') as fa:
json.dump(analysis_out, fa, ensure_ascii=False, indent=2)
# also save a top-level analysis.json for backwards compatibility
except Exception:
pass
except Exception:
pass
# compute aggregated stats
import statistics
mean_score = int(round(statistics.mean(scores)))
median_score = int(round(statistics.median(scores)))
variance = float(statistics.pstdev(scores))
history_path = OUTPUT_DIR / 'history.json'
history = []
if history_path.exists():
try:
history = json.loads(history_path.read_text(encoding='utf-8'))
except Exception:
history = []
entry = {
'timestamp': timestamps[0],
'url': req.url,
'org_name': req.org_name,
'runs': runs,
'scores': scores,
'mean': mean_score,
'median': median_score,
'variance': variance,
'runs_info': run_results
}
history.append(entry)
history_path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding='utf-8')
return { 'ok': True, 'message': 'crawl completed', 'mean_score': mean_score, 'median_score': median_score, 'variance': variance, 'history_entry': entry }
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class JobRequest(BaseModel):
url: str
org_name: str
org_url: str
max_pages: int = 3
runs: int = 1
industry_override: Optional[str] = None
@app.post('/api/jobs')
# @limiter.limit("30/minute") # Rate limiting disabled for testing
async def api_enqueue(job: JobRequest, background_tasks: BackgroundTasks, request: Request):
try:
# ── Usage limit check ── TEMPORARILY DISABLED FOR DEVELOPMENT
user_id = None
try:
auth = request.headers.get('authorization', '')
token = auth.split(' ', 1)[1].strip()
user_id = user_mgmt.verify_token(token)
except Exception:
pass
# TEMPORARILY DISABLED: Allow unlimited access during development
# TODO: Re-enable usage limits later
# if user_id:
# # Check trial limit first
# if onboarding:
# trial_status = onboarding.get_trial_status(user_id)
# subscription = _payments.get_subscription(user_id)
#
# # If on free plan and trial expired, show pricing
# if subscription.get('plan') == 'free' and trial_status['trial_expired']:
# return JSONResponse({
# 'ok': False,
# 'error': 'انتهت فترة التجربة المجانية. يرجى الترقية إلى خطة مدفوعة.',
# 'trial_expired': True,
# 'show_pricing': True,
# 'plan': 'free'
# }, status_code=429)
#
# # Check if free user has used their one try
# if subscription.get('plan') == 'free' and trial_status['tries_remaining'] <= 0:
# return JSONResponse({
# 'ok': False,
# 'error': 'لقد استخدمت محاولتك المجانية. يرجى الترقية للحصول على المزيد من التحليلات.',
# 'trial_exhausted': True,
# 'show_pricing': True,
# 'plan': 'free',
# 'tries_used': trial_status['tries_used']
# }, status_code=429)
#
# limit_check = _payments.check_usage_limit(user_id, 'crawls')
# if not limit_check['allowed']:
# plan = _payments.get_subscription(user_id).get('plan', 'free')
# return JSONResponse({
# 'ok': False,
# 'error': f'لقد استنفدت حد التحليلات الشهري ({limit_check["limit"]} تحليل). يرجى الترقية إلى خطة أعلى.',
# 'limit_reached': True,
# 'plan': plan,
# 'used': limit_check['used'],
# 'limit': limit_check['limit']
# }, status_code=429)
jid = job_queue.enqueue_job(
job.url,
job.org_name,
job.org_url,
job.max_pages,
job.runs,
industry_override=job.industry_override
)
# Track usage - TEMPORARILY DISABLED
# TODO: Re-enable usage tracking later
# if user_id:
# try:
# _payments.track_usage(user_id, 'crawls')
# # Use trial attempt for free users
# if onboarding:
# subscription = _payments.get_subscription(user_id)
# if subscription.get('plan') == 'free':
# onboarding.use_trial(user_id)
# except Exception:
# pass
# Dispatch background task immediately
from server.worker import process_job
job_data = job_queue.get_job(jid)
if job_data:
background_tasks.add_task(process_job, job_data)
return {'ok': True, 'job_id': jid}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/jobs')
async def api_list_jobs():
try:
data = job_queue.list_jobs()
return {'ok': True, 'jobs': data}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/jobs/{job_id}')
async def api_get_job(job_id: int):
try:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
# parse progress JSON
try:
job['progress'] = json.loads(job.get('progress') or '{}')
except Exception:
job['progress'] = {}
return {'ok': True, 'job': job}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.websocket('/ws/jobs/{job_id}')
async def ws_job_progress(ws: WebSocket, job_id: int):
await ws.accept()
last_updated = None
try:
while True:
job = job_queue.get_job(job_id)
if not job:
# send a not-found once then keep waiting in case job appears
try:
await ws.send_json({'ok': False, 'error': 'job not found'})
except Exception:
pass
await asyncio.sleep(1)
continue
# normalize progress and datetime fields for JSON
try:
prog = job.get('progress')
if isinstance(prog, str):
try:
job['progress'] = json.loads(prog or '{}')
except Exception:
job['progress'] = {}
else:
job['progress'] = prog or {}
except Exception:
job['progress'] = {}
# convert timestamps to ISO strings
for dt_key in ('created_at', 'updated_at'):
try:
val = job.get(dt_key)
if hasattr(val, 'isoformat'):
job[dt_key] = val.isoformat()
else:
job[dt_key] = str(val) if val is not None else None
except Exception:
job[dt_key] = str(job.get(dt_key))
updated = job.get('updated_at')
if updated != last_updated:
last_updated = updated
try:
await ws.send_json({'ok': True, 'job': job})
except Exception:
# if sending fails (e.g., client gone), break loop
break
await asyncio.sleep(1)
finally:
try:
await ws.close()
except Exception:
pass
@app.get('/api/jobs/{job_id}/results')
async def api_job_results(job_id: int):
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'no results yet'}, status_code=400)
out = {'ok': True, 'job_id': job_id}
audit_path = Path(result_path) / 'audit.json'
analysis_path = Path(result_path) / 'analysis.json'
schema_path = Path(result_path) / 'schema.jsonld'
if audit_path.exists():
out['audit'] = json.loads(audit_path.read_text(encoding='utf-8'))
if analysis_path.exists():
out['analysis'] = json.loads(analysis_path.read_text(encoding='utf-8'))
if schema_path.exists():
out['schema'] = schema_path.read_text(encoding='utf-8')
return out
@app.get('/api/results')
async def api_results(ts: str | None = None):
from server.cache_manager import invalidate_results_cache
if ts:
snapshot_path = OUTPUT_DIR / f'snapshot-{ts}.json'
if snapshot_path.exists():
try:
result = {'audit': json.loads(snapshot_path.read_text(encoding='utf-8'))}
return JSONResponse(result, headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
except Exception as e:
return JSONResponse({'ok': False, 'error': f'Failed to load snapshot: {str(e)}'}, status_code=500)
audit_path = OUTPUT_DIR / 'audit.json'
schema_path = OUTPUT_DIR / 'schema.jsonld'
out = {}
if audit_path.exists():
out['audit'] = json.loads(audit_path.read_text(encoding='utf-8'))
if schema_path.exists():
out['schema'] = schema_path.read_text(encoding='utf-8')
return JSONResponse(out, headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
@app.on_event("startup")
async def startup_event():
print(" GEO Platform API is ready!")
print(f" Access at: http://0.0.0.0:7860")
print(f" Health check: http://0.0.0.0:7860/health")
try:
admin_email = os.environ.get('ADMIN_EMAIL', 'admin@moharek.com')
admin_pass = os.environ.get('ADMIN_PASSWORD', 'moharek2025')
users = user_mgmt.list_users()
if not any(u.get('email') == admin_email for u in users):
uid = user_mgmt.create_user(
email=admin_email,
password=admin_pass,
role='admin'
)
print(f" Seeded default admin user (id={uid})")
except Exception as e:
print(f" Seed user error: {e}")
@app.get('/api/health')
@app.get('/health')
async def health_check():
return {'status': 'healthy', 'service': 'GEO Platform'}
@app.get('/')
async def index():
index_file = frontend_dir / 'index.html'
if index_file.exists():
return FileResponse(str(index_file), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
return {'ok': True}
@app.get('/{file_path:path}', include_in_schema=False)
async def serve_static(file_path: str):
"""Serve static files including SVG, CSS, JS, etc."""
if file_path.startswith('api/'):
return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
file_full_path = frontend_dir / file_path
try:
file_full_path = file_full_path.resolve()
if not str(file_full_path).startswith(str(frontend_dir.resolve())):
return JSONResponse({'ok': False, 'error': 'forbidden'}, status_code=403)
except Exception:
return JSONResponse({'ok': False, 'error': 'invalid path'}, status_code=400)
if file_full_path.exists() and file_full_path.is_file():
media_type = 'application/octet-stream'
if file_path.endswith('.svg'):
media_type = 'image/svg+xml'
elif file_path.endswith('.css'):
media_type = 'text/css'
elif file_path.endswith('.js'):
media_type = 'application/javascript'
elif file_path.endswith('.html'):
media_type = 'text/html'
elif file_path.endswith('.json'):
media_type = 'application/json'
elif file_path.endswith('.png'):
media_type = 'image/png'
elif file_path.endswith('.jpg') or file_path.endswith('.jpeg'):
media_type = 'image/jpeg'
elif file_path.endswith('.gif'):
media_type = 'image/gif'
elif file_path.endswith('.webp'):
media_type = 'image/webp'
return FileResponse(str(file_full_path), media_type=media_type, headers={"Cache-Control": "public, max-age=3600"})
if not file_path.endswith('.html'):
html_path = frontend_dir / f'{file_path}.html'
if html_path.exists():
return FileResponse(str(html_path), media_type='text/html', headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
@app.get('/api/jobs/{job_id}/report')
async def api_job_report(job_id: int):
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
from server import reports
html = reports.build_html_report(result_path)
return JSONResponse({'ok': True, 'report_html': html})
@app.get('/api/jobs/{job_id}/report.pdf')
async def api_job_report_pdf(job_id: int):
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
try:
from server import reports
html = reports.build_html_report(result_path)
out_pdf = Path(result_path) / f'report-{job_id}.pdf'
ok = reports.try_render_pdf(html, out_pdf)
if ok and out_pdf.exists():
return FileResponse(str(out_pdf), media_type='application/pdf', filename=f'report-{job_id}.pdf')
except Exception as e:
print(f'PDF generation error: {e}')
return JSONResponse({'ok': False, 'error': 'pdf_generation_failed'}, status_code=500)
@app.get('/api/jobs/{job_id}/keywords')
async def api_job_keywords(job_id: int, enrich: bool = False, analytics: bool = False):
try:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'audit.json not found in result_path'}, status_code=404)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
result = keyword_engine.extract_keywords_from_audit(audit, top_n=40, enrich=enrich, analytics=analytics)
if analytics:
return {'ok': True, 'analytics': result}
else:
return {'ok': True, 'keywords': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/jobs/{job_id}/serp')
async def api_job_serp(job_id: int, gl: str = 'sa', hl: str = 'ar'):
"""Full SERP report via ScrapingBee (rank check, PAA, local, competitors)."""
try:
from server import scrapingbee_client
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'audit.json not found'}, status_code=404)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
site_url = job.get('url', '')
kws_raw = keyword_engine.extract_keywords_from_audit(audit, top_n=8)
keywords = [k['kw'] if isinstance(k, dict) else k for k in kws_raw[:8]]
report = scrapingbee_client.full_serp_report(site_url, keywords, country_code=gl, language=hl)
return report
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/jobs/{job_id}/actions')
async def api_job_actions(job_id: int):
try:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'audit.json not found in result_path'}, status_code=404)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
from server import action_engine
actions = action_engine.generate_action_plan(audit)
return actions
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/actions/save')
async def api_save_actions(request: Request):
try:
data = await request.json()
job_id = data.get('job_id')
actions = data.get('actions', [])
from server.db import SessionLocal, Action
db = SessionLocal()
try:
saved = []
for act in actions:
new_act = Action(
job_id=job_id,
type=act.get('type', 'general'),
task=act.get('task', ''),
priority_score=100 if str(act.get('priority')).lower() == 'high' else (50 if str(act.get('priority')).lower() == 'medium' else 10),
status='pending'
)
db.add(new_act)
db.flush()
saved.append(new_act.id)
db.commit()
return {'ok': True, 'saved_ids': saved}
finally:
db.close()
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/actions/list')
async def api_list_actions():
try:
from server.db import SessionLocal, Action
db = SessionLocal()
try:
items = db.query(Action).order_by(Action.priority_score.desc()).limit(50).all()
actions = []
for i in items:
action_dict = {
'id': i.id,
'job_id': i.job_id,
'type': i.type,
'task': i.task,
'priority_score': i.priority_score,
'status': i.status,
'result': i.result,
'created_at': str(i.created_at),
'impact': 'high' if i.priority_score >= 80 else 'medium' if i.priority_score >= 40 else 'low',
'effort': 'low',
'timeline': '1-2 اسابيع'
}
if hasattr(i, 'initial_metrics') and i.initial_metrics:
action_dict['initial_metrics'] = i.initial_metrics
if hasattr(i, 'latest_metrics') and i.latest_metrics:
action_dict['latest_metrics'] = i.latest_metrics
if hasattr(i, 'impact_score') and i.impact_score:
action_dict['impact_score'] = i.impact_score
actions.append(action_dict)
return {'ok': True, 'actions': actions}
finally:
db.close()
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/mentions/list')
async def api_list_mentions():
try:
from server.db import SessionLocal, Mention
db = SessionLocal()
try:
items = db.query(Mention).order_by(Mention.created_at.desc()).limit(50).all()
return {'ok': True, 'mentions': [{
'id': i.id, 'brand': i.brand, 'source': i.source,
'content': i.content, 'url': i.url,
'sentiment_score': i.sentiment_score, 'created_at': str(i.created_at)
} for i in items]}
finally:
db.close()
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/graph/entities')
async def api_list_entities():
try:
from server.db import SessionLocal, Entity
db = SessionLocal()
try:
items = db.query(Entity).order_by(Entity.roi_score.desc()).limit(100).all()
return {'ok': True, 'entities': [{
'id': i.id, 'name': i.name, 'type': i.type, 'roi_score': i.roi_score
} for i in items]}
finally:
db.close()
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/ai_visibility/radar')
async def api_ai_radar(job_id: int):
try:
from server import ai_radar
from server.db import SessionLocal, AIResponse, AIPrompt
db = SessionLocal()
try:
responses = db.query(AIResponse).join(AIPrompt).filter(AIPrompt.job_id == job_id).order_by(AIResponse.created_at.desc()).limit(10).all()
score = ai_radar.calculate_overall_visibility_score(job_id)
return {
'ok': True,
'overall_score': score,
'latest_mentions': [{
'prompt': r.prompt_id, # Simplified
'mentioned': r.mention_found,
'model': r.model_name,
'competitors': r.competitors_mentioned
} for r in responses]
}
finally:
db.close()
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/autopilot/toggle')
async def api_toggle_autopilot(job_id: int, enabled: bool):
try:
from server.db import SessionLocal, ProjectSettings
db = SessionLocal()
try:
settings = db.query(ProjectSettings).filter(ProjectSettings.job_id == job_id).first()
if not settings:
settings = ProjectSettings(job_id=job_id, autopilot_enabled=enabled)
db.add(settings)
else:
settings.autopilot_enabled = enabled
db.commit()
return {'ok': True, 'autopilot_enabled': enabled}
finally:
db.close()
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/strategy/summary')
async def api_strategy_summary(job_id: int):
try:
from server import growth_os
summary = growth_os.get_growth_strategy(job_id)
return {'ok': True, 'summary': summary}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/actions/{action_id}/execute')
async def api_execute_action(action_id: int):
try:
from server import executor
from server.db import SessionLocal, Action
db = SessionLocal()
try:
act = db.query(Action).filter(Action.id == action_id).first()
if act:
executor.dispatch_action(action_id, act.type)
finally:
db.close()
return {'ok': True, 'action_id': action_id, 'message': 'Dispatched successfully'}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class UserRegister(BaseModel):
email: str
password: str
company_id: int | None = None
class CompanyRegister(BaseModel):
name: str
domain: str | None = None
class LoginRequest(BaseModel):
email: str
password: str
@app.post('/api/companies/register')
async def api_register_company(req: CompanyRegister):
try:
cid = user_mgmt.create_company(req.name, req.domain)
return {'ok': True, 'company_id': cid}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/users/register')
async def api_register_user(req: UserRegister):
try:
uid = user_mgmt.create_user(req.email, req.password, company_id=req.company_id)
token = user_mgmt.make_token(uid)
user = user_mgmt.get_user(uid)
# Initialize trial and onboarding for new user
if onboarding:
onboarding.init_trial(uid)
onboarding.init_onboarding(uid)
return {'ok': True, 'user_id': uid, 'token': token, 'user': user}
except sqlite3.IntegrityError:
return JSONResponse({'ok': False, 'error': 'البريد الإلكتروني مسجل بالفعل'}, status_code=400)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/users/login')
async def api_login(req: LoginRequest):
try:
user = user_mgmt.authenticate_user(req.email, req.password)
if not user:
return JSONResponse({'ok': False, 'error': 'invalid credentials'}, status_code=401)
token = user_mgmt.make_token(user['id'])
return {'ok': True, 'token': token, 'user': user}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/users/me')
async def api_me(request: Request):
auth = request.headers.get('authorization') or request.headers.get('Authorization')
if not auth or not auth.lower().startswith('bearer '):
return JSONResponse({'ok': False, 'error': 'missing token'}, status_code=401)
try:
token = auth.split(' ', 1)[1].strip()
if not token or len(token) < 10:
return JSONResponse({'ok': False, 'error': 'invalid token'}, status_code=401)
uid = user_mgmt.verify_token(token)
if not uid:
return JSONResponse({'ok': False, 'error': 'invalid token'}, status_code=401)
u = user_mgmt.get_user(uid)
if not u:
return JSONResponse({'ok': False, 'error': 'user not found'}, status_code=404)
return {'ok': True, 'user': u}
except Exception as e:
return JSONResponse({'ok': False, 'error': 'token validation failed'}, status_code=401)
@app.post('/api/simulate')
async def api_simulate(req: SimulationRequest):
try:
prediction = ai_analysis.simulate_visibility(req.content, req.brand, api_keys=req.api_keys)
return {'ok': True, 'prediction': prediction}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/analyze')
# @limiter.limit("20/minute") # Rate limiting disabled for testing
async def api_analyze(req: AnalysisRequest = None, request: Request = None, job_id: int = None):
api_keys = req.api_keys if req else {}
job_id = job_id or (req.job_id if req else None)
# Resolve audit path: job-specific first, then global fallback
if job_id:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': f'Job {job_id} is still processing (result_path is empty). Please wait for the crawler to finish.'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
analysis_out_path = Path(result_path) / 'analysis.json'
else:
audit_path = OUTPUT_DIR / 'audit.json'
analysis_out_path = OUTPUT_DIR / 'analysis.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'no audit found; run crawl first'}, status_code=400)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
pages = audit.get('pages', [])
# Pass user-provided keys to analysis
analysis = ai_analysis.analyze_pages(pages, api_keys=api_keys)
# AI Visibility Check (Optional Per-request override)
ai_vis = audit.get('ai_visibility') or {}
org_name = audit.get('org_name') or ai_analysis.infer_brand_name(pages)
if api_keys:
queries = [f"What is {org_name}?", f"Best services for {org_name}"]
if api_keys.get('perplexity'):
ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys.get('perplexity'))
elif api_keys.get('openai'):
ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
# 🔷 Deep AI Visibility Analysis (Sentiment/Shopping/Context) - Always Run if possible
try:
deep_ai = ai_analysis.analyze_ai_visibility_deep(pages, org_name, api_keys=api_keys)
# Flatten deep_ai into ai_vis so compute_geo_score can find them easily
if deep_ai:
ai_vis.update(deep_ai)
except Exception as e:
print(f"Deep AI analysis error: {e}")
audit['ai_visibility'] = ai_vis
# Save updated audit
with open(audit_path, 'w', encoding='utf-8') as f:
json.dump(audit, f, ensure_ascii=False, indent=2)
from server import geo_services
# ── Enhanced Visibility Score v2 (API Based) ────────────────────────────────
org_name = audit.get('org_name') or ai_analysis.infer_brand_name(pages)
# Fetch real search results using provided keys or environment fallbacks
searches = []
serp_key = (api_keys.get("SERPAPI_KEY") if api_keys else None) or os.getenv("SERPAPI_KEY")
zen_key = (api_keys.get("ZENSERP_KEY") if api_keys else None) or os.getenv("ZENSERP_KEY")
core_queries = [f"{org_name}", f"تحميل {org_name}", f"{org_name} review"]
for cq in core_queries[:2]:
s_res = geo_services._serp_api_search(cq, api_key=serp_key)
if not s_res:
s_res = geo_services._zenserp_search(cq, api_key=zen_key)
if s_res:
searches.append(s_res)
# ── Competitor Insight Enrichment ──────────────────────────────────────────
comp_insight = {}
try:
# Pass industry override if available
industry_override = audit.get('industry_override')
comp_insight = geo_services.get_competitor_insights(
org_name,
audit.get('url'),
api_keys=api_keys,
industry_override=industry_override
)
except Exception as e:
print(f"Competitor insight error: {e}")
pass
# Hybrid Score Calculation (v2)
ai_mentions = ai_vis.get("mentions", 0) if ai_vis else 0
total_queries = ai_vis.get("total_queries", 1) if ai_vis else 1
traffic_est = comp_insight.get("monthly_visits", "unknown")
geo_v2 = geo_services.calculate_visibility_score_v2(
org_name, searches, ai_mentions, total_queries, traffic_est
)
geo = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis)
geo["v2"] = geo_v2 # Combined 40/40/20 score
analysis_out = { 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight }
with open(analysis_out_path, 'w', encoding='utf-8') as fa:
json.dump(analysis_out, fa, ensure_ascii=False, indent=2)
return { 'ok': True, 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight }
@app.get('/api/history')
async def api_history():
history_path = OUTPUT_DIR / 'history.json'
if not history_path.exists():
return { 'ok': True, 'history': [] }
try:
data = json.loads(history_path.read_text(encoding='utf-8'))
return { 'ok': True, 'history': data }
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class ActivateRequest(BaseModel):
ts: str
api_keys: Optional[dict] = None
@app.post('/api/history/activate')
async def api_activate_history(req: ActivateRequest):
"""
Switch the 'active' research to a historical snapshot.
This overwrites audit.json and triggers a re-analysis.
"""
try:
ts = req.ts
snapshot_path = OUTPUT_DIR / f'snapshot-{ts}.json'
if not snapshot_path.exists():
return JSONResponse({'ok': False, 'error': f'Snapshot {ts} not found'}, status_code=404)
# 1. Update audit.json
audit_main = OUTPUT_DIR / 'audit.json'
with open(snapshot_path, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m:
audit_obj = json.load(s)
json.dump(audit_obj, m, ensure_ascii=False, indent=2)
# 2. Trigger analysis and compute score
pages = audit_obj.get('pages', [])
api_keys = req.api_keys or {}
# Pass user-provided keys to analysis
analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys)
# If brand name is generic, infer it
org_name = audit_obj.get('org_name', 'Company')
if not org_name or org_name == 'Company':
inferred = ai_analysis.infer_brand_name(pages)
if inferred and inferred != 'Company':
org_name = inferred
audit_obj['org_name'] = org_name
# Re-save with inferred name
with open(audit_main, 'w', encoding='utf-8') as f:
json.dump(audit_obj, f, ensure_ascii=False, indent=2)
ai_vis = audit_obj.get('ai_visibility')
# Re-run visibility if keys provided
if api_keys and (api_keys.get('perplexity') or api_keys.get('openai')):
queries = [f"What is {org_name}?", f"Best services for {org_name}"]
if api_keys.get('perplexity'):
ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity'])
elif api_keys.get('openai'):
ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
audit_obj['ai_visibility'] = ai_vis
with open(audit_main, 'w', encoding='utf-8') as f:
json.dump(audit_obj, f, ensure_ascii=False, indent=2)
geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=ai_vis)
# 3. Save analysis.json
full_report = { 'analysis': analysis_data, 'geo_score': geo_score }
analysis_path = OUTPUT_DIR / 'analysis.json'
with open(analysis_path, 'w', encoding='utf-8') as fa:
json.dump(full_report, fa, ensure_ascii=False, indent=2)
# 4. Generate Strategic Intelligence Report for the newly activated data
# This ensures the Competitive Intelligence Matrix updates immediately
try:
from server import search_intelligence
report = search_intelligence.run_complete_analysis(pages, source_url=audit_obj.get('url', 'http://example.com'), api_keys=api_keys)
except Exception as e:
print(f"Intelligence sync error: {e}")
report = None
return {
'ok': True,
'message': f'Research {ts} activated successfully',
'org_name': org_name,
'pages_count': len(pages),
'report': report
}
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/recommendations')
async def api_recommendations(req: RecommendationRequest = None):
# load analysis and audit results then produce recommendations
api_keys = req.api_keys if req else {}
job_id = req.job_id if req else None
extra_context = req.extra_context if (req and req.extra_context) else {}
# Resolve paths: prefer job-specific result_path, fall back to global output/
if job_id:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': f'job {job_id} has no results yet — wait for it to complete'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
analysis_path = Path(result_path) / 'analysis.json'
else:
audit_path = OUTPUT_DIR / 'audit.json'
analysis_path = OUTPUT_DIR / 'analysis.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'no audit found — run a crawl first'}, status_code=400)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
pages = audit.get('pages', [])
# If keys are provided, we should probably re-run analysis to fill in the "Visibility" and "AI Analysis" gaps
if api_keys:
# Re-run visibility if brand changed or results are missing
old_org = audit.get('org_name', 'Company')
org_name = old_org
if not org_name or org_name == 'Company':
inferred = ai_analysis.infer_brand_name(pages)
if inferred and inferred != 'Company':
org_name = inferred
audit['org_name'] = org_name
queries = [f"What is {org_name}?", f"Best services for {org_name}"]
ai_vis = audit.get('ai_visibility')
# If brand name improved OR visibility is missing/demo, re-run it
needs_visibility = (org_name != old_org) or not ai_vis or not ai_vis.get('enabled')
if needs_visibility and (api_keys.get('perplexity') or api_keys.get('openai')):
if api_keys.get('perplexity'):
ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity'])
elif api_keys.get('openai'):
ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
audit['ai_visibility'] = ai_vis
# Re-run analysis
analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys)
geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis)
# Save updated results
with open(audit_path, 'w', encoding='utf-8') as f:
json.dump(audit, f, ensure_ascii=False, indent=2)
with open(analysis_path, 'w', encoding='utf-8') as f:
json.dump({ 'analysis': analysis_data, 'geo_score': geo_score }, f, ensure_ascii=False, indent=2)
else:
# Just load existing
if not analysis_path.exists():
# Trigger basic analysis if missing
analysis_data = ai_analysis.analyze_pages(pages)
geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=audit.get('ai_visibility'))
else:
with open(analysis_path, 'r', encoding='utf-8') as f:
ana_obj = json.load(f)
analysis_data = ana_obj.get('analysis')
geo_score = ana_obj.get('geo_score')
# Get Search Intelligence for benchmarks
search_intel_report = None
try:
from server import search_intelligence
search_intel_report = search_intelligence.run_complete_analysis(pages, source_url=audit.get('url', ''), api_keys=api_keys)
except Exception as e:
print(f"Search intel error in recommendations: {e}")
recs = ai_analysis.generate_recommendations(pages, geo_score=geo_score, api_keys=api_keys, ai_analysis_results=analysis_data, extra_context=extra_context)
return {
'ok': True,
'recommendations': recs,
'audit': audit,
'ai_visibility': audit.get('ai_visibility'),
'analysis': analysis_data,
'geo_score': geo_score,
'search_intel': search_intel_report
}
class KeywordsRequest(BaseModel):
url: str
max_pages: int = 1
api_keys: Optional[dict] = None
@app.post('/api/keywords')
# @limiter.limit("15/minute") # Rate limiting disabled for testing
async def api_keywords(req: KeywordsRequest, request: Request, enrich: bool = False, analytics: bool = False):
try:
# try to fetch pages via crawler (will fallback to requests)
from src import crawler
pages = crawler.crawl_seed(req.url, max_pages=req.max_pages)
# build a minimal audit-like object
audit_obj = {'pages': pages}
from server import keyword_engine
result = keyword_engine.extract_keywords_from_audit(audit_obj, top_n=40, enrich=enrich, analytics=analytics)
if analytics:
# Return full analytics report
return {'ok': True, 'analytics': result, 'pages': [{'url': p.get('url'), 'title': p.get('title')} for p in pages]}
else:
# Return simple keyword list
return {'ok': True, 'keywords': result, 'pages': [{'url': p.get('url'), 'title': p.get('title')} for p in pages]}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/test/keywords')
async def api_test_keywords():
"""Test endpoint to verify keyword extraction works."""
try:
from src import crawler
from server.keyword_engine import extract_keywords_from_audit
# Test with abayanoir
pages = crawler.crawl_seed('https://abayanoir.com', max_pages=1)
audit = {'pages': pages}
# Simple mode
simple = extract_keywords_from_audit(audit, top_n=5, enrich=False, analytics=False)
# Analytics mode
analytics = extract_keywords_from_audit(audit, top_n=5, enrich=False, analytics=True)
return {
'ok': True,
'simple_keywords': simple,
'analytics_type': str(type(analytics)),
'analytics_summary': analytics.get('summary') if isinstance(analytics, dict) else 'NOT A DICT',
'pages_crawled': len(pages)
}
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/search/intelligence')
async def api_search_intelligence(req: KeywordsRequest, enrich: bool = False):
"""Complete search intelligence analysis with keywords, competitors, and recommendations."""
try:
from src import crawler
from server import search_intelligence
# Crawl pages
pages = crawler.crawl_seed(req.url, max_pages=req.max_pages)
# Ensure pages is a list
if not isinstance(pages, list):
pages = [pages] if pages else []
# Run complete analysis
report = search_intelligence.run_complete_analysis(pages, req.url, enrich_data=enrich, api_keys=req.api_keys)
return {'ok': True, 'report': report}
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/export/keywords/csv')
async def api_export_keywords_csv(req: KeywordsRequest, enrich: bool = False):
"""Export keywords to CSV format."""
try:
from src import crawler
from server import search_intelligence
import csv
from io import StringIO
from fastapi.responses import StreamingResponse
# Crawl and analyze
pages = crawler.crawl_seed(req.url, max_pages=req.max_pages)
if not isinstance(pages, list):
pages = [pages] if pages else []
report = search_intelligence.run_complete_analysis(pages, req.url, enrich_data=enrich)
# Create CSV
output = StringIO()
writer = csv.writer(output)
# Header
writer.writerow(['Keyword', 'Count', 'Density (%)', 'Volume', 'CPC ($)', 'Competition', 'Classification'])
# Primary keywords
for kw in report['keyword_results']['classification']['primary']['keywords']:
writer.writerow([
kw['kw'],
kw['count'],
kw.get('density', 'N/A'),
kw.get('volume', 'N/A'),
kw.get('cpc', 'N/A'),
kw.get('competition', 'N/A'),
'Primary'
])
# Secondary keywords
for kw in report['keyword_results']['classification']['secondary']['keywords']:
writer.writerow([
kw['kw'],
kw['count'],
kw.get('density', 'N/A'),
kw.get('volume', 'N/A'),
kw.get('cpc', 'N/A'),
kw.get('competition', 'N/A'),
'Secondary'
])
# Long-tail keywords
for kw in report['keyword_results']['classification']['long_tail']['keywords']:
writer.writerow([
kw['kw'],
kw['count'],
kw.get('density', 'N/A'),
kw.get('volume', 'N/A'),
kw.get('cpc', 'N/A'),
kw.get('competition', 'N/A'),
'Long-tail'
])
output.seek(0)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={"Content-Disposition": f"attachment; filename=keywords_{req.url.replace('https://', '').replace('/', '_')}.csv"}
)
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/search/competitors')
async def api_search_competitors(req: KeywordsRequest):
try:
from src import crawler
from server import competitor_analysis
pages = crawler.crawl_seed(req.url, max_pages=req.max_pages)
competitors = competitor_analysis.detect_competitors(pages, req.url, min_mentions=1)
summary = competitor_analysis.get_competitor_summary(competitors)
return {
'ok': True,
'result': {
'competitors': competitors,
'summary': summary,
'pages_analyzed': len(pages)
}
}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/search/gsc')
async def api_search_gsc(site: str, start: str, end: str):
try:
res = search_intel.gsc_query(site, start, end)
return {'ok': True, 'result': res}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ── AI Content Engine ──────────────────────────────────────────────────────────
class ArticleRequest(BaseModel):
keyword: str
lang: str = 'en'
target_site: str = ""
research_insights: list = []
competitors_content: list = []
crawl_data: dict = {}
prefer_backend: str = 'groq'
api_keys: dict = {}
class OptimizeRequest(BaseModel):
content: str
keyword: str
lang: str = 'en'
target_site: str = ""
research_insights: list = []
crawl_data: dict = {}
prefer_backend: str = 'groq'
api_keys: dict = {}
class FaqRequest(BaseModel):
topic: str
page_content: str = ''
lang: str = 'en'
target_site: str = ""
research_insights: list = []
crawl_data: dict = {}
count: int = 5
prefer_backend: str = 'groq'
api_keys: dict = {}
class SemanticRequest(BaseModel):
content: str
lang: str = 'en'
prefer_backend: str = 'groq'
api_keys: dict = {}
class IdentityRequest(BaseModel):
crawl_data: dict = {}
lang: str = 'en'
prefer_backend: str = 'groq'
api_keys: dict = {}
@app.post('/api/content/generate')
async def api_content_generate(req: ArticleRequest):
try:
from server import content_engine
result = content_engine.generate_article(
req.keyword, lang=req.lang,
target_site=req.target_site,
research_insights=req.research_insights,
competitors_content=req.competitors_content,
crawl_data=req.crawl_data,
prefer_backend=req.prefer_backend,
api_keys=req.api_keys
)
return {'ok': True, 'result': result}
except Exception as e:
print(f"⚠️ [API] Content Gen Failed: {e}. Falling back to Demo.")
try:
from server import content_engine
# Force demo mode
result = content_engine.generate_article(req.keyword, lang=req.lang, target_site=req.target_site, prefer_backend='demo')
return {'ok': True, 'result': result, 'warn': str(e)}
except Exception as e2:
return JSONResponse({'ok': False, 'error': str(e2)}, status_code=500)
@app.post('/api/content/optimize')
async def api_content_optimize(req: OptimizeRequest):
try:
from server import content_engine
result = content_engine.optimize_content(
req.content, req.keyword, lang=req.lang,
target_site=req.target_site,
research_insights=req.research_insights,
crawl_data=req.crawl_data,
prefer_backend=req.prefer_backend,
api_keys=req.api_keys
)
return {'ok': True, 'result': result}
except Exception as e:
print(f"⚠️ [API] Content Optimization Failed: {e}. Falling back to Demo.")
try:
from server import content_engine
result = content_engine.optimize_content(req.content, req.keyword, lang=req.lang, target_site=req.target_site, prefer_backend='demo')
return {'ok': True, 'result': result, 'warn': str(e)}
except Exception as e2:
return JSONResponse({'ok': False, 'error': str(e2)}, status_code=500)
@app.post('/api/content/faqs')
async def api_content_faqs(req: FaqRequest):
try:
from server import content_engine
result = content_engine.generate_faqs(
req.topic, page_content=req.page_content,
lang=req.lang, count=req.count,
target_site=req.target_site,
research_insights=req.research_insights,
crawl_data=req.crawl_data,
prefer_backend=req.prefer_backend,
api_keys=req.api_keys
)
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/content/semantic')
async def api_content_semantic(req: SemanticRequest):
try:
from server import content_engine
result = content_engine.semantic_optimize(
req.content, lang=req.lang,
prefer_backend=req.prefer_backend,
api_keys=req.api_keys
)
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/content/identity')
async def api_content_identity(req: IdentityRequest):
try:
from server import content_engine
result = content_engine.generate_identity(
crawl_data=req.crawl_data,
lang=req.lang,
prefer_backend=req.prefer_backend,
api_keys=req.api_keys
)
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ═══════════════════════════════════════════════════════════════════════════════
# PAID ADS MANAGEMENT MODULE
# ═══════════════════════════════════════════════════════════════════════════════
from server import ads_manager, ads_ai
class AdsConnectRequest(BaseModel):
developer_token: str
client_id: str
client_secret: str
refresh_token: str
customer_id: str
class AdsAIRequest(BaseModel):
api_keys: Optional[dict] = None
lang: Optional[str] = 'ar'
service_name: Optional[str] = 'خدمات SEO'
usp: Optional[str] = 'نتائج مضمونة في 90 يوم'
target_audience: Optional[str] = 'شركات سعودية'
class CampaignCreateRequest(BaseModel):
name: str
budget_usd: float = 5.0
target_cpa: Optional[float] = None
@app.post('/api/ads/connect')
async def api_ads_connect(req: AdsConnectRequest):
"""Save Google Ads credentials and verify they work."""
try:
credentials = {
'developer_token': req.developer_token,
'client_id': req.client_id,
'client_secret': req.client_secret,
'refresh_token': req.refresh_token,
'customer_id': req.customer_id
}
result = ads_manager.verify_google_connection(credentials)
if result.get('ok'):
ads_manager.save_ads_config(credentials)
return result
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/ads/dashboard')
async def api_ads_dashboard(demo: bool = False, days: int = 30):
"""Return unified KPI dashboard data — uses saved credentials or demo."""
try:
credentials = {} if demo else ads_manager.load_ads_config()
campaigns = ads_manager.get_campaign_performance(credentials, days=days)
summary = ads_manager.build_ads_summary(campaigns, credentials=credentials)
return {'ok': True, 'summary': summary, 'campaigns': campaigns}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/ads/keywords')
async def api_ads_keywords(demo: bool = False):
"""Return keyword performance with Quality Scores."""
try:
credentials = {} if demo else ads_manager.load_ads_config()
keywords = ads_manager.get_keyword_performance(credentials)
return {'ok': True, 'keywords': keywords, 'count': len(keywords)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/ads/search-terms')
async def api_ads_search_terms(demo: bool = False, min_clicks: int = 5):
"""Return search term analysis — converting vs. wasted spend."""
try:
credentials = {} if demo else ads_manager.load_ads_config()
data = ads_manager.get_search_terms(credentials, min_clicks=min_clicks)
return {'ok': True, 'data': data}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/ads/campaigns')
async def api_ads_create_campaign(req: CampaignCreateRequest):
"""Create a new Google Ads campaign (starts PAUSED for safety)."""
try:
credentials = ads_manager.load_ads_config()
result = ads_manager.create_campaign(
credentials, req.name, req.budget_usd, req.target_cpa
)
return result
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/ads/ai/bid-suggestions')
async def api_ads_bid_suggestions(req: AdsAIRequest):
"""AI-powered bid adjustment recommendations for each keyword."""
try:
credentials = ads_manager.load_ads_config()
keywords = ads_manager.get_keyword_performance(credentials)
suggestions = ads_ai.ai_bid_suggestion(keywords, api_keys=req.api_keys or {})
return {'ok': True, 'suggestions': suggestions, 'count': len(suggestions)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/ads/ai/copy')
async def api_ads_generate_copy(req: AdsAIRequest):
"""Generate Arabic + English RSA ad copy using AI."""
try:
result = ads_ai.generate_ad_copy(
service_name=req.service_name or 'خدمات SEO',
usp=req.usp or 'نتائج مضمونة في 90 يوم',
target_audience=req.target_audience or 'شركات سعودية',
lang=req.lang or 'ar',
api_keys=req.api_keys or {}
)
return {'ok': True, 'copy': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/ads/ai/negatives')
async def api_ads_negative_keywords(req: AdsAIRequest):
"""Detect irrelevant search terms and suggest negative keywords."""
try:
credentials = ads_manager.load_ads_config()
search_terms = ads_manager.get_search_terms(credentials)
result = ads_ai.detect_negative_keywords(search_terms, api_keys=req.api_keys or {})
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/ads/ai/weekly-report')
async def api_ads_weekly_report(req: AdsAIRequest):
"""Generate AI-written weekly performance report in Arabic or English."""
try:
credentials = ads_manager.load_ads_config()
campaigns = ads_manager.get_campaign_performance(credentials)
keywords = ads_manager.get_keyword_performance(credentials)
search_terms = ads_manager.get_search_terms(credentials)
report = ads_ai.generate_weekly_report(
campaigns, keywords, search_terms,
api_keys=req.api_keys or {},
lang=req.lang or 'ar'
)
return {'ok': True, 'report': report}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ══════════════════════════════════════════════════════════════════════════════
# GEO SERVICES — 6 AI Visibility Services
# ══════════════════════════════════════════════════════════════════════════════
from server import geo_services
class GeoServiceRequest(BaseModel):
brand: str
url: Optional[str] = None
queries: Optional[List[str]] = None
competitors: Optional[List[str]] = None
brand_variants: Optional[List[str]] = None
api_keys: Optional[dict] = None
class SimulatorRequest(BaseModel):
brand: str
original_content: str
improved_content: str
test_queries: List[str]
api_keys: Optional[dict] = None
@app.post('/api/geo/visibility')
async def api_geo_visibility(req: GeoServiceRequest):
try:
queries = req.queries or geo_services.DEFAULT_QUERIES
result = geo_services.visibility_score(req.brand, queries, req.api_keys or {})
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/geo/recognition')
async def api_geo_recognition(req: GeoServiceRequest):
try:
queries = req.queries or geo_services.DEFAULT_QUERIES
variants = req.brand_variants or [req.brand]
result = geo_services.brand_recognition(req.brand, variants, queries, req.api_keys or {})
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/geo/sentiment')
async def api_geo_sentiment(req: GeoServiceRequest):
try:
queries = req.queries or geo_services.DEFAULT_QUERIES
result = geo_services.sentiment_analysis(req.brand, queries, req.api_keys or {})
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/geo/competitors')
async def api_geo_competitors(req: GeoServiceRequest):
try:
queries = req.queries or geo_services.DEFAULT_QUERIES
competitors = req.competitors or ['SEMrush', 'Ahrefs', 'Moz']
result = geo_services.competitor_ranking(req.brand, competitors, queries, req.api_keys or {})
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/geo/regional')
async def api_geo_regional(req: GeoServiceRequest):
try:
result = geo_services.geo_regional_analysis(req.brand, req.api_keys or {})
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/geo/fix')
async def api_geo_fix(req: GeoServiceRequest):
try:
if not req.url:
return JSONResponse({'ok': False, 'error': 'url required'}, status_code=400)
result = geo_services.fix_recommendations(req.url, req.brand, {}, req.api_keys or {})
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/geo/simulate')
async def api_geo_simulate(req: SimulatorRequest):
try:
result = geo_services.visibility_simulator(
req.original_content, req.improved_content,
req.test_queries, req.brand, req.api_keys or {}
)
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/geo/suite')
async def api_geo_suite(req: GeoServiceRequest, background_tasks: BackgroundTasks):
"""Run all 6 GEO services at once for a brand."""
try:
result = geo_services.run_full_suite(
req.brand, req.url, req.competitors, req.api_keys or {}
)
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ═══════════════════════════════════════════════════════════════════════════════
# ADVANCED FEATURES — Keyword Tracking, Alerts, Scheduler, Bulk, Gap, Email
# ═══════════════════════════════════════════════════════════════════════════════
from server.advanced_features import (
save_keyword_snapshot, save_geo_score_snapshot,
get_keyword_trends, get_geo_score_trends,
check_and_create_alerts, get_alerts, mark_alerts_seen,
add_scheduled_crawl, list_scheduled_crawls, delete_scheduled_crawl,
competitor_keyword_gap, bulk_enqueue, send_weekly_report,
start_scheduler, init_advanced_tables
)
try:
init_advanced_tables()
start_scheduler()
except Exception:
pass
@app.get('/api/tracking/keywords')
async def api_keyword_trends(url: str, keyword: str = None, days: int = 30):
try:
return {'ok': True, 'trends': get_keyword_trends(url, keyword, days)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/tracking/geo-score')
async def api_geo_score_trends(url: str, days: int = 90):
try:
return {'ok': True, 'trends': get_geo_score_trends(url, days)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/alerts')
async def api_get_alerts(url: str = None, unseen_only: bool = False):
try:
alerts = get_alerts(url, unseen_only)
return {'ok': True, 'alerts': alerts, 'count': len(alerts)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/alerts/seen')
async def api_mark_alerts_seen(req: dict):
try:
mark_alerts_seen(req.get('ids', []))
return {'ok': True}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class ScheduleRequest(BaseModel):
url: str
org_name: str = ''
org_url: str = ''
max_pages: int = 3
frequency: str = 'weekly'
@app.post('/api/schedule')
async def api_add_schedule(req: ScheduleRequest):
try:
sid = add_scheduled_crawl(req.url, req.org_name, req.org_url, req.max_pages, req.frequency)
return {'ok': True, 'schedule_id': sid}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/schedule')
async def api_list_schedules():
try:
return {'ok': True, 'schedules': list_scheduled_crawls()}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.delete('/api/schedule/{schedule_id}')
async def api_delete_schedule(schedule_id: int):
try:
delete_scheduled_crawl(schedule_id)
return {'ok': True}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class GapRequest(BaseModel):
your_keywords: list = []
competitor_url: str
max_pages: int = 3
@app.post('/api/competitor/gap')
async def api_competitor_gap(req: GapRequest):
try:
return {'ok': True, 'gap': competitor_keyword_gap(req.your_keywords, req.competitor_url, req.max_pages)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class BulkRequest(BaseModel):
urls: list
org_name: str = ''
max_pages: int = 2
@app.post('/api/bulk/crawl')
async def api_bulk_crawl(req: BulkRequest):
try:
job_ids = bulk_enqueue(req.urls, req.org_name, req.max_pages)
return {'ok': True, 'job_ids': job_ids, 'count': len(job_ids)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
class EmailReportRequest(BaseModel):
to_email: str
url: str
@app.post('/api/reports/email')
async def api_send_email_report(req: EmailReportRequest):
try:
ok = send_weekly_report(req.to_email, req.url)
if ok:
return {'ok': True, 'message': f'تم الإرسال إلى {req.to_email}'}
return JSONResponse({'ok': False, 'error': 'فشل الإرسال — أضف SMTP_HOST و SMTP_USER و SMTP_PASS في .env'}, status_code=400)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
_SETTINGS_PATH = Path(os.environ.get('OUTPUT_DIR', Path(__file__).resolve().parent.parent / 'output')) / 'settings.json'
def _load_settings() -> dict:
if _SETTINGS_PATH.exists():
try:
return json.loads(_SETTINGS_PATH.read_text())
except Exception:
pass
return {}
def _save_settings(data: dict):
_SETTINGS_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2))
@app.get('/api/settings')
async def api_get_settings():
s = _load_settings()
safe = {}
for k, v in s.items():
safe[k] = '***' if (v and (k.endswith('_key') or k.endswith('_KEY') or 'pass' in k.lower())) else v
return {'ok': True, 'settings': safe}
@app.post('/api/settings')
async def api_save_settings(req: dict):
try:
current = _load_settings()
for k, v in req.items():
if v and v != '***':
current[k] = v
os.environ[k.upper()] = v
_save_settings(current)
return {'ok': True}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
try:
for k, v in _load_settings().items():
if v and v != '***':
os.environ.setdefault(k.upper(), v)
except Exception:
pass
# ── Competitor Intelligence Analyzer ─────────────────────────────────────────
class CompetitorIntelRequest(BaseModel):
url: str
region: str = 'Saudi Arabia'
industry: str = ''
count: int = 7
api_keys: dict = {}
@app.post('/api/competitor/intelligence')
async def api_competitor_intelligence(req: CompetitorIntelRequest):
try:
from server.competitor_intel import analyze_competitors
result = analyze_competitors(
req.url, region=req.region,
industry=req.industry, count=req.count,
api_keys=req.api_keys
)
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ── Projects (Multi-site) ─────────────────────────────────────────────────────
import sqlite3 as _sqlite3
from pathlib import Path as _Path
_PROJ_DB = _Path(os.environ.get('OUTPUT_DIR', './output')) / 'projects.db'
def _init_projects_db():
conn = _sqlite3.connect(str(_PROJ_DB))
conn.execute("""CREATE TABLE IF NOT EXISTS projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
name TEXT NOT NULL,
url TEXT NOT NULL,
industry TEXT,
color TEXT DEFAULT '#3b82f6',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
conn.execute("""CREATE TABLE IF NOT EXISTS project_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id INTEGER,
job_id INTEGER,
geo_score INTEGER,
seo_score INTEGER,
ai_visibility INTEGER,
pages_crawled INTEGER,
keywords_count INTEGER,
snapshot_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
conn.commit()
conn.close()
_init_projects_db()
class ProjectCreate(BaseModel):
name: str
url: str
industry: str = ''
color: str = '#3b82f6'
@app.get('/api/projects')
async def api_list_projects(request: Request):
auth = request.headers.get('authorization','')
uid = None
try:
token = auth.split(' ',1)[1].strip()
uid = user_mgmt.verify_token(token)
except: pass
conn = _sqlite3.connect(str(_PROJ_DB))
conn.row_factory = _sqlite3.Row
rows = conn.execute('SELECT * FROM projects WHERE user_id=? OR user_id IS NULL ORDER BY created_at DESC', (uid,)).fetchall()
projects = []
for r in rows:
p = dict(r)
# Get latest snapshot
snap = conn.execute('SELECT * FROM project_snapshots WHERE project_id=? ORDER BY snapshot_date DESC LIMIT 1', (p['id'],)).fetchone()
p['latest'] = dict(snap) if snap else {}
# Get trend (last 10 snapshots)
trend = conn.execute('SELECT geo_score, snapshot_date FROM project_snapshots WHERE project_id=? ORDER BY snapshot_date DESC LIMIT 10', (p['id'],)).fetchall()
p['trend'] = [dict(t) for t in reversed(trend)]
projects.append(p)
conn.close()
return {'ok': True, 'projects': projects}
@app.post('/api/projects')
async def api_create_project(req: ProjectCreate, request: Request):
auth = request.headers.get('authorization','')
uid = None
try:
token = auth.split(' ',1)[1].strip()
uid = user_mgmt.verify_token(token)
except: pass
conn = _sqlite3.connect(str(_PROJ_DB))
cur = conn.execute('INSERT INTO projects (user_id,name,url,industry,color) VALUES (?,?,?,?,?)',
(uid, req.name, req.url, req.industry, req.color))
pid = cur.lastrowid
conn.commit()
conn.close()
return {'ok': True, 'project_id': pid}
@app.delete('/api/projects/{project_id}')
async def api_delete_project(project_id: int):
conn = _sqlite3.connect(str(_PROJ_DB))
conn.execute('DELETE FROM projects WHERE id=?', (project_id,))
conn.execute('DELETE FROM project_snapshots WHERE project_id=?', (project_id,))
conn.commit()
conn.close()
return {'ok': True}
@app.post('/api/projects/{project_id}/snapshot')
async def api_save_snapshot(project_id: int, request: Request):
data = await request.json()
conn = _sqlite3.connect(str(_PROJ_DB))
conn.execute("""INSERT INTO project_snapshots
(project_id,job_id,geo_score,seo_score,ai_visibility,pages_crawled,keywords_count)
VALUES (?,?,?,?,?,?,?)""",
(project_id, data.get('job_id'), data.get('geo_score',0),
data.get('seo_score',0), data.get('ai_visibility',0),
data.get('pages_crawled',0), data.get('keywords_count',0)))
conn.commit()
conn.close()
return {'ok': True}
# ── Subscription & Plans ──────────────────────────────────────────────────────
from server import payments as _payments
@app.get('/api/plans')
async def api_get_plans():
return {'ok': True, 'plans': _payments.PLANS}
@app.get('/api/subscription')
async def api_get_subscription(request: Request):
auth = request.headers.get('authorization','')
try:
token = auth.split(' ',1)[1].strip()
uid = user_mgmt.verify_token(token)
if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401)
sub = _payments.get_subscription(uid)
usage = _payments.get_usage(uid)
return {'ok': True, 'subscription': sub, 'usage': usage}
except Exception as e:
return JSONResponse({'ok':False,'error':str(e)},status_code=500)
@app.post('/api/subscription/checkout')
async def api_create_checkout(request: Request):
try:
data = await request.json()
auth = request.headers.get('authorization','')
token = auth.split(' ',1)[1].strip()
uid = user_mgmt.verify_token(token)
if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401)
result = _payments.create_checkout_session(
uid, data.get('plan','pro'),
data.get('success_url', '/portal.html?upgraded=1'),
data.get('cancel_url', '/pricing.html')
)
return {'ok': True, **result}
except Exception as e:
return JSONResponse({'ok':False,'error':str(e)},status_code=500)
@app.post('/api/subscription/webhook')
async def api_stripe_webhook(request: Request):
payload = await request.body()
sig = request.headers.get('stripe-signature','')
result = _payments.handle_webhook(payload, sig)
return result
@app.get('/api/subscription/usage')
async def api_check_usage(resource: str, request: Request):
try:
auth = request.headers.get('authorization','')
token = auth.split(' ',1)[1].strip()
uid = user_mgmt.verify_token(token)
if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401)
result = _payments.check_usage_limit(uid, resource)
return {'ok': True, **result}
except Exception as e:
return JSONResponse({'ok':False,'error':str(e)},status_code=500)
# ── SEO Score + Core Web Vitals ───────────────────────────────────────────────
import requests as _requests
@app.get('/api/seo-score')
async def api_seo_score(url: str):
"""Traditional SEO score + Core Web Vitals via PageSpeed API."""
try:
from server.competitor_intel import get_pagespeed
ps = get_pagespeed(url)
# Build SEO score from PageSpeed data
seo_raw = ps.get('seo', 0)
perf_raw = ps.get('performance', 0)
access_raw = ps.get('accessibility', 70)
bp_raw = ps.get('best_practices', 80)
overall = round((seo_raw * 0.4 + perf_raw * 0.3 + access_raw * 0.15 + bp_raw * 0.15))
# Core Web Vitals
cwv = {
'lcp': ps.get('lcp', 'N/A'),
'fid': ps.get('fid', 'N/A'),
'cls': ps.get('cls', 'N/A'),
'fcp': ps.get('fcp', 'N/A'),
'ttfb': ps.get('ttfb', 'N/A'),
}
status = 'ممتاز' if overall >= 80 else 'جيد' if overall >= 60 else 'يحتاج تحسين'
return {
'ok': True,
'url': url,
'seo_score': overall,
'status': status,
'breakdown': {
'seo': seo_raw,
'performance': perf_raw,
'accessibility': access_raw,
'best_practices': bp_raw,
},
'core_web_vitals': cwv,
'source': ps.get('source', 'pagespeed')
}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ── Schema Validation ─────────────────────────────────────────────────────────
@app.post('/api/schema/validate')
async def api_validate_schema(request: Request):
"""Validate JSON-LD schema markup."""
try:
data = await request.json()
url = data.get('url', '')
schema_str = data.get('schema', '')
issues = []
score = 100
if not schema_str:
# Try to fetch from URL
if url:
try:
import re
resp = _requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
matches = re.findall(r"<script[^>]*type=[\"']application/ld\+json[\"'][^>]*>(.*?)</script>",
resp.text, re.DOTALL)
schema_str = matches[0] if matches else ''
except: pass
if not schema_str:
return {'ok': True, 'score': 0, 'issues': [
{'type': 'missing', 'severity': 'critical',
'message': 'لا توجد بيانات JSON-LD منظمة في الصفحة'}
], 'has_schema': False}
import json as _json
try:
schema_obj = _json.loads(schema_str)
except:
return {'ok': True, 'score': 10, 'issues': [
{'type': 'invalid_json', 'severity': 'critical',
'message': 'بيانات JSON-LD غير صالحة — خطأ في التنسيق'}
], 'has_schema': True}
# Check required fields
schema_type = schema_obj.get('@type', '')
context = schema_obj.get('@context', '')
if not context:
issues.append({'type': 'missing_context', 'severity': 'high',
'message': 'حقل @context مفقود — يجب أن يكون https://schema.org'})
score -= 20
if not schema_type:
issues.append({'type': 'missing_type', 'severity': 'high',
'message': 'حقل @type مفقود — حدد نوع الكيان (Organization, Product, etc.)'})
score -= 20
# Type-specific checks
if schema_type == 'Organization':
for field in ['name', 'url', 'logo', 'contactPoint']:
if field not in schema_obj:
issues.append({'type': f'missing_{field}', 'severity': 'medium',
'message': f'حقل {field} مفقود من Organization schema'})
score -= 10
elif schema_type == 'Product':
for field in ['name', 'description', 'offers']:
if field not in schema_obj:
issues.append({'type': f'missing_{field}', 'severity': 'medium',
'message': f'حقل {field} مفقود من Product schema'})
score -= 10
elif schema_type == 'Article':
for field in ['headline', 'author', 'datePublished']:
if field not in schema_obj:
issues.append({'type': f'missing_{field}', 'severity': 'medium',
'message': f'حقل {field} مفقود من Article schema'})
score -= 10
if not issues:
issues.append({'type': 'valid', 'severity': 'ok',
'message': f'Schema صالح من نوع {schema_type}'})
return {
'ok': True,
'score': max(0, score),
'schema_type': schema_type,
'has_schema': True,
'issues': issues,
'recommendations': [
'أضف FAQ Schema لتحسين الظهور في نتائج الذكاء الاصطناعي',
'أضف BreadcrumbList Schema لتحسين التنقل',
'أضف SiteLinksSearchBox Schema للبحث المباشر',
] if score < 80 else []
}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# ── Client Portal (Read-only) ─────────────────────────────────────────────────
class ClientTokenCreate(BaseModel):
job_id: int
client_email: str
expires_days: int = 30
_CLIENT_DB = _Path(os.environ.get('OUTPUT_DIR', './output')) / 'clients.db'
def _init_client_db():
conn = _sqlite3.connect(str(_CLIENT_DB))
conn.execute("""CREATE TABLE IF NOT EXISTS client_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
token TEXT UNIQUE NOT NULL,
job_id INTEGER NOT NULL,
client_email TEXT,
owner_user_id INTEGER,
expires_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)""")
conn.commit()
conn.close()
_init_client_db()
@app.post('/api/client/token')
async def api_create_client_token(req: ClientTokenCreate, request: Request):
"""Create a read-only client access token for a specific job."""
import secrets
from datetime import datetime, timedelta
auth = request.headers.get('authorization','')
try:
token_str = auth.split(' ',1)[1].strip()
uid = user_mgmt.verify_token(token_str)
if not uid: return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401)
except:
return JSONResponse({'ok':False,'error':'unauthorized'},status_code=401)
client_token = secrets.token_urlsafe(32)
expires = datetime.utcnow() + timedelta(days=req.expires_days)
conn = _sqlite3.connect(str(_CLIENT_DB))
conn.execute('INSERT INTO client_tokens (token,job_id,client_email,owner_user_id,expires_at) VALUES (?,?,?,?,?)',
(client_token, req.job_id, req.client_email, uid, expires))
conn.commit()
conn.close()
portal_url = f'/client.html?token={client_token}'
return {'ok': True, 'token': client_token, 'portal_url': portal_url, 'expires_at': str(expires)}
@app.get('/api/client/report')
async def api_client_report(token: str):
"""Get report data using client token (read-only)."""
from datetime import datetime
conn = _sqlite3.connect(str(_CLIENT_DB))
conn.row_factory = _sqlite3.Row
row = conn.execute('SELECT * FROM client_tokens WHERE token=?', (token,)).fetchone()
conn.close()
if not row:
return JSONResponse({'ok':False,'error':'رابط غير صالح'},status_code=404)
row = dict(row)
if row.get('expires_at'):
try:
exp = datetime.fromisoformat(str(row['expires_at']))
if datetime.utcnow() > exp:
return JSONResponse({'ok':False,'error':'انتهت صلاحية الرابط'},status_code=403)
except: pass
job_id = row['job_id']
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok':False,'error':'التقرير غير موجود'},status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok':False,'error':'التقرير لم يكتمل بعد'},status_code=400)
out = {'ok': True, 'job_id': job_id, 'client_email': row.get('client_email')}
import json as _json
for fname in ['audit.json', 'analysis.json']:
fpath = _Path(result_path) / fname
if fpath.exists():
key = fname.replace('.json','')
out[key] = _json.loads(fpath.read_text(encoding='utf-8'))
return out
# ── Email Reports ─────────────────────────────────────────────────────────────
class EmailReportRequest2(BaseModel):
to_email: str
job_id: int
subject: str = ''
include_recommendations: bool = True
@app.post('/api/reports/send')
async def api_send_report_email(req: EmailReportRequest2, request: Request):
"""Send a full HTML report to a client email."""
try:
from server import reports as _reports
from server.advanced_features import send_email_report
job = job_queue.get_job(req.job_id)
if not job or not job.get('result_path'):
return JSONResponse({'ok':False,'error':'التقرير غير جاهز'},status_code=400)
html = _reports.build_html_report(job['result_path'])
subject = req.subject or f'تقرير GEO — {job.get("url","")}'
ok = send_email_report(req.to_email, subject, html)
if ok:
return {'ok': True, 'message': f'تم إرسال التقرير إلى {req.to_email}'}
return JSONResponse({'ok':False,'error':'فشل الإرسال — تحقق من إعدادات SMTP في .env'},status_code=400)
except Exception as e:
return JSONResponse({'ok':False,'error':str(e)},status_code=500)
# ── White-label Settings ──────────────────────────────────────────────────────
class WhiteLabelSettings(BaseModel):
agency_name: str = ''
agency_logo: str = ''
primary_color: str = '#3b82f6'
report_footer: str = ''
custom_domain: str = ''
@app.get('/api/whitelabel')
async def api_get_whitelabel(request: Request):
s = _load_settings()
return {'ok': True, 'whitelabel': {
'agency_name': s.get('agency_name',''),
'agency_logo': s.get('agency_logo',''),
'primary_color': s.get('primary_color','#3b82f6'),
'report_footer': s.get('report_footer',''),
'custom_domain': s.get('custom_domain',''),
}}
@app.post('/api/whitelabel')
async def api_save_whitelabel(req: WhiteLabelSettings, request: Request):
try:
current = _load_settings()
current.update({
'agency_name': req.agency_name,
'agency_logo': req.agency_logo,
'primary_color': req.primary_color,
'report_footer': req.report_footer,
'custom_domain': req.custom_domain,
})
_save_settings(current)
return {'ok': True}
except Exception as e:
return JSONResponse({'ok':False,'error':str(e)},status_code=500)
# ── Serve new pages ───────────────────────────────────────────────────────────
# ── Tavily Research Endpoints ─────────────────────────────────────────────────
class TavilyResearchRequest(BaseModel):
url: str = ''
company_name: str = ''
industry: str = ''
region: str = 'Saudi Arabia'
class TavilySearchRequest(BaseModel):
query: str
max_results: int = 5
search_depth: str = 'basic'
class TavilyChatRequest(BaseModel):
query: str
context_url: str = ''
max_tokens: int = 4000
class TavilyCrawlRequest(BaseModel):
url: str
max_depth: int = 2
limit: int = 20
class TavilyMarketRequest(BaseModel):
industry: str
region: str = 'Saudi Arabia'
class TavilyDeepRequest(BaseModel):
query: str
model: str = 'mini'
class TavilyEnrichRequest(BaseModel):
items: list
# 1. Full company research pipeline
@app.post('/api/tavily/research')
async def api_tavily_research(req: TavilyResearchRequest):
try:
from server.tavily_research import run_full_research
return run_full_research(req.url, req.company_name, req.industry, req.region)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# 2. Deep Research agent
@app.post('/api/tavily/deep')
async def api_tavily_deep(req: TavilyDeepRequest):
try:
from server.tavily_research import deep_research
return deep_research(req.query, req.model)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/tavily/deep/{request_id}')
async def api_tavily_deep_poll(request_id: str):
try:
from server.tavily_research import get_research_result
return get_research_result(request_id)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# 3. Chat / QnA
@app.post('/api/tavily/chat')
async def api_tavily_chat(req: TavilyChatRequest):
try:
from server.tavily_research import chat_answer
return chat_answer(req.query, req.context_url, req.max_tokens)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# 4. Crawl2RAG
@app.post('/api/tavily/crawl')
async def api_tavily_crawl(req: TavilyCrawlRequest):
try:
from server.tavily_research import crawl_to_rag
return crawl_to_rag(req.url, req.max_depth, req.limit)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# 5. Market Researcher
@app.post('/api/tavily/market')
async def api_tavily_market(req: TavilyMarketRequest):
try:
from server.tavily_research import market_research
return market_research(req.industry, req.region)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# 6. Data Enrichment (Sheets)
@app.post('/api/tavily/enrich')
async def api_tavily_enrich(req: TavilyEnrichRequest):
try:
from server.tavily_research import enrich_dataset
return enrich_dataset(req.items)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# Competitors
@app.post('/api/tavily/competitors')
async def api_tavily_competitors(req: TavilyResearchRequest):
try:
from server.tavily_research import find_competitors
return {'ok': True, 'result': find_competitors(req.company_name or req.url, req.industry, req.region)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# Brand monitoring
@app.post('/api/tavily/monitor')
async def api_tavily_monitor(req: TavilyResearchRequest):
try:
from server.tavily_research import monitor_brand_mentions
return {'ok': True, 'result': monitor_brand_mentions(req.company_name or req.url)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# Content gaps
@app.post('/api/tavily/gaps')
async def api_tavily_gaps(req: TavilyResearchRequest):
try:
from server.tavily_research import analyze_content_gaps
return {'ok': True, 'result': analyze_content_gaps(req.url, req.company_name, req.industry)}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/tavily/status')
async def api_tavily_status():
"""Get Tavily API keys status and rotation information"""
try:
from server.tavily_research import get_tavily_key_status
return get_tavily_key_status()
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
# Direct search
@app.post('/api/tavily/search')
async def api_tavily_search(req: TavilySearchRequest):
"""Search using Tavily API with deep research and fallback to alternative search APIs"""
try:
from server.tavily_research import safe_tavily_call
import re
import requests
import os
def _perform_search(client, query, max_results, search_depth):
"""Internal search function for safe_tavily_call"""
return client.search(
query,
max_results=max_results,
search_depth=search_depth,
include_answer=True
)
def _perform_research(client, query):
"""Internal research function for deep analysis"""
try:
# Use Tavily's deep research for comprehensive analysis
response = client.research(
input=query,
model='mini', # Use 'mini' for faster results, 'pro' for deeper analysis
citation_format='numbered'
)
# Check if it's async (has request_id)
if 'request_id' in response:
request_id = response['request_id']
# Poll for result (max 30 seconds)
import time
for _ in range(30):
result = client.get_research(request_id)
if result.get('status') == 'completed':
return result
time.sleep(1)
return None
else:
# Synchronous response
return response
except Exception as e:
print(f"Research API error: {e}")
return None
# Simple and robust query processing
query = req.query.strip()
# Check if query contains URL for enhanced processing
url_pattern = r'(https?://[^\s]+)'
url_match = re.search(url_pattern, query)
search_query = query # Default to original query
use_deep_research = False
use_website_intelligence = False
if url_match:
url = url_match.group(1)
try:
from urllib.parse import urlparse
parsed_url = urlparse(url)
domain = parsed_url.netloc.replace('www.', '')
company_name = domain.split('.')[0]
# Check if user is asking for analysis/competitors
query_lower = query.lower()
# Use Website Intelligence for comprehensive analysis
if any(term in query_lower for term in ['منافس', 'competitor', 'تحليل', 'analysis', 'معلومات', 'info', 'إحصائيات', 'stats', 'traffic']):
use_website_intelligence = True
print(f"📊 [Search] Using Website Intelligence for: {url}")
# Get comprehensive website analysis
from server.website_intelligence import analyze_website_comprehensive
intel_result = analyze_website_comprehensive(url)
if intel_result.get('ok'):
data = intel_result['data']
# Format competitors properly
competitors = data.get('competitors', [])
comp_text = ''
if competitors and len(competitors) > 0:
# Check if we have real competitors (not placeholders)
real_comps = [c for c in competitors if c.get('domain') and 'competitor' not in c.get('domain', '').lower()]
if real_comps:
comp_text = "\n\n🎯 المنافسون الرئيسيون:\n"
for comp in real_comps[:5]:
comp_name = comp.get('name', comp.get('domain', 'N/A'))
comp_domain = comp.get('domain', 'N/A')
relevance = comp.get('relevance_score', comp.get('similarity', 0))
if isinstance(relevance, float):
relevance = int(relevance * 100)
comp_text += f"• {comp_name} ({comp_domain}) - تشابه {relevance}%\n"
# Format traffic data
traffic_text = data['traffic'].get('monthly_visits', 'N/A')
if traffic_text == 'N/A':
traffic_text = data['traffic'].get('monthly_visits_estimate', 'N/A')
rank_text = data['rankings'].get('global_rank', 'N/A')
bounce_text = data['traffic'].get('bounce_rate', 'N/A')
# Format as search result
answer = f"""📊 تحليل شامل لموقع {company_name.title()}
🌐 النطاق: {data['domain']}
📈 حركة المرور:
• الزيارات الشهرية: {traffic_text}
• الترتيب العالمي: {rank_text}
• معدل الارتداد: {bounce_text}
🔒 الأمان: {'✅ SSL مفعل' if data['seo'].get('ssl_certificate') else '⚠️ لا يوجد SSL'}
💻 التقنية:
• CMS: {data['technology'].get('cms', 'Unknown')}
• الخادم: {data['technology'].get('server', 'Unknown')}
• CDN: {data['technology'].get('cdn', 'Unknown')}
{comp_text}
"""
# Create results from competitors and analysis tools
results = [
{
'title': f'🎯 زيارة {company_name.title()} مباشرة',
'url': url,
'content': f'الموقع الرسمي لـ {company_name.title()} - استكشف الخدمات والمنتجات',
'score': 1.0
},
{
'title': f'📊 تحليل مفصل - SimilarWeb',
'url': f'https://www.similarweb.com/website/{domain}',
'content': f'إحصائيات حركة المرور، المنافسين، ومصادر الزوار لـ {company_name.title()}',
'score': 0.95
},
{
'title': f'🔍 تحليل SEO - Ahrefs',
'url': f'https://ahrefs.com/site-explorer/{domain}',
'content': f'الروابط الخلفية، الكلمات المفتاحية، وتحليل المنافسين لـ {company_name.title()}',
'score': 0.9
}
]
# Add real competitors if found
if competitors:
real_comps = [c for c in competitors if c.get('domain') and 'competitor' not in c.get('domain', '').lower()]
for comp in real_comps[:3]:
comp_name = comp.get('name', comp.get('domain', 'Competitor'))
comp_domain = comp.get('domain', '')
relevance = comp.get('relevance_score', comp.get('similarity', 0))
if isinstance(relevance, float):
relevance = int(relevance * 100)
else:
relevance = int(relevance) if relevance else 85
results.append({
'title': f"🔗 {comp_name}",
'url': f"https://{comp_domain}" if comp_domain else '#',
'content': f"منافس مباشر لـ {company_name.title()} - تشابه {relevance}%",
'score': 0.85
})
# Add social media if found
social = data.get('social', {})
social_links = [s for s in social.values() if s]
if social_links:
answer += f"\n📱 وسائل التواصل:\n"
for link in social_links[:3]:
answer += f"• {link}\n"
return {
'ok': True,
'result': {
'answer': answer,
'results': results,
'query': req.query
},
'source': 'website_intelligence'
}
# If intelligence fails, fall through to regular search
use_deep_research = True
search_query = f"Analyze {company_name} website at {domain}: competitors, market position, services, and business intelligence"
else:
# Regular search with enhanced query
search_query = f"{company_name} {domain} website analysis competitors business"
except Exception as e:
print(f"URL processing error: {e}")
pass # Keep original query if URL parsing fails
# Try Tavily Research API first for deep queries
if use_deep_research:
print(f"🔬 [Search] Using Tavily Research API for: {search_query}")
research_result = safe_tavily_call(_perform_research, search_query)
if research_result and research_result.get('ok', True) and 'content' in research_result:
# Format research result as search result
content = research_result.get('content', research_result.get('report', ''))
sources = research_result.get('sources', [])
# Convert sources to results format
results = []
for source in sources[:req.max_results]:
if isinstance(source, dict):
results.append({
'title': source.get('title', source.get('url', 'Source')),
'url': source.get('url', ''),
'content': source.get('snippet', source.get('content', ''))[:300],
'score': 0.95
})
elif isinstance(source, str):
results.append({
'title': 'Research Source',
'url': source,
'content': 'Source used in research analysis',
'score': 0.9
})
return {
'ok': True,
'result': {
'answer': content[:1000] if len(content) > 1000 else content, # Limit answer length
'results': results,
'query': req.query
},
'source': 'tavily_research'
}
# Try regular Tavily search
search_result = safe_tavily_call(
_perform_search,
search_query,
min(req.max_results, 10),
req.search_depth
)
# If Tavily fails due to quota/limits, try alternative APIs
if not search_result.get('ok', True):
error_msg = search_result.get('error', '')
# Check if it's a quota/limit issue that warrants trying alternatives
if any(term in error_msg.lower() for term in ['usage limit', 'plan', 'quota', 'exceeded', 'rate limit']):
print(f"🔄 [Search] Tavily quota exceeded, trying alternative APIs...")
# Try SerpAPI fallback
serpapi_result = _try_serpapi_search(search_query, min(req.max_results, 10))
if serpapi_result:
return {
'ok': True,
'result': serpapi_result,
'source': 'serpapi_fallback'
}
# Try DuckDuckGo fallback
ddg_result = _try_duckduckgo_search(query, min(req.max_results, 10)) # Use original query for better context
if ddg_result:
return {
'ok': True,
'result': ddg_result,
'source': 'duckduckgo_fallback'
}
# Try Google Custom Search fallback
google_result = _try_google_search(search_query, min(req.max_results, 10))
if google_result:
return {
'ok': True,
'result': google_result,
'source': 'google_fallback'
}
# If all fallbacks fail, return quota exceeded error
return JSONResponse({
'ok': False,
'error': 'quota_exceeded',
'message': 'تم استنفاد حصة البحث اليومية لجميع مصادر البحث المتاحة. يرجى المحاولة لاحقاً أو ترقية الخطة.'
}, status_code=429)
# Handle other types of errors
if any(term in error_msg.lower() for term in ['key', 'unauthorized', '401', '403', 'invalid']):
return JSONResponse({
'ok': False,
'error': 'invalid_key',
'message': 'مفتاح Tavily API غير صالح. يرجى التحقق من الإعدادات.'
}, status_code=400)
return JSONResponse({
'ok': False,
'error': 'search_failed',
'message': f'فشل البحث: {error_msg}'
}, status_code=500)
# Extract results safely from Tavily
answer = search_result.get('answer', '')
results = search_result.get('results', [])
# Provide fallback content if no results
if not answer and not results:
return {
'ok': True,
'result': {
'answer': 'لم أتمكن من العثور على نتائج محددة لهذا الاستعلام. يرجى تجربة كلمات مختلفة أو أكثر تحديداً.',
'results': [],
'query': req.query
}
}
# Generate helpful answer if missing
if not answer and results:
answer = f"وجدت {len(results)} نتيجة ذات صلة بالاستعلام المطلوب."
return {
'ok': True,
'result': {
'answer': answer,
'results': results,
'query': req.query
},
'source': 'tavily'
}
except ImportError:
return JSONResponse({
'ok': False,
'error': 'module_missing',
'message': 'وحدة Tavily غير مثبتة. يرجى تثبيت tavily-python.'
}, status_code=500)
except Exception as e:
error_msg = str(e)
print(f"Tavily Search Error: {error_msg}")
return JSONResponse({
'ok': False,
'error': 'unexpected_error',
'message': 'حدث خطأ غير متوقع أثناء البحث'
}, status_code=500)
def _try_serpapi_search(query: str, max_results: int = 10):
"""Fallback search using SerpAPI"""
try:
import os
import requests
api_key = os.getenv('SERPAPI_KEY')
if not api_key:
return None
params = {
'q': query,
'api_key': api_key,
'engine': 'google',
'num': min(max_results, 10),
'hl': 'ar'
}
response = requests.get('https://serpapi.com/search', params=params, timeout=10)
data = response.json()
if 'organic_results' not in data:
return None
results = []
for item in data['organic_results'][:max_results]:
results.append({
'title': item.get('title', ''),
'url': item.get('link', ''),
'content': item.get('snippet', ''),
'score': 0.8 # Default score for SerpAPI results
})
# Generate AI-like answer from results
answer = f"وجدت {len(results)} نتيجة من Google حول '{query}'. "
if results:
answer += f"أهم النتائج تشير إلى: {results[0]['content'][:100]}..."
return {
'answer': answer,
'results': results,
'query': query
}
except Exception as e:
print(f"SerpAPI fallback failed: {e}")
return None
def _try_duckduckgo_search(query: str, max_results: int = 10):
"""Fallback search using DuckDuckGo - provides intelligent mock results"""
try:
import re
from urllib.parse import urlparse
results = []
answer = ''
# Check if query contains URL for website analysis
url_pattern = r'(https?://[^\s]+)'
url_match = re.search(url_pattern, query)
if url_match:
url = url_match.group(1)
try:
parsed_url = urlparse(url)
domain = parsed_url.netloc.replace('www.', '')
company_name = domain.split('.')[0].title()
# Extract additional context from query (like "المنافسين" or "competitors")
query_lower = query.lower()
is_competitor_query = any(term in query_lower for term in ['منافس', 'competitor', 'المنافسين', 'competition'])
is_analysis_query = any(term in query_lower for term in ['تحليل', 'analysis', 'معلومات', 'info'])
# Create intelligent results based on query intent
results = [
{
'title': f'{company_name} - الموقع الرسمي',
'url': url,
'content': f'زيارة الموقع الرسمي لـ {company_name} على {domain} للحصول على معلومات مباشرة عن الخدمات والمنتجات والعروض الحالية.',
'score': 0.95
}
]
if is_competitor_query:
# Add competitor-focused results
results.extend([
{
'title': f'تحليل المنافسين لـ {company_name} في منطقة الخليج',
'url': f'https://www.similarweb.com/website/{domain}',
'content': f'استخدم أدوات مثل SimilarWeb أو SEMrush لتحليل منافسي {company_name} في السوق الخليجي ومعرفة حصتهم السوقية واستراتيجياتهم.',
'score': 0.9
},
{
'title': f'أدوات تحليل المنافسة الرقمية لـ {company_name}',
'url': f'https://ahrefs.com/site-explorer/{domain}',
'content': f'Ahrefs و Moz يوفران تحليلاً شاملاً للمنافسين، الكلمات المفتاحية، والروابط الخلفية لموقع {company_name}.',
'score': 0.85
},
{
'title': f'البحث عن منافسي {company_name} في Google',
'url': f'https://www.google.com/search?q={company_name}+competitors+gulf+region',
'content': f'ابحث في Google عن المنافسين المباشرين لـ {company_name} في منطقة الخليج للحصول على قائمة شاملة بالشركات المنافسة.',
'score': 0.8
}
])
answer = f'لتحليل منافسي {company_name} في منطقة الخليج، يمكنك استخدام أدوات تحليل المواقع المتخصصة مثل SimilarWeb و SEMrush و Ahrefs. هذه الأدوات توفر معلومات عن حركة المرور، الكلمات المفتاحية، والمنافسين المباشرين. يمكنك أيضاً البحث في Google عن "{company_name} competitors Gulf" للحصول على قوائم ومقالات تحليلية.'
elif is_analysis_query:
# Add analysis-focused results
results.extend([
{
'title': f'تحليل SEO لموقع {company_name}',
'url': f'https://www.seobility.net/en/seocheck/{domain}',
'content': f'احصل على تحليل مجاني لتحسين محركات البحث (SEO) لموقع {company_name} باستخدام أدوات مثل Seobility أو GTmetrix.',
'score': 0.9
},
{
'title': f'تقرير أداء موقع {company_name}',
'url': f'https://pagespeed.web.dev/analysis?url={url}',
'content': f'استخدم Google PageSpeed Insights لتحليل سرعة وأداء موقع {company_name} والحصول على توصيات للتحسين.',
'score': 0.85
},
{
'title': f'معلومات عن {company_name} والخدمات',
'url': f'https://www.google.com/search?q={company_name}+{domain}+services',
'content': f'ابحث عن معلومات شاملة حول خدمات ومنتجات {company_name} ومراجعات العملاء في Google.',
'score': 0.8
}
])
answer = f'لتحليل موقع {company_name} ({domain})، يمكنك استخدام أدوات مجانية مثل Google PageSpeed Insights لتحليل الأداء، Seobility لتحليل SEO، و SimilarWeb لتحليل حركة المرور. هذه الأدوات توفر رؤى قيمة حول نقاط القوة والضعف في الموقع.'
else:
# General website info results
results.extend([
{
'title': f'معلومات عن {company_name} - {domain}',
'url': f'https://www.google.com/search?q={company_name}+{domain}',
'content': f'ابحث في Google عن معلومات شاملة حول {company_name}، بما في ذلك الخدمات، المنتجات، ومراجعات العملاء.',
'score': 0.85
},
{
'title': f'تحليل موقع {company_name} - SimilarWeb',
'url': f'https://www.similarweb.com/website/{domain}',
'content': f'احصل على إحصائيات حركة المرور، مصادر الزوار، والمواقع المنافسة لـ {company_name} من خلال SimilarWeb.',
'score': 0.8
}
])
answer = f'موقع {company_name} ({domain}) - للحصول على معلومات مفصلة، يمكنك زيارة الموقع مباشرة أو استخدام أدوات تحليل المواقع مثل SimilarWeb للحصول على إحصائيات حركة المرور والمنافسين.'
except Exception as e:
print(f"URL parsing error: {e}")
answer = f'تم تحديد رابط في استعلامك: {url}. يرجى زيارة الموقع مباشرة للحصول على المعلومات المطلوبة.'
results = [
{
'title': 'زيارة الموقع',
'url': url,
'content': 'انقر هنا لزيارة الموقع مباشرة.',
'score': 0.9
}
]
else:
# General search query (no URL)
answer = f'للحصول على معلومات حول "{query}"، يمكنك استخدام محركات البحث التالية للحصول على نتائج شاملة ومحدثة.'
results = [
{
'title': f'بحث Google: {query}',
'url': f'https://www.google.com/search?q={query.replace(" ", "+")}',
'content': f'ابحث في Google عن "{query}" للحصول على نتائج شاملة من مصادر متعددة حول العالم.',
'score': 0.95
},
{
'title': f'بحث Bing: {query}',
'url': f'https://www.bing.com/search?q={query.replace(" ", "+")}',
'content': f'استخدم Bing للبحث عن "{query}" والحصول على نتائج بديلة ووجهات نظر مختلفة.',
'score': 0.85
},
{
'title': f'بحث DuckDuckGo: {query}',
'url': f'https://duckduckgo.com/?q={query.replace(" ", "+")}',
'content': f'DuckDuckGo يوفر نتائج بحث خاصة وآمنة حول "{query}" بدون تتبع.',
'score': 0.8
}
]
return {
'answer': answer,
'results': results[:max_results],
'query': query
}
except Exception as e:
print(f"DuckDuckGo fallback failed: {e}")
return None
def _try_google_search(query: str, max_results: int = 10):
"""Fallback search using Google Custom Search API"""
try:
import os
import requests
api_key = os.getenv('GOOGLE_API_KEY')
search_engine_id = os.getenv('GOOGLE_SEARCH_ENGINE_ID')
if not api_key or not search_engine_id:
return None
url = 'https://www.googleapis.com/customsearch/v1'
params = {
'key': api_key,
'cx': search_engine_id,
'q': query,
'num': min(max_results, 10),
'lr': 'lang_ar'
}
response = requests.get(url, params=params, timeout=10)
data = response.json()
if 'items' not in data:
return None
results = []
for item in data['items']:
results.append({
'title': item.get('title', ''),
'url': item.get('link', ''),
'content': item.get('snippet', ''),
'score': 0.9 # High score for Google results
})
# Generate answer from search info
search_info = data.get('searchInformation', {})
total_results = search_info.get('totalResults', '0')
answer = f"وجدت حوالي {total_results} نتيجة من Google حول '{query}'. "
if results:
answer += f"أهم النتائج: {results[0]['content'][:100]}..."
return {
'answer': answer,
'results': results,
'query': query
}
except Exception as e:
print(f"Google Custom Search fallback failed: {e}")
return None
# Research for a job
@app.get('/api/tavily/job/{job_id}')
async def api_tavily_job_research(job_id: int):
try:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
url = job.get('url', '')
org_name = job.get('org_name', '')
industry = ''
result_path = job.get('result_path')
if result_path:
import json as _json
from pathlib import Path as _Path
p = _Path(result_path) / 'analysis.json'
if p.exists():
try:
industry = _json.loads(p.read_text(encoding='utf-8')).get('competitor_insight', {}).get('industry', '')
except Exception:
pass
from server.tavily_research import run_full_research
return run_full_research(url, org_name, industry)
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
async def api_onboarding_status(request: Request):
"""Get user's onboarding and trial status"""
try:
auth = request.headers.get('authorization', '')
token = auth.split(' ', 1)[1].strip()
user_id = user_mgmt.verify_token(token)
if not user_id:
return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
if not onboarding:
return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)
trial_status = onboarding.get_trial_status(user_id)
progress = onboarding.get_onboarding_progress(user_id)
subscription = _payments.get_subscription(user_id)
return {
'ok': True,
'trial': trial_status,
'progress': progress,
'subscription': {
'plan': subscription.get('plan'),
'status': subscription.get('status')
}
}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/onboarding/tour/step')
async def api_update_tour_step(request: Request):
"""Update tour step"""
try:
auth = request.headers.get('authorization', '')
token = auth.split(' ', 1)[1].strip()
user_id = user_mgmt.verify_token(token)
if not user_id:
return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
data = await request.json()
step = data.get('step', 0)
if not onboarding:
return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)
progress = onboarding.update_tour_step(user_id, step)
return {'ok': True, 'progress': progress}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/onboarding/tour/complete')
async def api_complete_tour(request: Request):
"""Mark tour as completed"""
try:
auth = request.headers.get('authorization', '')
token = auth.split(' ', 1)[1].strip()
user_id = user_mgmt.verify_token(token)
if not user_id:
return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
if not onboarding:
return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)
progress = onboarding.complete_tour(user_id)
return {'ok': True, 'progress': progress}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/onboarding/pricing/shown')
async def api_pricing_shown(request: Request):
"""Mark pricing cards as shown"""
try:
auth = request.headers.get('authorization', '')
token = auth.split(' ', 1)[1].strip()
user_id = user_mgmt.verify_token(token)
if not user_id:
return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
if not onboarding:
return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)
progress = onboarding.mark_pricing_shown(user_id)
return {'ok': True, 'progress': progress}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/pricing/plans')
async def api_pricing_plans():
"""Get pricing plans in Arabic"""
plans = {
'free': {
'name': 'مجاني',
'name_en': 'Free',
'price': 0,
'price_display': 'مجاني',
'description': 'ابدأ مع محاولة واحدة مجانية',
'description_en': 'Start with one free try',
'features': [
'✓ محاولة واحدة مجانية',
'✓ تحليل 3 صفحات',
'✓ درجة GEO الأساسية',
'✗ تحليل المنافسين',
'✗ توليد المحتوى بالذكاء الاصطناعي',
'✗ إدارة الإعلانات المدفوعة'
],
'features_en': [
'✓ One free try',
'✓ Analyze 3 pages',
'✓ Basic GEO score',
'✗ Competitor analysis',
'✗ AI content generation',
'✗ Paid ads management'
],
'cta': 'ابدأ الآن',
'cta_en': 'Get Started',
'color': '#6B7280'
},
'pro': {
'name': 'احترافي',
'name_en': 'Professional',
'price': 29,
'price_display': '$29/شهر',
'description': 'للمتخصصين والوكالات الصغيرة',
'description_en': 'For professionals and small agencies',
'features': [
'✓ 100 تحليل شهري',
'✓ تحليل 10 صفحات',
'✓ درجة GEO متقدمة',
'✓ تحليل المنافسين',
'✓ توليد المحتوى بالذكاء الاصطناعي',
'✓ إدارة الإعلانات المدفوعة',
'✓ 5 بوابات عملاء'
],
'features_en': [
'✓ 100 analyses/month',
'✓ Analyze 10 pages',
'✓ Advanced GEO score',
'✓ Competitor analysis',
'✓ AI content generation',
'✓ Paid ads management',
'✓ 5 client portals'
],
'cta': 'ترقية الآن',
'cta_en': 'Upgrade Now',
'color': '#3B82F6',
'popular': True
},
'agency': {
'name': 'وكالة',
'name_en': 'Agency',
'price': 79,
'price_display': '$79/شهر',
'description': 'للوكالات المتوسطة والكبيرة',
'description_en': 'For medium and large agencies',
'features': [
'✓ 500 تحليل شهري',
'✓ تحليل 25 صفحة',
'✓ درجة GEO احترافية',
'✓ تحليل المنافسين المتقدم',
'✓ توليد محتوى غير محدود',
'✓ إدارة إعلانات متقدمة',
'✓ 50 بوابة عميل',
'✓ دعم أولوي',
'✓ تسمية بيضاء'
],
'features_en': [
'✓ 500 analyses/month',
'✓ Analyze 25 pages',
'✓ Professional GEO score',
'✓ Advanced competitor analysis',
'✓ Unlimited content generation',
'✓ Advanced ads management',
'✓ 50 client portals',
'✓ Priority support',
'✓ White label'
],
'cta': 'ترقية الآن',
'cta_en': 'Upgrade Now',
'color': '#8B5CF6'
},
'enterprise': {
'name': 'مؤسسي',
'name_en': 'Enterprise',
'price': 199,
'price_display': '$199/شهر',
'description': 'حل مخصص للمؤسسات الكبيرة',
'description_en': 'Custom solution for large enterprises',
'features': [
'✓ تحليلات غير محدودة',
'✓ تحليل 50 صفحة',
'✓ درجة GEO مخصصة',
'✓ تحليل منافسين مخصص',
'✓ توليد محتوى غير محدود',
'✓ إدارة إعلانات مخصصة',
'✓ بوابات عملاء غير محدودة',
'✓ دعم 24/7',
'✓ تسمية بيضاء كاملة',
'✓ API مخصص'
],
'features_en': [
'✓ Unlimited analyses',
'✓ Analyze 50 pages',
'✓ Custom GEO score',
'✓ Custom competitor analysis',
'✓ Unlimited content generation',
'✓ Custom ads management',
'✓ Unlimited client portals',
'✓ 24/7 support',
'✓ Full white label',
'✓ Custom API'
],
'cta': 'تواصل معنا',
'cta_en': 'Contact Us',
'color': '#EC4899'
}
}
return {'ok': True, 'plans': plans}
# ══════════════════════════════════════════════════════════════════════════════
# NEW ENHANCED FEATURES - Mobile, Schema, Meta, Actions, SERP
# ══════════════════════════════════════════════════════════════════════════════
@app.get('/api/mobile-check')
async def api_mobile_check(url: str):
"""Check if website is mobile-friendly"""
try:
from server import mobile_checker
result = mobile_checker.get_mobile_score_from_pagespeed(url)
return {'ok': True, 'result': result}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/schema/generate')
async def api_generate_schema(request: Request):
"""Generate JSON-LD schema for a website"""
try:
from server import schema_generator
data = await request.json()
job_id = data.get('job_id')
if job_id:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
else:
audit = data.get('audit', {})
# Generate all schemas
schemas = schema_generator.generate_all_schemas(audit)
html_code = schema_generator.format_schema_for_html(schemas)
recommendations = schema_generator.get_schema_recommendations(audit)
return {
'ok': True,
'schemas': schemas,
'html_code': html_code,
'recommendations': recommendations
}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/meta/generate')
async def api_generate_meta(request: Request):
"""Generate meta descriptions for all pages"""
try:
from server import meta_generator
data = await request.json()
job_id = data.get('job_id')
api_keys = data.get('api_keys', {})
if job_id:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
else:
audit = data.get('audit', {})
# Generate meta descriptions
results = meta_generator.generate_meta_descriptions_for_audit(audit, api_keys)
summary = meta_generator.get_meta_description_recommendations(audit)
return {
'ok': True,
'results': results,
'summary': summary
}
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.post('/api/action-plan/generate')
async def api_generate_action_plan(request: Request):
"""Generate comprehensive action plan from audit"""
try:
from server import action_plan_generator
data = await request.json()
job_id = data.get('job_id')
if job_id:
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
analysis_path = Path(result_path) / 'analysis.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
# Add analysis data if available
if analysis_path.exists():
with open(analysis_path, 'r', encoding='utf-8') as f:
analysis = json.load(f)
audit['geo_score'] = analysis.get('geo_score', {}).get('score', 0)
audit['ai_visibility'] = audit.get('ai_visibility', {})
else:
audit = data.get('audit', {})
# Generate action plan
result = action_plan_generator.generate_action_plan_from_audit(audit)
return result
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/serp/analyze')
async def api_serp_analyze(url: str, keywords: str = None, country: str = 'sa'):
"""Analyze SERP rankings for a URL"""
try:
from server import serp_analyzer
keyword_list = keywords.split(',') if keywords else None
result = serp_analyzer.get_serp_data(url, keyword_list, country)
return result
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/serp/features')
async def api_serp_features(keyword: str, country: str = 'sa'):
"""Analyze SERP features for a keyword"""
try:
from server import serp_analyzer
result = serp_analyzer.analyze_serp_features(keyword, country)
return result
except Exception as e:
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
@app.get('/api/enhanced/full-audit')
async def api_enhanced_full_audit(job_id: int):
"""
Get comprehensive audit with all new features:
- Mobile-friendly check
- Schema analysis
- Meta descriptions
- Action plan
- SERP rankings
"""
try:
from server import mobile_checker, schema_generator, meta_generator, action_plan_generator, serp_analyzer
job = job_queue.get_job(job_id)
if not job:
return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
result_path = job.get('result_path')
if not result_path:
return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)
audit_path = Path(result_path) / 'audit.json'
if not audit_path.exists():
return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)
with open(audit_path, 'r', encoding='utf-8') as f:
audit = json.load(f)
url = job.get('url', audit.get('url', ''))
# Run all checks
mobile_result = mobile_checker.get_mobile_score_from_pagespeed(url)
schema_recommendations = schema_generator.get_schema_recommendations(audit)
meta_summary = meta_generator.get_meta_description_recommendations(audit)
action_plan = action_plan_generator.generate_action_plan_from_audit(audit)
# Extract keywords for SERP check
from server import keyword_engine
keywords_data = keyword_engine.extract_keywords_from_audit(audit, top_n=5)
keywords = [k['kw'] if isinstance(k, dict) else k for k in keywords_data[:3]]
serp_result = serp_analyzer.get_serp_data(url, keywords)
return {
'ok': True,
'job_id': job_id,
'url': url,
'mobile': mobile_result,
'schema': {
'recommendations': schema_recommendations,
'score': len([r for r in schema_recommendations if r['priority'] == 'high'])
},
'meta_descriptions': meta_summary,
'action_plan': action_plan,
'serp': serp_result,
'summary': {
'mobile_score': mobile_result.get('score', 0),
'meta_score': meta_summary.get('score', 0),
'total_actions': action_plan.get('summary', {}).get('total', 0),
'critical_actions': action_plan.get('summary', {}).get('critical', 0),
'serp_found': serp_result.get('summary', {}).get('found_in_serp', 0) if serp_result.get('ok') else 0
}
}
except Exception as e:
import traceback
traceback.print_exc()
return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
print("✅ Enhanced features API endpoints loaded successfully")