| | import sentencepiece as spm |
| | import os, json, numpy as np, tensorflow as tf |
| | from tensorflow.keras import layers, Model |
| | import requests |
| | from tensorflow.keras import mixed_precision |
| | import glob |
| |
|
# Silence TensorFlow info/warning logs and pin all RNG seeds so that
# shuffling and weight initialization are reproducible across runs.
tf.get_logger().setLevel("ERROR")
SEED = 42
tf.random.set_seed(SEED)
np.random.seed(SEED)
| |
|
# Try to attach to a local TPU; if unavailable (e.g. running on GPU/CPU),
# fall back to the default single-device distribution strategy.
try:
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu="local")
    tf.tpu.experimental.initialize_tpu_system(resolver)
    strategy = tf.distribute.TPUStrategy(resolver)
    print("✅ TPU 초기화 완료")
    on_tpu = True

except Exception as e:
    print("⚠️ TPU 미사용, GPU/CPU로 진행:", e)
    strategy = tf.distribute.get_strategy()
    on_tpu = False

# Use bfloat16 compute on TPU (its native fast dtype); plain float32 elsewhere.
policy_name = "mixed_bfloat16" if on_tpu else "float32"
policy = mixed_precision.Policy(policy_name)
mixed_precision.set_global_policy(policy)
print(f"✅ Mixed precision: {policy_name}")
| |
|
def download_file(url, save_path):
    """Download ``url`` to ``save_path``, skipping the download if the file exists.

    The response is streamed in chunks so large files are never held fully
    in memory.

    Args:
        url: HTTP(S) URL to fetch.
        save_path: Local path to write the downloaded bytes to.

    Raises:
        requests.HTTPError: if the server responds with an error status.
    """
    if not os.path.exists(save_path):
        # Fix: the original call had no timeout, so a stalled server could
        # hang the script forever; the context manager also guarantees the
        # connection is released.
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(save_path, "wb") as f:
                for chunk in r.iter_content(16384):
                    f.write(chunk)
    print(f"✅ {save_path} 저장됨")
| |
|
# Fetch the SentencePiece tokenizer and locate the pre-tokenized corpus files.
TOKENIZER_PATH = "tokenizer.model"
download_file("https://huggingface.co/datasets/OpenLab-NLP/tiny-corpus/resolve/main/tokenizer.model?download=true", TOKENIZER_PATH)
DATA_DIR = "/kaggle/input/lm-pretrain"
FILE_PATTERN = os.path.join(DATA_DIR, "tokenized_variable_part_*.txt")
sp = spm.SentencePieceProcessor(TOKENIZER_PATH)
# NOTE(review): SentencePiece's piece_to_id returns the unk id (not -1) for
# pieces missing from the vocab, so this -1 guard may never trigger —
# confirm "<pad>" actually exists in this tokenizer.
pad_id = sp.piece_to_id("<pad>") if sp.piece_to_id("<pad>") != -1 else 0
end_id = sp.piece_to_id("[EOS]")
vocab_size = sp.get_piece_size()

# Per-example sequence length and global (all-replica) batch size.
max_len = 512
batch_size = 768
| |
|
| | import random |
| |
|
def prepare_packed_dataset(file_pattern, max_len, batch_size):
    """Build a packed next-token-prediction dataset from tokenized text files.

    Each input line is a whitespace-separated list of token ids. All lines are
    flattened into one continuous token stream, cut into windows of
    ``max_len + 1`` tokens, and split into (input, target) pairs shifted by one.

    Args:
        file_pattern: Glob pattern of tokenized text files.
        max_len: Model sequence length; windows of ``max_len + 1`` are packed.
        batch_size: Global batch size (incomplete batches are dropped).

    Returns:
        A prefetched tf.data.Dataset of (input, target) int32 batches, each of
        shape (batch_size, max_len).

    Raises:
        FileNotFoundError: if no file matches ``file_pattern``.
    """
    file_list = tf.io.gfile.glob(file_pattern)
    # Fix: fail fast with a clear error instead of an IndexError on
    # file_list[0] below when the glob matches nothing.
    if not file_list:
        raise FileNotFoundError(f"No files match pattern: {file_pattern}")

    # Shuffle the file order ONCE at pipeline-construction time.
    # NOTE: the original comment claimed this reshuffles every epoch — it
    # does not; only the element-level shuffle(20000) below varies per epoch.
    random.shuffle(file_list)
    print(f"🔄 파일 로드 순서 섞기 완료 (첫 번째 파일: {file_list[0]})")

    dataset = tf.data.TextLineDataset(file_list)

    def parse_tokens(line):
        # "12 345 6" -> int32 tensor [12, 345, 6]
        return tf.strings.to_number(tf.strings.split(line), tf.int32)

    dataset = dataset.map(parse_tokens, num_parallel_calls=tf.data.AUTOTUNE)
    # Flatten all lines into one token stream, then cut fixed windows of
    # max_len + 1 so the shift-by-one split yields sequences of max_len.
    dataset = dataset.unbatch()
    dataset = dataset.batch(max_len + 1, drop_remainder=True)

    def split_input_target(chunk):
        # Teacher forcing: predict token t+1 from tokens [0..t].
        return chunk[:-1], chunk[1:]

    dataset = dataset.map(split_input_target, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.shuffle(20000)
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset.prefetch(tf.data.AUTOTUNE)
| |
|
with strategy.scope():
    dataset = prepare_packed_dataset(FILE_PATTERN, max_len, batch_size)
    # Shard each global batch across replicas (TPU cores / GPUs).
    dist_dataset = strategy.experimental_distribute_dataset(dataset)
    print("✅ 데이터 패킹 및 TPU 분산 파이프라인 준비 완료")
| |
|
class TimeMix(layers.Layer):
    """RWKV-style time-mixing layer (attention replacement).

    Each position is interpolated with its previous position ("token shift")
    before the key/value/receptance projections, then aggregated over time
    with per-channel exponential decay (see ``parallel_wkv``).
    """

    def __init__(self, d_model, layer_id, n_layers):
        super().__init__()
        self.d_model = d_model
        # Depth ratio in [0, 1]; deeper layers get different init schedules.
        ratio = (layer_id / (n_layers - 1)) if n_layers > 1 else 0.5
        decay_speed = np.arange(d_model)
        # Per-channel decay exponents spread from -5 to +3 across channels,
        # with a depth-dependent curvature.
        self.time_decay = tf.Variable(
            -5 + 8 * (decay_speed / (d_model - 1)) ** (0.7 + 1.3 * ratio),
            dtype=tf.float32, name="time_decay"
        )
        # Bonus weight for the current timestep ("u" in the RWKV papers).
        self.time_first = tf.Variable(
            np.ones(d_model) * np.log(0.3),
            dtype=tf.float32, name="time_first"
        )
        # Scalar interpolation factors between current and previous token.
        self.time_mix_k = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)
        self.time_mix_v = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)
        self.time_mix_r = tf.Variable(1 - (ratio ** 0.2), dtype=tf.float32)
        self.key = layers.Dense(d_model, use_bias=False)
        self.value = layers.Dense(d_model, use_bias=False)
        self.receptance = layers.Dense(d_model, use_bias=False)
        self.output_projection = layers.Dense(d_model, use_bias=False)

    def call(self, x, training=False):
        # Cast mix factors to the compute dtype (bfloat16 under mixed precision).
        t_type = x.dtype
        tm_k = tf.cast(self.time_mix_k, t_type)
        tm_v = tf.cast(self.time_mix_v, t_type)
        tm_r = tf.cast(self.time_mix_r, t_type)
        # xx = x shifted right by one timestep, zero-padded at position 0.
        xx = tf.pad(x[:, :-1, :], [[0, 0], [1, 0], [0, 0]])
        k = self.key(x * tm_k + xx * (1 - tm_k))
        v = self.value(x * tm_v + xx * (1 - tm_v))
        r = self.receptance(x * tm_r + xx * (1 - tm_r))
        wkv = self.parallel_wkv(k, v)
        # Receptance acts as a sigmoid gate on the aggregated values.
        return self.output_projection(tf.nn.sigmoid(r) * wkv)

    def parallel_wkv(self, k, v):
        """Parallel (cumsum-based) form of the RWKV WKV recurrence.

        NOTE(review): ``s = k - t * w`` decreases linearly with absolute
        position, so exp(s) can underflow for long sequences / large decay
        channels, at which point the 1e-8 denominator floor dominates.
        Stability at max_len=512 (especially in bfloat16) should be
        verified against the sequential recurrence — TODO confirm.
        """
        t_type = k.dtype
        w = tf.cast(tf.exp(self.time_decay), t_type)
        u = tf.cast(self.time_first, t_type)
        t = tf.shape(k)[1]
        t_index = tf.cast(tf.range(t), t_type)[:, None]
        # Position-dependent shift so that cumulative sums implement
        # exponential time decay.
        s = k - (t_index * w)
        kv = tf.exp(s) * v
        k_exp = tf.exp(s)
        # Exclusive cumsum over past positions plus the current-step bonus u.
        num = tf.cumsum(kv, axis=1) - kv + tf.exp(s + u) * v
        den = tf.cumsum(k_exp, axis=1) - k_exp + tf.exp(s + u)
        return num / (den + 1e-8)
| |
|
class ChannelMix(layers.Layer):
    """RWKV-style channel-mixing layer with a Mixture-of-Experts FFN.

    A learned gate picks ``top_k`` of ``num_experts`` expert FFNs per token.
    All experts are computed densely and then masked, trading extra FLOPs
    for TPU-friendly einsum shapes instead of true sparse dispatch.
    """

    def __init__(self, d_model, layer_id, n_layers, num_experts=8, top_k=2):
        super().__init__()
        self.d_model = d_model
        self.num_experts = num_experts
        self.top_k = top_k

        # Depth ratio in [0, 1] for per-layer initialization.
        ratio = (layer_id / (n_layers - 1)) if n_layers > 1 else 0.5
        self.time_mix_k = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)
        self.time_mix_r = tf.Variable(1 - (ratio ** 0.5), dtype=tf.float32)

        # Gate: produces per-token logits for expert selection.
        self.gate = layers.Dense(num_experts, use_bias=False)

        # Expert weights, stacked so a single einsum evaluates all experts.
        self.key_weight = self.add_weight(
            name="expert_key",
            shape=(num_experts, d_model, int(d_model * 4)),
            initializer="glorot_uniform"
        )
        self.value_weight = self.add_weight(
            name="expert_value",
            shape=(num_experts, int(d_model * 4), d_model),
            initializer="glorot_uniform"
        )
        self.receptance = layers.Dense(d_model, use_bias=False)

    def call(self, x, training=False):
        t_type = x.dtype
        # NOTE(review): d is unused below; b and t only feed the aux-loss scale.
        b, t, d = tf.shape(x)[0], tf.shape(x)[1], tf.shape(x)[2]
        # Token shift: previous timestep, zero-padded at position 0.
        xx = tf.pad(x[:, :-1, :], [[0, 0], [1, 0], [0, 0]])

        k_in = x * tf.cast(self.time_mix_k, t_type) + xx * (1 - tf.cast(self.time_mix_k, t_type))

        # 1. Gate logits and top-k expert selection.
        gate_logits = self.gate(k_in)  # (B, T, num_experts)

        # Top-k expert indices and their softmax-normalized weights
        # (softmax computed in float32 for numerical stability).
        raw_weights, indices = tf.math.top_k(gate_logits, k=self.top_k)
        gate_weights = tf.nn.softmax(tf.cast(raw_weights, tf.float32))
        gate_weights = tf.cast(gate_weights, t_type)  # (B, T, top_k)

        # 2. Dense mask over experts: computing every expert and masking
        # afterwards keeps TPU MXU utilization high compared with true
        # sparse dispatch; the mask carries the gate weights.
        masks = tf.one_hot(indices, depth=self.num_experts, dtype=t_type)  # (B, T, top_k, num_experts)
        final_mask = tf.reduce_sum(masks * tf.expand_dims(gate_weights, -1), axis=2)  # (B, T, num_experts)

        # 3. Auxiliary load-balancing loss, nudging the gate distribution
        # toward uniform expert usage; registered only during training.
        if training:
            prob_dist = tf.nn.softmax(tf.cast(gate_logits, tf.float32), axis=-1)
            importance = tf.reduce_sum(prob_dist, axis=[0, 1])
            load = tf.reduce_sum(tf.cast(final_mask > 0, tf.float32), axis=[0, 1])
            aux_loss = tf.reduce_sum(importance * load) * (self.num_experts / (tf.cast(b * t, tf.float32) ** 2))
            self.add_loss(0.01 * aux_loss)

        # 4. Expert FFNs via einsum: all experts evaluated, non-selected
        # ones are zeroed by the mask in step 5.
        k_experts = tf.einsum('btd,edh->bteh', k_in, self.key_weight)
        k_experts = tf.square(tf.nn.relu(k_experts))  # squared-ReLU activation
        v_experts = tf.einsum('bteh,ehd->bted', k_experts, self.value_weight)  # (B, T, E, D)

        # 5. Weighted sum over experts using the gate mask.
        kv = tf.reduce_sum(v_experts * tf.expand_dims(final_mask, -1), axis=2)

        # Receptance: sigmoid gate on the expert output.
        r_in = x * tf.cast(self.time_mix_r, t_type) + xx * (1 - tf.cast(self.time_mix_r, t_type))
        r = self.receptance(r_in)

        return tf.nn.sigmoid(r) * kv
| |
|
class Block(layers.Layer):
    """One residual block: shared LayerNorm feeding TimeMix and ChannelMix.

    NOTE(review): both branches read the SAME normalized input and are added
    in one residual step; canonical RWKV uses two LayerNorms applied
    sequentially — confirm this parallel variant is intentional.
    """

    def __init__(self, d_model, layer_id, n_layers):
        super().__init__()
        self.ln = layers.LayerNormalization(epsilon=1e-5)
        self.time_mix = TimeMix(d_model, layer_id, n_layers)
        self.channel_mix = ChannelMix(d_model, layer_id, n_layers)

    def call(self, x, training=False):
        ln_x = self.ln(x)
        # Bug fix: `training` was not forwarded to channel_mix, so the MoE
        # load-balancing auxiliary loss (added only when training=True in
        # ChannelMix.call) was never registered during training.
        return x + self.time_mix(ln_x, training=training) + self.channel_mix(ln_x, training=training)
| |
|
class Head(tf.keras.Model):
    """Output head: projects hidden states to vocabulary logits.

    Kept as a separate model so its weights can be saved independently of
    the backbone.
    """

    def __init__(self, vocab_size):
        super().__init__()
        # Run the projection under the global mixed-precision policy.
        self.lm_head = layers.Dense(
            vocab_size,
            use_bias=False,
            name="output_head",
            dtype=policy,
        )

    def call(self, x, training=False):
        # Always hand float32 logits to the loss for numerical stability,
        # even when the projection itself computes in bfloat16.
        return tf.cast(self.lm_head(x), tf.float32)
| | |
class LM(tf.keras.Model):
    """Language-model backbone: embedding -> N residual blocks -> final LN.

    Args:
        d_model: Hidden dimension of embeddings and blocks.
        n_layers: Number of Block layers.
        dropout_rate: Dropout applied to the token embeddings during training.
    """

    def __init__(self, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        # Bug fix: dropout_rate was accepted but never used; wire it to a
        # Dropout layer on the embeddings (active only when training=True).
        self.dropout = layers.Dropout(dropout_rate)
        self.blocks = [Block(d_model, i, n_layers) for i in range(n_layers)]
        # Final LayerNorm runs in float32 regardless of the mixed policy.
        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype=tf.float32)

    def call(self, x, training=False):
        x = self.token_embedding(x)
        x = self.dropout(x, training=training)
        for block in self.blocks:
            x = block(x, training=training)
        # Cast up before the float32 final norm.
        x = tf.cast(x, tf.float32)
        return self.ln_f(x)
| |
|
def smoothed_loss_keras(y_true, y_pred, eps=0.1):
    """Label-smoothed cross entropy averaged over non-pad target tokens.

    Args:
        y_true: Integer target token ids, any int dtype.
        y_pred: Float logits with vocabulary on the last axis.
        eps: Smoothing mass spread uniformly over the vocabulary.

    Returns:
        Scalar mean loss over positions where the target is not ``pad_id``.
    """
    labels = tf.cast(y_true, tf.int32)
    # 1.0 where the target is a real token, 0.0 on padding.
    valid = tf.cast(tf.not_equal(labels, pad_id), tf.float32)
    n_classes = tf.shape(y_pred)[-1]
    # Smoothed target distribution: (1-eps) on the true class, eps uniform.
    targets = tf.one_hot(labels, depth=n_classes, dtype=tf.float32)
    targets = targets * (1.0 - eps) + eps / tf.cast(n_classes, tf.float32)
    nll = -tf.reduce_sum(targets * tf.nn.log_softmax(y_pred, axis=-1), axis=-1)
    # Masked mean; the epsilon guards against an all-pad batch.
    return tf.reduce_sum(nll * valid) / (tf.reduce_sum(valid) + 1e-8)
| |
|
with strategy.scope():
    # Backbone and head are separate models so their weights can be saved
    # and restored independently (see the save_weights calls at the end).
    blocklm = LM(d_model=512, n_layers=16)
    head = Head(vocab_size=vocab_size)

    inputs = layers.Input(shape=(max_len,), dtype=tf.int32)
    x = blocklm(inputs)
    outputs = head(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    optimizer = tf.keras.optimizers.AdamW(learning_rate=1e-4, weight_decay=0.01)
    model.compile(optimizer=optimizer, loss=smoothed_loss_keras)
    # Dummy forward pass builds all variables so summary() can report shapes.
    dummy_input = np.zeros((1, max_len), dtype=np.int32)
    model(dummy_input)
    model.summary()
| |
|
def get_training_stats(file_pattern, max_len, batch_size):
    """Count corpus tokens and derive packing statistics for training.

    Mirrors the packing done in ``prepare_packed_dataset``: the token stream
    is cut into windows of ``max_len + 1`` tokens, and windows are batched
    with ``drop_remainder=True``.

    Args:
        file_pattern: Glob pattern of whitespace-tokenized text files.
        max_len: Model sequence length (windows are ``max_len + 1`` tokens).
        batch_size: Global batch size.

    Returns:
        Tuple ``(total_tokens, total_chunks, steps_per_epoch)``.
    """
    total_tokens = 0
    for path in glob.glob(file_pattern):
        with open(path, "r") as fh:
            for line in fh:
                total_tokens += len(line.split())
    # Bug fix: the original returned total_tokens for all three values;
    # chunks must divide by the window size and steps by the batch size to
    # match the drop_remainder=True pipeline.
    total_chunks = total_tokens // (max_len + 1)
    steps_per_epoch = total_chunks // batch_size
    return total_tokens, total_chunks, steps_per_epoch
| |
|
| | #total_tokens, total_chunks, steps_per_epoch = get_training_stats(FILE_PATTERN, max_len, batch_size) |
| |
|
| | #print(f"✅ 총 토큰 수: {total_tokens}") |
| | #print(f"✅ 생성된 총 덩어리(Chunk) 수: {total_chunks}") |
| | #print(f"✅ steps_per_epoch: {steps_per_epoch}") |
| |
|
# NOTE(review): steps_per_epoch is hard-coded; it should equal
# total_tokens // (max_len + 1) // batch_size for the current corpus
# (see get_training_stats above) — TODO confirm 14582 is still correct.
model.fit(dist_dataset, epochs=1, steps_per_epoch=14582)
# Save backbone and head separately for later fine-tuning / inference reuse.
blocklm.save_weights("blocklm.weights.h5")
head.save_weights("head.weights.h5")
print("저장됨")
| |
|