|
import json
import os
import tempfile

import cv2
import gradio as gr
import numpy as np
from roboflow import Roboflow
from sklearn.linear_model import LinearRegression
|
|
|
# NOTE(security): an API key is committed in this file. Supply it through the
# ROBOFLOW_API_KEY environment variable instead; the literal fallback is kept
# only so existing deployments keep working, and should be rotated and removed.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY") or "5uMN18FScJQlsWbgIwrP"

# Resolve the hosted detection model once at start-up: version 1 of the
# "soil-phosphorus-analysis" project in the key's default workspace.
rf = Roboflow(api_key=ROBOFLOW_API_KEY)
project = rf.workspace().project("soil-phosphorus-analysis")
model = project.version(1).model
|
|
|
|
|
def formula_1(r, g, b):
    """Return the red channel's share of the summed channels: R / (R + G + B).

    Undefined when ``r + g + b`` is zero (division by zero).
    """
    total = r + g + b
    return r / total
|
|
|
def formula_2(r, g, b):
    """Return the red-minus-green difference over the channel sum: (R - G) / (R + G + B).

    Undefined when ``r + g + b`` is zero (division by zero).
    """
    difference = r - g
    total = r + g + b
    return difference / total
|
|
|
def formula_3(r, g, b):
    """Return the red-minus-blue difference over the channel sum: (R - B) / (R + G + B).

    Undefined when ``r + g + b`` is zero (division by zero).
    """
    difference = r - b
    total = r + g + b
    return difference / total
|
|
|
def formula_4(r, g, b):
    """Return the red/blue normalised difference: (R - B) / (R + B).

    ``g`` is accepted only so every formula shares one call signature; it is
    not used. Undefined when ``r + b`` is zero (division by zero).
    """
    difference = r - b
    red_blue_sum = r + b
    return difference / red_blue_sum
|
|
|
def formula_5(r, g, b):
    """Return the red/green normalised difference: (R - G) / (R + G).

    ``b`` is accepted only so every formula shares one call signature; it is
    not used. Undefined when ``r + g`` is zero (division by zero).
    """
    difference = r - g
    red_green_sum = r + g
    return difference / red_green_sum
|
|
|
def formula_6(r, g, b):
    """Return the blue channel's share of the summed channels: B / (R + G + B).

    Undefined when ``r + g + b`` is zero (division by zero).
    """
    total = r + g + b
    return b / total
|
|
|
def formula_7(r, g, b):
    """Return twice the blue channel minus green and red: (2*B) - G - R."""
    doubled_blue = 2 * b
    return doubled_blue - g - r
|
|
|
def formula_8(r, g, b):
    """Return twice the red channel minus green and blue: (2*R) - G - B."""
    doubled_red = 2 * r
    return doubled_red - g - b
|
|
|
# (label, function) pairs in the order they should be offered to the user.
_FORMULA_TABLE = (
    ("R / (R + G + B)", formula_1),
    ("(R - G) / (R + G + B)", formula_2),
    ("(R - B) / (R + G + B)", formula_3),
    ("(R - B) / (R + B)", formula_4),
    ("(R - G) / (R + G)", formula_5),
    ("B / (R + G + B)", formula_6),
    ("(2*B) - G - R", formula_7),
    ("(2*R) - G - B", formula_8),
)

# Maps each human-readable formula label to the function that evaluates it.
formulas = dict(_FORMULA_TABLE)
|
|
|
# Known phosphorus values paired, in order, with the first six flasks; they
# are the regression targets when the calibration line is fitted.
# Evaluates to [0, 5, 10, 15, 20, 25].
phosphorus_values = list(range(0, 30, 5))
|
|
|
def get_roboflow_predictions(image):
    """Run the Roboflow detection model on *image* and return its JSON result.

    The model client is given a file path, so the array is first written to a
    JPEG file. A unique temporary file is used (instead of a fixed shared
    name) so that concurrent requests cannot overwrite each other's image,
    and the file is removed once the prediction has been fetched.

    Args:
        image: Image array accepted by ``cv2.imwrite``.

    Returns:
        The decoded JSON response of ``model.predict(...)``.

    Raises:
        ValueError: If the image could not be written to disk.
    """
    # NOTE(review): cv2.imwrite interprets the array as BGR. If the caller
    # passes an RGB array (as Gradio does for type="numpy"), the file sent to
    # the model has red and blue swapped -- TODO confirm this is intended.
    fd, temp_image_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)  # only the path is needed; OpenCV reopens the file itself
    try:
        if not cv2.imwrite(temp_image_path, image):
            raise ValueError("Image could not be written to a temporary file.")
        # confidence / overlap are passed straight to the Roboflow client.
        return model.predict(temp_image_path, confidence=45, overlap=30).json()
    finally:
        try:
            os.remove(temp_image_path)
        except OSError:
            pass  # best-effort cleanup; a leftover temp file is harmless
|
|
|
def get_flasks(pred, row_limit=6):
    """Extract the detected flasks from *pred* and number them in reading order.

    Detections whose ``"class"`` is ``"flask"`` are sorted by descending
    ``y``, cut into consecutive rows of ``row_limit`` items (the last row may
    be shorter), and each row is then sorted by descending ``x``. Flasks are
    numbered from 1 in that final sequence.

    The input dictionaries are left untouched; fresh dictionaries are
    returned.

    Args:
        pred: Prediction response containing a ``"predictions"`` list. Each
            flask entry must provide ``x``, ``y``, ``width``, ``height``,
            ``detection_id`` and ``class``.
        row_limit: Number of flasks per row. Defaults to 6.

    Returns:
        A list of dicts with keys ``order``, ``x``, ``y``, ``width``,
        ``height``, ``detection_id`` and ``class``, in numbering order.

    Raises:
        ValueError: If ``row_limit`` is smaller than 1.
        KeyError: If ``pred`` or a flask entry lacks a required key.
    """
    if row_limit < 1:
        raise ValueError("row_limit must be a positive integer.")

    detections = [obj for obj in pred["predictions"] if obj["class"] == "flask"]
    by_descending_y = sorted(detections, key=lambda obj: -obj["y"])

    ordered = []
    for start in range(0, len(by_descending_y), row_limit):
        row = by_descending_y[start:start + row_limit]
        ordered.extend(sorted(row, key=lambda obj: -obj["x"]))

    return [
        {
            "order": order,
            "x": flask["x"],
            "y": flask["y"],
            "width": flask["width"],
            "height": flask["height"],
            "detection_id": flask["detection_id"],
            "class": "flask",
        }
        for order, flask in enumerate(ordered, start=1)
    ]
|
|
|
|
|
def find_rgb_values(image, flasks):
    """Sample a mean colour for every flask and annotate *image* in place.

    For each entry whose ``"class"`` is ``"flask"``, three pixels are read on
    the vertical centre line of the bounding box (the centre and one eighth
    of the box height above and below it) and averaged channel-wise. A
    bounding rectangle and a filled dot at each sample point are then drawn
    onto *image*.

    Pixels are read from an untouched copy of the image, so annotations drawn
    for earlier flasks can never leak into the samples of later ones.

    A flask with any sample point outside the image is reported with
    ``print`` and skipped entirely: it contributes no colour and no
    annotation, so the returned list can be shorter than *flasks*.

    Args:
        image: Image array indexed as ``image[y, x]``; modified in place.
        flasks: Dicts with ``class``, ``x``, ``y``, ``width`` and ``height``
            (box centre and size in pixel coordinates of *image*).

    Returns:
        A list with one channel-wise mean (NumPy array) per sampled flask, in
        the order of *flasks*.
    """
    pristine = image.copy()
    img_height, img_width = image.shape[:2]
    flask_rgbs = []

    for i, prediction in enumerate(flasks):
        if prediction.get("class") != "flask":
            continue

        center_x = int(prediction["x"])
        center_y = int(prediction["y"])
        width = int(prediction["width"])
        height = int(prediction["height"])

        offset = int(height / 8)
        points = [
            (center_x, center_y - offset),
            (center_x, center_y),
            (center_x, center_y + offset),
        ]

        in_bounds = all(
            0 <= x < img_width and 0 <= y < img_height for x, y in points
        )
        if not in_bounds:
            print(f"Ignore flask {i+1} due to faulty coordinates.")
            continue

        samples = [pristine[y, x] for x, y in points]
        flask_rgbs.append(np.mean(samples, axis=0))

        # Annotate only after sampling succeeded for this flask.
        top_left = (center_x - width // 2, center_y - height // 2)
        bottom_right = (center_x + width // 2, center_y + height // 2)
        cv2.rectangle(image, top_left, bottom_right, (0, 255, 0), 2)
        for point in points:
            cv2.circle(image, point, 5, (0, 0, 255), -1)

    return flask_rgbs
|
|
|
def predict(image, formula_choice):
    """Detect flasks in *image*, calibrate on the standards, and predict the rest.

    Steps:
      1. Detect and order the flasks via the Roboflow model.
      2. Resize the image to 600x800 and scale the detections to match.
      3. Sample a mean colour per flask and apply the chosen colour formula.
      4. Fit a linear regression of ``phosphorus_values`` on the formula
         value of the first ``len(phosphorus_values)`` flasks, then predict
         the phosphorus value of every remaining flask.

    The image is processed in memory. Flask numbers count the flasks that
    were successfully sampled, so a flask skipped during sampling shifts the
    numbers of those after it.

    Args:
        image: Uploaded image array, or ``None`` if nothing was uploaded.
        formula_choice: A key of ``formulas``.

    Returns:
        ``(annotated_image, rows)`` where each row is
        ``[flask_number, formula_value, phosphorus_value]``; the phosphorus
        value is the known standard for calibration flasks and the regression
        prediction for the others.

    Raises:
        ValueError: If no image was supplied, the formula choice is unknown,
            or fewer usable flasks were found than there are standards.
    """
    if image is None:
        raise ValueError("No image was provided.")

    # Validate the cheap input first so a bad choice never costs a remote
    # inference call.
    chosen_formula = formulas.get(formula_choice)
    if not chosen_formula:
        raise ValueError("Invalid formula choice.")

    pred = get_roboflow_predictions(image)
    flasks = get_flasks(pred)

    original_height, original_width = image.shape[:2]
    new_width = 600
    new_height = 800
    scale_x = new_width / original_width
    scale_y = new_height / original_height

    # cv2.resize returns a new array, so the annotations drawn later never
    # touch the caller's image, and no lossy JPEG round trip is needed.
    image = cv2.resize(image, (new_width, new_height))

    # Detections are in original-image pixels; map them onto the resized image.
    rescaled_predictions = [
        {
            **prediction,
            "x": int(prediction["x"] * scale_x),
            "y": int(prediction["y"] * scale_y),
            "width": int(prediction["width"] * scale_x),
            "height": int(prediction["height"] * scale_y),
        }
        for prediction in flasks
    ]

    flask_rgbs = find_rgb_values(image, rescaled_predictions)

    results = []
    for flask_number, rgb in enumerate(flask_rgbs, start=1):
        r, g, b = rgb
        results.append(
            {"flask_number": flask_number, "rgb_value": chosen_formula(r, g, b)}
        )

    n_standards = len(phosphorus_values)
    if len(results) < n_standards:
        raise ValueError(
            f"Expected at least {n_standards} calibration flasks but only "
            f"{len(results)} usable flasks were detected."
        )

    training_flasks = results[:n_standards]
    remaining_flasks = results[n_standards:]

    # Named `regressor` so it does not shadow the module-level detection `model`.
    regressor = LinearRegression()
    X_train = np.array(
        [flask["rgb_value"] for flask in training_flasks]
    ).reshape(-1, 1)
    y_train = np.array(phosphorus_values)
    regressor.fit(X_train, y_train)

    if remaining_flasks:
        X_test = np.array(
            [flask["rgb_value"] for flask in remaining_flasks]
        ).reshape(-1, 1)
        predicted_phosphorus = regressor.predict(X_test)
    else:
        # Only the standards were present; there is nothing to predict.
        predicted_phosphorus = []

    output_data = [
        [flask["flask_number"], flask["rgb_value"], known]
        for flask, known in zip(training_flasks, phosphorus_values)
    ]
    output_data.extend(
        [flask["flask_number"], flask["rgb_value"], predicted]
        for flask, predicted in zip(remaining_flasks, predicted_phosphorus)
    )

    return image, output_data
|
|
|
# Input widgets: the photo to analyse and the colour formula to apply.
image_input = gr.Image(type="numpy", label="Upload an image")
formula_input = gr.Dropdown(choices=list(formulas), label="Choose a formula")

# Output widgets: the annotated photo and the per-flask result table.
annotated_output = gr.Image(type="numpy", label="Predicted Image")
table_output = gr.Dataframe(
    headers=["Flask Number", "Regressions", "Phosphorus"],
    label="Predictions",
)

iface = gr.Interface(
    fn=predict,
    inputs=[image_input, formula_input],
    outputs=[annotated_output, table_output],
    title="Digital Spectrophotometer",
    description="Upload an image to detect flasks and calculate the Phosphorus Value for each flask.",
)

# NOTE(review): the app is launched whenever this module is executed or
# imported (no `if __name__ == "__main__":` guard), and share=True requests a
# public share link -- confirm both are intended.
iface.launch(share=True)
|
|
|
|