# csc525_retrieval_based_chatbot / environment_setup.py
import os
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Tuple

import tensorflow as tf

from logger_config import config_logger

logger = config_logger(__name__)

class EnvironmentSetup:
    """Configures compute devices, the model cache, and training directories."""

    def __init__(self):
        self.device_type, self.strategy = self.setup_devices()
        self.cache_dir: Optional[Path] = None
        self.training_dirs: Dict[str, Path] = {}

    def initialize(self, cache_dir: Optional[Path] = None) -> None:
        """Set up the model cache and the training directory structure."""
        self.cache_dir = self.setup_model_cache(cache_dir)
        self.training_dirs = self.setup_training_directories()
    @staticmethod
    def setup_model_cache(cache_dir: Optional[Path] = None) -> Path:
        """Set up and manage the model cache directory."""
        if cache_dir is None:
            cache_dir = Path.home() / '.chatbot_cache'
        cache_dir.mkdir(parents=True, exist_ok=True)

        # Point the relevant libraries at the shared cache location
        os.environ['TRANSFORMERS_CACHE'] = str(cache_dir / 'transformers')
        os.environ['TORCH_HOME'] = str(cache_dir / 'torch')
        os.environ['HF_HOME'] = str(cache_dir / 'huggingface')

        logger.info(f"Using cache directory: {cache_dir}")
        return cache_dir
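
    # Note (illustrative): TRANSFORMERS_CACHE and HF_HOME are read by the
    # Hugging Face libraries, and TORCH_HOME by PyTorch's hub utilities, so
    # model downloads land under the cache directory configured above.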
    @staticmethod
    def setup_training_directories(base_dir: str = "chatbot_training") -> Dict[str, Path]:
        """Set up the directory structure for training artifacts."""
        base_dir = Path(base_dir)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        train_dir = base_dir / f"training_run_{timestamp}"

        directories = {
            'base': train_dir,
            'checkpoints': train_dir / 'checkpoints',
            'plots': train_dir / 'plots',
            'logs': train_dir / 'logs',
        }

        # Create each directory, including any missing parents
        for dir_path in directories.values():
            dir_path.mkdir(parents=True, exist_ok=True)

        return directories
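
    # Example of the resulting layout (timestamp below is illustrative):
    #   chatbot_training/
    #       training_run_20241231_235959/
    #           checkpoints/
    #           plots/
    #           logs/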
    @staticmethod
    def is_colab() -> bool:
        """Check whether the code is running in Google Colab."""
        try:
            # These imports only succeed inside the Colab runtime
            import google.colab  # type: ignore
            import IPython  # type: ignore
            return True
        except (ImportError, AttributeError):
            return False
    def setup_colab_tpu(self) -> Optional[tf.distribute.Strategy]:
        """Set up a TPU strategy in the Colab environment, if one is available."""
        if not self.is_colab():
            return None

        try:
            # The Colab TPU runtime exposes the TPU address via COLAB_TPU_ADDR
            if 'COLAB_TPU_ADDR' not in os.environ:
                return None

            tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            strategy = tf.distribute.TPUStrategy(resolver)
            return strategy
        except Exception as e:
            logger.warning(f"Failed to initialize Colab TPU: {e}")
            return None
    def setup_devices(self) -> Tuple[str, tf.distribute.Strategy]:
        """Configure available compute devices with Colab optimizations."""
        logger.info("Checking available compute devices...")

        # Colab-specific setup
        if self.is_colab():
            logger.info("Running in Google Colab environment")

            # Try TPU first in Colab
            tpu_strategy = self.setup_colab_tpu()
            if tpu_strategy is not None:
                logger.info("Colab TPU detected and initialized")
                return "TPU", tpu_strategy

            # Colab GPU setup
            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    # Colab-specific GPU memory management: grow allocation on demand
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)

                    # Get the GPU model name via nvidia-smi
                    try:
                        gpu_name = subprocess.check_output(
                            ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                            stderr=subprocess.DEVNULL
                        ).decode('utf-8').strip()
                        logger.info(f"Colab GPU detected: {gpu_name}")
                    except (subprocess.SubprocessError, FileNotFoundError):
                        logger.warning("Could not detect specific GPU model")

                    strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                    return "GPU", strategy
                except Exception as e:
                    logger.error(f"Error configuring Colab GPU: {str(e)}")

        # Non-Colab setup
        else:
            # Check for a TPU
            try:
                resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
                tf.config.experimental_connect_to_cluster(resolver)
                tf.tpu.experimental.initialize_tpu_system(resolver)
                strategy = tf.distribute.TPUStrategy(resolver)
                logger.info("TPU detected and initialized")
                return "TPU", strategy
            except ValueError:
                logger.info("No TPU detected. Checking for GPUs...")

            # Check for GPUs
            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)
                    if len(gpus) > 1:
                        strategy = tf.distribute.MirroredStrategy()
                        logger.info(f"Multi-GPU strategy set up with {len(gpus)} GPUs")
                    else:
                        strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                        logger.info("Single GPU strategy set up")
                    return "GPU", strategy
                except Exception as e:
                    logger.error(f"Error configuring GPU: {str(e)}")

        # CPU fallback
        strategy = tf.distribute.OneDeviceStrategy("/CPU:0")
        logger.info("Using CPU strategy")
        return "CPU", strategy
    def optimize_batch_size(self, base_batch_size: int = 16) -> int:
        """Scale the batch size for the Colab accelerator that was detected."""
        if not self.is_colab():
            return base_batch_size

        # Colab batch size optimization
        if self.device_type == "GPU":
            try:
                gpu_name = subprocess.check_output(
                    ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                    stderr=subprocess.DEVNULL
                ).decode('utf-8').strip()

                if "A100" in gpu_name:
                    logger.info("Optimizing for Colab A100 GPU")
                    base_batch_size = min(base_batch_size * 8, 64)
                elif "T4" in gpu_name:
                    logger.info("Optimizing for Colab T4 GPU")
                    base_batch_size = min(base_batch_size * 2, 32)
                elif "V100" in gpu_name:
                    logger.info("Optimizing for Colab V100 GPU")
                    base_batch_size = min(base_batch_size * 3, 48)
            except (subprocess.SubprocessError, FileNotFoundError):
                logger.warning("Could not detect specific GPU model, using default settings")
        elif self.device_type == "TPU":
            # TPU optimizations
            base_batch_size = min(base_batch_size * 4, 64)
            logger.info("Optimizing for Colab TPU")

        logger.info(f"Optimized batch size for Colab: {base_batch_size}")
        return base_batch_size
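

# Illustrative usage sketch (not part of the original module): build the helper,
# initialize caches and run directories, then create a model under the chosen
# distribution strategy. The placeholder model below is an assumption for the demo.
if __name__ == "__main__":
    env = EnvironmentSetup()
    env.initialize()
    batch_size = env.optimize_batch_size(base_batch_size=16)

    with env.strategy.scope():
        # Any Keras model would be built here; a trivial one keeps the sketch runnable.
        model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
        model.compile(optimizer="adam", loss="mse")

    logger.info(f"Device: {env.device_type}, tuned batch size: {batch_size}")
    logger.info(f"Training directories: {env.training_dirs}")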