"""Build a multilabel dataset over the top Dockerfile lint rules.

Reads labeled Dockerfiles from a JSONL file, keeps the "bad" examples that
trigger at least one of the top rules, tokenizes them with CodeBERT, and
saves train/validation/test splits to disk.
"""

import json
from pathlib import Path

from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer

INPUT_PATH = Path("data/labeled/labeled_dockerfiles.jsonl")
TOP_RULES_PATH = Path("data/metadata/top_rules.json")
OUTPUT_DIR = Path("data/processed/dataset_multilabel_top30")
TOKENIZER_NAME = "microsoft/codebert-base"
MAX_LENGTH = 512
SEED = 42


def load_top_rules():
    """Load the list of top rule IDs used as the multilabel target space."""
    with open(TOP_RULES_PATH, encoding="utf-8") as f:
        return json.load(f)


def build_dataset(records, top_rules):
    """Turn labeled records into multilabel examples over the top rules."""
    rule2id = {r: i for i, r in enumerate(top_rules)}
    data = []
    for row in records:
        # Only "bad" Dockerfiles carry rule violations worth labeling.
        if row.get("label") != "bad":
            continue

        triggered = row.get("rules_triggered", [])
        multilabel = [0] * len(top_rules)
        matched = False

        # Binary indicator vector: 1 at the index of every triggered top rule.
        for rule in triggered:
            if rule in rule2id:
                multilabel[rule2id[rule]] = 1
                matched = True

        # Skip files that only trigger rules outside the top set.
        if not matched:
            continue

        data.append({
            "text": "\n".join(row["content"]) if isinstance(row["content"], list) else str(row["content"]),
            "labels": multilabel,
            "meta_lines": row.get("lines", {}),
            "meta_fixes": row.get("fixes", {})
        })

    return data


def main():
    print("📥 Loading data...")
    top_rules = load_top_rules()
    print(f"🔝 Top {len(top_rules)} rules: {top_rules}")

    with INPUT_PATH.open(encoding="utf-8") as f:
        records = [json.loads(line) for line in f if line.strip()]

    dataset = build_dataset(records, top_rules)
    print(f"📦 Built {len(dataset)} multilabel examples.")

    if not dataset:
        print("❌ No data to process. Check the input files.")
        return

    print("🔀 Splitting into train/val/test...")
    # 10% held out for test, then 11.11% of the remaining 90% for validation,
    # giving roughly an 80/10/10 split.
    train_val, test = train_test_split(dataset, test_size=0.1, random_state=SEED)
    train, val = train_test_split(train_val, test_size=0.1111, random_state=SEED)

    ds = DatasetDict({
        "train": Dataset.from_list(train),
        "validation": Dataset.from_list(val),
        "test": Dataset.from_list(test),
    })

    print("🔤 Tokenizing...")
    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_NAME)

    def tokenize_function(batch):
        texts = [str(x) if x is not None else "" for x in batch["text"]]
        return tokenizer(
            texts,
            padding="max_length",
            truncation=True,
            max_length=MAX_LENGTH
        )

    # Keep only the tokenized inputs and the "labels" column; drop raw text
    # and metadata that the model does not consume.
    ds_tokenized = ds.map(
        tokenize_function,
        batched=True,
        remove_columns=["text", "meta_lines", "meta_fixes"]
    )

    print(f"💾 Saving to: {OUTPUT_DIR}")
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    ds_tokenized.save_to_disk(str(OUTPUT_DIR))

    print("✅ Done.")


if __name__ == "__main__":
    main()
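
# Downstream usage sketch (an assumption, not part of this script): the saved
# splits can be reloaded with datasets.load_from_disk, e.g.
#
#   from datasets import load_from_disk
#   ds = load_from_disk("data/processed/dataset_multilabel_top30")
#   print(ds["train"][0]["labels"])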