# Network Anomaly Detector — Gradio app for PCAP anomaly analysis.
# (Removed Hugging Face Spaces page residue that was captured with the source.)
import os | |
import numpy as np | |
import pandas as pd | |
import traceback | |
import gradio as gr | |
# Attempt to import libraries with more robust error handling | |
try: | |
import scapy.all as scapy | |
except ImportError: | |
scapy = None | |
try: | |
import pyshark | |
except ImportError: | |
pyshark = None | |
from sklearn.ensemble import IsolationForest | |
from sklearn.preprocessing import StandardScaler | |
class NetworkAnomalyDetector:
    """Detect statistically unusual packets in a capture file.

    Parsing is attempted with Scapy first, then PyShark, and finally a raw
    byte-level fallback so callers always receive a DataFrame. Anomaly
    scoring uses an Isolation Forest over packet length, timestamp, and
    integer-encoded protocol / endpoint IP columns.
    """

    def __init__(self, huggingface_model=None):
        # Optional external model hook; stored for future use, never invoked here.
        self.huggingface_model = huggingface_model
        self.isolation_forest = IsolationForest(
            contamination=0.1,  # assume ~10% of packets might be anomalous
            random_state=42     # deterministic labels across runs
        )
        self.scaler = StandardScaler()

    def parse_pcap_file(self, file_path):
        """
        Parse network packet file with multiple parsing strategies.

        :param file_path: Path to the packet capture file
        :return: DataFrame with packet features (empty if every strategy fails)
        """
        packet_features = []
        parsing_errors = []

        # Last-resort fallback: represent the entire file as one pseudo-packet
        # so downstream code still has something to display.
        def read_raw_file(path):
            try:
                with open(path, 'rb') as f:
                    file_contents = f.read()
                return [{
                    'length': len(file_contents),
                    'protocol': 'Unknown',
                    'src_ip': 'Unknown',
                    'dst_ip': 'Unknown',
                    'timestamp': 0
                }]
            except Exception as e:
                parsing_errors.append(f"Raw file reading error: {str(e)}")
                return []

        # Method 1: Scapy parsing
        if scapy is not None:
            try:
                packets = scapy.rdpcap(file_path)
                for packet in packets:
                    features = {
                        'length': len(packet),
                        'protocol': self._extract_protocol_scapy(packet),
                        'src_ip': self._extract_src_ip_scapy(packet),
                        'dst_ip': self._extract_dst_ip_scapy(packet),
                        # scapy's Packet.time is an EDecimal; coerce to float so
                        # the timestamp column has a uniform numeric dtype for
                        # the scaler instead of a mixed object column.
                        'timestamp': float(getattr(packet, 'time', 0))
                    }
                    packet_features.append(features)
                if packet_features:
                    return pd.DataFrame(packet_features)
            except Exception as e:
                parsing_errors.append(f"Scapy parsing error: {str(e)}")

        # Method 2: PyShark parsing (pyshark requires an asyncio event loop)
        if pyshark is not None:
            try:
                import asyncio

                async def parse_with_pyshark():
                    capture = pyshark.FileCapture(file_path)
                    local_features = []
                    for packet in capture:
                        try:
                            features = {
                                'length': int(packet.length),
                                'protocol': self._extract_protocol_pyshark(packet),
                                'src_ip': self._extract_src_ip_pyshark(packet),
                                'dst_ip': self._extract_dst_ip_pyshark(packet),
                                'timestamp': float(packet.sniff_time.timestamp())
                            }
                            local_features.append(features)
                        except Exception as packet_error:
                            # Skip the malformed packet but keep the rest.
                            parsing_errors.append(f"PyShark packet parsing error: {str(packet_error)}")
                    capture.close()
                    return local_features

                try:
                    packet_features = asyncio.run(parse_with_pyshark())
                    if packet_features:
                        return pd.DataFrame(packet_features)
                except Exception as async_error:
                    parsing_errors.append(f"PyShark async error: {str(async_error)}")
            except Exception as e:
                parsing_errors.append(f"PyShark parsing error: {str(e)}")

        # Fallback: raw file reading
        packet_features = read_raw_file(file_path)
        if packet_features:
            # Log parsing errors if any occurred before the fallback succeeded.
            if parsing_errors:
                print("Parsing Errors:")
                for error in parsing_errors:
                    print(error)
            return pd.DataFrame(packet_features)

        # All parsing methods failed; report everything collected.
        print("All parsing methods failed. Parsing Errors:")
        for error in parsing_errors:
            print(error)
        return pd.DataFrame()

    def _extract_protocol_scapy(self, packet):
        """Extract the IP protocol number from a Scapy packet ('Unknown' on failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return str(packet[scapy.IP].proto)
            return 'Unknown'
        except Exception:  # was a bare except: don't swallow SystemExit/KeyboardInterrupt
            return 'Unknown'

    def _extract_src_ip_scapy(self, packet):
        """Extract the source IP from a Scapy packet ('Unknown' on failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_scapy(self, packet):
        """Extract the destination IP from a Scapy packet ('Unknown' on failure)."""
        try:
            if packet.haslayer(scapy.IP):
                return packet[scapy.IP].dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_protocol_pyshark(self, packet):
        """Extract the protocol from a PyShark packet ('Unknown' on failure)."""
        try:
            if hasattr(packet, 'transport_layer'):
                return str(packet.transport_layer)
            elif hasattr(packet, 'ip'):
                return str(packet.ip.proto)
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_src_ip_pyshark(self, packet):
        """Extract the source IP from a PyShark packet ('Unknown' on failure)."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.src
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def _extract_dst_ip_pyshark(self, packet):
        """Extract the destination IP from a PyShark packet ('Unknown' on failure)."""
        try:
            if hasattr(packet, 'ip'):
                return packet.ip.dst
            return 'Unknown'
        except Exception:
            return 'Unknown'

    def analyze_network_file(self, file_path):
        """
        Comprehensive network file analysis with anomaly detection.

        :param file_path: Path to the network packet capture file
        :return: dict with 'summary' (packet/anomaly counts) and 'packets'
                 (DataFrame with an added boolean 'is_anomaly' column)
        """
        packets_df = self.parse_pcap_file(file_path)
        if packets_df.empty:
            return {
                'summary': {
                    'total_packets': 0,
                    'isolation_forest_anomalies': 0
                },
                'packets': packets_df
            }

        feature_columns = ['length', 'timestamp']
        # Encode categorical columns as integer codes so the model sees numbers.
        packets_df['protocol_encoded'] = pd.Categorical(packets_df['protocol']).codes
        packets_df['src_ip_encoded'] = pd.Categorical(packets_df['src_ip']).codes
        packets_df['dst_ip_encoded'] = pd.Categorical(packets_df['dst_ip']).codes
        feature_columns.extend(['protocol_encoded', 'src_ip_encoded', 'dst_ip_encoded'])

        features = packets_df[feature_columns]
        features_scaled = self.scaler.fit_transform(features)

        # IsolationForest labels inliers as 1 and anomalies as -1.
        anomaly_labels = self.isolation_forest.fit_predict(features_scaled)
        packets_df['is_anomaly'] = anomaly_labels == -1

        summary = {
            'total_packets': len(packets_df),
            # int() so the count is a plain Python int (display/JSON safe),
            # not a numpy scalar.
            'isolation_forest_anomalies': int(packets_df['is_anomaly'].sum())
        }
        return {
            'summary': summary,
            'packets': packets_df
        }
def analyze_network_file(file_path):
    """
    Wrapper to analyze a network file for the Gradio interface.

    :param file_path: path from the gr.File input (None if nothing was uploaded)
    :return: tuple (summary_text, packets_dataframe, error_trace);
             error_trace is None on success.
    """
    try:
        # Gradio passes None when Analyze is clicked with no upload.
        # os.path.exists(None) raises TypeError (genericpath only swallows
        # OSError/ValueError), so guard falsy paths explicitly instead of
        # letting the user see a raw traceback.
        if not file_path or not os.path.exists(file_path):
            return "Error: File not found", None, "File does not exist."

        detector = NetworkAnomalyDetector()
        results = detector.analyze_network_file(file_path)

        # Left-aligned text: the original triple-quoted literal carried the
        # source indentation into the summary textbox.
        summary_text = (
            "Anomaly Detection Results:\n"
            f"- Total Packets: {results['summary']['total_packets']}\n"
            f"- Anomalous Packets: {results['summary']['isolation_forest_anomalies']}"
        )
        return summary_text, results['packets'], None
    except Exception as e:
        error_trace = traceback.format_exc()
        return f"Error: {str(e)}", None, error_trace
def create_gradio_interface():
    """Assemble and return the Gradio Blocks UI for the anomaly detector."""
    with gr.Blocks(title="Network Anomaly Detector") as demo:
        gr.Markdown("# 🌐 Network Anomaly Detector")
        gr.Markdown("Upload a network packet capture file (PCAP) for anomaly analysis.")

        # Input row: file picker plus the trigger button.
        with gr.Row():
            pcap_input = gr.File(
                label="Upload PCAP File",
                type="filepath",
                file_types=['.pcap', '.pkt'],
            )
            run_button = gr.Button("Analyze Network File", variant="primary")

        # Output widgets: summary text, packet table, and a hidden trace box.
        summary_box = gr.Textbox(label="Analysis Summary", lines=5)
        packet_table = gr.DataFrame(label="Packet Details")
        trace_box = gr.Textbox(label="Error Trace", visible=False)

        # Wire the button to the module-level analysis wrapper.
        run_button.click(
            fn=analyze_network_file,
            inputs=[pcap_input],
            outputs=[summary_box, packet_table, trace_box],
        )

        # Usage notes shown below the controls.
        gr.Markdown("""
        ### How it works:
        1. Upload a PCAP (packet capture) file
        2. Click "Analyze Network File"
        3. View summary of total packets and detected anomalies
        4. Explore detailed packet information
        #### Anomaly Detection Techniques:
        - Uses Isolation Forest algorithm
        - Analyzes packet length, timestamp, protocol, and IP addresses
        - Highlights statistically unusual network traffic
        """)
    return demo
def main():
    """Entry point: verify hard dependencies, warn about optional ones, launch the UI."""
    # Hard requirements only. The module's top-level try/except deliberately
    # treats scapy and pyshark as OPTIONAL (the parser falls back to raw-file
    # reading), so aborting when they are missing — as the old check did —
    # contradicted that design.
    try:
        import pandas   # noqa: F401
        import numpy    # noqa: F401
        import sklearn  # noqa: F401
        import gradio   # noqa: F401
    except ImportError as e:
        print(f"Missing required library: {e}")
        print("Please install: pip install scapy pyshark pandas numpy scikit-learn gradio")
        return

    # Optional parsers: warn instead of refusing to start.
    if scapy is None:
        print("Warning: scapy is not installed; Scapy-based parsing is disabled.")
    if pyshark is None:
        print("Warning: pyshark is not installed; PyShark-based parsing is disabled.")

    # Launch Gradio app
    demo = create_gradio_interface()
    demo.launch(
        server_name="0.0.0.0",  # make accessible on the local network
        share=True,             # also create a public shareable link
        debug=True              # show detailed errors in the console
    )


if __name__ == "__main__":
    main()