# Balajim57's picture
# Update app.py
# fca893e verified
# app.py
from __future__ import print_function, division, absolute_import
import streamlit as st
import torch
import torch.nn as nn
from torchvision import transforms
from PIL import Image, ImageDraw
from ultralytics import YOLO
from streamlit_drawable_canvas import st_canvas
import os
# --- Define Basic Components for InceptionResNetV2 ---
class BasicConv2d(nn.Module):
    """Conv2d -> BatchNorm2d -> ReLU building block.

    The convolution is created with ``bias=False``; the batch-norm layer
    that follows it supplies the learnable shift.

    Args:
        in_planes: Number of input channels.
        out_planes: Number of output channels.
        kernel_size: Convolution kernel size (int or ``(h, w)`` tuple).
        stride: Convolution stride.
        padding: Zero padding applied by the convolution (default ``0``).
    """

    def __init__(self, in_planes, out_planes, kernel_size, stride, padding=0):
        super().__init__()
        self.conv = nn.Conv2d(
            in_planes,
            out_planes,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(out_planes)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply convolution, batch normalisation and ReLU, in that order."""
        x = self.conv(x)
        x = self.bn(x)
        return self.relu(x)
# --- Define InceptionResNetV2 Architecture ---
class Mixed_5b(nn.Module):
    """Four-branch inception block taking 192 channels to 320.

    Every branch uses stride 1 with padding that keeps the spatial size, so
    the branch outputs can be concatenated along the channel axis:
    96 + 64 + 96 + 64 = 320 channels.
    """

    def __init__(self):
        super().__init__()
        # 1x1 convolution.
        self.branch0 = BasicConv2d(192, 96, kernel_size=1, stride=1)
        # 1x1 reduction followed by a 5x5 convolution.
        self.branch1 = nn.Sequential(
            BasicConv2d(192, 48, kernel_size=1, stride=1),
            BasicConv2d(48, 64, kernel_size=5, stride=1, padding=2),
        )
        # 1x1 reduction followed by two 3x3 convolutions.
        self.branch2 = nn.Sequential(
            BasicConv2d(192, 64, kernel_size=1, stride=1),
            BasicConv2d(64, 96, kernel_size=3, stride=1, padding=1),
            BasicConv2d(96, 96, kernel_size=3, stride=1, padding=1),
        )
        # 3x3 average pooling followed by a 1x1 projection.
        self.branch3 = nn.Sequential(
            nn.AvgPool2d(3, stride=1, padding=1),
            BasicConv2d(192, 64, kernel_size=1, stride=1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run all four branches on ``x`` and concatenate them on dim 1."""
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        x3 = self.branch3(x)
        return torch.cat((x0, x1, x2, x3), 1)
class Block35(nn.Module):
    """Residual inception block operating on 320-channel feature maps.

    Three branches (32 + 32 + 64 = 128 channels) are concatenated, projected
    back to 320 channels by a 1x1 convolution, multiplied by ``scale`` and
    added to the input before the final ReLU.

    Args:
        scale: Factor applied to the residual branch before it is added to
            the input.
    """

    def __init__(self, scale=1.0):
        super().__init__()
        self.scale = scale
        self.branch0 = BasicConv2d(320, 32, kernel_size=1, stride=1)
        self.branch1 = nn.Sequential(
            BasicConv2d(320, 32, kernel_size=1, stride=1),
            BasicConv2d(32, 32, kernel_size=3, stride=1, padding=1),
        )
        self.branch2 = nn.Sequential(
            BasicConv2d(320, 32, kernel_size=1, stride=1),
            BasicConv2d(32, 48, kernel_size=3, stride=1, padding=1),
            BasicConv2d(48, 64, kernel_size=3, stride=1, padding=1),
        )
        # Plain convolution (with bias, no batch norm) restoring 320 channels
        # so the result can be added to the block input.
        self.conv2d = nn.Conv2d(128, 320, kernel_size=1, stride=1)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``relu(scale * residual(x) + x)``."""
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        out = torch.cat((x0, x1, x2), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        return self.relu(out)
class Mixed_6a(nn.Module):
    """Stride-2 reduction block taking 320 channels to 1088.

    All three branches end in an unpadded 3x3 stride-2 operation, so they
    downsample identically and can be concatenated:
    384 + 384 + 320 (max-pooled input) = 1088 channels.
    """

    def __init__(self):
        super().__init__()
        self.branch0 = BasicConv2d(320, 384, kernel_size=3, stride=2)
        self.branch1 = nn.Sequential(
            BasicConv2d(320, 256, kernel_size=1, stride=1),
            BasicConv2d(256, 256, kernel_size=3, stride=1, padding=1),
            BasicConv2d(256, 384, kernel_size=3, stride=2),
        )
        # Pooling branch passes the 320 input channels through unchanged.
        self.branch2 = nn.MaxPool2d(3, stride=2)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run the three branches on ``x`` and concatenate them on dim 1."""
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        return torch.cat((x0, x1, x2), 1)
class Block17(nn.Module):
    """Residual inception block operating on 1088-channel feature maps.

    Two branches (192 + 192 = 384 channels) are concatenated, projected back
    to 1088 channels by a 1x1 convolution, multiplied by ``scale`` and added
    to the input before the final ReLU.

    Args:
        scale: Factor applied to the residual branch before it is added to
            the input.
    """

    def __init__(self, scale=1.0):
        super().__init__()
        self.scale = scale
        self.branch0 = BasicConv2d(1088, 192, kernel_size=1, stride=1)
        # 1x1 reduction, then a 7x7 receptive field factorised as 1x7 + 7x1.
        self.branch1 = nn.Sequential(
            BasicConv2d(1088, 128, kernel_size=1, stride=1),
            BasicConv2d(128, 160, kernel_size=(1, 7), stride=1, padding=(0, 3)),
            BasicConv2d(160, 192, kernel_size=(7, 1), stride=1, padding=(3, 0)),
        )
        # Plain convolution (with bias, no batch norm) restoring 1088 channels
        # so the result can be added to the block input.
        self.conv2d = nn.Conv2d(384, 1088, kernel_size=1, stride=1)
        self.relu = nn.ReLU(inplace=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``relu(scale * residual(x) + x)``."""
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        out = torch.cat((x0, x1), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        return self.relu(out)
class Mixed_7a(nn.Module):
    """Stride-2 reduction block taking 1088 channels to 2080.

    All four branches end in an unpadded 3x3 stride-2 operation, so they
    downsample identically and can be concatenated:
    384 + 288 + 320 + 1088 (max-pooled input) = 2080 channels.
    """

    def __init__(self):
        super().__init__()
        self.branch0 = nn.Sequential(
            BasicConv2d(1088, 256, kernel_size=1, stride=1),
            BasicConv2d(256, 384, kernel_size=3, stride=2),
        )
        self.branch1 = nn.Sequential(
            BasicConv2d(1088, 256, kernel_size=1, stride=1),
            BasicConv2d(256, 288, kernel_size=3, stride=2),
        )
        self.branch2 = nn.Sequential(
            BasicConv2d(1088, 256, kernel_size=1, stride=1),
            BasicConv2d(256, 288, kernel_size=3, stride=1, padding=1),
            BasicConv2d(288, 320, kernel_size=3, stride=2),
        )
        # Pooling branch passes the 1088 input channels through unchanged.
        self.branch3 = nn.MaxPool2d(3, stride=2)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run all four branches on ``x`` and concatenate them on dim 1."""
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        x2 = self.branch2(x)
        x3 = self.branch3(x)
        return torch.cat((x0, x1, x2, x3), 1)
class Block8(nn.Module):
    """Residual inception block operating on 2080-channel feature maps.

    Two branches (192 + 256 = 448 channels) are concatenated, projected back
    to 2080 channels by a 1x1 convolution, multiplied by ``scale`` and added
    to the input. The trailing ReLU is optional.

    Args:
        scale: Factor applied to the residual branch before it is added to
            the input.
        noReLU: When true, no ``relu`` submodule is created and the sum is
            returned without activation.
    """

    def __init__(self, scale=1.0, noReLU=False):
        super().__init__()
        self.scale = scale
        self.noReLU = noReLU
        self.branch0 = BasicConv2d(2080, 192, kernel_size=1, stride=1)
        # 1x1 reduction, then a 3x3 receptive field factorised as 1x3 + 3x1.
        self.branch1 = nn.Sequential(
            BasicConv2d(2080, 192, kernel_size=1, stride=1),
            BasicConv2d(192, 224, kernel_size=(1, 3), stride=1, padding=(0, 1)),
            BasicConv2d(224, 256, kernel_size=(3, 1), stride=1, padding=(1, 0)),
        )
        # Plain convolution (with bias, no batch norm) restoring 2080 channels
        # so the result can be added to the block input.
        self.conv2d = nn.Conv2d(448, 2080, kernel_size=1, stride=1)
        if not self.noReLU:
            self.relu = nn.ReLU(inplace=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``scale * residual(x) + x``, passed through ReLU unless ``noReLU``."""
        x0 = self.branch0(x)
        x1 = self.branch1(x)
        out = torch.cat((x0, x1), 1)
        out = self.conv2d(out)
        out = out * self.scale + x
        if not self.noReLU:
            out = self.relu(out)
        return out
class InceptionResNetV2(nn.Module):
    """Inception-ResNet-v2 image classifier.

    The network is a convolutional stem, three stages of repeated residual
    inception blocks (10 x ``Block35``, 20 x ``Block17``, 9 x ``Block8`` plus
    one ``Block8`` without ReLU) separated by reduction blocks, and a pooled
    linear classification head.

    The head pools with a fixed 8x8 window and feeds the flattened result to
    ``Linear(1536, num_classes)``, so it only works when the final feature
    map is exactly 8x8. With the unpadded stride-2 layers above, that is the
    case for 299x299 inputs.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes=1001):
        super().__init__()
        # Stem: 3 -> 192 channels.
        self.conv2d_1a = BasicConv2d(3, 32, kernel_size=3, stride=2)
        self.conv2d_2a = BasicConv2d(32, 32, kernel_size=3, stride=1)
        self.conv2d_2b = BasicConv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.maxpool_3a = nn.MaxPool2d(3, stride=2)
        self.conv2d_3b = BasicConv2d(64, 80, kernel_size=1, stride=1)
        self.conv2d_4a = BasicConv2d(80, 192, kernel_size=3, stride=1)
        self.maxpool_5a = nn.MaxPool2d(3, stride=2)

        # Stage A: 320 channels. nn.Sequential numbers its positional
        # children 0..n-1, so building them from a generator yields the same
        # parameter names as listing each block explicitly.
        self.mixed_5b = Mixed_5b()
        self.repeat = nn.Sequential(*(Block35(scale=0.17) for _ in range(10)))

        # Stage B: 1088 channels.
        self.mixed_6a = Mixed_6a()
        self.repeat_1 = nn.Sequential(*(Block17(scale=0.10) for _ in range(20)))

        # Stage C: 2080 channels; the last block omits the ReLU.
        self.mixed_7a = Mixed_7a()
        self.repeat_2 = nn.Sequential(*(Block8(scale=0.20) for _ in range(9)))
        self.block8 = Block8(noReLU=True)

        # Head: 2080 -> 1536 channels, 8x8 average pool, linear classifier.
        self.conv2d_7b = BasicConv2d(2080, 1536, kernel_size=1, stride=1)
        self.avgpool_1a = nn.AvgPool2d(8, stride=1, padding=0)
        self.last_linear = nn.Linear(1536, num_classes)

    def features(self, input: torch.Tensor) -> torch.Tensor:
        """Run the convolutional trunk and return the 1536-channel feature map."""
        x = self.conv2d_1a(input)
        x = self.conv2d_2a(x)
        x = self.conv2d_2b(x)
        x = self.maxpool_3a(x)
        x = self.conv2d_3b(x)
        x = self.conv2d_4a(x)
        x = self.maxpool_5a(x)
        x = self.mixed_5b(x)
        x = self.repeat(x)
        x = self.mixed_6a(x)
        x = self.repeat_1(x)
        x = self.mixed_7a(x)
        x = self.repeat_2(x)
        x = self.block8(x)
        x = self.conv2d_7b(x)
        return x

    def logits(self, features: torch.Tensor) -> torch.Tensor:
        """Pool ``features``, flatten per sample and apply the linear classifier."""
        x = self.avgpool_1a(features)
        # Flatten everything but the batch dimension for the linear layer.
        x = x.view(x.size(0), -1)
        x = self.last_linear(x)
        return x

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Return class logits for a batch of images."""
        x = self.features(input)
        x = self.logits(x)
        return x
def inceptionresnetv2(num_classes=1000):
    """Build an untrained ``InceptionResNetV2`` with ``num_classes`` outputs.

    Note that the default here (1000) differs from the default of the
    ``InceptionResNetV2`` constructor (1001); pass ``num_classes`` explicitly
    when the head size has to match a checkpoint.
    """
    return InceptionResNetV2(num_classes=num_classes)
# --- Load Models ---
@st.cache_resource
def load_inception_model(model_path):
    """Load the two-class InceptionResNetV2 classifier from ``model_path``.

    The result is cached by Streamlit via ``st.cache_resource``.

    Args:
        model_path: Path to a file holding the model's ``state_dict``.

    Returns:
        A ``(model, device)`` tuple: the model in eval mode, and the device
        it lives on (CUDA when available, otherwise CPU).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # The head size is fixed at two classes; adjust if the checkpoint differs.
    model = inceptionresnetv2(num_classes=2).to(device)
    # NOTE(review): torch.load unpickles the file, so only point this at
    # checkpoints from a trusted source.
    state_dict = torch.load(model_path, map_location=device)
    model.load_state_dict(state_dict)
    model.eval()
    return model, device
@st.cache_resource
def load_yolo_model(yolo_model_path="yolov8n.pt"):
    """Create a ``YOLO`` model from ``yolo_model_path``.

    The result is cached by Streamlit via ``st.cache_resource``.

    Args:
        yolo_model_path: Weights file handed to ``YOLO``; defaults to
            ``"yolov8n.pt"``.

    Returns:
        The constructed ``YOLO`` model object.
    """
    return YOLO(yolo_model_path)
# --- Image Preprocessing ---
# Preprocessing for the classifier: resize, centre-crop to 299, convert to a
# tensor, then normalise each channel with the mean/std values below.
data_transforms = transforms.Compose(
    [
        transforms.Resize(342),
        transforms.CenterCrop(299),
        transforms.ToTensor(),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)
# --- Streamlit App ---
# Class labels in the order of the classifier's output indices.
_CLASS_NAMES = ['bad', 'good']


def _classify_image(inception_model, device, image):
    """Return ``(label, confidence)`` for a PIL image.

    ``label`` is the entry of ``_CLASS_NAMES`` with the highest logit and
    ``confidence`` is that class's softmax probability as a float.
    """
    batch = data_transforms(image).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = inception_model(batch)
        _, predicted = torch.max(outputs, 1)
        index = predicted.item()
        confidence = torch.nn.functional.softmax(outputs, dim=1)[0][index].item()
    return _CLASS_NAMES[index], confidence


def _show_yolo_detections(yolo_model, results, header, image_caption, details_header):
    """Render annotated YOLO images, then one line per detected box."""
    st.write(header)
    for result in results:
        # Results.plot() returns a BGR array; without channels="BGR" Streamlit
        # would treat it as RGB and show red and blue swapped.
        st.image(result.plot(), caption=image_caption, channels="BGR", width=400)

    st.write(details_header)
    for result in results:
        for box in result.boxes:
            cls = int(box.cls)
            label = yolo_model.names[cls] if cls < len(yolo_model.names) else "Unknown"
            st.write(f"- **Label**: {label}, **Confidence**: {box.conf.item():.2f}")


def _analyze_anomalies(yolo_model, image):
    """Run YOLO on ``image`` and offer a canvas for manual re-analysis.

    NOTE(review): the source lost its indentation; the "Draw bounding boxes"
    hint is assumed to belong to the case where the canvas has no objects —
    confirm against the original file.
    """
    st.warning("Anomalies detected in the image. Processing further analysis...")

    # Automatic pass over the uploaded image.
    st.write("Analyzing anomalies using YOLOv8...")
    yolo_results = yolo_model.predict(source=image, conf=0.25, show=False)
    _show_yolo_detections(
        yolo_model,
        yolo_results,
        header="### YOLOv8 Predictions:",
        image_caption="YOLOv8 Detection",
        details_header="### Detection Details:",
    )

    # Manual pass: the user draws rectangles on a canvas the size of the image.
    st.info("You can annotate the image to refine analysis.")
    canvas_result = st_canvas(
        fill_color="rgba(255, 165, 0, 0.3)",  # Semi-transparent orange
        stroke_width=2,
        stroke_color="#FF0000",  # Red
        background_color="#FFFFFF",
        background_image=image,
        update_streamlit=True,
        height=image.height,
        width=image.width,
        drawing_mode="rect",  # Allow drawing rectangles
        key="canvas",
    )
    if canvas_result.json_data is None:
        return

    objects = canvas_result.json_data["objects"]
    if not objects:
        st.info("Draw bounding boxes around the anomalies and press the button to analyze.")
        return

    st.success("Bounding boxes drawn. Click the button below to analyze with YOLOv8.")
    if not st.button("Analyze Manual Annotations"):
        return

    # Burn the drawn rectangles into a copy of the image.
    annotated_image = image.copy()
    draw = ImageDraw.Draw(annotated_image)
    for obj in objects:
        if obj["type"] == "rect":
            left = obj["left"]
            top = obj["top"]
            width = obj["width"]
            height = obj["height"]
            draw.rectangle([left, top, left + width, top + height], outline="red", width=3)
    st.image(annotated_image, caption="Annotated Image", width=400)

    # Re-run detection on the image that now contains the rectangles.
    yolo_results_manual = yolo_model.predict(source=annotated_image, conf=0.25, show=False)
    _show_yolo_detections(
        yolo_model,
        yolo_results_manual,
        header="### YOLOv8 Predictions (Manual Annotations):",
        image_caption="YOLOv8 Detection (Manual)",
        details_header="### Detection Details (Manual Annotations):",
    )


def main():
    """Streamlit entry point: classify an uploaded image, then inspect anomalies.

    The uploaded image is classified as 'bad' or 'good'. When the prediction
    is 'bad' and its confidence reaches the slider threshold, YOLO detection
    and the manual annotation canvas are shown. When the confidence is below
    the threshold a warning is shown instead.
    """
    st.title("Image Anomaly Detection and Object Detection")
    st.write("Upload an image to analyze for anomalies.")

    # Both weight files are resolved relative to the working directory.
    inception_model, device = load_inception_model(r'anamoly30.pth')  # Ensure 'anamoly30.pth' is present
    yolo_model = load_yolo_model(r'best.pt')  # Ensure 'best.pt' is present or specify the path

    uploaded_file = st.file_uploader("Choose an image", type=["jpg", "jpeg", "png"])
    threshold = st.slider("Set Confidence Threshold", 0.0, 1.0, 0.5, 0.01)

    if uploaded_file is None:
        st.info("Please upload an image to get started.")
        return

    image = Image.open(uploaded_file).convert("RGB")
    st.image(image, caption="Uploaded Image", width=400)

    predicted_class, confidence = _classify_image(inception_model, device, image)
    st.write(f"**Prediction:** {predicted_class}")
    st.write(f"**Confidence:** {confidence:.4f}")

    if confidence >= threshold:
        if predicted_class == "bad":
            _analyze_anomalies(yolo_model, image)
    else:
        st.warning(f"The confidence level ({confidence:.4f}) is below the threshold of {threshold}. No further analysis will be performed.")
# Run the app only when executed as a script, not when imported.
if __name__ == "__main__":
    main()