|
|
""" |
|
|
RQ Worker Entry Point |
|
|
""" |
|
|
import sys |
|
|
import os |
|
|
from pathlib import Path |
|
|
from loguru import logger |
|
|
from redis import Redis |
|
|
from rq import Worker, Queue |
|
|
|
|
|
from app.config import settings |
|
|
from app.core.redis_client import get_redis_client |
|
|
|
|
|
|
|
|
|
|
|
# Console logging: drop whatever sinks loguru currently has registered and
# install a single colourised stdout sink at the configured level.
logger.remove()
logger.add(
    sys.stdout,
    level=settings.LOG_LEVEL,
    format=(
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan> - "
        "<level>{message}</level>"
    ),
)
|
|
|
|
|
|
|
|
# File logging is best-effort: if the "logs" directory or the log file cannot
# be created, the worker keeps running with the stdout sink only.
try:
    log_dir = Path("logs")
    log_dir.mkdir(parents=True, exist_ok=True)

    # One file per day, rotated daily and kept for a week.
    logger.add(
        "logs/swara_worker_{time:YYYY-MM-DD}.log",
        level=settings.LOG_LEVEL,
        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function} - {message}",
        rotation="1 day",
        retention="7 days",
    )
except OSError as exc:
    # PermissionError is a subclass of OSError, so this single clause covers
    # both read-only filesystems and any other filesystem-level failure.
    logger.warning(f"Cannot create log file: {exc}. Using stdout only.")
|
|
|
|
|
|
|
|
def _redact_url_credentials(url):
    """Return *url* as a string with any embedded password replaced by ``***``.

    Connection URLs commonly take the form ``redis://user:password@host:port/db``;
    writing them to logs verbatim would leak the password. URLs without a
    password are returned unchanged. If the URL cannot be parsed at all, a
    fixed placeholder is returned so that nothing sensitive is logged.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlsplit

    text = str(url)
    try:
        parts = urlsplit(text)
    except ValueError:
        return "<unparseable URL>"

    if parts.password is None:
        return text

    # Keep everything after the last "@" (host, port) and the username, but
    # never the password itself.
    host_part = parts.netloc.rpartition("@")[2]
    user = parts.username or ""
    return parts._replace(netloc=f"{user}:***@{host_part}").geturl()


def main():
    """Start an RQ worker on the configured task queue and run it until it stops.

    Logs a startup banner (environment, Redis URL with any password masked,
    queue name), opens a dedicated Redis connection from
    ``settings.REDIS_URL``, verifies it with a ``PING``, then creates the
    queue and calls ``Worker.work(with_scheduler=True)``.

    A ``KeyboardInterrupt`` is logged and the function returns normally.
    Any other exception is logged together with its traceback and the
    process exits with status 1 via ``sys.exit``.
    """
    # NOTE(review): the leading symbols in several messages below look like
    # mis-decoded emoji; confirm the file's encoding before touching them.
    logger.info("=" * 70)
    logger.info("π§ SWARA Worker Starting...")
    logger.info("=" * 70)
    logger.info(f"Environment: {settings.ENV}")
    # Never log the raw URL: it may carry the Redis password.
    logger.info(f"Redis URL: {_redact_url_credentials(settings.REDIS_URL)}")
    logger.info(f"Queue Name: {settings.TASK_QUEUE_NAME}")

    try:
        logger.info("Creating worker Redis connection...")
        redis_conn = Redis.from_url(
            settings.REDIS_URL,
            # RQ stores serialized job payloads as raw bytes, so responses
            # must not be decoded to str.
            decode_responses=False,
            max_connections=10,
            socket_connect_timeout=30,
            # NOTE(review): RQ's worker blocks on the queue for longer than
            # 30 seconds by default; a socket timeout shorter than that
            # blocking wait may surface as a Redis TimeoutError on an idle
            # worker. Confirm against the installed RQ version.
            socket_timeout=30,
            socket_keepalive=True,
            health_check_interval=30,
        )
        # Fail fast if Redis is unreachable instead of discovering it on the
        # first dequeue.
        redis_conn.ping()
        logger.info("β Redis connected")

        queue = Queue(settings.TASK_QUEUE_NAME, connection=redis_conn)
        logger.info(f"β Queue '{settings.TASK_QUEUE_NAME}' initialized")

        logger.info("=" * 70)
        logger.info("β Worker ready and listening for tasks...")
        logger.info("=" * 70)

        worker = Worker([queue], connection=redis_conn)
        worker.work(with_scheduler=True)

    except KeyboardInterrupt:
        logger.info("\nπ Worker interrupted by user")
    except Exception as e:
        # Top-level boundary: record the full traceback (not just the
        # message) so the failure can be diagnosed, then exit non-zero.
        logger.exception(f"β Worker failed: {e}")
        sys.exit(1)
|
|
|
|
|
|
|
|
# Run the worker only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|
|