# Hugging Face Space header (scrape residue) — status: Sleeping
| """Dormouse seq2seq v3 training on ZeroGPU. | |
| v3: dropout, label smoothing, smaller model (embed=64, hidden=128). | |
| """ | |
| import json | |
| import os | |
| import random | |
| import gradio as gr | |
| import spaces | |
| import torch | |
| import torch.nn as nn | |
| from huggingface_hub import HfApi | |
| from torch.utils.data import DataLoader, Dataset | |
| # --- Vocab --- | |
class Vocab:
    """Word-level vocabulary with PAD/SOS/EOS/UNK special tokens."""
    PAD, SOS, EOS, UNK = 0, 1, 2, 3

    def __init__(self):
        specials = ["<PAD>", "<SOS>", "<EOS>", "<UNK>"]
        self.word2idx = {w: i for i, w in enumerate(specials)}
        self.idx2word = {i: w for i, w in enumerate(specials)}

    def build(self, texts, min_freq=2):
        """Add every lowercased word occurring at least `min_freq` times."""
        from collections import Counter
        counts = Counter(w for t in texts for w in t.lower().split())
        # most_common keeps insertion order on ties, so ids are reproducible
        for word, freq in counts.most_common():
            if freq < min_freq:
                continue
            if word not in self.word2idx:
                new_id = len(self.word2idx)
                self.word2idx[word] = new_id
                self.idx2word[new_id] = word

    def encode(self, text, max_len=16):
        """Lowercase, truncate to max_len total ids, wrap in SOS/EOS."""
        tokens = text.lower().split()[: max_len - 2]
        body = [self.word2idx.get(w, self.UNK) for w in tokens]
        return [self.SOS, *body, self.EOS]

    def decode(self, ids):
        """Map ids back to words, stopping at EOS and skipping PAD/SOS."""
        words = []
        for idx in ids:
            if idx == self.EOS:
                break
            if idx not in (self.PAD, self.SOS):
                words.append(self.idx2word.get(idx, "<UNK>"))
        return " ".join(words)

    def __len__(self):
        return len(self.word2idx)
# --- Model v3: with dropout ---
class Enc(nn.Module):
    """Bidirectional GRU encoder; merges both directions' final states."""
    def __init__(self, vs, ed=64, hd=128, drop=0.3):
        super().__init__()
        self.emb = nn.Embedding(vs, ed, padding_idx=0)
        self.emb_drop = nn.Dropout(drop)
        self.rnn = nn.GRU(ed, hd, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hd * 2, hd)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        """Return (all hidden states, merged final state for the decoder)."""
        embedded = self.emb_drop(self.emb(x))
        outputs, hidden = self.rnn(embedded)
        # last forward layer is hidden[-2], last backward layer is hidden[-1]
        merged = torch.tanh(self.fc(torch.cat((hidden[-2], hidden[-1]), dim=1)))
        return outputs, self.drop(merged).unsqueeze(0)
class Attn(nn.Module):
    """Additive (Bahdanau-style) attention over encoder outputs."""
    def __init__(self, hd=128):
        super().__init__()
        self.a = nn.Linear(hd * 3, hd)
        self.v = nn.Linear(hd, 1, bias=False)

    def forward(self, h, eo):
        """Return (batch, src_len) attention weights summing to 1."""
        src_len = eo.shape[1]
        # broadcast the decoder state across every source position
        query = h.permute(1, 0, 2).repeat(1, src_len, 1)
        energy = torch.tanh(self.a(torch.cat((query, eo), dim=2)))
        scores = self.v(energy).squeeze(2)
        return torch.softmax(scores, dim=1)
class Dec(nn.Module):
    """Single-step GRU decoder with attention over encoder outputs."""
    def __init__(self, vs, ed=64, hd=128, drop=0.3):
        super().__init__()
        self.emb = nn.Embedding(vs, ed, padding_idx=0)
        self.emb_drop = nn.Dropout(drop)
        self.attn = Attn(hd)
        # input is [embedding | attention context] per step
        self.rnn = nn.GRU(ed + hd * 2, hd, batch_first=True)
        self.fc = nn.Linear(hd, vs)
        self.drop = nn.Dropout(drop)

    def forward(self, x, h, eo):
        """Decode one token step; returns (vocab logits, next hidden state)."""
        embedded = self.emb_drop(self.emb(x.unsqueeze(1)))
        weights = self.attn(h, eo).unsqueeze(1)
        context = torch.bmm(weights, eo)
        output, h = self.rnn(torch.cat((embedded, context), dim=2), h)
        logits = self.fc(self.drop(output.squeeze(1)))
        return logits, h
class ExprModel(nn.Module):
    """Attention seq2seq (GRU encoder/decoder) for expression translation.

    Args:
        svs: source vocabulary size.
        tvs: target vocabulary size.
        ed: embedding dimension; hd: hidden dimension; drop: dropout prob.
    """
    def __init__(self, svs, tvs, ed=64, hd=128, drop=0.3):
        super().__init__()
        self.enc = Enc(svs, ed, hd, drop)
        self.dec = Dec(tvs, ed, hd, drop)
        self.tvs = tvs

    def forward(self, src, tgt, tf=0.5):
        """Teacher-forced decode of `tgt` given `src`.

        Returns (batch, tgt_len, tvs) logits; position 0 (the SOS slot)
        stays all-zero and should be excluded from the loss.
        """
        bs, tl = src.shape[0], tgt.shape[1]
        out = torch.zeros(bs, tl, self.tvs, device=src.device)
        eo, h = self.enc(src)
        inp = tgt[:, 0]
        for t in range(1, tl):
            o, h = self.dec(inp, h, eo)
            out[:, t] = o
            # with probability tf feed the gold token, else own prediction
            inp = tgt[:, t] if random.random() < tf else o.argmax(1)
        return out

    def translate(self, src, tv, ml=16):
        """Greedy-decode a single unbatched source tensor to a string.

        Fix: the original called self.train(False) and never restored the
        previous mode, permanently flipping the model to eval as a side
        effect; the prior training mode is now restored on exit.
        """
        was_training = self.training
        self.train(False)
        try:
            with torch.no_grad():
                eo, h = self.enc(src.unsqueeze(0))
                inp = torch.tensor([tv.SOS], device=src.device)
                res = []
                for _ in range(ml):  # hard cap on generated length
                    o, h = self.dec(inp, h, eo)
                    t = o.argmax(1).item()
                    if t == tv.EOS:
                        break
                    res.append(t)
                    inp = torch.tensor([t], device=src.device)
                return tv.decode(res)
        finally:
            self.train(was_training)
| # --- Dataset --- | |
class DS(Dataset):
    """Paired source/target text dataset; encodes each item on access."""
    def __init__(self, s, t, sv, tv):
        self.s = s
        self.t = t
        self.sv = sv
        self.tv = tv

    def __len__(self):
        return len(self.s)

    def __getitem__(self, i):
        return self.sv.encode(self.s[i]), self.tv.encode(self.t[i])
def collate(batch):
    """Right-pad a batch of (src_ids, tgt_ids) pairs with PAD (0) tensors."""
    def pad_to(seqs, width):
        return [seq + [0] * (width - len(seq)) for seq in seqs]

    srcs, tgts = zip(*batch)
    return (
        torch.tensor(pad_to(srcs, max(map(len, srcs)))),
        torch.tensor(pad_to(tgts, max(map(len, tgts)))),
    )
def augment(sources, targets, factor=3):
    """Grow the corpus by word-level noise on the source side.

    Each extra pass (factor - 1 of them) may swap adjacent words (p=0.3),
    drop one word (p=0.2), and duplicate one word (p=0.1); the target is
    copied unchanged. Sources shorter than two words are never augmented.
    """
    aug_s, aug_t = list(sources), list(targets)
    for _ in range(factor - 1):
        for src, tgt in zip(sources, targets):
            words = src.split()
            if len(words) < 2:
                continue
            if len(words) >= 2 and random.random() < 0.3:
                # swap a random adjacent pair
                i = random.randint(0, len(words) - 2)
                words[i], words[i + 1] = words[i + 1], words[i]
            if len(words) > 2 and random.random() < 0.2:
                # delete one word (only when 3+ remain)
                di = random.randint(0, len(words) - 1)
                del words[di]
            if len(words) >= 2 and random.random() < 0.1:
                # duplicate one word in place
                ri = random.randint(0, len(words) - 1)
                words.insert(ri, words[ri])
            aug_s.append(" ".join(words))
            aug_t.append(tgt)
    return aug_s, aug_t
def train_model(epochs: int = 200, batch_size: int = 128, augment_factor: int = 3, dropout: float = 0.3, label_smoothing: float = 0.1) -> str:
    """Train seq2seq v3 on GPU.

    Loads "expression_pairs.json" (a list of {"ua": ..., "en": ...} dicts),
    augments the data, builds vocabularies, trains ExprModel with decaying
    teacher forcing, LR scheduling and early stopping, checkpoints the best
    model plus vocab/config files under /tmp, prints example translations,
    and (when HF_TOKEN is set) uploads the artifacts to the Hub.
    Returns the accumulated training log as a string.

    NOTE(review): `spaces` is imported at module level but no @spaces.GPU
    decorator is visible in this view — confirm ZeroGPU allocation.
    NOTE(review): augmentation runs BEFORE the train/val split, so noisy
    variants of the same sentence can land in both sets; validation
    metrics are therefore optimistic.
    """
    with open("expression_pairs.json") as f:
        pairs = json.load(f)
    sources = [p["ua"] for p in pairs]
    targets = [p["en"] for p in pairs]
    log = f"Expression pairs: {len(pairs)}\n"
    # Word-level noise on sources; targets repeated unchanged.
    sources, targets = augment(sources, targets, augment_factor)
    log += f"After augmentation (x{augment_factor}): {len(sources)}\n"
    src_vocab, tgt_vocab = Vocab(), Vocab()
    src_vocab.build(sources, min_freq=2)
    tgt_vocab.build(targets, min_freq=2)
    log += f"UA vocab: {len(src_vocab)}, EN vocab: {len(tgt_vocab)}\n"
    # 80/20 split (unseeded shuffle — a different split every run)
    idx = list(range(len(sources)))
    random.shuffle(idx)
    split = int(0.8 * len(idx))
    tr_s = [sources[i] for i in idx[:split]]
    tr_t = [targets[i] for i in idx[:split]]
    va_s = [sources[i] for i in idx[split:]]
    va_t = [targets[i] for i in idx[split:]]
    train_dl = DataLoader(DS(tr_s, tr_t, src_vocab, tgt_vocab), batch_size=batch_size, shuffle=True, collate_fn=collate)
    val_dl = DataLoader(DS(va_s, va_t, src_vocab, tgt_vocab), batch_size=batch_size, collate_fn=collate)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = ExprModel(len(src_vocab), len(tgt_vocab), ed=64, hd=128, drop=dropout).to(device)
    opt = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-5)
    # Halve the LR after 10 epochs without val-loss improvement.
    sched = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, patience=10, factor=0.5)
    # ignore_index=0 masks PAD positions out of the loss.
    crit = nn.CrossEntropyLoss(ignore_index=0, label_smoothing=label_smoothing)
    params = sum(p.numel() for p in model.parameters())
    log += f"Parameters: {params:,}\nDevice: {device}\n"
    log += f"Dropout: {dropout}, Label smoothing: {label_smoothing}\n\n"
    best_vl = float("inf")
    no_imp = 0  # epochs since last val-loss improvement (early stopping)
    for ep in range(1, epochs + 1):
        model.train()
        tl = 0  # accumulated train loss for this epoch
        for s, t in train_dl:
            s, t = s.to(device), t.to(device)
            opt.zero_grad()
            # Teacher-forcing ratio decays linearly from 0.5, floored at 0.1.
            tf = max(0.1, 0.5 - ep * 0.002)
            o = model(s, t, tf)
            # Position 0 is the all-zero SOS slot — exclude it from the loss.
            o = o[:, 1:].reshape(-1, o.shape[-1])
            loss = crit(o, t[:, 1:].reshape(-1))
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            opt.step()
            tl += loss.item()
        tl /= len(train_dl)
        model.train(False)
        vl = 0  # validation loss, computed with teacher forcing off
        with torch.no_grad():
            for s, t in val_dl:
                s, t = s.to(device), t.to(device)
                o = model(s, t, 0)
                o = o[:, 1:].reshape(-1, o.shape[-1])
                vl += crit(o, t[:, 1:].reshape(-1)).item()
        vl /= max(len(val_dl), 1)
        sched.step(vl)
        if ep % 10 == 0 or ep == 1:
            # Exact-match accuracy on up to 50 samples per val batch;
            # word SETS are compared, so word order is ignored.
            correct, total = 0, 0
            with torch.no_grad():
                for s, t in val_dl:
                    s = s.to(device)
                    for i in range(min(s.shape[0], 50)):
                        pred = model.translate(s[i], tgt_vocab)
                        ref = tgt_vocab.decode(t[i].tolist())
                        if set(pred.lower().split()) == set(ref.lower().split()):
                            correct += 1
                        total += 1
            acc = correct / max(total, 1) * 100
            lr = opt.param_groups[0]["lr"]
            line = f"Epoch {ep:3d} | train: {tl:.4f} | val: {vl:.4f} | exact: {acc:.1f}% | lr: {lr:.6f}"
            log += line + "\n"
            print(line)
        if vl < best_vl:
            best_vl = vl
            no_imp = 0
            # Checkpoint on CPU so the file loads on any device, then move back.
            torch.save(model.cpu().state_dict(), "/tmp/expr_seq2seq.pt")
            model.to(device)
            with open("/tmp/expr_vocab_src.json", "w") as f:
                json.dump(src_vocab.word2idx, f, ensure_ascii=False)
            with open("/tmp/expr_vocab_tgt.json", "w") as f:
                json.dump(tgt_vocab.word2idx, f, ensure_ascii=False)
            with open("/tmp/expr_config.json", "w") as f:
                json.dump({"src_vocab_size": len(src_vocab), "tgt_vocab_size": len(tgt_vocab),
                           "embed_dim": 64, "hidden_dim": 128, "dropout": dropout,
                           "pairs_count": len(pairs)}, f)
        else:
            no_imp += 1
            if no_imp >= 25:
                log += f"Early stopping at epoch {ep}\n"
                break
    # Reload the best checkpoint for the example translations below.
    model.load_state_dict(torch.load("/tmp/expr_seq2seq.pt", map_location=device, weights_only=True))
    model.to(device)
    model.train(False)
    log += f"\nBest val_loss: {best_vl:.4f}\n\nExamples:\n"
    for i in range(min(20, len(va_s))):
        si = torch.tensor(src_vocab.encode(va_s[i]), device=device)
        pred = model.translate(si, tgt_vocab)
        log += f" {va_s[i]:<35} -> {pred:<25} (ref: {va_t[i]})\n"
    # Push to Hub (skipped entirely when HF_TOKEN is absent)
    token = os.environ.get("HF_TOKEN")
    if token:
        api = HfApi(token=token)
        repo = "Dariachup/dormouse-expression-pairs"
        for fname in ["expr_seq2seq.pt", "expr_vocab_src.json", "expr_vocab_tgt.json", "expr_config.json"]:
            api.upload_file(
                path_or_fileobj=f"/tmp/{fname}",
                path_in_repo=f"model/{fname}",
                repo_id=repo,
                repo_type="dataset",
            )
        log += f"\nModel pushed to {repo}/model/\n"
    return log
# Gradio UI: hyperparameter sliders wired to train_model, log shown in a textbox.
with gr.Blocks(title="Dormouse seq2seq v3 Training") as demo:
    gr.Markdown("# Dormouse seq2seq v3 — Expression UA→EN Training")
    gr.Markdown("v3: dropout, label smoothing, smaller model (2M params vs 7M).")
    with gr.Row():
        epochs_in = gr.Slider(10, 300, value=200, step=10, label="Epochs")
        batch_in = gr.Slider(32, 256, value=128, step=32, label="Batch size")
        aug_in = gr.Slider(1, 5, value=3, step=1, label="Augmentation factor")
    with gr.Row():
        drop_in = gr.Slider(0.0, 0.5, value=0.3, step=0.05, label="Dropout")
        smooth_in = gr.Slider(0.0, 0.3, value=0.1, step=0.05, label="Label smoothing")
    train_btn = gr.Button("Train", variant="primary")
    log_box = gr.Textbox(label="Training log", lines=30)
    train_btn.click(
        train_model,
        inputs=[epochs_in, batch_in, aug_in, drop_in, smooth_in],
        outputs=log_box,
    )

demo.launch()