Spaces:
Sleeping
Sleeping
import os | |
import uvicorn | |
from fastapi import FastAPI, Request, HTTPException, status, WebSocketDisconnect | |
from fastapi.applications import AppType | |
from fastapi.responses import JSONResponse | |
from requests import Timeout, TooManyRedirects, RequestException, Session | |
from connection_manager import * | |
from request_forwarder_classes import * | |
PORT: int = int(os.getenv("PORT")) | |
PASSWORD: str = os.getenv("PASSWORD") | |
app: AppType = FastAPI() | |
connection_manager = ConnectionManager(PASSWORD) | |
def __get_response(forwarded_request: ForwardedRequest, session: Session | None = None) -> Tuple[int, Dict]: | |
try: | |
if not session: | |
session = Session() | |
response = session.request( | |
forwarded_request.method, | |
forwarded_request.url, | |
headers=forwarded_request.headers, | |
data=forwarded_request.data, | |
params=forwarded_request.params, | |
auth=forwarded_request.auth, | |
cookies=forwarded_request.cookies, | |
json=forwarded_request.json) | |
forwarded_response = ForwardedResponse.from_response(response, forwarded_request.response_content_type) | |
return status.HTTP_200_OK, {"response": forwarded_response.to_json_dict(), "error": None} | |
except ConnectionError as e: | |
return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"ConnectionError": str(e)}} | |
except Timeout as e: | |
return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"Timeout": str(e)}} | |
except TooManyRedirects as e: | |
return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"TooManyRedirects": str(e)}} | |
except RequestException as e: | |
return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": {"RequestException": str(e)}} | |
except Exception as e: | |
return status.HTTP_418_IM_A_TEAPOT, {"response": None, "error": str(e)} | |
async def websocket_endpoint(websocket: WebSocket): | |
await connection_manager.connect(websocket) | |
session = Session() | |
try: | |
while True: | |
request = await websocket.receive_json() | |
print(request) | |
forwarded_request = ForwardedRequest.from_json(request) | |
response_status, response = __get_response(forwarded_request, session) | |
await websocket.send_json(response) | |
except WebSocketDisconnect: | |
connection_manager.disconnect(websocket) | |
async def forward_request(forwarded_request: ForwardedRequest, request: Request): | |
authorization: str = request.headers.get("Authorization") | |
if not authorization: | |
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization header missing") | |
token_type, _, token = authorization.partition(' ') | |
if token_type != "Bearer" or not token: | |
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authorization header format") | |
if token != PASSWORD: | |
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid token") | |
response_status, response = __get_response(forwarded_request) | |
return JSONResponse(content=response, status_code=response_status) | |
if __name__ == '__main__': | |
uvicorn.run("main:app", host="0.0.0.0", port=PORT) | |