Spaces:
Sleeping
Sleeping
| """ | |
| Response compression middleware for FastAPI. | |
| This module provides GZip and optionally Brotli compression | |
| for API responses, improving transfer speeds significantly. | |
| Usage: | |
| from compression import CompressionMiddleware | |
| app = FastAPI() | |
| app.add_middleware(CompressionMiddleware, minimum_size=500) | |
| """ | |
| import gzip | |
| import io | |
| from typing import Callable, Optional | |
| from starlette.middleware.base import BaseHTTPMiddleware | |
| from starlette.requests import Request | |
| from starlette.responses import Response, StreamingResponse | |
| from starlette.types import ASGIApp | |
| # Check if Brotli is available | |
| try: | |
| import brotli | |
| BROTLI_AVAILABLE = True | |
| except ImportError: | |
| BROTLI_AVAILABLE = False | |
| class CompressionMiddleware(BaseHTTPMiddleware): | |
| """ | |
| Middleware that compresses response bodies using GZip or Brotli. | |
| Features: | |
| - Automatically detects best compression from Accept-Encoding | |
| - Skips small responses (configurable threshold) | |
| - Skips already compressed content | |
| - Preserves streaming responses | |
| """ | |
| # Content types that should be compressed | |
| COMPRESSIBLE_TYPES = frozenset([ | |
| "application/json", | |
| "application/xml", | |
| "text/html", | |
| "text/plain", | |
| "text/css", | |
| "text/javascript", | |
| "application/javascript", | |
| "application/x-javascript", | |
| "image/svg+xml", | |
| ]) | |
| # Content types that should NOT be compressed (already compressed) | |
| NON_COMPRESSIBLE_TYPES = frozenset([ | |
| "image/jpeg", | |
| "image/png", | |
| "image/gif", | |
| "image/webp", | |
| "application/zip", | |
| "application/gzip", | |
| "application/pdf", | |
| "video/mp4", | |
| "audio/mpeg", | |
| ]) | |
| def __init__( | |
| self, | |
| app: ASGIApp, | |
| minimum_size: int = 500, | |
| compression_level: int = 6, | |
| prefer_brotli: bool = True | |
| ): | |
| """ | |
| Initialize compression middleware. | |
| Args: | |
| app: The ASGI application | |
| minimum_size: Minimum response size to compress (bytes) | |
| compression_level: GZip compression level (1-9) | |
| prefer_brotli: Prefer Brotli over GZip if available | |
| """ | |
| super().__init__(app) | |
| self.minimum_size = minimum_size | |
| self.compression_level = compression_level | |
| self.prefer_brotli = prefer_brotli and BROTLI_AVAILABLE | |
| async def dispatch( | |
| self, | |
| request: Request, | |
| call_next: Callable | |
| ) -> Response: | |
| # Get accepted encodings | |
| accept_encoding = request.headers.get("Accept-Encoding", "") | |
| # Determine best compression | |
| compression = self._get_best_compression(accept_encoding) | |
| if not compression: | |
| return await call_next(request) | |
| # Get response | |
| response = await call_next(request) | |
| # Skip if streaming or already compressed | |
| if isinstance(response, StreamingResponse): | |
| return response | |
| if response.headers.get("Content-Encoding"): | |
| return response | |
| # Check content type | |
| content_type = response.headers.get("Content-Type", "") | |
| base_content_type = content_type.split(";")[0].strip().lower() | |
| if not self._should_compress(base_content_type): | |
| return response | |
| # Get body | |
| body = b"" | |
| async for chunk in response.body_iterator: | |
| body += chunk | |
| # Skip small responses | |
| if len(body) < self.minimum_size: | |
| return Response( | |
| content=body, | |
| status_code=response.status_code, | |
| headers=dict(response.headers), | |
| media_type=response.media_type | |
| ) | |
| # Compress | |
| compressed = self._compress(body, compression) | |
| # Only use compressed if smaller | |
| if len(compressed) >= len(body): | |
| return Response( | |
| content=body, | |
| status_code=response.status_code, | |
| headers=dict(response.headers), | |
| media_type=response.media_type | |
| ) | |
| # Build response with compression | |
| headers = dict(response.headers) | |
| headers["Content-Encoding"] = compression | |
| headers["Content-Length"] = str(len(compressed)) | |
| headers["Vary"] = "Accept-Encoding" | |
| return Response( | |
| content=compressed, | |
| status_code=response.status_code, | |
| headers=headers, | |
| media_type=response.media_type | |
| ) | |
| def _get_best_compression(self, accept_encoding: str) -> Optional[str]: | |
| """Determine the best compression method""" | |
| accept_encoding = accept_encoding.lower() | |
| # Check for Brotli first if preferred | |
| if self.prefer_brotli and "br" in accept_encoding: | |
| return "br" | |
| if "gzip" in accept_encoding: | |
| return "gzip" | |
| # Fallback to Brotli if available | |
| if BROTLI_AVAILABLE and "br" in accept_encoding: | |
| return "br" | |
| return None | |
| def _should_compress(self, content_type: str) -> bool: | |
| """Check if content type should be compressed""" | |
| if content_type in self.NON_COMPRESSIBLE_TYPES: | |
| return False | |
| if content_type in self.COMPRESSIBLE_TYPES: | |
| return True | |
| # Compress text and JSON-like types | |
| return content_type.startswith(("text/", "application/json")) | |
| def _compress(self, data: bytes, method: str) -> bytes: | |
| """Compress data using specified method""" | |
| if method == "br" and BROTLI_AVAILABLE: | |
| return brotli.compress(data, quality=4) | |
| # GZip compression | |
| buffer = io.BytesIO() | |
| with gzip.GzipFile( | |
| mode="wb", | |
| fileobj=buffer, | |
| compresslevel=self.compression_level | |
| ) as f: | |
| f.write(data) | |
| return buffer.getvalue() | |
| class ETaggerMiddleware(BaseHTTPMiddleware): | |
| """ | |
| Middleware that adds ETag headers for cache validation. | |
| This enables browsers to use If-None-Match headers | |
| for conditional requests, reducing bandwidth. | |
| """ | |
| def __init__( | |
| self, | |
| app: ASGIApp, | |
| weak_etag: bool = True | |
| ): | |
| """ | |
| Initialize ETag middleware. | |
| Args: | |
| app: The ASGI application | |
| weak_etag: Use weak ETags (recommended for dynamic content) | |
| """ | |
| super().__init__(app) | |
| self.weak_etag = weak_etag | |
| async def dispatch( | |
| self, | |
| request: Request, | |
| call_next: Callable | |
| ) -> Response: | |
| # Only for GET and HEAD requests | |
| if request.method not in ("GET", "HEAD"): | |
| return await call_next(request) | |
| response = await call_next(request) | |
| # Skip if already has ETag or is streaming | |
| if response.headers.get("ETag") or isinstance(response, StreamingResponse): | |
| return await call_next(request) | |
| # Get body | |
| body = b"" | |
| async for chunk in response.body_iterator: | |
| body += chunk | |
| # Calculate ETag | |
| import hashlib | |
| hash_value = hashlib.md5(body).hexdigest()[:16] | |
| etag = f'W/"{hash_value}"' if self.weak_etag else f'"{hash_value}"' | |
| # Check If-None-Match header | |
| if_none_match = request.headers.get("If-None-Match") | |
| if if_none_match and if_none_match == etag: | |
| return Response( | |
| status_code=304, | |
| headers={"ETag": etag} | |
| ) | |
| # Add ETag to response | |
| headers = dict(response.headers) | |
| headers["ETag"] = etag | |
| return Response( | |
| content=body, | |
| status_code=response.status_code, | |
| headers=headers, | |
| media_type=response.media_type | |
| ) | |
| def get_compression_stats(original_size: int, compressed_size: int) -> dict: | |
| """ | |
| Calculate compression statistics. | |
| Returns: | |
| Dictionary with compression metrics | |
| """ | |
| ratio = (1 - compressed_size / original_size) * 100 if original_size > 0 else 0 | |
| return { | |
| "original_size": original_size, | |
| "compressed_size": compressed_size, | |
| "saved_bytes": original_size - compressed_size, | |
| "compression_ratio": f"{ratio:.1f}%" | |
| } | |