from typing import Dict, Optional, Tuple
from pathlib import Path
import os
import subprocess
from datetime import datetime

import tensorflow as tf

from logger_config import config_logger

logger = config_logger(__name__)


class EnvironmentSetup:
    def __init__(self):
        self.device_type, self.strategy = self.setup_devices()
        self.cache_dir: Optional[Path] = None
        self.training_dirs: Dict[str, Path] = {}

    def initialize(self, cache_dir: Optional[Path] = None):
        self.cache_dir = self.setup_model_cache(cache_dir)
        self.training_dirs = self.setup_training_directories()

    @staticmethod
    def setup_model_cache(cache_dir: Optional[Path] = None) -> Path:
        """Set up and manage the model cache directory."""
        if cache_dir is None:
            cache_dir = Path.home() / '.chatbot_cache'
        cache_dir.mkdir(parents=True, exist_ok=True)

        # Point the caches of the various model libraries at this directory
        os.environ['TRANSFORMERS_CACHE'] = str(cache_dir / 'transformers')
        os.environ['TORCH_HOME'] = str(cache_dir / 'torch')
        os.environ['HF_HOME'] = str(cache_dir / 'huggingface')

        logger.info(f"Using cache directory: {cache_dir}")
        return cache_dir

    @staticmethod
    def setup_training_directories(base_dir: str = "chatbot_training") -> Dict[str, Path]:
        """Set up the directory structure for training artifacts."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        train_dir = Path(base_dir) / f"training_run_{timestamp}"

        directories = {
            'base': train_dir,
            'checkpoints': train_dir / 'checkpoints',
            'plots': train_dir / 'plots',
            'logs': train_dir / 'logs',
        }

        # Create directories
        for dir_path in directories.values():
            dir_path.mkdir(parents=True, exist_ok=True)

        return directories

    @staticmethod
    def is_colab() -> bool:
        """Check whether the code is running in Google Colab."""
        try:
            # google.colab is only importable inside a Colab runtime
            import google.colab  # type: ignore  # noqa: F401
            import IPython  # type: ignore  # noqa: F401
            return True
        except (ImportError, AttributeError):
            return False

    def setup_colab_tpu(self) -> Optional[tf.distribute.Strategy]:
        """Set up the TPU in a Colab environment, if one is available."""
        if not self.is_colab():
            return None

        try:
            # Check TPU availability
            if 'COLAB_TPU_ADDR' not in os.environ:
                return None

            # Build the gRPC address of the TPU worker and connect to it
            tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            return tf.distribute.TPUStrategy(resolver)
        except Exception as e:
            logger.warning(f"Failed to initialize Colab TPU: {e}")
            return None

    def setup_devices(self) -> Tuple[str, tf.distribute.Strategy]:
        """Configure available compute devices with Colab optimizations."""
        logger.info("Checking available compute devices...")

        # Colab-specific setup
        if self.is_colab():
            logger.info("Running in Google Colab environment")

            # Try TPU first in Colab
            tpu_strategy = self.setup_colab_tpu()
            if tpu_strategy is not None:
                logger.info("Colab TPU detected and initialized")
                return "TPU", tpu_strategy

            # Colab GPU setup
            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    # Colab-specific GPU memory management
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)

                    # Get GPU info using nvidia-smi
                    try:
                        gpu_name = subprocess.check_output(
                            ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                            stderr=subprocess.DEVNULL
                        ).decode('utf-8').strip()
                        logger.info(f"Colab GPU detected: {gpu_name}")
                    except (subprocess.SubprocessError, FileNotFoundError):
                        logger.warning("Could not detect specific GPU model")

                    strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                    return "GPU", strategy
                except Exception as e:
                    logger.error(f"Error configuring Colab GPU: {str(e)}")

        # Non-Colab setup
        else:
            # Check for TPU
            try:
                resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
                tf.config.experimental_connect_to_cluster(resolver)
                tf.tpu.experimental.initialize_tpu_system(resolver)
                strategy = tf.distribute.TPUStrategy(resolver)
                logger.info("TPU detected and initialized")
                return "TPU", strategy
            except ValueError:
                logger.info("No TPU detected. Checking for GPUs...")

            # Check for GPUs
            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)

                    if len(gpus) > 1:
                        strategy = tf.distribute.MirroredStrategy()
                        logger.info(f"Multi-GPU strategy set up with {len(gpus)} GPUs")
                    else:
                        strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                        logger.info("Single GPU strategy set up")
                    return "GPU", strategy
                except Exception as e:
                    logger.error(f"Error configuring GPU: {str(e)}")

        # CPU fallback
        strategy = tf.distribute.OneDeviceStrategy("/CPU:0")
        logger.info("Using CPU strategy")
        return "CPU", strategy

    def optimize_batch_size(self, base_batch_size: int = 16) -> int:
        """Scale the batch size for the accelerator detected in Colab."""
        if not self.is_colab():
            return base_batch_size

        # Colab batch size optimization
        if self.device_type == "GPU":
            try:
                gpu_name = subprocess.check_output(
                    ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                    stderr=subprocess.DEVNULL
                ).decode('utf-8').strip()

                if "A100" in gpu_name:
                    logger.info("Optimizing for Colab A100 GPU")
                    base_batch_size = min(base_batch_size * 8, 64)
                elif "T4" in gpu_name:
                    logger.info("Optimizing for Colab T4 GPU")
                    base_batch_size = min(base_batch_size * 2, 32)
                elif "V100" in gpu_name:
                    logger.info("Optimizing for Colab V100 GPU")
                    base_batch_size = min(base_batch_size * 3, 48)
            except (subprocess.SubprocessError, FileNotFoundError):
                logger.warning("Could not detect specific GPU model, using default settings")

        elif self.device_type == "TPU":
            # TPU optimizations
            base_batch_size = min(base_batch_size * 4, 64)
            logger.info("Optimizing for Colab TPU")

        logger.info(f"Optimized batch size for Colab: {base_batch_size}")
        return base_batch_size