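"""Environment setup utilities for chatbot training.

Detects available compute devices (TPU, GPU, or CPU), configures model
cache directories, creates per-run training directories, and applies
Colab-specific batch-size heuristics.
"""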
from typing import Dict, Optional, Tuple
from pathlib import Path
import tensorflow as tf
import os
import subprocess
from datetime import datetime
from logger_config import config_logger

logger = config_logger(__name__)

class EnvironmentSetup:
    def __init__(self):
        self.device_type, self.strategy = self.setup_devices()
        self.cache_dir: Optional[Path] = None
        self.training_dirs: Optional[Dict[str, Path]] = None
        
    def initialize(self, cache_dir: Optional[Path] = None) -> None:
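        """Create the model cache and per-run training directory structure."""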
        self.cache_dir = self.setup_model_cache(cache_dir)
        self.training_dirs = self.setup_training_directories()
    
    @staticmethod
    def setup_model_cache(cache_dir: Optional[Path] = None) -> Path:
        """Setup and manage model cache directory."""
        if cache_dir is None:
            cache_dir = Path.home() / '.chatbot_cache'
        
        cache_dir.mkdir(parents=True, exist_ok=True)
        
        # Set environment variables for various libraries
        os.environ['TRANSFORMERS_CACHE'] = str(cache_dir / 'transformers')
        os.environ['TORCH_HOME'] = str(cache_dir / 'torch')
        os.environ['HF_HOME'] = str(cache_dir / 'huggingface')
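        # These variables generally take effect only if set before the
        # corresponding libraries (transformers, torch) are first imported.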
        
        logger.info(f"Using cache directory: {cache_dir}")
        return cache_dir
    
    @staticmethod
    def setup_training_directories(base_dir: str = "chatbot_training") -> Dict[str, Path]:
        """Setup directory structure for training artifacts."""
        base_dir = Path(base_dir)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        train_dir = base_dir / f"training_run_{timestamp}"
        
        directories = {
            'base': train_dir,
            'checkpoints': train_dir / 'checkpoints',
            'plots': train_dir / 'plots',
            'logs': train_dir / 'logs'
        }
        
        # Create directories
        for dir_path in directories.values():
            dir_path.mkdir(parents=True, exist_ok=True)
        
        return directories

    @staticmethod
    def is_colab() -> bool:
        """Check if code is running in Google Colab."""
        try:
            # google.colab is only importable inside the Colab runtime
            import google.colab  # type: ignore  # noqa: F401
            return True
        except ImportError:
            return False

    def setup_colab_tpu(self) -> Optional[tf.distribute.Strategy]:
        """Setup TPU in Colab environment if available."""
        if not self.is_colab():
            return None
            
        try:
            # Check TPU availability: COLAB_TPU_ADDR is only set when a TPU
            # runtime is attached to the Colab session
            if 'COLAB_TPU_ADDR' not in os.environ:
                return None
                
            # Build the gRPC address of the attached TPU
            tpu_address = 'grpc://' + os.environ['COLAB_TPU_ADDR']
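            # Connect to the remote TPU cluster and initialize the TPU system
            # before building a distribution strategy on top of it.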
            resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu=tpu_address)
            tf.config.experimental_connect_to_cluster(resolver)
            tf.tpu.experimental.initialize_tpu_system(resolver)
            strategy = tf.distribute.TPUStrategy(resolver)
            
            return strategy
        except Exception as e:
            logger.warning(f"Failed to initialize Colab TPU: {e}")
            return None

    def setup_devices(self) -> Tuple[str, tf.distribute.Strategy]:
        """Configure available compute devices with Colab optimizations."""
        logger.info("Checking available compute devices...")
        
        # Colab-specific setup
        if self.is_colab():
            logger.info("Running in Google Colab environment")
            
            # Try TPU first in Colab
            tpu_strategy = self.setup_colab_tpu()
            if tpu_strategy is not None:
                logger.info("Colab TPU detected and initialized")
                return "TPU", tpu_strategy
                
            # Colab GPU setup
            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
                    # Colab-specific GPU memory management
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)
                    
                    # Get GPU info using subprocess
                    try:
                        gpu_name = subprocess.check_output(
                            ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                            stderr=subprocess.DEVNULL
                        ).decode('utf-8').strip()
                        logger.info(f"Colab GPU detected: {gpu_name}")
                    
                    except (subprocess.SubprocessError, FileNotFoundError):
                        logger.warning("Could not detect specific GPU model")
                    
                    strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                    return "GPU", strategy
                    
                except Exception as e:
                    logger.error(f"Error configuring Colab GPU: {str(e)}")
        
        # Non-Colab setup
        else:
            # Check for TPU
            try:
                resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
                tf.config.experimental_connect_to_cluster(resolver)
                tf.tpu.experimental.initialize_tpu_system(resolver)
                strategy = tf.distribute.TPUStrategy(resolver)
                logger.info("TPU detected and initialized")
                return "TPU", strategy
            except ValueError:
                logger.info("No TPU detected. Checking for GPUs...")
            
            # Check for GPUs
            gpus = tf.config.list_physical_devices('GPU')
            if gpus:
                try:
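                    # Enable memory growth so TensorFlow allocates GPU memory
                    # on demand instead of reserving all of it up front.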
                    for gpu in gpus:
                        tf.config.experimental.set_memory_growth(gpu, True)
                    
                    if len(gpus) > 1:
                        strategy = tf.distribute.MirroredStrategy()
                        logger.info(f"Multi-GPU strategy set up with {len(gpus)} GPUs")
                    else:
                        strategy = tf.distribute.OneDeviceStrategy("/GPU:0")
                        logger.info("Single GPU strategy set up")
                    
                    return "GPU", strategy
                    
                except Exception as e:
                    logger.error(f"Error configuring GPU: {str(e)}")
        
        # CPU fallback
        strategy = tf.distribute.OneDeviceStrategy("/CPU:0")
        logger.info("Using CPU strategy")
        return "CPU", strategy

    def optimize_batch_size(self, base_batch_size: int = 16) -> int:
        """Colab-specific optimizations for training."""
        if not self.is_colab():
            return base_batch_size
            
        # Colab batch size optimization
        if self.device_type == "GPU":
            try:
                gpu_name = subprocess.check_output(
                    ['nvidia-smi', '--query-gpu=gpu_name', '--format=csv,noheader'],
                    stderr=subprocess.DEVNULL
                ).decode('utf-8').strip()
                
                if "A100" in gpu_name:
                    logger.info("Optimizing for Colab A100 GPU")
                    base_batch_size = min(base_batch_size * 8, 64)
                elif "T4" in gpu_name:
                    logger.info("Optimizing for Colab T4 GPU")
                    base_batch_size = min(base_batch_size * 2, 32)
                elif "V100" in gpu_name:
                    logger.info("Optimizing for Colab V100 GPU")
                    base_batch_size = min(base_batch_size * 3, 48)
            except (subprocess.SubprocessError, FileNotFoundError):
                logger.warning("Could not detect specific GPU model, using default settings")
                    
        elif self.device_type == "TPU":
            # TPU optimizations
            base_batch_size = min(base_batch_size * 4, 64)
            logger.info("Optimizing for Colab TPU")
            
        logger.info(f"Optimized batch size for Colab: {base_batch_size}")
        return base_batch_size
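

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): detect devices, set up the
    # cache and training directories, and derive a batch size for this run.
    env = EnvironmentSetup()
    env.initialize()
    batch_size = env.optimize_batch_size(base_batch_size=16)
    logger.info(f"Device: {env.device_type}, batch size: {batch_size}")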