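"""Clean and tag scientific_corpus_325M.jsonl for training.

Reads the source JSONL in chunks, prepends domain/task/section tags to each
record's text, writes a cleaned JSONL copy, converts it to an Arrow dataset
with `datasets`, and finally hands off to hf_upload.py for the HuggingFace
upload.
"""
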
import json
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

from datasets import Dataset
# Tag dictionaries
DOMAIN_TAGS = {
    "physics": "[PHYS]",
    "biology": "[BIO]",
    "materials": "[MAT]",
    "education": "[GEN]",
}

TASK_TAGS = {
    "hypothesis": "[HYP]",
    "method": "[MTH]",
    "experiment": "[EXP]",
}

SECTION_TAGS = {
    "abstract": "[ABSTRACT]",
    "introduction": "[INTRO]",
    "results": "[RESULTS]",
    "discussion": "[DISCUSSION]",
    "conclusion": "[CONCLUSION]",
    "method": "[MTH]",
    "experiment": "[EXP]",
}
SRC_PATH = Path(r"C:\Users\kunya\PycharmProjects\DataVolt\Tokenization\scientific_corpus_325M.jsonl")
CLEANED_JSONL_PATH = Path("scientific_corpus_325M.cleaned.jsonl")
CLEANED_ARROW_PATH = Path("scientific_corpus_325M.cleaned.arrow")
CHUNK_SIZE = 10000
MAX_WORKERS = os.cpu_count() or 4

def tag_record(record):
    """Prepend domain/task/section tags to a record's text as tagged_text."""
    # Tagging logic: add tags when domain/task/section fields are present.
    # Adjust the keys below if your schema uses different field names.
    domain = str(record.get("domain") or "").lower()
    task = str(record.get("task") or "").lower()
    section = str(record.get("section") or "").lower()
    text = record.get("full_text", "")

    tags = []
    if domain in DOMAIN_TAGS:
        tags.append(DOMAIN_TAGS[domain])
    if task in TASK_TAGS:
        tags.append(TASK_TAGS[task])
    if section in SECTION_TAGS:
        tags.append(SECTION_TAGS[section])

    # Prepend tags; records with no matching tags keep the raw text unchanged.
    record["tagged_text"] = (" ".join(tags) + " " + text) if tags else text
    return record
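
# For illustration (a sketch, assuming records carry "domain", "section", and
# "full_text" keys as above): a record such as
#     {"domain": "physics", "section": "abstract", "full_text": "We study ..."}
# comes back from tag_record() with
#     tagged_text == "[PHYS] [ABSTRACT] We study ..."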

def process_chunk(lines):
    cleaned = []
    for line in lines:
        try:
            record = json.loads(line)
            cleaned.append(tag_record(record))
        except Exception:
            continue  # skip malformed lines
    return cleaned

def chunked_file_reader(path, chunk_size):
    """Yield lists of at most chunk_size raw lines from the file at path."""
    with open(path, "r", encoding="utf-8") as f:
        chunk = []
        for line in f:
            chunk.append(line)
            if len(chunk) == chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
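
# Usage sketch: each chunk yielded above is a list of raw JSONL lines that
# process_chunk() parses and tags, e.g.
#     for chunk in chunked_file_reader(SRC_PATH, CHUNK_SIZE):
#         records = process_chunk(chunk)  # list of tagged dicts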

def main():
    print("Starting cleaning process...")

    # Write cleaned records to a new JSONL file; chunks are tagged in a
    # thread pool and written out as each future completes.
    with open(CLEANED_JSONL_PATH, "w", encoding="utf-8") as out_f:
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            futures = []
            for chunk in chunked_file_reader(SRC_PATH, CHUNK_SIZE):
                futures.append(executor.submit(process_chunk, chunk))
            for fut in as_completed(futures):
                for record in fut.result():
                    out_f.write(json.dumps(record, ensure_ascii=False) + "\n")
    print(f"Cleaned JSONL written to {CLEANED_JSONL_PATH}")

    # Convert the cleaned JSONL to Arrow via datasets; save_to_disk writes a
    # dataset directory containing the Arrow files.
    print("Saving cleaned dataset to Arrow format...")
    ds = Dataset.from_json(str(CLEANED_JSONL_PATH))
    ds.save_to_disk(str(CLEANED_ARROW_PATH))
    print(f"Saved cleaned Arrow dataset at: {CLEANED_ARROW_PATH}")

    # Hand off to hf_upload.py for the HuggingFace upload (runs as a child
    # process and blocks until it finishes).
    print("Uploading to HuggingFace using hf_upload.py ...")
    os.system("python hf_upload.py")


if __name__ == "__main__":
    main()
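
# Optional sanity check (a minimal sketch, not part of the pipeline above):
# reload the saved dataset and inspect one tagged record. load_from_disk is
# the datasets counterpart of the save_to_disk call in main().
#
#     from datasets import load_from_disk
#     ds = load_from_disk(str(CLEANED_ARROW_PATH))
#     print(ds)                           # column names and row count
#     print(ds[0]["tagged_text"][:200])   # first 200 chars of the first record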