|
|
import json |
|
|
import math |
|
|
from pathlib import Path |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from pycocotools import mask as mask_utils |
|
|
from PIL import Image, ExifTags |
|
|
|
|
|
def get_image_dimensions(image_path):
    """Return (width, height) of an image, correcting for EXIF rotation.

    Tries PIL first and honours the EXIF Orientation tag: values 5-8 encode
    a 90°/270° rotation, so the stored width/height are swapped before being
    returned. Falls back to OpenCV when PIL cannot open the file, and returns
    (0, 0) when both readers fail (callers treat that as an error sentinel).

    Args:
        image_path: pathlib.Path to the image file.

    Returns:
        (width, height) tuple of ints; (0, 0) on total failure.
    """
    ORIENTATION_TAG = 0x0112  # standard EXIF "Orientation" tag id (274)
    try:
        with Image.open(image_path) as img:
            width, height = img.size
            orientation = 1
            try:
                # Public API: img.getexif() works for all formats, unlike the
                # private, JPEG-only img._getexif() it replaces; no need to
                # scan ExifTags.TAGS for the tag number either.
                orientation = img.getexif().get(ORIENTATION_TAG, 1)
            except Exception as e:
                print(f"EXIF读取警告 [{image_path.name}]: {str(e)}")

            # Orientations 5-8 mean the pixel data is rotated 90°/270°:
            # swap the axes so callers get the displayed dimensions.
            if orientation in [5, 6, 7, 8]:
                return height, width
            else:
                return width, height

    except Exception as pil_error:
        print(f"PIL读取失败 [{image_path.name}], 尝试OpenCV: {str(pil_error)}")
        try:
            # OpenCV fallback ignores EXIF orientation metadata here; it
            # returns the raw stored dimensions.
            img = cv2.imread(str(image_path))
            if img is not None:
                h, w = img.shape[:2]
                return w, h
            raise ValueError("OpenCV返回空图像")
        except Exception as cv_error:
            print(f"严重错误: 无法获取尺寸 [{image_path.name}]: {str(cv_error)}")
            return (0, 0)
|
|
|
|
|
def points_to_rle(points, img_dimensions):
    """Rasterize a polygon into a COCO-style RLE mask with coordinate clamping.

    Args:
        points: list of (x, y) vertex pairs in pixel coordinates.
        img_dimensions: (width, height) of the target image.

    Returns:
        dict with "size" [height, width] and a utf-8 decoded "counts" RLE string
        (the layout pycocotools produces).

    Raises:
        ValueError: when fewer than 3 vertices are supplied.
    """
    width, height = img_dimensions

    # Every input point yields exactly one polygon vertex, so validate up
    # front instead of after allocating the mask (the original checked
    # len(polygon) only after the loop, wasting the allocation on bad input).
    if len(points) < 3:
        # Was a placeholder-less f-string; plain literal is identical at runtime.
        raise ValueError("无效多边形,点数不足3个")

    mask = np.zeros((height, width), dtype=np.uint8)

    # Clamp each vertex into [0, width-1] x [0, height-1] so fillPoly never
    # draws outside the image.
    polygon = [
        (min(max(0, int(round(x))), width - 1),
         min(max(0, int(round(y))), height - 1))
        for x, y in points
    ]

    cv2.fillPoly(mask, [np.array(polygon, dtype=np.int32)], color=1)
    # pycocotools requires Fortran-contiguous arrays for RLE encoding.
    rle = mask_utils.encode(np.asfortranarray(mask))

    return {
        "size": [height, width],
        "counts": rle['counts'].decode('utf-8')
    }
|
|
|
|
|
def convert_medical_json(input_file, config=None):
    """Convert one LabelMe-style annotation JSON into the target dataset format.

    Expects a sibling .jpg image next to the JSON file. Only shapes labelled
    'target' are converted; their polygons become COCO RLE segmentations.

    Args:
        input_file: path to the annotation JSON file.
        config: optional dict overriding task_type / source / domain defaults.

    Returns:
        A single-element list containing the converted record, or None on failure.
    """
    cfg = {
        "task_type": "Image-Segmentation",
        "source": "Lisa",
        "domain": "General",
        **(config or {})
    }

    # Resolve the path BEFORE the try block: the except handler below formats
    # input_path.name, so assigning it inside the try could leave it unbound
    # and turn any early failure into a NameError (the original bug).
    input_path = Path(input_file)

    try:
        image_path = input_path.with_suffix('.jpg')
        if not image_path.exists():
            raise FileNotFoundError(f"关联图片不存在: {image_path.name}")

        # Dataset-relative media path, e.g. "./data/Lisa/xxx.jpg".
        media_paths = (Path(".") / "data" / cfg['source'] / image_path.name).as_posix()
        media_paths = f"./{media_paths}"

        width, height = get_image_dimensions(image_path)
        if width == 0 or height == 0:
            # (0, 0) is get_image_dimensions' failure sentinel.
            raise ValueError("获取图片尺寸失败")

        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        annotations = []
        for shape in raw_data.get('shapes', []):
            # Only 'target'-labelled shapes are of interest.
            if shape.get('label') != 'target':
                continue

            points = shape.get('points', [])
            try:
                rle = points_to_rle(points, (width, height))
                annotations.append({
                    "bbox": [],
                    "segmentation": rle,
                    "category_name": ""
                })
            except ValueError as e:
                # Degenerate polygons are skipped; the rest still convert.
                print(f"标注跳过 [{input_path.name}]: {str(e)}")

        return [{
            "index": 0,  # re-numbered globally by batch_convert
            "media_type": "image",
            "media_paths": media_paths,
            "description": "",
            "task_type": cfg['task_type'],
            "question": raw_data.get('text', []),
            "question_type": "detection-form",
            "options": [],
            "annotations": [annotations],
            "answer": [],
            "source": cfg['source'],
            "domain": cfg['domain']
        }]

    except Exception as e:
        print(f"转换失败 [{input_path.name}]: {str(e)}")
        return None
|
|
|
|
|
def batch_convert(input_dir, output_file):
    """Convert every *.json annotation under input_dir and write one merged JSON list.

    Successful records get globally unique, consecutive "index" values; files
    that fail to convert are collected and reported at the end.
    """
    source_dir = Path(input_dir)
    converted = []
    failures = []
    next_index = 0

    for annotation_file in source_dir.glob('*.json'):
        records = convert_medical_json(annotation_file)
        if not records:
            failures.append(annotation_file.name)
            continue

        # Re-number records so indices stay unique across all input files.
        for record in records:
            record["index"] = next_index
            next_index += 1
        converted.extend(records)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(converted, f, indent=2, ensure_ascii=False)

    # next_index equals the total number of successfully converted records.
    print(f"转换完成: 成功 {next_index} 个文件,失败 {len(failures)} 个")
    if failures:
        print("失败文件列表:\n" + "\n".join(failures))
|
|
|
|
|
if __name__ == "__main__":
    # Hard-coded locations for the Lisa validation split.
    SRC_DIR = "/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/general/lisa/image/val"
    DST_FILE = "/mnt/data/users/zys/proj/vlm_reasoning/utils/json/converted_dataset3.json"
    batch_convert(input_dir=SRC_DIR, output_file=DST_FILE)