# app.py — Network Anomaly Detector (Gradio app)
import os
import numpy as np
import pandas as pd
import traceback
import gradio as gr
# Attempt to import libraries with more robust error handling
try:
import scapy.all as scapy
except ImportError:
scapy = None
try:
import pyshark
except ImportError:
pyshark = None
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class NetworkAnomalyDetector:
    """Detect anomalous packets in a capture file with an Isolation Forest.

    Parsing is attempted with Scapy first, then PyShark, and finally falls
    back to treating the whole file as a single opaque "packet", so the
    pipeline never hard-fails on an unreadable capture.
    """

    def __init__(self, huggingface_model=None):
        # Optional external model hook; not used by the current pipeline.
        self.huggingface_model = huggingface_model
        self.isolation_forest = IsolationForest(
            contamination=0.1,  # assume ~10% of packets might be anomalous
            random_state=42,    # deterministic results across runs
        )
        self.scaler = StandardScaler()

    def parse_pcap_file(self, file_path):
        """
        Parse a network packet file, trying several strategies in order.

        :param file_path: Path to the packet capture file
        :return: DataFrame with one row of features per packet
                 (empty DataFrame when every strategy fails)
        """
        parsing_errors = []

        # Strategy 1: Scapy (skipped when the import failed at module load).
        if scapy is not None:
            packet_features = self._parse_with_scapy(file_path, parsing_errors)
            if packet_features:
                return pd.DataFrame(packet_features)

        # Strategy 2: PyShark (runs inside its own asyncio event loop).
        if pyshark is not None:
            packet_features = self._parse_with_pyshark(file_path, parsing_errors)
            if packet_features:
                return pd.DataFrame(packet_features)

        # Fallback: describe the raw file as a single pseudo-packet.
        packet_features = self._read_raw_file(file_path, parsing_errors)
        if packet_features:
            if parsing_errors:
                print("Parsing Errors:")
                for error in parsing_errors:
                    print(error)
            return pd.DataFrame(packet_features)

        # Every strategy failed: report and return an empty frame.
        print("All parsing methods failed. Parsing Errors:")
        for error in parsing_errors:
            print(error)
        return pd.DataFrame()

    def _parse_with_scapy(self, file_path, parsing_errors):
        """Parse with Scapy; return a list of feature dicts ([] on failure)."""
        try:
            packets = scapy.rdpcap(file_path)
            features = []
            for packet in packets:
                features.append({
                    'length': len(packet),
                    'protocol': self._extract_protocol_scapy(packet),
                    'src_ip': self._extract_src_ip_scapy(packet),
                    'dst_ip': self._extract_dst_ip_scapy(packet),
                    # Scapy timestamps are EDecimal; cast so the DataFrame
                    # column stays numeric instead of object dtype (which
                    # would break StandardScaler downstream).
                    'timestamp': float(getattr(packet, 'time', 0)),
                })
            return features
        except Exception as e:
            parsing_errors.append(f"Scapy parsing error: {str(e)}")
            return []

    def _parse_with_pyshark(self, file_path, parsing_errors):
        """Parse with PyShark in a fresh asyncio loop; [] on failure."""
        import asyncio

        async def _collect():
            capture = pyshark.FileCapture(file_path)
            local_features = []
            for packet in capture:
                try:
                    local_features.append({
                        'length': int(packet.length),
                        'protocol': self._extract_protocol_pyshark(packet),
                        'src_ip': self._extract_src_ip_pyshark(packet),
                        'dst_ip': self._extract_dst_ip_pyshark(packet),
                        'timestamp': float(packet.sniff_time.timestamp()),
                    })
                except Exception as packet_error:
                    # A malformed packet shouldn't abort the whole capture.
                    parsing_errors.append(
                        f"PyShark packet parsing error: {str(packet_error)}")
            capture.close()
            return local_features

        try:
            return asyncio.run(_collect())
        except Exception as e:
            parsing_errors.append(f"PyShark parsing error: {str(e)}")
            return []

    def _read_raw_file(self, file_path, parsing_errors):
        """Last-resort reader: whole file as one 'Unknown' pseudo-packet."""
        try:
            with open(file_path, 'rb') as f:
                file_contents = f.read()
        except Exception as e:
            parsing_errors.append(f"Raw file reading error: {str(e)}")
            return []
        return [{
            'length': len(file_contents),
            'protocol': 'Unknown',
            'src_ip': 'Unknown',
            'dst_ip': 'Unknown',
            'timestamp': 0,
        }]

    def _extract_protocol_scapy(self, packet):
        """Extract protocol number from a Scapy packet ('Unknown' on failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return str(packet[scapy.IP].proto)
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_src_ip_scapy(self, packet):
        """Extract source IP from a Scapy packet ('Unknown' on failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_scapy(self, packet):
        """Extract destination IP from a Scapy packet ('Unknown' on failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_protocol_pyshark(self, packet):
        """Extract protocol from a PyShark packet ('Unknown' on failure)."""
        try:
            if hasattr(packet, 'transport_layer'):
                return str(packet.transport_layer)
            elif hasattr(packet, 'ip'):
                return str(packet.ip.proto)
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_src_ip_pyshark(self, packet):
        """Extract source IP from a PyShark packet ('Unknown' on failure)."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_pyshark(self, packet):
        """Extract destination IP from a PyShark packet ('Unknown' on failure)."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def analyze_network_file(self, file_path):
        """
        Run the full pipeline: parse, featurize, and flag anomalies.

        :param file_path: Path to the network packet capture file
        :return: dict with 'summary' (plain-int counts) and 'packets'
                 (DataFrame with an added boolean 'is_anomaly' column)
        """
        packets_df = self.parse_pcap_file(file_path)
        if packets_df.empty:
            return {
                'summary': {
                    'total_packets': 0,
                    'isolation_forest_anomalies': 0,
                },
                'packets': packets_df,
            }

        feature_columns = ['length', 'timestamp']
        # Encode categorical columns as integer codes for the model.
        packets_df['protocol_encoded'] = pd.Categorical(packets_df['protocol']).codes
        packets_df['src_ip_encoded'] = pd.Categorical(packets_df['src_ip']).codes
        packets_df['dst_ip_encoded'] = pd.Categorical(packets_df['dst_ip']).codes
        feature_columns.extend(['protocol_encoded', 'src_ip_encoded', 'dst_ip_encoded'])

        # Force numeric dtype: parser output may contain object columns.
        features = packets_df[feature_columns].astype(float)
        features_scaled = self.scaler.fit_transform(features)

        # IsolationForest labels outliers as -1.
        anomaly_labels = self.isolation_forest.fit_predict(features_scaled)
        packets_df['is_anomaly'] = anomaly_labels == -1

        summary = {
            'total_packets': int(len(packets_df)),
            # Cast the numpy integer to plain int for clean display/JSON.
            'isolation_forest_anomalies': int(packets_df['is_anomaly'].sum()),
        }
        return {
            'summary': summary,
            'packets': packets_df,
        }
def analyze_network_file(file_path):
    """
    Gradio-facing wrapper: analyze a capture file and format the results.

    :param file_path: Path to the uploaded capture file (may be None when
                      the user clicks Analyze without uploading a file)
    :return: tuple (summary_text, packets_dataframe, error_trace); the
             DataFrame is None on error, the trace is None on success.
    """
    try:
        # Guard against no upload (file_path is None) as well as a
        # missing/cleaned-up temp file.
        if not file_path or not os.path.exists(file_path):
            return "Error: File not found", None, "File does not exist."

        detector = NetworkAnomalyDetector()
        results = detector.analyze_network_file(file_path)

        # Build the summary explicitly so no function-body indentation or
        # stray blank lines leak into the displayed text.
        summary_text = (
            "Anomaly Detection Results:\n"
            f"- Total Packets: {results['summary']['total_packets']}\n"
            f"- Anomalous Packets: {results['summary']['isolation_forest_anomalies']}"
        )
        return summary_text, results['packets'], None
    except Exception as e:
        # Surface the full traceback to the (hidden) error textbox.
        error_trace = traceback.format_exc()
        return f"Error: {str(e)}", None, error_trace
def create_gradio_interface():
    """
    Create the Gradio Blocks UI for the Network Anomaly Detector.

    Wires the file-upload input and Analyze button to the module-level
    analyze_network_file wrapper and returns the (unlaunched) app.

    :return: a gr.Blocks instance ready for .launch()
    """
    with gr.Blocks(title="Network Anomaly Detector") as demo:
        gr.Markdown("# 🌐 Network Anomaly Detector")
        gr.Markdown("Upload a network packet capture file (PCAP) for anomaly analysis.")
        with gr.Row():
            # type="filepath" makes Gradio hand the handler a temp-file path.
            file_input = gr.File(label="Upload PCAP File", type="filepath", file_types=['.pcap', '.pkt'])
            analyze_button = gr.Button("Analyze Network File", variant="primary")
        # Outputs
        summary_output = gr.Textbox(label="Analysis Summary", lines=5)
        results_dataframe = gr.DataFrame(label="Packet Details")
        # Hidden textbox; receives the traceback (third return value of the
        # handler) but stays invisible — inspect it via debugging if needed.
        error_output = gr.Textbox(label="Error Trace", visible=False)
        # Event handlers: outputs are positional and must match the
        # 3-tuple returned by analyze_network_file.
        analyze_button.click(
            fn=analyze_network_file,
            inputs=[file_input],
            outputs=[summary_output, results_dataframe, error_output]
        )
        # Optional: Add some explanatory text about the tool
        gr.Markdown("""
### How it works:
1. Upload a PCAP (packet capture) file
2. Click "Analyze Network File"
3. View summary of total packets and detected anomalies
4. Explore detailed packet information
#### Anomaly Detection Techniques:
- Uses Isolation Forest algorithm
- Analyzes packet length, timestamp, protocol, and IP addresses
- Highlights statistically unusual network traffic
""")
    return demo
def main():
    """Entry point: verify hard dependencies, then launch the Gradio UI."""
    # Only libraries the app cannot run without are hard requirements.
    # scapy/pyshark are deliberately optional: the module-level try/except
    # imports already degrade gracefully and parse_pcap_file falls back to
    # raw-file reading, so requiring them here would contradict that design.
    try:
        import pandas  # noqa: F401
        import numpy  # noqa: F401
        import sklearn  # noqa: F401
        import gradio  # noqa: F401
    except ImportError as e:
        print(f"Missing required library: {e}")
        print("Please install: pip install pandas numpy scikit-learn gradio")
        return

    # Warn (but continue) when the optional packet parsers are unavailable.
    if scapy is None:
        print("Warning: scapy is not installed; Scapy parsing is disabled.")
    if pyshark is None:
        print("Warning: pyshark is not installed; PyShark parsing is disabled.")

    # Launch Gradio app
    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",  # make accessible on the local network
        share=True,             # optional: create a public shareable link
        debug=True,             # show detailed errors
    )


if __name__ == "__main__":
    main()