# OpenTriage_AI / spark_manager.py
# KrishnaCosmic's picture
# deployment
# ce673e5
"""
Spark Session Manager for OpenTriage AI Engine.
Provides a singleton Spark session for distributed processing.
"""
import os
import logging
from pyspark.sql import SparkSession
from config.settings import settings
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Cached Spark session shared by the functions in this module: None until
# get_or_create_spark_session() stores one, and reset to None by
# stop_spark_session().
_spark_session = None
def get_or_create_spark_session() -> SparkSession:
    """
    Return the shared Spark session, building it on first use.

    When a session is already cached at module level it is returned as-is.
    Otherwise a session is built from ``settings`` (app name, master, and the
    driver/executor memory values when they are set), its log level is set
    from ``settings.SPARK_LOG_LEVEL``, and it is cached for later calls.

    Returns:
        SparkSession: The cached or newly created session.

    Raises:
        Exception: Whatever building or configuring the session raised; the
            error is logged and then re-raised unchanged.
    """
    global _spark_session

    if _spark_session is None:
        try:
            session_builder = (
                SparkSession.builder
                .appName(settings.SPARK_APP_NAME)
                .master(settings.SPARK_MASTER)
            )

            # Memory options are applied only when the setting is truthy.
            optional_memory_settings = (
                ("spark.driver.memory", "SPARK_DRIVER_MEMORY"),
                ("spark.executor.memory", "SPARK_EXECUTOR_MEMORY"),
            )
            for conf_key, setting_name in optional_memory_settings:
                configured_value = getattr(settings, setting_name)
                if configured_value:
                    session_builder = session_builder.config(
                        conf_key, configured_value
                    )

            # Turn off the console progress output.
            session_builder = session_builder.config(
                "spark.ui.showConsoleProgress", "false"
            )

            _spark_session = session_builder.getOrCreate()

            # Apply the configured log level to the new session's context.
            _spark_session.sparkContext.setLogLevel(settings.SPARK_LOG_LEVEL)

            logger.info(f"Spark session created: {settings.SPARK_APP_NAME}")
        except Exception as e:
            logger.error(f"Failed to create Spark session: {e}")
            raise

    return _spark_session
def stop_spark_session():
    """Stop the cached Spark session, if one exists.

    Does nothing when no session is cached. Otherwise calls ``stop()`` on the
    cached session and clears the module-level reference. The reference is
    cleared even when ``stop()`` raises, so a later call to
    ``get_or_create_spark_session()`` does not hand back a session whose
    shutdown failed part-way.

    Raises:
        Exception: Whatever ``stop()`` raised, propagated unchanged.
    """
    global _spark_session

    if _spark_session is None:
        return

    session = _spark_session
    try:
        session.stop()
    finally:
        # Drop the cached reference regardless of how stop() finished.
        _spark_session = None

    # Only reached when stop() returned without raising.
    logger.info("Spark session stopped")