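# NOTE(review): "Spaces: Paused / Paused" looks like page-header residue that
# was captured together with the source; it is not program text. Confirm and remove.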
import asyncio
import html
import socket
import threading
import time
from queue import Queue

import uvicorn
from fastapi import FastAPI, Form, Request
from fastapi.responses import HTMLResponse
# --- UI Template (HTML, CSS, JS) ---
# Page shell shared by every response: styles, a single content slot, and the
# showLoader() script that swaps the form for the spinner on submit.
#
# NOTE: The template is rendered with str.format(), so every literal curly
# brace in the CSS and JS is doubled ({{ ... }}) to escape it. The only
# single-brace placeholder is {content}. Text substituted into {content} is
# inserted verbatim, so it must already be safe HTML.
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>FastAPI Port Scanner</title>
<style>
:root {{
--bg-color: #1a1a1a;
--surface-color: #2c2c2c;
--primary-color: #00aaff;
--primary-hover-color: #0088cc;
--text-color: #e0e0e0;
--text-muted-color: #a0a0a0;
--success-color: #4CAF50;
--border-color: #444;
}}
body {{
font-family: 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
background-color: var(--bg-color);
color: var(--text-color);
margin: 0;
padding: 2rem;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
}}
.container {{
background-color: var(--surface-color);
padding: 2.5rem;
border-radius: 12px;
box-shadow: 0 10px 30px rgba(0, 0, 0, 0.5);
width: 100%;
max-width: 650px;
border: 1px solid var(--border-color);
}}
h1, h2 {{
color: var(--primary-color);
text-align: center;
margin-top: 0;
}}
form {{
display: flex;
flex-direction: column;
gap: 1.2rem;
}}
.form-group {{
display: flex;
flex-direction: column;
}}
label {{
font-weight: bold;
margin-bottom: 0.5rem;
font-size: 0.9rem;
}}
input {{
padding: 0.8rem 1rem;
border-radius: 6px;
border: 1px solid var(--border-color);
background-color: var(--bg-color);
color: var(--text-color);
font-size: 1rem;
transition: border-color 0.3s, box-shadow 0.3s;
}}
input:focus {{
outline: none;
border-color: var(--primary-color);
box-shadow: 0 0 0 3px rgba(0, 170, 255, 0.2);
}}
button {{
padding: 1rem;
border-radius: 6px;
border: none;
background-color: var(--primary-color);
color: #fff;
font-size: 1.1rem;
font-weight: bold;
cursor: pointer;
transition: background-color 0.3s;
margin-top: 1rem;
}}
button:hover {{
background-color: var(--primary-hover-color);
}}
.disclaimer {{
font-size: 0.8rem;
color: #ffeb3b;
text-align: center;
margin-top: 1.5rem;
border: 1px dashed #ffeb3b;
padding: 0.5rem;
border-radius: 4px;
}}
.loader {{
text-align: center;
padding: 3rem 0;
display: none; /* Hidden by default */
}}
.spinner {{
border: 8px solid var(--border-color);
border-top: 8px solid var(--primary-color);
border-radius: 50%;
width: 60px;
height: 60px;
animation: spin 1s linear infinite;
margin: 0 auto 1rem;
}}
@keyframes spin {{
0% {{ transform: rotate(0deg); }}
100% {{ transform: rotate(360deg); }}
}}
.results-table {{
width: 100%;
border-collapse: collapse;
margin-top: 1.5rem;
}}
.results-table th, .results-table td {{
padding: 0.8rem;
text-align: left;
border-bottom: 1px solid var(--border-color);
}}
.results-table th {{
color: var(--primary-color);
}}
.open-port {{
color: var(--success-color);
font-weight: bold;
}}
.scan-summary p {{
font-size: 1.1em;
background: #252525;
padding: 0.8em;
border-radius: 6px;
}}
a {{
color: var(--primary-color);
text-decoration: none;
display: inline-block;
margin-top: 1.5rem;
text-align: center;
width: 100%;
}}
</style>
</head>
<body>
<div class="container" id="main-container">
<!-- The content will be replaced by render_form or render_results -->
{content}
</div>
<script>
function showLoader() {{
const formContainer = document.getElementById('form-container');
const loader = document.getElementById('loader');
if (formContainer && loader) {{
formContainer.style.display = 'none';
loader.style.display = 'block';
}}
}}
</script>
</body>
</html>
"""
# Scan form dropped into HTML_TEMPLATE's {content} slot. It posts the fields
# target / start_port / end_port / threads to /scan, and its onsubmit handler
# calls showLoader() (defined in the template's <script>) to hide
# #form-container and reveal #loader. This string is only ever used as a
# format() *argument*, never as a format string, so its braces need no escaping.
FORM_CONTENT = """
<div id="form-container">
<h1>FastAPI Port Scanner</h1>
<form action="/scan" method="post" onsubmit="showLoader()">
<div class="form-group">
<label for="target">Target IP Address or Hostname:</label>
<input type="text" id="target" name="target" value="127.0.0.1" required>
</div>
<div class="form-group">
<label for="start_port">Start Port:</label>
<input type="number" id="start_port" name="start_port" value="1" min="1" max="65535" required>
</div>
<div class="form-group">
<label for="end_port">End Port:</label>
<input type="number" id="end_port" name="end_port" value="65535" min="1" max="65535" required>
</div>
<div class="form-group">
<label for="threads">Number of Threads (Concurrency):</label>
<input type="number" id="threads" name="threads" value="2500" min="1" max="2500" required>
</div>
<button type="submit">Start Scan</button>
</form>
<div class="disclaimer">
<strong>Warning:</strong> Use only on networks you own or have explicit permission to scan. Unauthorized scanning is illegal.
</div>
</div>
<div class="loader" id="loader">
<div class="spinner"></div>
<p>Scanning in progress... This may take a moment.</p>
</div>
"""
# --- Scanner Logic ---
def run_scan(target_ip: str, start_port: int, end_port: int, num_threads: int):
    """Probe a TCP port range concurrently and return the ports that are open.

    Every port in ``start_port..end_port`` (inclusive) is tried once with a
    connect attempt bounded by a 0.5 second timeout. A port counts as open
    when ``connect_ex`` returns 0. The function is self-contained: it keeps
    all state local and blocks until every worker has finished.

    Args:
        target_ip: Address passed to ``socket.connect_ex``; the sockets are
            ``AF_INET``, so this is expected to be an IPv4 address.
        start_port: First port to probe.
        end_port: Last port to probe (inclusive). If it is below
            ``start_port`` the range is empty and ``[]`` is returned.
        num_threads: Requested number of worker threads. The effective count
            is clamped to at least 1 and at most the number of ports.

    Returns:
        Sorted list of the port numbers that accepted a connection. Ports
        that could not be probed at all (any ``OSError``) are treated as
        not open.
    """
    ports = range(start_port, end_port + 1)
    if not ports:
        return []

    # Never start more threads than there is work, and never fewer than one
    # (zero workers would leave the queue undrained forever).
    worker_count = max(1, min(num_threads, len(ports)))

    pending = Queue()
    open_ports = []
    results_lock = threading.Lock()

    def worker():
        while True:
            port = pending.get()
            if port is None:
                # Sentinel: no more work for this thread. Using one sentinel
                # per worker avoids the check-then-get race of
                # `while not q.empty(): q.get()`, which can leave a thread
                # blocked in get() forever.
                return
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(0.5)  # Aggressive timeout
                    if s.connect_ex((target_ip, port)) == 0:
                        with results_lock:
                            open_ports.append(port)
            except OSError:
                # Covers socket.timeout and ConnectionRefusedError (both are
                # OSError subclasses) as well as failures to create the
                # socket, e.g. running out of file descriptors. The scan is
                # best-effort: the port is simply not reported as open and
                # the worker keeps going instead of dying.
                pass

    # Populate the queue, then add one stop marker per worker.
    for port in ports:
        pending.put(port)
    for _ in range(worker_count):
        pending.put(None)

    workers = [
        threading.Thread(target=worker, daemon=True) for _ in range(worker_count)
    ]
    for thread in workers:
        thread.start()
    # Joining the threads (rather than the queue) guarantees that none of
    # them outlives this call.
    for thread in workers:
        thread.join()

    return sorted(open_ports)
# --- FastAPI Application ---
app = FastAPI()


# Registered as GET "/": the results page links back to "/" and the form is
# the entry point. response_class=HTMLResponse makes FastAPI send the returned
# string as text/html instead of JSON-encoding it.
@app.get("/", response_class=HTMLResponse)
async def get_form():
    """Serve the main page: the shared page shell with the scan form inside."""
    return HTML_TEMPLATE.format(content=FORM_CONTENT)
# Registered as POST "/scan", the action/method the form submits to.
# response_class=HTMLResponse makes FastAPI send the returned string as
# text/html instead of JSON-encoding it.
@app.post("/scan", response_class=HTMLResponse)
async def scan(
    target: str = Form(...),
    start_port: int = Form(...),
    end_port: int = Form(...),
    threads: int = Form(...),
):
    """Validate the form input, run the scan, and return the results page.

    Args:
        target: Hostname or IP address submitted by the user (untrusted).
        start_port: First port of the range; must satisfy
            ``1 <= start_port <= end_port <= 65535``.
        end_port: Last port of the range (inclusive).
        threads: Worker thread count; must be between 1 and 2500.

    Returns:
        The rendered results page as an HTML string on success, or an
        ``HTMLResponse`` with status 400 when the port range or thread count
        is invalid or the hostname cannot be resolved.
    """
    # Input validation
    if not (1 <= start_port <= end_port <= 65535):
        return HTMLResponse(content="<h1>Error: Invalid port range.</h1>", status_code=400)
    if not (1 <= threads <= 2500):
        return HTMLResponse(content="<h1>Error: Thread count must be between 1 and 2500.</h1>", status_code=400)

    # `target` comes straight from the request and is echoed back into HTML,
    # so it must be escaped to prevent script injection.
    safe_target = html.escape(target)

    # DNS resolution and the scan itself are blocking calls; run them in the
    # default executor so the event loop keeps serving other requests.
    loop = asyncio.get_running_loop()

    # Resolve hostname
    try:
        target_ip = await loop.run_in_executor(None, socket.gethostbyname, target)
    except (socket.gaierror, UnicodeError):
        # UnicodeError is raised (instead of gaierror) when the name cannot
        # be IDNA-encoded, e.g. an empty or over-long label.
        return HTMLResponse(
            content=f"<h1>Error: Could not resolve hostname '{safe_target}'</h1>",
            status_code=400,
        )

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments.
    started = time.perf_counter()
    open_ports = await loop.run_in_executor(
        None, run_scan, target_ip, start_port, end_port, threads
    )
    duration = time.perf_counter() - started

    # Build results content
    results_html = (
        "\n<h2>Scan Results</h2>\n"
        '<div class="scan-summary">\n'
        f"<p><strong>Target:</strong> {safe_target} ({target_ip})</p>\n"
        f"<p><strong>Time Taken:</strong> {duration:.2f} seconds</p>\n"
        f"<p><strong>Open Ports Found:</strong> {len(open_ports)}</p>\n"
        "</div>\n"
    )
    if open_ports:
        rows = "".join(f'<tr><td class="open-port">{port}</td></tr>' for port in open_ports)
        results_html += (
            '\n<table class="results-table">\n'
            "<thead><tr><th>Open Port</th></tr></thead>\n"
            f"<tbody>{rows}</tbody>\n"
            "</table>\n"
        )
    else:
        results_html += "<p>No open ports were found in the specified range.</p>"
    results_html += '<a href="/">Scan Another Target</a>'
    return HTML_TEMPLATE.format(content=results_html)
# To run this app you need uvicorn: `pip install uvicorn`.
# Either execute this file directly, or — if it is saved as main.py — run
# `uvicorn main:app --reload` from your terminal.
if __name__ == "__main__":
    # Pass the app object rather than the "main:app" import string: the
    # string form only works when this file is named main.py and makes
    # uvicorn import the module a second time.
    uvicorn.run(app, host="0.0.0.0", port=5000, reload=False)