import os
import re
import logging
import uuid
import time
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from typing import Optional, Dict, Any
import asyncio
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field

import openai
import google.generativeai as genai
from google.generativeai.types import GenerationConfig


logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)


CUSTOM_API_BASE_URL_DEFAULT = "https://api-q3ieh5raqfuad9o8.aistudio-app.com/v1"
CUSTOM_API_MODEL_DEFAULT = "gemma3:27b"
DEFAULT_GEMINI_MODEL = "gemini-2.0-flash"
GEMINI_REQUEST_TIMEOUT_SECONDS = 300


# In-memory task store. Entries live only for the lifetime of the process;
# a restart (or running multiple workers) will lose or fragment task state.
tasks_db: Dict[str, Dict[str, Any]] = {}


class ChatPayload(BaseModel):
    message: str
    temperature: float = Field(0.6, ge=0.0, le=1.0)


class GeminiTaskRequest(BaseModel):
    message: str
    url: Optional[str] = None
    gemini_model: Optional[str] = None
    api_key: Optional[str] = Field(None, description="Gemini API Key (optional; uses Space secret if not provided)")


class TaskSubmissionResponse(BaseModel):
    task_id: str
    status: str
    task_detail_url: str


class TaskStatusResponse(BaseModel):
    task_id: str
    status: str
    submitted_at: datetime
    last_updated_at: datetime
    result: Optional[str] = None
    error: Optional[str] = None


class RateLimiter:
    """Sliding-window rate limiter keyed by client IP."""

    def __init__(self, max_requests: int, time_window: timedelta):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: Dict[str, list] = defaultdict(list)

    def _cleanup_old_requests(self, user_ip: str) -> None:
        """Remove requests that are outside the time window."""
        current_time = time.time()
        self.requests[user_ip] = [
            timestamp for timestamp in self.requests[user_ip]
            if current_time - timestamp < self.time_window.total_seconds()
        ]

    def is_rate_limited(self, user_ip: str) -> bool:
        """Check if the user has exceeded their rate limit.

        Note: the incoming request is recorded before the check, so rejected
        requests also count toward the window.
        """
        self._cleanup_old_requests(user_ip)
        current_count = len(self.requests[user_ip])

        # Record this request, then test the new total against the limit.
        self.requests[user_ip].append(time.time())
        return (current_count + 1) > self.max_requests

    def get_current_count(self, user_ip: str) -> int:
        """Get the current request count for an IP."""
        self._cleanup_old_requests(user_ip)
        return len(self.requests[user_ip])


rate_limiter = RateLimiter(
    max_requests=50,
    time_window=timedelta(days=1)
)


def get_user_ip(request: Request) -> str:
    """Helper function to get the user's IP address."""
    forwarded = request.headers.get("X-Forwarded-For")
    if forwarded:
        # The first entry in X-Forwarded-For is the originating client.
        return forwarded.split(",")[0].strip()
    return request.client.host


class ApiRotator:
    """Reorders a list of API configs so the most recently successful one
    is tried first. (Currently not wired into any endpoint.)"""

    def __init__(self, apis):
        self.apis = apis
        self.last_successful_index = None

    def get_prioritized_apis(self):
        if self.last_successful_index is not None:
            # Put the last successful API first, keeping the rest in order.
            rotated_apis = (
                [self.apis[self.last_successful_index]] +
                self.apis[:self.last_successful_index] +
                self.apis[self.last_successful_index + 1:]
            )
            return rotated_apis
        return self.apis

    def update_last_successful(self, index):
        self.last_successful_index = index
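

# Hypothetical usage sketch for ApiRotator (illustration only; the names and
# dict shape below are assumptions, not part of this app). Note that
# update_last_successful expects an index into the original list, not the
# rotated one, hence the apis.index() lookup:
#
#   apis = [{"name": "primary"}, {"name": "backup"}]
#   rotator = ApiRotator(apis)
#   for api in rotator.get_prioritized_apis():
#       ...  # try the call against `api`; on success:
#       rotator.update_last_successful(apis.index(api))
#       break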


app = FastAPI(
    title="Dual Chat & Async Gemini API",
    description="Made by Cody from chrunos.com.",
    version="2.0.0"
)


def is_video_url_for_gemini(url: Optional[str]) -> bool:
    """Return True if the URL looks like a YouTube video Gemini can ingest."""
    if not url:
        return False

    youtube_regex = (
        r'(https?://)?(www\.)?'
        r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
    )

    googleusercontent_youtube_regex = r'https?://googleusercontent\.com/youtube\.com/\w+'

    return re.match(youtube_regex, url) is not None or \
        re.match(googleusercontent_youtube_regex, url) is not None
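
# Illustrative checks against the regexes above (not exhaustive):
#   is_video_url_for_gemini("https://www.youtube.com/watch?v=dQw4w9WgXcQ")  # True
#   is_video_url_for_gemini("https://youtu.be/dQw4w9WgXcQ")                 # True
#   is_video_url_for_gemini("https://example.com/video.mp4")                # False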


async def process_gemini_request_background(
    task_id: str,
    user_message: str,
    input_url: Optional[str],
    requested_gemini_model: str,
    gemini_key_to_use: str
):
    logger.info(f"[Task {task_id}] Starting background Gemini processing. Model: {requested_gemini_model}, URL: {input_url}")
    tasks_db[task_id]["status"] = "PROCESSING"
    tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)

    try:
        # NOTE: genai.configure sets a process-wide key, so concurrent tasks
        # supplying different keys can interleave.
        genai.configure(api_key=gemini_key_to_use)

        model_instance = genai.GenerativeModel(model_name=requested_gemini_model)

        content_parts = [{"text": user_message}]
        if input_url and is_video_url_for_gemini(input_url):
            logger.info(f"[Task {task_id}] Adding video URL to Gemini content: {input_url}")
            content_parts.append({
                "file_data": {
                    "mime_type": "video/youtube",
                    "file_uri": input_url
                }
            })

        gemini_contents = [{"parts": content_parts}]

        generation_config = GenerationConfig(candidate_count=1)
        request_options = {"timeout": GEMINI_REQUEST_TIMEOUT_SECONDS}

        logger.info(f"[Task {task_id}] Sending request to Gemini API...")
        response = await model_instance.generate_content_async(
            gemini_contents,
            stream=False,
            generation_config=generation_config,
            request_options=request_options
        )

        # Extract text from whichever response shape the SDK returned.
        full_response_text = ""
        if hasattr(response, 'text') and response.text:
            full_response_text = response.text
        elif hasattr(response, 'parts'):
            for part in response.parts:
                if hasattr(part, 'text'):
                    full_response_text += part.text
        else:
            logger.warning(f"[Task {task_id}] Gemini response structure not as expected or empty. Response: {response}")

        if not full_response_text and response.prompt_feedback and response.prompt_feedback.block_reason:
            block_reason_name = response.prompt_feedback.block_reason.name if hasattr(response.prompt_feedback.block_reason, 'name') else str(response.prompt_feedback.block_reason)
            logger.warning(f"[Task {task_id}] Gemini content blocked: {block_reason_name}")
            tasks_db[task_id]["status"] = "FAILED"
            tasks_db[task_id]["error"] = f"Content blocked by Gemini due to: {block_reason_name}"
        elif full_response_text:
            logger.info(f"[Task {task_id}] Gemini processing successful. Result length: {len(full_response_text)}")
            tasks_db[task_id]["status"] = "COMPLETED"
            tasks_db[task_id]["result"] = full_response_text
        else:
            logger.warning(f"[Task {task_id}] Gemini processing completed but no text content found and no block reason.")
            tasks_db[task_id]["status"] = "FAILED"
            tasks_db[task_id]["error"] = "Gemini returned no content and no specific block reason."

    except Exception as e:
        logger.error(f"[Task {task_id}] Error during Gemini background processing: {e}", exc_info=True)
        tasks_db[task_id]["status"] = "FAILED"
        tasks_db[task_id]["error"] = str(e)
    finally:
        tasks_db[task_id]["last_updated_at"] = datetime.now(timezone.utc)
@app.post("/chat", response_class=StreamingResponse) |
|
async def direct_chat(payload: ChatPayload, request: Request): |
|
logger.info(f"Direct chat request received. Temperature: {payload.temperature}, Message: '{payload.message[:50]}...'") |
|
user_ip = get_user_ip(request) |
|
|
|
if rate_limiter.is_rate_limited(user_ip): |
|
current_count = rate_limiter.get_current_count(user_ip) |
|
raise HTTPException( |
|
status_code=429, |
|
detail={ |
|
"error": "You have exceeded the maximum number of requests per day. Please try again tomorrow.", |
|
"url": "https://t.me/chrunoss" |
|
} |
|
) |
|
custom_api_key_secret = os.getenv("CUSTOM_API_SECRET_KEY") |
|
custom_api_base_url = os.getenv("CUSTOM_API_BASE_URL", CUSTOM_API_BASE_URL_DEFAULT) |
|
custom_api_model = os.getenv("CUSTOM_API_MODEL", CUSTOM_API_MODEL_DEFAULT) |
|
|
|
if not custom_api_key_secret: |
|
logger.error("Custom API key ('CUSTOM_API_SECRET_KEY') is not configured for /chat.") |
|
raise HTTPException(status_code=500, detail="Custom API key not configured.") |
|
|
|
async def custom_api_streamer(): |
|
client = None |
|
try: |
|
logger.info("Sending request to Custom API for /chat.") |
|
|
|
|
|
from openai import AsyncOpenAI |
|
client = AsyncOpenAI( |
|
api_key=custom_api_key_secret, |
|
base_url=custom_api_base_url, |
|
timeout=60.0 |
|
) |
|
|
|
stream = await client.chat.completions.create( |
|
model=custom_api_model, |
|
temperature=payload.temperature, |
|
messages=[{"role": "user", "content": payload.message}], |
|
stream=True |
|
) |
|
|
|
async for chunk in stream: |
|
try: |
|
|
|
if hasattr(chunk.choices[0].delta, "reasoning_content") and chunk.choices[0].delta.reasoning_content: |
|
yield chunk.choices[0].delta.reasoning_content |
|
elif chunk.choices[0].delta.content is not None: |
|
yield chunk.choices[0].delta.content |
|
|
|
except (IndexError, AttributeError) as e: |
|
|
|
continue |
|
except Exception as e: |
|
logger.warning(f"Skipping chunk due to error: {e}") |
|
continue |
|
|
|
except Exception as e: |
|
logger.error(f"Error during Custom API call for /chat: {e}", exc_info=True) |
|
|
|
|
|
if "peer closed connection" in str(e) or "incomplete chunked read" in str(e): |
|
yield "Connection interrupted. Please try again." |
|
else: |
|
yield f"Error processing with Custom API: {str(e)}" |
|
|
|
finally: |
|
if client: |
|
try: |
|
await client.close() |
|
except Exception as cleanup_error: |
|
logger.warning(f"Error closing OpenAI client: {cleanup_error}") |
|
|
|
return StreamingResponse( |
|
custom_api_streamer(), |
|
media_type="text/plain", |
|
headers={ |
|
"Cache-Control": "no-cache", |
|
"Connection": "keep-alive", |
|
} |
|
) |
|
|
|
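
# Illustrative streaming request (assumes the server is reachable at
# localhost:8000; -N disables curl buffering so chunks print as they arrive):
#
#   curl -N -X POST http://localhost:8000/chat \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Hello!", "temperature": 0.6}'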
@app.post("/gemini/submit_task", response_model=TaskSubmissionResponse) |
|
async def submit_gemini_task(request: GeminiTaskRequest, background_tasks: BackgroundTasks): |
|
task_id = str(uuid.uuid4()) |
|
logger.info(f"Received Gemini task submission. Assigning Task ID: {task_id}. Message: '{request.message[:50]}...'") |
|
|
|
gemini_api_key_from_request = request.api_key |
|
gemini_api_key_secret = os.getenv("GEMINI_API_KEY") |
|
key_to_use = gemini_api_key_from_request |
|
|
|
if not key_to_use: |
|
logger.error(f"[Task {task_id}] Gemini API Key missing for task submission.") |
|
raise HTTPException(status_code=400, detail="Gemini API Key required.") |
|
|
|
requested_model = request.gemini_model or DEFAULT_GEMINI_MODEL |
|
|
|
current_time = datetime.now(timezone.utc) |
|
tasks_db[task_id] = { |
|
"status": "PENDING", |
|
"result": None, |
|
"error": None, |
|
"submitted_at": current_time, |
|
"last_updated_at": current_time, |
|
"request_params": request.model_dump() |
|
} |
|
|
|
background_tasks.add_task( |
|
process_gemini_request_background, |
|
task_id, |
|
request.message, |
|
request.url, |
|
requested_model, |
|
key_to_use |
|
) |
|
|
|
logger.info(f"[Task {task_id}] Task submitted to background processing.") |
|
return TaskSubmissionResponse( |
|
task_id=task_id, |
|
status="PENDING", |
|
task_detail_url=f"/gemini/task/{task_id}" |
|
) |
|
|
|
|
|
|
|
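
# Illustrative submission (assumes the server is reachable at localhost:8000;
# the message and URL values are placeholders, and api_key may be omitted when
# the GEMINI_API_KEY secret is set):
#
#   curl -X POST http://localhost:8000/gemini/submit_task \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Summarize this video", "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}'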
@app.get("/gemini/task/{task_id}", response_model=TaskStatusResponse) |
|
async def get_gemini_task_status(task_id: str = Path(..., description="The ID of the task to retrieve")): |
|
logger.info(f"Status query for Task ID: {task_id}") |
|
task = tasks_db.get(task_id) |
|
if not task: |
|
logger.warning(f"Task ID not found: {task_id}") |
|
raise HTTPException(status_code=404, detail="Task ID not found.") |
|
|
|
logger.info(f"[Task {task_id}] Current status: {task['status']}") |
|
return TaskStatusResponse( |
|
task_id=task_id, |
|
status=task["status"], |
|
submitted_at=task["submitted_at"], |
|
last_updated_at=task["last_updated_at"], |
|
result=task.get("result"), |
|
error=task.get("error"), |
|
|
|
) |
|
|
|
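
# Illustrative poll (assumes localhost:8000; substitute the task_id returned
# by /gemini/submit_task):
#
#   curl http://localhost:8000/gemini/task/<task_id>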
@app.get("/") |
|
async def read_root(): |
|
logger.info("Root endpoint '/' accessed (health check).") |
|
return {"message": "API for Direct Chat and Async Gemini Tasks is running."} |