# InnSight-Backend / api/compression.py
# jackonthemike — Initial commit: InnSight scraper backend with Playwright (d77abf8)
"""
Response compression middleware for FastAPI.
This module provides GZip and optionally Brotli compression
for API responses, improving transfer speeds significantly.
Usage:
from compression import CompressionMiddleware
app = FastAPI()
app.add_middleware(CompressionMiddleware, minimum_size=500)
"""
import gzip
import io
from typing import Callable, Optional
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import ASGIApp
# Check if Brotli is available
try:
import brotli
BROTLI_AVAILABLE = True
except ImportError:
BROTLI_AVAILABLE = False
class CompressionMiddleware(BaseHTTPMiddleware):
    """
    Middleware that compresses response bodies using GZip or Brotli.

    Features:
    - Picks the encoding from the request's Accept-Encoding header,
      honouring q-values (``gzip;q=0`` disables gzip) and the ``*`` wildcard
    - Skips small responses (configurable threshold)
    - Skips responses that already carry a Content-Encoding
    - Skips responses that are StreamingResponse instances
    - Preserves repeated response headers (e.g. several Set-Cookie lines)
      and merges, rather than replaces, an existing Vary header
    """

    # Content types that should be compressed
    COMPRESSIBLE_TYPES = frozenset([
        "application/json",
        "application/xml",
        "text/html",
        "text/plain",
        "text/css",
        "text/javascript",
        "application/javascript",
        "application/x-javascript",
        "image/svg+xml",
    ])

    # Content types that should NOT be compressed. Most are already
    # compressed; text/event-stream is listed because buffering the whole
    # body (which compression here requires) would defeat server-sent events.
    NON_COMPRESSIBLE_TYPES = frozenset([
        "image/jpeg",
        "image/png",
        "image/gif",
        "image/webp",
        "application/zip",
        "application/gzip",
        "application/pdf",
        "video/mp4",
        "audio/mpeg",
        "text/event-stream",
    ])

    def __init__(
        self,
        app: ASGIApp,
        minimum_size: int = 500,
        compression_level: int = 6,
        prefer_brotli: bool = True
    ):
        """
        Initialize compression middleware.

        Args:
            app: The ASGI application
            minimum_size: Minimum response size to compress (bytes)
            compression_level: GZip compression level (1-9); Brotli uses a
                fixed quality of 4
            prefer_brotli: Prefer Brotli over GZip if available
        """
        super().__init__(app)
        self.minimum_size = minimum_size
        self.compression_level = compression_level
        # Brotli can only be preferred when the optional module imported.
        self.prefer_brotli = prefer_brotli and BROTLI_AVAILABLE

    async def dispatch(
        self,
        request: Request,
        call_next: Callable
    ) -> Response:
        """
        Run the downstream app and compress its response when worthwhile.

        Args:
            request: The incoming request
            call_next: Callable that invokes the rest of the application

        Returns:
            The original response when compression is skipped before the
            body is read; otherwise a rebuilt response carrying either the
            original or the compressed body.
        """
        compression = self._get_best_compression(
            request.headers.get("Accept-Encoding", "")
        )
        response = await call_next(request)

        # Client accepts nothing we can produce.
        if not compression:
            return response

        # Skip if streaming or already compressed.
        # NOTE(review): what call_next returns is Starlette-version specific;
        # confirm this isinstance guard is neither always true nor always
        # false on the version in use.
        if isinstance(response, StreamingResponse):
            return response
        if response.headers.get("Content-Encoding"):
            return response

        # Check content type (parameters such as charset are ignored)
        content_type = response.headers.get("Content-Type", "")
        base_content_type = content_type.split(";")[0].strip().lower()
        if not self._should_compress(base_content_type):
            return response

        # From here on the body iterator is consumed, so a new response
        # object has to be returned on every path.
        body = b"".join([chunk async for chunk in response.body_iterator])

        # Skip small responses
        if len(body) < self.minimum_size:
            return self._rebuild(response, body)

        compressed = self._compress(body, compression)

        # Only use compressed if smaller
        if len(compressed) >= len(body):
            return self._rebuild(response, body)

        return self._rebuild(response, compressed, encoding=compression)

    @staticmethod
    def _rebuild(
        original: Response,
        body: bytes,
        encoding: Optional[str] = None
    ) -> Response:
        """
        Build a replacement response around ``body``.

        Headers are copied from ``original.raw_headers`` instead of going
        through a dict, so header names that occur more than once survive.

        Args:
            original: Response whose status code and headers are reused
            body: Payload of the new response
            encoding: Content-Encoding token when ``body`` is compressed,
                or None when ``body`` is the untouched original payload
        """
        rebuilt = Response(content=body, status_code=original.status_code)
        # Content-Length as Starlette computed it for the new body (may be
        # absent for statuses that carry no body).
        generated_length = [
            (name, value) for name, value in rebuilt.raw_headers
            if name.lower() == b"content-length"
        ]
        raw_headers = [
            (bytes(name), bytes(value)) for name, value in original.raw_headers
        ]

        if encoding is None:
            # Body unchanged: keep every header, only adding a length when
            # the original response did not declare one.
            if not any(name.lower() == b"content-length" for name, _ in raw_headers):
                raw_headers.extend(generated_length)
            rebuilt.raw_headers = raw_headers
            return rebuilt

        kept = []
        vary_tokens = []
        for name, value in raw_headers:
            lowered = name.lower()
            if lowered in (b"content-length", b"content-encoding"):
                continue  # replaced below with values for the compressed body
            if lowered == b"vary":
                vary_tokens.extend(
                    token.strip()
                    for token in value.decode("latin-1").split(",")
                    if token.strip()
                )
                continue
            kept.append((name, value))

        # Extend an existing Vary list instead of discarding it.
        already_varies = "*" in vary_tokens or any(
            token.lower() == "accept-encoding" for token in vary_tokens
        )
        if not already_varies:
            vary_tokens.append("Accept-Encoding")

        kept.append((b"content-encoding", encoding.encode("latin-1")))
        kept.append((b"content-length", str(len(body)).encode("latin-1")))
        kept.append((b"vary", ", ".join(vary_tokens).encode("latin-1")))
        rebuilt.raw_headers = kept
        return rebuilt

    @staticmethod
    def _parse_accept_encoding(accept_encoding: str) -> dict:
        """
        Parse an Accept-Encoding value into ``{coding: q-value}``.

        Codings without a ``q`` parameter, or with an unparsable one, get
        weight 1.0. Coding names are lower-cased.
        """
        weights = {}
        for item in accept_encoding.lower().split(","):
            coding, _, params = item.partition(";")
            coding = coding.strip()
            if not coding:
                continue
            weight = 1.0
            for param in params.split(";"):
                key, _, value = param.partition("=")
                if key.strip() != "q":
                    continue
                try:
                    weight = float(value.strip())
                except ValueError:
                    pass  # malformed q-value: stay lenient, treat as accepted
            weights[coding] = weight
        return weights

    def _get_best_compression(self, accept_encoding: str) -> Optional[str]:
        """
        Determine the best compression method.

        Args:
            accept_encoding: Raw Accept-Encoding request header value

        Returns:
            "br", "gzip", or None when the client accepts neither.
        """
        weights = self._parse_accept_encoding(accept_encoding)

        def accepted(coding: str) -> bool:
            # An explicit entry wins; otherwise fall back to the wildcard.
            weight = weights.get(coding)
            if weight is None:
                weight = weights.get("*", 0.0)
            return weight > 0

        # Check for Brotli first if preferred
        if self.prefer_brotli and accepted("br"):
            return "br"
        if accepted("gzip"):
            return "gzip"
        # Fallback to Brotli if available (client refuses gzip)
        if BROTLI_AVAILABLE and accepted("br"):
            return "br"
        return None

    def _should_compress(self, content_type: str) -> bool:
        """
        Check if content type should be compressed.

        Args:
            content_type: Lower-cased media type without parameters
        """
        if content_type in self.NON_COMPRESSIBLE_TYPES:
            return False
        if content_type in self.COMPRESSIBLE_TYPES:
            return True
        # Compress text and JSON-like types, including structured-syntax
        # suffixes such as application/problem+json or application/atom+xml.
        return (
            content_type.startswith(("text/", "application/json"))
            or content_type.endswith(("+json", "+xml"))
        )

    def _compress(self, data: bytes, method: str) -> bytes:
        """
        Compress data using specified method.

        Args:
            data: Uncompressed payload
            method: "br" for Brotli; anything else produces GZip

        Returns:
            The compressed payload.
        """
        if method == "br" and BROTLI_AVAILABLE:
            return brotli.compress(data, quality=4)

        # GZip compression. mtime=0 keeps the gzip header free of the
        # current time so identical input yields identical output.
        buffer = io.BytesIO()
        with gzip.GzipFile(
            mode="wb",
            fileobj=buffer,
            compresslevel=self.compression_level,
            mtime=0
        ) as f:
            f.write(data)
        return buffer.getvalue()
class ETaggerMiddleware(BaseHTTPMiddleware):
    """
    Middleware that adds ETag headers for cache validation.

    This enables browsers to use If-None-Match headers
    for conditional requests, reducing bandwidth.
    """

    # Headers copied from the full response onto a 304 so caches keep
    # their freshness and variant information.
    _NOT_MODIFIED_HEADERS = frozenset([
        b"cache-control",
        b"content-location",
        b"date",
        b"expires",
        b"vary",
    ])

    def __init__(
        self,
        app: ASGIApp,
        weak_etag: bool = True
    ):
        """
        Initialize ETag middleware.

        Args:
            app: The ASGI application
            weak_etag: Use weak ETags (recommended for dynamic content)
        """
        super().__init__(app)
        self.weak_etag = weak_etag

    async def dispatch(
        self,
        request: Request,
        call_next: Callable
    ) -> Response:
        """
        Attach an ETag to GET/HEAD responses and answer matching
        conditional requests with 304.

        Args:
            request: The incoming request
            call_next: Callable that invokes the rest of the application

        Returns:
            The downstream response untouched (non GET/HEAD, existing ETag,
            or StreamingResponse), a bodiless 304 when If-None-Match matches
            a 2xx response, or a rebuilt response with an ETag header added.
        """
        import hashlib

        # The downstream app is invoked exactly once per request.
        response = await call_next(request)

        # Only for GET and HEAD requests
        if request.method not in ("GET", "HEAD"):
            return response

        # Skip if already has ETag or is streaming.
        # NOTE(review): what call_next returns is Starlette-version specific;
        # confirm the isinstance guard behaves as intended on the version
        # in use.
        if response.headers.get("ETag") or isinstance(response, StreamingResponse):
            return response

        # Get body (consumes the iterator, so a new response is built below)
        body = b"".join([chunk async for chunk in response.body_iterator])

        # Calculate ETag: first 16 hex characters of the body's MD5 digest
        hash_value = hashlib.md5(body).hexdigest()[:16]
        etag = f'W/"{hash_value}"' if self.weak_etag else f'"{hash_value}"'
        etag_header = (b"etag", etag.encode("latin-1"))

        # Copy raw header pairs so repeated names (e.g. Set-Cookie) survive.
        raw_headers = [
            (bytes(name), bytes(value)) for name, value in response.raw_headers
        ]

        # A 304 is only meaningful in place of a successful response.
        if 200 <= response.status_code < 300 and self._etag_matches(
            request.headers.get("If-None-Match"), hash_value
        ):
            not_modified = Response(status_code=304)
            not_modified.raw_headers = [
                (name, value) for name, value in raw_headers
                if name.lower() in self._NOT_MODIFIED_HEADERS
            ] + [etag_header]
            return not_modified

        # Add ETag to response
        rebuilt = Response(content=body, status_code=response.status_code)
        if not any(name.lower() == b"content-length" for name, _ in raw_headers):
            # The original declared no length; reuse the one Starlette
            # computed for the buffered body, if it produced one.
            raw_headers.extend(
                (name, value) for name, value in rebuilt.raw_headers
                if name.lower() == b"content-length"
            )
        raw_headers.append(etag_header)
        rebuilt.raw_headers = raw_headers
        return rebuilt

    @staticmethod
    def _etag_matches(if_none_match: Optional[str], hash_value: str) -> bool:
        """
        Check an If-None-Match header against the current entity hash.

        Uses weak comparison: a ``W/`` prefix on a candidate is ignored.
        The header may hold a comma-separated list of tags or ``*``.

        Args:
            if_none_match: Raw If-None-Match header value, or None
            hash_value: Opaque tag content of the current response (unquoted)
        """
        if not if_none_match:
            return False
        current = f'"{hash_value}"'
        for candidate in if_none_match.split(","):
            candidate = candidate.strip()
            if candidate == "*":
                return True
            if candidate.startswith("W/"):
                candidate = candidate[2:]
            if candidate == current:
                return True
        return False
def get_compression_stats(original_size: int, compressed_size: int) -> dict:
    """
    Summarize how much a compression step saved.

    Args:
        original_size: Size before compression (bytes)
        compressed_size: Size after compression (bytes)

    Returns:
        Dictionary with both sizes, the byte difference, and the saving
        as a percentage string with one decimal place. The percentage is
        reported as zero when original_size is not positive.
    """
    if original_size > 0:
        percent_saved = (1 - compressed_size / original_size) * 100
    else:
        # Avoid dividing by zero for empty payloads.
        percent_saved = 0

    stats = {
        "original_size": original_size,
        "compressed_size": compressed_size,
    }
    stats["saved_bytes"] = original_size - compressed_size
    stats["compression_ratio"] = f"{percent_saved:.1f}%"
    return stats