Neon-tech committed on
Commit 7845b2b · verified · 1 Parent(s): 83beb12

Update app.py

Files changed (1)
  1. app.py +200 -5
app.py CHANGED
@@ -1,7 +1,202 @@
- import shutil
  from pathlib import Path

- for folder in ["/data/by-language", "/data/progress_state.json"]:
-     p = Path(folder)
-     if p.exists():
-         shutil.rmtree(p)
+ import os
+ import json
+ import time
+ import socket
+ import threading
+ import io
+ import requests
+ import pandas as pd
  from pathlib import Path
+ from tokenizers import Tokenizer
+ from huggingface_hub import HfApi

+ # ── Config ───────────────────────────────────────────────────────────────────
+ HF_TOKEN = os.environ.get("HF_TOKEN")
+ DATASET_REPO = "Neon-coding/github-code-raw"
+ TOK_PATH = "/data/tokenizer.json"
+ OUT_DIR = "/data/by-language"
+ STATE_FILE = "/data/progress_state.json"
+ TOTAL_PARQUETS = 880
+ SHARD_TOKENS = 50_000_000  # 50M tokens per shard
+
+ PARQUET_URL = (
+     "https://huggingface.co/datasets/codeparrot/github-code-clean"
+     "/resolve/main/data/train-{i:05d}-of-00880.parquet"
+ )
+
+ os.makedirs(OUT_DIR, exist_ok=True)
+
+ api = HfApi(token=HF_TOKEN)
+
+ # ── Port 7860 — keeps Space green ────────────────────────────────────────────
+ def serve():
+     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+     s.bind(("0.0.0.0", 7860))
+     s.listen(5)
+     print("✓ Listening on port 7860")
+     while True:
+         conn, _ = s.accept()
+         conn.send(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
+         conn.close()
+
+ # ── State ────────────────────────────────────────────────────────────────────
+ def load_state():
+     if os.path.exists(STATE_FILE):
+         with open(STATE_FILE) as f:
+             state = json.load(f)
+         print(f"Resuming — {len(state['done'])} / {TOTAL_PARQUETS} parquets done")
+     else:
+         state = {
+             "done": [],
+             "lang_shards": {},
+             "lang_tokens": {},
+         }
+         print("Starting fresh")
+     return state
+
+ def save_state(state, retries=3, delay=5):
+     for attempt in range(retries):
+         try:
+             with open(STATE_FILE, "w") as f:
+                 json.dump(state, f, indent=2)
+             return
+         except OSError as e:
+             print(f" ⚠ State save attempt {attempt + 1} failed: {e}")
+             if attempt < retries - 1:
+                 time.sleep(delay)
+     print(" ✗ State save failed after all retries — continuing")
+
+ # ── Shard buffers — global per language, persist across parquets ─────────────
+ buffers = {}
+
+ def get_buffer(lang):
+     if lang not in buffers:
+         buffers[lang] = {"rows": [], "token_count": 0}
+     return buffers[lang]
+
+ def flush_shard(lang, rows, state):
+     shard_idx = state["lang_shards"].get(lang, 0)
+     lang_dir = Path(OUT_DIR) / lang
+     lang_dir.mkdir(parents=True, exist_ok=True)
+     shard_name = f"shard_{shard_idx:06d}.jsonl"
+     shard_path = lang_dir / shard_name
+
+     with open(shard_path, "w", encoding="utf-8") as f:
+         for row in rows:
+             f.write(json.dumps(row, ensure_ascii=False) + "\n")
+
+     tok_in_shard = sum(r["token_count"] for r in rows)
+     state["lang_shards"][lang] = shard_idx + 1
+     state["lang_tokens"][lang] = state["lang_tokens"].get(lang, 0) + tok_in_shard
+     print(f" ✓ {lang}/{shard_name} | {len(rows)} samples | {tok_in_shard:,} tokens")
+
+ # ── Main processing loop ─────────────────────────────────────────────────────
+ def process(tokenizer, state):
+     for i in range(TOTAL_PARQUETS):
+         if i in state["done"]:
+             print(f"[{i:06d}/{TOTAL_PARQUETS}] SKIP")
+             continue
+
+         url = PARQUET_URL.format(i=i)
+         print(f"[{i:06d}/{TOTAL_PARQUETS}] Downloading...")
+
+         try:
+             resp = requests.get(
+                 url,
+                 headers={"Authorization": f"Bearer {HF_TOKEN}"},
+                 timeout=180,
+             )
+             resp.raise_for_status()
+             df = pd.read_parquet(io.BytesIO(resp.content))
+         except Exception as e:
+             print(f"[{i:06d}] Download error: {e} — skipping")
+             continue
+
+         print(f"[{i:06d}] {len(df):,} rows | {df['language'].nunique()} languages")
+
+         # row by row — constant memory
+         for row_tuple in df.itertuples(index=False):
+             lang = row_tuple.language
+             text = row_tuple.code if row_tuple.code else ""
+             repo = row_tuple.repo_name
+             fpath = row_tuple.path
+             lic = row_tuple.license
+
+             if not text.strip():
+                 continue
+
+             enc = tokenizer.encode(text)
+             token_count = len(enc.ids)
+
+             if token_count < 2:
+                 continue
+
+             buf = get_buffer(lang)
+             row = {
+                 "text": text,
+                 "token_count": token_count,
+                 "repo": repo,
+                 "path": fpath,
+                 "license": lic,
+             }
+
+             if buf["token_count"] + token_count > SHARD_TOKENS and buf["rows"]:
+                 flush_shard(lang, buf["rows"], state)
+                 save_state(state)
+                 buf["rows"] = []
+                 buf["token_count"] = 0
+
+             buf["rows"].append(row)
+             buf["token_count"] += token_count
+
+         del df
+
+         state["done"].append(i)
+         save_state(state)
+         print(f"[{i:06d}] ✓ Complete")
+
+     # ── Flush remaining partial shards ────────────────────────────────────────
+     print("\nFlushing remaining buffers...")
+     for lang, buf in buffers.items():
+         if buf["rows"]:
+             flush_shard(lang, buf["rows"], state)
+     save_state(state)
+
+     # ── Write meta.json per language ──────────────────────────────────────────
+     print("\nWriting meta.json per language...")
+     for lang in state["lang_tokens"]:
+         meta = {
+             "language": lang,
+             "total_tokens": state["lang_tokens"][lang],
+             "total_shards": state["lang_shards"].get(lang, 0),
+         }
+         meta_path = Path(OUT_DIR) / lang / "meta.json"
+         with open(meta_path, "w") as f:
+             json.dump(meta, f, indent=2)
+         print(f" {lang}: {meta['total_tokens']:,} tokens | {meta['total_shards']} shards")
+
+     # ── Push everything to HF dataset repo ───────────────────────────────────
+     print(f"\nPushing to {DATASET_REPO}...")
+     api.upload_folder(
+         folder_path=OUT_DIR,
+         repo_id=DATASET_REPO,
+         repo_type="dataset",
+         token=HF_TOKEN,
+     )
+     print("\n✓ All done!")
+
+ # ── Entry point ──────────────────────────────────────────────────────────────
+ if __name__ == "__main__":
+     threading.Thread(target=serve, daemon=True).start()
+
+     print("✓ Loading tokenizer from /data/tokenizer.json...")
+     tokenizer = Tokenizer.from_file(TOK_PATH)
+     print(f"✓ Tokenizer loaded | vocab: {tokenizer.get_vocab_size():,}")
+
+     state = load_state()
+
+     threading.Thread(target=process, args=(tokenizer, state), daemon=True).start()
+
+     while True:
+         time.sleep(60)
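
For reference, the script added in this commit writes one folder per language under /data/by-language, each holding shard_NNNNNN.jsonl files (one JSON object per source file, with the text, token_count, repo, path, and license fields produced by flush_shard) plus a meta.json with per-language totals. Below is a minimal sketch of reading one shard back, assuming a local copy of that layout; the "Python" folder and the shard index are illustrative choices, not values taken from the commit.

import json
from pathlib import Path

# Assumed local copy of the layout written by flush_shard() and the meta.json loop.
lang_dir = Path("/data/by-language/Python")  # illustrative language folder

# meta.json holds the totals written once all parquets are processed.
meta = json.loads((lang_dir / "meta.json").read_text())
print(meta["language"], meta["total_tokens"], meta["total_shards"])

# Each shard is JSON Lines: one record per source file.
with open(lang_dir / "shard_000000.jsonl", encoding="utf-8") as f:
    first = json.loads(next(f))
print(first["repo"], first["path"], first["token_count"])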