samgis-lisa-on-cuda / samgis /io /wrappers_helpers.py
alessandro trinca tornidor
[feat] prepare entire docker build with nvidia GPU on hf space cloning https://huggingface.co/spaces/aletrn/samgis
0914710
raw
history blame
No virus
8.8 kB
"""lambda helper functions"""
import logging
from sys import stdout
from typing import Dict
import loguru
from xyzservices import providers, TileProvider
from lisa_on_cuda.utils.app_helpers import get_cleaned_input
from samgis import app_logger
from samgis.io.coordinates_pixel_conversion import get_latlng_to_pixel_coordinates
from samgis.utilities.constants import COMPLETE_URL_TILES_MAPBOX, COMPLETE_URL_TILES_NEXTZEN, CUSTOM_RESPONSE_MESSAGES
from samgis.utilities.type_hints import ApiRequestBody, ContentTypes, XYZTerrainProvidersNames, \
XYZDefaultProvidersNames, StringPromptApiRequestBody
from samgis_core.utilities.utilities import base64_decode
def get_response(status: int, start_time: float, request_id: str, response_body: Dict = None) -> str:
"""
Response composer
Args:
status: status response
start_time: request start time (float)
request_id: str
response_body: dict we embed into our response
Returns:
json response
"""
from json import dumps
from time import time
app_logger.debug(f"response_body:{response_body}.")
response_body["duration_run"] = time() - start_time
response_body["message"] = CUSTOM_RESPONSE_MESSAGES[status]
response_body["request_id"] = request_id
response = {
"statusCode": status,
"header": {"Content-Type": ContentTypes.APPLICATION_JSON},
"body": dumps(response_body),
"isBase64Encoded": False
}
app_logger.debug(f"response type:{type(response)} => {response}.")
return dumps(response)
def get_parsed_bbox_points_with_string_prompt(request_input: StringPromptApiRequestBody) -> Dict:
"""
Parse the raw input request into bbox, prompt string and zoom
Args:
request_input: input dict
Returns:
dict with bounding box, prompt string and zoom
"""
app_logger.info(f"try to parsing input request {request_input}...")
bbox = request_input.bbox
app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
ne = bbox.ne
sw = bbox.sw
app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
ne_latlng = [float(ne.lat), float(ne.lng)]
sw_latlng = [float(sw.lat), float(sw.lng)]
new_zoom = int(request_input.zoom)
cleaned_prompt = get_cleaned_input(request_input.string_prompt)
app_logger.debug(f"bbox => {bbox}.")
app_logger.debug(f'request_input-prompt cleaned => {cleaned_prompt}.')
app_logger.info("unpacking elaborated request...")
return {
"bbox": [ne_latlng, sw_latlng],
"prompt": cleaned_prompt,
"zoom": new_zoom,
"source": get_url_tile(request_input.source_type)
}
def get_parsed_bbox_points_with_dictlist_prompt(request_input: ApiRequestBody) -> Dict:
"""
Parse the raw input request into bbox, prompt and zoom
Args:
request_input: input dict
Returns:
dict with bounding box, prompt and zoom
"""
app_logger.info(f"try to parsing input request {request_input}...")
bbox = request_input.bbox
app_logger.debug(f"request bbox: {type(bbox)}, value:{bbox}.")
ne = bbox.ne
sw = bbox.sw
app_logger.debug(f"request ne: {type(ne)}, value:{ne}.")
app_logger.debug(f"request sw: {type(sw)}, value:{sw}.")
ne_latlng = [float(ne.lat), float(ne.lng)]
sw_latlng = [float(sw.lat), float(sw.lng)]
new_zoom = int(request_input.zoom)
new_prompt_list = _get_parsed_prompt_list(ne, sw, new_zoom, request_input.prompt)
app_logger.debug(f"bbox => {bbox}.")
app_logger.debug(f'request_input-prompt updated => {new_prompt_list}.')
app_logger.info("unpacking elaborated request...")
return {
"bbox": [ne_latlng, sw_latlng],
"prompt": new_prompt_list,
"zoom": new_zoom,
"source": get_url_tile(request_input.source_type)
}
def _get_parsed_prompt_list(bbox_ne, bbox_sw, zoom, prompt_list):
new_prompt_list = []
for prompt in prompt_list:
app_logger.debug(f"current prompt: {type(prompt)}, value:{prompt}.")
new_prompt = {"type": prompt.type.value}
if prompt.type == "point":
new_prompt_data = _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom)
new_prompt["label"] = prompt.label.value
elif prompt.type == "rectangle":
new_prompt_data = _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom)
else:
msg = "Valid prompt type: 'point' or 'rectangle', not '{}'. Check ApiRequestBody parsing/validation."
raise TypeError(msg.format(prompt.type))
app_logger.debug(f"new_prompt_data: {type(new_prompt_data)}, value:{new_prompt_data}.")
new_prompt["data"] = new_prompt_data
new_prompt_list.append(new_prompt)
return new_prompt_list
def _get_new_prompt_data_point(bbox_ne, bbox_sw, prompt, zoom):
current_point = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data, zoom, prompt.type)
app_logger.debug(f"current prompt: {type(current_point)}, value:{current_point}, label: {prompt.label}.")
return [current_point['x'], current_point['y']]
def _get_new_prompt_data_rectangle(bbox_ne, bbox_sw, prompt, zoom):
current_point_ne = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.ne, zoom, prompt.type)
app_logger.debug(
f"rectangle:: current_point_ne prompt: {type(current_point_ne)}, value:{current_point_ne}.")
current_point_sw = get_latlng_to_pixel_coordinates(bbox_ne, bbox_sw, prompt.data.sw, zoom, prompt.type)
app_logger.debug(
f"rectangle:: current_point_sw prompt: {type(current_point_sw)}, value:{current_point_sw}.")
# correct order for rectangle prompt
return [
current_point_sw["x"],
current_point_ne["y"],
current_point_ne["x"],
current_point_sw["y"]
]
def get_parsed_request_body(event: Dict or str) -> ApiRequestBody:
"""
Validator for the raw input request lambda event
Args:
event: input dict
Returns:
parsed request input
"""
from json import dumps, loads
from logging import getLevelName
def _get_current_log_level(logger: loguru.logger) -> [str, loguru._logger.Level]:
levels = logger._core.levels
current_log_level = logger._core.min_level
level_filt = [l for l in levels.items() if l[1].no == current_log_level]
return level_filt[0]
app_logger.info(f"event:{dumps(event)}...")
try:
raw_body = event["body"]
except Exception as e_constants1:
app_logger.error(f"e_constants1:{e_constants1}.")
raw_body = event
app_logger.debug(f"raw_body, #1: {type(raw_body)}, {raw_body}...")
if isinstance(raw_body, str):
body_decoded_str = base64_decode(raw_body)
app_logger.debug(f"body_decoded_str: {type(body_decoded_str)}, {body_decoded_str}...")
raw_body = loads(body_decoded_str)
app_logger.info(f"body, #2: {type(raw_body)}, {raw_body}...")
parsed_body = ApiRequestBody.model_validate(raw_body)
log_level = "DEBUG" if parsed_body.debug else "INFO"
app_logger.remove()
app_logger.add(stdout, level=log_level)
try:
current_log_level_name, _ = _get_current_log_level(app_logger)
app_logger.warning(f"set log level to {getLevelName(current_log_level_name)}.")
except Exception as ex:
print("failing setting parsing bbox, logger is ok? ex:", ex, "#")
return parsed_body
mapbox_terrain_rgb = TileProvider(
name=XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME,
url=COMPLETE_URL_TILES_MAPBOX,
attribution=""
)
nextzen_terrain_rgb = TileProvider(
name=XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME,
url=COMPLETE_URL_TILES_NEXTZEN,
attribution=""
)
def get_url_tile(source_type: str):
try:
match source_type.lower():
case XYZDefaultProvidersNames.DEFAULT_TILES_NAME_SHORT:
return providers.query_name(XYZDefaultProvidersNames.DEFAULT_TILES_NAME)
case XYZTerrainProvidersNames.MAPBOX_TERRAIN_TILES_NAME:
return mapbox_terrain_rgb
case XYZTerrainProvidersNames.NEXTZEN_TERRAIN_TILES_NAME:
app_logger.info("nextzen_terrain_rgb:", nextzen_terrain_rgb)
return nextzen_terrain_rgb
case _:
return providers.query_name(source_type)
except ValueError as ve:
from pydantic_core import ValidationError
app_logger.error("ve:", str(ve))
raise ValidationError(ve)
def check_source_type_is_terrain(source: str | TileProvider):
return isinstance(source, TileProvider) and source.name in list(XYZTerrainProvidersNames)