white_proxy / app.py
NitinBot001's picture
Update app.py
a051394 verified
import httpx
import asyncio
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import urllib.parse
app = FastAPI(title="Universal CORS Proxy", version="1.0.0")
# ─── CORS β€” sab domains allow ───────────────────────────────────────────────
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"],
)
# ─── Headers jo proxy nahi karega ────────────────────────────────────────────
HOP_BY_HOP_HEADERS = {
"connection", "keep-alive", "proxy-authenticate", "proxy-authorization",
"te", "trailers", "transfer-encoding", "upgrade",
"host", # host hum khud set karenge
"content-length", # FastAPI khud recalculate karega β€” mismatch error fix
}
TIMEOUT = httpx.Timeout(30.0, connect=10.0)
def extract_target_url(path: str, query_string: str) -> str:
"""
Path se target URL nikalo.
Formats supported:
/https://api.example.com/v1/users
/http://localhost:3000/data
"""
# leading slash hata do
raw = path.lstrip("/")
# agar URL encoded hai toh decode karo
decoded = urllib.parse.unquote(raw)
# scheme verify karo
if not (decoded.startswith("http://") or decoded.startswith("https://")):
return None
# query string attach karo agar hai
if query_string:
decoded = f"{decoded}?{query_string}"
return decoded
def filter_headers(headers: dict, skip: set = HOP_BY_HOP_HEADERS) -> dict:
"""Hop-by-hop headers remove karo, baaki forward karo."""
return {k: v for k, v in headers.items() if k.lower() not in skip}
@app.get("/")
async def root():
return {
"service": "Universal CORS Proxy",
"usage": "/{target_url}",
"example": "/https://api.github.com/users/octocat",
"methods": ["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"],
}
@app.api_route("/{full_path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
async def proxy(full_path: str, request: Request):
# ── 1. Target URL extract karo ──────────────────────────────────────────
query_string = request.url.query
target_url = extract_target_url(full_path, query_string)
if not target_url:
return Response(
content='{"error": "Invalid URL. Use: /{full_url} e.g. /https://api.example.com/endpoint"}',
status_code=400,
media_type="application/json",
)
# ── 2. Request body lo ──────────────────────────────────────────────────
body = await request.body()
# ── 3. Headers filter karo ──────────────────────────────────────────────
forward_headers = filter_headers(dict(request.headers))
# ── 4. Actual request bhejo ─────────────────────────────────────────────
async with httpx.AsyncClient(
timeout=TIMEOUT,
follow_redirects=True,
verify=True, # SSL verify on β€” production ke liye
) as client:
try:
upstream_response = await client.request(
method=request.method,
url=target_url,
headers=forward_headers,
content=body if body else None,
)
except httpx.ConnectError as e:
return Response(
content=f'{{"error": "Could not connect to target", "detail": "{str(e)}"}}',
status_code=502,
media_type="application/json",
)
except httpx.TimeoutException:
return Response(
content='{"error": "Request to target timed out"}',
status_code=504,
media_type="application/json",
)
except Exception as e:
return Response(
content=f'{{"error": "Proxy error", "detail": "{str(e)}"}}',
status_code=500,
media_type="application/json",
)
# ── 5. Response headers filter karo + CORS inject karo ──────────────────
response_headers = filter_headers(dict(upstream_response.headers))
response_headers["Access-Control-Allow-Origin"] = "*"
response_headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS"
response_headers["Access-Control-Allow-Headers"] = "*"
response_headers["X-Proxied-By"] = "universal-cors-proxy"
response_headers["X-Original-URL"] = target_url.split("?")[0] # query strip for safety
# ── 6. Response return karo ─────────────────────────────────────────────
return Response(
content=upstream_response.content,
status_code=upstream_response.status_code,
headers=response_headers,
media_type=upstream_response.headers.get("content-type", "application/octet-stream"),
)
# ─── Run ─────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)