#!/usr/bin/env python3
"""
Copyright (c) 2020, Carleton University Biomedical Informatics Collaboratory

This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.
"""
import pathlib
import json
import os
import subprocess as sp
import tempfile
from typing import List, Callable

import numpy as np

from interfaces import AudiogramDict, AudiogramAnnotationDict, ThresholdDict
from digitizer.report_components.grid import Grid
from digitizer.report_components.label import Label
from digitizer.report_components.symbol import Symbol
from digitizer.report_components.report import Report
import utils.audiology as Audiology
from utils.geometry import compute_rotation_angle, apply_rotation

DIR = os.path.join(pathlib.Path(__file__).parent.absolute(), "..")  # parent of this file's directory


def detect_audiograms(filepath: str, weights: str, device: str = "cpu") -> List[AudiogramDict]:
    """Runs the audiogram detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[AudiogramDict]
        The AudiogramDicts corresponding to the audiograms detected in the report.
    """
    proc = sp.Popen([
        "python3",
        os.path.join(DIR, "digitizer/yolov5/detect_audiograms.py"),
        "--source", filepath,
        "--weights", weights,
        "--device", device
    ], stdout=sp.PIPE)  # TODO: the timeout should be an environment variable
    output = proc.stdout.read().decode("utf-8")
    audiograms = json.loads(output.split("$$$")[1])
    return audiograms
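

# Usage sketch (illustrative, not part of the pipeline): the detect_*.py
# scripts are expected to frame their JSON payload between "$$$" markers on
# stdout so that the detector's own logging can be ignored. A hypothetical run:
#
#     weights = os.path.join(DIR, "..", "models/audiograms/latest/weights/best.pt")
#     audiograms = detect_audiograms("reports/example.jpg", weights)
#     # -> e.g. [{"boundingBox": {"x": 12, "y": 40, "width": 610, "height": 475}}]
#
# The image path and bounding box values above are made up; the "$$$" framing
# and the "boundingBox" keys are the ones this module actually parses.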


def detect_labels(filepath: str, weights: str, audiogram_coordinates: dict, correction_angle: float, device: str = "cpu") -> List[Label]:
    """Runs the label detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    audiogram_coordinates : dict
        The coordinates of the audiogram { "x": int, "y": int }, needed to convert the label
        locations from being relative to the top-left corner of the bounding audiogram to being
        relative to the top-left corner of the report.
    correction_angle : float
        The correction angle in degrees that was applied to the audiogram, so that it can be
        reversed to get the coordinates of the label with respect to the top-left corner of the
        original unrotated report.
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[Label]
        A list of Label objects (NOT LabelDict).
    """
    proc = sp.Popen([
        "python3",
        os.path.join(DIR, "digitizer/yolov5/detect_labels.py"),
        "--source", filepath,
        "--weights", weights,
        "--device", device
    ], stdout=sp.PIPE)
    output = proc.stdout.read().decode("utf-8")
    parsed = json.loads(output.split("$$$")[1])
    labels = [Label(label, audiogram_coordinates, correction_angle) for label in parsed]
    return labels


def detect_symbols(filepath: str, weights: str, audiogram_coordinates: dict, correction_angle: float, device: str = "cpu") -> List[Symbol]:
    """Runs the symbol detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    audiogram_coordinates : dict
        The coordinates of the audiogram { "x": int, "y": int }, needed to convert the symbol
        locations from being relative to the top-left corner of the bounding audiogram to being
        relative to the top-left corner of the report.
    correction_angle : float
        The correction angle in degrees that was applied to the audiogram, so that it can be
        reversed to get the coordinates of the symbol with respect to the top-left corner of the
        original unrotated report.
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[Symbol]
        A list of Symbol objects (NOT SymbolDict).
    """
    proc = sp.Popen([
        "python3",
        os.path.join(DIR, "digitizer/yolov5/detect_symbols.py"),
        "--source", filepath,
        "--weights", weights,
        "--device", device
    ], stdout=sp.PIPE)
    output = json.loads(proc.stdout.read().decode("utf-8").split("$$$")[1])
    symbols = [Symbol(detection, audiogram_coordinates, correction_angle) for detection in output]
    return symbols


def detect_components(filepath: str, gpu: bool = False) -> List:
    """Invokes the object detectors.

    Parameters
    ----------
    filepath : str
        Path to the image.
    gpu : bool
        Whether the GPU should be used (default: False).

    Returns
    -------
    List
        A list (of length 0, 1 or 2) of the form
        [
            { "audiogram": AudiogramDict, "labels": List[Label], "symbols": List[Symbol] },  # plot 1
            { "audiogram": AudiogramDict, "labels": List[Label], "symbols": List[Symbol] }   # plot 2
        ]
    """
    components = []
    device = "gpu" if gpu else "cpu"

    # Detect audiograms within the report
    audiogram_model_weights_path = os.path.join(DIR, "..", "models/audiograms/latest/weights/best.pt")
    audiograms = detect_audiograms(filepath, audiogram_model_weights_path, device=device)

    # If no audiogram is detected, return an empty list
    if len(audiograms) == 0:
        return components

    # Iterate through every audiogram in the report
    for i, audiogram in enumerate(audiograms):
        components.append({})

        # Load the report
        report = Report(filename=filepath)

        # Generate a cropped version of the report around the detected audiogram
        report = report.crop(
            audiogram["boundingBox"]["x"],
            audiogram["boundingBox"]["y"],
            audiogram["boundingBox"]["x"] + audiogram["boundingBox"]["width"],
            audiogram["boundingBox"]["y"] + audiogram["boundingBox"]["height"]
        )

        # Create a temporary file to hold the cropped image
        cropped_file = tempfile.NamedTemporaryFile(suffix=".jpg")

        # Correct for rotation using near-horizontal and near-vertical grid lines
        lines = report.detect_lines(threshold=200)
        perpendicular_lines = [
            line for line in lines
            if line.has_a_perpendicular_line(lines)
            and (abs(line.get_angle() - 90) < 10
                 or abs(line.get_angle()) < 10)
        ]
        correction_angle = compute_rotation_angle(perpendicular_lines)
        audiogram["correctionAngle"] = correction_angle
        report = report.rotate(correction_angle)
        report.save(cropped_file.name)

        audiogram_coordinates = {
            "x": audiogram["boundingBox"]["x"],
            "y": audiogram["boundingBox"]["y"]
        }
        components[i]["audiogram"] = audiogram

        labels_model_weights_path = os.path.join(DIR, "..", "models/labels/latest/weights/best.pt")
        components[i]["labels"] = detect_labels(cropped_file.name, labels_model_weights_path, audiogram_coordinates, correction_angle, device=device)

        symbols_model_weights_path = os.path.join(DIR, "..", "models/symbols/latest/weights/best.pt")
        components[i]["symbols"] = detect_symbols(cropped_file.name, symbols_model_weights_path, audiogram_coordinates, correction_angle, device=device)

    return components
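

# Shape sketch (hypothetical values): for a report with one detected
# audiogram, detect_components returns a single-element list of the form
#
#     [{
#         "audiogram": {"boundingBox": {...}, "correctionAngle": <float>},
#         "labels": [<Label>, ...],
#         "symbols": [<Symbol>, ...],
#     }]
#
# where the Label and Symbol objects already carry coordinates relative to the
# original (uncropped, unrotated) report.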


def generate_partial_annotation(filepath: str, gpu: bool = False) -> List[AudiogramAnnotationDict]:
    """Generates a seed annotation to be completed in the NIHL portal.

    It is ``partial`` because it does not locate the corners of the audiogram.

    Parameters
    ----------
    filepath : str
        Path to the file for which an initial annotation is to be generated.
    gpu : bool
        Whether the GPU should be used.

    Returns
    -------
    List[AudiogramAnnotationDict]
        A list of annotation dicts, one per detected audiogram.
    """
    components = detect_components(filepath, gpu=gpu)
    audiograms = []
    for i in range(len(components)):
        audiogram = components[i]["audiogram"]
        audiogram["labels"] = [label.to_dict() for label in components[i]["labels"]]
        audiogram["symbols"] = [symbol.to_dict() for symbol in components[i]["symbols"]]
        audiogram["corners"] = []  # these are not located by the algorithm
        audiograms.append(audiogram)
    return audiograms
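

# Output sketch (illustrative): each partial annotation is the detected
# AudiogramDict augmented in place, e.g.
#
#     {"boundingBox": {...}, "correctionAngle": <float>,
#      "labels": [LabelDict, ...], "symbols": [SymbolDict, ...],
#      "corners": []}  # corners are left for the annotator to place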


def extract_thresholds(filepath: str, gpu: bool = False) -> List[ThresholdDict]:
    """Extracts the thresholds from the report.

    Parameters
    ----------
    filepath : str
        Path to the report from which the thresholds are to be extracted.
    gpu : bool
        Whether the GPU should be used.

    Returns
    -------
    List[ThresholdDict]
        A list of thresholds.
    """
    components = detect_components(filepath, gpu=gpu)
    thresholds = []

    # For each audiogram, extract the thresholds and append them to the
    # thresholds list
    for i in range(len(components)):
        audiogram = components[i]["audiogram"]
        labels = components[i]["labels"]
        symbols = components[i]["symbols"]
        report = Report(filename=filepath)
        report = report.crop(
            audiogram["boundingBox"]["x"],
            audiogram["boundingBox"]["y"],
            audiogram["boundingBox"]["x"] + audiogram["boundingBox"]["width"],
            audiogram["boundingBox"]["y"] + audiogram["boundingBox"]["height"]
        )
        report = report.rotate(audiogram["correctionAngle"])
        try:
            grid = Grid(report, labels)
        except Exception:
            # Skip audiograms whose grid cannot be reconstructed from the labels
            continue
        thresholds += [{
            "ear": symbol.ear,
            "conduction": symbol.conduction,
            "masking": symbol.masking,
            "measurementType": Audiology.stringify_measurement(symbol.to_dict()),
            "frequency": grid.get_snapped_frequency(symbol),
            "threshold": grid.get_snapped_threshold(symbol),
            "response": True  # IMPORTANT: assume that a response was obtained for all measurements
        }
            for symbol in symbols
        ]
    return thresholds
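

# Shape sketch of a single extracted threshold (values are made up, and the
# measurementType string is assumed to follow the CONDUCTION_MASKING_EAR
# convention used in annotation_to_thresholds below):
#
#     {"ear": "left", "conduction": "air", "masking": False,
#      "measurementType": "AIR_UNMASKED_LEFT",
#      "frequency": 1000, "threshold": 25, "response": True}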


def get_correction_angle(corners: List[dict]) -> float:
    """Computes the rotation angle that must be applied, based on the
    corner coordinates, to get an unrotated audiogram.

    Parameters
    ----------
    corners : List[dict]
        A list of corners.

    Returns
    -------
    float
        The rotation angle (in radians) that must be applied to correct for
        the rotation of the audiogram.
    """
    # Sort the corners by their y coordinate (in image coordinates, y grows downward)
    corners = sorted(corners, key=lambda c: c["y"])
    top_corners = sorted(corners[0:2], key=lambda c: c["x"])
    bottom_corners = sorted(corners[2:], key=lambda c: c["x"])

    # Find the rotation angle based on the two top corners
    dx1 = top_corners[1]["x"] - top_corners[0]["x"]
    dy1 = top_corners[1]["y"] - top_corners[0]["y"]
    angle1 = np.arctan(abs(dy1) / abs(dx1))  # arctan, not arcsin: dy/dx is a slope

    # Repeat for the two bottom corners
    dx2 = bottom_corners[1]["x"] - bottom_corners[0]["x"]
    dy2 = bottom_corners[1]["y"] - bottom_corners[0]["y"]
    angle2 = np.arctan(abs(dy2) / abs(dx2))

    # Average the two estimates; the sign follows the slope of the top edge
    return np.sign(dy1) * np.mean([angle1, angle2])
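

def _correction_angle_example() -> None:
    """Minimal sketch with synthetic corners (not from a real report): a grid
    whose top and bottom edges rise with slope 0.05 should yield a correction
    angle of arctan(0.05) radians."""
    corners = [
        {"x": 0, "y": 0}, {"x": 600, "y": 30},     # top edge, slope 30/600 = 0.05
        {"x": 0, "y": 400}, {"x": 600, "y": 430},  # bottom edge, same slope
    ]
    assert abs(get_correction_angle(corners) - np.arctan(0.05)) < 1e-9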


def get_conversion_maps(corners: List[dict]) -> List[Callable]:
    """Computes the functions that map pixel coordinates to frequency-threshold coordinates
    and vice versa.

    Parameters
    ----------
    corners : List[dict]
        The audiogram corners.

    Returns
    -------
    List[Callable]
        A list of lambda functions. These functions all accept a single float argument.
        They are in the following order:
        1. pixel -> frequency
        2. pixel -> threshold
        3. frequency -> pixel
        4. threshold -> pixel
    """
    # For the x axis: interpolate linearly in octave space between the two top corners
    y_sorted_corners = sorted(corners, key=lambda c: c["y"])
    top_corners = sorted(y_sorted_corners[0:2], key=lambda c: c["x"])
    o_max = Audiology.frequency_to_octave(top_corners[1]["frequency"])  # max octave
    x_max = top_corners[1]["x"]  # max pixel value
    o_min = Audiology.frequency_to_octave(top_corners[0]["frequency"])  # min octave
    x_min = top_corners[0]["x"]
    frequency_map = lambda p: Audiology.octave_to_frequency(o_min + (o_max - o_min)*(p - x_min)/(x_max - x_min))
    inverse_frequency_map = lambda f: x_min + (Audiology.frequency_to_octave(f) - o_min)*(x_max - x_min)/(o_max - o_min)

    # For the y axis: interpolate linearly between the two left corners
    x_sorted_corners = sorted(corners, key=lambda c: c["x"])
    left_corners = sorted(x_sorted_corners[0:2], key=lambda c: c["y"])
    t_max = left_corners[1]["threshold"]  # max threshold
    y_max = left_corners[1]["y"]  # max pixel value
    t_min = left_corners[0]["threshold"]
    y_min = left_corners[0]["y"]
    threshold_map = lambda p: t_min + (t_max - t_min)*(p - y_min)/(y_max - y_min)
    inverse_threshold_map = lambda t: y_min + (t - t_min)*(y_max - y_min)/(t_max - t_min)

    return [frequency_map, threshold_map, inverse_frequency_map, inverse_threshold_map]
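

def _conversion_maps_example() -> None:
    """Minimal sketch with synthetic corners (not from a real report). The
    threshold maps are exact algebraic inverses of each other; the frequency
    round-trip additionally assumes that Audiology.frequency_to_octave and
    Audiology.octave_to_frequency invert each other."""
    corners = [
        {"x": 100, "y": 50, "frequency": 250, "threshold": -10},    # top-left
        {"x": 700, "y": 50, "frequency": 8000, "threshold": -10},   # top-right
        {"x": 100, "y": 550, "frequency": 250, "threshold": 120},   # bottom-left
        {"x": 700, "y": 550, "frequency": 8000, "threshold": 120},  # bottom-right
    ]
    frequency_map, threshold_map, inverse_frequency_map, inverse_threshold_map = get_conversion_maps(corners)
    assert abs(threshold_map(50) - (-10)) < 1e-9         # top edge -> minimum threshold
    assert abs(inverse_threshold_map(120) - 550) < 1e-9  # bottom threshold -> bottom edge
    assert abs(inverse_frequency_map(frequency_map(400)) - 400) < 1e-6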


def annotation_to_thresholds(audiograms: List[dict]) -> List[ThresholdDict]:
    """Extracts the thresholds from an annotation.

    Parameters
    ----------
    audiograms : List[dict]
        An annotation (a list of audiogram dicts).

    Returns
    -------
    List[ThresholdDict]
        A list of thresholds.
    """
    combined_thresholds = []
    for audiogram in audiograms:
        correction_angle = get_correction_angle(audiogram["corners"])
        corners = [apply_rotation(corner, correction_angle) for corner in audiogram["corners"]]
        frequency_map, threshold_map, inverse_frequency_map, inverse_threshold_map = get_conversion_maps(corners)
        thresholds: List[ThresholdDict] = []
        for symbol in audiogram["symbols"]:
            # Replace the symbol's bounding box by its center point, then undo the rotation
            symbol_center = {
                "x": symbol["boundingBox"]["x"] + symbol["boundingBox"]["width"] / 2,
                "y": symbol["boundingBox"]["y"] + symbol["boundingBox"]["height"] / 2,
            }
            new_symbol = {**symbol, "boundingBox": apply_rotation(symbol_center, correction_angle)}
            bounding_box = new_symbol["boundingBox"]

            # Parse the measurement type (e.g. "AIR_UNMASKED_LEFT")
            ear = "left" if "left" in new_symbol["measurementType"].lower() else "right"
            conduction = "air" if "air" in new_symbol["measurementType"].lower() else "bone"
            masking = "unmasked" not in new_symbol["measurementType"].lower()

            # Snap the pixel coordinates to audiological frequency/threshold values
            if conduction == "air":
                frequency = Audiology.round_frequency(frequency_map(bounding_box["x"]))
            else:
                frequency = Audiology.round_frequency_bone(frequency_map(bounding_box["x"]), ear)
            threshold = Audiology.round_threshold(threshold_map(bounding_box["y"]))
            thresholds.append({
                "ear": ear,
                "conduction": conduction,
                "masking": masking,
                "frequency": frequency,
                "threshold": threshold,
                "response": True,  # IMPORTANT: assume that a response was obtained for each measurement
                "measurementType": f"{conduction}_{'MASKED' if masking else 'UNMASKED'}_{ear}".upper()
            })
        combined_thresholds += thresholds
    return combined_thresholds
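

# Input sketch (illustrative): each audiogram annotation is expected to carry
# its corner calibration and raw symbol detections, e.g.
#
#     {
#         "corners": [{"x": 100, "y": 50, "frequency": 250, "threshold": -10}, ...],  # 4 corners
#         "symbols": [{"boundingBox": {"x": 310, "y": 240, "width": 20, "height": 20},
#                      "measurementType": "AIR_UNMASKED_LEFT"}, ...],
#     }
#
# The coordinate values above are made up; the keys are exactly the ones read
# by annotation_to_thresholds.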