Neon-tech committed
Commit 675be9e (verified)
1 Parent(s): 1340c9c

Update app.py

Files changed (1)
  1. app.py +74 -124
app.py CHANGED
@@ -3,10 +3,8 @@ import json
  import time
  import socket
  import threading
- import re
  import requests
  import pyarrow.parquet as pq
- import pyarrow as pa
  import gc
  from pathlib import Path
  from huggingface_hub import HfApi
@@ -16,25 +14,20 @@ HF_TOKEN = os.environ.get("HF_TOKEN")
  RAW_DIR = "/data/raw"
  STATE_FILE = "/data/state.json"
  WORKER_TIMEOUT = 600
- MAX_BUFFERED = 50
- ROWS_PER_CHUNK = 50_000
+ MAX_BUFFERED = 999999
 
  os.makedirs(RAW_DIR, exist_ok=True)
- api = HfApi(token=HF_TOKEN)
-
+ api = HfApi(token=HF_TOKEN)
  AUTH_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"}
 
  # ── Sources ───────────────────────────────────────────────────────────────────
- # Each source: (name, type, urls_or_config)
- # Types: parquet_list, hf_list
- # For hf_list: uses HF API to discover files
  SOURCES = [
      {
          "name" : "fineweb",
          "type" : "hf_list",
          "repo" : "HuggingFaceFW/fineweb-edu",
          "prefix" : "data/CC-MAIN-2025-26",
-         "skip" : 0,  # already in state from prev run — coordinator skips done ones
+         "skip" : 5,
          "take" : 10,
          "text_col": "text",
      },
@@ -43,7 +36,7 @@ SOURCES = [
          "type" : "hf_list",
          "repo" : "wikimedia/wikipedia",
          "prefix" : "20231101.en/train-",
-         "skip" : 2,  # first 2 already done in 50M
+         "skip" : 2,
          "take" : 18,
          "text_col": "text",
      },
@@ -56,31 +49,16 @@ SOURCES = [
          "take" : 6,
          "text_col": "text",
      },
-     {
-         "name" : "phi",
-         "type" : "url_list",
-         "urls" : [
-             "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00000-of-00004-ea05c5cb63b570a8.parquet?download=true",
-             "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00001-of-00004-d99cbe052bab0d4e.parquet?download=true",
-             "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00002-of-00004-2c25f0e11d537eaf.parquet?download=true",
-             "https://huggingface.co/datasets/open-phi/programming_books_llama/resolve/main/data/train-00003-of-00004-faa8dbb07e5f02e8.parquet?download=true",
-         ],
-         "text_col": "markdown",
-     },
      {
          "name" : "code",
          "type" : "url_list",
-         "urls" : [
-             # 12 new languages × 2 shards = 24 files
-             # Base: https://huggingface.co/datasets/Neon-tech/Dataset-arranger/resolve/main/by-language
-             *[
-                 f"https://huggingface.co/datasets/Neon-tech/Dataset-arranger/resolve/main/by-language/{lang}/shard_{str(i).zfill(6)}.jsonl?download=true"
-                 for lang in ["C", "C++", "Java", "Go", "Rust", "Ruby", "PHP", "SQL", "C#", "Scala", "Lua", "Perl"]
-                 for i in range(2)
-             ],
-         ],
          "text_col": "text",
          "fmt" : "jsonl",
+         "urls" : [
+             f"https://huggingface.co/datasets/Neon-tech/Dataset-arranger/resolve/main/by-language/{lang}/shard_{str(i).zfill(6)}.jsonl?download=true"
+             for lang in ["C", "C++", "Java", "Go", "Rust", "Ruby", "PHP", "SQL", "C#", "Scala", "Lua", "Perl"]
+             for i in range(2)
+         ],
      },
  ]
 
@@ -96,12 +74,6 @@ def serve():
          conn.send(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
          conn.close()
 
- # ── Friendly name ─────────────────────────────────────────────────────────────
- def friendly_name(source_name, url_or_path):
-     # Strip query string
-     base = url_or_path.split("?")[0].split("/")[-1]
-     return f"{source_name}__{base}"
-
  # ── State ─────────────────────────────────────────────────────────────────────
  def load_state():
      if os.path.exists(STATE_FILE):
@@ -124,46 +96,50 @@ def save_state(state):
          json.dump(state, f, indent=2)
      os.replace(tmp, STATE_FILE)
 
- # ── Discover all sources ──────────────────────────────────────────────────────
+ # ── Discover ──────────────────────────────────────────────────────────────────
  def discover_all(state):
-     known = {v["url"] for v in state["shards"].values()} | {e["url"] for e in state.get("queue", [])}
-     new_count = 0
+     known_urls = {v["url"] for v in state["shards"].values()} | {e["url"] for e in state.get("queue", [])}
+     new_count = 0
 
      for src in SOURCES:
          name = src["name"]
         print(f"\nDiscovering: {name}")
 
          if src["type"] == "hf_list":
-             files = list(api.list_repo_files(src["repo"], repo_type="dataset"))
-             files = [f for f in files if f.startswith(src["prefix"]) and f.endswith(".parquet")]
-             files = sorted(files)
-             files = files[src["skip"]: src["skip"] + src["take"]]
-             base = f"https://huggingface.co/datasets/{src['repo']}/resolve/main/"
-             urls = [base + f for f in files]
-             fmt = "parquet"
+             all_files = sorted([
+                 f for f in api.list_repo_files(src["repo"], repo_type="dataset")
+                 if f.startswith(src["prefix"]) and f.endswith(".parquet")
+             ])
+             selected = all_files[src["skip"]: src["skip"] + src["take"]]
+             base_url = f"https://huggingface.co/datasets/{src['repo']}/resolve/main/"
+             urls = [base_url + f for f in selected]
+             fmt = "parquet"
          else:
              urls = src["urls"]
              fmt = src.get("fmt", "parquet")
 
+         added = 0
          for url in urls:
-             if url not in known:
+             if url not in known_urls:
                  state["queue"].append({
                      "url" : url,
                      "source" : name,
                      "text_col" : src["text_col"],
                      "fmt" : fmt,
                  })
-                 known.add(url)
+                 known_urls.add(url)
                  new_count += 1
+                 added += 1
 
-         print(f" {name}: {len(urls)} files | {new_count} new queued")
+         print(f" {name}: {len(urls)} files | {added} new added to queue")
 
      save_state(state)
      print(f"\nTotal queued: {len(state['queue'])} | In state: {len(state['shards'])}")
 
  # ── Reclaim stale ─────────────────────────────────────────────────────────────
  def reclaim_stale(state):
-     now = time.time()
+     now = time.time()
+     reclaimed = 0
      for name, info in state["shards"].items():
          if info["status"] == "claimed" and info.get("claimed_at"):
              if now - info["claimed_at"] > WORKER_TIMEOUT:
@@ -171,50 +147,29 @@ def reclaim_stale(state):
                  info["status"] = "pending"
                  info["worker"] = None
                  info["claimed_at"] = None
-     save_state(state)
-
- # ── Split parquet into chunks ─────────────────────────────────────────────────
- def split_parquet(src_path, name, text_col):
-     pf = pq.ParquetFile(src_path)
-     chunk_paths = []
-     chunk_idx = 0
-     current = []
-
-     for batch in pf.iter_batches(batch_size=10_000, columns=[text_col]):
-         current.append(batch)
-         if sum(len(b) for b in current) >= ROWS_PER_CHUNK:
-             chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
-             chunk_path = Path(RAW_DIR) / chunk_name
-             table = pa.Table.from_batches(current)
-             pq.write_table(table, chunk_path)
-             print(f" ✓ {chunk_name} ({len(table):,} rows)")
-             chunk_paths.append((chunk_name, text_col, "parquet"))
-             chunk_idx += 1
-             current = []
-             del table; gc.collect()
-
-     if current:
-         chunk_name = name.replace(".parquet", f"_chunk{chunk_idx:03d}.parquet")
-         chunk_path = Path(RAW_DIR) / chunk_name
-         table = pa.Table.from_batches(current)
-         pq.write_table(table, chunk_path)
-         print(f" ✓ {chunk_name} ({len(table):,} rows)")
-         chunk_paths.append((chunk_name, text_col, "parquet"))
-         del table; gc.collect()
-
-     return chunk_paths
+                 reclaimed += 1
+     if reclaimed:
+         save_state(state)
 
- def copy_jsonl(src_path, name):
-     """JSONL files are small enough to use directly — just copy to raw dir."""
-     import shutil
-     dst = Path(RAW_DIR) / name
-     shutil.copy2(src_path, dst)
-     return [(name, "text", "jsonl")]
+ # ── Parquet → JSONL ───────────────────────────────────────────────────────────
+ def parquet_to_jsonl(parquet_path, jsonl_path, text_col):
+     """Stream parquet batch by batch → write one JSON line per doc. No full load."""
+     pf = pq.ParquetFile(parquet_path)
+     n_written = 0
+     with open(jsonl_path, "w", encoding="utf-8") as out:
+         for batch in pf.iter_batches(batch_size=1_000, columns=[text_col]):
+             texts = batch.column(text_col).to_pylist()
+             for text in texts:
+                 if text and isinstance(text, str) and text.strip():
+                     out.write(json.dumps({"text": text.strip()}, ensure_ascii=False) + "\n")
+                     n_written += 1
+             del texts
+             gc.collect()
+     return n_written
 
  # ── Download loop ─────────────────────────────────────────────────────────────
  def download_loop(state):
      while True:
-         # Reload state
          try:
              with open(STATE_FILE) as f:
                  fresh = json.load(f)
@@ -245,55 +200,51 @@ def download_loop(state):
          source = entry["source"]
          text_col = entry["text_col"]
          fmt = entry.get("fmt", "parquet")
-         ext = ".jsonl" if fmt == "jsonl" else ".parquet"
-         name = friendly_name(source, url)
-         if not name.endswith(ext):
-             name = name.split(".")[0] + ext
-         raw_path = Path(RAW_DIR) / name
-         tmp_path = Path(RAW_DIR) / f"{name}.tmp"
 
-         print(f" Downloading: {source} | {url.split('/')[-1].split('?')[0]}")
+         base_name = url.split("?")[0].split("/")[-1].replace(".parquet", "").replace(".jsonl", "")
+         shard_name = f"{source}__{base_name}.jsonl"
+         jsonl_path = Path(RAW_DIR) / shard_name
+         tmp_path = Path(RAW_DIR) / f"{shard_name}.tmp"
+
+         print(f" Downloading: {source} | {base_name}")
          try:
              resp = requests.get(url, headers=AUTH_HEADERS, timeout=300, stream=True)
              resp.raise_for_status()
              with open(tmp_path, "wb") as f:
                  for chunk in resp.iter_content(chunk_size=8 * 1024 * 1024):
                      f.write(chunk)
-             tmp_path.rename(raw_path)
          except Exception as e:
              print(f" ✗ Download failed: {e} — retrying in 30s")
              tmp_path.unlink(missing_ok=True)
              time.sleep(30)
              continue
 
-         print(f" Processing: {name}")
-         try:
-             if fmt == "parquet":
-                 chunks = split_parquet(raw_path, name, text_col)
-             else:
-                 chunks = copy_jsonl(raw_path, name)
-         except Exception as e:
-             print(f" ✗ Processing failed: {e}")
-             raw_path.unlink(missing_ok=True)
-             time.sleep(30)
-             continue
+         if fmt == "parquet":
+             print(f" Converting → jsonl: {shard_name}")
+             try:
+                 n = parquet_to_jsonl(tmp_path, jsonl_path, text_col)
+                 tmp_path.unlink(missing_ok=True)
+                 print(f" ✓ {n:,} docs")
+             except Exception as e:
+                 print(f" ✗ Convert failed: {e}")
+                 tmp_path.unlink(missing_ok=True)
+                 jsonl_path.unlink(missing_ok=True)
+                 time.sleep(30)
+                 continue
+         else:
+             tmp_path.rename(jsonl_path)
 
-         raw_path.unlink(missing_ok=True)
          state["queue"].pop(0)
-
-         for chunk_name, col, chunk_fmt in chunks:
-             state["shards"][chunk_name] = {
-                 "status" : "pending",
-                 "url" : url,
-                 "source" : source,
-                 "text_col" : col,
-                 "fmt" : chunk_fmt,
-                 "worker" : None,
-                 "claimed_at": None,
-                 "error" : None,
-             }
+         state["shards"][shard_name] = {
+             "status" : "pending",
+             "url" : url,
+             "source" : source,
+             "worker" : None,
+             "claimed_at": None,
+             "error" : None,
+         }
          save_state(state)
-         print(f" ✓ {len(chunks)} chunks ready from {name}")
+         print(f" ✓ Ready: {shard_name}")
          time.sleep(3)
 
  # ── Monitor ───────────────────────────────────────────────────────────────────
@@ -311,7 +262,6 @@ def monitor_loop():
          total = len(shards) + len(queue)
          pct = (done / total * 100) if total else 0
 
-         # Per-source breakdown
          src_done = {}
          for v in shards.values():
              src = v.get("source", "?")
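
For reference, a rough sketch of the /data/state.json layout this version of app.py produces. The field names match what discover_all() and download_loop() write; the URLs, file names and the shard key below are made up purely for illustration.

# Hypothetical snapshot of /data/state.json, shown as a Python literal.
# Note that shard entries no longer carry "text_col"/"fmt": every shard on disk
# is already JSONL with a single "text" field after parquet_to_jsonl().
example_state = {
    "queue": [
        {   # discovered by discover_all(), not yet downloaded (illustrative URL)
            "url": "https://huggingface.co/datasets/wikimedia/wikipedia/resolve/main/20231101.en/train-00005-of-00041.parquet",
            "source": "wikipedia",
            "text_col": "text",
            "fmt": "parquet",
        },
    ],
    "shards": {
        # key = f"{source}__{base_name}.jsonl", built in download_loop()
        "wikipedia__train-00002-of-00041.jsonl": {
            "status": "pending",   # reclaim_stale() resets stale "claimed" entries
            "url": "https://huggingface.co/datasets/wikimedia/wikipedia/resolve/main/20231101.en/train-00002-of-00041.parquet",
            "source": "wikipedia",
            "worker": None,
            "claimed_at": None,
            "error": None,
        },
    },
}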
 
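
The worker side is not part of this file, but reclaim_stale() and the shard fields above imply a simple claim protocol: a worker flips a pending shard to claimed with a timestamp, and the coordinator flips it back to pending if the claim is older than WORKER_TIMEOUT (600 s). A minimal sketch of that claim step, assuming a single worker id and ignoring cross-process locking:

# Hypothetical worker-side claim (not in this commit). Field names and the
# tmp-then-os.replace save mirror app.py; worker_id and the absence of any
# file locking are assumptions made for this sketch.
import json, os, time

STATE_FILE = "/data/state.json"

def claim_next_shard(worker_id):
    with open(STATE_FILE) as f:
        state = json.load(f)
    for shard_name, info in state["shards"].items():
        if info["status"] == "pending":
            info["status"] = "claimed"
            info["worker"] = worker_id
            info["claimed_at"] = time.time()
            tmp = STATE_FILE + ".tmp"
            with open(tmp, "w") as f:
                json.dump(state, f, indent=2)
            os.replace(tmp, STATE_FILE)
            return shard_name   # caller then reads the JSONL shard under /data/raw
    return None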