# NOTE(review): the following header is file-viewer metadata (uploader
# "weiren119", commit 34acdd0, "Feat: app.py", 15.3 kB) that was pasted
# above the shebang; kept as a comment so the module remains valid Python.
#!/usr/bin/env python3
"""
Copyright (c) 2020, Carleton University Biomedical Informatics Collaboratory
This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.
"""
import pathlib
import json
import os
import subprocess as sp
import tempfile
from typing import List, Callable
from tqdm import tqdm
import numpy as np
from interfaces import AudiogramDict, AudiogramAnnotationDict, ThresholdDict
from digitizer.report_components.grid import Grid
from digitizer.report_components.label import Label
from digitizer.report_components.symbol import Symbol
from digitizer.report_components.report import Report
import utils.audiology as Audiology
from utils.geometry import compute_rotation_angle, apply_rotation
DIR = os.path.join(pathlib.Path(__file__).parent.absolute(), "..") # current directory
def detect_audiograms(filepath: str, weights: str, device: str = "cpu") -> List[AudiogramDict]:
    """Runs the audiogram detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[AudiogramDict]
        The AudiogramDict corresponding to the audiograms detected in the report.
    """
    # sp.run waits for the child process to terminate; the previous
    # Popen + stdout.read() never called wait(), leaving a zombie process
    # and discarding the exit status.
    # TODO: timeout should be an environment variable.
    completed = sp.run([
        "python3",
        os.path.join(DIR, "digitizer/yolov5/detect_audiograms.py"),
        "--source", filepath,
        "--weights", weights,
        "--device", device
    ], stdout=sp.PIPE)
    output = completed.stdout.decode("utf-8")
    # The detector script prints its JSON payload delimited by "$$$" markers.
    audiograms = json.loads(output.split("$$$")[1])
    return audiograms
def detect_labels(filepath: str, weights: str, audiogram_coordinates: dict, correction_angle: float, device: str = "cpu") -> List[Label]:
    """Runs the label detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    audiogram_coordinates : dict
        The coordinates of the audiogram { "x": int, "y": int } needed to convert the label locations
        with respect to the top-left corner of the bounding audiogram to relative to the top-left corner
        of the report.
    correction_angle : float
        The correction angle in degrees that was applied to the audiogram, so that it can be reversed to
        get the coordinates of the label with respect to the top-left corner of the original unrotated report.
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[Label]
        A list of Label objects (NOT LabelDict).
    """
    # sp.run waits for the child to terminate (Popen + read() left a zombie
    # and ignored the exit status).
    completed = sp.run([
        "python3",
        os.path.join(DIR, "digitizer/yolov5/detect_labels.py"),
        "--source", filepath,
        "--weights", weights,
        "--device", device
    ], stdout=sp.PIPE)
    output = completed.stdout.decode("utf-8")
    # The detector script prints its JSON payload delimited by "$$$" markers.
    label_dicts = json.loads(output.split("$$$")[1])
    return [Label(label, audiogram_coordinates, correction_angle) for label in label_dicts]
def detect_symbols(filepath: str, weights: str, audiogram_coordinates: dict, correction_angle: float, device: str = "cpu") -> List[Symbol]:
    """Runs the symbol detector.

    The detector is run as a subprocess.

    Parameters
    ----------
    filepath : str
        Path to the image on which the detector is to be run.
    weights : str
        Path to the file holding the weights of the neural network (detector).
    audiogram_coordinates : dict
        The coordinates of the audiogram { "x": int, "y": int } needed to convert the symbol locations
        with respect to the top-left corner of the bounding audiogram to relative to the top-left corner
        of the report.
    correction_angle : float
        The correction angle in degrees that was applied to the audiogram, so that it can be reversed to
        get the coordinates of the symbol with respect to the top-left corner of the original unrotated report.
    device : str
        "cpu" or "gpu"

    Returns
    -------
    List[Symbol]
        A list of Symbol objects (NOT SymbolDict).
    """
    # sp.run waits for the child to terminate (Popen + read() left a zombie
    # and ignored the exit status).
    completed = sp.run([
        "python3",
        os.path.join(DIR, "digitizer/yolov5/detect_symbols.py"),
        "--source", filepath,
        "--weights", weights,
        "--device", device
    ], stdout=sp.PIPE)
    # The detector script prints its JSON payload delimited by "$$$" markers.
    detections = json.loads(completed.stdout.decode("utf-8").split("$$$")[1])
    return [Symbol(detection, audiogram_coordinates, correction_angle) for detection in detections]
def detect_components(filepath: str, gpu: bool = False) -> List:
    """Invokes the object detectors.

    Parameters
    ----------
    filepath : str
        Path to the image.
    gpu : bool
        Whether the GPU should be used (default: False).

    Returns
    -------
    List
        A list (of length 0, 1 or 2) of the form
        [
            { "audiogram": AudiogramDict, "labels": List[Label], "symbols": List[Symbol] },  # plot 1
            { "audiogram": AudiogramDict, "labels": List[Label], "symbols": List[Symbol] }   # plot 2
        ]
    """
    # Fix: the `gpu` flag was previously accepted but never forwarded to the
    # detectors; translate it into the detectors' "cpu"/"gpu" device string.
    device = "gpu" if gpu else "cpu"
    components = []
    # Weight paths are loop-invariant, so resolve them once up front.
    audiogram_model_weights_path = os.path.join(DIR, "..", "models/audiograms/latest/weights/best.pt")
    labels_model_weights_path = os.path.join(DIR, "..", "models/labels/latest/weights/best.pt")
    symbols_model_weights_path = os.path.join(DIR, "..", "models/symbols/latest/weights/best.pt")
    # Detect audiograms within the report; nothing more to do if none found.
    audiograms = detect_audiograms(filepath, audiogram_model_weights_path, device=device)
    if not audiograms:
        return components
    # Iterate through every audiogram in the report
    for audiogram in audiograms:
        box = audiogram["boundingBox"]
        # Load the report and crop it around the detected audiogram.
        report = Report(filename=filepath)
        report = report.crop(
            box["x"],
            box["y"],
            box["x"] + box["width"],
            box["y"] + box["height"]
        )
        # Correct for rotation: estimate the skew from pairs of (roughly)
        # horizontal/vertical perpendicular grid lines.
        lines = report.detect_lines(threshold=200)
        perpendicular_lines = [
            line for line in lines
            if line.has_a_perpendicular_line(lines)
            and (abs(line.get_angle() - 90) < 10
                 or abs(line.get_angle()) < 10)
        ]
        correction_angle = compute_rotation_angle(perpendicular_lines)
        audiogram["correctionAngle"] = correction_angle
        report = report.rotate(correction_angle)
        # Fix: use a context manager so the temporary file is always cleaned
        # up (the previous code leaked one NamedTemporaryFile per audiogram).
        with tempfile.NamedTemporaryFile(suffix=".jpg") as cropped_file:
            report.save(cropped_file.name)
            audiogram_coordinates = {"x": box["x"], "y": box["y"]}
            components.append({
                "audiogram": audiogram,
                "labels": detect_labels(cropped_file.name, labels_model_weights_path, audiogram_coordinates, correction_angle, device=device),
                "symbols": detect_symbols(cropped_file.name, symbols_model_weights_path, audiogram_coordinates, correction_angle, device=device),
            })
    return components
def generate_partial_annotation(filepath: str, gpu: bool = False) -> List[AudiogramAnnotationDict]:
    """Generates a seed annotation to be completed in the nihl portal.

    It is ``partial`` because it does not locate the corners of the audiogram.

    Parameters
    ----------
    filepath : str
        Path to the file for which an initial annotation is to be generated.
    gpu : bool
        Whether the gpu should be used.

    Returns
    -------
    List[AudiogramAnnotationDict]
        An Annotation dict.
    """
    annotations = []
    for component in detect_components(filepath, gpu=gpu):
        annotation = component["audiogram"]
        annotation["labels"] = [label.to_dict() for label in component["labels"]]
        annotation["symbols"] = [symbol.to_dict() for symbol in component["symbols"]]
        # Corner positions are not located by the algorithm; the portal user
        # is expected to fill them in.
        annotation["corners"] = []
        annotations.append(annotation)
    return annotations
def extract_thresholds(filepath: str, gpu: bool = False) -> List[ThresholdDict]:
    """Extracts the thresholds from the report.

    Parameters
    ----------
    filepath : str
        Path to the file from which thresholds are to be extracted.
    gpu : bool
        Whether the gpu should be used.

    Returns
    -------
    List[ThresholdDict]
        A list of thresholds.
    """
    components = detect_components(filepath, gpu=gpu)
    thresholds = []
    # For each audiogram, extract the thresholds and append them to the
    # thresholds list.
    for component in components:
        audiogram = component["audiogram"]
        labels = component["labels"]
        symbols = component["symbols"]
        # Re-crop and re-rotate the report the same way detect_components did,
        # so grid coordinates line up with the detected symbols.
        box = audiogram["boundingBox"]
        report = Report(filename=filepath)
        report = report.crop(
            box["x"],
            box["y"],
            box["x"] + box["width"],
            box["y"] + box["height"]
        )
        report = report.rotate(audiogram["correctionAngle"])
        try:
            grid = Grid(report, labels)
        except Exception:
            # Best-effort: if the grid cannot be reconstructed for this
            # audiogram, skip it rather than failing the whole report.
            continue
        thresholds += [{
            "ear": symbol.ear,
            "conduction": symbol.conduction,
            "masking": symbol.masking,
            "measurementType": Audiology.stringify_measurement(symbol.to_dict()),
            "frequency": grid.get_snapped_frequency(symbol),
            "threshold": grid.get_snapped_threshold(symbol),
            "response": True  # IMPORTANT: assume that a response was obtained for measurements
        }
            for symbol in symbols
        ]
    return thresholds
def get_correction_angle(corners: List[dict]) -> float:
    """Computes the rotation angle that must be applied based on
    corner coordinates to get an unrotated audiogram.

    Parameters
    ----------
    corners : List[dict]
        A list of four corners, each a dict with at least "x" and "y" keys.

    Returns
    -------
    float
        The rotation angle (in radians, from np.arcsin) that must be applied
        to correct for the rotation of the audiogram.
    """
    # Sort by y ascending: in image coordinates the first two corners are the
    # topmost pair and the last two the bottommost pair. (The original code
    # had the names swapped — `corners[2:]` was labelled "top" — which also
    # disagreed with get_conversion_maps; the arithmetic is unchanged.)
    corners = sorted(corners, key=lambda c: c["y"])
    top_corners = sorted(corners[0:2], key=lambda c: c["x"])
    bottom_corners = sorted(corners[2:], key=lambda c: c["x"])
    # Slope of the bottom edge.
    # NOTE(review): arcsin(|dy|/|dx|) is only an approximation of the true
    # edge angle arctan(|dy|/|dx|); they agree for small skews — confirm
    # whether large rotations need arctan, and that dx is never zero.
    dx_bottom = bottom_corners[1]["x"] - bottom_corners[0]["x"]
    dy_bottom = bottom_corners[1]["y"] - bottom_corners[0]["y"]
    bottom_angle = np.arcsin(abs(dy_bottom) / abs(dx_bottom))
    # Slope of the top edge.
    dx_top = top_corners[1]["x"] - top_corners[0]["x"]
    dy_top = top_corners[1]["y"] - top_corners[0]["y"]
    top_angle = np.arcsin(abs(dy_top) / abs(dx_top))
    # Average the two edge angles; the sign follows the bottom edge's slope
    # (same pair the original code used for the sign).
    return np.sign(dy_bottom) * np.mean([bottom_angle, top_angle])
def get_conversion_maps(corners: List[dict]) -> List[Callable]:
    """Computes the functions that map pixel coordinates to frequency-threshold
    coordinates and vice versa.

    Parameters
    ----------
    corners : List[dict]
        The audiogram corners.

    Returns
    -------
    List[Callable]
        A list of functions, each accepting a single float argument, in the
        following order:
        1. pixel->frequency
        2. pixel->threshold
        3. frequency->pixel
        4. threshold->pixel
    """
    # --- Horizontal (frequency) axis: calibrated from the two topmost
    # corners (smallest y in image coordinates), ordered left to right.
    by_y = sorted(corners, key=lambda c: c["y"])
    left_top, right_top = sorted(by_y[0:2], key=lambda c: c["x"])
    octave_lo = Audiology.frequency_to_octave(left_top["frequency"])
    octave_hi = Audiology.frequency_to_octave(right_top["frequency"])
    px_lo = left_top["x"]
    px_hi = right_top["x"]

    def frequency_map(p):
        # Linear interpolation in octave space, then back to Hz.
        octave = octave_lo + (octave_hi - octave_lo) * (p - px_lo) / (px_hi - px_lo)
        return Audiology.octave_to_frequency(octave)

    def inverse_frequency_map(f):
        octave = Audiology.frequency_to_octave(f)
        return px_lo + (octave - octave_lo) * (px_hi - px_lo) / (octave_hi - octave_lo)

    # --- Vertical (threshold) axis: calibrated from the two leftmost
    # corners, ordered top to bottom.
    by_x = sorted(corners, key=lambda c: c["x"])
    upper_left, lower_left = sorted(by_x[0:2], key=lambda c: c["y"])
    thr_lo = upper_left["threshold"]
    thr_hi = lower_left["threshold"]
    py_lo = upper_left["y"]
    py_hi = lower_left["y"]

    def threshold_map(p):
        return thr_lo + (thr_hi - thr_lo) * (p - py_lo) / (py_hi - py_lo)

    def inverse_threshold_map(t):
        return py_lo + (t - thr_lo) * (py_hi - py_lo) / (thr_hi - thr_lo)

    return [frequency_map, threshold_map, inverse_frequency_map, inverse_threshold_map]
def annotation_to_thresholds(audiograms: dict) -> List[ThresholdDict]:
    """Extracts the thresholds from an annotation.

    Parameters
    ----------
    audiograms : dict
        An annotation.

    Returns
    -------
    List[ThresholdDict]
        A list of thresholds.
    """
    combined_thresholds: List[ThresholdDict] = []
    for audiogram in audiograms:
        correction_angle = get_correction_angle(audiogram["corners"])
        corners = [apply_rotation(corner, correction_angle) for corner in audiogram["corners"]]
        # Only the forward (pixel -> value) maps are needed here; the inverse
        # maps were previously unpacked into unused locals.
        frequency_map, threshold_map, _, _ = get_conversion_maps(corners)
        for symbol in audiogram["symbols"]:
            # Collapse the symbol's bounding box to its center point, then
            # undo the audiogram's rotation so it lands on the calibrated grid.
            center = {
                "x": symbol["boundingBox"]["x"] + symbol["boundingBox"]["width"] / 2,
                "y": symbol["boundingBox"]["y"] + symbol["boundingBox"]["height"] / 2,
            }
            rotated_center = apply_rotation(center, correction_angle)
            measurement = symbol["measurementType"].lower()
            ear = "left" if "left" in measurement else "right"
            conduction = "air" if "air" in measurement else "bone"
            # Direct boolean instead of `False if "unmasked" in ... else True`.
            masking = "unmasked" not in measurement
            if conduction == "air":
                frequency = Audiology.round_frequency(frequency_map(rotated_center["x"]))
            else:
                # Bone-conduction frequencies are rounded per ear.
                frequency = Audiology.round_frequency_bone(frequency_map(rotated_center["x"]), ear)
            threshold = Audiology.round_threshold(threshold_map(rotated_center["y"]))
            combined_thresholds.append({
                "ear": ear,
                "conduction": conduction,
                "masking": masking,
                "frequency": frequency,
                "threshold": threshold,
                "response": True,  # IMPORTANT: assume that a response was measured for threshold
                "measurementType": f"{conduction}_{'MASKED' if masking else 'UNMASKED'}_{ear}".upper()
            })
    return combined_thresholds