Keeby-smilyai committed
Commit 6b9f465 · verified · 1 Parent(s): 479bcae

Update backend.py

Files changed (1)
backend.py  +67 -98
backend.py CHANGED
@@ -1,4 +1,4 @@
-# backend.py — FINAL WORKING VERSION
+# backend.py — PARALLEL PROCESSING VERSION
 import sqlite3
 import threading
 import time
@@ -14,38 +14,22 @@ import shutil
 
 DB_PATH = "llm_kitchen.db"
 training_queue = []
-active_run_lock = threading.Lock()
-active_run_id = None
+# --- NEW STATE MANAGEMENT FOR PARALLELISM ---
+active_runs = set()    # Stores run_ids of currently running jobs
+active_users = set()   # Stores user_ids of users with a currently running job
+scheduler_lock = threading.Lock()  # Protects access to the queue and active sets
+# --- CONSTANTS ---
 RUN_TIMEOUT = 48 * 3600  # 48 hours
 MAX_RAM_PER_RUN_GB = 1.5
 
-# ------------------------------ DATABASE ------------------------------
-
+# ------------------------------ DATABASE (No Changes Needed) ------------------------------
 def init_db():
     conn = sqlite3.connect(DB_PATH, check_same_thread=False)
     cursor = conn.cursor()
     cursor.executescript("""
-        CREATE TABLE IF NOT EXISTS users (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            hf_token TEXT UNIQUE NOT NULL,
-            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
-        );
-        CREATE TABLE IF NOT EXISTS training_runs (
-            id INTEGER PRIMARY KEY AUTOINCREMENT,
-            user_id INTEGER NOT NULL,
-            arch_type TEXT NOT NULL,
-            num_layers INTEGER NOT NULL,
-            learning_rate REAL NOT NULL,
-            epochs INTEGER NOT NULL,
-            batch_size INTEGER NOT NULL,
-            status TEXT DEFAULT 'queued',
-            logs TEXT DEFAULT '',
-            started_at DATETIME,
-            completed_at DATETIME,
-            FOREIGN KEY (user_id) REFERENCES users(id)
-        );
+        CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY AUTOINCREMENT, hf_token TEXT UNIQUE NOT NULL, created_at DATETIME DEFAULT CURRENT_TIMESTAMP);
+        CREATE TABLE IF NOT EXISTS training_runs (id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, arch_type TEXT NOT NULL, num_layers INTEGER NOT NULL, learning_rate REAL NOT NULL, epochs INTEGER NOT NULL, batch_size INTEGER NOT NULL, status TEXT DEFAULT 'queued', logs TEXT DEFAULT '', started_at DATETIME, completed_at DATETIME, FOREIGN KEY (user_id) REFERENCES users(id));
     """)
-    conn.commit()
     conn.close()
 
 init_db()
@@ -69,10 +53,7 @@ def create_user(hf_token):
     return user_id
 
 def create_training_run(user_id, config):
-    _, run_id = db_query("""
-        INSERT INTO training_runs (user_id, arch_type, num_layers, learning_rate, epochs, batch_size)
-        VALUES (?, ?, ?, ?, ?, ?)
-    """, (user_id, config['arch_type'], config['num_layers'], config['learning_rate'], config['epochs'], config['batch_size']))
+    _, run_id = db_query("INSERT INTO training_runs (user_id, arch_type, num_layers, learning_rate, epochs, batch_size) VALUES (?, ?, ?, ?, ?, ?)", (user_id, config['arch_type'], config['num_layers'], config['learning_rate'], config['epochs'], config['batch_size']))
     return run_id
 
 def get_user_runs(user_id):
@@ -98,8 +79,7 @@ def log_update(message, run_id):
     if run_id > 0:
         db_query("UPDATE training_runs SET logs = logs || ? || ? WHERE id = ?", ('\n', full_msg, run_id))
 
-# ------------------------------ AUTH ------------------------------
-
+# ------------------------------ AUTH (No Changes Needed) ------------------------------
 def verify_hf_token(token):
     try:
         whoami(token=token)
@@ -111,7 +91,7 @@ def verify_hf_token(token):
     except Exception as e:
         return None, f"Invalid token. Please try again. ({str(e)})"
 
-# ------------------------------ TRAINING QUEUE ------------------------------
+# ------------------------------ NEW PARALLEL TRAINING QUEUE ------------------------------
 
 def ram_available():
     return (psutil.virtual_memory().available / (1024**3)) >= MAX_RAM_PER_RUN_GB
@@ -122,34 +102,53 @@ def queue_training_run(user_id, config):
     return run_id
 
 def start_training_if_free():
-    global active_run_id
-    with active_run_lock:
-        if active_run_id is not None or not training_queue:
-            return False
-        if not ram_available():
-            print("MemoryWarning: Not enough RAM to start new run.")
-            return False
-
-        job = training_queue.pop(0)
-        active_run_id = job["run_id"]
-        update_run_status(active_run_id, "running")
-        log_update("🍳 Starting kitchen process...", active_run_id)
-
-        thread = threading.Thread(target=run_training_job, args=(job,))
-        thread.start()
-        threading.Timer(RUN_TIMEOUT, kill_run_timeout, args=[active_run_id]).start()
-        return True
-
-def kill_run_timeout(run_id):
-    global active_run_id
-    with active_run_lock:
-        if active_run_id == run_id:
+    """
+    The new scheduler. Tries to start as many jobs as possible from the queue
+    based on available RAM and the one-run-per-user constraint.
+    """
+    with scheduler_lock:
+        # Iterate through a copy of the queue as we might modify it
+        for job in list(training_queue):
+            # 1. Check for global resource constraint (RAM)
+            if not ram_available():
+                log_update("MemoryWarning: Not enough RAM for new runs. Waiting.", -1)
+                break  # Stop trying to schedule if we're out of RAM
+
+            # 2. Check for per-user constraint
+            if job["user_id"] in active_users:
+                continue  # Skip this job, user already has a run. Check next job.
+
+            # --- If we get here, we can start the job ---
+            log_update(f"Scheduler: Starting run #{job['run_id']} for user #{job['user_id']}", -1)
+
+            # Update state to reflect the new running job
+            active_runs.add(job["run_id"])
+            active_users.add(job["user_id"])
+            training_queue.remove(job)
+
+            # Update database and start the training thread
+            update_run_status(job["run_id"], "running")
+            log_update("🍳 Starting kitchen process...", job["run_id"])
+
+            thread = threading.Thread(target=run_training_job, args=(job,))
+            thread.start()
+            threading.Timer(RUN_TIMEOUT, kill_run_timeout, args=[job]).start()
+
+def kill_run_timeout(job):
+    run_id = job["run_id"]
+    user_id = job["user_id"]
+    with scheduler_lock:
+        if run_id in active_runs:
             log_update(f"Run {run_id}: 💥 48-HOUR TIMEOUT REACHED. Terminating.", run_id)
             update_run_status(run_id, "timeout")
-            active_run_id = None
-            start_training_if_free()
-
-# ------------------------------ CUSTOM MODELS FROM SCRATCH ------------------------------
+            # Free up resources
+            active_runs.discard(run_id)
+            active_users.discard(user_id)
+    # Try to schedule a new job now that resources are free
+    start_training_if_free()
+
+# ------------------------------ MODELS & DATASET (No Changes Needed) -------------------------
+# ... (All model and dataset classes are unchanged) ...
 class CNNLanguageModel(nn.Module):
     def __init__(self, vocab_size, embed_dim=128, num_layers=4):
         super().__init__()
@@ -165,7 +164,6 @@ class CNNLanguageModel(nn.Module):
         logits = self.fc(x)
         loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None
         return {"loss": loss, "logits": logits}
-
 class RNNLanguageModel(nn.Module):
     def __init__(self, vocab_size, embed_dim=128, hidden_dim=256, num_layers=2):
         super().__init__()
@@ -178,7 +176,6 @@ class RNNLanguageModel(nn.Module):
         logits = self.fc(output)
         loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None
         return {"loss": loss, "logits": logits}
-
class TransformerLanguageModel(nn.Module):
     def __init__(self, vocab_size, embed_dim=128, num_heads=4, num_layers=3):
         super().__init__()
@@ -192,13 +189,10 @@ class TransformerLanguageModel(nn.Module):
         logits = self.fc(x)
         loss = nn.CrossEntropyLoss()(logits.view(-1, logits.size(-1)), labels.view(-1)) if labels is not None else None
         return {"loss": loss, "logits": logits}
-
 def get_model(arch_type, vocab_size, num_layers):
     models = {"cnn": CNNLanguageModel, "rnn": RNNLanguageModel, "transformer": TransformerLanguageModel}
     if arch_type not in models: raise ValueError(f"Unknown arch: {arch_type}")
     return models[arch_type](vocab_size, num_layers=num_layers)
-
-# ------------------------------ DATASET ------------------------------
 class TextDataset(Dataset):
     def __init__(self, tokenized_data):
         self.data = tokenized_data["input_ids"]
@@ -207,58 +201,41 @@ class TextDataset(Dataset):
     def __getitem__(self, idx):
         return {"input_ids": torch.tensor(self.data[idx]), "labels": torch.tensor(self.data[idx])}
 
-# ------------------------------ TRAINING JOB ------------------------------
+# ------------------------------ TRAINING JOB (Updated `finally` block) -----------------------
 
 def run_training_job(job):
-    global active_run_id
     run_id = job["run_id"]
+    user_id = job["user_id"]  # Get user_id for state management
     try:
         device = "cuda" if torch.cuda.is_available() else "cpu"
         log_update(f"🚀 Device = {device} | RAM available: {psutil.virtual_memory().available / (1024**3):.2f} GB", run_id)
-
+        # (The core training logic remains the same)
         tokenizer = AutoTokenizer.from_pretrained("gpt2")
         tokenizer.pad_token = tokenizer.eos_token
         tokenizer_save_path = f"./runs/{run_id}/tokenizer"
         os.makedirs(tokenizer_save_path, exist_ok=True)
         tokenizer.save_pretrained(tokenizer_save_path)
         log_update(f"💾 Tokenizer saved to {tokenizer_save_path}", run_id)
-
         model = get_model(job["arch_type"], len(tokenizer), job["num_layers"]).to(device)
         log_update(f"🧱 Model initialized: {job['arch_type']} x{job['num_layers']} layers", run_id)
-
         dataset = load_dataset("voidful/reasoning_gemini_300k", split="train[:5000]")
-        # THIS IS THE FIX: Added padding="max_length"
-        tokenized_dataset = dataset.map(
-            lambda ex: tokenizer(
-                [q + " " + a for q, a in zip(ex["message"], ex["answer"])],
-                truncation=True,
-                padding="max_length",  # <-- THE FIX IS HERE
-                max_length=128
-            ),
-            batched=True,
-            remove_columns=dataset.column_names
-        )
+        tokenized_dataset = dataset.map(lambda ex: tokenizer([q + " " + a for q, a in zip(ex["message"], ex["answer"])], truncation=True, padding="max_length", max_length=128), batched=True, remove_columns=dataset.column_names)
         train_loader = DataLoader(TextDataset(tokenized_dataset), batch_size=job["batch_size"], shuffle=True)
         optimizer = torch.optim.AdamW(model.parameters(), lr=job["learning_rate"])
-
         model.train()
         log_update(f"▶️ Starting training for {job['epochs']} epochs...", run_id)
-
         for epoch in range(job["epochs"]):
             for step, batch in enumerate(train_loader):
                 input_ids = batch["input_ids"].to(device)
                 labels = batch["labels"].to(device)
-
                 optimizer.zero_grad()
                 outputs = model(input_ids, labels=labels)
                 loss = outputs["loss"]
                 loss.backward()
                 optimizer.step()
-
                 if step % 50 == 0:
                     log_update(f"Epoch {epoch+1} | Step {step} | Loss: {loss.item():.4f}", run_id)
             log_update(f"✅ Epoch {epoch+1} completed.", run_id)
-
         model_path = f"./runs/{run_id}"
         os.makedirs(model_path, exist_ok=True)
         torch.save(model.state_dict(), f"{model_path}/pytorch_model.bin")
@@ -273,49 +250,41 @@ def run_training_job(job):
         log_update(success_message, run_id)
         update_run_status(run_id, "completed")
     finally:
-        with active_run_lock:
-            if active_run_id == run_id: active_run_id = None
+        # --- NEW: Free up resources and trigger scheduler ---
+        with scheduler_lock:
+            active_runs.discard(run_id)
+            active_users.discard(user_id)
         start_training_if_free()
 
-# ------------------------------ INFERENCE ------------------------------
-
+# ------------------------------ INFERENCE & PUBLISH (No Changes Needed) --------------------
+# ... (run_inference and publish_run_to_hub are unchanged) ...
 def run_inference(run_id, prompt):
     model_path = f"./runs/{run_id}/pytorch_model.bin"
     tokenizer_path = f"./runs/{run_id}/tokenizer"
     if not (os.path.exists(model_path) and os.path.exists(tokenizer_path)):
         return "ModelError: Model or tokenizer files not found."
-
     tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)
     rows, _ = db_query("SELECT arch_type, num_layers FROM training_runs WHERE id = ?", (run_id,))
     if not rows: return "ModelError: Run not found in database."
-
     arch_type, num_layers = rows[0]
     model = get_model(arch_type, len(tokenizer), num_layers)
     model.load_state_dict(torch.load(model_path, map_location="cpu"))
     model.eval()
-
     inputs = tokenizer(prompt, return_tensors="pt")
     input_ids = inputs.input_ids
-
     with torch.no_grad():
         outputs = model(input_ids)
         logits = outputs["logits"]
         generated_ids = torch.argmax(logits, dim=-1)
         return f"🧑‍🍳 Model says:\n{tokenizer.decode(generated_ids[0], skip_special_tokens=True)}"
-
-# ------------------------------ PUBLISH TO HUB ------------------------------
-
 def publish_run_to_hub(run_id, hf_token, repo_name, user_description=""):
     local_dir = f"./runs/{run_id}/hub_upload"
     shutil.rmtree(local_dir, ignore_errors=True)
    os.makedirs(local_dir, exist_ok=True)
-
     shutil.copy(f"./runs/{run_id}/pytorch_model.bin", f"{local_dir}/pytorch_model.bin")
     shutil.copytree(f"./runs/{run_id}/tokenizer", f"{local_dir}/tokenizer", dirs_exist_ok=True)
-
     readme_content = user_description.strip() or f"# Model from LLM Kitchen - Run #{run_id}"
     with open(f"{local_dir}/README.md", "w") as f: f.write(readme_content)
-
     api = HfApi()
     repo_url = api.create_repo(repo_id=repo_name, token=hf_token, exist_ok=True).repo_id
     api.upload_folder(folder_path=local_dir, repo_id=repo_url, token=hf_token)
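
A minimal, runnable sketch of the scheduling pattern this commit introduces (an illustration, not part of the commit): one lock guards the queue and the two active sets, each finished job releases its slots and re-triggers scheduling, and at most one job per user runs at a time. Here fake_training_job stands in for run_training_job, and the ram_available() gate is omitted for brevity.

import threading
import time

training_queue = []
active_runs, active_users = set(), set()
scheduler_lock = threading.Lock()

def fake_training_job(job):
    # Stand-in for run_training_job: "train", then release slots in finally.
    try:
        time.sleep(0.1)
        print(f"run {job['run_id']} (user {job['user_id']}) finished")
    finally:
        with scheduler_lock:
            active_runs.discard(job["run_id"])
            active_users.discard(job["user_id"])
        schedule()  # the freed user slot may let a queued job start

def schedule():
    # Same shape as start_training_if_free, minus the RAM check.
    with scheduler_lock:
        for job in list(training_queue):  # copy: we mutate the queue below
            if job["user_id"] in active_users:
                continue  # one running job per user
            active_runs.add(job["run_id"])
            active_users.add(job["user_id"])
            training_queue.remove(job)
            threading.Thread(target=fake_training_job, args=(job,)).start()

# Runs 1 and 2 belong to user 1, so run 2 starts only after run 1 finishes.
training_queue.extend([{"run_id": 1, "user_id": 1},
                       {"run_id": 2, "user_id": 1},
                       {"run_id": 3, "user_id": 2}])
schedule()

Note the release-then-reschedule order: schedule() runs only after the lock is dropped, which is what keeps the non-reentrant threading.Lock from deadlocking when a finishing job immediately schedules the next one.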
 
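One further hedged aside (also not part of the commit): every scheduled run arms a 48-hour threading.Timer that is never cancelled, so timers for runs that finished normally still fire and rely on the "run_id in active_runs" guard in kill_run_timeout to become no-ops. If those lingering timer threads matter, a handle can be kept on the job dict and cancelled on completion; Timer.cancel() is safe to call even after the timer has fired. A toy demonstration with a short deadline standing in for RUN_TIMEOUT:

import threading

def on_timeout(run_id):
    print(f"run {run_id} hit its deadline")

timer = threading.Timer(0.5, on_timeout, args=[42])  # 0.5 s stands in for the 48 h RUN_TIMEOUT
timer.start()
# ...the run finishes well before the deadline...
timer.cancel()  # the callback never fires; harmless even after expiry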