import datetime
import json
import os
import sqlite3
import sys
from pathlib import Path
from typing import List, Optional

from fastapi import BackgroundTasks, FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
|
|
| from dotenv import load_dotenv |
| load_dotenv(override=True) |
|
|
| from slowapi import Limiter, _rate_limit_exceeded_handler |
| from slowapi.util import get_remote_address |
| from slowapi.errors import RateLimitExceeded |
|
|
# Startup banner: log runtime context so deployment issues (wrong working
# directory, wrong interpreter) are visible in container logs immediately.
print(" GEO Platform API starting...")
print(f" Working directory: {os.getcwd()}")
# `sys.version` is the documented way to get the interpreter version;
# `os.sys` is an undocumented accident of the os module's internals.
print(f" Python version: {sys.version}")


# Lazily-imported crawl pipeline entry point (src.main.run_pipeline).
# Imported on first use inside /api/crawl so a broken pipeline module
# does not prevent the API from starting.
run_pipeline = None
|
|
| try: |
| from server import ai_analysis |
| print(" ai_analysis loaded (from server)") |
| except Exception as e: |
| print(f" ai_analysis failed: {e}") |
| ai_analysis = None |
|
|
| try: |
| from server import geo_services |
| print(" geo_services loaded") |
| except Exception as e: |
| print(f" geo_services failed: {e}") |
| geo_services = None |
|
|
| try: |
| from server import ai_visibility |
| print(" ai_visibility loaded") |
| except Exception as e: |
| print(f" ai_visibility failed: {e}") |
| ai_visibility = None |
|
|
| try: |
| from server import job_queue |
| print(" job_queue loaded") |
| except Exception as e: |
| print(f" job_queue failed: {e}") |
| job_queue = None |
|
|
| try: |
| from server import keyword_engine |
| print(" keyword_engine loaded") |
| except Exception as e: |
| print(f" keyword_engine failed: {e}") |
| keyword_engine = None |
|
|
| try: |
| from server import users as user_mgmt |
| print(" users loaded") |
| except Exception as e: |
| print(f" users failed: {e}") |
| user_mgmt = None |
|
|
| try: |
| from server import search_intel |
| print(" search_intel loaded") |
| except Exception as e: |
| print(f" search_intel failed: {e}") |
| search_intel = None |
|
|
| try: |
| from server import payments as _payments |
| except Exception: |
| _payments = None |
|
|
| try: |
| from server import onboarding |
| except Exception: |
| onboarding = None |
|
|
| from fastapi import WebSocket |
| import asyncio |
|
|
# Root directory for crawl artifacts (audit.json, snapshot-*.json,
# history.json, per-run subdirectories). Defaults to <repo>/output but can be
# relocated via the OUTPUT_DIR environment variable (e.g. a mounted volume).
OUTPUT_DIR = Path(os.environ.get('OUTPUT_DIR', str(Path(__file__).resolve().parent.parent / 'output')))
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


print(f" Output directory: {OUTPUT_DIR}")
|
|
app = FastAPI(title='GEO Platform API')


# CORS: wide-open for ease of frontend integration.
# NOTE(review): allow_origins=['*'] combined with allow_credentials=True is
# disallowed by the CORS spec (browsers reject the combination) and is a
# security smell — confirm whether credentialed cross-origin requests are
# actually needed, and pin origins if so.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)


# Per-client-IP rate limiting via slowapi; individual handlers opt in with
# @limiter.limit(...). Exceeding a limit returns slowapi's standard 429.
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


print(" FastAPI app created with rate limiting and CORS")
|
|
| |
# Mount the static frontend (if present) under /static; individual files are
# also served by the catch-all `serve_static` route at the bottom of this file.
frontend_dir = Path(__file__).resolve().parent.parent / 'frontend'
if frontend_dir.exists():
    app.mount('/static', StaticFiles(directory=str(frontend_dir)), name='static')
    print(f" Frontend mounted: {frontend_dir}")
else:
    print(f" Frontend directory not found: {frontend_dir}")
|
|
class CrawlRequest(BaseModel):
    """Request body for POST /api/crawl (synchronous crawl + scoring)."""
    url: str  # Start URL to crawl.
    org_name: str  # Brand name; empty or the placeholder 'Company' triggers inference.
    org_url: str  # Canonical organization URL passed to the pipeline.
    max_pages: int = 3  # Page limit per crawl run.
    runs: int = 1  # Repeat count; scores are aggregated (mean/median/spread).
    api_keys: Optional[dict] = None  # Per-request provider API keys (optional).
|
|
class RecommendationRequest(BaseModel):
    """Request body for recommendation endpoints."""
    api_keys: Optional[dict] = None  # Per-request provider API keys.
    job_id: Optional[int] = None  # Optional job to scope the request to.
    extra_context: Optional[dict] = None  # Free-form additional context.
|
|
class AnalysisRequest(BaseModel):
    """Request body for POST /api/analyze."""
    api_keys: Optional[dict] = None  # Per-request provider API keys.
    job_id: Optional[int] = None  # Analyze this job's audit; else the active one.
|
|
class SimulationRequest(BaseModel):
    """Request body for POST /api/simulate (visibility prediction)."""
    content: str  # Content to evaluate.
    brand: str  # Brand the content is about.
    api_keys: Optional[dict] = None  # Per-request provider API keys.
|
|
@app.post('/api/crawl')
async def api_crawl(req: CrawlRequest):
    """Run the crawl pipeline synchronously, score each run, and record history.

    Performs `req.runs` independent crawls, enriches each audit with AI
    visibility and sentiment data, computes a GEO score per run, snapshots the
    first run, refreshes the top-level audit.json/analysis.json, and appends
    an aggregate entry to history.json. Returns the aggregate scores, or a
    500 JSON payload on failure.
    """
    # Lazy import: the pipeline module is heavy and may be absent in some
    # deployments, so failure is reported per-request instead of at startup.
    global run_pipeline
    if run_pipeline is None:
        try:
            from src.main import run_pipeline as _rp
            run_pipeline = _rp
        except Exception as e:
            return JSONResponse({'ok': False, 'error': 'pipeline not available: ' + str(e)}, status_code=500)

    runs = max(1, req.runs or 1)  # guard against 0/None run counts
    run_results = []
    timestamps = []
    scores = []
    breakdowns = []
    audit_objs = []
    try:
        for i in range(runs):
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # datetime.datetime.now(datetime.timezone.utc) is the replacement.
            ts = datetime.datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
            subdir = OUTPUT_DIR / f"run-{ts}-{i+1}"
            res = run_pipeline(req.url, req.org_name, req.org_url, max_pages=req.max_pages, output_dir=subdir)

            # Load the audit the pipeline just wrote so it can be enriched.
            audit_path = Path(res['audit_path'])
            with open(audit_path, 'r', encoding='utf-8') as f:
                audit_obj = json.load(f)
            audit_objs.append(audit_obj)

            # Infer a real brand name when the caller left it empty or used
            # the 'Company' placeholder.
            org_name_for_visibility = req.org_name
            if not org_name_for_visibility or org_name_for_visibility == 'Company':
                pages_for_inference = audit_obj.get('pages', [])
                inferred = ai_analysis.infer_brand_name(pages_for_inference)
                if inferred and inferred != 'Company':
                    org_name_for_visibility = inferred
                    # Persist the inferred name so downstream stages see it.
                    audit_obj['org_name'] = inferred

            # AI visibility probe: Perplexity first, OpenAI only as a fallback
            # when the Perplexity key is not configured.
            queries = [f"What is {org_name_for_visibility}?", f"Best services for {org_name_for_visibility}", f"Why should I buy from {org_name_for_visibility}?"]
            perf = ai_visibility.check_perplexity(org_name_for_visibility, queries)
            if not perf.get('enabled') and perf.get('reason') == 'PERPLEXITY_KEY not set':
                try:
                    perf = ai_visibility.check_openai_visibility(org_name_for_visibility, queries)
                    perf['fallback'] = 'openai'
                except Exception:
                    pass

            # Sentiment enrichment is best-effort; failures are non-fatal.
            try:
                sentiment = geo_services.sentiment_analysis(org_name_for_visibility, queries, api_keys=req.api_keys)
                perf['sentiment_analysis'] = sentiment
            except Exception:
                pass

            # Write the enriched audit back to disk in place.
            audit_obj['ai_visibility'] = perf
            with open(audit_path, 'w', encoding='utf-8') as f:
                json.dump(audit_obj, f, ensure_ascii=False, indent=2)

            # Score this run and collect per-run results.
            score = ai_analysis.compute_geo_score(audit_obj.get('pages', []), audit=audit_obj, ai_visibility=perf)
            scores.append(score['score'])
            breakdowns.append(score['breakdown'])
            run_results.append({ 'ts': ts, 'audit_path': str(audit_path), 'geo_score': score })
            timestamps.append(ts)

        # Snapshot the first run's audit for the history browser.
        snap_ts = timestamps[0]
        snap_src = Path(run_results[0]['audit_path'])
        snap_dst = OUTPUT_DIR / f"snapshot-{snap_ts}.json"
        with open(snap_src, 'r', encoding='utf-8') as fsrc, open(snap_dst, 'w', encoding='utf-8') as fdst:
            fdst.write(fsrc.read())

        # Promote the snapshot to the "active" audit.json and refresh
        # analysis.json; both steps are best-effort.
        try:
            audit_main = OUTPUT_DIR / 'audit.json'
            with open(snap_dst, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m:
                m.write(s.read())

            try:
                with open(audit_main, 'r', encoding='utf-8') as f:
                    audit_obj = json.load(f)
                pages = audit_obj.get('pages', [])
                analysis = ai_analysis.analyze_pages(pages)
                geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=audit_obj.get('ai_visibility'))
                analysis_out = { 'analysis': analysis, 'geo_score': geo_score }
                with open(OUTPUT_DIR / 'analysis.json', 'w', encoding='utf-8') as fa:
                    json.dump(analysis_out, fa, ensure_ascii=False, indent=2)
            except Exception:
                pass
        except Exception:
            pass

        # Aggregate per-run scores.
        # NOTE(review): 'variance' actually stores the population standard
        # deviation (statistics.pstdev), not the variance — confirm which the
        # frontend expects before renaming the response/history field.
        import statistics
        mean_score = int(round(statistics.mean(scores)))
        median_score = int(round(statistics.median(scores)))
        variance = float(statistics.pstdev(scores))

        # Append an aggregate entry to history.json (corrupt history resets).
        history_path = OUTPUT_DIR / 'history.json'
        history = []
        if history_path.exists():
            try:
                history = json.loads(history_path.read_text(encoding='utf-8'))
            except Exception:
                history = []
        entry = {
            'timestamp': timestamps[0],
            'url': req.url,
            'org_name': req.org_name,
            'runs': runs,
            'scores': scores,
            'mean': mean_score,
            'median': median_score,
            'variance': variance,
            'runs_info': run_results
        }
        history.append(entry)
        history_path.write_text(json.dumps(history, ensure_ascii=False, indent=2), encoding='utf-8')

        return { 'ok': True, 'message': 'crawl completed', 'mean_score': mean_score, 'median_score': median_score, 'variance': variance, 'history_entry': entry }
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
class JobRequest(BaseModel):
    """Request body for POST /api/jobs (queued background crawl)."""
    url: str  # Start URL to crawl.
    org_name: str  # Organization/brand name.
    org_url: str  # Canonical organization URL.
    max_pages: int = 3  # Page limit per crawl run.
    runs: int = 1  # Number of crawl repetitions.
    industry_override: Optional[str] = None  # Passed through to the job queue.
|
|
|
|
@app.post('/api/jobs')
async def api_enqueue(job: JobRequest, background_tasks: BackgroundTasks, request: Request):
    """Queue a crawl job and start processing it in the background.

    A bearer token, when present, is resolved to a user id on a best-effort
    basis; any failure is ignored so the endpoint remains usable anonymously.
    Returns {'ok': True, 'job_id': <id>} on success.
    """
    try:
        # Best-effort caller identification from the Authorization header.
        # NOTE(review): user_id is not currently attached to the job record.
        user_id = None
        try:
            auth_header = request.headers.get('authorization', '')
            bearer = auth_header.split(' ', 1)[1].strip()
            user_id = user_mgmt.verify_token(bearer)
        except Exception:
            pass

        jid = job_queue.enqueue_job(
            job.url,
            job.org_name,
            job.org_url,
            job.max_pages,
            job.runs,
            industry_override=job.industry_override
        )

        # Hand the freshly-queued job to the worker via BackgroundTasks so the
        # HTTP response returns immediately while the crawl proceeds.
        from server.worker import process_job
        queued = job_queue.get_job(jid)
        if queued:
            background_tasks.add_task(process_job, queued)

        return {'ok': True, 'job_id': jid}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/jobs')
async def api_list_jobs():
    """List every job known to the queue."""
    try:
        return {'ok': True, 'jobs': job_queue.list_jobs()}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/jobs/{job_id}')
async def api_get_job(job_id: int):
    """Fetch a single job record, decoding its JSON-encoded progress field."""
    try:
        record = job_queue.get_job(job_id)
        if not record:
            return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
        # 'progress' is stored as a JSON string; unparsable values become {}.
        try:
            record['progress'] = json.loads(record.get('progress') or '{}')
        except Exception:
            record['progress'] = {}
        return {'ok': True, 'job': record}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.websocket('/ws/jobs/{job_id}')
async def ws_job_progress(ws: WebSocket, job_id: int):
    """Push job status over a WebSocket, polling the queue once per second.

    The full job record is sent only when its `updated_at` value changes.
    A missing job is reported each poll rather than closing the socket, so a
    client may subscribe slightly before the job row exists.
    """
    await ws.accept()
    last_updated = None  # updated_at of the last payload pushed to the client
    try:
        while True:
            job = job_queue.get_job(job_id)
            if not job:
                # Job not (yet) present: notify the client and keep polling.
                try:
                    await ws.send_json({'ok': False, 'error': 'job not found'})
                except Exception:
                    pass
                await asyncio.sleep(1)
                continue

            # Normalize 'progress' to a dict (it may be a JSON string).
            try:
                prog = job.get('progress')
                if isinstance(prog, str):
                    try:
                        job['progress'] = json.loads(prog or '{}')
                    except Exception:
                        job['progress'] = {}
                else:
                    job['progress'] = prog or {}
            except Exception:
                job['progress'] = {}

            # Timestamps may be datetime objects; make them JSON-serializable
            # before send_json.
            for dt_key in ('created_at', 'updated_at'):
                try:
                    val = job.get(dt_key)
                    if hasattr(val, 'isoformat'):
                        job[dt_key] = val.isoformat()
                    else:
                        job[dt_key] = str(val) if val is not None else None
                except Exception:
                    job[dt_key] = str(job.get(dt_key))

            # Only push when something changed; a failed send means the
            # client disconnected, so exit the loop.
            updated = job.get('updated_at')
            if updated != last_updated:
                last_updated = updated
                try:
                    await ws.send_json({'ok': True, 'job': job})
                except Exception:
                    break
            await asyncio.sleep(1)
    finally:
        # Always attempt a clean close (the socket may already be closed).
        try:
            await ws.close()
        except Exception:
            pass
|
|
@app.get('/api/jobs/{job_id}/results')
async def api_job_results(job_id: int):
    """Return the audit/analysis/schema artifacts produced by a finished job.

    404 if the job does not exist, 400 if it has not produced results yet.
    Missing individual artifact files are simply omitted from the response.
    """
    try:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': 'no results yet'}, status_code=400)
        out = {'ok': True, 'job_id': job_id}
        base = Path(result_path)
        audit_path = base / 'audit.json'
        analysis_path = base / 'analysis.json'
        schema_path = base / 'schema.jsonld'
        if audit_path.exists():
            out['audit'] = json.loads(audit_path.read_text(encoding='utf-8'))
        if analysis_path.exists():
            out['analysis'] = json.loads(analysis_path.read_text(encoding='utf-8'))
        if schema_path.exists():
            out['schema'] = schema_path.read_text(encoding='utf-8')
        return out
    except Exception as e:
        # Corrupt JSON or queue failures now surface as a structured 500
        # payload, consistent with the other endpoints in this module
        # (previously they escaped as an unhandled exception).
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/results')
async def api_results(ts: str | None = None):
    """Return the active audit (and schema), or a historical snapshot when
    `ts` is given.

    Responses carry no-cache headers so the dashboard always sees fresh data.
    An unknown `ts` silently falls through to the current results.
    """
    # Fix: the previous version imported server.cache_manager's
    # invalidate_results_cache and never used it — dead code that could also
    # fail the whole request with an ImportError. Removed.
    no_cache = {"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"}

    if ts:
        snapshot_path = OUTPUT_DIR / f'snapshot-{ts}.json'
        if snapshot_path.exists():
            try:
                result = {'audit': json.loads(snapshot_path.read_text(encoding='utf-8'))}
                return JSONResponse(result, headers=no_cache)
            except Exception as e:
                return JSONResponse({'ok': False, 'error': f'Failed to load snapshot: {str(e)}'}, status_code=500)

    audit_path = OUTPUT_DIR / 'audit.json'
    schema_path = OUTPUT_DIR / 'schema.jsonld'
    out = {}
    if audit_path.exists():
        out['audit'] = json.loads(audit_path.read_text(encoding='utf-8'))
    if schema_path.exists():
        out['schema'] = schema_path.read_text(encoding='utf-8')
    return JSONResponse(out, headers=no_cache)
|
|
@app.on_event("startup")
async def startup_event():
    """Startup hook: log readiness and seed a default admin user.

    NOTE(review): the fallback credentials below are hard-coded; in any real
    deployment ADMIN_EMAIL/ADMIN_PASSWORD must be provided via environment
    and the defaults rotated — shipping a known password is a security risk.
    """
    print(" GEO Platform API is ready!")
    print(f" Access at: http://0.0.0.0:7860")
    print(f" Health check: http://0.0.0.0:7860/health")
    try:
        admin_email = os.environ.get('ADMIN_EMAIL', 'admin@moharek.com')
        admin_pass = os.environ.get('ADMIN_PASSWORD', 'moharek2025')
        users = user_mgmt.list_users()
        # Idempotent: seed only when no user with the admin email exists yet.
        if not any(u.get('email') == admin_email for u in users):
            uid = user_mgmt.create_user(
                email=admin_email,
                password=admin_pass,
                role='admin'
            )
            print(f" Seeded default admin user (id={uid})")
    except Exception as e:
        # Seeding is best-effort; a failure must not block API startup.
        print(f" Seed user error: {e}")
|
|
@app.get('/api/health')
@app.get('/health')
async def health_check():
    """Liveness probe used by deploy tooling; always reports healthy."""
    payload = {'status': 'healthy', 'service': 'GEO Platform'}
    return payload
|
|
@app.get('/')
async def index():
    """Serve the SPA entry point, or a bare OK payload when no frontend ships."""
    entry = frontend_dir / 'index.html'
    if not entry.exists():
        return {'ok': True}
    return FileResponse(str(entry), headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})
|
|
@app.get('/{file_path:path}', include_in_schema=False)
async def serve_static(file_path: str):
    """Serve frontend assets (SVG, CSS, JS, images, HTML) with a safe
    path-traversal guard; extensionless paths fall back to `<path>.html`.
    """
    # Never shadow API routes with the static catch-all.
    if file_path.startswith('api/'):
        return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)

    root = frontend_dir.resolve()
    file_full_path = frontend_dir / file_path
    try:
        file_full_path = file_full_path.resolve()
        # Traversal guard: the resolved target must live inside frontend_dir.
        # Path.relative_to avoids the prefix-string pitfall of the previous
        # startswith() check, where a sibling directory such as
        # ".../frontend2" would incorrectly pass a ".../frontend" prefix test.
        file_full_path.relative_to(root)
    except ValueError:
        return JSONResponse({'ok': False, 'error': 'forbidden'}, status_code=403)
    except Exception:
        return JSONResponse({'ok': False, 'error': 'invalid path'}, status_code=400)

    if file_full_path.exists() and file_full_path.is_file():
        # Extension -> media type; anything unknown is served as a binary
        # download. Lower-cased suffix also covers upper-case extensions.
        media_types = {
            '.svg': 'image/svg+xml',
            '.css': 'text/css',
            '.js': 'application/javascript',
            '.html': 'text/html',
            '.json': 'application/json',
            '.png': 'image/png',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.gif': 'image/gif',
            '.webp': 'image/webp',
        }
        media_type = media_types.get(file_full_path.suffix.lower(), 'application/octet-stream')
        return FileResponse(str(file_full_path), media_type=media_type, headers={"Cache-Control": "public, max-age=3600"})

    # Pretty URLs: /about resolves to frontend/about.html if present.
    if not file_path.endswith('.html'):
        html_path = frontend_dir / f'{file_path}.html'
        if html_path.exists():
            return FileResponse(str(html_path), media_type='text/html', headers={"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0"})

    return JSONResponse({'ok': False, 'error': 'not found'}, status_code=404)
|
|
|
|
|
|
|
|
|
|
@app.get('/api/jobs/{job_id}/report')
async def api_job_report(job_id: int):
    """Build the HTML report for a finished job and return it as JSON.

    404 if the job is unknown, 400 if its results are not ready yet.
    """
    job = job_queue.get_job(job_id)
    if not job:
        return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
    result_path = job.get('result_path')
    if not result_path:
        return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
    try:
        from server import reports
        html = reports.build_html_report(result_path)
        return JSONResponse({'ok': True, 'report_html': html})
    except Exception as e:
        # Report-building failures previously escaped as unhandled 500s;
        # return them in the structured shape used by the PDF sibling.
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/jobs/{job_id}/report.pdf')
async def api_job_report_pdf(job_id: int):
    """Render a job's HTML report to PDF and stream the file back."""
    job = job_queue.get_job(job_id)
    if not job:
        return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
    result_path = job.get('result_path')
    if not result_path:
        return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
    try:
        from server import reports
        report_html = reports.build_html_report(result_path)
        pdf_target = Path(result_path) / f'report-{job_id}.pdf'
        rendered = reports.try_render_pdf(report_html, pdf_target)
        if rendered and pdf_target.exists():
            return FileResponse(str(pdf_target), media_type='application/pdf', filename=f'report-{job_id}.pdf')
    except Exception as e:
        print(f'PDF generation error: {e}')
    # Either rendering failed outright or produced no file.
    return JSONResponse({'ok': False, 'error': 'pdf_generation_failed'}, status_code=500)
|
|
|
|
@app.get('/api/jobs/{job_id}/keywords')
async def api_job_keywords(job_id: int, enrich: bool = False, analytics: bool = False):
    """Extract keywords (or keyword analytics) from a finished job's audit."""
    try:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
        audit_file = Path(result_path) / 'audit.json'
        if not audit_file.exists():
            return JSONResponse({'ok': False, 'error': 'audit.json not found in result_path'}, status_code=404)
        audit = json.loads(audit_file.read_text(encoding='utf-8'))
        result = keyword_engine.extract_keywords_from_audit(audit, top_n=40, enrich=enrich, analytics=analytics)
        # The response key mirrors the requested mode.
        payload_key = 'analytics' if analytics else 'keywords'
        return {'ok': True, payload_key: result}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/jobs/{job_id}/serp')
async def api_job_serp(job_id: int, gl: str = 'sa', hl: str = 'ar'):
    """Full SERP report via ScrapingBee (rank check, PAA, local, competitors)."""
    try:
        from server import scrapingbee_client
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
        audit_file = Path(result_path) / 'audit.json'
        if not audit_file.exists():
            return JSONResponse({'ok': False, 'error': 'audit.json not found'}, status_code=404)
        audit = json.loads(audit_file.read_text(encoding='utf-8'))
        site_url = job.get('url', '')
        raw_keywords = keyword_engine.extract_keywords_from_audit(audit, top_n=8)
        # Entries may be plain strings or {'kw': ...} dicts; normalize both.
        keywords = [entry['kw'] if isinstance(entry, dict) else entry for entry in raw_keywords[:8]]
        return scrapingbee_client.full_serp_report(site_url, keywords, country_code=gl, language=hl)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/jobs/{job_id}/actions')
async def api_job_actions(job_id: int):
    """Generate an action plan from a finished job's audit."""
    try:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': 'job result not ready'}, status_code=400)
        audit_file = Path(result_path) / 'audit.json'
        if not audit_file.exists():
            return JSONResponse({'ok': False, 'error': 'audit.json not found in result_path'}, status_code=404)
        audit = json.loads(audit_file.read_text(encoding='utf-8'))

        from server import action_engine
        return action_engine.generate_action_plan(audit)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/actions/save')
async def api_save_actions(request: Request):
    """Persist a batch of recommended actions.

    Body: {'job_id': <int|None>, 'actions': [{'type', 'task', 'priority'}, ...]}
    Returns {'ok': True, 'saved_ids': [...]} with the ids of the created rows.
    """
    # Map the textual priority onto the numeric ordering score
    # (unknown/missing priorities fall back to 10, as before).
    priority_scores = {'high': 100, 'medium': 50}

    try:
        data = await request.json()
        job_id = data.get('job_id')
        actions = data.get('actions', [])

        from server.db import SessionLocal, Action
        db = SessionLocal()
        try:
            saved = []
            for act in actions:
                priority = str(act.get('priority')).lower()
                new_act = Action(
                    job_id=job_id,
                    type=act.get('type', 'general'),
                    task=act.get('task', ''),
                    priority_score=priority_scores.get(priority, 10),
                    status='pending'
                )
                db.add(new_act)
                db.flush()  # populate new_act.id before commit
                saved.append(new_act.id)
            # One commit for the whole batch keeps the insert atomic.
            db.commit()
            return {'ok': True, 'saved_ids': saved}
        finally:
            db.close()
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/actions/list')
async def api_list_actions():
    """List up to 50 saved actions, highest priority first."""
    try:
        from server.db import SessionLocal, Action
        db = SessionLocal()
        try:
            rows = db.query(Action).order_by(Action.priority_score.desc()).limit(50).all()
            actions = []
            for row in rows:
                # Derive a coarse impact label from the numeric priority.
                if row.priority_score >= 80:
                    impact = 'high'
                elif row.priority_score >= 40:
                    impact = 'medium'
                else:
                    impact = 'low'
                entry = {
                    'id': row.id,
                    'job_id': row.job_id,
                    'type': row.type,
                    'task': row.task,
                    'priority_score': row.priority_score,
                    'status': row.status,
                    'result': row.result,
                    'created_at': str(row.created_at),
                    'impact': impact,
                    'effort': 'low',
                    'timeline': '1-2 اسابيع'
                }
                # Optional columns that only exist on newer schema versions;
                # falsy values are skipped, matching the previous behavior.
                for opt in ('initial_metrics', 'latest_metrics', 'impact_score'):
                    value = getattr(row, opt, None)
                    if value:
                        entry[opt] = value
                actions.append(entry)
            return {'ok': True, 'actions': actions}
        finally:
            db.close()
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/mentions/list')
async def api_list_mentions():
    """Return the 50 most recent brand mentions."""
    try:
        from server.db import SessionLocal, Mention
        db = SessionLocal()
        try:
            rows = db.query(Mention).order_by(Mention.created_at.desc()).limit(50).all()
            mentions = []
            for row in rows:
                mentions.append({
                    'id': row.id, 'brand': row.brand, 'source': row.source,
                    'content': row.content, 'url': row.url,
                    'sentiment_score': row.sentiment_score, 'created_at': str(row.created_at)
                })
            return {'ok': True, 'mentions': mentions}
        finally:
            db.close()
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/graph/entities')
async def api_list_entities():
    """Return up to 100 knowledge-graph entities ordered by ROI score."""
    try:
        from server.db import SessionLocal, Entity
        db = SessionLocal()
        try:
            rows = db.query(Entity).order_by(Entity.roi_score.desc()).limit(100).all()
            entities = [
                {'id': row.id, 'name': row.name, 'type': row.type, 'roi_score': row.roi_score}
                for row in rows
            ]
            return {'ok': True, 'entities': entities}
        finally:
            db.close()
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/ai_visibility/radar')
async def api_ai_radar(job_id: int):
    """Overall AI-visibility score plus the 10 latest model responses for a job."""
    try:
        from server import ai_radar
        from server.db import SessionLocal, AIResponse, AIPrompt
        db = SessionLocal()
        try:
            recent = (
                db.query(AIResponse)
                .join(AIPrompt)
                .filter(AIPrompt.job_id == job_id)
                .order_by(AIResponse.created_at.desc())
                .limit(10)
                .all()
            )
            overall = ai_radar.calculate_overall_visibility_score(job_id)
            latest = [{
                'prompt': resp.prompt_id,
                'mentioned': resp.mention_found,
                'model': resp.model_name,
                'competitors': resp.competitors_mentioned
            } for resp in recent]
            return {'ok': True, 'overall_score': overall, 'latest_mentions': latest}
        finally:
            db.close()
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/autopilot/toggle')
async def api_toggle_autopilot(job_id: int, enabled: bool):
    """Enable/disable autopilot for a job, creating its settings row if absent."""
    try:
        from server.db import SessionLocal, ProjectSettings
        db = SessionLocal()
        try:
            row = db.query(ProjectSettings).filter(ProjectSettings.job_id == job_id).first()
            if row:
                row.autopilot_enabled = enabled
            else:
                db.add(ProjectSettings(job_id=job_id, autopilot_enabled=enabled))
            db.commit()
            return {'ok': True, 'autopilot_enabled': enabled}
        finally:
            db.close()
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/strategy/summary')
async def api_strategy_summary(job_id: int):
    """Return the Growth OS strategy summary for a job."""
    try:
        from server import growth_os
        return {'ok': True, 'summary': growth_os.get_growth_strategy(job_id)}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/actions/{action_id}/execute')
async def api_execute_action(action_id: int):
    """Dispatch a stored action to the executor (silently ignores unknown ids)."""
    try:
        from server import executor
        from server.db import SessionLocal, Action
        db = SessionLocal()
        try:
            row = db.query(Action).filter(Action.id == action_id).first()
            if row is not None:
                executor.dispatch_action(action_id, row.type)
        finally:
            db.close()
        return {'ok': True, 'action_id': action_id, 'message': 'Dispatched successfully'}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
class UserRegister(BaseModel):
    """Request body for POST /api/users/register."""
    email: str
    password: str
    company_id: int | None = None  # Optional company to attach the user to.
|
|
|
|
class CompanyRegister(BaseModel):
    """Request body for POST /api/companies/register."""
    name: str
    domain: str | None = None  # Optional company web domain.
|
|
|
|
class LoginRequest(BaseModel):
    """Request body for POST /api/users/login."""
    email: str
    password: str
|
|
|
|
@app.post('/api/companies/register')
async def api_register_company(req: CompanyRegister):
    """Create a company record and return its id."""
    try:
        company_id = user_mgmt.create_company(req.name, req.domain)
        return {'ok': True, 'company_id': company_id}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/users/register')
async def api_register_user(req: UserRegister):
    """Register a user, mint a token, and initialize trial/onboarding state."""
    try:
        new_uid = user_mgmt.create_user(req.email, req.password, company_id=req.company_id)
        session_token = user_mgmt.make_token(new_uid)
        account = user_mgmt.get_user(new_uid)

        # Trial + onboarding bootstrap is optional: the module may have
        # failed to import at startup, in which case it is None.
        if onboarding:
            onboarding.init_trial(new_uid)
            onboarding.init_onboarding(new_uid)

        return {'ok': True, 'user_id': new_uid, 'token': session_token, 'user': account}
    except sqlite3.IntegrityError:
        # Unique-email violation -> Arabic "email already registered" message.
        return JSONResponse({'ok': False, 'error': 'البريد الإلكتروني مسجل بالفعل'}, status_code=400)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/users/login')
async def api_login(req: LoginRequest):
    """Authenticate credentials and return a session token plus the user row."""
    try:
        account = user_mgmt.authenticate_user(req.email, req.password)
        if not account:
            return JSONResponse({'ok': False, 'error': 'invalid credentials'}, status_code=401)
        return {'ok': True, 'token': user_mgmt.make_token(account['id']), 'user': account}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/users/me')
async def api_me(request: Request):
    """Resolve the bearer token in the Authorization header to the current user."""
    auth = request.headers.get('authorization') or request.headers.get('Authorization')
    if not auth or not auth.lower().startswith('bearer '):
        return JSONResponse({'ok': False, 'error': 'missing token'}, status_code=401)

    try:
        token = auth.split(' ', 1)[1].strip()
        # Reject obviously malformed tokens before hitting the verifier.
        if not token or len(token) < 10:
            return JSONResponse({'ok': False, 'error': 'invalid token'}, status_code=401)

        uid = user_mgmt.verify_token(token)
        if not uid:
            return JSONResponse({'ok': False, 'error': 'invalid token'}, status_code=401)

        account = user_mgmt.get_user(uid)
        if not account:
            return JSONResponse({'ok': False, 'error': 'user not found'}, status_code=404)

        return {'ok': True, 'user': account}
    except Exception:
        return JSONResponse({'ok': False, 'error': 'token validation failed'}, status_code=401)
|
|
|
|
@app.post('/api/simulate')
async def api_simulate(req: SimulationRequest):
    """Predict AI visibility for a piece of content about a brand."""
    try:
        result = ai_analysis.simulate_visibility(req.content, req.brand, api_keys=req.api_keys)
        return {'ok': True, 'prediction': result}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/analyze')
async def api_analyze(req: AnalysisRequest = None, request: Request = None, job_id: int = None):
    """Re-run AI analysis over an existing audit (job-scoped or the active one).

    Refreshes AI visibility (when the caller supplies provider keys), merges
    deep-analysis data, gathers SERP results and competitor insights, computes
    the v1 + v2 GEO scores, and writes analysis.json next to the audit.
    """
    api_keys = req.api_keys if req else {}
    job_id = job_id or (req.job_id if req else None)

    # Resolve which audit to analyze: a specific job's result directory, or
    # the top-level "active" audit in OUTPUT_DIR.
    if job_id:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': f'Job {job_id} is still processing (result_path is empty). Please wait for the crawler to finish.'}, status_code=400)
        audit_path = Path(result_path) / 'audit.json'
        analysis_out_path = Path(result_path) / 'analysis.json'
    else:
        audit_path = OUTPUT_DIR / 'audit.json'
        analysis_out_path = OUTPUT_DIR / 'analysis.json'

    if not audit_path.exists():
        return JSONResponse({'ok': False, 'error': 'no audit found; run crawl first'}, status_code=400)

    with open(audit_path, 'r', encoding='utf-8') as f:
        audit = json.load(f)
    pages = audit.get('pages', [])

    # Page-level content analysis (may use caller-supplied provider keys).
    analysis = ai_analysis.analyze_pages(pages, api_keys=api_keys)

    # Refresh AI visibility only when the caller provided provider keys;
    # otherwise reuse whatever the audit already holds.
    ai_vis = audit.get('ai_visibility') or {}
    org_name = audit.get('org_name') or ai_analysis.infer_brand_name(pages)

    if api_keys:
        queries = [f"What is {org_name}?", f"Best services for {org_name}"]
        if api_keys.get('perplexity'):
            ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys.get('perplexity'))
        elif api_keys.get('openai'):
            ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))

    # Deep AI analysis is best-effort; results merge into the visibility dict.
    try:
        deep_ai = ai_analysis.analyze_ai_visibility_deep(pages, org_name, api_keys=api_keys)
        if deep_ai:
            ai_vis.update(deep_ai)
    except Exception as e:
        print(f"Deep AI analysis error: {e}")

    # Persist the refreshed visibility data back into the audit file.
    audit['ai_visibility'] = ai_vis
    with open(audit_path, 'w', encoding='utf-8') as f:
        json.dump(audit, f, ensure_ascii=False, indent=2)

    # Local import shadows the module-level geo_services, which is None when
    # the top-of-file import failed — here a failure raises instead.
    from server import geo_services

    # NOTE(review): org_name is recomputed identically to the assignment
    # above — redundant, kept as-is.
    org_name = audit.get('org_name') or ai_analysis.infer_brand_name(pages)

    # Gather up to two SERP result sets (SerpAPI first, Zenserp fallback).
    searches = []
    serp_key = (api_keys.get("SERPAPI_KEY") if api_keys else None) or os.getenv("SERPAPI_KEY")
    zen_key = (api_keys.get("ZENSERP_KEY") if api_keys else None) or os.getenv("ZENSERP_KEY")

    core_queries = [f"{org_name}", f"تحميل {org_name}", f"{org_name} review"]
    for cq in core_queries[:2]:
        s_res = geo_services._serp_api_search(cq, api_key=serp_key)
        if not s_res:
            s_res = geo_services._zenserp_search(cq, api_key=zen_key)
        if s_res:
            searches.append(s_res)

    # Competitor insights are best-effort; failures leave an empty dict.
    comp_insight = {}
    try:
        industry_override = audit.get('industry_override')
        comp_insight = geo_services.get_competitor_insights(
            org_name,
            audit.get('url'),
            api_keys=api_keys,
            industry_override=industry_override
        )
    except Exception as e:
        print(f"Competitor insight error: {e}")
        pass

    # The v2 visibility score combines SERP presence, AI mentions and an
    # estimated traffic figure from the competitor insight.
    ai_mentions = ai_vis.get("mentions", 0) if ai_vis else 0
    total_queries = ai_vis.get("total_queries", 1) if ai_vis else 1
    traffic_est = comp_insight.get("monthly_visits", "unknown")

    geo_v2 = geo_services.calculate_visibility_score_v2(
        org_name, searches, ai_mentions, total_queries, traffic_est
    )

    geo = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis)
    geo["v2"] = geo_v2

    # Write analysis.json next to the audit and echo the same payload back.
    analysis_out = { 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight }
    with open(analysis_out_path, 'w', encoding='utf-8') as fa:
        json.dump(analysis_out, fa, ensure_ascii=False, indent=2)

    return { 'ok': True, 'analysis': analysis, 'geo_score': geo, 'competitor_insight': comp_insight }
|
|
|
|
@app.get('/api/history')
async def api_history():
    """Return the stored crawl history list (empty when no history file exists)."""
    history_file = OUTPUT_DIR / 'history.json'
    if not history_file.exists():
        return { 'ok': True, 'history': [] }
    try:
        entries = json.loads(history_file.read_text(encoding='utf-8'))
    except Exception as exc:
        # Corrupt/unreadable history surfaces as a 500 with the parse error.
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return { 'ok': True, 'history': entries }
|
|
|
|
class ActivateRequest(BaseModel):
    """Payload for /api/history/activate: selects a snapshot by timestamp."""
    # Timestamp suffix of the snapshot file (snapshot-<ts>.json) to activate.
    ts: str
    # Optional provider API keys (e.g. 'perplexity', 'openai') for re-analysis.
    api_keys: Optional[dict] = None
|
|
|
|
@app.post('/api/history/activate')
async def api_activate_history(req: ActivateRequest):
    """
    Switch the 'active' research to a historical snapshot.
    This overwrites audit.json and triggers a re-analysis.
    """
    try:
        ts = req.ts
        snapshot_path = OUTPUT_DIR / f'snapshot-{ts}.json'
        if not snapshot_path.exists():
            return JSONResponse({'ok': False, 'error': f'Snapshot {ts} not found'}, status_code=404)

        # Copy the snapshot's content over the live audit file.
        audit_main = OUTPUT_DIR / 'audit.json'
        with open(snapshot_path, 'r', encoding='utf-8') as s, open(audit_main, 'w', encoding='utf-8') as m:
            audit_obj = json.load(s)
            json.dump(audit_obj, m, ensure_ascii=False, indent=2)

        pages = audit_obj.get('pages', [])
        api_keys = req.api_keys or {}

        # Re-run page-level AI analysis for the restored snapshot.
        analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys)

        # Upgrade the placeholder org name via brand inference when needed.
        org_name = audit_obj.get('org_name', 'Company')
        if not org_name or org_name == 'Company':
            inferred = ai_analysis.infer_brand_name(pages)
            if inferred and inferred != 'Company':
                org_name = inferred
                audit_obj['org_name'] = org_name
                # Persist the corrected name back to audit.json.
                with open(audit_main, 'w', encoding='utf-8') as f:
                    json.dump(audit_obj, f, ensure_ascii=False, indent=2)

        ai_vis = audit_obj.get('ai_visibility')
        # Refresh AI visibility only when a provider key is supplied;
        # Perplexity takes precedence over OpenAI.
        if api_keys and (api_keys.get('perplexity') or api_keys.get('openai')):
            queries = [f"What is {org_name}?", f"Best services for {org_name}"]
            if api_keys.get('perplexity'):
                ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity'])
            elif api_keys.get('openai'):
                ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
            audit_obj['ai_visibility'] = ai_vis
            # Rewrite audit.json again, now with fresh visibility data.
            with open(audit_main, 'w', encoding='utf-8') as f:
                json.dump(audit_obj, f, ensure_ascii=False, indent=2)

        geo_score = ai_analysis.compute_geo_score(pages, audit=audit_obj, ai_visibility=ai_vis)

        # Persist combined analysis next to the audit for keyless reads.
        full_report = { 'analysis': analysis_data, 'geo_score': geo_score }
        analysis_path = OUTPUT_DIR / 'analysis.json'
        with open(analysis_path, 'w', encoding='utf-8') as fa:
            json.dump(full_report, fa, ensure_ascii=False, indent=2)

        # Best-effort search-intelligence sync; a failure only nulls the report.
        try:
            from server import search_intelligence
            report = search_intelligence.run_complete_analysis(pages, source_url=audit_obj.get('url', 'http://example.com'), api_keys=api_keys)
        except Exception as e:
            print(f"Intelligence sync error: {e}")
            report = None

        return {
            'ok': True,
            'message': f'Research {ts} activated successfully',
            'org_name': org_name,
            'pages_count': len(pages),
            'report': report
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/recommendations')
async def api_recommendations(req: RecommendationRequest = None):
    """Build actionable recommendations from the current (or a job's) audit.

    With api_keys the audit is re-analyzed and AI visibility refreshed;
    without them, the cached analysis.json is reused when available.
    """
    api_keys = req.api_keys if req else {}
    job_id = req.job_id if req else None
    extra_context = req.extra_context if (req and req.extra_context) else {}

    # Resolve which audit/analysis pair to use: a finished job's result
    # directory, or the global OUTPUT_DIR files.
    if job_id:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': f'job {job_id} not found'}, status_code=404)
        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': f'job {job_id} has no results yet — wait for it to complete'}, status_code=400)
        audit_path = Path(result_path) / 'audit.json'
        analysis_path = Path(result_path) / 'analysis.json'
    else:
        audit_path = OUTPUT_DIR / 'audit.json'
        analysis_path = OUTPUT_DIR / 'analysis.json'

    if not audit_path.exists():
        return JSONResponse({'ok': False, 'error': 'no audit found — run a crawl first'}, status_code=400)

    with open(audit_path, 'r', encoding='utf-8') as f:
        audit = json.load(f)

    pages = audit.get('pages', [])

    # Keyed path: re-analyze everything and refresh the cached files.
    if api_keys:
        # Upgrade a placeholder org name via brand inference.
        old_org = audit.get('org_name', 'Company')
        org_name = old_org
        if not org_name or org_name == 'Company':
            inferred = ai_analysis.infer_brand_name(pages)
            if inferred and inferred != 'Company':
                org_name = inferred
                audit['org_name'] = org_name

        queries = [f"What is {org_name}?", f"Best services for {org_name}"]
        ai_vis = audit.get('ai_visibility')

        # Re-check visibility when the name changed or no usable data exists.
        needs_visibility = (org_name != old_org) or not ai_vis or not ai_vis.get('enabled')
        if needs_visibility and (api_keys.get('perplexity') or api_keys.get('openai')):
            if api_keys.get('perplexity'):
                ai_vis = ai_visibility.check_perplexity(org_name, queries, api_key=api_keys['perplexity'])
            elif api_keys.get('openai'):
                ai_vis = ai_visibility.check_openai_visibility(org_name, queries, api_key=api_keys.get('openai'))
            audit['ai_visibility'] = ai_vis

        analysis_data = ai_analysis.analyze_pages(pages, api_keys=api_keys)
        geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=ai_vis)

        # Persist refreshed audit + analysis for later keyless calls.
        with open(audit_path, 'w', encoding='utf-8') as f:
            json.dump(audit, f, ensure_ascii=False, indent=2)
        with open(analysis_path, 'w', encoding='utf-8') as f:
            json.dump({ 'analysis': analysis_data, 'geo_score': geo_score }, f, ensure_ascii=False, indent=2)
    else:
        # Keyless path: reuse cached analysis, or compute without caching.
        if not analysis_path.exists():
            analysis_data = ai_analysis.analyze_pages(pages)
            geo_score = ai_analysis.compute_geo_score(pages, audit=audit, ai_visibility=audit.get('ai_visibility'))
        else:
            with open(analysis_path, 'r', encoding='utf-8') as f:
                ana_obj = json.load(f)
            analysis_data = ana_obj.get('analysis')
            geo_score = ana_obj.get('geo_score')

    # Best-effort search intelligence; stays None on failure.
    search_intel_report = None
    try:
        from server import search_intelligence
        search_intel_report = search_intelligence.run_complete_analysis(pages, source_url=audit.get('url', ''), api_keys=api_keys)
    except Exception as e:
        print(f"Search intel error in recommendations: {e}")

    recs = ai_analysis.generate_recommendations(pages, geo_score=geo_score, api_keys=api_keys, ai_analysis_results=analysis_data, extra_context=extra_context)

    return {
        'ok': True,
        'recommendations': recs,
        'audit': audit,
        'ai_visibility': audit.get('ai_visibility'),
        'analysis': analysis_data,
        'geo_score': geo_score,
        'search_intel': search_intel_report
    }
|
|
|
|
class KeywordsRequest(BaseModel):
    """Payload for crawl-then-analyze endpoints (/api/keywords and friends)."""
    # Seed URL to crawl.
    url: str
    # Crawl budget in pages.
    max_pages: int = 1
    # Optional provider API keys for data enrichment.
    api_keys: Optional[dict] = None
|
|
|
|
@app.post('/api/keywords')
async def api_keywords(req: KeywordsRequest, request: Request, enrich: bool = False, analytics: bool = False):
    """Crawl a URL and extract its top keywords (optionally enriched or as full analytics)."""
    try:
        from src import crawler
        from server import keyword_engine

        crawled = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        extraction = keyword_engine.extract_keywords_from_audit(
            {'pages': crawled}, top_n=40, enrich=enrich, analytics=analytics
        )
        page_meta = [{'url': p.get('url'), 'title': p.get('title')} for p in crawled]
        if analytics:
            return {'ok': True, 'analytics': extraction, 'pages': page_meta}
        return {'ok': True, 'keywords': extraction, 'pages': page_meta}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/test/keywords')
async def api_test_keywords():
    """Test endpoint to verify keyword extraction works."""
    try:
        from src import crawler
        from server.keyword_engine import extract_keywords_from_audit

        crawled = crawler.crawl_seed('https://abayanoir.com', max_pages=1)
        audit_obj = {'pages': crawled}

        # Run both output modes against the same crawl.
        simple_kws = extract_keywords_from_audit(audit_obj, top_n=5, enrich=False, analytics=False)
        full_analytics = extract_keywords_from_audit(audit_obj, top_n=5, enrich=False, analytics=True)

        summary = full_analytics.get('summary') if isinstance(full_analytics, dict) else 'NOT A DICT'
        return {
            'ok': True,
            'simple_keywords': simple_kws,
            'analytics_type': str(type(full_analytics)),
            'analytics_summary': summary,
            'pages_crawled': len(crawled)
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/search/intelligence')
async def api_search_intelligence(req: KeywordsRequest, enrich: bool = False):
    """Complete search intelligence analysis with keywords, competitors, and recommendations."""
    try:
        from src import crawler
        from server import search_intelligence

        crawled = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        # Normalize a single-page result into a list.
        if not isinstance(crawled, list):
            crawled = [crawled] if crawled else []

        full_report = search_intelligence.run_complete_analysis(
            crawled, req.url, enrich_data=enrich, api_keys=req.api_keys
        )
        return {'ok': True, 'report': full_report}
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/export/keywords/csv')
async def api_export_keywords_csv(req: KeywordsRequest, enrich: bool = False):
    """Export classified keywords (primary/secondary/long-tail) as a CSV download.

    Crawls the requested URL, runs the search-intelligence pipeline, and
    streams one CSV row per keyword with density/volume/CPC/competition
    columns plus its classification tier.
    """
    try:
        from src import crawler
        from server import search_intelligence
        import csv
        from io import StringIO
        from fastapi.responses import StreamingResponse

        pages = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        if not isinstance(pages, list):
            pages = [pages] if pages else []

        # Pass api_keys through like /api/search/intelligence does, so
        # enrichment providers are available from this endpoint too.
        report = search_intelligence.run_complete_analysis(pages, req.url, enrich_data=enrich, api_keys=req.api_keys)

        output = StringIO()
        writer = csv.writer(output)
        writer.writerow(['Keyword', 'Count', 'Density (%)', 'Volume', 'CPC ($)', 'Competition', 'Classification'])

        # One loop over the three tiers replaces three copy-pasted blocks.
        classification = report['keyword_results']['classification']
        for tier_key, label in (('primary', 'Primary'), ('secondary', 'Secondary'), ('long_tail', 'Long-tail')):
            for kw in classification[tier_key]['keywords']:
                writer.writerow([
                    kw['kw'],
                    kw['count'],
                    kw.get('density', 'N/A'),
                    kw.get('volume', 'N/A'),
                    kw.get('cpc', 'N/A'),
                    kw.get('competition', 'N/A'),
                    label
                ])

        output.seek(0)
        return StreamingResponse(
            iter([output.getvalue()]),
            media_type="text/csv",
            headers={"Content-Disposition": f"attachment; filename=keywords_{req.url.replace('https://', '').replace('/', '_')}.csv"}
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/search/competitors')
async def api_search_competitors(req: KeywordsRequest):
    """Detect and summarize competitors mentioned across the crawled pages."""
    try:
        from src import crawler
        from server import competitor_analysis

        crawled = crawler.crawl_seed(req.url, max_pages=req.max_pages)
        found = competitor_analysis.detect_competitors(crawled, req.url, min_mentions=1)
        overview = competitor_analysis.get_competitor_summary(found)

        payload = {
            'competitors': found,
            'summary': overview,
            'pages_analyzed': len(crawled)
        }
        return {'ok': True, 'result': payload}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/search/gsc')
async def api_search_gsc(site: str, start: str, end: str):
    """Proxy a Google Search Console query for *site* over the [start, end] date range.

    NOTE(review): search_intel is imported best-effort at startup and may be
    None — that case surfaces here as a 500 via the generic handler.
    """
    try:
        res = search_intel.gsc_query(site, start, end)
        return {'ok': True, 'result': res}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
| |
|
|
class ArticleRequest(BaseModel):
    """Payload for /api/content/generate (article generation)."""
    # Primary keyword / topic to write about.
    keyword: str
    # Output language code ('en', 'ar', ...).
    lang: str = 'en'
    # Site the article is written for (brand/tone context).
    target_site: str = ""
    # Research bullet points to weave into the article.
    # (Mutable defaults are safe here: pydantic copies defaults per instance.)
    research_insights: list = []
    # Competitor content excerpts to outperform.
    competitors_content: list = []
    # Crawl context of the target site.
    crawl_data: dict = {}
    # Preferred LLM backend; 'demo' is the offline fallback.
    prefer_backend: str = 'groq'
    # Provider API keys keyed by provider name.
    api_keys: dict = {}
|
|
|
|
class OptimizeRequest(BaseModel):
    """Payload for /api/content/optimize (rewrite existing content for SEO)."""
    # Existing content to optimize.
    content: str
    # Target keyword for the optimization.
    keyword: str
    lang: str = 'en'
    target_site: str = ""
    research_insights: list = []
    crawl_data: dict = {}
    # Preferred LLM backend; 'demo' is the offline fallback.
    prefer_backend: str = 'groq'
    api_keys: dict = {}
|
|
|
|
class FaqRequest(BaseModel):
    """Payload for /api/content/faqs (FAQ generation for a topic)."""
    # Topic to build FAQs for.
    topic: str
    # Optional page content to ground the questions in.
    page_content: str = ''
    lang: str = 'en'
    target_site: str = ""
    research_insights: list = []
    crawl_data: dict = {}
    # Number of FAQ entries to generate.
    count: int = 5
    prefer_backend: str = 'groq'
    api_keys: dict = {}
|
|
|
|
class SemanticRequest(BaseModel):
    """Payload for /api/content/semantic (semantic enrichment of content)."""
    # Content to semantically optimize.
    content: str
    lang: str = 'en'
    prefer_backend: str = 'groq'
    api_keys: dict = {}
|
|
|
|
class IdentityRequest(BaseModel):
    """Payload for /api/content/identity (brand identity generation from crawl data)."""
    # Crawl context used to derive the brand identity.
    crawl_data: dict = {}
    lang: str = 'en'
    prefer_backend: str = 'groq'
    api_keys: dict = {}
|
|
|
|
@app.post('/api/content/generate')
async def api_content_generate(req: ArticleRequest):
    """Generate a full article; falls back to the demo backend on failure."""
    try:
        from server import content_engine
        article = content_engine.generate_article(
            req.keyword,
            lang=req.lang,
            target_site=req.target_site,
            research_insights=req.research_insights,
            competitors_content=req.competitors_content,
            crawl_data=req.crawl_data,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
        return {'ok': True, 'result': article}
    except Exception as e:
        print(f"⚠️ [API] Content Gen Failed: {e}. Falling back to Demo.")
        # Second attempt with the offline demo backend; the original error
        # is surfaced as a warning alongside the fallback result.
        try:
            from server import content_engine
            fallback = content_engine.generate_article(req.keyword, lang=req.lang, target_site=req.target_site, prefer_backend='demo')
            return {'ok': True, 'result': fallback, 'warn': str(e)}
        except Exception as e2:
            return JSONResponse({'ok': False, 'error': str(e2)}, status_code=500)
|
|
|
|
@app.post('/api/content/optimize')
async def api_content_optimize(req: OptimizeRequest):
    """Optimize existing content for a keyword; demo backend on failure."""
    try:
        from server import content_engine
        optimized = content_engine.optimize_content(
            req.content,
            req.keyword,
            lang=req.lang,
            target_site=req.target_site,
            research_insights=req.research_insights,
            crawl_data=req.crawl_data,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
        return {'ok': True, 'result': optimized}
    except Exception as e:
        print(f"⚠️ [API] Content Optimization Failed: {e}. Falling back to Demo.")
        # Demo-backend retry; original error returned as a warning.
        try:
            from server import content_engine
            fallback = content_engine.optimize_content(req.content, req.keyword, lang=req.lang, target_site=req.target_site, prefer_backend='demo')
            return {'ok': True, 'result': fallback, 'warn': str(e)}
        except Exception as e2:
            return JSONResponse({'ok': False, 'error': str(e2)}, status_code=500)
|
|
|
|
@app.post('/api/content/faqs')
async def api_content_faqs(req: FaqRequest):
    """Generate FAQ entries for a topic (no demo fallback — errors return 500)."""
    try:
        from server import content_engine
        faqs = content_engine.generate_faqs(
            req.topic,
            page_content=req.page_content,
            lang=req.lang,
            count=req.count,
            target_site=req.target_site,
            research_insights=req.research_insights,
            crawl_data=req.crawl_data,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
        return {'ok': True, 'result': faqs}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.post('/api/content/semantic')
async def api_content_semantic(req: SemanticRequest):
    """Semantically optimize a block of content via the content engine."""
    try:
        from server import content_engine
        enriched = content_engine.semantic_optimize(
            req.content,
            lang=req.lang,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
        return {'ok': True, 'result': enriched}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.post('/api/content/identity')
async def api_content_identity(req: IdentityRequest):
    """Generate a brand identity profile from crawl data."""
    try:
        from server import content_engine
        identity = content_engine.generate_identity(
            crawl_data=req.crawl_data,
            lang=req.lang,
            prefer_backend=req.prefer_backend,
            api_keys=req.api_keys,
        )
        return {'ok': True, 'result': identity}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
| |
| |
| |
| from server import ads_manager, ads_ai |
|
|
class AdsConnectRequest(BaseModel):
    """Google Ads OAuth + developer credentials for /api/ads/connect."""
    developer_token: str
    client_id: str
    client_secret: str
    refresh_token: str
    # Google Ads account id (customer id) the credentials operate on.
    customer_id: str
|
|
class AdsAIRequest(BaseModel):
    """Common payload for the AI-assisted /api/ads/ai/* endpoints."""
    # LLM provider keys.
    api_keys: Optional[dict] = None
    # Output language (Arabic by default).
    lang: Optional[str] = 'ar'
    # Defaults below are Arabic placeholders used by ad-copy generation.
    service_name: Optional[str] = 'خدمات SEO'
    usp: Optional[str] = 'نتائج مضمونة في 90 يوم'
    target_audience: Optional[str] = 'شركات سعودية'
|
|
class CampaignCreateRequest(BaseModel):
    """Payload for creating a new (paused) Google Ads campaign."""
    name: str
    # Daily budget in USD.
    budget_usd: float = 5.0
    # Optional target cost-per-acquisition; None leaves bidding unconstrained.
    target_cpa: Optional[float] = None
|
|
@app.post('/api/ads/connect')
async def api_ads_connect(req: AdsConnectRequest):
    """Save Google Ads credentials and verify they work.

    Credentials are persisted only after a successful verification. The
    verification payload is returned on BOTH outcomes so a failed
    connection surfaces its error details instead of a null body.
    """
    try:
        credentials = {
            'developer_token': req.developer_token,
            'client_id': req.client_id,
            'client_secret': req.client_secret,
            'refresh_token': req.refresh_token,
            'customer_id': req.customer_id
        }
        result = ads_manager.verify_google_connection(credentials)
        if result.get('ok'):
            # Only store credentials that actually work.
            ads_manager.save_ads_config(credentials)
        return result
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/ads/dashboard')
async def api_ads_dashboard(demo: bool = False, days: int = 30):
    """Return unified KPI dashboard data — uses saved credentials or demo."""
    try:
        creds = ads_manager.load_ads_config() if not demo else {}
        perf = ads_manager.get_campaign_performance(creds, days=days)
        overview = ads_manager.build_ads_summary(perf, credentials=creds)
        return {'ok': True, 'summary': overview, 'campaigns': perf}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.get('/api/ads/keywords')
async def api_ads_keywords(demo: bool = False):
    """Return keyword performance with Quality Scores."""
    try:
        creds = ads_manager.load_ads_config() if not demo else {}
        kw_rows = ads_manager.get_keyword_performance(creds)
        return {'ok': True, 'keywords': kw_rows, 'count': len(kw_rows)}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.get('/api/ads/search-terms')
async def api_ads_search_terms(demo: bool = False, min_clicks: int = 5):
    """Return search term analysis — converting vs. wasted spend."""
    try:
        creds = ads_manager.load_ads_config() if not demo else {}
        terms = ads_manager.get_search_terms(creds, min_clicks=min_clicks)
        return {'ok': True, 'data': terms}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.post('/api/ads/campaigns')
async def api_ads_create_campaign(req: CampaignCreateRequest):
    """Create a new Google Ads campaign (starts PAUSED for safety)."""
    try:
        creds = ads_manager.load_ads_config()
        created = ads_manager.create_campaign(creds, req.name, req.budget_usd, req.target_cpa)
        return created
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.post('/api/ads/ai/bid-suggestions')
async def api_ads_bid_suggestions(req: AdsAIRequest):
    """AI-powered bid adjustment recommendations for each keyword."""
    try:
        creds = ads_manager.load_ads_config()
        kw_rows = ads_manager.get_keyword_performance(creds)
        advice = ads_ai.ai_bid_suggestion(kw_rows, api_keys=req.api_keys or {})
        return {'ok': True, 'suggestions': advice, 'count': len(advice)}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.post('/api/ads/ai/copy')
async def api_ads_generate_copy(req: AdsAIRequest):
    """Generate Arabic + English RSA ad copy using AI."""
    try:
        # Fall back to the Arabic placeholder defaults when fields are empty.
        ad_copy = ads_ai.generate_ad_copy(
            service_name=req.service_name or 'خدمات SEO',
            usp=req.usp or 'نتائج مضمونة في 90 يوم',
            target_audience=req.target_audience or 'شركات سعودية',
            lang=req.lang or 'ar',
            api_keys=req.api_keys or {},
        )
        return {'ok': True, 'copy': ad_copy}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.post('/api/ads/ai/negatives')
async def api_ads_negative_keywords(req: AdsAIRequest):
    """Detect irrelevant search terms and suggest negative keywords."""
    try:
        creds = ads_manager.load_ads_config()
        terms = ads_manager.get_search_terms(creds)
        negatives = ads_ai.detect_negative_keywords(terms, api_keys=req.api_keys or {})
        return {'ok': True, 'result': negatives}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.post('/api/ads/ai/weekly-report')
async def api_ads_weekly_report(req: AdsAIRequest):
    """Generate AI-written weekly performance report in Arabic or English."""
    try:
        creds = ads_manager.load_ads_config()
        # Gather the three data sources the report is built from.
        perf = ads_manager.get_campaign_performance(creds)
        kw_rows = ads_manager.get_keyword_performance(creds)
        terms = ads_manager.get_search_terms(creds)
        weekly = ads_ai.generate_weekly_report(
            perf,
            kw_rows,
            terms,
            api_keys=req.api_keys or {},
            lang=req.lang or 'ar',
        )
        return {'ok': True, 'report': weekly}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
| |
| |
| |
| from server import geo_services |
|
|
class GeoServiceRequest(BaseModel):
    """Common payload for the /api/geo/* brand-visibility services."""
    # Brand name to analyze.
    brand: str
    # Site URL (required only by /api/geo/fix).
    url: Optional[str] = None
    # Custom query set; DEFAULT_QUERIES is used when omitted.
    queries: Optional[List[str]] = None
    # Competitor brand names for ranking comparisons.
    competitors: Optional[List[str]] = None
    # Alternative spellings of the brand for recognition checks.
    brand_variants: Optional[List[str]] = None
    # Provider API keys.
    api_keys: Optional[dict] = None
|
|
class SimulatorRequest(BaseModel):
    """Payload for /api/geo/simulate: compares AI visibility of two content versions."""
    brand: str
    # Content before the proposed改 change.
    original_content: str
    # Candidate improved content to evaluate against the original.
    improved_content: str
    # Queries to test both versions with.
    test_queries: List[str]
    api_keys: Optional[dict] = None
|
|
@app.post('/api/geo/visibility')
async def api_geo_visibility(req: GeoServiceRequest):
    """Score how visible the brand is in AI answers for a set of queries."""
    try:
        query_set = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        score = geo_services.visibility_score(req.brand, query_set, req.api_keys or {})
        return {'ok': True, 'result': score}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.post('/api/geo/recognition')
async def api_geo_recognition(req: GeoServiceRequest):
    """Check whether AI models recognize the brand (and its name variants)."""
    try:
        query_set = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        # Default to the brand itself when no variants are supplied.
        name_variants = req.brand_variants if req.brand_variants else [req.brand]
        recognition = geo_services.brand_recognition(req.brand, name_variants, query_set, req.api_keys or {})
        return {'ok': True, 'result': recognition}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.post('/api/geo/sentiment')
async def api_geo_sentiment(req: GeoServiceRequest):
    """Analyze the sentiment of AI-generated mentions of the brand."""
    try:
        query_set = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        sentiment = geo_services.sentiment_analysis(req.brand, query_set, req.api_keys or {})
        return {'ok': True, 'result': sentiment}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.post('/api/geo/competitors')
async def api_geo_competitors(req: GeoServiceRequest):
    """Rank the brand against competitors in AI answers."""
    try:
        query_set = req.queries if req.queries else geo_services.DEFAULT_QUERIES
        # Fixed default competitor set when the caller supplies none.
        rivals = req.competitors if req.competitors else ['SEMrush', 'Ahrefs', 'Moz']
        ranking = geo_services.competitor_ranking(req.brand, rivals, query_set, req.api_keys or {})
        return {'ok': True, 'result': ranking}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.post('/api/geo/regional')
async def api_geo_regional(req: GeoServiceRequest):
    """Run region-by-region GEO analysis for the brand."""
    try:
        regional = geo_services.geo_regional_analysis(req.brand, req.api_keys or {})
        return {'ok': True, 'result': regional}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.post('/api/geo/fix')
async def api_geo_fix(req: GeoServiceRequest):
    """Produce fix recommendations for a brand's site; requires req.url."""
    try:
        # url is optional on the shared request model but mandatory here.
        if not req.url:
            return JSONResponse({'ok': False, 'error': 'url required'}, status_code=400)
        fixes = geo_services.fix_recommendations(req.url, req.brand, {}, req.api_keys or {})
        return {'ok': True, 'result': fixes}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.post('/api/geo/simulate')
async def api_geo_simulate(req: SimulatorRequest):
    """Simulate AI-visibility impact of improved content vs. the original."""
    try:
        simulation = geo_services.visibility_simulator(
            req.original_content,
            req.improved_content,
            req.test_queries,
            req.brand,
            req.api_keys or {},
        )
        return {'ok': True, 'result': simulation}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.post('/api/geo/suite')
async def api_geo_suite(req: GeoServiceRequest, background_tasks: BackgroundTasks):
    """Run all 6 GEO services at once for a brand."""
    # background_tasks is part of the route signature but currently unused.
    try:
        suite = geo_services.run_full_suite(req.brand, req.url, req.competitors, req.api_keys or {})
        return {'ok': True, 'result': suite}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
|
|
|
|
| |
| |
| |
|
|
| from server.advanced_features import ( |
| save_keyword_snapshot, save_geo_score_snapshot, |
| get_keyword_trends, get_geo_score_trends, |
| check_and_create_alerts, get_alerts, mark_alerts_seen, |
| add_scheduled_crawl, list_scheduled_crawls, delete_scheduled_crawl, |
| competitor_keyword_gap, bulk_enqueue, send_weekly_report, |
| start_scheduler, init_advanced_tables |
| ) |
|
|
# Initialize tracking tables and start the background scheduler.
# Failures stay non-fatal (e.g. read-only filesystem) but are logged
# instead of silently swallowed, so broken scheduling is visible.
try:
    init_advanced_tables()
    start_scheduler()
except Exception as e:
    print(f" advanced features init failed: {e}")
|
|
|
|
@app.get('/api/tracking/keywords')
async def api_keyword_trends(url: str, keyword: Optional[str] = None, days: int = 30):
    """Return historical keyword-rank data for *url*, optionally for one keyword."""
    try:
        return {'ok': True, 'trends': get_keyword_trends(url, keyword, days)}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/tracking/geo-score')
async def api_geo_score_trends(url: str, days: int = 90):
    """Return historical GEO-score snapshots for *url* over the last *days* days."""
    try:
        series = get_geo_score_trends(url, days)
        return {'ok': True, 'trends': series}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.get('/api/alerts')
async def api_get_alerts(url: Optional[str] = None, unseen_only: bool = False):
    """List stored alerts, optionally filtered by url and/or unseen status."""
    try:
        alerts = get_alerts(url, unseen_only)
        return {'ok': True, 'alerts': alerts, 'count': len(alerts)}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/alerts/seen')
async def api_mark_alerts_seen(req: dict):
    """Mark the posted alert ids as seen; expected body: {'ids': [...]}."""
    try:
        alert_ids = req.get('ids', [])
        mark_alerts_seen(alert_ids)
        return {'ok': True}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
class ScheduleRequest(BaseModel):
    """Payload for registering a recurring scheduled crawl."""
    # Seed URL to crawl on schedule.
    url: str
    org_name: str = ''
    org_url: str = ''
    # Crawl budget per scheduled run.
    max_pages: int = 3
    # Recurrence label understood by the scheduler (e.g. 'weekly').
    frequency: str = 'weekly'
|
|
|
|
@app.post('/api/schedule')
async def api_add_schedule(req: ScheduleRequest):
    """Register a recurring crawl and return its schedule id."""
    try:
        new_id = add_scheduled_crawl(req.url, req.org_name, req.org_url, req.max_pages, req.frequency)
        return {'ok': True, 'schedule_id': new_id}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.get('/api/schedule')
async def api_list_schedules():
    """List all registered scheduled crawls."""
    try:
        schedules = list_scheduled_crawls()
        return {'ok': True, 'schedules': schedules}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
@app.delete('/api/schedule/{schedule_id}')
async def api_delete_schedule(schedule_id: int):
    """Remove a scheduled crawl by its id."""
    try:
        delete_scheduled_crawl(schedule_id)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True}
|
|
|
|
class GapRequest(BaseModel):
    """Payload for /api/competitor/gap (keyword gap vs. a competitor site)."""
    # Your current keywords to compare against the competitor's.
    your_keywords: list = []
    # Competitor site to crawl.
    competitor_url: str
    # Crawl budget for the competitor site.
    max_pages: int = 3
|
|
|
|
@app.post('/api/competitor/gap')
async def api_competitor_gap(req: GapRequest):
    """Compute the keyword gap between your keywords and a competitor's site."""
    try:
        gap = competitor_keyword_gap(req.your_keywords, req.competitor_url, req.max_pages)
        return {'ok': True, 'gap': gap}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
class BulkRequest(BaseModel):
    """Payload for /api/bulk/crawl (enqueue crawls for many URLs)."""
    # URLs to enqueue.
    urls: list
    org_name: str = ''
    # Crawl budget per URL.
    max_pages: int = 2
|
|
|
|
@app.post('/api/bulk/crawl')
async def api_bulk_crawl(req: BulkRequest):
    """Enqueue crawl jobs for every URL in the request and return their ids."""
    try:
        queued = bulk_enqueue(req.urls, req.org_name, req.max_pages)
        return {'ok': True, 'job_ids': queued, 'count': len(queued)}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
class EmailReportRequest(BaseModel):
    """Payload for /api/reports/email (send a weekly report by email)."""
    # Recipient address.
    to_email: str
    # Site the report covers.
    url: str
|
|
|
|
@app.post('/api/reports/email')
async def api_send_email_report(req: EmailReportRequest):
    """Email the weekly report for req.url to req.to_email.

    Returns 400 when sending fails (the Arabic error message instructs the
    user to configure SMTP_HOST/SMTP_USER/SMTP_PASS in .env).
    """
    try:
        ok = send_weekly_report(req.to_email, req.url)
        if ok:
            return {'ok': True, 'message': f'تم الإرسال إلى {req.to_email}'}
        return JSONResponse({'ok': False, 'error': 'فشل الإرسال — أضف SMTP_HOST و SMTP_USER و SMTP_PASS في .env'}, status_code=400)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
# UI settings persisted alongside other artifacts in OUTPUT_DIR
# (overridable via the OUTPUT_DIR environment variable).
_SETTINGS_PATH = Path(os.environ.get('OUTPUT_DIR', Path(__file__).resolve().parent.parent / 'output')) / 'settings.json'
|
|
|
|
def _load_settings() -> dict:
    """Read persisted UI settings from settings.json; return {} on any failure.

    Best-effort by design: a missing or corrupt file must never break startup.
    """
    if _SETTINGS_PATH.exists():
        try:
            # Explicit UTF-8: the file is written with ensure_ascii=False and
            # may contain non-ASCII (Arabic) text, so the platform default
            # encoding (e.g. cp1252 on Windows) is not safe.
            return json.loads(_SETTINGS_PATH.read_text(encoding='utf-8'))
        except Exception:
            pass
    return {}
|
|
|
|
def _save_settings(data: dict) -> None:
    """Persist settings as pretty-printed UTF-8 JSON.

    UTF-8 is forced because ensure_ascii=False keeps non-ASCII (Arabic)
    values verbatim; the platform default encoding could fail to encode them.
    """
    _SETTINGS_PATH.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding='utf-8')
|
|
|
|
@app.get('/api/settings')
async def api_get_settings():
    """Return saved settings with credential-looking values masked as '***'.

    Masking covers *_key/*key suffixes plus 'pass', 'secret' and 'token'
    substrings so API keys and tokens never round-trip to the browser.
    """
    s = _load_settings()
    safe = {}
    for k, v in s.items():
        kl = k.lower()
        looks_secret = (
            kl.endswith('key')            # api_key, APIKEY, serp_key, ...
            or 'pass' in kl               # password, smtp_pass, ...
            or 'secret' in kl
            or 'token' in kl
        )
        safe[k] = '***' if (v and looks_secret) else v
    return {'ok': True, 'settings': safe}
|
|
|
|
@app.post('/api/settings')
async def api_save_settings(req: dict):
    """Merge submitted settings into the settings file and the process env.

    Values equal to '***' are the masked placeholders GET returns and are
    skipped so masked secrets are never written back over real ones.
    """
    try:
        current = _load_settings()
        for k, v in req.items():
            if v and v != '***':
                current[k] = v
                # Fix: os.environ only accepts strings; JSON bodies may carry
                # ints/bools, which previously raised TypeError and aborted the save.
                os.environ[k.upper()] = str(v)
        _save_settings(current)
        return {'ok': True}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
# Startup bootstrap: export previously saved settings into the environment so
# modules that read os.environ pick them up. setdefault keeps real environment
# variables authoritative over the settings file.
try:
    for k, v in _load_settings().items():
        if v and v != '***':
            os.environ.setdefault(k.upper(), v)
except Exception:
    pass  # best-effort: a broken settings file must never block startup
|
|
|
|
| |
|
|
class CompetitorIntelRequest(BaseModel):
    """Request body for /api/competitor/intelligence."""
    url: str                      # site to analyze
    region: str = 'Saudi Arabia'  # market region for the analysis
    industry: str = ''
    count: int = 7                # number of competitors to return
    api_keys: dict = {}           # optional per-request API keys (pydantic copies this default per instance)
|
|
|
|
@app.post('/api/competitor/intelligence')
async def api_competitor_intelligence(req: CompetitorIntelRequest):
    """Run the competitor-intelligence analysis for the given site."""
    try:
        from server.competitor_intel import analyze_competitors
        payload = analyze_competitors(
            req.url,
            region=req.region,
            industry=req.industry,
            count=req.count,
            api_keys=req.api_keys,
        )
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': payload}
|
|
|
|
|
|
|
|
|
|
| |
|
|
| import sqlite3 as _sqlite3 |
| from pathlib import Path as _Path |
|
|
# Projects database lives alongside the other output artifacts.
_PROJ_DB = _Path(os.environ.get('OUTPUT_DIR', './output')) / 'projects.db'


def _init_projects_db():
    """Create the projects and project_snapshots tables if they are missing."""
    conn = _sqlite3.connect(str(_PROJ_DB))
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS projects (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER,
                name TEXT NOT NULL,
                url TEXT NOT NULL,
                industry TEXT,
                color TEXT DEFAULT '#3b82f6',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS project_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                project_id INTEGER,
                job_id INTEGER,
                geo_score INTEGER,
                seo_score INTEGER,
                ai_visibility INTEGER,
                pages_crawled INTEGER,
                keywords_count INTEGER,
                snapshot_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)
        conn.commit()
    finally:
        conn.close()  # fix: the connection previously leaked if the DDL raised


_init_projects_db()
|
|
class ProjectCreate(BaseModel):
    """Request body for POST /api/projects."""
    name: str
    url: str
    industry: str = ''
    color: str = '#3b82f6'  # dashboard accent color (hex)
|
|
@app.get('/api/projects')
async def api_list_projects(request: Request):
    """List projects for the authenticated user plus ownerless (legacy) rows.

    Each project carries its latest snapshot and a 10-point GEO-score trend.
    """
    uid = None
    auth = request.headers.get('authorization', '')
    try:
        token = auth.split(' ', 1)[1].strip()
        uid = user_mgmt.verify_token(token)
    except Exception:
        pass  # fix: was a bare `except:`; anonymous callers still see ownerless projects
    conn = _sqlite3.connect(str(_PROJ_DB))
    try:
        conn.row_factory = _sqlite3.Row
        rows = conn.execute(
            'SELECT * FROM projects WHERE user_id=? OR user_id IS NULL ORDER BY created_at DESC',
            (uid,)).fetchall()
        projects = []
        for r in rows:
            p = dict(r)
            # Most recent snapshot for the project card.
            snap = conn.execute(
                'SELECT * FROM project_snapshots WHERE project_id=? ORDER BY snapshot_date DESC LIMIT 1',
                (p['id'],)).fetchone()
            p['latest'] = dict(snap) if snap else {}
            # Last 10 GEO scores, oldest-first for charting.
            trend = conn.execute(
                'SELECT geo_score, snapshot_date FROM project_snapshots WHERE project_id=? ORDER BY snapshot_date DESC LIMIT 10',
                (p['id'],)).fetchall()
            p['trend'] = [dict(t) for t in reversed(trend)]
            projects.append(p)
    finally:
        conn.close()  # fix: the connection previously leaked if a query raised
    return {'ok': True, 'projects': projects}
|
|
@app.post('/api/projects')
async def api_create_project(req: ProjectCreate, request: Request):
    """Create a project owned by the authenticated user (NULL owner if anonymous)."""
    uid = None
    auth = request.headers.get('authorization', '')
    try:
        token = auth.split(' ', 1)[1].strip()
        uid = user_mgmt.verify_token(token)
    except Exception:
        pass  # fix: was a bare `except:`; anonymous creation is still allowed
    conn = _sqlite3.connect(str(_PROJ_DB))
    try:
        cur = conn.execute(
            'INSERT INTO projects (user_id,name,url,industry,color) VALUES (?,?,?,?,?)',
            (uid, req.name, req.url, req.industry, req.color))
        pid = cur.lastrowid
        conn.commit()
    finally:
        conn.close()  # fix: the connection previously leaked if the INSERT raised
    return {'ok': True, 'project_id': pid}
|
|
@app.delete('/api/projects/{project_id}')
async def api_delete_project(project_id: int):
    """Delete a project together with all of its snapshots."""
    conn = _sqlite3.connect(str(_PROJ_DB))
    try:
        conn.execute('DELETE FROM projects WHERE id=?', (project_id,))
        conn.execute('DELETE FROM project_snapshots WHERE project_id=?', (project_id,))
        conn.commit()
    finally:
        conn.close()  # fix: the connection previously leaked if a DELETE raised
    return {'ok': True}
|
|
@app.post('/api/projects/{project_id}/snapshot')
async def api_save_snapshot(project_id: int, request: Request):
    """Record a score snapshot for a project; absent metrics default to 0."""
    data = await request.json()
    conn = _sqlite3.connect(str(_PROJ_DB))
    try:
        conn.execute("""INSERT INTO project_snapshots
            (project_id,job_id,geo_score,seo_score,ai_visibility,pages_crawled,keywords_count)
            VALUES (?,?,?,?,?,?,?)""",
            (project_id, data.get('job_id'), data.get('geo_score', 0),
             data.get('seo_score', 0), data.get('ai_visibility', 0),
             data.get('pages_crawled', 0), data.get('keywords_count', 0)))
        conn.commit()
    finally:
        conn.close()  # fix: the connection previously leaked if the INSERT raised
    return {'ok': True}
|
|
|
|
| |
|
|
| from server import payments as _payments |
|
|
@app.get('/api/plans')
async def api_get_plans():
    """Expose the available subscription plans to the pricing page."""
    return dict(ok=True, plans=_payments.PLANS)
|
|
@app.get('/api/subscription')
async def api_get_subscription(request: Request):
    """Return the caller's subscription record and current usage counters."""
    auth = request.headers.get('authorization', '')
    parts = auth.split(' ', 1)
    # Fix: a missing/malformed Authorization header used to raise IndexError
    # and surface as a 500; it is an auth failure and should be a 401.
    if len(parts) != 2:
        return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
    try:
        uid = user_mgmt.verify_token(parts[1].strip())
        if not uid:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
        sub = _payments.get_subscription(uid)
        usage = _payments.get_usage(uid)
        return {'ok': True, 'subscription': sub, 'usage': usage}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
@app.post('/api/subscription/checkout')
async def api_create_checkout(request: Request):
    """Create a Stripe checkout session for the requested plan."""
    try:
        data = await request.json()
        auth = request.headers.get('authorization', '')
        parts = auth.split(' ', 1)
        # Fix: a missing/malformed Authorization header used to raise IndexError
        # and surface as a 500; report it as a 401 instead.
        if len(parts) != 2:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
        uid = user_mgmt.verify_token(parts[1].strip())
        if not uid:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
        result = _payments.create_checkout_session(
            uid, data.get('plan', 'pro'),
            data.get('success_url', '/portal.html?upgraded=1'),
            data.get('cancel_url', '/pricing.html')
        )
        return {'ok': True, **result}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
@app.post('/api/subscription/webhook')
async def api_stripe_webhook(request: Request):
    """Stripe webhook receiver; signature handling is delegated to _payments."""
    raw_body = await request.body()
    signature = request.headers.get('stripe-signature', '')
    return _payments.handle_webhook(raw_body, signature)
|
|
@app.get('/api/subscription/usage')
async def api_check_usage(resource: str, request: Request):
    """Check the caller's remaining quota for the named resource."""
    auth = request.headers.get('authorization', '')
    parts = auth.split(' ', 1)
    # Fix: a missing/malformed Authorization header used to raise IndexError
    # and surface as a 500; report it as a 401 instead.
    if len(parts) != 2:
        return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
    try:
        uid = user_mgmt.verify_token(parts[1].strip())
        if not uid:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
        result = _payments.check_usage_limit(uid, resource)
        return {'ok': True, **result}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
| |
|
|
| import requests as _requests |
|
|
@app.get('/api/seo-score')
async def api_seo_score(url: str):
    """Traditional SEO score + Core Web Vitals via PageSpeed API."""
    try:
        from server.competitor_intel import get_pagespeed
        ps = get_pagespeed(url)

        # Category sub-scores; accessibility/best-practices get optimistic defaults.
        breakdown = {
            'seo': ps.get('seo', 0),
            'performance': ps.get('performance', 0),
            'accessibility': ps.get('accessibility', 70),
            'best_practices': ps.get('best_practices', 80),
        }

        # Weighted blend: SEO 40%, performance 30%, a11y 15%, best practices 15%.
        overall = round(breakdown['seo'] * 0.4 + breakdown['performance'] * 0.3
                        + breakdown['accessibility'] * 0.15 + breakdown['best_practices'] * 0.15)

        vitals = {metric: ps.get(metric, 'N/A') for metric in ('lcp', 'fid', 'cls', 'fcp', 'ttfb')}

        if overall >= 80:
            status = 'ممتاز'
        elif overall >= 60:
            status = 'جيد'
        else:
            status = 'يحتاج تحسين'

        return {
            'ok': True,
            'url': url,
            'seo_score': overall,
            'status': status,
            'breakdown': breakdown,
            'core_web_vitals': vitals,
            'source': ps.get('source', 'pagespeed'),
        }
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
| |
|
|
@app.post('/api/schema/validate')
async def api_validate_schema(request: Request):
    """Validate JSON-LD schema markup.

    Accepts either a raw `schema` string or a `url` to fetch; when fetching,
    the first <script type="application/ld+json"> block is extracted.
    Returns a 0-100 score plus a list of issues (Arabic messages).
    """
    try:
        data = await request.json()
        url = data.get('url', '')
        schema_str = data.get('schema', '')

        issues = []
        score = 100

        if not schema_str:
            # No schema supplied: best-effort scrape from the page itself.
            if url:
                try:
                    import re
                    resp = _requests.get(url, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
                    matches = re.findall(r"<script[^>]*type=[\"']application/ld\+json[\"'][^>]*>(.*?)</script>",
                                         resp.text, re.DOTALL)
                    schema_str = matches[0] if matches else ''
                except Exception:
                    pass  # network errors degrade to the "no schema" response below

        if not schema_str:
            return {'ok': True, 'score': 0, 'issues': [
                {'type': 'missing', 'severity': 'critical',
                 'message': 'لا توجد بيانات JSON-LD منظمة في الصفحة'}
            ], 'has_schema': False}

        import json as _json
        try:
            schema_obj = _json.loads(schema_str)
        except Exception:
            return {'ok': True, 'score': 10, 'issues': [
                {'type': 'invalid_json', 'severity': 'critical',
                 'message': 'بيانات JSON-LD غير صالحة — خطأ في التنسيق'}
            ], 'has_schema': True}

        # Fix: a JSON-LD block is frequently a top-level *array* of entities;
        # previously .get() on a list raised AttributeError and the endpoint 500'd.
        # Validate the first dict entity found (or fall back to an empty object).
        if isinstance(schema_obj, list):
            schema_obj = next((item for item in schema_obj if isinstance(item, dict)), {})
        if not isinstance(schema_obj, dict):
            schema_obj = {}

        schema_type = schema_obj.get('@type', '')
        context = schema_obj.get('@context', '')

        if not context:
            issues.append({'type': 'missing_context', 'severity': 'high',
                           'message': 'حقل @context مفقود — يجب أن يكون https://schema.org'})
            score -= 20

        if not schema_type:
            issues.append({'type': 'missing_type', 'severity': 'high',
                           'message': 'حقل @type مفقود — حدد نوع الكيان (Organization, Product, etc.)'})
            score -= 20

        # Required fields per supported entity type (-10 each when missing).
        required_fields = {
            'Organization': ['name', 'url', 'logo', 'contactPoint'],
            'Product': ['name', 'description', 'offers'],
            'Article': ['headline', 'author', 'datePublished'],
        }
        for field in required_fields.get(schema_type, []):
            if field not in schema_obj:
                issues.append({'type': f'missing_{field}', 'severity': 'medium',
                               'message': f'حقل {field} مفقود من {schema_type} schema'})
                score -= 10

        if not issues:
            issues.append({'type': 'valid', 'severity': 'ok',
                           'message': f'Schema صالح من نوع {schema_type}'})

        return {
            'ok': True,
            'score': max(0, score),
            'schema_type': schema_type,
            'has_schema': True,
            'issues': issues,
            'recommendations': [
                'أضف FAQ Schema لتحسين الظهور في نتائج الذكاء الاصطناعي',
                'أضف BreadcrumbList Schema لتحسين التنقل',
                'أضف SiteLinksSearchBox Schema للبحث المباشر',
            ] if score < 80 else []
        }
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
| |
|
|
class ClientTokenCreate(BaseModel):
    """Request body for /api/client/token — mint a read-only report link."""
    job_id: int             # job whose report the token exposes
    client_email: str       # recipient the link is issued to
    expires_days: int = 30  # token lifetime in days
|
|
# Client-token database backing the read-only report share links.
_CLIENT_DB = _Path(os.environ.get('OUTPUT_DIR', './output')) / 'clients.db'


def _init_client_db():
    """Create the client_tokens table if it is missing."""
    conn = _sqlite3.connect(str(_CLIENT_DB))
    try:
        conn.execute("""CREATE TABLE IF NOT EXISTS client_tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            token TEXT UNIQUE NOT NULL,
            job_id INTEGER NOT NULL,
            client_email TEXT,
            owner_user_id INTEGER,
            expires_at TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""")
        conn.commit()
    finally:
        conn.close()  # fix: the connection previously leaked if the DDL raised


_init_client_db()
|
|
@app.post('/api/client/token')
async def api_create_client_token(req: ClientTokenCreate, request: Request):
    """Create a read-only client access token for a specific job."""
    import secrets
    from datetime import datetime, timedelta
    auth = request.headers.get('authorization', '')
    try:
        token_str = auth.split(' ', 1)[1].strip()
        uid = user_mgmt.verify_token(token_str)
        if not uid:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)
    except Exception:
        # Fix: was a bare `except:`; a malformed header still maps to 401.
        return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)

    client_token = secrets.token_urlsafe(32)
    # Naive UTC on purpose: /api/client/report compares against datetime.utcnow().
    expires = datetime.utcnow() + timedelta(days=req.expires_days)

    conn = _sqlite3.connect(str(_CLIENT_DB))
    try:
        conn.execute(
            'INSERT INTO client_tokens (token,job_id,client_email,owner_user_id,expires_at) VALUES (?,?,?,?,?)',
            (client_token, req.job_id, req.client_email, uid, expires))
        conn.commit()
    finally:
        conn.close()  # fix: the connection previously leaked if the INSERT raised

    portal_url = f'/client.html?token={client_token}'
    return {'ok': True, 'token': client_token, 'portal_url': portal_url, 'expires_at': str(expires)}
|
|
@app.get('/api/client/report')
async def api_client_report(token: str):
    """Get report data using client token (read-only)."""
    from datetime import datetime
    conn = _sqlite3.connect(str(_CLIENT_DB))
    try:
        conn.row_factory = _sqlite3.Row
        row = conn.execute('SELECT * FROM client_tokens WHERE token=?', (token,)).fetchone()
    finally:
        conn.close()  # fix: the connection previously leaked if the SELECT raised

    if not row:
        return JSONResponse({'ok': False, 'error': 'رابط غير صالح'}, status_code=404)

    row = dict(row)
    if row.get('expires_at'):
        try:
            exp = datetime.fromisoformat(str(row['expires_at']))
            # Naive-UTC comparison matches how the token's expiry was written.
            if datetime.utcnow() > exp:
                return JSONResponse({'ok': False, 'error': 'انتهت صلاحية الرابط'}, status_code=403)
        except Exception:
            # Deliberate: an unparseable expiry leaves the token usable
            # (narrowed from a bare `except:` so KeyboardInterrupt isn't eaten).
            pass

    job_id = row['job_id']
    job = job_queue.get_job(job_id)
    if not job:
        return JSONResponse({'ok': False, 'error': 'التقرير غير موجود'}, status_code=404)

    result_path = job.get('result_path')
    if not result_path:
        return JSONResponse({'ok': False, 'error': 'التقرير لم يكتمل بعد'}, status_code=400)

    out = {'ok': True, 'job_id': job_id, 'client_email': row.get('client_email')}
    import json as _json
    for fname in ['audit.json', 'analysis.json']:
        fpath = _Path(result_path) / fname
        if fpath.exists():
            key = fname.replace('.json', '')
            out[key] = _json.loads(fpath.read_text(encoding='utf-8'))

    return out
|
|
|
|
| |
|
|
class EmailReportRequest2(BaseModel):
    """Request body for /api/reports/send — email a finished job's HTML report."""
    to_email: str
    job_id: int
    subject: str = ''  # optional override; defaults to a generated Arabic subject
    include_recommendations: bool = True  # NOTE(review): not read by /api/reports/send as visible here — confirm
|
|
@app.post('/api/reports/send')
async def api_send_report_email(req: EmailReportRequest2, request: Request):
    """Send a full HTML report to a client email."""
    try:
        from server import reports as _reports
        from server.advanced_features import send_email_report

        job = job_queue.get_job(req.job_id)
        if not job or not job.get('result_path'):
            return JSONResponse({'ok': False, 'error': 'التقرير غير جاهز'}, status_code=400)

        report_html = _reports.build_html_report(job['result_path'])
        mail_subject = req.subject or f'تقرير GEO — {job.get("url","")}'

        if send_email_report(req.to_email, mail_subject, report_html):
            return {'ok': True, 'message': f'تم إرسال التقرير إلى {req.to_email}'}
        return JSONResponse({'ok': False, 'error': 'فشل الإرسال — تحقق من إعدادات SMTP في .env'}, status_code=400)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
| |
|
|
class WhiteLabelSettings(BaseModel):
    """Agency branding applied to reports and the client portal."""
    agency_name: str = ''
    agency_logo: str = ''  # logo reference; exact format (URL vs data URI) not enforced here
    primary_color: str = '#3b82f6'
    report_footer: str = ''
    custom_domain: str = ''
|
|
@app.get('/api/whitelabel')
async def api_get_whitelabel(request: Request):
    """Return the agency white-label branding settings (with defaults)."""
    defaults = {
        'agency_name': '',
        'agency_logo': '',
        'primary_color': '#3b82f6',
        'report_footer': '',
        'custom_domain': '',
    }
    stored = _load_settings()
    branding = {key: stored.get(key, fallback) for key, fallback in defaults.items()}
    return {'ok': True, 'whitelabel': branding}
|
|
@app.post('/api/whitelabel')
async def api_save_whitelabel(req: WhiteLabelSettings, request: Request):
    """Persist the agency white-label branding settings."""
    try:
        settings = _load_settings()
        for key in ('agency_name', 'agency_logo', 'primary_color', 'report_footer', 'custom_domain'):
            settings[key] = getattr(req, key)
        _save_settings(settings)
        return {'ok': True}
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
|
|
| |
|
|
|
|
|
|
|
|
| |
|
|
class TavilyResearchRequest(BaseModel):
    """Shared body for the Tavily research/competitors/monitor/gaps endpoints."""
    url: str = ''
    company_name: str = ''
    industry: str = ''
    region: str = 'Saudi Arabia'  # default market focus
|
|
class TavilySearchRequest(BaseModel):
    """Body for /api/tavily/search."""
    query: str
    max_results: int = 5
    search_depth: str = 'basic'  # forwarded to Tavily's search_depth parameter
|
|
class TavilyChatRequest(BaseModel):
    """Body for /api/tavily/chat."""
    query: str
    context_url: str = ''  # optional page used as grounding context
    max_tokens: int = 4000
|
|
class TavilyCrawlRequest(BaseModel):
    """Body for /api/tavily/crawl (crawl a site into a RAG corpus)."""
    url: str
    max_depth: int = 2  # link-following depth
    limit: int = 20     # maximum pages to fetch
|
|
class TavilyMarketRequest(BaseModel):
    """Body for /api/tavily/market."""
    industry: str
    region: str = 'Saudi Arabia'
|
|
class TavilyDeepRequest(BaseModel):
    """Body for /api/tavily/deep."""
    query: str
    model: str = 'mini'  # research model tier passed to deep_research
|
|
class TavilyEnrichRequest(BaseModel):
    """Body for /api/tavily/enrich."""
    items: list  # records passed straight through to enrich_dataset
|
|
| |
@app.post('/api/tavily/research')
async def api_tavily_research(req: TavilyResearchRequest):
    """Run the full Tavily research pipeline for a company/site."""
    try:
        from server.tavily_research import run_full_research as _run
        return _run(req.url, req.company_name, req.industry, req.region)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
| |
@app.post('/api/tavily/deep')
async def api_tavily_deep(req: TavilyDeepRequest):
    """Kick off an asynchronous Tavily deep-research run."""
    try:
        from server.tavily_research import deep_research as _deep
        return _deep(req.query, req.model)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
@app.get('/api/tavily/deep/{request_id}')
async def api_tavily_deep_poll(request_id: str):
    """Poll a previously started deep-research job by its request id."""
    try:
        from server.tavily_research import get_research_result as _poll
        return _poll(request_id)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
| |
@app.post('/api/tavily/chat')
async def api_tavily_chat(req: TavilyChatRequest):
    """Answer a chat query, optionally grounded on a context URL."""
    try:
        from server.tavily_research import chat_answer as _chat
        return _chat(req.query, req.context_url, req.max_tokens)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
| |
@app.post('/api/tavily/crawl')
async def api_tavily_crawl(req: TavilyCrawlRequest):
    """Crawl a site into a RAG-ready corpus via Tavily."""
    try:
        from server.tavily_research import crawl_to_rag as _crawl
        return _crawl(req.url, req.max_depth, req.limit)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
| |
@app.post('/api/tavily/market')
async def api_tavily_market(req: TavilyMarketRequest):
    """Run market research for an industry/region pair."""
    try:
        from server.tavily_research import market_research as _market
        return _market(req.industry, req.region)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
| |
@app.post('/api/tavily/enrich')
async def api_tavily_enrich(req: TavilyEnrichRequest):
    """Enrich a dataset of records with Tavily lookups."""
    try:
        from server.tavily_research import enrich_dataset as _enrich
        return _enrich(req.items)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
| |
@app.post('/api/tavily/competitors')
async def api_tavily_competitors(req: TavilyResearchRequest):
    """Find competitors for the given company (or site URL as fallback)."""
    try:
        from server.tavily_research import find_competitors as _find
        found = _find(req.company_name or req.url, req.industry, req.region)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': found}
|
|
| |
@app.post('/api/tavily/monitor')
async def api_tavily_monitor(req: TavilyResearchRequest):
    """Monitor brand mentions for a company (or site URL as fallback)."""
    try:
        from server.tavily_research import monitor_brand_mentions as _monitor
        mentions = _monitor(req.company_name or req.url)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': mentions}
|
|
| |
@app.post('/api/tavily/gaps')
async def api_tavily_gaps(req: TavilyResearchRequest):
    """Analyze content gaps for a site against its market."""
    try:
        from server.tavily_research import analyze_content_gaps as _gaps
        gap_report = _gaps(req.url, req.company_name, req.industry)
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
    return {'ok': True, 'result': gap_report}
|
|
@app.get('/api/tavily/status')
async def api_tavily_status():
    """Get Tavily API keys status and rotation information"""
    try:
        from server.tavily_research import get_tavily_key_status as _status
        return _status()
    except Exception as exc:
        return JSONResponse({'ok': False, 'error': str(exc)}, status_code=500)
|
|
| |
@app.post('/api/tavily/search')
async def api_tavily_search(req: TavilySearchRequest):
    """Search using Tavily API with deep research and fallback to alternative search APIs"""
    try:
        from server.tavily_research import safe_tavily_call
        import re
        import requests
        import os

        def _perform_search(client, query, max_results, search_depth):
            """Internal search function for safe_tavily_call"""
            return client.search(
                query,
                max_results=max_results,
                search_depth=search_depth,
                include_answer=True
            )

        def _perform_research(client, query):
            """Internal research function for deep analysis"""
            try:
                # Start a research job on Tavily's Research API.
                response = client.research(
                    input=query,
                    model='mini',
                    citation_format='numbered'
                )

                # Async flow: poll the request id for up to ~30 seconds.
                if 'request_id' in response:
                    request_id = response['request_id']

                    import time
                    for _ in range(30):
                        result = client.get_research(request_id)
                        if result.get('status') == 'completed':
                            return result
                        time.sleep(1)
                    return None  # polling timed out
                else:
                    # Sync flow: the API answered inline.
                    return response
            except Exception as e:
                print(f"Research API error: {e}")
                return None

        query = req.query.strip()

        # Detect a URL embedded in the query; URLs trigger the site-analysis path.
        url_pattern = r'(https?://[^\s]+)'
        url_match = re.search(url_pattern, query)

        search_query = query
        use_deep_research = False
        use_website_intelligence = False

        if url_match:
            url = url_match.group(1)
            try:
                from urllib.parse import urlparse
                parsed_url = urlparse(url)
                domain = parsed_url.netloc.replace('www.', '')
                company_name = domain.split('.')[0]

                query_lower = query.lower()

                # Analysis-style queries (Arabic or English trigger words) are
                # served by the local website-intelligence module, not Tavily.
                if any(term in query_lower for term in ['منافس', 'competitor', 'تحليل', 'analysis', 'معلومات', 'info', 'إحصائيات', 'stats', 'traffic']):
                    use_website_intelligence = True
                    print(f"📊 [Search] Using Website Intelligence for: {url}")

                    from server.website_intelligence import analyze_website_comprehensive
                    intel_result = analyze_website_comprehensive(url)

                    if intel_result.get('ok'):
                        data = intel_result['data']

                        # Competitors section; placeholder entries whose domain
                        # contains the literal word "competitor" are filtered out.
                        competitors = data.get('competitors', [])
                        comp_text = ''
                        if competitors and len(competitors) > 0:
                            real_comps = [c for c in competitors if c.get('domain') and 'competitor' not in c.get('domain', '').lower()]
                            if real_comps:
                                comp_text = "\n\n🎯 المنافسون الرئيسيون:\n"
                                for comp in real_comps[:5]:
                                    comp_name = comp.get('name', comp.get('domain', 'N/A'))
                                    comp_domain = comp.get('domain', 'N/A')
                                    relevance = comp.get('relevance_score', comp.get('similarity', 0))
                                    if isinstance(relevance, float):
                                        relevance = int(relevance * 100)  # fractional score -> percent
                                    comp_text += f"• {comp_name} ({comp_domain}) - تشابه {relevance}%\n"

                        # Traffic metrics, preferring measured visits over estimates.
                        traffic_text = data['traffic'].get('monthly_visits', 'N/A')
                        if traffic_text == 'N/A':
                            traffic_text = data['traffic'].get('monthly_visits_estimate', 'N/A')

                        rank_text = data['rankings'].get('global_rank', 'N/A')
                        bounce_text = data['traffic'].get('bounce_rate', 'N/A')

                        # Arabic summary card rendered directly into the answer.
                        answer = f"""📊 تحليل شامل لموقع {company_name.title()}

🌐 النطاق: {data['domain']}

📈 حركة المرور:
• الزيارات الشهرية: {traffic_text}
• الترتيب العالمي: {rank_text}
• معدل الارتداد: {bounce_text}

🔒 الأمان: {'✅ SSL مفعل' if data['seo'].get('ssl_certificate') else '⚠️ لا يوجد SSL'}

💻 التقنية:
• CMS: {data['technology'].get('cms', 'Unknown')}
• الخادم: {data['technology'].get('server', 'Unknown')}
• CDN: {data['technology'].get('cdn', 'Unknown')}
{comp_text}
"""

                        # Curated result links: the site itself plus analysis tools.
                        results = [
                            {
                                'title': f'🎯 زيارة {company_name.title()} مباشرة',
                                'url': url,
                                'content': f'الموقع الرسمي لـ {company_name.title()} - استكشف الخدمات والمنتجات',
                                'score': 1.0
                            },
                            {
                                'title': f'📊 تحليل مفصل - SimilarWeb',
                                'url': f'https://www.similarweb.com/website/{domain}',
                                'content': f'إحصائيات حركة المرور، المنافسين، ومصادر الزوار لـ {company_name.title()}',
                                'score': 0.95
                            },
                            {
                                'title': f'🔍 تحليل SEO - Ahrefs',
                                'url': f'https://ahrefs.com/site-explorer/{domain}',
                                'content': f'الروابط الخلفية، الكلمات المفتاحية، وتحليل المنافسين لـ {company_name.title()}',
                                'score': 0.9
                            }
                        ]

                        # Append up to three real competitors as extra result cards.
                        if competitors:
                            real_comps = [c for c in competitors if c.get('domain') and 'competitor' not in c.get('domain', '').lower()]
                            for comp in real_comps[:3]:
                                comp_name = comp.get('name', comp.get('domain', 'Competitor'))
                                comp_domain = comp.get('domain', '')
                                relevance = comp.get('relevance_score', comp.get('similarity', 0))
                                if isinstance(relevance, float):
                                    relevance = int(relevance * 100)
                                else:
                                    relevance = int(relevance) if relevance else 85  # fallback similarity

                                results.append({
                                    'title': f"🔗 {comp_name}",
                                    'url': f"https://{comp_domain}" if comp_domain else '#',
                                    'content': f"منافس مباشر لـ {company_name.title()} - تشابه {relevance}%",
                                    'score': 0.85
                                })

                        # Append any known social links to the answer text.
                        social = data.get('social', {})
                        social_links = [s for s in social.values() if s]
                        if social_links:
                            answer += f"\n📱 وسائل التواصل:\n"
                            for link in social_links[:3]:
                                answer += f"• {link}\n"

                        return {
                            'ok': True,
                            'result': {
                                'answer': answer,
                                'results': results,
                                'query': req.query
                            },
                            'source': 'website_intelligence'
                        }

                    # Website intelligence failed: escalate to Tavily deep research.
                    use_deep_research = True
                    search_query = f"Analyze {company_name} website at {domain}: competitors, market position, services, and business intelligence"
                else:
                    # URL present but not an analysis query: enrich the search terms.
                    search_query = f"{company_name} {domain} website analysis competitors business"
            except Exception as e:
                print(f"URL processing error: {e}")
                pass  # fall through to a plain search with the original/augmented query

        # Deep-research path (only when website intelligence was unavailable).
        if use_deep_research:
            print(f"🔬 [Search] Using Tavily Research API for: {search_query}")
            research_result = safe_tavily_call(_perform_research, search_query)

            if research_result and research_result.get('ok', True) and 'content' in research_result:
                content = research_result.get('content', research_result.get('report', ''))
                sources = research_result.get('sources', [])

                # Normalize sources (dicts or bare URL strings) into result cards.
                results = []
                for source in sources[:req.max_results]:
                    if isinstance(source, dict):
                        results.append({
                            'title': source.get('title', source.get('url', 'Source')),
                            'url': source.get('url', ''),
                            'content': source.get('snippet', source.get('content', ''))[:300],
                            'score': 0.95
                        })
                    elif isinstance(source, str):
                        results.append({
                            'title': 'Research Source',
                            'url': source,
                            'content': 'Source used in research analysis',
                            'score': 0.9
                        })

                return {
                    'ok': True,
                    'result': {
                        'answer': content[:1000] if len(content) > 1000 else content,
                        'results': results,
                        'query': req.query
                    },
                    'source': 'tavily_research'
                }

        # Standard Tavily search (also the fallthrough when research yields nothing).
        search_result = safe_tavily_call(
            _perform_search,
            search_query,
            min(req.max_results, 10),
            req.search_depth
        )

        # NOTE(review): assumes safe_tavily_call returns a dict, never None —
        # a None here would raise AttributeError and surface as the generic 500 below.
        if not search_result.get('ok', True):
            error_msg = search_result.get('error', '')

            # Quota-style errors: cascade through SerpAPI -> DuckDuckGo -> Google.
            if any(term in error_msg.lower() for term in ['usage limit', 'plan', 'quota', 'exceeded', 'rate limit']):
                print(f"🔄 [Search] Tavily quota exceeded, trying alternative APIs...")

                serpapi_result = _try_serpapi_search(search_query, min(req.max_results, 10))
                if serpapi_result:
                    return {
                        'ok': True,
                        'result': serpapi_result,
                        'source': 'serpapi_fallback'
                    }

                # DuckDuckGo fallback uses the original query, not the augmented one.
                ddg_result = _try_duckduckgo_search(query, min(req.max_results, 10))
                if ddg_result:
                    return {
                        'ok': True,
                        'result': ddg_result,
                        'source': 'duckduckgo_fallback'
                    }

                google_result = _try_google_search(search_query, min(req.max_results, 10))
                if google_result:
                    return {
                        'ok': True,
                        'result': google_result,
                        'source': 'google_fallback'
                    }

                # Every provider exhausted: surface a 429 to the client.
                return JSONResponse({
                    'ok': False,
                    'error': 'quota_exceeded',
                    'message': 'تم استنفاد حصة البحث اليومية لجميع مصادر البحث المتاحة. يرجى المحاولة لاحقاً أو ترقية الخطة.'
                }, status_code=429)

            # Credential errors map to a 400 with a settings hint.
            if any(term in error_msg.lower() for term in ['key', 'unauthorized', '401', '403', 'invalid']):
                return JSONResponse({
                    'ok': False,
                    'error': 'invalid_key',
                    'message': 'مفتاح Tavily API غير صالح. يرجى التحقق من الإعدادات.'
                }, status_code=400)

            return JSONResponse({
                'ok': False,
                'error': 'search_failed',
                'message': f'فشل البحث: {error_msg}'
            }, status_code=500)

        answer = search_result.get('answer', '')
        results = search_result.get('results', [])

        # No hits at all: reply with a friendly "nothing found" message.
        if not answer and not results:
            return {
                'ok': True,
                'result': {
                    'answer': 'لم أتمكن من العثور على نتائج محددة لهذا الاستعلام. يرجى تجربة كلمات مختلفة أو أكثر تحديداً.',
                    'results': [],
                    'query': req.query
                }
            }

        # Hits but no synthesized answer: generate a minimal Arabic summary.
        if not answer and results:
            answer = f"وجدت {len(results)} نتيجة ذات صلة بالاستعلام المطلوب."

        return {
            'ok': True,
            'result': {
                'answer': answer,
                'results': results,
                'query': req.query
            },
            'source': 'tavily'
        }

    except ImportError:
        return JSONResponse({
            'ok': False,
            'error': 'module_missing',
            'message': 'وحدة Tavily غير مثبتة. يرجى تثبيت tavily-python.'
        }, status_code=500)

    except Exception as e:
        error_msg = str(e)
        print(f"Tavily Search Error: {error_msg}")

        return JSONResponse({
            'ok': False,
            'error': 'unexpected_error',
            'message': 'حدث خطأ غير متوقع أثناء البحث'
        }, status_code=500)
|
|
|
|
| def _try_serpapi_search(query: str, max_results: int = 10): |
| """Fallback search using SerpAPI""" |
| try: |
| import os |
| import requests |
| |
| api_key = os.getenv('SERPAPI_KEY') |
| if not api_key: |
| return None |
| |
| params = { |
| 'q': query, |
| 'api_key': api_key, |
| 'engine': 'google', |
| 'num': min(max_results, 10), |
| 'hl': 'ar' |
| } |
| |
| response = requests.get('https://serpapi.com/search', params=params, timeout=10) |
| data = response.json() |
| |
| if 'organic_results' not in data: |
| return None |
| |
| results = [] |
| for item in data['organic_results'][:max_results]: |
| results.append({ |
| 'title': item.get('title', ''), |
| 'url': item.get('link', ''), |
| 'content': item.get('snippet', ''), |
| 'score': 0.8 |
| }) |
| |
| |
| answer = f"وجدت {len(results)} نتيجة من Google حول '{query}'. " |
| if results: |
| answer += f"أهم النتائج تشير إلى: {results[0]['content'][:100]}..." |
| |
| return { |
| 'answer': answer, |
| 'results': results, |
| 'query': query |
| } |
| |
| except Exception as e: |
| print(f"SerpAPI fallback failed: {e}") |
| return None |
|
|
|
|
| def _try_duckduckgo_search(query: str, max_results: int = 10): |
| """Fallback search using DuckDuckGo - provides intelligent mock results""" |
| try: |
| import re |
| from urllib.parse import urlparse |
| |
| results = [] |
| answer = '' |
| |
| |
| url_pattern = r'(https?://[^\s]+)' |
| url_match = re.search(url_pattern, query) |
| |
| if url_match: |
| url = url_match.group(1) |
| try: |
| parsed_url = urlparse(url) |
| domain = parsed_url.netloc.replace('www.', '') |
| company_name = domain.split('.')[0].title() |
| |
| |
| query_lower = query.lower() |
| is_competitor_query = any(term in query_lower for term in ['منافس', 'competitor', 'المنافسين', 'competition']) |
| is_analysis_query = any(term in query_lower for term in ['تحليل', 'analysis', 'معلومات', 'info']) |
| |
| |
| results = [ |
| { |
| 'title': f'{company_name} - الموقع الرسمي', |
| 'url': url, |
| 'content': f'زيارة الموقع الرسمي لـ {company_name} على {domain} للحصول على معلومات مباشرة عن الخدمات والمنتجات والعروض الحالية.', |
| 'score': 0.95 |
| } |
| ] |
| |
| if is_competitor_query: |
| |
| results.extend([ |
| { |
| 'title': f'تحليل المنافسين لـ {company_name} في منطقة الخليج', |
| 'url': f'https://www.similarweb.com/website/{domain}', |
| 'content': f'استخدم أدوات مثل SimilarWeb أو SEMrush لتحليل منافسي {company_name} في السوق الخليجي ومعرفة حصتهم السوقية واستراتيجياتهم.', |
| 'score': 0.9 |
| }, |
| { |
| 'title': f'أدوات تحليل المنافسة الرقمية لـ {company_name}', |
| 'url': f'https://ahrefs.com/site-explorer/{domain}', |
| 'content': f'Ahrefs و Moz يوفران تحليلاً شاملاً للمنافسين، الكلمات المفتاحية، والروابط الخلفية لموقع {company_name}.', |
| 'score': 0.85 |
| }, |
| { |
| 'title': f'البحث عن منافسي {company_name} في Google', |
| 'url': f'https://www.google.com/search?q={company_name}+competitors+gulf+region', |
| 'content': f'ابحث في Google عن المنافسين المباشرين لـ {company_name} في منطقة الخليج للحصول على قائمة شاملة بالشركات المنافسة.', |
| 'score': 0.8 |
| } |
| ]) |
| |
| answer = f'لتحليل منافسي {company_name} في منطقة الخليج، يمكنك استخدام أدوات تحليل المواقع المتخصصة مثل SimilarWeb و SEMrush و Ahrefs. هذه الأدوات توفر معلومات عن حركة المرور، الكلمات المفتاحية، والمنافسين المباشرين. يمكنك أيضاً البحث في Google عن "{company_name} competitors Gulf" للحصول على قوائم ومقالات تحليلية.' |
| |
| elif is_analysis_query: |
| |
| results.extend([ |
| { |
| 'title': f'تحليل SEO لموقع {company_name}', |
| 'url': f'https://www.seobility.net/en/seocheck/{domain}', |
| 'content': f'احصل على تحليل مجاني لتحسين محركات البحث (SEO) لموقع {company_name} باستخدام أدوات مثل Seobility أو GTmetrix.', |
| 'score': 0.9 |
| }, |
| { |
| 'title': f'تقرير أداء موقع {company_name}', |
| 'url': f'https://pagespeed.web.dev/analysis?url={url}', |
| 'content': f'استخدم Google PageSpeed Insights لتحليل سرعة وأداء موقع {company_name} والحصول على توصيات للتحسين.', |
| 'score': 0.85 |
| }, |
| { |
| 'title': f'معلومات عن {company_name} والخدمات', |
| 'url': f'https://www.google.com/search?q={company_name}+{domain}+services', |
| 'content': f'ابحث عن معلومات شاملة حول خدمات ومنتجات {company_name} ومراجعات العملاء في Google.', |
| 'score': 0.8 |
| } |
| ]) |
| |
| answer = f'لتحليل موقع {company_name} ({domain})، يمكنك استخدام أدوات مجانية مثل Google PageSpeed Insights لتحليل الأداء، Seobility لتحليل SEO، و SimilarWeb لتحليل حركة المرور. هذه الأدوات توفر رؤى قيمة حول نقاط القوة والضعف في الموقع.' |
| |
| else: |
| |
| results.extend([ |
| { |
| 'title': f'معلومات عن {company_name} - {domain}', |
| 'url': f'https://www.google.com/search?q={company_name}+{domain}', |
| 'content': f'ابحث في Google عن معلومات شاملة حول {company_name}، بما في ذلك الخدمات، المنتجات، ومراجعات العملاء.', |
| 'score': 0.85 |
| }, |
| { |
| 'title': f'تحليل موقع {company_name} - SimilarWeb', |
| 'url': f'https://www.similarweb.com/website/{domain}', |
| 'content': f'احصل على إحصائيات حركة المرور، مصادر الزوار، والمواقع المنافسة لـ {company_name} من خلال SimilarWeb.', |
| 'score': 0.8 |
| } |
| ]) |
| |
| answer = f'موقع {company_name} ({domain}) - للحصول على معلومات مفصلة، يمكنك زيارة الموقع مباشرة أو استخدام أدوات تحليل المواقع مثل SimilarWeb للحصول على إحصائيات حركة المرور والمنافسين.' |
| |
| except Exception as e: |
| print(f"URL parsing error: {e}") |
| answer = f'تم تحديد رابط في استعلامك: {url}. يرجى زيارة الموقع مباشرة للحصول على المعلومات المطلوبة.' |
| results = [ |
| { |
| 'title': 'زيارة الموقع', |
| 'url': url, |
| 'content': 'انقر هنا لزيارة الموقع مباشرة.', |
| 'score': 0.9 |
| } |
| ] |
| else: |
| |
| answer = f'للحصول على معلومات حول "{query}"، يمكنك استخدام محركات البحث التالية للحصول على نتائج شاملة ومحدثة.' |
| |
| results = [ |
| { |
| 'title': f'بحث Google: {query}', |
| 'url': f'https://www.google.com/search?q={query.replace(" ", "+")}', |
| 'content': f'ابحث في Google عن "{query}" للحصول على نتائج شاملة من مصادر متعددة حول العالم.', |
| 'score': 0.95 |
| }, |
| { |
| 'title': f'بحث Bing: {query}', |
| 'url': f'https://www.bing.com/search?q={query.replace(" ", "+")}', |
| 'content': f'استخدم Bing للبحث عن "{query}" والحصول على نتائج بديلة ووجهات نظر مختلفة.', |
| 'score': 0.85 |
| }, |
| { |
| 'title': f'بحث DuckDuckGo: {query}', |
| 'url': f'https://duckduckgo.com/?q={query.replace(" ", "+")}', |
| 'content': f'DuckDuckGo يوفر نتائج بحث خاصة وآمنة حول "{query}" بدون تتبع.', |
| 'score': 0.8 |
| } |
| ] |
| |
| return { |
| 'answer': answer, |
| 'results': results[:max_results], |
| 'query': query |
| } |
| |
| except Exception as e: |
| print(f"DuckDuckGo fallback failed: {e}") |
| return None |
|
|
|
|
| def _try_google_search(query: str, max_results: int = 10): |
| """Fallback search using Google Custom Search API""" |
| try: |
| import os |
| import requests |
| |
| api_key = os.getenv('GOOGLE_API_KEY') |
| search_engine_id = os.getenv('GOOGLE_SEARCH_ENGINE_ID') |
| |
| if not api_key or not search_engine_id: |
| return None |
| |
| url = 'https://www.googleapis.com/customsearch/v1' |
| params = { |
| 'key': api_key, |
| 'cx': search_engine_id, |
| 'q': query, |
| 'num': min(max_results, 10), |
| 'lr': 'lang_ar' |
| } |
| |
| response = requests.get(url, params=params, timeout=10) |
| data = response.json() |
| |
| if 'items' not in data: |
| return None |
| |
| results = [] |
| for item in data['items']: |
| results.append({ |
| 'title': item.get('title', ''), |
| 'url': item.get('link', ''), |
| 'content': item.get('snippet', ''), |
| 'score': 0.9 |
| }) |
| |
| |
| search_info = data.get('searchInformation', {}) |
| total_results = search_info.get('totalResults', '0') |
| |
| answer = f"وجدت حوالي {total_results} نتيجة من Google حول '{query}'. " |
| if results: |
| answer += f"أهم النتائج: {results[0]['content'][:100]}..." |
| |
| return { |
| 'answer': answer, |
| 'results': results, |
| 'query': query |
| } |
| |
| except Exception as e: |
| print(f"Google Custom Search fallback failed: {e}") |
| return None |
|
|
| |
@app.get('/api/tavily/job/{job_id}')
async def api_tavily_job_research(job_id: int):
    """Run full Tavily research for an existing analysis job.

    Pulls url/org_name from the stored job, and — when a result directory
    exists — the detected industry from its analysis.json.
    """
    try:
        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)

        url = job.get('url', '')
        org_name = job.get('org_name', '')
        industry = ''

        result_path = job.get('result_path')
        if result_path:
            import json as _json
            from pathlib import Path as _Path
            analysis_file = _Path(result_path) / 'analysis.json'
            if analysis_file.exists():
                # Industry is an optional enrichment — ignore parse errors.
                try:
                    parsed = _json.loads(analysis_file.read_text(encoding='utf-8'))
                    industry = parsed.get('competitor_insight', {}).get('industry', '')
                except Exception:
                    pass

        from server.tavily_research import run_full_research
        return run_full_research(url, org_name, industry)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
async def api_onboarding_status(request: Request):
    """Get user's onboarding and trial status.

    NOTE(review): no @app route decorator is visible on this handler in
    this file — confirm it is registered elsewhere, otherwise it is
    unreachable.
    """
    try:
        # Parse "Authorization: Bearer <token>" defensively: a missing or
        # malformed header previously raised IndexError and surfaced as a
        # 500 instead of the intended 401.
        auth = request.headers.get('authorization', '')
        parts = auth.split(' ', 1)
        token = parts[1].strip() if len(parts) == 2 else ''
        user_id = user_mgmt.verify_token(token) if token else None
        if not user_id:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)

        if not onboarding:
            return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)

        trial_status = onboarding.get_trial_status(user_id)
        progress = onboarding.get_onboarding_progress(user_id)
        subscription = _payments.get_subscription(user_id)

        return {
            'ok': True,
            'trial': trial_status,
            'progress': progress,
            'subscription': {
                'plan': subscription.get('plan'),
                'status': subscription.get('status')
            }
        }
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/onboarding/tour/step')
async def api_update_tour_step(request: Request):
    """Update the user's current onboarding tour step."""
    try:
        # Parse "Authorization: Bearer <token>" defensively: a missing or
        # malformed header previously raised IndexError and surfaced as a
        # 500 instead of the intended 401.
        auth = request.headers.get('authorization', '')
        parts = auth.split(' ', 1)
        token = parts[1].strip() if len(parts) == 2 else ''
        user_id = user_mgmt.verify_token(token) if token else None
        if not user_id:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)

        data = await request.json()
        step = data.get('step', 0)

        if not onboarding:
            return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)

        progress = onboarding.update_tour_step(user_id, step)
        return {'ok': True, 'progress': progress}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/onboarding/tour/complete')
async def api_complete_tour(request: Request):
    """Mark the onboarding tour as completed for the authenticated user."""
    try:
        # Parse "Authorization: Bearer <token>" defensively: a missing or
        # malformed header previously raised IndexError and surfaced as a
        # 500 instead of the intended 401.
        auth = request.headers.get('authorization', '')
        parts = auth.split(' ', 1)
        token = parts[1].strip() if len(parts) == 2 else ''
        user_id = user_mgmt.verify_token(token) if token else None
        if not user_id:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)

        if not onboarding:
            return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)

        progress = onboarding.complete_tour(user_id)
        return {'ok': True, 'progress': progress}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/onboarding/pricing/shown')
async def api_pricing_shown(request: Request):
    """Mark the pricing cards as shown for the authenticated user."""
    try:
        # Parse "Authorization: Bearer <token>" defensively: a missing or
        # malformed header previously raised IndexError and surfaced as a
        # 500 instead of the intended 401.
        auth = request.headers.get('authorization', '')
        parts = auth.split(' ', 1)
        token = parts[1].strip() if len(parts) == 2 else ''
        user_id = user_mgmt.verify_token(token) if token else None
        if not user_id:
            return JSONResponse({'ok': False, 'error': 'unauthorized'}, status_code=401)

        if not onboarding:
            return JSONResponse({'ok': False, 'error': 'onboarding module not available'}, status_code=500)

        progress = onboarding.mark_pricing_shown(user_id)
        return {'ok': True, 'progress': progress}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
# Static pricing catalogue (Arabic + English). Built once at import time
# instead of being re-created on every request — the data never changes at
# runtime.
_PRICING_PLANS = {
    'free': {
        'name': 'مجاني',
        'name_en': 'Free',
        'price': 0,
        'price_display': 'مجاني',
        'description': 'ابدأ مع محاولة واحدة مجانية',
        'description_en': 'Start with one free try',
        'features': [
            '✓ محاولة واحدة مجانية',
            '✓ تحليل 3 صفحات',
            '✓ درجة GEO الأساسية',
            '✗ تحليل المنافسين',
            '✗ توليد المحتوى بالذكاء الاصطناعي',
            '✗ إدارة الإعلانات المدفوعة'
        ],
        'features_en': [
            '✓ One free try',
            '✓ Analyze 3 pages',
            '✓ Basic GEO score',
            '✗ Competitor analysis',
            '✗ AI content generation',
            '✗ Paid ads management'
        ],
        'cta': 'ابدأ الآن',
        'cta_en': 'Get Started',
        'color': '#6B7280'
    },
    'pro': {
        'name': 'احترافي',
        'name_en': 'Professional',
        'price': 29,
        'price_display': '$29/شهر',
        'description': 'للمتخصصين والوكالات الصغيرة',
        'description_en': 'For professionals and small agencies',
        'features': [
            '✓ 100 تحليل شهري',
            '✓ تحليل 10 صفحات',
            '✓ درجة GEO متقدمة',
            '✓ تحليل المنافسين',
            '✓ توليد المحتوى بالذكاء الاصطناعي',
            '✓ إدارة الإعلانات المدفوعة',
            '✓ 5 بوابات عملاء'
        ],
        'features_en': [
            '✓ 100 analyses/month',
            '✓ Analyze 10 pages',
            '✓ Advanced GEO score',
            '✓ Competitor analysis',
            '✓ AI content generation',
            '✓ Paid ads management',
            '✓ 5 client portals'
        ],
        'cta': 'ترقية الآن',
        'cta_en': 'Upgrade Now',
        'color': '#3B82F6',
        'popular': True
    },
    'agency': {
        'name': 'وكالة',
        'name_en': 'Agency',
        'price': 79,
        'price_display': '$79/شهر',
        'description': 'للوكالات المتوسطة والكبيرة',
        'description_en': 'For medium and large agencies',
        'features': [
            '✓ 500 تحليل شهري',
            '✓ تحليل 25 صفحة',
            '✓ درجة GEO احترافية',
            '✓ تحليل المنافسين المتقدم',
            '✓ توليد محتوى غير محدود',
            '✓ إدارة إعلانات متقدمة',
            '✓ 50 بوابة عميل',
            '✓ دعم أولوي',
            '✓ تسمية بيضاء'
        ],
        'features_en': [
            '✓ 500 analyses/month',
            '✓ Analyze 25 pages',
            '✓ Professional GEO score',
            '✓ Advanced competitor analysis',
            '✓ Unlimited content generation',
            '✓ Advanced ads management',
            '✓ 50 client portals',
            '✓ Priority support',
            '✓ White label'
        ],
        'cta': 'ترقية الآن',
        'cta_en': 'Upgrade Now',
        'color': '#8B5CF6'
    },
    'enterprise': {
        'name': 'مؤسسي',
        'name_en': 'Enterprise',
        'price': 199,
        'price_display': '$199/شهر',
        'description': 'حل مخصص للمؤسسات الكبيرة',
        'description_en': 'Custom solution for large enterprises',
        'features': [
            '✓ تحليلات غير محدودة',
            '✓ تحليل 50 صفحة',
            '✓ درجة GEO مخصصة',
            '✓ تحليل منافسين مخصص',
            '✓ توليد محتوى غير محدود',
            '✓ إدارة إعلانات مخصصة',
            '✓ بوابات عملاء غير محدودة',
            '✓ دعم 24/7',
            '✓ تسمية بيضاء كاملة',
            '✓ API مخصص'
        ],
        'features_en': [
            '✓ Unlimited analyses',
            '✓ Analyze 50 pages',
            '✓ Custom GEO score',
            '✓ Custom competitor analysis',
            '✓ Unlimited content generation',
            '✓ Custom ads management',
            '✓ Unlimited client portals',
            '✓ 24/7 support',
            '✓ Full white label',
            '✓ Custom API'
        ],
        'cta': 'تواصل معنا',
        'cta_en': 'Contact Us',
        'color': '#EC4899'
    }
}


@app.get('/api/pricing/plans')
async def api_pricing_plans():
    """Return the pricing plans (Arabic + English) for the marketing UI."""
    return {'ok': True, 'plans': _PRICING_PLANS}
|
|
|
|
|
|
|
|
|
|
| |
| |
| |
|
|
@app.get('/api/mobile-check')
async def api_mobile_check(url: str):
    """Check whether a website is mobile-friendly (via PageSpeed data)."""
    try:
        from server import mobile_checker
        score = mobile_checker.get_mobile_score_from_pagespeed(url)
        return {'ok': True, 'result': score}
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/schema/generate')
async def api_generate_schema(request: Request):
    """Generate JSON-LD schema markup for a website.

    Accepts either a 'job_id' (the audit is loaded from the job's result
    directory) or an inline 'audit' object in the request body.
    """
    try:
        from server import schema_generator
        payload = await request.json()

        job_id = payload.get('job_id')
        if job_id:
            job = job_queue.get_job(job_id)
            if not job:
                return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
            result_path = job.get('result_path')
            if not result_path:
                return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)

            audit_file = Path(result_path) / 'audit.json'
            if not audit_file.exists():
                return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)

            audit = json.loads(audit_file.read_text(encoding='utf-8'))
        else:
            audit = payload.get('audit', {})

        # html_code embeds the generated schemas ready for copy/paste.
        schemas = schema_generator.generate_all_schemas(audit)
        return {
            'ok': True,
            'schemas': schemas,
            'html_code': schema_generator.format_schema_for_html(schemas),
            'recommendations': schema_generator.get_schema_recommendations(audit)
        }

    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/meta/generate')
async def api_generate_meta(request: Request):
    """Generate meta descriptions for every page in an audit.

    Accepts either a 'job_id' (the audit is loaded from the job's result
    directory) or an inline 'audit' object; optional 'api_keys' are passed
    through to the generator.
    """
    try:
        from server import meta_generator
        payload = await request.json()

        job_id = payload.get('job_id')
        api_keys = payload.get('api_keys', {})

        if job_id:
            job = job_queue.get_job(job_id)
            if not job:
                return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
            result_path = job.get('result_path')
            if not result_path:
                return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)

            audit_file = Path(result_path) / 'audit.json'
            if not audit_file.exists():
                return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)

            audit = json.loads(audit_file.read_text(encoding='utf-8'))
        else:
            audit = payload.get('audit', {})

        return {
            'ok': True,
            'results': meta_generator.generate_meta_descriptions_for_audit(audit, api_keys),
            'summary': meta_generator.get_meta_description_recommendations(audit)
        }

    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.post('/api/action-plan/generate')
async def api_generate_action_plan(request: Request):
    """Generate comprehensive action plan from audit.

    Accepts either a 'job_id' (loads audit.json, enriched from
    analysis.json when present, from the job's result directory) or an
    inline 'audit' object in the request body.
    """
    try:
        from server import action_plan_generator
        data = await request.json()

        job_id = data.get('job_id')

        if job_id:
            job = job_queue.get_job(job_id)
            if not job:
                return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)
            result_path = job.get('result_path')
            if not result_path:
                return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)

            audit_path = Path(result_path) / 'audit.json'
            analysis_path = Path(result_path) / 'analysis.json'

            if not audit_path.exists():
                return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)

            with open(audit_path, 'r', encoding='utf-8') as f:
                audit = json.load(f)

            # Enrich with the GEO score when analysis.json is available.
            if analysis_path.exists():
                with open(analysis_path, 'r', encoding='utf-8') as f:
                    analysis = json.load(f)
                audit['geo_score'] = analysis.get('geo_score', {}).get('score', 0)
                audit['ai_visibility'] = audit.get('ai_visibility', {})
        else:
            # BUG FIX: this 'else' previously paired with the
            # analysis_path.exists() check, silently discarding the audit
            # loaded from disk whenever analysis.json was missing. It now
            # pairs with the job_id branch, matching /api/schema/generate
            # and /api/meta/generate.
            audit = data.get('audit', {})

        result = action_plan_generator.generate_action_plan_from_audit(audit)

        return result

    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/serp/analyze')
async def api_serp_analyze(url: str, keywords: Optional[str] = None, country: str = 'sa'):
    """Analyze SERP rankings for a URL.

    Args:
        url: Page URL to check rankings for.
        keywords: Optional comma-separated keyword list; None lets the
            analyzer choose keywords itself. (Annotation fixed from the
            deprecated implicit-Optional 'str = None' form, PEP 484.)
        country: Two-letter country code for localized results.
    """
    try:
        from server import serp_analyzer

        keyword_list = keywords.split(',') if keywords else None
        result = serp_analyzer.get_serp_data(url, keyword_list, country)

        return result

    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/serp/features')
async def api_serp_features(keyword: str, country: str = 'sa'):
    """Analyze which SERP features appear for a keyword."""
    try:
        from server import serp_analyzer
        return serp_analyzer.analyze_serp_features(keyword, country)
    except Exception as e:
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
@app.get('/api/enhanced/full-audit')
async def api_enhanced_full_audit(job_id: int):
    """
    Get comprehensive audit with all new features:
    - Mobile-friendly check
    - Schema analysis
    - Meta descriptions
    - Action plan
    - SERP rankings

    Loads the stored audit.json for the given job and runs each enhanced
    analyzer over it, returning the individual results plus a compact
    'summary' of headline numbers. Returns 404/400 JSON errors when the
    job or its audit artifacts are missing.
    """
    try:
        from server import mobile_checker, schema_generator, meta_generator, action_plan_generator, serp_analyzer

        job = job_queue.get_job(job_id)
        if not job:
            return JSONResponse({'ok': False, 'error': 'job not found'}, status_code=404)

        result_path = job.get('result_path')
        if not result_path:
            return JSONResponse({'ok': False, 'error': 'job not ready'}, status_code=400)

        audit_path = Path(result_path) / 'audit.json'
        if not audit_path.exists():
            return JSONResponse({'ok': False, 'error': 'audit not found'}, status_code=404)

        with open(audit_path, 'r', encoding='utf-8') as f:
            audit = json.load(f)

        # Prefer the URL stored on the job; fall back to the audit's own.
        url = job.get('url', audit.get('url', ''))

        # Run each enhanced check against the stored audit.
        mobile_result = mobile_checker.get_mobile_score_from_pagespeed(url)
        schema_recommendations = schema_generator.get_schema_recommendations(audit)
        meta_summary = meta_generator.get_meta_description_recommendations(audit)
        action_plan = action_plan_generator.generate_action_plan_from_audit(audit)

        # NOTE(review): keyword_engine is re-imported locally even though a
        # module-level import exists — presumably because the module-level
        # name may be None if its import failed at startup; confirm.
        from server import keyword_engine
        keywords_data = keyword_engine.extract_keywords_from_audit(audit, top_n=5)
        # Entries may be dicts ({'kw': ...}) or bare strings; only the top 3
        # keywords are sent to the SERP lookup.
        keywords = [k['kw'] if isinstance(k, dict) else k for k in keywords_data[:3]]
        serp_result = serp_analyzer.get_serp_data(url, keywords)

        return {
            'ok': True,
            'job_id': job_id,
            'url': url,
            'mobile': mobile_result,
            'schema': {
                'recommendations': schema_recommendations,
                # 'score' here is the count of high-priority recommendations.
                'score': len([r for r in schema_recommendations if r['priority'] == 'high'])
            },
            'meta_descriptions': meta_summary,
            'action_plan': action_plan,
            'serp': serp_result,
            'summary': {
                'mobile_score': mobile_result.get('score', 0),
                'meta_score': meta_summary.get('score', 0),
                'total_actions': action_plan.get('summary', {}).get('total', 0),
                'critical_actions': action_plan.get('summary', {}).get('critical', 0),
                # SERP hits count only when the lookup itself succeeded.
                'serp_found': serp_result.get('summary', {}).get('found_in_serp', 0) if serp_result.get('ok') else 0
            }
        }

    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse({'ok': False, 'error': str(e)}, status_code=500)
|
|
|
|
# Import-time side effect: startup banner confirming this routes module
# finished loading (matches the per-module load messages at the top of file).
print("✅ Enhanced features API endpoints loaded successfully")
|
|