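"""Gradio app for segmenting aggregate particles in images of concrete surfaces."""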
import cv2
import numpy as np
import gradio as gr
from skimage.filters import threshold_multiotsu
from scipy.ndimage import binary_fill_holes, binary_opening
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def enhanced_aggregate_segmentation(image):
"""Advanced concrete aggregate segmentation pipeline"""
try:
# Convert to grayscale if needed
if len(image.shape) == 3:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
else:
gray = image.copy()
# Step 1: Contrast enhancement
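        # CLAHE boosts local contrast, which helps separate aggregates from the surrounding paste under uneven lighting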
clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
enhanced = clahe.apply(gray)
# Step 2: Multi-level Otsu thresholding
thresholds = threshold_multiotsu(enhanced, classes=3)
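        # Label every pixel 0, 1, or 2 in order of increasing brightness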
regions = np.digitize(enhanced, bins=thresholds)
# Step 3: Aggregate mask creation
aggregate_mask = (regions == 2).astype(np.uint8) # Assuming aggregates are brightest
# Step 4: Morphological refinement
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))
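        # Closing bridges small gaps within aggregates; the subsequent opening removes thin noise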
cleaned = cv2.morphologyEx(aggregate_mask*255, cv2.MORPH_CLOSE, kernel, iterations=2)
cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_OPEN, kernel, iterations=1)
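        # Fill any enclosed holes so each detected aggregate becomes a solid region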
filled = binary_fill_holes(cleaned > 127)
# Step 5: Final mask processing
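        # A final small opening removes isolated speckles left after hole filling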
final_mask = binary_opening(filled, structure=np.ones((3,3))).astype(np.uint8)
# Create output images
if len(image.shape) == 3:
aggregates = image.copy()
aggregates[final_mask == 0] = 0
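            # Overlay the colour-coded mask on the original image (70% image, 30% mask)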
mask_vis = cv2.applyColorMap((final_mask*255).astype(np.uint8), cv2.COLORMAP_JET)
mask_vis = cv2.addWeighted(image, 0.7, mask_vis, 0.3, 0)
else:
aggregates = np.zeros_like(image)
aggregates[final_mask == 1] = image[final_mask == 1]
mask_vis = cv2.applyColorMap((final_mask*255).astype(np.uint8), cv2.COLORMAP_JET)
return aggregates, mask_vis
except Exception as e:
logger.error(f"Segmentation error: {str(e)}")
        error_img = np.zeros_like(image) if len(image.shape) == 2 else np.zeros((*image.shape[:2], 3), dtype=np.uint8)
return error_img, error_img
def process_image(image):
"""Process image for Gradio interface"""
try:
# Convert from Gradio's RGB to OpenCV's BGR
        if isinstance(image, np.ndarray) and image.ndim == 3 and image.shape[2] == 3:
image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
aggregates, mask_vis = enhanced_aggregate_segmentation(image)
# Convert back to RGB for display
        if len(aggregates.shape) == 3:
            aggregates = cv2.cvtColor(aggregates, cv2.COLOR_BGR2RGB)
        if len(mask_vis.shape) == 3:
            mask_vis = cv2.cvtColor(mask_vis, cv2.COLOR_BGR2RGB)
return aggregates, mask_vis
except Exception as e:
logger.error(f"Processing error: {str(e)}")
error_img = np.zeros((512,512,3), dtype=np.uint8)
return error_img, error_img
# Create Gradio interface
with gr.Blocks(title="Concrete Aggregate Analyzer") as app:
gr.Markdown("""
## Concrete Aggregate Segmentation Analyzer
    Upload an image of a concrete surface for aggregate analysis.
""")
with gr.Row():
with gr.Column():
input_img = gr.Image(label="Input Image", type="numpy")
process_btn = gr.Button("Analyze", variant="primary")
with gr.Column():
aggregates_img = gr.Image(label="Detected Aggregates")
mask_img = gr.Image(label="Segmentation Visualization")
debug_output = gr.Textbox(label="Processing Log")
    def process_with_logging(img):
        """Run the segmentation and capture log messages for display in the UI."""
        log_messages = []

        class LogHandler(logging.Handler):
            def emit(self, record):
                log_messages.append(f"{record.levelname}: {record.getMessage()}")

        # Attach a temporary handler so messages logged during processing are captured,
        # then remove it so handlers do not accumulate across requests
        handler = LogHandler()
        logger.addHandler(handler)
        try:
            results = process_image(img)
        finally:
            logger.removeHandler(handler)
        return (*results, "\n".join(log_messages))
process_btn.click(
fn=process_with_logging,
inputs=input_img,
outputs=[aggregates_img, mask_img, debug_output]
)
if __name__ == "__main__":
logger.info("Starting concrete aggregate analyzer...")
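    # Bind to all interfaces; 7860 is Gradio's default port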
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False
)