Spaces:
Sleeping
Sleeping
File size: 8,786 Bytes
f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 332b30f b5276b3 332b30f f7d305f b5276b3 332b30f b5276b3 332b30f b5276b3 332b30f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f 332b30f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
import gradio as gr
import torch
import torch.nn as nn
import sentencepiece as spm
import math
# Define Transformer components (unchanged)
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are each passed through their own linear
    projection, split into ``num_heads`` heads of width
    ``d_model // num_heads``, attended per head, merged back to ``d_model``
    features and passed through an output projection.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Return ``softmax(Q @ K^T / sqrt(d_k)) @ V``.

        Positions where ``mask == 0`` are excluded from the softmax. ``mask``
        must be broadcastable to the score tensor.
        """
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # Use the most negative value the score dtype can hold rather than
            # a fixed -1e9: the constant is not representable in float16, while
            # the result is the same in float32 (masked weights become 0).
            fill_value = torch.finfo(attn_scores.dtype).min
            attn_scores = attn_scores.masked_fill(mask == 0, fill_value)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        return torch.matmul(attn_probs, V)

    def split_heads(self, x):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, num_heads, seq, d_k)``."""
        batch_size, seq_length, _ = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of :meth:`split_heads`: back to ``(batch, seq, d_model)``."""
        batch_size, _, seq_length, _ = x.size()
        # transpose() returns a non-contiguous view, so make it contiguous
        # before view() flattens the head dimension.
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Attend from ``Q`` over ``K``/``V`` and return the projected result.

        Args:
            Q, K, V: Tensors of shape ``(batch, seq, d_model)``; ``K`` and
                ``V`` share a sequence length that may differ from ``Q``'s.
            mask: Optional tensor broadcastable to the per-head score tensor;
                zeros mark positions to ignore.

        Returns:
            Tensor with the same shape as ``Q``.
        """
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        return self.W_o(self.combine_heads(attn_output))
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied over the last input dimension.

    Expands ``d_model`` features to ``d_ff``, applies ReLU, then projects
    back down to ``d_model``.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``fc2(relu(fc1(x)))``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a sequence of embeddings.

    The table is computed once for ``max_seq_length`` positions and stored
    as the non-trainable buffer ``pe`` of shape
    ``(1, max_seq_length, d_model)``.

    Args:
        d_model: Embedding width. Odd values are supported.
        max_seq_length: Number of positions to precompute.
    """

    def __init__(self, d_model, max_seq_length):
        super().__init__()
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model)
        )
        angles = position * div_term

        pe = torch.zeros(max_seq_length, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so trim the angles to fit; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings for its first ``x.size(1)`` positions.

        ``x`` is indexed as ``(batch, seq, d_model)``; ``seq`` must not exceed
        ``max_seq_length``.
        """
        return x + self.pe[:, :x.size(1)]
class EncoderLayer(nn.Module):
    """One encoder block: self-attention followed by a feed-forward network.

    Each sub-layer's output goes through dropout, is added back to its input
    and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run ``x`` through the block, using ``mask`` for self-attention."""
        attended = self.dropout(self.self_attn(x, x, x, mask))
        x = self.norm1(x + attended)
        transformed = self.dropout(self.feed_forward(x))
        return self.norm2(x + transformed)
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention, then feed-forward.

    Each sub-layer's output goes through dropout, is added back to its input
    and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Decode ``x`` against ``enc_output``.

        ``tgt_mask`` is applied to the self-attention over ``x``; ``src_mask``
        is applied to the cross-attention over ``enc_output``.
        """
        self_attended = self.dropout(self.self_attn(x, x, x, tgt_mask))
        x = self.norm1(x + self_attended)

        cross_attended = self.dropout(
            self.cross_attn(x, enc_output, enc_output, src_mask)
        )
        x = self.norm2(x + cross_attended)

        transformed = self.dropout(self.feed_forward(x))
        return self.norm3(x + transformed)
class Transformer(nn.Module):
    """Encoder-decoder Transformer mapping source token ids to target logits.

    Args:
        src_vocab_size: Size of the source embedding table.
        tgt_vocab_size: Size of the target embedding table and output layer.
        d_model: Embedding / hidden width.
        num_heads: Attention heads per layer.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden width of the feed-forward sub-layers.
        max_seq_length: Number of positions in the positional encoding table.
        dropout: Dropout probability used throughout.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super().__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build the attention masks for a batch of token ids.

        Positions whose token id is 0 are masked out. The target mask also
        blocks attention to later positions.

        Args:
            src: Integer tensor of shape ``(batch, src_len)``.
            tgt: Integer tensor of shape ``(batch, tgt_len)``.

        Returns:
            ``(src_mask, tgt_mask)`` boolean tensors of shapes
            ``(batch, 1, 1, src_len)`` and ``(batch, 1, tgt_len, tgt_len)``.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Create the causal mask on the same device as the inputs; a CPU mask
        # cannot be combined with a CUDA tgt_mask.
        nopeak_mask = torch.tril(
            torch.ones(1, seq_length, seq_length, device=tgt.device)
        ).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape ``(batch, tgt_len, tgt_vocab_size)``.

        Args:
            src: Source token ids, ``(batch, src_len)``.
            tgt: Target token ids decoded so far, ``(batch, tgt_len)``.
        """
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        return self.fc(dec_output)
# Select the compute device: CUDA when PyTorch can see a GPU, else the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Load the SentencePiece tokenizers from the working directory.
try:
    sp_pseudo = spm.SentencePieceProcessor(model_file="pseudo.model")
    sp_code = spm.SentencePieceProcessor(model_file="code.model")
    print("Tokenizers loaded successfully.")
except Exception as exc:
    # Report the failure, then let it propagate so start-up aborts.
    print(f"Error loading tokenizers: {exc}")
    raise

# Load the full saved model (a pickled module object, not just a state dict).
model_path = "transformer_cpp_to_pseudo_30.pth"
try:
    # NOTE(security): weights_only=False unpickles arbitrary Python objects.
    # Only load checkpoint files that come from a trusted source.
    model = torch.load(model_path, map_location=device, weights_only=False)
    model.eval()
    model = model.to(device)
    print("Model loaded successfully.")
except Exception as exc:
    print(f"Error loading model: {exc}")
    raise
def generate_pseudocode(cpp_code, max_len):
    """Greedily decode pseudocode for ``cpp_code``, streaming partial text.

    Uses the module-level ``model``, ``sp_code``, ``sp_pseudo`` and
    ``device``. At every step the highest-scoring next token is appended and
    the text decoded so far is yielded.

    Args:
        cpp_code: C++ source text to translate.
        max_len: Maximum number of tokens to generate. Coerced with ``int()``
            because UI sliders may pass a float.

    Yields:
        The decoded text after each step, then the final text once more. If
        anything raises, a single ``"Error: ..."`` string is yielded instead.
    """
    # Token ids taken from the original inline comments (<bos_id>=2, <END>=3);
    # NOTE(review): confirm against the tokenizer's training configuration.
    bos_id = 2
    eos_id = 3

    print(f"Input C++ code: {cpp_code}")
    model.eval()
    try:
        steps = int(max_len)
        # The positional-encoding table only covers a fixed number of
        # positions; decoding past it raises a size-mismatch error. Cap the
        # step count so generation stops cleanly instead. The lookup is
        # defensive in case the loaded object lacks this attribute.
        pos_table = getattr(getattr(model, "positional_encoding", None), "pe", None)
        if pos_table is not None:
            steps = min(steps, pos_table.size(1))

        src_tokens = sp_code.encode_as_ids(cpp_code)
        print(f"Source tokens: {src_tokens}")
        src = torch.tensor([src_tokens], dtype=torch.long, device=device)
        tgt = torch.tensor([[bos_id]], dtype=torch.long, device=device)
        generated_tokens = [bos_id]  # Start with <START>
        response = ""
        for i in range(steps):
            # Keep no_grad() around the forward pass only: a context manager
            # left open across a yield would stay active in the consumer.
            with torch.no_grad():
                output = model(src, tgt)
            next_token = output[:, -1, :].argmax(-1).item()
            generated_tokens.append(next_token)
            tgt = torch.cat([tgt, torch.tensor([[next_token]], device=device)], dim=1)
            response = sp_pseudo.decode_ids(generated_tokens)
            print(f"Step {i}: Next token = {next_token}, Generated so far: {response}")
            yield response  # Yield partial output
            if next_token == eos_id:
                print("EOS token detected, stopping generation.")
                break
        yield response  # Final output
    except Exception as e:
        print(f"Error in generation: {e}")
        yield f"Error: {e}"
def respond(message, history, max_tokens):
    """Gradio chat callback: stream pseudocode generated for ``message``.

    ``history`` is accepted because the chat interface passes it, but it is
    not used.
    """
    print(f"Received message: {message}")
    yield from generate_pseudocode(message, max_tokens)
# Chat-style Gradio UI: a code textbox plus a slider for the token budget.
demo = gr.ChatInterface(
    fn=respond,
    chatbot=gr.Chatbot(label="C++ to Pseudocode Generator"),
    textbox=gr.Textbox(
        placeholder=(
            "Enter C++ code (e.g., 'int x = 5; "
            "for(int i=0; i<x; i++) cout << i;')"
        ),
        label="C++ Code",
    ),
    # Passed to respond() after (message, history), i.e. as max_tokens.
    additional_inputs=[
        gr.Slider(minimum=10, maximum=1000, value=50, step=1, label="Max tokens"),
    ],
    title="C++ to Pseudocode Transformer",
    description="Convert C++ code to pseudocode using a custom transformer trained on the SPoC dataset.",
)

if __name__ == "__main__":
    # Debug mode gives more output.
    demo.launch(debug=True)