# app/admin/scraping_admin.py
"""
Admin Panel untuk Enhanced Scraping Configuration
Panel admin lengkap untuk mengelola konfigurasi scraping module,
monitoring statistik, dan mengatur scraping operations.
"""
import asyncio
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

from fastapi import (
    APIRouter,
    BackgroundTasks,
    Depends,
    HTTPException,
    Query,
    Request,
)
from pydantic import BaseModel, Field
from sqlalchemy import and_, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
# NOTE(review): db_storage, SourceType and urlparse were used throughout the
# module but never imported; confirm the module paths below match the project.
from app.db_models import (
    ProxySource,
    ScrapingSession,
    SourceType,
    User,
    ValidationHistory,
)
from app.db_storage import db_storage
from app.db_storage_extended import extended_db_storage
from app.dependencies import require_admin
from app.grabber.scraping_enhancements import (
    EnhancedScrapingService,
    ScrapingEnhancementConfig,
)
from app.grabber.scraping_utils import (
    ExponentialBackoff,
    ProxyRotator,
    RateLimiter,
    RequestQueue,
    calculate_proxy_score,
    format_bytes,
    generate_session_id,
)
# Admin scraping router. All routes require an authenticated admin.
# NOTE(review): the prefix already ends in /scraping while every route path
# below also begins with /scraping/, yielding URLs such as
# /api/v1/admin/scraping/scraping/config — confirm this doubling is intended.
router = APIRouter(
    prefix="/api/v1/admin/scraping",
    tags=["admin", "scraping"],
    dependencies=[Depends(require_admin), Depends(get_db)],
)
# Pydantic models untuk admin panel
class ScrapingConfigResponse(BaseModel):
    """Response model for the scraping-configuration endpoint.

    Aggregates the global config, per-module configs, a summary of the
    currently active sessions, rate-limiter state and performance stats.
    """

    global_config: Dict[str, Any]
    module_configs: Dict[str, Any]
    active_sessions: List[Dict[str, Any]]
    rate_limiter_status: Dict[str, Any]
    # Bug fix: this field was declared twice in the original; the redundant
    # duplicate declaration has been removed.
    performance_stats: Dict[str, Any]
class ScrapingConfigRequest(BaseModel):
    """Request body for updating a single module's scraping configuration."""

    # Target module name. NOTE(review): the update endpoint also receives the
    # module name as a path parameter and validates that one, not this field.
    module_name: str
    # Arbitrary key/value settings; keys are checked against a whitelist by
    # the update endpoint before being applied.
    settings: Dict[str, Any]
class SessionResponse(BaseModel):
    """Response model describing a single scraping session."""

    session_id: str
    start_time: datetime
    # None while the session is still running.
    end_time: Optional[datetime]
    # Wall-clock duration in seconds; None until the session has ended.
    duration: Optional[float]
    requests_made: int
    successful_requests: int
    failed_requests: int
    # Assumed to come from ScrapingSession.success_rate — whether it is a
    # 0-1 ratio or a percentage depends on that model; confirm upstream.
    success_rate: float
    total_data_bytes: int
    avg_response_time: float
    proxies_used: List[str]
    proxies_tested: int
class SessionStatsResponse(BaseModel):
    """Response model for aggregate session statistics."""

    total_sessions: int
    active_sessions: int
    total_requests: int
    successful_requests: int
    # Average duration in seconds.
    avg_session_duration: float
    total_data_bytes: int
    success_rate: float
class ProxySourceManagementResponse(BaseModel):
    """Response model for the proxy-source management listing."""

    # Serialized proxy-source records.
    sources: List[Dict[str, Any]]
    # Total number of sources matching the query (before pagination).
    total: int
    pending_approval: int
    # Bug fix: this field was required but the listing endpoint never
    # supplied it, so every response failed validation. A default of 0
    # keeps the schema backward-compatible while making it optional.
    auto_discovered: int = 0
class AdvancedScrapingRequest(BaseModel):
    """Request/response model for the advanced scraping configuration."""

    enable_scheduler: bool = False
    # Free-form schedule definition; shape is defined by the scheduler.
    schedule: Optional[Dict[str, Any]] = None
    enable_proxy_testing: bool = False
    # Stored in config under the key "enable_proxy_rotation".
    proxy_rotation_enabled: bool = False
    max_proxies_per_source: Optional[int] = None
    # URLs used to test proxies when proxy testing is enabled.
    test_urls: Optional[List[str]] = None
# Background tasks
# WARNING(review): a module-level BackgroundTasks instance is never attached
# to a response, so Starlette never executes the tasks queued on it. Tasks
# added here accumulate but do not run; endpoints should declare a
# BackgroundTasks parameter instead — confirm before relying on this.
background_tasks = BackgroundTasks()
# Global scraping service instance, lazily created by get_enhanced_service().
enhanced_service: Optional[EnhancedScrapingService] = None
def get_enhanced_service() -> EnhancedScrapingService:
    """Return the shared EnhancedScrapingService, creating it on first use."""
    global enhanced_service
    if not enhanced_service:
        # Lazily build the singleton with conservative defaults.
        enhanced_service = EnhancedScrapingService(
            config=ScrapingEnhancementConfig(
                enable_proxy_rotation=True,
                max_concurrent_requests=10,
                rate_limit_per_second=5,
                enable_retry=True,
                max_retries=3,
                timeout_seconds=30,
            )
        )
    return enhanced_service
# Global session manager
# In-memory registry of running scraping sessions keyed by session id.
# NOTE(review): not persisted — contents are lost on process restart.
active_sessions: Dict[str, ScrapingSession] = {}
@router.get("/scraping/config", response_model=ScrapingConfigResponse)
async def get_scraping_config() -> ScrapingConfigResponse:
"""Dapatkan konfigurasi scraping global dan per-module"""
service = get_enhanced_service()
# Get global config
global_config = service.config_manager.get_global_config()
# Get module configs
module_configs = {}
for module_name in ["github_grabber", "subscription_grabber", "advanced_grabber"]:
module_configs[module_name] = service.config_manager.get_config(module_name)
# Get active sessions
global_sessions = {
session_id: session_data.session_id
for session_id, session_data in active_sessions.items()
}
# Get rate limiter status
rate_limiter_status = service.config_manager.config.get("global", {})
return ScrapingConfigResponse(
global_config=global_config,
module_configs=module_configs,
active_sessions=global_sessions,
rate_limiter_status=rate_limiter_status,
performance_stats=service.performance_monitor.get_overall_stats()
if service.performance_monitor
else {},
)
@router.post("/scraping/config/{module_name}", response_model=dict)
async def update_scraping_config(
request: ScrapingConfigRequest,
module_name: str,
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Perbarui konfigurasi scraping untuk module spesifik"""
service = get_enhanced_service()
# Validate request
if not module_name or module_name not in [
"github_grabber",
"subscription_grabber",
"advanced_grabber",
]:
raise HTTPException(status_code=400, detail="Invalid module name")
# Validate settings
for key, value in request.settings.items():
if key not in [
"max_concurrent_requests",
"default_timeout",
"max_retries",
"retry_delay",
"github_timeout",
"github_max_retries",
"subscription_timeout",
"subscription_max_retries",
"enable_base64_padding_fix",
"supported_formats",
"enable_batching",
"batch_size",
"min_proxy_quality",
"enable_duplicate_filtering",
"enable_user_agent_rotation",
"enable_proxy_rotation",
"max_proxies_per_source",
"success_rate_threshold",
]:
raise HTTPException(status_code=400, detail=f"Invalid setting: {key}")
try:
# Update konfigurasi
success = await service.config_manager.update_config(
module_name, request.settings
)
if not success:
raise HTTPException(
status_code=500, detail=f"Gagal memperbarui konfigurasi {module_name}"
)
return {
"message": f"Konfigurasi {module_name} berhasil diperbarui",
"success": True,
}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error memperbarui konfigurasi: {str(e)}"
)
@router.post("/scraping/start-session", response_model=dict)
async def start_scraping_session(
request: Dict[str, Any],
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Memulai sesi scraping baru"""
service = get_enhanced_service()
# Dapatkan konfigurasi
module_configs = {}
for module_name in ["github_grabber", "subscription_grabber", "advanced_grabber"]:
module_configs[module_name] = service.config_manager.get_config(module_name)
# Validasi input
source_config = request.get("source_config", {})
if not source_config:
raise HTTPException(status_code=400, detail="Source config required")
# Create session ID
session_id = generate_session_id()
session = ScrapingSession(session_id=session_id, start_time=datetime.now())
# Create scraping session
task_id = background_tasks.add_task(
service.run_scraping_session(
session=session, source_config=source_config, module_configs=module_configs
),
name=f"scraping-session-{session_id[:8]}",
)
active_sessions[session_id] = session
return {
"message": f"Sesi scraping dimulai dengan ID: {session_id}",
"session_id": session_id,
"task_id": task_id,
}
@router.get("/scraping/sessions/{session_id}", response_model=SessionResponse)
async def get_scraping_session(
session_id: str,
db: AsyncSession = Depends(get_db),
) -> SessionResponse:
"""Dapatkan detail sesi scraping"""
service = get_enhanced_service()
if session_id not in active_sessions:
raise HTTPException(status_code=404, detail="Session not found")
session = active_sessions[session_id]
return SessionResponse(
session_id=session_id,
start_time=session.start_time.isoformat(),
end_time=session.end_time.isoformat() if session.end_time else None,
duration=(session.end_time - session.start_time).total_seconds()
if session.end_time
else None,
requests_made=session.requests_made,
successful_requests=session.successful_requests,
failed_requests=session.failed_requests,
success_rate=session.success_rate,
total_data_bytes=session.total_data_bytes,
avg_response_time=session.avg_response_time,
proxies_used=session.proxies_used,
)
@router.post("/scraping/sessions/{session_id}/stop", response_model=dict)
async def stop_scraping_session(
session_id: str,
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Hentikan sesi scraping"""
service = get_enhanced_service()
if session_id not in active_sessions:
raise HTTPException(status_code=404, detail="Session not found")
session = active_sessions[session_id]
# Simpanikan background task
task_id = session.task_id
if task_id and background_tasks:
background_tasks.cancel_task(task_id)
# Mark session as ended
session.end_time = datetime.now()
# Update session data
success = await extended_db_storage.update_scraping_session(
session=db,
session_id=session.session_id,
status="completed",
proxies_found=len(session.proxies_used),
proxies_valid=session.proxies_tested,
)
# Remove dari aktif
del active_sessions[session_id]
return {"message": f"Sesi {session_id} dihentikan", "session_id": session_id}
@router.get("/scraping/stats/overview", response_model=dict)
async def get_scraping_overview(db: AsyncSession = Depends(get_db)) -> Dict[str, Any]:
"""Dapatkan statistik overview scraping"""
service = get_enhanced_service()
# Dapatkan statistik dari performance monitor
overview_stats = service.performance_monitor.get_overall_stats()
# Dapatkan statistik proxy sources
total_sources = await db_storage.count_sources(db)
hunter_stats = await extended_db_storage.get_hunter_statistics(db)
pending_approval = hunter_stats.get("total_candidates", 0)
total_sessions = 0
total_proxies = await db_storage.count_proxies(db)
validated_proxies = await db_storage.count_proxies(
db, validation_status="validated"
)
return {
"proxy_sources": {
"total": total_sources,
"auto_discovered": pending_approval,
"manual": total_sources - pending_approval,
},
"sessions": {
"total": total_sessions,
"active": active_sessions,
"completed": total_sessions - active_sessions,
},
"validation": {
"total_proxies": total_proxies,
"validated": validated_proxies,
"validation_rate": (validated_proxies / total_proxies) * 100
if total_proxies > 0
else 0,
},
"performance": overview_stats,
}
@router.get(
    "/scraping/stats/sessions/{session_id}", response_model=SessionStatsResponse
)
async def get_session_stats(session_id: str) -> SessionStatsResponse:
    """Return aggregate statistics derived from a single scraping session.

    Raises 404 when the session id is not in the in-memory registry.
    """
    if session_id not in active_sessions:
        raise HTTPException(status_code=404, detail="Session not found")
    session = active_sessions[session_id]
    session_end = session.end_time or datetime.now()
    duration = (session_end - session.start_time).total_seconds()
    # Bug fix: the original populated SessionResponse-style fields
    # (session_id, start_time, ...) none of which exist on
    # SessionStatsResponse, so every required field was missing and the
    # response failed validation. Map the single session onto the
    # aggregate schema instead.
    return SessionStatsResponse(
        total_sessions=1,
        active_sessions=0 if session.end_time else 1,
        total_requests=session.requests_made,
        successful_requests=session.successful_requests,
        avg_session_duration=duration,
        total_data_bytes=session.total_data_bytes,
        success_rate=session.success_rate,
    )
@router.get("/scraping/proxy-sources", response_model=ProxySourceManagementResponse)
async def get_proxy_sources(
db: AsyncSession = Depends(get_db),
status: Optional[str] = None,
limit: int = 50,
offset: int = 0,
) -> ProxySourceManagementResponse:
"""Dapatkan daftar proxy sources dengan management tools"""
# Get proxy sources
sources = await db_storage.get_sources(db, enabled_only=(status == "enabled"))
if status == "pending":
hunter_stats = await extended_db_storage.get_hunter_statistics(db)
pending_approval = (
hunter_stats.get("status_stats", {}).get("pending", {}).get("count", 0)
)
else:
pending_approval = 0
return ProxySourceManagementResponse(
sources=[
{
"id": source.id,
"url": source.url,
"type": source.type.value,
"name": source.name,
"description": source.description,
"enabled": source.enabled,
"validated": source.validated,
"validation_error": source.validation_error,
"total_scraped": source.total_scraped,
"success_rate": source.success_rate,
"created_at": source.created_at.isoformat(),
"updated_at": source.updated_at.isoformat(),
"is_admin_source": source.is_admin_source,
"candidate_id": source.candidate_id,
}
for source in sources
],
total=len(sources),
pending_approval=pending_approval,
)
@router.post("/scraping/proxy-sources", response_model=dict)
async def create_proxy_source(
request: Dict[str, Any],
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Tambah proxy source baru"""
service = get_enhanced_service()
try:
# Validasi input
if not request.get("url"):
raise HTTPException(status_code=400, detail="URL is required")
url = request.get("url")
if not url.startswith(("http://", "https://")):
raise HTTPException(
status_code=400, detail="URL must start with http:// or https://"
)
# Validasi URL
parsed_url = urlparse(url)
domain = parsed_url.netloc
# Cek domain yang diblokir
blocked_domains = ["example.com", "test.com"]
if any(blocked in domain for blocked in blocked_domains):
raise HTTPException(status_code=400, detail=f"Domain {domain} is blocked")
source = ProxySource(
url=url,
type=SourceType.MANUAL,
name=request.get("name", f"Source-{domain}"),
description=request.get("description", ""),
enabled=True,
is_admin_source=False,
)
# Simpankan dengan database
await db_storage.create_proxy_source(db, source)
return {"message": "Proxy source created successfully", "source_id": source.id}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error creating proxy source: {str(e)}"
)
@router.put("/scraping/proxy-sources/{source_id}", response_model=dict)
async def update_proxy_source(
source_id: int,
request: Dict[str, Any],
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Update proxy source"""
service = get_enhanced_service()
# Get existing source
stmt = select(ProxySource).where(ProxySource.id == source_id)
result = await db.execute(stmt)
source = result.scalar_one_or_none()
if not source:
raise HTTPException(status_code=404, detail="Source not found")
# Update fields
for field in ["name", "description", "enabled", "validated"]:
if field in request:
setattr(source, field, request[field])
try:
session.add(source)
await session.commit()
return {
"message": f"Proxy source {source_id} updated successfully",
"source_id": source.id,
}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error updating proxy source: {str(e)}"
)
@router.delete("/scraping/proxy-sources/{source_id}", response_model=dict)
async def delete_proxy_source(
source_id: int,
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Hapus proxy source"""
service = get_enhanced_service()
# Get existing source
stmt = select(ProxySource).where(ProxySource.id == source_id)
result = await db.execute(stmt)
source = result.scalar_one_or_none()
if not source:
raise HTTPException(status_code=404, detail="Source not found")
try:
await session.delete(source)
await session.commit()
return {
"message": f"Proxy source {source_id} deleted successfully",
"source_id": source_id,
}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error deleting proxy source: {str(e)}"
)
@router.post("/scraping/proxy-sources/{source_id}/validate", response_model=dict)
async def validate_proxy_source(
source_id: int,
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Validasi proxy source"""
service = get_enhanced_service()
# Get existing source
stmt = select(ProxySource).where(ProxySource.id == source_id)
result = await db.execute(stmt)
source = result.scalar_one_or_none()
if not source:
raise HTTPException(status_code=404, detail="Source not found")
try:
# Trigger validation
task_id = background_tasks.add_task(
service.validate_proxy_source(db=db, source_id=source_id),
name=f"validate-source-{source_id}",
)
return {
"message": f"Validation started for proxy source {source_id}",
"task_id": task_id,
}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error starting validation: {str(e)}"
)
@router.get("/scraping/hunter", response_model=dict)
async def get_hunter_status(db: AsyncSession = Depends(get_db)) -> Dict[str, Any]:
"""Dapatkan status Hunter Protocol"""
service = get_enhanced_service()
# Get Hunter statistics
hunter_stats = await extended_db_storage.get_hunter_statistics(db)
# Get active Hunter tasks
active_tasks = background_tasks.list_tasks()
return {
"active_tasks": active_tasks,
"statistics": hunter_stats,
"candidates_found": hunter_stats.get("candidates_found", 0),
"sources_approved": hunter_stats.get("sources_approved", 0),
"approval_rate": hunter_stats.get("approval_rate", 0.0),
"last_run": hunter_stats.get("last_run"),
}
@router.post("/scraping/hunter/trigger", response_model=dict)
async def trigger_hunter_manual(
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Manual trigger Hunter Protocol"""
service = get_enhanced_service()
try:
task_id = background_tasks.add_task(
service.run_hunter(),
name=f"hunter-manual-trigger-{datetime.now().isoformat()}",
)
return {"message": "Hunter Protocol triggered manually", "task_id": task_id}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error triggering Hunter: {str(e)}"
)
@router.get("/scraping/queue", response_model=dict)
async def get_request_queue(db: AsyncSession = Depends(get_db)) -> Dict[str, Any]:
"""Dapatkan antrian request queue"""
service = get_enhanced_service()
queue = service.request_queue
return {
"queue_size": queue.size(),
"active_requests": len(queue.active_requests),
"completed_requests": len(queue.completed_requests),
"failed_requests": len(queue.failed_requests),
"global_requests": queue.global_requests,
"rate_limiter_stats": {
"domain_requests": queue.limiter.domain_requests,
"ip_requests": queue.limiter.ip_requests,
"global_rate_limiting": queue.limiter.global_rate_limiting,
},
}
@router.post("/scraping/queue/clear", response_model=dict)
async def clear_request_queue(db: AsyncSession = Depends(get_db)) -> Dict[str, Any]:
"""Bersihkan antrian request queue"""
service = get_enhanced_service()
try:
await service.request_queue.clear()
return {"message": "Request queue cleared successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error clearing queue: {str(e)}")
@router.get("/scraping/advanced-config", response_model=AdvancedScrapingRequest)
async def get_advanced_config() -> AdvancedScrapingRequest:
"""Dapatkan konfigurasi advanced scraping"""
service = get_enhanced_service()
config = service.config_manager.get_global_config()
advanced_config = config.get("enhanced", {})
return AdvancedScrapingRequest(
enable_scheduler=advanced_config.get("enable_scheduler", False),
schedule=advanced_config.get("schedule"),
enable_proxy_testing=advanced_config.get("enable_proxy_testing", False),
proxy_rotation_enabled=advanced_config.get("enable_proxy_rotation", False),
max_proxies_per_source=advanced_config.get("max_proxies_per_source"),
test_urls=advanced_config.get("test_urls", []),
)
@router.post("/scraping/advanced-config", response_model=dict)
async def update_advanced_config(
request: AdvancedScrapingRequest,
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Perbarui konfigurasi advanced scraping"""
service = get_enhanced_service()
try:
success = await service.config_manager.update_config(
"enhanced", request.settings
)
if not success:
raise HTTPException(
status_code=500, detail=f"Gagal memperbarui konfigurasi advanced"
)
return {"message": "Konfigurasi advanced berhasil diperbarui", "success": True}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error memperbarui konfigurasi advanced: {str(e)}"
)
# --- Scraping operations catalogue and dispatcher ---
@router.get("/scraping/operations", response_model=dict)
async def get_scraping_operations(db: AsyncSession = Depends(get_db)) -> Dict[str, Any]:
"""Dapatkan operasi scraping yang tersedia"""
operations = {
"validate_proxy_source": "validasi proxy source",
"run_hunter_protocol": "jalankan Hunter Protocol",
"start_scraping_session": "memulai sesi scraping",
"stop_scraping_session": "hentikan sesi scraping",
"manage_proxy_sources": "kelola proxy sources",
"view_scraping_stats": "lihat statistik scraping",
"advanced_configuration": "konfigurasi advanced",
"queue_management": "kelola antrian request queue",
}
return operations
@router.post("/scraping/operations/{operation}", response_model=dict)
async def execute_operation(
operation: str,
params: Dict[str, Any],
db: AsyncSession = Depends(get_db),
) -> Dict[str, Any]:
"""Eksekusi operasi scraping"""
service = get_enhanced_service()
try:
if operation == "validate_proxy_source":
return await validate_proxy_source(
params["source_id"], db, background_tasks
)
elif operation == "run_hunter_protocol":
return await trigger_hunter_manual(db, background_tasks)
elif operation == "start_scraping_session":
return await start_scraping_session(params, db, background_tasks)
elif operation == "stop_scraping_session":
return await stop_scraping_session(
params["session_id"], db, background_tasks
)
elif operation == "clear_request_queue":
return await clear_request_queue(db, background_tasks)
else:
raise HTTPException(
status_code=400, detail=f"Invalid operation: {operation}"
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Error executing operation {operation}: {str(e)}"
)