import json
import os
import tempfile

import cv2
import gradio as gr
import numpy as np
from roboflow import Roboflow
from sklearn.linear_model import LinearRegression

# NOTE(security): an API key is committed in this file. Set the
# ROBOFLOW_API_KEY environment variable to override it; the literal is kept
# only as a fallback so existing deployments keep working and should be
# rotated and removed.
rf = Roboflow(api_key=os.environ.get("ROBOFLOW_API_KEY", "5uMN18FScJQlsWbgIwrP"))
project = rf.workspace().project("soil-phosphorus-analysis")
model = project.version(1).model


# Colour-index formulas. Each takes the three channel values of a flask and
# returns a single scalar that is later regressed against phosphorus values.
def formula_1(r, g, b):
    """Return R / (R + G + B)."""
    return r / (r + g + b)


def formula_2(r, g, b):
    """Return (R - G) / (R + G + B)."""
    return (r - g) / (r + g + b)


def formula_3(r, g, b):
    """Return (R - B) / (R + G + B)."""
    return (r - b) / (r + g + b)


def formula_4(r, g, b):
    """Return (R - B) / (R + B)."""
    return (r - b) / (r + b)


def formula_5(r, g, b):
    """Return (R - G) / (R + G)."""
    return (r - g) / (r + g)


def formula_6(r, g, b):
    """Return B / (R + G + B)."""
    return b / (r + g + b)


def formula_7(r, g, b):
    """Return 2*B - G - R."""
    return (2 * b) - g - r


def formula_8(r, g, b):
    """Return 2*R - G - B."""
    return (2 * r) - g - b


# Maps the label shown in the UI dropdown to the function implementing it.
formulas = {
    "R / (R + G + B)": formula_1,
    "(R - G) / (R + G + B)": formula_2,
    "(R - B) / (R + G + B)": formula_3,
    "(R - B) / (R + B)": formula_4,
    "(R - G) / (R + G)": formula_5,
    "B / (R + G + B)": formula_6,
    "(2*B) - G - R": formula_7,
    "(2*R) - G - B": formula_8,
}

# Known phosphorus values assigned, in order, to the first flasks returned by
# get_flasks(); these flasks are used to fit the calibration line.
phosphorus_values = [0, 5, 10, 15, 20, 25]


def get_roboflow_predictions(image):
    """Run the Roboflow detection model on an image and return its JSON result.

    Args:
        image: Image as a NumPy array in RGB channel order, which is what
            Gradio's ``gr.Image(type="numpy")`` delivers.

    Returns:
        The decoded JSON response of ``model.predict(...)``.

    Raises:
        ValueError: If the image cannot be written to a temporary file.
    """
    # cv2.imwrite interprets a 3-channel array as BGR, so convert first;
    # otherwise the detector is sent an image with red and blue swapped.
    if image.ndim == 3 and image.shape[2] == 3:
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    # A unique temporary file avoids concurrent requests overwriting each
    # other's upload and avoids leaving files behind in the working directory.
    fd, temp_image_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    try:
        if not cv2.imwrite(temp_image_path, image):
            raise ValueError(f"Image could not be written to {temp_image_path}.")
        return model.predict(temp_image_path, confidence=45, overlap=30).json()
    finally:
        os.remove(temp_image_path)


def get_flasks(pred, row_size=6):
    """Extract the detected flasks and number them in a fixed reading order.

    Flasks are sorted by ``y`` from largest to smallest, cut into consecutive
    rows of ``row_size`` (the last row may be shorter), and each row is then
    sorted by ``x`` from largest to smallest. The resulting sequence is
    numbered starting at 1.

    Args:
        pred: Prediction dict containing a ``"predictions"`` list of
            detections with ``class``, ``x``, ``y``, ``width`` and ``height``.
        row_size: Number of flasks per row (default 6).

    Returns:
        A list of new dicts with the keys ``order``, ``x``, ``y``, ``width``,
        ``height``, ``detection_id`` and ``class``. The input is not modified.
    """
    flasks = [obj for obj in pred["predictions"] if obj["class"] == "flask"]

    # Largest y first; ties keep their original relative order.
    by_y = sorted(flasks, key=lambda obj: obj["y"], reverse=True)

    rows = [by_y[start:start + row_size] for start in range(0, len(by_y), row_size)]

    # Within each row, largest x first.
    ordered = [
        flask
        for row in rows
        for flask in sorted(row, key=lambda obj: obj["x"], reverse=True)
    ]

    return [
        {
            "order": order,
            "x": flask["x"],
            "y": flask["y"],
            "width": flask["width"],
            "height": flask["height"],
            "detection_id": flask.get("detection_id"),
            "class": "flask",
        }
        for order, flask in enumerate(ordered, start=1)
    ]


def find_rgb_values(image, flasks):
    """Sample the average colour of every flask and annotate the image.

    For each flask, three pixels are read on the vertical line through its
    centre (at the centre and ``height / 8`` above and below it) and averaged.
    A bounding box and the three sample points are then drawn onto ``image``
    in place.

    Args:
        image: Image array indexed as ``image[y, x]``; modified in place.
            The marker colours below assume RGB channel order.
        flasks: Flask dicts with ``x``, ``y``, ``width``, ``height`` in the
            pixel coordinates of ``image``.

    Returns:
        A list with one averaged channel triple per flask that could be
        sampled. Flasks with a sample point outside the image are skipped,
        so the list may be shorter than ``flasks``.
    """
    flask_rgbs = []
    img_height, img_width = image.shape[:2]

    for i, prediction in enumerate(flasks):
        if prediction.get("class") != "flask":
            continue

        center_x = int(prediction["x"])
        center_y = int(prediction["y"])
        width = int(prediction["width"])
        height = int(prediction["height"])

        offset = int(height / 8)
        points = [
            (center_x, center_y - offset),
            (center_x, center_y),
            (center_x, center_y + offset),
        ]

        # Explicit bounds check: negative indices would otherwise silently
        # wrap around to the opposite edge of the image.
        in_bounds = all(
            0 <= x < img_width and 0 <= y < img_height for x, y in points
        )
        if not in_bounds:
            print(f"Ignore flask {i+1} due to faulty coordinates.")
            continue

        flask_rgbs.append(np.mean([image[y, x] for x, y in points], axis=0))

        top_left = (center_x - width // 2, center_y - height // 2)
        bottom_right = (center_x + width // 2, center_y + height // 2)
        cv2.rectangle(image, top_left, bottom_right, (0, 255, 0), 2)  # green box
        for point in points:
            cv2.circle(image, point, 5, (255, 0, 0), -1)  # red dot (RGB order)

    return flask_rgbs


def predict(image, formula_choice):
    """Detect flasks, compute a colour index per flask and estimate phosphorus.

    The first ``len(phosphorus_values)`` flasks (in ``get_flasks`` order) are
    treated as calibration standards: a linear regression is fitted from their
    colour index to ``phosphorus_values`` and then applied to all remaining
    flasks.

    Args:
        image: Uploaded image as an RGB NumPy array, or ``None`` if nothing
            was uploaded.
        formula_choice: A key of ``formulas``.

    Returns:
        A tuple ``(annotated_image, rows)`` where ``annotated_image`` is the
        600x800 resized image with boxes and sample points drawn, and
        ``rows`` is a list of ``[flask_number, index_value, phosphorus]``.

    Raises:
        ValueError: If no image is given, the formula is unknown, or fewer
            usable flasks than calibration values were found.
    """
    if image is None:
        raise ValueError("No image was provided.")

    # Validate the formula before spending a network round-trip on detection.
    chosen_formula = formulas.get(formula_choice)
    if not chosen_formula:
        raise ValueError("Invalid formula choice.")

    pred = get_roboflow_predictions(image)
    flasks = get_flasks(pred)

    # Work on a fixed-size copy. The pixels are taken straight from the
    # upload (no JPEG re-encode), so the sampled colours are not altered by
    # compression artefacts.
    original_height, original_width = image.shape[:2]
    new_width = 600
    new_height = 800
    scale_x = new_width / original_width
    scale_y = new_height / original_height
    image = cv2.resize(image, (new_width, new_height))

    # Map detection coordinates into the resized image.
    rescaled_predictions = [
        {
            **prediction,
            "x": int(prediction["x"] * scale_x),
            "y": int(prediction["y"] * scale_y),
            "width": int(prediction["width"] * scale_x),
            "height": int(prediction["height"] * scale_y),
        }
        for prediction in flasks
    ]

    flask_rgbs = find_rgb_values(image, rescaled_predictions)

    # NOTE(review): flasks skipped by find_rgb_values are not represented
    # here, so flask numbers count only the flasks that could be sampled.
    results = []
    for number, (r, g, b) in enumerate(flask_rgbs, start=1):
        results.append(
            {"flask_number": number, "rgb_value": chosen_formula(r, g, b)}
        )

    n_standards = len(phosphorus_values)
    if len(results) < n_standards:
        raise ValueError(
            f"Expected at least {n_standards} calibration flasks, "
            f"but only {len(results)} usable flasks were detected."
        )

    training_flasks = results[:n_standards]
    remaining_flasks = results[n_standards:]

    # Named `regressor` so it does not shadow the module-level Roboflow model.
    regressor = LinearRegression()
    X_train = np.array(
        [flask["rgb_value"] for flask in training_flasks]
    ).reshape(-1, 1)
    y_train = np.array(phosphorus_values)
    regressor.fit(X_train, y_train)

    # Predicting on an empty array raises, so only predict when there are
    # flasks beyond the calibration set.
    if remaining_flasks:
        X_test = np.array(
            [flask["rgb_value"] for flask in remaining_flasks]
        ).reshape(-1, 1)
        predicted_phosphorus = regressor.predict(X_test)
    else:
        predicted_phosphorus = []

    output_data = [
        [flask["flask_number"], flask["rgb_value"], known]
        for flask, known in zip(training_flasks, phosphorus_values)
    ]
    output_data.extend(
        [flask["flask_number"], flask["rgb_value"], estimate]
        for flask, estimate in zip(remaining_flasks, predicted_phosphorus)
    )

    return image, output_data


iface = gr.Interface(
    fn=predict,
    inputs=[
        gr.Image(type="numpy", label="Upload an image"),
        gr.Dropdown(choices=list(formulas.keys()), label="Choose a formula"),
    ],
    outputs=[
        gr.Image(type="numpy", label="Predicted Image"),
        gr.Dataframe(
            headers=["Flask Number", "Regressions", "Phosphorus"],
            label="Predictions",
        ),
    ],
    title="Digital Spectrophotometer",
    description="Upload an image to detect flasks and calculate the Phosphorus Value for each flask.",
)

# TODO: add a button for exporting the results as CSV (not implemented yet).
iface.launch(share=True)