| import json
|
| import logging
|
| import time
|
| from concurrent.futures import ThreadPoolExecutor
|
| from contextvars import ContextVar
|
| from typing import Any, Dict, Generator, List
|
|
|
| from anthropic import Anthropic
|
| from fastapi import FastAPI, HTTPException, Request, Response
|
| from fastapi.responses import JSONResponse, StreamingResponse, HTMLResponse
|
| from fastapi.security import HTTPBearer
|
| from starlette.concurrency import run_in_threadpool
|
| from pathlib import Path
|
| import markdown
|
| from pygments.formatters import HtmlFormatter
|
|
|
|
|
| from schemas import OpenAIChatCompletionForm, FilterForm
|
|
|
|
|
# Module-scoped logger (rather than the root logger) so this service's records can be
# filtered/configured by name; records still propagate to the root handlers.
logger = logging.getLogger(__name__)

app = FastAPI()

# NOTE(review): `security` is not referenced by the visible code — the bearer check is
# done by hand in `auth_middleware`. Kept in case another module imports it.
security = HTTPBearer()

# Bearer token of the request being handled. `auth_middleware` sets it and
# `get_anthropic_client` reads it; None when no token has been bound.
token_context = ContextVar('token', default=None)

# Paths the middleware forwards without requiring an Authorization header.
# NOTE(review): `read_root` is also mounted at "/v1", which is NOT listed here and
# therefore requires a bearer header — confirm whether that is intended.
PUBLIC_ENDPOINTS = {"/"}

# Model identifiers advertised by the /models endpoints.
AVAILABLE_MODELS = [
    "claude-3-haiku-20240307",
    "claude-3-opus-20240229",
    "claude-3-sonnet-20240229",
    "claude-3-5-sonnet-20241022",
]
|
|
|
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """
    Require a bearer token on non-public paths, bind it for the request, and time responses.

    Paths in ``PUBLIC_ENDPOINTS`` are forwarded with no header check. For every other
    path the ``Authorization`` header must look like ``Bearer <token>``; the token is
    stored in ``token_context`` so handlers can build a client via
    ``get_anthropic_client``. The token's validity is not checked here — only its
    presence and shape.

    Each response produced by the downstream app gets an ``X-Process-Time`` header
    holding the handling time in seconds.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request to the rest of the app.

    Returns:
        Response: The downstream response; a 401 JSON response when the
        Authorization header is missing or malformed; or a 500 JSON response when
        an unexpected exception escapes the downstream app.
    """

    async def timed_call_next() -> Response:
        # Wall-clock time spent in the downstream app, in seconds.
        start_time = time.perf_counter()
        response = await call_next(request)
        response.headers["X-Process-Time"] = str(time.perf_counter() - start_time)
        return response

    if request.url.path in PUBLIC_ENDPOINTS:
        return await timed_call_next()

    context_token = None
    try:
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            raise HTTPException(status_code=401, detail="No authorization header")

        # Require exactly "<scheme> <token>". Unpacking split() directly raised
        # ValueError for e.g. "Bearer" or "Bearer a b", which the generic handler
        # below turned into a 500 instead of an authentication error.
        parts = auth_header.split()
        if len(parts) != 2:
            raise HTTPException(status_code=401, detail="Malformed authorization header")
        scheme, token = parts
        if scheme.lower() != 'bearer':
            raise HTTPException(status_code=401, detail="Invalid authentication scheme")

        context_token = token_context.set(token)
        return await timed_call_next()

    except HTTPException as http_ex:
        logger.error(
            "HTTP Exception - Status: %s - Detail: %s - Path: %s",
            http_ex.status_code,
            http_ex.detail,
            request.url.path,
        )
        return JSONResponse(
            status_code=http_ex.status_code,
            content={"detail": http_ex.detail},
        )
    except Exception as e:
        logger.error(
            "Unexpected error in middleware - Error: %s - Path: %s",
            e,
            request.url.path,
            exc_info=True,
        )
        return JSONResponse(
            status_code=500,
            content={"detail": "Internal server error"},
        )
    finally:
        # Undo the binding made above so the token does not linger in this
        # coroutine's context once the response has been produced.
        # NOTE(review): assumes the downstream app runs in a context copied before
        # this point (Starlette's call_next spawns a task) — confirm on upgrades.
        if context_token is not None:
            token_context.reset(context_token)
|
|
|
|
|
def get_anthropic_client():
    """
    Create an Anthropic client authenticated with the current request's token.

    The token is read from ``token_context``; a fresh client is built on every call.

    Returns:
        Anthropic: A client whose ``api_key`` is the bound bearer token.

    Raises:
        HTTPException: 401 when no token (or an empty one) is bound.
    """
    api_key = token_context.get()
    if api_key:
        return Anthropic(api_key=api_key)
    raise HTTPException(status_code=401, detail="No authorization token found")
|
|
|
|
|
|
|
@app.get("/v1")
@app.get("/")
async def read_root():
    """
    Render the project's README.md as a standalone HTML page.

    The first 10 lines of the README are dropped before rendering (presumably a
    header/badge section — TODO confirm). The page is styled with ``main.css`` when
    that file exists, plus Pygments rules for ``.codehilite`` code blocks. Both files
    are relative paths, so they resolve against the process working directory.

    Returns:
        HTMLResponse: The rendered page; a short notice when README.md is missing;
        or a 500 page containing the HTML-escaped error message on failure.
    """
    from html import escape

    # Number of leading README lines omitted from the rendered page.
    readme_lines_to_skip = 10

    try:
        readme_path = Path("README.md")
        if not readme_path.exists():
            return HTMLResponse(content="<h1>README.md non trouvé</h1>")

        md_text = readme_path.read_text(encoding='utf-8')
        md_text = '\n'.join(md_text.split('\n')[readme_lines_to_skip:])

        body_html = markdown.markdown(
            md_text,
            extensions=[
                'markdown.extensions.fenced_code',
                'markdown.extensions.tables',
                'markdown.extensions.codehilite',
                'markdown.extensions.sane_lists',
            ],
        )

        css_file = Path("main.css")
        custom_css = css_file.read_text(encoding='utf-8') if css_file.exists() else ""

        # Style rules for the `.codehilite` markup emitted by the codehilite extension.
        code_css = HtmlFormatter(style='default').get_style_defs('.codehilite')

        html_content = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <style>
        {custom_css}
        {code_css}
    </style>
</head>
<body>
    <div class="markdown-body">
        {body_html}
    </div>
</body>
</html>
"""
        return HTMLResponse(content=html_content)

    except Exception as e:
        # Boundary handler: record the failure server-side, and escape the message
        # before embedding it in HTML so it cannot inject markup.
        logger.exception("Failed to render README page")
        return HTMLResponse(
            content=f"<h1>Erreur: {escape(str(e))}</h1>",
            status_code=500,
        )
|
|
|
|
|
@app.get("/v1/models")
@app.get("/models")
async def get_models():
    """
    List the exposed Anthropic models in OpenAI ``/models`` format.

    Entries are generated from ``AVAILABLE_MODELS``. Each ``created`` field is the
    current Unix time, not the model's actual release date.

    Returns:
        JSONResponse: ``{"data": [...], "object": "list", "pipelines": True}``.

    Raises:
        HTTPException: 401 (from ``get_anthropic_client``) when no token is bound.
    """
    # Called only for its side effect: it raises 401 if the request has no token.
    get_anthropic_client()

    def describe(model_id):
        return {
            "id": model_id,
            "object": "model",
            "name": f"🤖 {model_id}",
            "created": int(time.time()),
            "owned_by": "anthropic",
            "pipeline": {"type": "custom", "valves": False},
        }

    payload = {
        "data": [describe(model_id) for model_id in AVAILABLE_MODELS],
        "object": "list",
        "pipelines": True,
    }
    return JSONResponse(content=payload)
|
|
|
|
|
def stream_message(
    model: str,
    messages: List[Dict[str, Any]],
    max_tokens: int = 1024,
) -> Generator[str, None, None]:
    """
    Start a streaming Anthropic completion and adapt it to OpenAI SSE chunks.

    The Anthropic request is sent immediately, so authentication and request errors
    surface from this call rather than while the stream is being consumed. The
    returned generator converts Anthropic stream events into
    ``chat.completion.chunk`` frames.

    Anthropic does not accept ``system`` entries inside ``messages``; they are removed
    from the list and their text is joined into the top-level ``system`` parameter.

    Args:
        model: The Anthropic model identifier.
        messages: OpenAI-style ``{"role": ..., "content": ...}`` dicts.
        max_tokens: Maximum tokens to generate (default 1024, the former fixed value).

    Returns:
        Generator[str, None, None]: Server-sent-event frames (``data: {...}``),
        terminated by ``data: [DONE]``.

    Raises:
        HTTPException: 401 (from ``get_anthropic_client``) when no token is bound.
    """

    def as_text(content: Any) -> str:
        # System content may be a plain string or (assumption — confirm against the
        # request schema) a list of {"type": "text", "text": ...} parts.
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            texts = []
            for part in content:
                text = part.get("text") if isinstance(part, dict) else getattr(part, "text", None)
                if text:
                    texts.append(text)
            return "".join(texts)
        return "" if content is None else str(content)

    system_prompt = "\n\n".join(
        as_text(m.get("content")) for m in messages if m.get("role") == "system"
    )
    chat_messages = [m for m in messages if m.get("role") != "system"]
    # Only send `system` when there is one; omitting it keeps the original request shape.
    extra_args: Dict[str, Any] = {"system": system_prompt} if system_prompt else {}

    client = get_anthropic_client()
    response = client.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=chat_messages,
        stream=True,
        **extra_args,
    )

    def event_stream() -> Generator[str, None, None]:
        # Fallback id; replaced by Anthropic's own message id on `message_start`,
        # matching `send_message` and staying unique across concurrent requests.
        message_id = f"chatcmpl-{int(time.time())}"
        created = int(time.time())

        def frame(delta: Dict[str, Any], finish_reason: Any) -> str:
            data = {
                "id": message_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": model,
                "choices": [
                    {
                        "index": 0,
                        "delta": delta,
                        "logprobs": None,
                        "finish_reason": finish_reason,
                    }
                ],
            }
            return f"data: {json.dumps(data)}\n\n"

        for chunk in response:
            if chunk.type == 'message_start':
                message_id = chunk.message.id or message_id
            elif chunk.type == 'content_block_delta':
                # Deltas without `text` (e.g. tool-input JSON) produce empty content.
                yield frame({"content": getattr(chunk.delta, 'text', "")}, None)
            elif chunk.type == 'message_delta':
                # Emit one final chunk carrying the real stop reason, so truncation
                # by max_tokens is reported as "length" instead of "stop".
                stop_reason = getattr(chunk.delta, 'stop_reason', None)
                yield frame({}, "length" if stop_reason == "max_tokens" else "stop")

        yield "data: [DONE]\n\n"

    return event_stream()
|
|
|
|
|
def send_message(
    model: str,
    messages: List[Dict[str, Any]],
    max_tokens: int = 1024,
) -> Dict[str, Any]:
    """
    Send a non-streaming Anthropic request and format it as an OpenAI chat completion.

    Anthropic does not accept ``system`` entries inside ``messages``; they are removed
    from the list and their text is joined into the top-level ``system`` parameter.

    Args:
        model: The Anthropic model identifier.
        messages: OpenAI-style ``{"role": ..., "content": ...}`` dicts.
        max_tokens: Maximum tokens to generate (default 1024, the former fixed value).

    Returns:
        dict: A ``chat.completion`` object whose single choice holds the concatenated
        text of the reply.

    Raises:
        HTTPException: 401 (from ``get_anthropic_client``) when no token is bound.
    """

    def as_text(content: Any) -> str:
        # System content may be a plain string or (assumption — confirm against the
        # request schema) a list of {"type": "text", "text": ...} parts.
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            texts = []
            for part in content:
                text = part.get("text") if isinstance(part, dict) else getattr(part, "text", None)
                if text:
                    texts.append(text)
            return "".join(texts)
        return "" if content is None else str(content)

    system_prompt = "\n\n".join(
        as_text(m.get("content")) for m in messages if m.get("role") == "system"
    )
    chat_messages = [m for m in messages if m.get("role") != "system"]
    # Only send `system` when there is one; omitting it keeps the original request shape.
    extra_args: Dict[str, Any] = {"system": system_prompt} if system_prompt else {}

    client = get_anthropic_client()
    response = client.messages.create(
        model=model,
        max_tokens=max_tokens,
        messages=chat_messages,
        **extra_args,
    )

    # Join every text block; indexing `content[0].text` failed whenever the first
    # block was not a text block.
    content = "".join(
        getattr(block, "text", "")
        for block in (response.content or [])
        if getattr(block, "type", None) == "text"
    )
    finish_reason = "length" if getattr(response, "stop_reason", None) == "max_tokens" else "stop"

    return {
        "id": response.id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": content,
                },
                "logprobs": None,
                "finish_reason": finish_reason,
            }
        ],
    }
|
|
|
|
|
@app.post("/v1/chat/completions")
@app.post("/chat/completions")
async def generate_chat_completion(form_data: OpenAIChatCompletionForm):
    """
    OpenAI-compatible chat completion endpoint backed by Anthropic.

    Incoming messages are reduced to ``{"role", "content"}`` dicts and handed to
    ``stream_message`` when ``form_data.stream`` is truthy, otherwise to
    ``send_message``. Both issue a synchronous Anthropic request, so the work runs
    in Starlette's thread pool to keep the event loop free.

    Args:
        form_data: The parsed chat completion request.

    Returns:
        StreamingResponse | dict: An SSE stream, or the completed message payload.
    """
    messages = [
        {"role": message.role, "content": message.content}
        for message in form_data.messages
    ]
    model = form_data.model

    def job():
        """Dispatch to the streaming or non-streaming helper (runs in a worker thread)."""
        if form_data.stream:
            return StreamingResponse(
                stream_message(model=model, messages=messages),
                media_type="text/event-stream",
            )
        return send_message(model=model, messages=messages)

    # run_in_threadpool already uses Starlette's shared worker pool; a dedicated
    # per-request ThreadPoolExecutor (previously created here and never used) only
    # added thread-pool setup/teardown cost.
    return await run_in_threadpool(job)
|
|
|
|
|
@app.post("/v1/{pipeline_id}/filter/inlet")
@app.post("/{pipeline_id}/filter/inlet")
async def filter_inlet(pipeline_id: str, form_data: FilterForm):
    """
    Inlet filter hook: pass the request body through untouched.

    Args:
        pipeline_id: Pipeline identifier captured from the URL path; not used.
        form_data: Filter payload whose ``body`` is returned.

    Returns:
        The unmodified ``form_data.body``.
    """
    passthrough = form_data.body
    return passthrough
|
|
|
|
|
@app.post("/v1/{pipeline_id}/filter/outlet")
@app.post("/{pipeline_id}/filter/outlet")
async def filter_outlet(pipeline_id: str, form_data: FilterForm):
    """
    Outlet filter hook: pass the response body through untouched.

    Args:
        pipeline_id: Pipeline identifier captured from the URL path; not used.
        form_data: Filter payload whose ``body`` is returned.

    Returns:
        The unmodified ``form_data.body``.
    """
    passthrough = form_data.body
    return passthrough