File size: 4,076 Bytes
e9b8340 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# 04_clone_and_extract.py
# Dockerfile extraction – version v3 (no parser, with correct JSONL output)
import json
import shutil
import hashlib
from pathlib import Path
from git import Repo
from datetime import datetime
import argparse
# === Ścieżki
REPO_LIST_PATH = Path("data/metadata/repos_filtered.json")
CLONE_DIR = Path("temp_repos")
OUTPUT_FILE = Path("data/raw/dockerfiles.jsonl")
OUTPUT_FILE.parent.mkdir(parents=True, exist_ok=True)
REMOVE_DIRS = [".git", ".github", "docs", "tests", "__pycache__", ".idea", ".vscode"]
def clean_repo(path: Path):
for d in REMOVE_DIRS:
shutil.rmtree(path / d, ignore_errors=True)
def compute_sha1(text: str) -> str:
return hashlib.sha1(text.encode("utf-8")).hexdigest()
def is_valid_dockerfile(path: Path) -> bool:
try:
text = path.read_text(encoding="utf-8").strip()
lines = [l.strip().lower() for l in text.splitlines() if l.strip()]
if len(lines) < 5 or path.stat().st_size > 200_000:
return False
top_lines = lines[:10]
has_from = any(l.startswith("from") for l in top_lines)
has_run = any(l.startswith(("run", "cmd", "copy")) for l in lines)
return has_from and has_run
except Exception as e:
print(f"⚠️ Błąd walidacji pliku {path}: {e}")
return False
def find_dockerfiles(repo_path: Path) -> list[Path]:
return [
f for f in repo_path.rglob("*")
if f.name.lower() == "dockerfile" and f.is_file()
]
def clone_repo(url: str, full_name: str) -> Path | None:
dest = CLONE_DIR / full_name.replace("/", "__")
if dest.exists():
print(f"⚠️ Repo {full_name} już istnieje – pomijam klonowanie.")
return dest
try:
print(f"⬇️ Klonuję {full_name}...")
Repo.clone_from(url, dest, depth=1)
clean_repo(dest)
return dest
except Exception as e:
print(f"❌ Błąd klonowania {full_name}: {e}")
return None
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--purge", action="store_true", help="Usuń repozytorium po ekstrakcji")
args = parser.parse_args()
with open(REPO_LIST_PATH) as f:
repos = json.load(f)
saved, skipped = 0, 0
seen_hashes = set()
with open(OUTPUT_FILE, "w", encoding="utf-8") as out_f:
for repo in repos:
full_name = repo["fullName"]
url = repo["url"]
repo_path = clone_repo(url, full_name)
if not repo_path:
continue
for file in find_dockerfiles(repo_path):
if not is_valid_dockerfile(file):
skipped += 1
continue
try:
lines = file.read_text(encoding="utf-8").strip().splitlines()
lines = [l.rstrip() for l in lines if l.strip()]
file_id = compute_sha1("\n".join(lines))
if file_id in seen_hashes:
skipped += 1
continue
seen_hashes.add(file_id)
json.dump({
"repo": full_name,
"path": str(file.relative_to(repo_path)),
"file_id": file_id,
"content": lines,
"size_bytes": file.stat().st_size,
"line_count": len(lines),
"valid": True,
"cloned_at": datetime.now().isoformat()
}, out_f)
out_f.write("\n")
saved += 1
except Exception as e:
print(f"⚠️ Błąd przy zapisie {file}: {e}")
skipped += 1
if args.purge:
shutil.rmtree(repo_path, ignore_errors=True)
print(f"\n✅ Zapisano {saved} poprawnych Dockerfile do {OUTPUT_FILE}")
print(f"🚫 Pominięto {skipped} plików (nieważne, błędne, zduplikowane)")
if __name__ == "__main__":
main()
|