AlexRyder's picture
Upload 5 files
2da028e verified
import os
import uvicorn
from fastapi import FastAPI, Request, HTTPException, status, WebSocketDisconnect
from fastapi.applications import AppType
from fastapi.responses import JSONResponse
from requests import Timeout, TooManyRedirects, RequestException, Session
from connection_manager import *
from request_forwarder_classes import *
# Port the server binds to when this file is run as a script. Falls back to
# 8000 (uvicorn's own default) when PORT is unset or empty, so that merely
# importing this module (e.g. via the uvicorn CLI) does not die with a
# TypeError from int(None).
PORT: int = int(os.getenv("PORT") or "8000")

# Shared secret expected as the bearer token; None when PASSWORD is unset.
PASSWORD: str | None = os.getenv("PASSWORD")

# Annotated with the concrete class: fastapi.applications.AppType is a TypeVar,
# not a type that an instance can be declared as.
app: FastAPI = FastAPI()

# Tracks websocket clients; it is handed the shared secret at construction.
connection_manager = ConnectionManager(PASSWORD)
def __get_response(forwarded_request: ForwardedRequest, session: Session | None = None) -> Tuple[int, Dict]:
    """Perform the HTTP request described by *forwarded_request*.

    Args:
        forwarded_request: Description of the outgoing request (method, url,
            headers, data, params, auth, cookies, json and the desired
            response content type).
        session: Session to send the request with. When omitted, a temporary
            session is created for this call and closed before returning.

    Returns:
        A ``(status_code, payload)`` tuple. On success the status is 200 and
        the payload is ``{"response": <serialized response>, "error": None}``.
        On any failure the status is 418 and the payload is
        ``{"response": None, "error": ...}``, where ``error`` is a one-entry
        dict keyed by the error category, or a plain string for errors that
        do not come from ``requests``.

    This function never raises for request failures; they are reported
    through the returned payload.
    """
    # requests' ConnectionError is NOT a subclass of the builtin of the same
    # name, so it is imported under an alias and caught explicitly; otherwise
    # it would be reported under the generic "RequestException" key.
    from requests.exceptions import ConnectionError as RequestsConnectionError

    # Remember a session we create ourselves so it can be released afterwards;
    # a caller-supplied session stays open for the caller to reuse.
    owned_session = None
    try:
        if session is None:
            owned_session = session = Session()
        # NOTE(review): no timeout is passed, so an unresponsive upstream can
        # block this call indefinitely — consider making it configurable.
        response = session.request(
            forwarded_request.method,
            forwarded_request.url,
            headers=forwarded_request.headers,
            data=forwarded_request.data,
            params=forwarded_request.params,
            auth=forwarded_request.auth,
            cookies=forwarded_request.cookies,
            json=forwarded_request.json,
        )
        forwarded_response = ForwardedResponse.from_response(
            response, forwarded_request.response_content_type
        )
        return status.HTTP_200_OK, {"response": forwarded_response.to_json_dict(), "error": None}
    except Timeout as e:
        # Checked before the connection errors so that a connect timeout
        # (both a Timeout and a requests ConnectionError) keeps the "Timeout" key.
        return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"Timeout": str(e)}}
    except (ConnectionError, RequestsConnectionError) as e:
        return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"ConnectionError": str(e)}}
    except TooManyRedirects as e:
        return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"TooManyRedirects": str(e)}}
    except RequestException as e:
        return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"RequestException": str(e)}}
    except Exception as e:
        # Boundary catch-all: anything else (including failures while
        # converting the response) is reported to the client as plain text.
        return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": str(e)}
    finally:
        if owned_session is not None:
            owned_session.close()
@app.websocket("/ws/forward-requests")
async def websocket_endpoint(websocket: WebSocket):
    """Serve forwarded requests over a websocket until the client disconnects.

    Each JSON message received is parsed into a ``ForwardedRequest``, executed
    with a session shared by all messages on this connection, and the result
    payload is sent back as JSON. The status code computed for the request is
    not transmitted; only the payload is.

    Args:
        websocket: The incoming websocket connection, first handed to
            ``connection_manager.connect``.
    """
    # Function-scope import so this handler does not depend on file-level edits.
    import asyncio

    await connection_manager.connect(websocket)
    session = Session()
    try:
        while True:
            request = await websocket.receive_json()
            # NOTE(review): this prints the whole payload, which can carry
            # credentials (auth, cookies, headers) — consider redacting.
            print(request)
            forwarded_request = ForwardedRequest.from_json(request)
            # The requests call is blocking; run it in a worker thread so the
            # event loop keeps serving other connections in the meantime.
            _, response = await asyncio.to_thread(__get_response, forwarded_request, session)
            await websocket.send_json(response)
    except WebSocketDisconnect:
        connection_manager.disconnect(websocket)
    finally:
        # Release pooled connections however the loop ended.
        session.close()
@app.post("/forward-request")
async def forward_request(forwarded_request: ForwardedRequest, request: Request):
    """Execute one forwarded request on behalf of an authenticated caller.

    The caller must send ``Authorization: Bearer <token>`` where the token
    equals the configured PASSWORD.

    Args:
        forwarded_request: Request description parsed from the POST body.
        request: The incoming HTTP request, used to read the Authorization header.

    Returns:
        A JSONResponse carrying the payload and status code produced by
        ``__get_response``.

    Raises:
        HTTPException: 401 when the Authorization header is missing or is not
            of the form ``Bearer <token>``; 403 when the token does not match.
    """
    # Function-scope imports so this handler does not depend on file-level edits.
    import asyncio
    import hmac

    authorization: str | None = request.headers.get("Authorization")
    if not authorization:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization header missing")

    token_type, _, token = authorization.partition(' ')
    if token_type != "Bearer" or not token:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authorization header format")

    # Reject everything when no password is configured, and otherwise compare
    # in constant time so response timing does not leak how much of a guessed
    # token was correct. Bytes are compared because compare_digest only
    # accepts ASCII-only str arguments.
    if PASSWORD is None or not hmac.compare_digest(token.encode("utf-8"), PASSWORD.encode("utf-8")):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid token")

    # The requests call is blocking; run it in a worker thread so the event
    # loop keeps serving other clients in the meantime.
    response_status, response = await asyncio.to_thread(__get_response, forwarded_request)
    return JSONResponse(content=response, status_code=response_status)
if __name__ == "__main__":
    # Script entry point: serve the app on every interface at the configured
    # port. The import string refers to the `app` object of a module named `main`.
    uvicorn.run(
        app="main:app",
        port=PORT,
        host="0.0.0.0",
    )