Spaces:
Sleeping
Sleeping
File size: 8,786 Bytes
f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 332b30f b5276b3 332b30f f7d305f b5276b3 332b30f b5276b3 332b30f b5276b3 332b30f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f b5276b3 f7d305f 332b30f |
|
import gradio as gr
import torch
import torch.nn as nn
import sentencepiece as spm
import math
# Transformer building blocks: attention, feed-forward, positional encoding, encoder/decoder layers.
class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention.

    Queries, keys and values are projected by separate linear layers, split
    into ``num_heads`` heads of width ``d_model // num_heads``, attended per
    head, merged back to ``d_model`` features and passed through an output
    projection.

    Attribute names (``W_q``, ``W_k``, ``W_v``, ``W_o``, ``d_k`` ...) are also
    the keys under which PyTorch stores the parameters, so they are kept
    stable for compatibility with already-saved models.

    Args:
        d_model: Feature width of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.

    Raises:
        AssertionError: If ``d_model`` is not divisible by ``num_heads``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Attend over ``K``/``V`` with queries ``Q`` (all already split into heads).

        Positions where ``mask == 0`` receive a very negative score before the
        softmax, so they get (numerically) zero attention weight.
        """
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            # -1e9 is not representable in float16 and makes masked_fill fail
            # there; clamp to the dtype's most negative finite value. For
            # float32/float64/bfloat16 this is still exactly -1e9.
            fill_value = max(-1e9, torch.finfo(attn_scores.dtype).min)
            attn_scores = attn_scores.masked_fill(mask == 0, fill_value)
        attn_probs = torch.softmax(attn_scores, dim=-1)
        return torch.matmul(attn_probs, V)

    def split_heads(self, x):
        """Reshape ``(batch, seq, d_model)`` to ``(batch, num_heads, seq, d_k)``."""
        batch_size, seq_length, _ = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of :meth:`split_heads`: back to ``(batch, seq, d_model)``."""
        batch_size, _, seq_length, _ = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head and return the merged output.

        Args:
            Q, K, V: Tensors of shape ``(batch, seq, d_model)`` (the sequence
                length of ``Q`` may differ from that of ``K``/``V``).
            mask: Optional tensor broadcastable to the attention scores;
                entries equal to 0 are masked out.
        """
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        return self.W_o(self.combine_heads(attn_output))
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to every position independently.

    Features are expanded from ``d_model`` to ``d_ff``, passed through ReLU,
    and projected back to ``d_model``.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        # Creation order and attribute names are kept: they determine the
        # parameter names under which the weights are stored.
        self.fc1 = nn.Linear(in_features=d_model, out_features=d_ff)
        self.fc2 = nn.Linear(in_features=d_ff, out_features=d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``fc2(relu(fc1(x)))``."""
        hidden = self.fc1(x)
        activated = self.relu(hidden)
        return self.fc2(activated)
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to token embeddings.

    A table of shape ``(1, max_seq_length, d_model)`` is precomputed and
    registered as the buffer ``pe``; even feature columns hold sines and odd
    columns hold cosines of ``position * div_term``.

    Args:
        d_model: Feature width of the embeddings (even or odd).
        max_seq_length: Number of positions in the precomputed table.
    """

    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer cosine column than sine
        # columns, so trim the angle table to fit; for even d_model the slice
        # is the whole table and the result is unchanged.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the encodings for the first ``x.size(1)`` positions to ``x``."""
        return x + self.pe[:, :x.size(1)]
class EncoderLayer(nn.Module):
    """One encoder block: self-attention then feed-forward.

    Each sub-layer's output goes through dropout, is added to its input
    (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask):
        """Run the block on ``x``; ``mask`` is passed to the self-attention."""
        # Sub-layer 1: self-attention with residual + norm.
        attended = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attended))

        # Sub-layer 2: position-wise feed-forward with residual + norm.
        transformed = self.feed_forward(x)
        return self.norm2(x + self.dropout(transformed))
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention, feed-forward.

    Each sub-layer's output goes through dropout, is added to its input
    (residual connection) and is then layer-normalised.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Run the block on decoder states ``x`` given the encoder output.

        ``tgt_mask`` is applied in the self-attention, ``src_mask`` in the
        cross-attention over ``enc_output``.
        """
        # Sub-layer 1: self-attention over the decoder states.
        self_attended = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(self_attended))

        # Sub-layer 2: queries from the decoder, keys/values from the encoder.
        cross_attended = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(cross_attended))

        # Sub-layer 3: position-wise feed-forward.
        transformed = self.feed_forward(x)
        return self.norm3(x + self.dropout(transformed))
class Transformer(nn.Module):
    """Encoder-decoder Transformer mapping source token ids to target logits.

    Source and target ids are embedded, summed with sinusoidal positional
    encodings, passed through ``num_layers`` encoder and decoder layers, and
    projected to ``tgt_vocab_size`` logits. Token id 0 is treated as padding
    when the attention masks are built.

    Args:
        src_vocab_size: Size of the source vocabulary.
        tgt_vocab_size: Size of the target vocabulary.
        d_model: Embedding / hidden feature width.
        num_heads: Attention heads per layer.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Hidden width of the feed-forward sub-layers.
        max_seq_length: Number of positions in the positional-encoding table.
        dropout: Dropout probability.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        """Build the source padding mask and the target padding + causal mask.

        Args:
            src: Source ids, shape ``(batch, src_len)``.
            tgt: Target ids, shape ``(batch, tgt_len)``.

        Returns:
            ``(src_mask, tgt_mask)`` as boolean tensors of shape
            ``(batch, 1, 1, src_len)`` and ``(batch, 1, tgt_len, tgt_len)``;
            ``True`` marks positions that may be attended to.
        """
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Lower-triangular "no peeking ahead" mask. It must live on the same
        # device as the ids, otherwise the `&` below fails for CUDA inputs.
        nopeak_mask = torch.tril(
            torch.ones(1, seq_length, seq_length, device=tgt.device)
        ).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape ``(batch, tgt_len, tgt_vocab_size)``."""
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        return self.fc(dec_output)
# Pick the GPU when one is available, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# SentencePiece tokenizers: one for the pseudocode side, one for the code side.
try:
    sp_pseudo = spm.SentencePieceProcessor(model_file="pseudo.model")
    sp_code = spm.SentencePieceProcessor(model_file="code.model")
    print("Tokenizers loaded successfully.")
except Exception as err:
    # Log for the console, then let the original exception abort start-up.
    print(f"Error loading tokenizers: {err}")
    raise

# Load the full saved model (the whole pickled module, not just a state dict).
# NOTE(review): weights_only=False unpickles arbitrary Python objects, so this
# file must come from a trusted source. Unpickling a full module also needs
# the class definitions above to be importable under the same names.
model_path = "transformer_cpp_to_pseudo_30.pth"
try:
    model = torch.load(model_path, map_location=device, weights_only=False).to(device)
    model.eval()
    print("Model loaded successfully.")
except Exception as err:
    print(f"Error loading model: {err}")
    raise
def generate_pseudocode(cpp_code, max_len, bos_id=2, eos_id=3):
    """Greedily decode pseudocode for ``cpp_code``, streaming partial results.

    The input is tokenized with ``sp_code``, then the model is called once per
    generated token with the target sequence built so far; the argmax token is
    appended and the running sequence is decoded with ``sp_pseudo``.

    Args:
        cpp_code: C++ source text to translate.
        max_len: Maximum number of tokens to generate. Converted with
            ``int()`` because UI sliders may deliver a float.
        bos_id: Token id that starts the target sequence (default 2).
        eos_id: Token id that stops generation (default 3).

    Yields:
        The decoded text after every generated token and once more at the
        end; on failure, a single ``"Error: ..."`` string.
    """
    print(f"Input C++ code: {cpp_code}")
    model.eval()
    try:
        src_tokens = sp_code.encode_as_ids(cpp_code)
        print(f"Source tokens: {src_tokens}")

        steps = int(max_len)  # range() below rejects floats such as 50.0

        # The positional-encoding table bounds how long a sequence the model
        # can process. Running past it raises a tensor-size error that would
        # replace the text streamed so far, so respect the limit up front.
        pos_table = getattr(getattr(model, "positional_encoding", None), "pe", None)
        max_positions = pos_table.size(1) if pos_table is not None else None
        if max_positions is not None:
            if len(src_tokens) > max_positions:
                raise ValueError(
                    f"input has {len(src_tokens)} tokens but the model supports at most {max_positions}"
                )
            if steps > max_positions:
                # At step i the decoder input holds i + 1 tokens, so at most
                # max_positions steps fit in the table.
                steps = max_positions
                print(f"Limiting generation to {steps} tokens (positional encoding size).")

        src = torch.tensor([src_tokens], dtype=torch.long, device=device)
        tgt = torch.tensor([[bos_id]], dtype=torch.long, device=device)
        generated_tokens = [bos_id]  # Start with <START>
        response = ""
        with torch.no_grad():
            for i in range(steps):
                output = model(src, tgt)
                # Greedy choice: most likely token at the last target position.
                next_token = output[:, -1, :].argmax(-1).item()
                generated_tokens.append(next_token)
                next_column = torch.tensor([[next_token]], dtype=torch.long, device=device)
                tgt = torch.cat([tgt, next_column], dim=1)
                response = sp_pseudo.decode_ids(generated_tokens)
                print(f"Step {i}: Next token = {next_token}, Generated so far: {response}")
                yield response  # Yield partial output
                if next_token == eos_id:
                    print("EOS token detected, stopping generation.")
                    break
        yield response  # Final output (also covers the zero-step case)
    except Exception as e:
        # Top-level boundary for the UI: report the failure instead of crashing.
        print(f"Error in generation: {e}")
        yield f"Error: {e}"
def respond(message, history, max_tokens):
    """Gradio chat callback: stream pseudocode generated for ``message``.

    ``history`` is accepted because the chat interface passes it, but it is
    not used. ``max_tokens`` is forwarded as the generation length limit.
    """
    print(f"Received message: {message}")
    yield from generate_pseudocode(message, max_tokens)
# Gradio interface
def _build_demo():
    """Assemble the chat UI: chatbot pane, code textbox and a max-token slider."""
    chat_pane = gr.Chatbot(label="C++ to Pseudocode Generator")
    code_box = gr.Textbox(
        placeholder="Enter C++ code (e.g., 'int x = 5; for(int i=0; i<x; i++) cout << i;')",
        label="C++ Code",
    )
    # Passed to `respond` as its third argument (max_tokens).
    max_tokens_slider = gr.Slider(minimum=10, maximum=1000, value=50, step=1, label="Max tokens")
    return gr.ChatInterface(
        respond,
        chatbot=chat_pane,
        textbox=code_box,
        additional_inputs=[max_tokens_slider],
        title="C++ to Pseudocode Transformer",
        description="Convert C++ code to pseudocode using a custom transformer trained on the SPoC dataset.",
    )


demo = _build_demo()

if __name__ == "__main__":
    # Debug mode gives more console output while the app runs.
    demo.launch(debug=True)