feat: implement CommitScheduler for automated logging to Hugging Face dataset and refactor dataset initialization process
5967d4d
import os
import base64
import json
import io
import datetime
import logging

import numpy as np
from PIL import Image
from huggingface_hub import HfApi  # CommitOperationAdd removed: uploads now happen via a CommitScheduler rather than direct commits
from huggingface_hub.utils import RepositoryNotFoundError

logger = logging.getLogger(__name__)
HF_DATASET_NAME = "aiwithoutborders-xyz/degentic_rd0"
LOCAL_LOG_DIR = "./hf_inference_logs"  # Local directory where inference logs are written before being committed
# Custom JSON encoder that converts numpy scalar and array types to plain Python types
class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):  # covers np.float32, np.float64, etc.
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return json.JSONEncoder.default(self, obj)
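# Illustrative use (values assumed): json.dumps({"score": np.float32(0.97)}, cls=NumpyEncoder)
# serializes cleanly, whereas the default encoder raises TypeError on numpy scalars.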
def _pil_to_base64(image: Image.Image) -> str:
    """Converts a PIL Image to a base64-encoded JPEG string."""
    # Explicitly check that the input is a PIL Image
    if not isinstance(image, Image.Image):
        raise TypeError(f"Expected a PIL Image, but received type: {type(image)}")
    buffered = io.BytesIO()
    # JPEG cannot encode alpha or palette modes, so normalize to RGB first
    if image.mode != 'RGB':
        image = image.convert('RGB')
    image.save(buffered, format="JPEG", quality=85)
    return base64.b64encode(buffered.getvalue()).decode('utf-8')
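# To recover the original image from a log entry (illustrative):
# Image.open(io.BytesIO(base64.b64decode(b64_str)))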
# Since log entries are no longer accumulated in an in-memory datasets.Dataset,
# initialization is reduced to making sure the Hub repository exists.
def initialize_dataset_repo():
    """Initializes or ensures the Hugging Face dataset repository exists."""
    api = HfApi()
    try:
        api.repo_info(repo_id=HF_DATASET_NAME, repo_type="dataset")
        logger.info(f"Hugging Face dataset repository already exists: {HF_DATASET_NAME}")
    except RepositoryNotFoundError:
        logger.info(f"Creating new Hugging Face dataset repository: {HF_DATASET_NAME}")
        api.create_repo(repo_id=HF_DATASET_NAME, repo_type="dataset", private=True)
    return api  # Return the API object for any follow-up operations
def log_inference_data(
    original_image: Image.Image,
    inference_params: dict,
    model_predictions: list[dict],
    ensemble_output: dict,
    forensic_images: list[Image.Image],
    agent_monitoring_data: dict,
    human_feedback: dict | None = None,
):
    """Logs a single inference event as a JSON file in LOCAL_LOG_DIR, from where
    the CommitScheduler picks it up and pushes it to the Hugging Face dataset repository."""
    try:
        initialize_dataset_repo()  # Ensure the target repository exists; the API handle itself is not needed here
        original_image_b64 = _pil_to_base64(original_image)

        forensic_images_b64 = []
        for img_item in forensic_images:
            if img_item is not None:
                # Forensic outputs may arrive as numpy arrays; convert them to PIL before encoding
                if not isinstance(img_item, Image.Image):
                    try:
                        img_item = Image.fromarray(img_item)
                    except Exception as e:
                        logger.error(f"Error converting forensic image to PIL for base64 encoding: {e}")
                        continue
                forensic_images_b64.append(_pil_to_base64(img_item))
        # Build the log entry; one timestamp is reused for both the record and the filename
        timestamp = datetime.datetime.now()
        new_entry = {
            "timestamp": timestamp.isoformat(),
            "image": original_image_b64,
            "inference_request": inference_params,
            "model_predictions": model_predictions,
            "ensemble_output": ensemble_output,
            "forensic_outputs": forensic_images_b64,
            "agent_monitoring_data": agent_monitoring_data,
            "human_feedback": human_feedback if human_feedback is not None else {}
        }

        # Write the entry to a uniquely named JSON file in the local log directory
        os.makedirs(LOCAL_LOG_DIR, exist_ok=True)
        log_file_path = os.path.join(LOCAL_LOG_DIR, f"log_{timestamp.strftime('%Y%m%d%H%M%S%f')}.json")

        # Serialize the entry with the custom encoder so numpy values don't break json.dump
        with open(log_file_path, 'w', encoding='utf-8') as f:
            json.dump(new_entry, f, cls=NumpyEncoder, indent=2)
        logger.info(f"Inference data logged successfully to local file: {log_file_path}")
    except Exception as e:
        logger.error(f"Failed to log inference data to local file: {e}")