Spaces:
Runtime error
Runtime error
import os | |
import xml.etree.ElementTree as ET | |
import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
from typing import List, Dict, Any, Optional | |
from collections import defaultdict | |
from accelerate import Accelerator | |
from transformers import AutoTokenizer, AutoModel | |
import faiss | |
import numpy as np | |
class DynamicModel(nn.Module): | |
def __init__(self, sections: Dict[str, List[Dict[str, Any]]]): | |
super(DynamicModel, self).__init__() | |
self.sections = nn.ModuleDict() | |
if not sections: | |
sections = { | |
'default': [{ | |
'input_size': 128, | |
'output_size': 256, | |
'activation': 'relu', | |
'batch_norm': True, | |
'dropout': 0.1 | |
}] | |
} | |
for section_name, layers in sections.items(): | |
self.sections[section_name] = nn.ModuleList() | |
for layer_params in layers: | |
print(f"Creating layer in section '{section_name}' with params: {layer_params}") | |
self.sections[section_name].append(self.create_layer(layer_params)) | |
def create_layer(self, layer_params: Dict[str, Any]) -> nn.Module: | |
layers = [] | |
layers.append(nn.Linear(layer_params['input_size'], layer_params['output_size'])) | |
if layer_params.get('batch_norm', False): | |
layers.append(nn.BatchNorm1d(layer_params['output_size'])) | |
activation = layer_params.get('activation', 'relu') | |
if activation == 'relu': | |
layers.append(nn.ReLU(inplace=True)) | |
elif activation == 'tanh': | |
layers.append(nn.Tanh()) | |
elif activation == 'sigmoid': | |
layers.append(nn.Sigmoid()) | |
elif activation == 'leaky_relu': | |
layers.append(nn.LeakyReLU(negative_slope=0.01, inplace=True)) | |
elif activation == 'elu': | |
layers.append(nn.ELU(alpha=1.0, inplace=True)) | |
elif activation is not None: | |
raise ValueError(f"Unsupported activation function: {activation}") | |
if dropout_rate := layer_params.get('dropout', 0.0): | |
layers.append(nn.Dropout(p=dropout_rate)) | |
if hidden_layers := layer_params.get('hidden_layers', []): | |
for hidden_layer_params in hidden_layers: | |
layers.append(self.create_layer(hidden_layer_params)) | |
if layer_params.get('memory_augmentation', False): | |
layers.append(MemoryAugmentationLayer(layer_params['output_size'])) | |
if layer_params.get('hybrid_attention', False): | |
layers.append(HybridAttentionLayer(layer_params['output_size'])) | |
if layer_params.get('dynamic_flash_attention', False): | |
layers.append(DynamicFlashAttentionLayer(layer_params['output_size'])) | |
return nn.Sequential(*layers) | |
def forward(self, x: torch.Tensor, section_name: Optional[str] = None) -> torch.Tensor: | |
if section_name is not None: | |
if section_name not in self.sections: | |
raise KeyError(f"Section '{section_name}' not found in model") | |
for layer in self.sections[section_name]: | |
x = layer(x) | |
else: | |
for section_name, layers in self.sections.items(): | |
for layer in layers: | |
x = layer(x) | |
return x | |
class MemoryAugmentationLayer(nn.Module): | |
def __init__(self, size: int): | |
super(MemoryAugmentationLayer, self).__init__() | |
self.memory = nn.Parameter(torch.randn(size)) | |
def forward(self, x: torch.Tensor) -> torch.Tensor: | |
return x + self.memory | |
class HybridAttentionLayer(nn.Module): | |
def __init__(self, size: int): | |
super(HybridAttentionLayer, self).__init__() | |
self.attention = nn.MultiheadAttention(size, num_heads=8) | |
def forward(self, x: torch.Tensor) -> torch.Tensor: | |
x = x.unsqueeze(1) # Add sequence dimension | |
attn_output, _ = self.attention(x, x, x) | |
return attn_output.squeeze(1) | |
class DynamicFlashAttentionLayer(nn.Module): | |
def __init__(self, size: int): | |
super(DynamicFlashAttentionLayer, self).__init__() | |
self.attention = nn.MultiheadAttention(size, num_heads=8) | |
def forward(self, x: torch.Tensor) -> torch.Tensor: | |
x = x.unsqueeze(1) # Add sequence dimension | |
attn_output, _ = self.attention(x, x, x) | |
return attn_output.squeeze(1) | |
def parse_xml_file(file_path: str) -> List[Dict[str, Any]]: | |
tree = ET.parse(file_path) | |
root = tree.getroot() | |
layers = [] | |
for layer in root.findall('.//layer'): | |
layer_params = {} | |
layer_params['input_size'] = int(layer.get('input_size', 128)) | |
layer_params['output_size'] = int(layer.get('output_size', 256)) | |
layer_params['activation'] = layer.get('activation', 'relu').lower() | |
if layer_params['activation'] not in ['relu', 'tanh', 'sigmoid', 'none']: | |
raise ValueError(f"Unsupported activation function: {layer_params['activation']}") | |
if layer_params['input_size'] <= 0 or layer_params['output_size'] <= 0: | |
raise ValueError("Layer dimensions must be positive integers") | |
layers.append(layer_params) | |
if not layers: | |
layers.append({ | |
'input_size': 128, | |
'output_size': 256, | |
'activation': 'relu' | |
}) | |
return layers | |
def create_model_from_folder(folder_path: str) -> DynamicModel: | |
sections = defaultdict(list) | |
if not os.path.exists(folder_path): | |
print(f"Warning: Folder {folder_path} does not exist. Creating model with default configuration.") | |
return DynamicModel({}) | |
xml_files_found = False | |
for root, dirs, files in os.walk(folder_path): | |
for file in files: | |
if file.endswith('.xml'): | |
xml_files_found = True | |
file_path = os.path.join(root, file) | |
try: | |
layers = parse_xml_file(file_path) | |
section_name = os.path.basename(root).replace('.', '_') | |
sections[section_name].extend(layers) | |
except Exception as e: | |
print(f"Error processing {file_path}: {str(e)}") | |
if not xml_files_found: | |
print("Warning: No XML files found. Creating model with default configuration.") | |
return DynamicModel({}) | |
return DynamicModel(dict(sections)) | |
def create_embeddings_and_stores(folder_path: str, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"): | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModel.from_pretrained(model_name) | |
vector_store = faiss.IndexFlatL2(384) # Assuming 384-dimensional embeddings | |
doc_store = [] | |
for root, dirs, files in os.walk(folder_path): | |
for file in files: | |
if file.endswith('.xml'): | |
file_path = os.path.join(root, file) | |
try: | |
tree = ET.parse(file_path) | |
root = tree.getroot() | |
for elem in root.iter(): | |
if elem.text: | |
text = elem.text.strip() | |
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
with torch.no_grad(): | |
embeddings = model(**inputs).last_hidden_state.mean(dim=1).numpy() | |
vector_store.add(embeddings) | |
doc_store.append(text) | |
except Exception as e: | |
print(f"Error processing {file_path}: {str(e)}") | |
return vector_store, doc_store | |
def query_vector_store(query: str, vector_store, doc_store, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"): | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModel.from_pretrained(model_name) | |
inputs = tokenizer(query, return_tensors="pt", truncation=True, padding=True) | |
with torch.no_grad(): | |
query_embedding = model(**inputs).last_hidden_state.mean(dim=1).numpy() | |
D, I = vector_store.search(query_embedding, k=5) # Retrieve top 5 documents | |
results = [doc_store[i] for i in I[0]] | |
return results | |
def main(): | |
folder_path = 'data' | |
model = create_model_from_folder(folder_path) | |
print(f"Created dynamic PyTorch model with sections: {list(model.sections.keys())}") | |
first_section = next(iter(model.sections.keys())) | |
first_layer = model.sections[first_section][0] | |
input_features = first_layer[0].in_features | |
sample_input = torch.randn(1, input_features) | |
output = model(sample_input) | |
print(f"Sample output shape: {output.shape}") | |
vector_store, doc_store = create_embeddings_and_stores(folder_path) | |
accelerator = Accelerator() | |
optimizer = torch.optim.Adam(model.parameters(), lr=0.001) | |
criterion = nn.CrossEntropyLoss() | |
num_epochs = 10 | |
dataset = torch.utils.data.TensorDataset( | |
torch.randn(100, input_features), | |
torch.randint(0, 2, (100,)) | |
) | |
train_dataloader = torch.utils.data.DataLoader( | |
dataset, | |
batch_size=16, | |
shuffle=True | |
) | |
model, optimizer, train_dataloader = accelerator.prepare( | |
model, | |
optimizer, | |
train_dataloader | |
) | |
for epoch in range(num_epochs): | |
model.train() | |
total_loss = 0 | |
for batch_idx, (inputs, labels) in enumerate(train_dataloader): | |
optimizer.zero_grad() | |
outputs = model(inputs) | |
loss = criterion(outputs, labels) | |
accelerator.backward(loss) | |
optimizer.step() | |
total_loss += loss.item() | |
avg_loss = total_loss / len(train_dataloader) | |
print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}") | |
# Query the vector store after training | |
user_query = "example query text" | |
results = query_vector_store(user_query, vector_store, doc_store) | |
print(f"Query results: {results}") | |
if __name__ == "__main__": | |
main() |