Spaces:
Sleeping
Sleeping
| # app.py | |
| from __future__ import print_function, division, absolute_import | |
| import streamlit as st | |
| import torch | |
| import torch.nn as nn | |
| from torchvision import transforms | |
| from PIL import Image, ImageDraw | |
| from ultralytics import YOLO | |
| from streamlit_drawable_canvas import st_canvas | |
| import os | |
| # --- Define Basic Components for InceptionResNetV2 --- | |
| class BasicConv2d(nn.Module): | |
| def __init__(self, in_planes, out_planes, kernel_size, stride, padding=0): | |
| super(BasicConv2d, self).__init__() | |
| self.conv = nn.Conv2d(in_planes, out_planes, | |
| kernel_size=kernel_size, stride=stride, | |
| padding=padding, bias=False) | |
| self.bn = nn.BatchNorm2d(out_planes) | |
| self.relu = nn.ReLU(inplace=False) | |
| def forward(self, x): | |
| x = self.conv(x) | |
| x = self.bn(x) | |
| x = self.relu(x) | |
| return x | |
| # --- Define InceptionResNetV2 Architecture --- | |
| class Mixed_5b(nn.Module): | |
| def __init__(self): | |
| super(Mixed_5b, self).__init__() | |
| self.branch0 = BasicConv2d(192, 96, kernel_size=1, stride=1) | |
| self.branch1 = nn.Sequential( | |
| BasicConv2d(192, 48, kernel_size=1, stride=1), | |
| BasicConv2d(48, 64, kernel_size=5, stride=1, padding=2) | |
| ) | |
| self.branch2 = nn.Sequential( | |
| BasicConv2d(192, 64, kernel_size=1, stride=1), | |
| BasicConv2d(64, 96, kernel_size=3, stride=1, padding=1), | |
| BasicConv2d(96, 96, kernel_size=3, stride=1, padding=1) | |
| ) | |
| self.branch3 = nn.Sequential( | |
| nn.AvgPool2d(3, stride=1, padding=1), | |
| BasicConv2d(192, 64, kernel_size=1, stride=1) | |
| ) | |
| def forward(self, x): | |
| x0 = self.branch0(x) | |
| x1 = self.branch1(x) | |
| x2 = self.branch2(x) | |
| x3 = self.branch3(x) | |
| out = torch.cat((x0, x1, x2, x3), 1) | |
| return out | |
| class Block35(nn.Module): | |
| def __init__(self, scale=1.0): | |
| super(Block35, self).__init__() | |
| self.scale = scale | |
| self.branch0 = BasicConv2d(320, 32, kernel_size=1, stride=1) | |
| self.branch1 = nn.Sequential( | |
| BasicConv2d(320, 32, kernel_size=1, stride=1), | |
| BasicConv2d(32, 32, kernel_size=3, stride=1, padding=1) | |
| ) | |
| self.branch2 = nn.Sequential( | |
| BasicConv2d(320, 32, kernel_size=1, stride=1), | |
| BasicConv2d(32, 48, kernel_size=3, stride=1, padding=1), | |
| BasicConv2d(48, 64, kernel_size=3, stride=1, padding=1) | |
| ) | |
| self.conv2d = nn.Conv2d(128, 320, kernel_size=1, stride=1) | |
| self.relu = nn.ReLU(inplace=False) | |
| def forward(self, x): | |
| x0 = self.branch0(x) | |
| x1 = self.branch1(x) | |
| x2 = self.branch2(x) | |
| out = torch.cat((x0, x1, x2), 1) | |
| out = self.conv2d(out) | |
| out = out * self.scale + x | |
| out = self.relu(out) | |
| return out | |
| class Mixed_6a(nn.Module): | |
| def __init__(self): | |
| super(Mixed_6a, self).__init__() | |
| self.branch0 = BasicConv2d(320, 384, kernel_size=3, stride=2) | |
| self.branch1 = nn.Sequential( | |
| BasicConv2d(320, 256, kernel_size=1, stride=1), | |
| BasicConv2d(256, 256, kernel_size=3, stride=1, padding=1), | |
| BasicConv2d(256, 384, kernel_size=3, stride=2) | |
| ) | |
| self.branch2 = nn.MaxPool2d(3, stride=2) | |
| def forward(self, x): | |
| x0 = self.branch0(x) | |
| x1 = self.branch1(x) | |
| x2 = self.branch2(x) | |
| out = torch.cat((x0, x1, x2), 1) | |
| return out | |
| class Block17(nn.Module): | |
| def __init__(self, scale=1.0): | |
| super(Block17, self).__init__() | |
| self.scale = scale | |
| self.branch0 = BasicConv2d(1088, 192, kernel_size=1, stride=1) | |
| self.branch1 = nn.Sequential( | |
| BasicConv2d(1088, 128, kernel_size=1, stride=1), | |
| BasicConv2d(128, 160, kernel_size=(1, 7), stride=1, padding=(0, 3)), | |
| BasicConv2d(160, 192, kernel_size=(7, 1), stride=1, padding=(3, 0)) | |
| ) | |
| self.conv2d = nn.Conv2d(384, 1088, kernel_size=1, stride=1) | |
| self.relu = nn.ReLU(inplace=False) | |
| def forward(self, x): | |
| x0 = self.branch0(x) | |
| x1 = self.branch1(x) | |
| out = torch.cat((x0, x1), 1) | |
| out = self.conv2d(out) | |
| out = out * self.scale + x | |
| out = self.relu(out) | |
| return out | |
| class Mixed_7a(nn.Module): | |
| def __init__(self): | |
| super(Mixed_7a, self).__init__() | |
| self.branch0 = nn.Sequential( | |
| BasicConv2d(1088, 256, kernel_size=1, stride=1), | |
| BasicConv2d(256, 384, kernel_size=3, stride=2) | |
| ) | |
| self.branch1 = nn.Sequential( | |
| BasicConv2d(1088, 256, kernel_size=1, stride=1), | |
| BasicConv2d(256, 288, kernel_size=3, stride=2) | |
| ) | |
| self.branch2 = nn.Sequential( | |
| BasicConv2d(1088, 256, kernel_size=1, stride=1), | |
| BasicConv2d(256, 288, kernel_size=3, stride=1, padding=1), | |
| BasicConv2d(288, 320, kernel_size=3, stride=2) | |
| ) | |
| self.branch3 = nn.MaxPool2d(3, stride=2) | |
| def forward(self, x): | |
| x0 = self.branch0(x) | |
| x1 = self.branch1(x) | |
| x2 = self.branch2(x) | |
| x3 = self.branch3(x) | |
| out = torch.cat((x0, x1, x2, x3), 1) | |
| return out | |
| class Block8(nn.Module): | |
| def __init__(self, scale=1.0, noReLU=False): | |
| super(Block8, self).__init__() | |
| self.scale = scale | |
| self.noReLU = noReLU | |
| self.branch0 = BasicConv2d(2080, 192, kernel_size=1, stride=1) | |
| self.branch1 = nn.Sequential( | |
| BasicConv2d(2080, 192, kernel_size=1, stride=1), | |
| BasicConv2d(192, 224, kernel_size=(1, 3), stride=1, padding=(0, 1)), | |
| BasicConv2d(224, 256, kernel_size=(3, 1), stride=1, padding=(1, 0)) | |
| ) | |
| self.conv2d = nn.Conv2d(448, 2080, kernel_size=1, stride=1) | |
| if not self.noReLU: | |
| self.relu = nn.ReLU(inplace=False) | |
| def forward(self, x): | |
| x0 = self.branch0(x) | |
| x1 = self.branch1(x) | |
| out = torch.cat((x0, x1), 1) | |
| out = self.conv2d(out) | |
| out = out * self.scale + x | |
| if not self.noReLU: | |
| out = self.relu(out) | |
| return out | |
| class InceptionResNetV2(nn.Module): | |
| def __init__(self, num_classes=1001): | |
| super(InceptionResNetV2, self).__init__() | |
| # Define all your layers here | |
| self.conv2d_1a = BasicConv2d(3, 32, kernel_size=3, stride=2) | |
| self.conv2d_2a = BasicConv2d(32, 32, kernel_size=3, stride=1) | |
| self.conv2d_2b = BasicConv2d(32, 64, kernel_size=3, stride=1, padding=1) | |
| self.maxpool_3a = nn.MaxPool2d(3, stride=2) | |
| self.conv2d_3b = BasicConv2d(64, 80, kernel_size=1, stride=1) | |
| self.conv2d_4a = BasicConv2d(80, 192, kernel_size=3, stride=1) | |
| self.maxpool_5a = nn.MaxPool2d(3, stride=2) | |
| self.mixed_5b = Mixed_5b() | |
| self.repeat = nn.Sequential( | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17), | |
| Block35(scale=0.17) | |
| ) | |
| self.mixed_6a = Mixed_6a() | |
| self.repeat_1 = nn.Sequential( | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10), | |
| Block17(scale=0.10) | |
| ) | |
| self.mixed_7a = Mixed_7a() | |
| self.repeat_2 = nn.Sequential( | |
| Block8(scale=0.20), | |
| Block8(scale=0.20), | |
| Block8(scale=0.20), | |
| Block8(scale=0.20), | |
| Block8(scale=0.20), | |
| Block8(scale=0.20), | |
| Block8(scale=0.20), | |
| Block8(scale=0.20), | |
| Block8(scale=0.20) | |
| ) | |
| self.block8 = Block8(noReLU=True) | |
| self.conv2d_7b = BasicConv2d(2080, 1536, kernel_size=1, stride=1) | |
| self.avgpool_1a = nn.AvgPool2d(8, stride=1, padding=0) | |
| self.last_linear = nn.Linear(1536, num_classes) | |
| def features(self, input): | |
| x = self.conv2d_1a(input) | |
| x = self.conv2d_2a(x) | |
| x = self.conv2d_2b(x) | |
| x = self.maxpool_3a(x) | |
| x = self.conv2d_3b(x) | |
| x = self.conv2d_4a(x) | |
| x = self.maxpool_5a(x) | |
| x = self.mixed_5b(x) | |
| x = self.repeat(x) | |
| x = self.mixed_6a(x) | |
| x = self.repeat_1(x) | |
| x = self.mixed_7a(x) | |
| x = self.repeat_2(x) | |
| x = self.block8(x) | |
| x = self.conv2d_7b(x) | |
| return x | |
| def logits(self, features): | |
| x = self.avgpool_1a(features) | |
| x = x.view(x.size(0), -1) | |
| x = self.last_linear(x) | |
| return x | |
| def forward(self, input): | |
| x = self.features(input) | |
| x = self.logits(x) | |
| return x | |
| def inceptionresnetv2(num_classes=1000): | |
| return InceptionResNetV2(num_classes=num_classes) | |
| # --- Load Models --- | |
| def load_inception_model(model_path): | |
| device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
| model = inceptionresnetv2(num_classes=2).to(device) # Adjust num_classes as needed | |
| model.load_state_dict(torch.load(model_path, map_location=device)) | |
| model.eval() | |
| return model, device | |
| def load_yolo_model(yolo_model_path="yolov8n.pt"): | |
| model = YOLO(yolo_model_path) # You can specify a custom YOLOv8 model path if needed | |
| return model | |
| # --- Image Preprocessing --- | |
| data_transforms = transforms.Compose([ | |
| transforms.Resize(342), | |
| transforms.CenterCrop(299), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) | |
| ]) | |
| # --- Streamlit App --- | |
| def main(): | |
| st.title("Image Anomaly Detection and Object Detection") | |
| st.write("Upload an image to analyze for anomalies.") | |
| # Load models | |
| inception_model, device = load_inception_model(r'anamoly30.pth') # Ensure 'anamoly30.pth' is in the same directory | |
| yolo_model = load_yolo_model(r'best.pt') # Ensure 'yolov8n.pt' is in the same directory or specify the path | |
| # Upload the image | |
| uploaded_file = st.file_uploader("Choose an image", type=["jpg", "jpeg", "png"]) | |
| # User input for confidence threshold | |
| threshold = st.slider("Set Confidence Threshold", 0.0, 1.0, 0.5, 0.01) | |
| if uploaded_file is not None: | |
| # Display the uploaded image | |
| image = Image.open(uploaded_file).convert("RGB") | |
| st.image(image, caption="Uploaded Image", width=400) | |
| # Preprocess the image | |
| transformed_image = data_transforms(image).unsqueeze(0).to(device) | |
| # InceptionResNetV2 Prediction | |
| with torch.no_grad(): | |
| outputs = inception_model(transformed_image) | |
| _, predicted = torch.max(outputs, 1) | |
| predicted_class = ['bad', 'good'][predicted.item()] | |
| confidence = torch.nn.functional.softmax(outputs, dim=1)[0][predicted.item()].item() | |
| st.write(f"**Prediction:** {predicted_class}") | |
| st.write(f"**Confidence:** {confidence:.4f}") | |
| # Check if confidence is above the threshold | |
| if confidence >= threshold: | |
| if predicted_class == "bad": | |
| st.warning("Anomalies detected in the image. Processing further analysis...") | |
| # Automatically run YOLOv8 on the uploaded image | |
| st.write("Analyzing anomalies using YOLOv8...") | |
| yolo_results = yolo_model.predict(source=image, conf=0.25, show=False) | |
| # Display YOLOv8 predictions | |
| st.write("### YOLOv8 Predictions:") | |
| for result in yolo_results: | |
| # Plot the results on the image | |
| annotated_yolo_image = result.plot() | |
| st.image(annotated_yolo_image, caption="YOLOv8 Detection", width=400) | |
| # Optionally, display detailed results | |
| st.write("### Detection Details:") | |
| for result in yolo_results: | |
| for box in result.boxes: | |
| cls = int(box.cls) | |
| conf = box.conf | |
| label = yolo_model.names[cls] if cls < len(yolo_model.names) else "Unknown" | |
| st.write(f"- **Label**: {label}, **Confidence**: {conf.item():.2f}") | |
| # Provide interactive feedback option | |
| st.info("You can annotate the image to refine analysis.") | |
| # Initialize canvas for manual annotation | |
| canvas_result = st_canvas( | |
| fill_color="rgba(255, 165, 0, 0.3)", # Semi-transparent orange | |
| stroke_width=2, | |
| stroke_color="#FF0000", # Red | |
| background_color="#FFFFFF", | |
| background_image=image, | |
| update_streamlit=True, | |
| height=image.height, | |
| width=image.width, | |
| drawing_mode="rect", # Allow drawing rectangles | |
| key="canvas", | |
| ) | |
| if canvas_result.json_data is not None: | |
| objects = canvas_result.json_data["objects"] | |
| if len(objects) > 0: | |
| st.success("Bounding boxes drawn. Click the button below to analyze with YOLOv8.") | |
| if st.button("Analyze Manual Annotations"): | |
| # Draw the bounding boxes on the image | |
| annotated_image = image.copy() | |
| draw = ImageDraw.Draw(annotated_image) | |
| for obj in objects: | |
| if obj["type"] == "rect": | |
| left = obj["left"] | |
| top = obj["top"] | |
| width = obj["width"] | |
| height = obj["height"] | |
| draw.rectangle([left, top, left + width, top + height], outline="red", width=3) | |
| st.image(annotated_image, caption="Annotated Image", width=400) | |
| # Pass the manually annotated image to YOLOv8 | |
| yolo_results_manual = yolo_model.predict(source=annotated_image, conf=0.25, show=False) | |
| # Display YOLOv8 predictions for annotated image | |
| st.write("### YOLOv8 Predictions (Manual Annotations):") | |
| for result in yolo_results_manual: | |
| # Plot the results on the image | |
| annotated_yolo_image_manual = result.plot() | |
| st.image(annotated_yolo_image_manual, caption="YOLOv8 Detection (Manual)", width=400) | |
| # Display detection details | |
| st.write("### Detection Details (Manual Annotations):") | |
| for result in yolo_results_manual: | |
| for box in result.boxes: | |
| cls = int(box.cls) | |
| conf = box.conf | |
| label = yolo_model.names[cls] if cls < len(yolo_model.names) else "Unknown" | |
| st.write(f"- **Label**: {label}, **Confidence**: {conf.item():.2f}") | |
| else: | |
| st.info("Draw bounding boxes around the anomalies and press the button to analyze.") | |
| else: | |
| st.warning(f"The confidence level ({confidence:.4f}) is below the threshold of {threshold}. No further analysis will be performed.") | |
| else: | |
| st.info("Please upload an image to get started.") | |
| if __name__ == "__main__": | |
| main() |