Spaces:
Sleeping
Sleeping
| """ | |
| Spark Session Manager for OpenTriage AI Engine. | |
| Provides a singleton Spark session for distributed processing. | |
| """ | |
| import os | |
| import logging | |
| from pyspark.sql import SparkSession | |
| from config.settings import settings | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Cached Spark session shared by get_or_create_spark_session() and
# stop_spark_session(); None until a session has been created, and reset to
# None again when the session is stopped.
_spark_session = None
def get_or_create_spark_session() -> SparkSession:
    """
    Return the module-wide Spark session, creating it on first use.

    The session is built from ``settings.SPARK_APP_NAME`` and
    ``settings.SPARK_MASTER``. Driver and executor memory are applied only
    when the corresponding setting is truthy. Console progress output is
    disabled and the log level is taken from ``settings.SPARK_LOG_LEVEL``.

    The session is stored in the module-level cache only after it has been
    fully configured, so a failure part-way through setup is raised on every
    call instead of only on the first one.

    Returns:
        SparkSession: The Spark session instance

    Raises:
        Exception: Any error raised while building or configuring the
            session; it is logged with its traceback and re-raised unchanged.
    """
    global _spark_session

    # Fast path: reuse the session created by an earlier call.
    if _spark_session is not None:
        return _spark_session

    try:
        builder = (
            SparkSession.builder
            .appName(settings.SPARK_APP_NAME)
            .master(settings.SPARK_MASTER)
        )

        # Memory settings are optional; unset/empty values are skipped so
        # Spark falls back to its own defaults.
        optional_conf = (
            ("spark.driver.memory", settings.SPARK_DRIVER_MEMORY),
            ("spark.executor.memory", settings.SPARK_EXECUTOR_MEMORY),
        )
        for conf_key, conf_value in optional_conf:
            if conf_value:
                builder = builder.config(conf_key, conf_value)

        # Reduce logging verbosity
        builder = builder.config("spark.ui.showConsoleProgress", "false")

        session = builder.getOrCreate()

        # Set log level before publishing the session to the cache, so a
        # bad level cannot leave a half-configured session cached.
        session.sparkContext.setLogLevel(settings.SPARK_LOG_LEVEL)
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("Failed to create Spark session: %s", e)
        raise

    _spark_session = session
    logger.info("Spark session created: %s", settings.SPARK_APP_NAME)
    return session
def stop_spark_session() -> None:
    """
    Stop the cached Spark session if one exists.

    Does nothing when no session is cached. The cached reference is cleared
    before ``stop()`` is called, so that an error raised by ``stop()`` does
    not leave a session in an unknown state cached for later callers.

    Raises:
        Exception: Any error raised by the session's ``stop()`` call
            propagates to the caller; the cache is already cleared by then.
    """
    global _spark_session

    if _spark_session is None:
        return

    # Detach from the cache first; see the docstring for why.
    session = _spark_session
    _spark_session = None

    session.stop()
    logger.info("Spark session stopped")