import json
from datetime import date, datetime
from enum import Enum
from typing import Any, Optional

from inference.enterprise.stream_management.manager.entities import (
    ENCODING,
    ERROR_TYPE_KEY,
    PIPELINE_ID_KEY,
    REQUEST_ID_KEY,
    RESPONSE_KEY,
    STATUS_KEY,
    ErrorType,
    OperationStatus,
)


def serialise_to_json(obj: Any) -> Any:
    """Convert objects the ``json`` module cannot encode natively.

    Intended for use as the ``default=`` hook of ``json.dumps`` (see
    ``prepare_response``).

    Args:
        obj: Value that the JSON encoder could not serialise on its own.

    Returns:
        The ISO-8601 string for ``datetime`` / ``date`` instances, or the
        ``.value`` of an ``Enum`` member.

    Raises:
        TypeError: If ``obj`` is neither a date/datetime nor an Enum member.
    """
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    if isinstance(obj, Enum):
        return obj.value
    raise TypeError(f"Type {type(obj)} not serializable")


def describe_error(
    exception: Optional[Exception] = None,
    error_type: ErrorType = ErrorType.INTERNAL_ERROR,
) -> dict:
    """Build a failure payload describing an error.

    Args:
        exception: Optional exception whose class name and message are added
            to the payload.
        error_type: Error category stored under ``ERROR_TYPE_KEY``.

    Returns:
        Dict holding ``OperationStatus.FAILURE`` under ``STATUS_KEY`` and
        ``error_type`` under ``ERROR_TYPE_KEY``; when ``exception`` is given,
        also ``"error_class"`` and ``"error_message"`` entries.
    """
    payload = {
        STATUS_KEY: OperationStatus.FAILURE,
        ERROR_TYPE_KEY: error_type,
    }
    if exception is not None:
        payload["error_class"] = exception.__class__.__name__
        payload["error_message"] = str(exception)
    return payload


def prepare_error_response(
    request_id: str, error: Exception, error_type: ErrorType, pipeline_id: Optional[str]
) -> bytes:
    """Serialise an error into an encoded response message.

    Args:
        request_id: Identifier stored under ``REQUEST_ID_KEY``.
        error: Exception to describe in the response body.
        error_type: Error category passed on to ``describe_error``.
        pipeline_id: Identifier stored under ``PIPELINE_ID_KEY``; may be None.

    Returns:
        The bytes produced by ``prepare_response`` for the error description.
    """
    error_description = describe_error(exception=error, error_type=error_type)
    return prepare_response(
        request_id=request_id, response=error_description, pipeline_id=pipeline_id
    )


def prepare_response(
    request_id: str, response: dict, pipeline_id: Optional[str]
) -> bytes:
    """Wrap a response dict in an envelope and encode it as JSON bytes.

    Args:
        request_id: Identifier stored under ``REQUEST_ID_KEY``.
        response: Response content stored under ``RESPONSE_KEY``.
        pipeline_id: Identifier stored under ``PIPELINE_ID_KEY``; may be None.

    Returns:
        The JSON document encoded with ``ENCODING``.

    Raises:
        TypeError: Propagated from ``serialise_to_json`` when the payload
            contains a value that cannot be serialised.
    """
    envelope = {
        REQUEST_ID_KEY: request_id,
        RESPONSE_KEY: response,
        PIPELINE_ID_KEY: pipeline_id,
    }
    # Dates and Enum members (e.g. status / error type) are not natively
    # JSON-encodable, so they are routed through serialise_to_json.
    payload = json.dumps(envelope, default=serialise_to_json)
    return payload.encode(ENCODING)