| |
| """ |
| PPE Compliance Detection Training - FIXED VERSION |
| - Swaps opencv-python for opencv-python-headless before importing ultralytics |
| - Downloads keremberke dataset as ZIP files (script-based datasets no longer supported) |
| - Reads best.pt from the fixed run directory after model.train() (no results.best) |
| - Pushes best.pt to HuggingFace Hub after training |
| """ |
|
|
| import subprocess |
| import sys |
| import os |
|
|
| |
# Swap the GUI OpenCV wheel for the headless one before ultralytics gets
# imported (that import happens later, inside train_model()).
# NOTE(review): presumably needed because the runtime image lacks the GUI
# libraries that opencv-python links against -- confirm.
print("[0/5] Swapping opencv-python for opencv-python-headless...")
_swap_ok = True
for _pip_args in (
    ["uninstall", "-y", "opencv-python"],
    ["install", "--quiet", "opencv-python-headless"],
):
    _pip_run = subprocess.run(
        [sys.executable, "-m", "pip", *_pip_args],
        capture_output=True,
        text=True,
    )
    if _pip_run.returncode != 0:
        # Best effort: keep going, but surface the failure instead of hiding
        # it behind the captured output.
        _swap_ok = False
        print(
            f" WARNING: pip {_pip_args[0]} exited with "
            f"{_pip_run.returncode}: {_pip_run.stderr.strip()}"
        )
print(" Done" if _swap_ok else " Done (with warnings)")
|
|
| import zipfile |
| import shutil |
| import json |
| from pathlib import Path |
| from huggingface_hub import hf_hub_download, HfApi |
| from PIL import Image |
| import yaml |
|
|
# --- Run configuration -------------------------------------------------------
# Hub account and repository name; push_to_hub() joins them into the repo id.
HF_USERNAME = "baskarmother"
MODEL_ID = "yolov8s-ppe-construction-v2"

# Output directory of merge_datasets(); data.yaml is written inside it.
DATASET_DIR = Path("/app/combined_ppe_dataset")

# Values forwarded to model.train() as epochs / imgsz / batch / device.
EPOCHS = 150
IMG_SIZE = 640
BATCH = 16
DEVICE = "0"

# Unified label space. The list position is the class id written to the YOLO
# label files and to the "names" mapping in data.yaml.
UNIFIED_CLASSES = [
    "person",        # 0
    "helmet",        # 1
    "vest",          # 2
    "mask",          # 3
    "gloves",        # 4
    "safety_shoe",   # 5
    "goggles",       # 6
    "no_helmet",     # 7
    "no_mask",       # 8
    "no_vest",       # 9
    "head",          # 10
    "barricade",     # 11
    "dumpster",      # 12
    "excavators",    # 13
    "safety_net",    # 14
    "dump_truck",    # 15
    "truck",         # 16
    "wheel_loader",  # 17
]
|
|
|
|
def download_ppe_dataset():
    """Download ``PPE.zip`` from the 51ddhesh/PPE_Detection dataset repo and unpack it.

    Returns:
        Path: ``/app/downloads/ppe_dataset``, the directory the archive was
        extracted into (created if missing).
    """
    print("[1/5] Downloading 51ddhesh/PPE_Detection...")
    request = dict(
        repo_id="51ddhesh/PPE_Detection",
        filename="PPE.zip",
        repo_type="dataset",
        cache_dir="/app/hf_cache",
        local_dir="/app/downloads",
    )
    archive = hf_hub_download(**request)

    target = Path("/app/downloads/ppe_dataset")
    target.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(archive, 'r') as bundle:
        bundle.extractall(target)

    print(f" Extracted to {target}")
    return target
|
|
|
|
def download_keremberke_dataset():
    """Download and unpack the train/valid/test archives of the keremberke dataset.

    Each ``data/<split>.zip`` is fetched from
    ``keremberke/construction-safety-object-detection`` and extracted into
    ``/app/downloads/keremberke/<split>``. A failure on one split is reported
    as a warning and does not stop the remaining splits.

    Returns:
        Path: ``/app/downloads/keremberke``.
    """
    print("[2/5] Downloading keremberke/construction-safety-object-detection...")
    root = Path("/app/downloads/keremberke")
    root.mkdir(parents=True, exist_ok=True)

    for split in ("train", "valid", "test"):
        archive_name = f"data/{split}.zip"
        try:
            archive = hf_hub_download(
                repo_id="keremberke/construction-safety-object-detection",
                filename=archive_name,
                repo_type="dataset",
                cache_dir="/app/hf_cache",
                local_dir=str(root),
            )
            split_dir = root / split
            split_dir.mkdir(parents=True, exist_ok=True)
            with zipfile.ZipFile(archive, 'r') as bundle:
                bundle.extractall(split_dir)
            print(f" Downloaded and extracted {archive_name}")
        except Exception as e:
            # Deliberately best effort: one missing split must not abort the run.
            print(f" Warning: Could not download {archive_name}: {e}")

    return root
|
|
|
|
def _coco_bbox_to_yolo(bbox, img_w, img_h):
    """Convert a COCO ``[x, y, w, h]`` pixel box to normalized YOLO ``(xc, yc, w, h)``.

    The box is clipped to the image by its corners, so a box that partly
    leaves the frame shrinks to its visible part instead of keeping its full
    size around a clamped centre. Returns ``None`` when nothing remains.
    """
    x, y, bw, bh = bbox
    x1 = min(max(x, 0), img_w)
    y1 = min(max(y, 0), img_h)
    x2 = min(max(x + bw, 0), img_w)
    y2 = min(max(y + bh, 0), img_h)
    if x2 <= x1 or y2 <= y1:
        return None
    return (
        (x1 + x2) / 2 / img_w,
        (y1 + y2) / 2 / img_h,
        (x2 - x1) / img_w,
        (y2 - y1) / img_h,
    )


def _convert_coco_file(coco_data, all_images, class_map, images_dir, labels_dir):
    """Copy the images listed in one COCO dict and write their YOLO label files.

    Returns the number of images that were found on disk and processed.
    """
    images_by_id = {img["id"]: img for img in coco_data.get("images", [])}
    cat_id_to_name = {
        cat["id"]: cat["name"] for cat in coco_data.get("categories", [])
    }
    anns_by_img = {}
    for ann in coco_data.get("annotations", []):
        anns_by_img.setdefault(ann["image_id"], []).append(ann)

    processed = 0
    for img_id, img in images_by_id.items():
        # The index is keyed by bare file name, so drop any directory prefix.
        filename = Path(img["file_name"]).name
        img_path = all_images.get(filename)
        if img_path is None:
            continue

        out_name = f"keremberke_{filename}"
        shutil.copy2(img_path, images_dir / out_name)

        # Fall back to 640 when the size is absent, null or zero (avoids a
        # division by zero during normalization).
        w = img.get("width") or 640
        h = img.get("height") or 640

        label_lines = []
        for ann in anns_by_img.get(img_id, []):
            cls = class_map.get(cat_id_to_name.get(ann["category_id"], ""))
            if cls is None:
                continue
            box = _coco_bbox_to_yolo(ann["bbox"], w, h)
            if box is None:
                continue
            xc, yc, nw, nh = box
            label_lines.append(f"{cls} {xc:.6f} {yc:.6f} {nw:.6f} {nh:.6f}\n")

        # An image without usable annotations still gets an (empty) label file.
        label_path = labels_dir / f"{Path(out_name).stem}.txt"
        with open(label_path, "w") as f:
            f.writelines(label_lines)
        processed += 1

    return processed


def convert_keremberke_to_yolo(raw_dir: Path, output_dir: Path):
    """Convert the extracted keremberke COCO splits into YOLO images/labels.

    For each of ``train``, ``valid`` and ``test`` the function creates
    ``output_dir/<split>/images`` and ``output_dir/<split>/labels``, copies
    every image referenced by a COCO JSON file found under ``raw_dir/<split>``
    (prefixed with ``keremberke_``) and writes one YOLO label file per image.
    If a split contains no JSON file, its images are copied without labels.
    A split directory that does not exist is skipped with a warning.

    Args:
        raw_dir: Directory holding the extracted ``train``/``valid``/``test``
            folders.
        output_dir: Destination root for the YOLO-formatted dataset.
    """
    print("[3/5] Converting keremberke dataset to YOLO format...")

    # keremberke category name -> index into UNIFIED_CLASSES.
    # "mini-van" has no counterpart in the unified list, so it is left out and
    # its annotations are skipped rather than being labelled as class 0
    # (person).
    class_map = {
        "person": 0, "hardhat": 1, "mask": 3,
        "no-hardhat": 7, "no-mask": 8, "no-safety vest": 9,
        "gloves": 4, "safety shoes": 5, "safety vest": 2,
        "barricade": 11, "dumpster": 12, "excavators": 13,
        "safety net": 14, "dump truck": 15,
        "truck": 16, "wheel loader": 17,
    }

    for split in ("train", "valid", "test"):
        images_dir = output_dir / split / "images"
        labels_dir = output_dir / split / "labels"
        images_dir.mkdir(parents=True, exist_ok=True)
        labels_dir.mkdir(parents=True, exist_ok=True)

        raw_split_dir = raw_dir / split
        if not raw_split_dir.exists():
            print(f" WARNING: {raw_split_dir} not found, skipping")
            continue

        json_files = list(raw_split_dir.rglob("*.json"))
        print(f" {split}: Found {len(json_files)} JSON files")

        # Index the split's images once; it does not change between JSON files.
        all_images = {}
        for ext in ("*.jpg", "*.jpeg", "*.png"):
            for p in raw_split_dir.rglob(ext):
                all_images[p.name] = p

        if not json_files:
            for img_path in all_images.values():
                shutil.copy2(img_path, images_dir / f"keremberke_{img_path.name}")
            print(f" {split}: Copied {len(all_images)} images (no labels)")
            continue

        for coco_file in json_files:
            with open(coco_file, encoding="utf-8") as f:
                coco_data = json.load(f)
            processed = _convert_coco_file(
                coco_data, all_images, class_map, images_dir, labels_dir
            )
            print(f" {split}: Processed {processed} images from {coco_file.name}")

    print(f" Converted to {output_dir}")
|
|
|
|
def _remap_ppe_label_lines(lines, class_map):
    """Return YOLO label lines with their class id translated through *class_map*.

    Lines with fewer than five fields, a non-integer class id, or a class id
    missing from *class_map* are dropped.
    """
    remapped = []
    for line in lines:
        parts = line.split()
        if len(parts) < 5:
            continue
        try:
            src_cls = int(parts[0])
        except ValueError:
            continue
        if src_cls in class_map:
            remapped.append(f"{class_map[src_cls]} {' '.join(parts[1:])}\n")
    return remapped


def merge_datasets(ppe_extract_dir: Path, keremberke_dir: Path, output_dir: Path):
    """Merge the PPE and converted keremberke datasets into one YOLO dataset.

    PPE images are copied with a ``ppe_`` prefix and their label class ids are
    remapped into the unified class list; keremberke images and labels are
    copied unchanged. Finally ``data.yaml`` is written into *output_dir* and
    the per-split image counts are printed.

    Args:
        ppe_extract_dir: Directory the PPE archive was extracted into. The
            dataset root is searched at ``PPE/``, ``ppe/`` and the directory
            itself (first one containing ``train/images`` wins).
        keremberke_dir: Root of the YOLO-formatted keremberke dataset.
        output_dir: Destination root of the merged dataset.

    Raises:
        SystemExit: With exit code 1 when no PPE dataset root is found.
    """
    print("[4/5] Merging datasets...")
    output_dir.mkdir(parents=True, exist_ok=True)

    candidates = (ppe_extract_dir / "PPE", ppe_extract_dir / "ppe", ppe_extract_dir)
    ppe_dir = next(
        (c for c in candidates if (c / "train" / "images").exists()),
        None,
    )
    if ppe_dir is None:
        print(" ERROR: Could not find PPE dataset structure")
        # sys.exit (not os._exit) so buffered output -- including the message
        # above -- is flushed and normal interpreter cleanup still runs.
        sys.exit(1)

    print(f" Found PPE dataset at: {ppe_dir}")

    # PPE dataset class id -> index into UNIFIED_CLASSES.
    # NOTE(review): the source ids are not visible in this file; verify them
    # against the PPE dataset's own class list.
    ppe_class_map = {0: 2, 1: 5, 2: 3, 3: 1, 4: 6, 5: 4}
    image_suffixes = (".jpg", ".jpeg", ".png")

    for split in ("train", "valid", "test"):
        out_images = output_dir / split / "images"
        out_labels = output_dir / split / "labels"
        out_images.mkdir(parents=True, exist_ok=True)
        out_labels.mkdir(parents=True, exist_ok=True)

        ppe_images = ppe_dir / split / "images"
        ppe_labels = ppe_dir / split / "labels"
        if ppe_images.exists():
            for img_file in sorted(ppe_images.iterdir()):
                if img_file.suffix.lower() not in image_suffixes:
                    continue
                shutil.copy2(img_file, out_images / f"ppe_{img_file.name}")
                label_file = ppe_labels / f"{img_file.stem}.txt"
                if not label_file.exists():
                    # No label file for this image: copy the image only.
                    continue
                with open(label_file) as f:
                    remapped = _remap_ppe_label_lines(f, ppe_class_map)
                with open(out_labels / f"ppe_{img_file.stem}.txt", "w") as f:
                    f.writelines(remapped)

        k_images = keremberke_dir / split / "images"
        k_labels = keremberke_dir / split / "labels"
        if k_images.exists():
            for img_file in sorted(k_images.iterdir()):
                shutil.copy2(img_file, out_images / img_file.name)
            # Guarded separately so a missing labels folder does not crash.
            if k_labels.exists():
                for label_file in sorted(k_labels.iterdir()):
                    shutil.copy2(label_file, out_labels / label_file.name)

    data_yaml = {
        "path": str(output_dir.absolute()),
        "train": "train/images",
        "val": "valid/images",
        "test": "test/images",
        "names": dict(enumerate(UNIFIED_CLASSES)),
        "nc": len(UNIFIED_CLASSES),
    }
    with open(output_dir / "data.yaml", "w") as f:
        yaml.dump(data_yaml, f, default_flow_style=False)

    for split in ("train", "valid", "test"):
        n = sum(1 for _ in (output_dir / split / "images").glob("*"))
        print(f" {split}: {n} images")
|
|
|
|
def train_model(data_yaml_path: Path):
    """Fine-tune ``yolov8s.pt`` on the dataset described by *data_yaml_path*.

    Args:
        data_yaml_path: Path to the dataset ``data.yaml``.

    Returns:
        Path: ``/app/runs/ppe_improved/weights/best.pt``. The path is returned
        whether or not the file exists; its existence is only printed.
    """
    print("[5/5] Training YOLOv8s...")
    # Imported here rather than at module level so it runs after the OpenCV
    # package swap performed at the top of the script.
    from ultralytics import YOLO

    detector = YOLO("yolov8s.pt")

    run_settings = dict(
        data=str(data_yaml_path),
        epochs=EPOCHS,
        imgsz=IMG_SIZE,
        batch=BATCH,
        device=DEVICE,
        patience=30,
        project="/app/runs",
        name="ppe_improved",
        exist_ok=True,
        pretrained=True,
    )
    optimizer_settings = dict(
        optimizer="SGD",
        lr0=0.01,
        lrf=0.01,
        momentum=0.9,
        weight_decay=0.0005,
    )
    augmentation_settings = dict(
        augment=True,
        mosaic=1.0,
        hsv_h=0.015,
        hsv_s=0.7,
        hsv_v=0.4,
        degrees=5.0,
        translate=0.1,
        scale=0.5,
        shear=2.0,
        perspective=0.0,
        flipud=0.0,
        fliplr=0.5,
    )
    detector.train(**run_settings, **optimizer_settings, **augmentation_settings)

    print(" Training complete!")
    weights = Path("/app/runs/ppe_improved/weights/best.pt")
    print(f" Best model saved at: {weights} (exists={weights.exists()})")
    return weights
|
|
|
|
def push_to_hub(best_model_path: Path):
    """Upload the trained weights and a generated model card to the Hub.

    The target repository is ``HF_USERNAME/MODEL_ID``. It is created if
    needed; a failure while creating it is only printed and the uploads are
    still attempted. The weights are stored as ``best.pt`` and the model card
    as ``README.md``.

    Args:
        best_model_path: Local path of the weights file to upload.
    """
    print("Pushing model to HuggingFace Hub...")
    hub = HfApi()
    repo_id = f"{HF_USERNAME}/{MODEL_ID}"
    target = {"repo_id": repo_id, "repo_type": "model"}

    try:
        hub.create_repo(exist_ok=True, **target)
    except Exception as e:
        print(f" Repo info: {e}")

    def upload(payload, destination):
        # Both artefacts go to the same repository.
        hub.upload_file(path_or_fileobj=payload, path_in_repo=destination, **target)

    upload(str(best_model_path), "best.pt")

    # One "- <id>: <name>" bullet per unified class.
    class_listing = "\n".join(
        f"- {index}: {label}" for index, label in enumerate(UNIFIED_CLASSES)
    )
    readme = f"""---
license: cc-by-4.0
library_name: ultralytics
tags:
- object-detection
- ppe
- construction-safety
- yolov8
---

# {MODEL_ID}

Improved PPE Compliance Detection Model for Construction Sites (v2)

## Classes ({len(UNIFIED_CLASSES)})
{class_listing}

## Usage
```python
from ultralytics import YOLO
model = YOLO("hf://{repo_id}/best.pt")
results = model.predict("image.jpg")
```

## Training Details
- Base Model: YOLOv8s
- Epochs: {EPOCHS}
- Image Size: {IMG_SIZE}x{IMG_SIZE}
- Batch Size: {BATCH}
"""
    upload(readme.encode(), "README.md")
    print(f" Model pushed to https://huggingface.co/{repo_id}")
|
|
|
|
def main():
    """Run the pipeline: download, convert, merge, train, then push to the Hub.

    If the expected ``best.pt`` is missing after training, the first
    ``best.pt`` found anywhere under ``/app/runs`` is pushed instead; if there
    is none, nothing is pushed.
    """
    banner = "=" * 60
    print(banner)
    print("IMPROVED PPE DETECTION TRAINING (FIXED)")
    print(banner)

    ppe_root = download_ppe_dataset()
    keremberke_yolo = Path("/app/keremberke_yolo")
    convert_keremberke_to_yolo(download_keremberke_dataset(), keremberke_yolo)
    DATASET_DIR.mkdir(parents=True, exist_ok=True)
    merge_datasets(ppe_root, keremberke_yolo, DATASET_DIR)
    weights = train_model(DATASET_DIR / "data.yaml")

    if not weights.exists():
        print(f" WARNING: Best model not found at {weights}")
        # Fall back to the first best.pt under the runs directory, if any.
        weights = next(Path("/app/runs").rglob("best.pt"), None)
    if weights is not None:
        push_to_hub(weights)

    print(banner)
    print("DONE!")
    print(banner)


if __name__ == "__main__":
    main()
|
|