import os
import random
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
import json
from PIL import Image
import numpy as np
import argparse
from tqdm import tqdm
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Dataset creation for image colorization")
parser.add_argument("--source_dir", type=str, required=True, help="Source directory")
parser.add_argument(
    "--target_dir", type=str, required=True, help="Target directory for the dataset"
)
parser.add_argument(
    "--resolution", type=int, default=512, help="Resolution for the dataset"
)
args = parser.parse_args()
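# Example invocation (the script name and directory names are hypothetical;
# --source_dir and --target_dir are resolved relative to the dataset root below):
#   python create_dataset.py --source_dir raw_photos --target_dir colorization_dataset --resolution 512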
# Path setup
root_dir = Path("E:/datasets")
target_dir = root_dir / args.target_dir
source_dir = root_dir / args.source_dir
target_images_dir = target_dir / "images"
target_conditioning_dir = target_dir / "conditioning_images"
metadata_file = target_dir / "metadata.jsonl"

# Create the output directories
target_dir.mkdir(parents=True, exist_ok=True)
target_images_dir.mkdir(exist_ok=True)
target_conditioning_dir.mkdir(exist_ok=True)
# Prompt list; one prompt is picked at random for each image
prompts = [
    "a color image, realistic style, photo",
    "a color image, high resolution, realistic, painting",
    "a color image, high resolution, realistic, photo",
    "very good quality, absurd, photo, color, 4k image",
    "high resolution, color, photo, realistic",
    "high resolution, color, photo, realistic, 4k image",
    "a color image, high resolution, realistic, 4k image",
    "color, high resolution, photo, realistic",
    "512x512, color, photo, realistic",
]
def process_image(image_path):
    try:
        with Image.open(image_path) as img:
            # Center-crop to a square using the shorter side
            width, height = img.size
            size = min(width, height)
            left = (width - size) // 2
            top = (height - size) // 2
            right = left + size
            bottom = top + size
            # Crop, convert to RGB (JPEG cannot store alpha or palette
            # modes), and resize to the target resolution
            img_cropped = (
                img.crop((left, top, right, bottom))
                .convert("RGB")
                .resize((args.resolution, args.resolution), Image.LANCZOS)
            )
            # Grayscale copy used as the conditioning image
            img_gray = img_cropped.convert("L")
            # Save both images under the same name (source stem + .jpg)
            filename = image_path.stem + ".jpg"
            img_cropped.save(target_images_dir / filename)
            img_gray.save(target_conditioning_dir / filename)
            # One metadata record per image, with a randomly chosen prompt
            metadata = {
                "image": str(filename),
                "text": random.choice(prompts),
                "conditioning_image": str(filename),
            }
            return metadata
    except Exception as e:
        print(f"Error processing {image_path}: {e}")
        return None
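# Note: process_image reads module-level state (args, the target directories,
# and prompts). With ProcessPoolExecutor each worker process gets its own copy
# of that state, via fork or module re-import depending on the platform.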
def generate_dataset_loader(target_dir):
    # Derive the loader class name from the target directory name
    # (e.g. ciff_dataset -> CiffDataset)
    dir_name = target_dir.name
    class_name = ''.join(word.capitalize() for word in dir_name.split('_'))
    # The loader script is named after the directory and written into it
    file_name = f"{dir_name}.py"
    file_path = target_dir / file_name
    # Generate the dataset loader code as a `datasets` builder script
    code = f'''
import pandas as pd
from pathlib import Path
import datasets
import os

_VERSION = datasets.Version("0.0.2")
_DESCRIPTION = "TODO"
_HOMEPAGE = "TODO"
_LICENSE = "TODO"
_CITATION = "TODO"

_FEATURES = datasets.Features(
    {{
        "image": datasets.Image(),
        "conditioning_image": datasets.Image(),
        "text": datasets.Value("string"),
    }}
)

_DEFAULT_CONFIG = datasets.BuilderConfig(name="default", version=_VERSION)


class {class_name}(datasets.GeneratorBasedBuilder):
    BUILDER_CONFIGS = [_DEFAULT_CONFIG]
    DEFAULT_CONFIG_NAME = "default"

    def _info(self):
        return datasets.DatasetInfo(
            description=_DESCRIPTION,
            features=_FEATURES,
            supervised_keys=None,
            homepage=_HOMEPAGE,
            license=_LICENSE,
            citation=_CITATION,
        )

    def _split_generators(self, dl_manager):
        # _base_path is a private attribute of the download manager; it points
        # at the directory the loader script was loaded from
        base_path = Path(dl_manager._base_path)
        metadata_path = base_path / "metadata.jsonl"
        images_dir = base_path / "images"
        conditioning_images_dir = base_path / "conditioning_images"
        return [
            datasets.SplitGenerator(
                name=datasets.Split.TRAIN,
                gen_kwargs={{
                    "metadata_path": metadata_path,
                    "images_dir": images_dir,
                    "conditioning_images_dir": conditioning_images_dir,
                }},
            ),
        ]

    def _generate_examples(self, metadata_path, images_dir, conditioning_images_dir):
        metadata = pd.read_json(metadata_path, lines=True)
        for idx, row in metadata.iterrows():
            text = row["text"]
            image_path = os.path.join(images_dir, row["image"])
            # Read bytes with context managers so file handles are closed
            with open(image_path, "rb") as f:
                image = f.read()
            conditioning_image_path = os.path.join(
                conditioning_images_dir, row["conditioning_image"]
            )
            with open(conditioning_image_path, "rb") as f:
                conditioning_image = f.read()
            yield idx, {{
                "text": text,
                "image": {{
                    "path": image_path,
                    "bytes": image,
                }},
                "conditioning_image": {{
                    "path": conditioning_image_path,
                    "bytes": conditioning_image,
                }},
            }}
'''
    # Write the generated code to the loader file
    with open(file_path, 'w') as f:
        f.write(code)
    print(f"Dataset loader file created: {file_path}")
def main():
    # Collect the image files to process (skip subdirectories)
    image_files = [p for p in source_dir.glob("*") if p.is_file()]
    # Use three quarters of the available CPU cores (at least one worker)
    num_workers = max(1, (3 * os.cpu_count()) // 4)
    # Process images in parallel
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        results = list(
            tqdm(
                executor.map(process_image, image_files),
                total=len(image_files),
                desc="Processing images",
            )
        )
    # Write one JSON line per successfully processed image
    with open(metadata_file, "w") as f:
        for metadata in results:
            if metadata:
                json.dump(metadata, f)
                f.write("\n")
    # Generate the dataset loader script
    generate_dataset_loader(target_dir)


if __name__ == "__main__":
    main()
    print(f"Dataset creation completed. Output directory: {target_dir}")