EAV123's picture
Update app.py
f94f145 verified
raw
history blame
19.1 kB
import streamlit as st
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pickle
import re
import time
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
# Load models and preprocessing components
@st.cache_resource
def load_components():
# Load deep learning models
cnn_model = load_model('cnn_model.h5')
lstm_model = load_model('lstm_model.h5')
# Load traditional ML models
with open('rf_model.pkl', 'rb') as f:
rf_model = pickle.load(f)
with open('svm_model.pkl', 'rb') as f:
svm_model = pickle.load(f)
# Load tokenizer and vectorizer
with open('sql_tokenizer.pkl', 'rb') as f:
tokenizer_data = pickle.load(f)
with open('tfidf_vectorizer.pkl', 'rb') as f:
tfidf_vectorizer = pickle.load(f)
return {
'cnn_model': cnn_model,
'lstm_model': lstm_model,
'rf_model': rf_model,
'svm_model': svm_model,
'tokenizer': tokenizer_data['tokenizer'],
'max_sequence_length': tokenizer_data['max_sequence_length'],
'tfidf_vectorizer': tfidf_vectorizer
}
# Try to load all components
try:
components = load_components()
model_loading_error = None
except Exception as e:
model_loading_error = str(e)
components = None
# Preprocess functions
def preprocess_query_for_deep_learning(query, tokenizer, max_sequence_length):
"""
Tokenizes and pads the input query to prepare it for deep learning models.
"""
sequences = tokenizer.texts_to_sequences([query])
padded = pad_sequences(sequences, maxlen=max_sequence_length, padding='post')
return padded
def preprocess_query_for_traditional_ml(query, tfidf_vectorizer):
"""
Transforms the input query using TF-IDF for traditional ML models.
"""
return tfidf_vectorizer.transform([query])
# Define improved regex patterns for SQL injection attempts
SQL_INJECTION_PATTERNS = [
# SQL comment syntax that follows a quote (likely injection)
r"(?i)'.*--",
# Quote followed by OR/AND with comparison (classic injection pattern)
r"(?i)'\s*(OR|AND)\s*['\d\w]+=\s*['\d\w]+",
# SQL Comment without preceding from a query context
r"(?i)(\s|^)--",
# Multiple query execution with semicolon
r"(?i)'.*;.*--",
# UNION-based injections
r"(?i)'\s*UNION\s+(ALL\s+)?SELECT",
# Time-delay attacks
r"(?i)'\s*;\s*WAITFOR\s+DELAY",
# DROP/ALTER table attacks
r"(?i)'\s*;\s*(DROP|ALTER)",
# Quote followed by a true condition
r"(?i)'\s*OR\s*'?\d+'?\s*=\s*'?\d+'?",
# Quote followed by always true condition like 1=1
r"(?i)'\s*OR\s*(['\"]\d+['\"])=(['\"]\d+['\"])",
# Batch queries
r"(?i);\s*(SELECT|INSERT|UPDATE|DELETE|DROP)",
# CAST attacks
r"(?i)CAST\s*\(.+AS\s+.+\)",
# Typical SQL function calls in injections
r"(?i)'\s*;\s*(EXEC|EXECUTE).*",
]
# Safe SQL patterns that should not trigger false positives
SAFE_SQL_PATTERNS = [
# Standard SELECT query
r"(?i)^SELECT\s+[\w\d\s,*]+\s+FROM\s+[\w\d]+(\s+WHERE\s+[\w\d\s=<>']+)?$",
# Standard INSERT query
r"(?i)^INSERT\s+INTO\s+[\w\d]+\s*\([^)]+\)\s*VALUES\s*\([^)]+\)$",
# Standard UPDATE query
r"(?i)^UPDATE\s+[\w\d]+\s+SET\s+[\w\d\s=',]+(\s+WHERE\s+[\w\d\s=<>']+)?$",
]
# Rule-based detection function
def detect_sql_injection_with_regex(query):
"""
Detects potential SQL injection patterns using improved regex.
Returns True if any malicious pattern matches and no safe pattern matches.
"""
# First check if the query matches any safe pattern
for pattern in SAFE_SQL_PATTERNS:
if re.search(pattern, query.strip()):
# Query matches a safe pattern
return False, None
# Then check for malicious patterns
for pattern in SQL_INJECTION_PATTERNS:
match = re.search(pattern, query)
if match:
return True, match.group(0)
# If no malicious pattern found
return False, None
# Ensemble prediction function
def predict_with_ensemble(query, components):
"""
Uses an ensemble of models to predict if the query is malicious.
Returns predictions from individual models and ensemble vote.
"""
# Get individual model predictions
# Random Forest prediction
query_tfidf = preprocess_query_for_traditional_ml(query, components['tfidf_vectorizer'])
rf_pred = int(components['rf_model'].predict(query_tfidf)[0])
# SVM prediction
svm_pred = int(components['svm_model'].predict(query_tfidf)[0])
# CNN prediction
query_padded = preprocess_query_for_deep_learning(query, components['tokenizer'], components['max_sequence_length'])
cnn_probability = components['cnn_model'].predict(query_padded)[0][0]
cnn_pred = int(cnn_probability > 0.5)
# LSTM prediction
lstm_probability = components['lstm_model'].predict(query_padded)[0][0]
lstm_pred = int(lstm_probability > 0.5)
# Majority voting
votes = [rf_pred, svm_pred, cnn_pred, lstm_pred]
ensemble_pred = np.bincount(votes).argmax()
return {
'rf': rf_pred,
'svm': svm_pred,
'cnn': {'prediction': cnn_pred, 'probability': float(cnn_probability)},
'lstm': {'prediction': lstm_pred, 'probability': float(lstm_probability)},
'ensemble': int(ensemble_pred),
'vote_count': {0: list(votes).count(0), 1: list(votes).count(1)}
}
# Initialize session state for UI flow control
if 'analysis_stage' not in st.session_state:
st.session_state.analysis_stage = 0 # 0: not started, 1: regex done, 2: ensemble done
if 'regex_result' not in st.session_state:
st.session_state.regex_result = None
if 'ensemble_result' not in st.session_state:
st.session_state.ensemble_result = None
# App title and description
st.title("🛡️ SQL Injection Detection")
st.markdown("""
This application uses a multi-layered approach to detect potentially malicious SQL queries:
1. **Rule-based detection** using improved regex patterns
2. **Ensemble learning** with majority voting from 4 models:
- Random Forest
- Support Vector Machine
- Convolutional Neural Network
- Long Short-Term Memory Network
Enter a query below or select from the examples to begin analysis.
""")
# Display warning if models couldn't be loaded
if model_loading_error:
st.warning(f"⚠️ Some models could not be loaded. The application will only use rule-based detection. Error: {model_loading_error}")
# Example queries in a dropdown
st.subheader("Select an Example or Enter Your Own Query")
example_categories = {
"Benign SQL Queries": [
"SELECT * FROM users WHERE username='admin'",
"SELECT id, name, price FROM products WHERE category_id=5",
"SELECT COUNT(*) FROM orders WHERE date > '2023-01-01'",
"INSERT INTO logs (user_id, action) VALUES (42, 'login')",
"UPDATE customers SET last_login='2023-06-15' WHERE id=101",
"DELETE FROM sessions WHERE last_activity < '2023-01-01'",
"SELECT email FROM subscribers WHERE active=1",
"INSERT INTO feedback (user_id, message) VALUES (87, 'Great service!')",
"UPDATE inventory SET stock = stock - 1 WHERE product_id = 300",
"SELECT name FROM employees WHERE department = 'Sales'",
"SELECT AVG(rating) FROM reviews WHERE product_id = 55",
"INSERT INTO audit_log (timestamp, event) VALUES (CURRENT_TIMESTAMP, 'update')",
"SELECT * FROM appointments WHERE doctor_id = 10 AND status = 'confirmed'",
"UPDATE settings SET value='dark' WHERE key='theme'",
"SELECT DISTINCT city FROM customers WHERE country='USA'",
"DELETE FROM cart_items WHERE user_id=12 AND product_id=78",
"SELECT MAX(salary) FROM employees WHERE role='manager'",
"INSERT INTO payments (user_id, amount, method) VALUES (33, 99.99, 'credit')",
"UPDATE products SET price = price * 1.1 WHERE category_id = 7",
"SELECT * FROM messages WHERE sender_id = 5 AND is_read = 0"
],
"Malicious SQL Queries": [
"' OR 1=1 --",
"admin'; DROP TABLE users; --",
"SELECT * FROM users WHERE username='' UNION SELECT username,password FROM admin_users --",
"'; WAITFOR DELAY '0:0:10' --",
"admin' OR '1'='1",
"' OR 'a'='a",
"' OR 1=1#",
"' OR 1=1/*",
"admin'--",
"'; EXEC xp_cmdshell('dir'); --",
"' OR EXISTS(SELECT * FROM users WHERE username = 'admin') --",
"1; DROP TABLE sessions --",
"'; SHUTDOWN --",
"' OR SLEEP(5) --",
"' AND 1=(SELECT COUNT(*) FROM users) --",
"admin' AND SUBSTRING(password, 1, 1) = 'a' --",
"' UNION ALL SELECT NULL,NULL,NULL --",
"0' OR 1=1 ORDER BY 1 --",
"1' AND (SELECT COUNT(*) FROM users) > 0 --",
"' OR (SELECT ASCII(SUBSTRING(password,1,1)) FROM users WHERE username='admin') > 64 --"
]
}
# First create category selection
category = st.selectbox(
"Choose query category:",
options=list(example_categories.keys()),
key="category"
)
# Then show examples from selected category
example = st.selectbox(
"Select an example:",
options=example_categories[category],
key="example"
)
# Allow user to use the selected example or enter their own
query_source = st.radio(
"Query source:",
["Use selected example", "Enter my own query"],
key="query_source"
)
if query_source == "Enter my own query":
query = st.text_area(
"Enter SQL Query:",
height=100,
placeholder="Type your SQL query here..."
)
else:
query = example
st.code(query, language="sql")
# Analysis process
if st.button("Start Analysis") and query:
# Reset analysis state
st.session_state.analysis_stage = 1
# Step 1: Rule-based detection
with st.spinner("Running rule-based detection..."):
time.sleep(0.5) # Simulate processing time
is_malicious, matched_pattern = detect_sql_injection_with_regex(query)
st.session_state.regex_result = (is_malicious, matched_pattern)
# If we have completed the regex analysis
if st.session_state.analysis_stage >= 1 and st.session_state.regex_result is not None:
is_malicious, matched_pattern = st.session_state.regex_result
st.subheader("Step 1: Rule-Based Detection")
if is_malicious:
st.error("🚨 SQL Injection Detected (Rule-Based)!")
st.warning(f"Matched pattern: `{matched_pattern}`")
# Show details in expander
with st.expander("Rule-Based Detection Details"):
st.markdown("""
**What was detected:**
- The query matched one or more known SQL injection patterns
- This type of pattern is commonly used in SQL injection attacks
- Review the query for security implications
""")
st.markdown("**Common SQL injection techniques detected:**")
st.markdown("""
- Comment sequences (`--`) after quotes
- Always true conditions (`OR 1=1`)
- Union-based injections
- SQL command injections
""")
else:
st.success("✅ No SQL injection patterns detected using rules")
with st.expander("Rule-Based Detection Details"):
st.markdown("""
**Analysis Details:**
- The query did not match any known SQL injection patterns
- The structure appears to be standard SQL syntax
- No suspicious patterns were identified
""")
# Ask if user wants to proceed with ensemble detection
proceed = st.radio(
"Would you like to proceed with ensemble model detection?",
["Yes", "No"],
index=0, # Default to Yes
key="proceed"
)
# Check if models are loaded before allowing ensemble analysis
if proceed == "Yes" and not model_loading_error:
if st.button("Run Ensemble Analysis"):
st.session_state.analysis_stage = 2
with st.spinner("Running ensemble models..."):
time.sleep(1) # Simulate processing time
ensemble_results = predict_with_ensemble(query, components)
st.session_state.ensemble_result = ensemble_results
elif proceed == "Yes" and model_loading_error:
st.error("Cannot run ensemble analysis because models failed to load.")
# If we have completed the ensemble analysis
if st.session_state.analysis_stage >= 2 and st.session_state.ensemble_result is not None:
results = st.session_state.ensemble_result
st.subheader("Step 2: Ensemble Model Detection")
# Create a visual representation of voting
vote_benign = results['vote_count'][0]
vote_malicious = results['vote_count'][1]
st.markdown(f"### Model Votes")
# Create columns for the voting visualization
col1, col2 = st.columns(2)
with col1:
st.metric("Safe Votes", vote_benign)
with col2:
st.metric("Malicious Votes", vote_malicious)
# Create a progress bar to visualize the voting ratio
vote_ratio = vote_malicious / (vote_benign + vote_malicious)
st.progress(vote_ratio, text=f"Malicious vote ratio: {vote_ratio*100:.0f}%")
# Display individual model results
st.markdown("### Individual Model Results")
model_cols = st.columns(4)
with model_cols[0]:
st.markdown("**Random Forest**")
if results['rf'] == 1:
st.error("⚠️ Malicious")
else:
st.success("✅ Safe")
with model_cols[1]:
st.markdown("**SVM**")
if results['svm'] == 1:
st.error("⚠️ Malicious")
else:
st.success("✅ Safe")
with model_cols[2]:
st.markdown("**CNN**")
cnn_prob = results['cnn']['probability'] * 100
if results['cnn']['prediction'] == 1:
st.error(f"⚠️ Malicious ({cnn_prob:.1f}%)")
else:
st.success(f"✅ Safe ({100-cnn_prob:.1f}%)")
with model_cols[3]:
st.markdown("**LSTM**")
lstm_prob = results['lstm']['probability'] * 100
if results['lstm']['prediction'] == 1:
st.error(f"⚠️ Malicious ({lstm_prob:.1f}%)")
else:
st.success(f"✅ Safe ({100-lstm_prob:.1f}%)")
# Final ensemble verdict
st.markdown("### Ensemble Verdict")
if results['ensemble'] == 1:
st.error("🚨 SQL Injection Detected by Majority Vote!")
else:
st.success("✅ Query deemed safe by majority vote")
# Explanation in expander
with st.expander("Ensemble Detection Details"):
st.markdown("""
**How ensemble voting works:**
- Each model casts a vote (0 for safe, 1 for malicious)
- The final decision is based on majority vote
- This approach combines the strengths of different model architectures
- More robust than any single model alone
""")
if results['ensemble'] == 1:
st.markdown(f"""
**Why was this flagged:**
- {vote_malicious} out of 4 models identified this query as potentially malicious
- The majority vote indicates suspicious patterns
- This query should be carefully reviewed before execution
""")
else:
st.markdown(f"""
**Why was this considered safe:**
- {vote_benign} out of 4 models identified this query as likely safe
- The majority vote indicates standard SQL patterns
- No significant red flags were detected in the ensemble
""")
# Final verdict combining both approaches
st.subheader("Final Analysis")
is_malicious_regex, _ = st.session_state.regex_result
is_malicious_ensemble = results['ensemble'] == 1
if is_malicious_regex or is_malicious_ensemble:
st.error("⚠️ This query appears to contain SQL injection patterns. Review carefully before executing.")
else:
st.success("✅ This query appears safe based on both rule-based and ensemble detection.")
st.info("ℹ️ Remember: Always use parameterized queries and proper input validation in production systems.")
# Reset button
if st.button("Analyze Another Query"):
st.session_state.analysis_stage = 0
st.session_state.regex_result = None
st.session_state.ensemble_result = None
st.experimental_rerun()
# Sidebar with additional info
with st.sidebar:
st.header("About This App")
st.markdown("""
### Multi-Layer Detection Process
1. **Rule-Based Detection**
- Fast, pattern-matching approach
- Uses improved regex to identify SQL injection patterns
- Reduces false positives with safe pattern recognition
2. **Ensemble Detection**
- Combines 4 different machine learning models:
- Random Forest
- Support Vector Machine (SVM)
- Convolutional Neural Network (CNN)
- Long Short-Term Memory Network (LSTM)
- Final decision by majority voting
""")
st.markdown("### Machine Learning Architecture")
st.code("""
# Traditional ML
- Random Forest (n_estimators=100)
- SVM (kernel='linear')
# CNN Architecture
Sequential([
Embedding(input_dim=10000, output_dim=128),
Conv1D(filters=64, kernel_size=3, activation='relu'),
MaxPooling1D(pool_size=2),
Dropout(0.5),
Conv1D(filters=128, kernel_size=3, activation='relu'),
MaxPooling1D(pool_size=2),
Flatten(),
Dense(64, activation='relu'),
Dropout(0.5),
Dense(1, activation='sigmoid')
])
# LSTM Architecture
Sequential([
Embedding(input_dim=10000, output_dim=128),
Bidirectional(LSTM(64, return_sequences=True)),
Dropout(0.5),
Bidirectional(LSTM(32)),
Dropout(0.5),
Dense(32, activation='relu'),
Dense(1, activation='sigmoid')
])
""")
st.markdown("### How It Works")
st.markdown("""
1. **Step 1:** Rule-based patterns scan for known SQL injection techniques
2. **Step 2:** Ensemble of 4 models evaluates the query structure
3. **Final Analysis:** Combined verdict from both approaches
""")
st.markdown("---")
st.warning("**Note:** This is a demonstration tool, not a replacement for proper security measures.")
# Footer
st.markdown("---")
st.markdown("""
<style>
.footer {
position: fixed;
left: 0;
bottom: 0;
width: 100%;
background-color: white;
color: black;
text-align: center;
padding: 10px;
border-top: 1px solid #e5e5e5;
}
</style>
<div class="footer">
<p>Developed with ❤️ using Streamlit | SQL Injection Detection System</p>
</div>
""", unsafe_allow_html=True)