NRTK-Gradio / app.py
emilyveenhuis's picture
gitignore and README updates
3416608 verified
# This app explores integrating `nrtk` with `Gradio` to create an interactive interface for applying perturbations.
import numpy as np
import yaml
from pathlib import Path
from nrtk.impls.perturb_image.pybsm.scenario import PybsmScenario
from nrtk.impls.perturb_image.pybsm.sensor import PybsmSensor
from nrtk.impls.perturb_image.pybsm.perturber import PybsmPerturber
import gradio as gr # type: ignore
asset_dir = Path(__file__).parent / "assets"
# Define default values for fields for initilization and button clicks
#
# Note, for this application we'll be focusing on the use case of UAS imagery, so we'll try to pick default
# values that work with images similar to those in the VisDrone dataset. Perturbing images from other datasets or other
# operational tasks may not be successful without modification to these values; defining broad defaults is extremely
# difficult, if not impossible due to the physics-based nature of these perturbations.
default_values = {
"gsd": 0.105,
"scenario": {
"aircraftSpeed": 0,
"altitude": 75,
"backgroundReflectance": 0.07,
"backgroundTemperature": 293.,
"cn2at1m": 0,
"groundRange": 0,
"haWindspeed": 21.,
"ihaze": 2,
"name": "",
"targetReflectance": 0.15,
"targetTemperature": 295.,
},
"sensor": {
"D": 0.004,
"bitdepth": 11.9,
"darkCurrent": 0.,
"dax": 0.0001,
"day": 0.0001,
"eta": 0.4,
"f": 0.01429,
"intTime": 0.03,
"maxN": 96000,
"maxWellFill": 0.005,
"name": "",
"optTransWavelengths": [3.8e-07, 7.0e-07],
"opticsTransmission": [],
"px": 2.0e-05,
"qe": [0.05, 0.6, 0.75, 0.85, 0.85, 0.75, 0.5, 0.2, 0],
"qewavelengths": [3.0e-7, 4.0e-7, 5.0e-7, 6.0e-7, 7.0e-7, 8.0e-7, 9.0e-7, 1.0e-6, 1.1e-6],
"readNoise": 25.,
"sx": 0.,
"sy": 0.
}
}
def load_config(config):
"""
Loads configuration from config dictionary to UI elements
"""
scenario_config = config["scenario"]
default_scenario = default_values["scenario"]
sensor_config = config["sensor"]
default_sensor = default_values["sensor"]
return {
input_group: gr.Group(visible=True),
img_gsd: config["gsd"] if "gsd" in config else default_values["gsd"],
scenario_name: scenario_config["name"] if "name" in scenario_config else default_scenario["name"],
ihaze: scenario_config["ihaze"] if "ihaze" in scenario_config else default_scenario["ihaze"],
altitude_m: scenario_config["altitude"] if "altitude" in scenario_config else default_scenario["altitude"],
ground_range_m: scenario_config["groundRange"]
if "groundRange" in scenario_config else default_scenario["groundRange"],
aircraft_speed_m_Per_s: scenario_config["aircraftSpeed"]
if "aircraftSpeed" in scenario_config else default_scenario["aircraftSpeed"],
target_reflectance: scenario_config["targetReflectance"]
if "targetReflectance" in scenario_config else default_scenario["targetReflectance"],
target_temperature_K: scenario_config["targetTemperature"]
if "targetTemperature" in scenario_config else default_scenario["targetTemperature"],
bkgd_reflectance: scenario_config["backgroundReflectance"]
if "backgroundReflectance" in scenario_config else default_scenario["backgroundReflectance"],
bkgd_temperature_K: scenario_config["backgroundTemperature"]
if "backgroundTemperature" in scenario_config else default_scenario["backgroundTemperature"],
ha_windspeed_m_Per_s: scenario_config["haWindspeed"]
if "haWindspeed" in scenario_config else default_scenario["haWindspeed"],
cn2at1m: scenario_config["cn2at1m"]
if "cn2at1m" in scenario_config else default_scenario["cn2at1m"],
sensor_name: sensor_config["name"] if "name" in sensor_config else default_sensor["name"],
D_m: sensor_config["D"] if "D" in sensor_config else default_sensor["D"],
f_m: sensor_config["f"] if "f" in sensor_config else default_sensor["f"],
px_m: sensor_config["px"] if "px" in sensor_config else default_sensor["px"],
# Convert lists to comma separated text
optTransWavelengths_str: ", ".join(str(x) for x in sensor_config["optTransWavelengths"])
if "optTransWavelengths" in sensor_config and sensor_config["optTransWavelengths"] else "",
opticsTransmission_str: ", ".join(str(x) for x in sensor_config["opticsTransmission"])
if "opticsTransmission" in sensor_config and sensor_config["opticsTransmission"] else "",
eta: sensor_config["eta"] if "eta" in sensor_config else default_sensor["eta"],
int_time_s: sensor_config["intTime"] if "intTime" in sensor_config else default_sensor["intTime"],
dark_current: sensor_config["darkCurrent"] if "darkCurrent" in sensor_config else default_sensor["darkCurrent"],
read_noise: sensor_config["readNoise"] if "readNoise" in sensor_config else default_sensor["readNoise"],
max_N: sensor_config["maxN"] if "maxN" in sensor_config else default_sensor["maxN"],
bit_depth: sensor_config["bitdepth"] if "bitdepth" in sensor_config else default_sensor["bitdepth"],
max_well_fill: sensor_config["maxWellFill"] if "maxWellFill" in sensor_config else default_sensor["maxWellFill"],
sx: sensor_config["sx"] if "sx" in sensor_config else default_sensor["sx"],
sy: sensor_config["sy"] if "sy" in sensor_config else default_sensor["sy"],
dax: sensor_config["dax"] if "dax" in sensor_config else default_sensor["dax"],
day: sensor_config["day"] if "day" in sensor_config else default_sensor["day"],
# Convert lists to comma separated text
qe_str: ", ".join(str(x) for x in sensor_config["qe"]) if "qe" in sensor_config and sensor_config["qe"] else "",
qewavelengths_str: ", ".join(str(x) for x in sensor_config["qewavelengths"])
if "qewavelengths" in sensor_config and sensor_config["qewavelengths"] else ""
}
def generate_config(data):
"""
Generate dictionary that can easily be transformed into sensor and scenario objects
"""
# Input validation
if data[ihaze] not in PybsmScenario.ihaze_values:
raise gr.Error("Invalid ihaze value!")
if data[altitude_m] not in PybsmScenario.altitude_values:
raise gr.Error("Invalid altitude value!")
if data[ground_range_m] not in PybsmScenario.groundRange_values:
raise gr.Error("Invalid ground range value!")
scenario_config = {
"name": data[scenario_name],
"ihaze": data[ihaze],
"altitude": data[altitude_m],
"groundRange": data[ground_range_m],
"aircraftSpeed": data[aircraft_speed_m_Per_s],
"targetReflectance": data[target_reflectance],
"targetTemperature": data[target_temperature_K],
"backgroundReflectance": data[bkgd_reflectance],
"backgroundTemperature": data[bkgd_temperature_K],
"haWindspeed": data[ha_windspeed_m_Per_s],
"cn2at1m": data[cn2at1m]
}
# Convert text fields into lists of floats
optTransWavelengths = [float(w.strip()) for w in data[optTransWavelengths_str].split(",") if w.strip()]
opticsTransmission = [float(t.strip()) for t in data[opticsTransmission_str].split(",") if t.strip()]
opticsTransmission = None if not opticsTransmission else opticsTransmission
qe = [float(q.strip()) for q in data[qe_str].split(",") if q.strip()]
qe = None if not qe else qe
qewavelengths = [float(q.strip()) for q in data[qewavelengths_str].split(",") if q.strip()]
qewavelengths = None if not qewavelengths else qewavelengths
# More input validation
if len(optTransWavelengths) < 2:
raise gr.Error("At least 2 optical transmission wavelengths required!")
if optTransWavelengths[0] >= optTransWavelengths[-1]:
raise gr.Error("Optical transmission wavelengths should be entered least to greatest!")
if opticsTransmission is not None and len(opticsTransmission) != len(optTransWavelengths):
raise gr.Error("If provided, Optical Transmission must have the same number of values as Spectral Bandpass!")
sensor_config = {
"name": data[sensor_name],
"D": data[D_m],
"f": data[f_m],
"px": data[px_m],
"optTransWavelengths": optTransWavelengths,
"opticsTransmission": opticsTransmission,
"eta": data[eta],
"intTime": data[int_time_s],
"darkCurrent": data[dark_current],
"readNoise": data[read_noise],
"maxN": data[max_N],
"bitdepth": data[bit_depth],
"maxWellFill": data[max_well_fill],
"sx": data[sx],
"sy": data[sy],
"dax": data[dax],
"day": data[day],
"qe": qe,
"qewavelengths": qewavelengths
}
return {
"scenario": scenario_config,
"sensor": sensor_config,
"gsd" : data[img_gsd]
}
def gen_new_config():
"""
Resets all fields to default values
"""
return load_config(default_values)
def load_config_from_file(data):
"""
Loads configuration from given file to UI elements
"""
if not data[config_file]:
raise gr.Error("A file must be uploaded to load existing configuration!")
with open(data[config_file]) as file:
config = yaml.safe_load(file)
return load_config(config)
def submit(data):
"""
Apply the perturbation and hide/show relevant UI elements as needed
"""
config = generate_config(data)
scenario_config = config["scenario"]
sensor_config = config["sensor"]
# Sensor expects numpy arrays, but plain lists serialize better so convert here
if sensor_config["optTransWavelengths"]:
sensor_config["optTransWavelengths"] = np.asarray(sensor_config["optTransWavelengths"])
if sensor_config["opticsTransmission"]:
sensor_config["opticsTransmission"] = np.asarray(sensor_config["opticsTransmission"])
if sensor_config["qe"]:
sensor_config["qe"] = np.asarray(sensor_config["qe"])
if sensor_config["qewavelengths"]:
sensor_config["qewavelengths"] = np.asarray(sensor_config["qewavelengths"])
gsd = config["gsd"]
sensor = PybsmSensor(**sensor_config)
scenario = PybsmScenario(**scenario_config)
perturber = PybsmPerturber(sensor=sensor, scenario=scenario)
config = generate_config(data)
config_file = str(asset_dir / "generated_config.yml")
with open(config_file, "w") as f:
yaml.dump(config, f)
def stretch_contrast_convert_8bit(img, perc=[0.1, 99.9]):
img = img.astype(float)
img = img - np.percentile(img.ravel(), perc[0])
img = img / (np.percentile(img.ravel(), perc[1]) / 255)
img = np.clip(img, 0, 255)
return np.round(img).astype(np.uint8)
perturbed_img = perturber(image=data[input_img], additional_params={"img_gsd": gsd})
peturbed_img = stretch_contrast_convert_8bit(perturbed_img, [0.1, 90.9])
# Apply the perturbation and display
return {
out_img: perturbed_img,
out_config: config_file
}
# Lastly, we define the layout of the application and register the button click listener functions:
with gr.Blocks() as demo:
gr.Markdown(
"""
# Apply pyBSM Perturbations with NRTK
Note: Default configuration options are tailored to UAS imagery (specifically VisDrone).
Perturbing other datasets or other operational tasks may not be successful without modification
to some of these configuration options; defining broad defaults is extremely difficult, if not
impossible due to the physics-based nature of these perturbations.
"""
)
with gr.Row():
with gr.Column():
sample_img_path = str(asset_dir / "0000161_01584_d_0000158.jpg")
input_img = gr.Image(
label="Input Image",
value=sample_img_path
)
gr.Examples(
examples=[
sample_img_path,
str(asset_dir / "0000006_01111_d_0000003.jpg"),
str(asset_dir / "0000006_01659_d_0000004.jpg"),
str(asset_dir / "0000006_04309_d_0000011.jpg")
],
inputs=input_img
)
with gr.Column() as output_col:
out_img = gr.Image(label="Perturbed Image")
out_config = gr.File(label="Generated Configuration")
with gr.Row():
with gr.Column() as input_col:
with gr.Row():
gen_config_btn = gr.Button("Generate New Configuration")
with gr.Column():
config_file = gr.File(label="Configuration File", file_types=[".yaml"])
load_config_btn = gr.Button("Load Configuration from File")
with gr.Group(visible=False) as input_group:
with gr.Accordion("Image Parameters", open=False):
img_gsd = gr.Number(
label="Image Ground Sample Distance (GSD) (m)",
info="The size of one pixel on the ground",
value=default_values["gsd"]
)
with gr.Accordion("Scenario Parameters") as scenario_params:
altitude_m = gr.Number(
label="Altitude (m)",
info="Sensor height above ground level in meters. The database includes the following " \
"altitude options: 2m 32.55m 75m 150m 225m 500m, 1000m to 12000m in 1000m steps, " \
"14000m to 20000m in 2000m steps, and 24500m",
value=default_values["scenario"]["altitude"]
)
ground_range_m = gr.Number(
label="Ground Range (m)",
info="Distance on the ground between the target and sensor in meters. The following " \
"ground ranges are included in the database at each altitude until the ground " \
"range exceeds the distance to the spherical earth horizon: 0m 100m 500m, 1000m to " \
"20000m in 1000m steps, 22000m to 80000m in 2000m steps, and 85000m to " \
"300000m in 5000m steps.",
value=default_values["scenario"]["groundRange"]
)
with gr.Accordion("Additional Scenario Parameters", open=False) as opt_scenario_params:
scenario_name = gr.Textbox(label="Scenario Name", value=default_values["scenario"]["name"])
ihaze = gr.Dropdown(
label="IHAZE",
info="MODTRAN code for visibility",
choices=[1, 2],
value=default_values["scenario"]["ihaze"]
)
aircraft_speed_m_Per_s = gr.Number(
label="Aircraft Speed (m/s)",
info="Ground speed of the aircraft",
value=default_values["scenario"]["aircraftSpeed"]
)
with gr.Row():
target_reflectance = gr.Number(
label="Target Reflectance",
info="Object reflectance",
value=default_values["scenario"]["targetReflectance"]
)
target_temperature_K = gr.Number(
label="Target Temperature (K)",
info="Object temperature (Kelvin)",
value=default_values["scenario"]["targetTemperature"]
)
with gr.Row():
bkgd_reflectance = gr.Number(
label="Background Reflectance",
info="Background reflectance",
value=default_values["scenario"]["backgroundReflectance"]
)
bkgd_temperature_K = gr.Number(
label="Background Temperature (K)",
info="Background temperature (Kelvin)",
value=default_values["scenario"]["backgroundTemperature"]
)
ha_windspeed_m_Per_s = gr.Number(
label="High Altitude Windspeed (m/s)",
info="Used to calculate the turbulence profile",
value=default_values["scenario"]["haWindspeed"]
)
cn2at1m = gr.Number(
label="Refractive Index Structure Parameter",
info='The refractive index structure parameter "near the ground" (e.g. ' \
'at h = 1m). Used to calculate the turbulence profile',
value=default_values["scenario"]["cn2at1m"]
)
with gr.Accordion("Sensor Parameters") as sensor_params:
D_m = gr.Number(label="Effective Aperture Diameter (m)", value=default_values["sensor"]["D"])
f_m = gr.Number(label="Focal Length (m)", value=default_values["sensor"]["f"])
with gr.Accordion("Additional Sensor Parameters", open=False) as opt_sensor_parameters:
sensor_name = gr.Textbox(label="Sensor Name", value=default_values["sensor"]["name"])
px_m = gr.Number(label="Detector Center-to-Center Spacing (Pitch) (m)", value=default_values["sensor"]["px"])
optTransWavelengths_str = gr.Textbox(
label="Spectral Bandpass of the Camera (m)",
info="Enter a comma separated list. At minimum, a start and end wavelength should be specified",
value=", ".join(map(str, default_values["sensor"]["optTransWavelengths"]))
if default_values["sensor"]["optTransWavelengths"] else ""
)
opticsTransmission_str = gr.Textbox(
label="Full System In-Band Optical Transmission",
info="Enter a comma separated list. Loss due to any telescope obscuration should not be included",
value=", ".join(map(str, default_values["sensor"]["opticsTransmission"]))
if default_values["sensor"]["opticsTransmission"] else ""
)
eta = gr.Number(label="Relative Linear Obscuration", value=default_values["sensor"]["eta"])
int_time_s = gr.Number(
label="Integration Time (s)", info="Maximum integration time",
value=default_values["sensor"]["intTime"]
)
dark_current = gr.Number(label="Detector Dark Current (e-/s)", value=default_values["sensor"]["darkCurrent"])
read_noise = gr.Number(label="RMS Read Noise (RMS e-)", value=default_values["sensor"]["readNoise"])
max_N = gr.Number(label="Maximum ADC Level (e-)", value=default_values["sensor"]["maxN"])
bit_depth = gr.Number(
label="Bit Depth (bits)",
info="Resolution of the detector ADC",
value=default_values["sensor"]["bitdepth"]
)
max_well_fill = gr.Number(
label="Max Well Fill",
info="Desired well fill. i.e. maximum well size x desired fill fraction",
value=default_values["sensor"]["maxWellFill"]
)
with gr.Row():
sx = gr.Number(label="RMS Jitter Amplitude, X Direction (rad)", value=default_values["sensor"]["sx"])
sy = gr.Number(label="RMS Jitter Amplitude, Y Direction (rad)", value=default_values["sensor"]["sy"])
with gr.Row():
dax = gr.Number(
label="Line of Sight Angular Drift Rate, X Direction (rad/s)",
info="Drift rate during one integration time",
value=default_values["sensor"]["dax"]
)
day = gr.Number(
label="Line of Sight Angular Drift Rate, Y Direction (rad/s)",
info="Drift rate during one integration time",
value=default_values["sensor"]["day"]
)
qe_str = gr.Textbox(
label="Quantum Efficiency as a function of Wavelength (e-/photon)",
info="Enter a comma separated list",
value=", ".join(map(str, default_values["sensor"]["qe"])) if default_values["sensor"]["qe"] else ""
)
qewavelengths_str = gr.Textbox(
label="Wavelengths Corresponding to the Quantum Efficiency Array (microns)",
info="Enter a comma separated list",
value=", ".join(map(str, default_values["sensor"]["qewavelengths"]))
if default_values["sensor"]["qewavelengths"] else ""
)
submit_btn = gr.Button("Perturb Image")
github_btn = gr.Button(
"Check out NRTK on GitHub!",
icon=str(asset_dir / "github-badge.png"),
link="https://github.com/Kitware/nrtk"
)
# Button listeners
gen_config_btn.click(
fn=gen_new_config,
inputs=None,
outputs=[
input_group, img_gsd, scenario_name, ihaze, altitude_m, ground_range_m, aircraft_speed_m_Per_s, target_reflectance,
target_temperature_K, bkgd_reflectance, bkgd_temperature_K, ha_windspeed_m_Per_s, cn2at1m, sensor_name,
D_m, f_m, px_m, optTransWavelengths_str, opticsTransmission_str, eta, int_time_s, dark_current, read_noise,
max_N, bit_depth, max_well_fill, sx, sy, dax, day, qe_str, qewavelengths_str
]
)
load_config_btn.click(
fn=load_config_from_file,
inputs={config_file},
outputs=[
input_group, img_gsd, scenario_name, ihaze, altitude_m, ground_range_m, aircraft_speed_m_Per_s, target_reflectance,
target_temperature_K, bkgd_reflectance, bkgd_temperature_K, ha_windspeed_m_Per_s, cn2at1m, sensor_name,
D_m, f_m, px_m, optTransWavelengths_str, opticsTransmission_str, eta, int_time_s, dark_current, read_noise,
max_N, bit_depth, max_well_fill, sx, sy, dax, day, qe_str, qewavelengths_str
]
)
submit_btn.click(
fn=submit,
inputs={
input_img, img_gsd, scenario_name, ihaze, altitude_m, ground_range_m, aircraft_speed_m_Per_s, target_reflectance,
target_temperature_K, bkgd_reflectance, bkgd_temperature_K, ha_windspeed_m_Per_s, cn2at1m, sensor_name,
D_m, f_m, px_m, optTransWavelengths_str, opticsTransmission_str, eta, int_time_s, dark_current, read_noise,
max_N, bit_depth, max_well_fill, sx, sy, dax, day, qe_str, qewavelengths_str
},
outputs=[out_img, out_config],
)
demo.launch(show_error=True)