|
from typing import Dict, Optional, Tuple |
|
from pathlib import Path |
|
import tensorflow as tf |
|
import os |
|
import subprocess |
|
from datetime import datetime |
|
from logger_config import config_logger |
|
|
|
# Module-level logger obtained from the project's shared logging configuration.
logger = config_logger(__name__)
|
|
|
class EnvironmentSetup:
    """Detect compute devices and prepare cache/output directories for training.

    Device detection (TPU/GPU/CPU) happens eagerly in ``__init__`` and yields a
    matching ``tf.distribute.Strategy``.  Directory setup is deferred: call
    :meth:`initialize` to create the model cache and a timestamped training
    directory tree.
    """

    def __init__(self):
        # Detect the accelerator and build a distribution strategy up front.
        self.device_type, self.strategy = self.setup_devices()
        self.cache_dir = None
        # Fix: declare training_dirs here so reading it before initialize()
        # yields None rather than raising AttributeError.
        self.training_dirs = None

    def initialize(self, cache_dir: Optional[Path] = None):
        """Create the model cache and the per-run training directories.

        Args:
            cache_dir: Optional override for the model cache location.
        """
        self.cache_dir = self.setup_model_cache(cache_dir)
        self.training_dirs = self.setup_training_directories()

    @staticmethod
    def setup_model_cache(cache_dir: Optional[Path] = None) -> Path:
        """Setup and manage model cache directory.

        Args:
            cache_dir: Target directory; defaults to ``~/.chatbot_cache``.

        Returns:
            The created cache directory path.
        """
        if cache_dir is None:
            cache_dir = Path.home() / '.chatbot_cache'

        cache_dir.mkdir(parents=True, exist_ok=True)

        # Point each framework's download cache at a subdirectory so all
        # fetched artifacts live under a single root.
        os.environ['TRANSFORMERS_CACHE'] = str(cache_dir / 'transformers')
        os.environ['TORCH_HOME'] = str(cache_dir / 'torch')
        os.environ['HF_HOME'] = str(cache_dir / 'huggingface')

        logger.info(f"Using cache directory: {cache_dir}")
        return cache_dir

    @staticmethod
    def setup_training_directories(base_dir: str = "chatbot_training") -> Dict[str, Path]:
        """Setup directory structure for training artifacts.

        Args:
            base_dir: Root under which a timestamped run directory is created.

        Returns:
            Mapping of logical names ('base', 'checkpoints', 'plots', 'logs')
            to created :class:`Path` objects.
        """
        # Keep the parameter's declared str type; build the Path separately.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        train_dir = Path(base_dir) / f"training_run_{timestamp}"

        directories = {
            'base': train_dir,
            'checkpoints': train_dir / 'checkpoints',
            'plots': train_dir / 'plots',
            'logs': train_dir / 'logs'
        }

        for dir_path in directories.values():
            dir_path.mkdir(parents=True, exist_ok=True)

        return directories

    @staticmethod
    def is_colab() -> bool:
        """Check if code is running in Google Colab."""
        try:
            # Fix: dropped the unused `import IPython` — importing google.colab
            # is the only signal actually used, and requiring IPython could
            # produce a false negative.
            import google.colab  # noqa: F401
            return True
        except (ImportError, AttributeError):
            return False

    @staticmethod
    def _query_gpu_name() -> Optional[str]:
        """Return the GPU model name via nvidia-smi, or None if unavailable.

        Shared by setup_devices() and optimize_batch_size(), which previously
        duplicated this subprocess call verbatim.
        """
        try:
            return subprocess.check_output(
                ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                stderr=subprocess.DEVNULL
            ).decode('utf-8').strip()
        except (subprocess.SubprocessError, FileNotFoundError):
            return None

    def setup_colab_tpu(self) -> Optional[tf.distribute.Strategy]:
        """Setup TPU in Colab environment if available.

        Returns:
            A TPUStrategy, or None outside Colab / without a TPU / on failure.
        """
        if not self.is_colab():
            return None

        try:
            # Fix: removed unused `import requests` and redundant `import os`;
            # only os.environ (already module-level) is consulted here.
            if 'COLAB_TPU_ADDR' not in os.environ:
                return None

            tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            return tf.distribute.TPUStrategy(resolver)
        except Exception as e:
            # Best-effort: TPU bring-up can fail for many reasons; the caller
            # falls back to GPU/CPU detection.
            logger.warning(f"Failed to initialize Colab TPU: {e}")
            return None

    def setup_devices(self) -> Tuple[str, tf.distribute.Strategy]:
        """Configure available compute devices with Colab optimizations.

        Returns:
            Tuple of ("TPU" | "GPU" | "CPU", matching tf.distribute.Strategy).
        """
        logger.info("Checking available compute devices...")

        if self.is_colab():
            logger.info("Running in Google Colab environment")

            tpu_strategy = self.setup_colab_tpu()
            if tpu_strategy is not None:
                logger.info("Colab TPU detected and initialized")
                return "TPU", tpu_strategy

            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    # Memory growth must be set before the GPUs are first used.
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)

                    gpu_name = self._query_gpu_name()
                    if gpu_name is not None:
                        logger.info(f"Colab GPU detected: {gpu_name}")
                    else:
                        logger.warning("Could not detect specific GPU model")

                    # Colab exposes at most one GPU, so OneDeviceStrategy suffices.
                    strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                    return "GPU", strategy
                except Exception as e:
                    logger.error(f"Error configuring Colab GPU: {str(e)}")
        else:
            try:
                # TPUClusterResolver raises ValueError when no TPU is present.
                resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
                tf.config.experimental_connect_to_cluster(resolver)
                tf.tpu.experimental.initialize_tpu_system(resolver)
                strategy = tf.distribute.TPUStrategy(resolver)
                logger.info("TPU detected and initialized")
                return "TPU", strategy
            except ValueError:
                logger.info("No TPU detected. Checking for GPUs...")

            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)

                    if len(gpus) > 1:
                        strategy = tf.distribute.MirroredStrategy()
                        logger.info(f"Multi-GPU strategy set up with {len(gpus)} GPUs")
                    else:
                        strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                        logger.info("Single GPU strategy set up")

                    return "GPU", strategy
                except Exception as e:
                    logger.error(f"Error configuring GPU: {str(e)}")

        # Fallback for every path above that failed to return an accelerator.
        strategy = tf.distribute.OneDeviceStrategy("/CPU:0")
        logger.info("Using CPU strategy")
        return "CPU", strategy

    def optimize_batch_size(self, base_batch_size: int = 16) -> int:
        """Colab-specific optimizations for training.

        Scales the batch size for the detected Colab accelerator; outside
        Colab the input value is returned unchanged.

        Args:
            base_batch_size: Starting batch size to scale.

        Returns:
            The (possibly scaled) batch size.
        """
        if not self.is_colab():
            return base_batch_size

        if self.device_type == "GPU":
            gpu_name = self._query_gpu_name()
            if gpu_name is None:
                logger.warning("Could not detect specific GPU model, using default settings")
            elif "A100" in gpu_name:
                logger.info("Optimizing for Colab A100 GPU")
                base_batch_size = min(base_batch_size * 8, 64)
            elif "T4" in gpu_name:
                logger.info("Optimizing for Colab T4 GPU")
                base_batch_size = min(base_batch_size * 2, 32)
            elif "V100" in gpu_name:
                logger.info("Optimizing for Colab V100 GPU")
                base_batch_size = min(base_batch_size * 3, 48)
        elif self.device_type == "TPU":
            # TPUs favor larger global batches.
            base_batch_size = min(base_batch_size * 4, 64)
            logger.info("Optimizing for Colab TPU")

        logger.info(f"Optimized batch size for Colab: {base_batch_size}")
        return base_batch_size
|
|