Neon-tech committed on
Commit 541a74a · verified · 1 Parent(s): 1807b95

Update app.py

Files changed (1)
  1. app.py +37 -45
app.py CHANGED
@@ -4,9 +4,10 @@ import time
  import socket
  import threading
  import gc
  import multiprocessing as mp
  from pathlib import Path
- import pandas as pd
  from tokenizers import Tokenizer

  # ── Config ───────────────────────────────────────────────────────────────────
@@ -56,7 +57,7 @@ def claim_shard(state):
  return name, raw_path
  return None, None

- # ── Tokenize chunk (subprocess) ───────────────────────────────────────────────
  _worker_tokenizer = None

  def init_worker(tok_path):
@@ -64,71 +65,62 @@ def init_worker(tok_path):
  _worker_tokenizer = Tokenizer.from_file(tok_path)

  def tokenize_chunk(texts):
- results = []
- for text in texts:
- if not text or not text.strip():
- continue
- enc = _worker_tokenizer.encode(text)
- ids = enc.ids
- if len(ids) >= 2:
- results.append(ids)
- return results
-
- # ── Process shard using both cores ───────────────────────────────────────────
  def process_shard(name, raw_path, pool):
  print(f" [{WORKER_ID}] Processing: {name}")

  try:
- df = pd.read_parquet(raw_path, columns=["text"])
  except Exception as e:
  return False, f"read_failed: {e}"

- total = len(df)
- print(f" [{WORKER_ID}] {total:,} rows — splitting across 2 cores")
-
- mid = total // 2
- texts1 = df.iloc[:mid]["text"].tolist()
- texts2 = df.iloc[mid:]["text"].tolist()
- del df
- gc.collect()
-
  try:
- results = pool.map(tokenize_chunk, [texts1, texts2])
- except Exception as e:
- return False, f"tokenize_failed: {e}"

- all_ids = results[0] + results[1]
- del results, texts1, texts2
- gc.collect()

- if not all_ids:
- return False, "no_tokens_produced"

- out_name = name.replace(".parquet", ".jsonl")
- out_path = Path(OUT_DIR) / out_name
- total_tokens = 0

- try:
- with open(out_path, "w", encoding="utf-8") as f:
- for ids in all_ids:
- f.write(json.dumps({"input_ids": ids}) + "\n")
- total_tokens += len(ids)
  except Exception as e:
  return False, f"write_failed: {e}"

- del all_ids
- gc.collect()
-
  print(f" ✓ [{WORKER_ID}] {out_name} | {total_tokens:,} tokens")
  return True, None

  # ── Worker loop ───────────────────────────────────────────────────────────────
  def worker_loop():
  print(f"✓ [{WORKER_ID}] Loading tokenizer...")
  tok = Tokenizer.from_file(TOK_PATH)
  print(f"✓ [{WORKER_ID}] Tokenizer ready | vocab: {tok.get_vocab_size():,}")
  del tok
- gc.collect()

  pool = mp.Pool(processes=2, initializer=init_worker, initargs=(TOK_PATH,))
  print(f"✓ [{WORKER_ID}] Worker pool ready")
@@ -184,9 +176,9 @@ def worker_loop():
  state["shards"][name]["claimed_at"] = None
  state["shards"][name]["error"] = error
  save_state(state)
- print(f" [{WORKER_ID}] Shard failed ({error}), left on disk for retry: {name}")

- gc.collect()
  time.sleep(5)

  finally:
 
  import socket
  import threading
  import gc
+ import ctypes
  import multiprocessing as mp
  from pathlib import Path
+ import pyarrow.parquet as pq
  from tokenizers import Tokenizer

  # ── Config ───────────────────────────────────────────────────────────────────
 
  return name, raw_path
  return None, None

+ # ── Tokenizer subprocess ──────────────────────────────────────────────────────
  _worker_tokenizer = None

  def init_worker(tok_path):
 
  _worker_tokenizer = Tokenizer.from_file(tok_path)

  def tokenize_chunk(texts):
+ encs = _worker_tokenizer.encode_batch(texts)
+ return [e.ids for e in encs if len(e.ids) >= 2]
+
+ # ── Process shard ─────────────────────────────────────────────────────────────
  def process_shard(name, raw_path, pool):
  print(f" [{WORKER_ID}] Processing: {name}")

+ out_name = name.replace(".parquet", ".jsonl")
+ out_path = Path(OUT_DIR) / out_name
+ total_tokens = 0
+
  try:
+ pf = pq.ParquetFile(raw_path)
  except Exception as e:
+ raw_path.unlink(missing_ok=True)
  return False, f"read_failed: {e}"

  try:
+ with open(out_path, "w", encoding="utf-8") as f:
+ for batch in pf.iter_batches(batch_size=5_000, columns=["text"]):
+ texts = batch.column("text").to_pylist()
+ mid = len(texts) // 2

+ try:
+ results = pool.map(tokenize_chunk, [texts[:mid], texts[mid:]])
+ except Exception as e:
+ return False, f"tokenize_failed: {e}"

+ for ids in results[0] + results[1]:
+ f.write(json.dumps({"input_ids": ids}) + "\n")
+ total_tokens += len(ids)

+ del texts, results
+ gc.collect()

  except Exception as e:
  return False, f"write_failed: {e}"

  print(f" ✓ [{WORKER_ID}] {out_name} | {total_tokens:,} tokens")
  return True, None

+ # ── Force full memory flush ───────────────────────────────────────────────────
+ def flush_memory():
+ gc.collect()
+ try:
+ ctypes.CDLL("libc.so.6").malloc_trim(0)
+ except Exception:
+ pass
+
  # ── Worker loop ───────────────────────────────────────────────────────────────
  def worker_loop():
  print(f"✓ [{WORKER_ID}] Loading tokenizer...")
  tok = Tokenizer.from_file(TOK_PATH)
  print(f"✓ [{WORKER_ID}] Tokenizer ready | vocab: {tok.get_vocab_size():,}")
  del tok
+ flush_memory()

  pool = mp.Pool(processes=2, initializer=init_worker, initargs=(TOK_PATH,))
  print(f"✓ [{WORKER_ID}] Worker pool ready")
 
  state["shards"][name]["claimed_at"] = None
  state["shards"][name]["error"] = error
  save_state(state)
+ print(f" [{WORKER_ID}] Failed ({error}) — left on disk for retry: {name}")

+ flush_memory()
  time.sleep(5)

  finally:
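
For reference, a minimal standalone sketch of the streaming pipeline the new process_shard implements: read the parquet shard in record batches with pyarrow, tokenize each batch across a two-process pool with encode_batch, and append JSONL lines as it goes, so no full table is ever materialized. The helper name shard_to_jsonl, the tokenizer.json path, and the file names in the usage guard are illustrative, not part of app.py.

import json
import multiprocessing as mp
import pyarrow.parquet as pq
from tokenizers import Tokenizer

TOK_PATH = "tokenizer.json"  # assumed tokenizer location (illustrative)

_tok = None

def _init(tok_path):
    # Load the tokenizer once per worker process.
    global _tok
    _tok = Tokenizer.from_file(tok_path)

def _tokenize(texts):
    # Batch-encode and keep only sequences with at least two tokens.
    encs = _tok.encode_batch(texts)
    return [e.ids for e in encs if len(e.ids) >= 2]

def shard_to_jsonl(parquet_path, out_path, batch_size=5_000):
    pf = pq.ParquetFile(parquet_path)
    total_tokens = 0
    with mp.Pool(2, initializer=_init, initargs=(TOK_PATH,)) as pool, \
         open(out_path, "w", encoding="utf-8") as f:
        for batch in pf.iter_batches(batch_size=batch_size, columns=["text"]):
            # Drop null/empty rows; encode_batch expects strings.
            texts = [t for t in batch.column("text").to_pylist() if t]
            mid = len(texts) // 2
            for chunk in pool.map(_tokenize, [texts[:mid], texts[mid:]]):
                for ids in chunk:
                    f.write(json.dumps({"input_ids": ids}) + "\n")
                    total_tokens += len(ids)
    return total_tokens

if __name__ == "__main__":
    n = shard_to_jsonl("shard_000.parquet", "shard_000.jsonl")
    print(f"{n:,} tokens written")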
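
A note on why the commit swaps bare gc.collect() calls for flush_memory(): on glibc-based Linux, collecting garbage frees Python objects, but the C allocator often keeps the freed pages in its arenas, so the process RSS barely drops; malloc_trim(0) asks glibc to return that free memory to the OS. The sketch below illustrates the same idea; the rss_mb helper and the allocation size are illustrative, and the trim call is a best-effort no-op on non-glibc systems.

import ctypes
import gc

def flush_memory():
    gc.collect()  # free unreachable Python objects first
    try:
        # glibc only: release free heap pages back to the OS.
        ctypes.CDLL("libc.so.6").malloc_trim(0)
    except Exception:
        pass  # best effort: skip where libc.so.6 / malloc_trim is unavailable

def rss_mb():
    # Current resident set size from /proc (Linux), in MiB.
    with open("/proc/self/status") as f:
        for line in f:
            if line.startswith("VmRSS:"):
                return int(line.split()[1]) / 1024
    return 0.0

if __name__ == "__main__":
    blob = [b"x" * 1024 for _ in range(200_000)]  # roughly 200 MiB of small objects
    print(f"allocated: {rss_mb():.0f} MiB")
    del blob
    flush_memory()
    print(f"after flush_memory: {rss_mb():.0f} MiB")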