TelegramSteamBot / main.py
mrpoddaa's picture
Upload 13 files
18b952c verified
"""
Telegram Multi-Part File Streamer - Main Application
High-performance file upload and streaming service with zero-disk buffering
"""
import asyncio
import logging
from typing import AsyncGenerator, Optional
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException, Response
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from session_manager import SessionManager
from database import Database, FileMetadata
from utils import (
calculate_part_and_offset,
generate_unique_id,
CHUNK_SIZE,
MAX_PART_SIZE
)
# Configure logging: INFO level, each record rendered as
# "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by every handler in this file.
logger = logging.getLogger(__name__)

# Global instances. Both start as None and are assigned by lifespan() during
# application startup; the request handlers test them for truthiness and
# answer 503 while they are still unset.
session_manager: Optional[SessionManager] = None
database: Optional[Database] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager.

    Startup connects the database and initializes the session manager,
    publishing both through the module-level ``database`` and
    ``session_manager`` globals. Shutdown releases them in reverse order.

    Cleanup is guaranteed: if session-manager initialization fails, the
    already-connected database is disconnected before the error propagates,
    and the shutdown phase runs inside ``finally`` so it also executes when
    an exception is raised into the context at ``yield``.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).

    Yields:
        None, once both resources are ready.
    """
    global session_manager, database

    logger.info("Initializing application...")

    # Initialize database
    database = Database()
    await database.connect()

    # Initialize session manager; release the database if this step fails so
    # a failed startup does not leak the open connection.
    try:
        session_manager = SessionManager()
        await session_manager.initialize()
    except BaseException:
        await database.disconnect()
        raise

    logger.info("Application initialized successfully")

    try:
        yield
    finally:
        # Cleanup
        logger.info("Shutting down application...")
        try:
            await session_manager.cleanup()
        finally:
            # Disconnect even when session cleanup raises.
            await database.disconnect()
        logger.info("Application shutdown complete")
# Initialize FastAPI app; startup and shutdown work is delegated to lifespan().
app = FastAPI(
    title="Telegram Multi-Part File Streamer",
    description="High-performance file upload and streaming service",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware: every origin, method and request header is allowed,
# and credentialed requests are enabled.
# NOTE(review): FastAPI's CORS documentation states that allow_origins,
# allow_methods and allow_headers should not be ["*"] when
# allow_credentials=True. Nothing in this file uses cookies or auth headers,
# so credentials are presumably unnecessary here -- confirm, then either set
# allow_credentials=False or list the permitted origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
async def root():
    """Liveness probe: return the static service name, version and status."""
    service_banner = dict(
        status="online",
        service="Telegram Multi-Part File Streamer",
        version="1.0.0",
    )
    return service_banner
@app.get("/health")
async def health_check():
    """Report session count and database connectivity.

    The status is "healthy" only when at least one session exists and the
    database reports itself connected; otherwise it is "degraded".
    """
    # Count sessions only when the manager has been created.
    active_sessions = 0
    if session_manager:
        active_sessions = len(session_manager.sessions)

    # Ask the database for its state only when it has been created.
    db_ok = False
    if database:
        db_ok = database.is_connected()

    if active_sessions > 0 and db_ok:
        overall = "healthy"
    else:
        overall = "degraded"

    if db_ok:
        db_state = "connected"
    else:
        db_state = "disconnected"

    return {
        "status": overall,
        "sessions": active_sessions,
        "database": db_state,
    }
async def _discard_uploaded_parts(parts: list) -> None:
    """Best-effort removal of parts already stored in Telegram.

    Used after a failed upload so the parts sent before the failure are not
    left orphaned. Individual deletion errors are logged and skipped.
    """
    for part in parts:
        try:
            await session_manager.delete_part(part["file_id"])
        except Exception as cleanup_error:
            logger.warning(
                f"Failed to clean up part {part['part_number']}: {str(cleanup_error)}"
            )


@app.post("/upload")
async def upload_file(request: Request, filename: Optional[str] = None):
    """
    High-speed zero-disk file upload endpoint.

    Streams the request body straight to Telegram, splitting it into parts of
    at most MAX_PART_SIZE bytes. Nothing is written to disk; the in-memory
    buffer holds at most one part plus the most recently received chunk.

    Args:
        request: Incoming request whose body is the raw file content.
        filename: Optional name for the file; when omitted the generated
            unique id is used to name the parts and the stored file.

    Returns:
        A dict with ``success``, ``unique_id``, ``filename``, ``total_size``,
        ``parts`` (the part count) and ``download_url``.

    Raises:
        HTTPException: 503 when the service is not initialized; 500 when a
            part upload or the metadata save fails. In the latter case the
            parts that were already uploaded are deleted on a best-effort
            basis before the error is returned.
    """
    if not session_manager or not database:
        raise HTTPException(status_code=503, detail="Service not initialized")

    logger.info(f"Upload request received: filename={filename}")

    unique_id = generate_unique_id()
    part_prefix = filename or unique_id
    file_parts = []
    total_size = 0

    async def store_part(part_data: bytes, is_final: bool) -> None:
        """Upload one part to Telegram and record its id and size."""
        nonlocal total_size
        part_number = len(file_parts) + 1
        label = "final part" if is_final else "part"
        logger.info(f"Uploading {label} {part_number} ({len(part_data)} bytes)")

        # Upload part to Telegram
        file_id = await session_manager.upload_part(
            part_data,
            f"{part_prefix}_part_{part_number}"
        )
        file_parts.append({
            "part_number": part_number,
            "file_id": file_id,
            "size": len(part_data)
        })
        total_size += len(part_data)
        logger.info(
            f"Part {part_number} uploaded successfully. "
            f"Total size: {total_size / (1024**3):.2f} GB"
        )

    try:
        # Buffer for part assembly
        part_buffer = bytearray()
        async for chunk in request.stream():
            part_buffer.extend(chunk)
            # Flush every complete part currently held in the buffer.
            while len(part_buffer) >= MAX_PART_SIZE:
                part_data = bytes(part_buffer[:MAX_PART_SIZE])
                # Trim in place rather than re-copying the remainder.
                del part_buffer[:MAX_PART_SIZE]
                await store_part(part_data, is_final=False)

        # Upload remaining data as final part
        if part_buffer:
            await store_part(bytes(part_buffer), is_final=True)

        # Store metadata in database
        metadata = FileMetadata(
            unique_id=unique_id,
            filename=filename or f"file_{unique_id}",
            total_size=total_size,
            parts=file_parts,
            part_count=len(file_parts)
        )
        await database.save_file_metadata(metadata)
    except Exception as e:
        logger.error(f"Upload failed: {str(e)}", exc_info=True)
        # No metadata references these parts, so remove them instead of
        # leaving them orphaned in Telegram.
        await _discard_uploaded_parts(file_parts)
        raise HTTPException(
            status_code=500, detail=f"Upload failed: {str(e)}"
        ) from e

    part_count = len(file_parts)
    logger.info(
        f"Upload completed: unique_id={unique_id}, "
        f"parts={part_count}, total_size={total_size / (1024**3):.2f} GB"
    )
    return {
        "success": True,
        "unique_id": unique_id,
        "filename": metadata.filename,
        "total_size": total_size,
        "parts": part_count,
        "download_url": f"/dl/{unique_id}"
    }
def _resolve_byte_range(range_header: Optional[str], total_size: int) -> tuple:
    """Turn a Range header into an inclusive ``(start, end, is_partial)`` triple.

    Supports ``bytes=a-b``, open-ended ``bytes=a-`` and suffix ``bytes=-n``
    forms. An end position past the end of the file is clamped. A missing
    header, a non-``bytes`` unit, a multi-range request or a syntactically
    invalid spec is ignored and the whole file is selected
    (``is_partial=False``).

    Raises:
        HTTPException: 416 (with ``Content-Range: bytes */<size>``) when the
            range is well-formed but cannot be satisfied.
    """
    whole_file = (0, total_size - 1, False)
    if not range_header:
        return whole_file

    unit, equals, spec = range_header.partition("=")
    if not equals or unit.strip().lower() != "bytes" or "," in spec:
        return whole_file

    first, dash, last = (piece.strip() for piece in spec.partition("-"))
    if not dash or not (first or last):
        return whole_file
    if (first and not first.isdecimal()) or (last and not last.isdecimal()):
        return whole_file

    unsatisfiable = HTTPException(
        status_code=416,
        detail="Range not satisfiable",
        headers={"Content-Range": f"bytes */{total_size}"},
    )

    if first:
        start = int(first)
        end = int(last) if last else total_size - 1
        if end < start and last:
            raise unsatisfiable
        # A last position beyond the file is allowed; serve up to EOF.
        end = min(end, total_size - 1)
    else:
        # Suffix form: the final `last` bytes of the file.
        suffix_length = int(last)
        if suffix_length == 0:
            raise unsatisfiable
        start = max(0, total_size - suffix_length)
        end = total_size - 1

    # Also covers ranged requests against an empty file.
    if start >= total_size:
        raise unsatisfiable
    return start, end, True


def _build_content_disposition(filename: str) -> str:
    """Build an ``attachment`` Content-Disposition value that is header-safe.

    Quotes, backslashes, control characters and non-ASCII characters are
    replaced in the plain ``filename`` parameter; when anything had to be
    replaced, the exact name is additionally sent percent-encoded via
    ``filename*``. Names that need no replacement produce the plain
    ``attachment; filename="<name>"`` form.
    """
    from urllib.parse import quote  # local import: only this helper needs it

    ascii_name = filename.encode("ascii", "replace").decode("ascii")
    safe_name = "".join(
        "_" if (ch in '"\\' or ord(ch) < 32 or ord(ch) == 127) else ch
        for ch in ascii_name
    )
    value = f'attachment; filename="{safe_name}"'
    if safe_name != filename:
        value += f"; filename*=UTF-8''{quote(filename, safe='')}"
    return value


@app.get("/dl/{unique_id}")
async def stream_file(unique_id: str, request: Request):
    """
    High-speed streaming endpoint with single-range request support.

    Looks up the file's metadata and streams the requested byte window by
    concatenating the relevant slices of its Telegram parts.

    Args:
        unique_id: Identifier returned by the upload endpoint.
        request: Incoming request; only its ``Range`` header is inspected.

    Returns:
        A StreamingResponse with status 200 (whole file) or 206 (byte range).

    Raises:
        HTTPException: 503 when the service is not initialized, 404 when the
            file is unknown, 416 when the requested range is unsatisfiable.
    """
    if not session_manager or not database:
        raise HTTPException(status_code=503, detail="Service not initialized")

    # Fetch file metadata
    metadata = await database.get_file_metadata(unique_id)
    if not metadata:
        raise HTTPException(status_code=404, detail="File not found")

    # Parse and validate the range header
    start, end, is_partial = _resolve_byte_range(
        request.headers.get("range"), metadata.total_size
    )
    status_code = 206 if is_partial else 200
    content_length = end - start + 1

    logger.info(
        f"Streaming request: unique_id={unique_id}, "
        f"range={start}-{end}, size={content_length}"
    )

    # Create streaming response
    headers = {
        "Content-Type": "application/octet-stream",
        "Content-Length": str(content_length),
        "Accept-Ranges": "bytes",
        "Content-Disposition": _build_content_disposition(metadata.filename),
    }
    if is_partial:
        headers["Content-Range"] = f"bytes {start}-{end}/{metadata.total_size}"

    async def stream_generator() -> AsyncGenerator[bytes, None]:
        """Generate stream from Telegram parts"""
        bytes_sent = 0
        current_position = 0
        max_retries = 3

        for part in metadata.parts:
            part_size = part["size"]
            part_start = current_position
            part_end = part_start + part_size - 1
            current_position += part_size

            # Check if this part overlaps with requested range
            if part_end < start:
                continue
            if part_start > end:
                break

            # Calculate offset within this part
            offset_in_part = max(0, start - part_start)
            bytes_to_read = min(
                part_size - offset_in_part,
                content_length - bytes_sent
            )
            logger.debug(
                f"Streaming part {part['part_number']}: "
                f"offset={offset_in_part}, bytes={bytes_to_read}"
            )

            # Stream this part with retry logic. part_sent records how much
            # of this part has already reached the client, so a retry resumes
            # after those bytes instead of sending them a second time.
            part_sent = 0
            retry_count = 0
            while part_sent < bytes_to_read:
                try:
                    async for chunk in session_manager.stream_part(
                        part["file_id"],
                        offset=offset_in_part + part_sent,
                        limit=bytes_to_read - part_sent
                    ):
                        remaining = bytes_to_read - part_sent
                        if remaining <= 0:
                            break
                        # Ensure we don't send more than requested
                        if len(chunk) > remaining:
                            chunk = chunk[:remaining]
                        yield chunk
                        part_sent += len(chunk)
                        bytes_sent += len(chunk)
                    break  # Source finished without error
                except Exception as e:
                    retry_count += 1
                    if retry_count >= max_retries:
                        logger.error(
                            f"Failed to stream part {part['part_number']}: {str(e)}"
                        )
                        raise
                    wait_time = 2 ** retry_count
                    logger.warning(
                        f"Retry {retry_count}/{max_retries} for part "
                        f"{part['part_number']} after {wait_time}s"
                    )
                    await asyncio.sleep(wait_time)

            if part_sent < bytes_to_read:
                logger.warning(
                    f"Part {part['part_number']} ended early: "
                    f"sent {part_sent} of {bytes_to_read} bytes"
                )
            if bytes_sent >= content_length:
                return

    return StreamingResponse(
        stream_generator(),
        status_code=status_code,
        headers=headers,
        media_type="application/octet-stream"
    )
@app.get("/info/{unique_id}")
async def get_file_info(unique_id: str):
    """Return the stored metadata summary for one uploaded file.

    Responds 503 when the database is not yet available and 404 when no
    metadata exists for ``unique_id``.
    """
    if not database:
        raise HTTPException(status_code=503, detail="Service not initialized")

    record = await database.get_file_metadata(unique_id)
    if not record:
        raise HTTPException(status_code=404, detail="File not found")

    # Assemble the response field by field, in the published key order.
    summary = {}
    summary["unique_id"] = record.unique_id
    summary["filename"] = record.filename
    summary["total_size"] = record.total_size
    summary["total_size_gb"] = f"{record.total_size / (1024**3):.2f}"
    summary["parts"] = record.part_count
    summary["uploaded_at"] = record.uploaded_at
    summary["download_url"] = f"/dl/{unique_id}"
    return summary
@app.delete("/delete/{unique_id}")
async def delete_file(unique_id: str):
    """Remove a file's Telegram parts and then its metadata record.

    Part deletion is best effort: a part that cannot be deleted is logged and
    skipped, and the metadata is removed regardless. The response reports how
    many parts were actually deleted out of the recorded part count.
    """
    if not (session_manager and database):
        raise HTTPException(status_code=503, detail="Service not initialized")

    # Look up what has to be removed
    record = await database.get_file_metadata(unique_id)
    if not record:
        raise HTTPException(status_code=404, detail="File not found")

    # Remove the parts from Telegram, tolerating individual failures
    removed = 0
    for entry in record.parts:
        try:
            await session_manager.delete_part(entry["file_id"])
        except Exception as e:
            logger.warning(f"Failed to delete part {entry['part_number']}: {str(e)}")
        else:
            removed += 1

    # Remove the metadata record itself
    await database.delete_file_metadata(unique_id)
    logger.info(f"Deleted file: unique_id={unique_id}, parts={removed}")

    outcome = {"success": True, "unique_id": unique_id}
    outcome["deleted_parts"] = removed
    outcome["total_parts"] = record.part_count
    return outcome
if __name__ == "__main__":
    # Server options collected in one place and handed to uvicorn below.
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "workers": 1,  # Single worker for shared session state
        "log_level": "info",
    }
    uvicorn.run("main:app", **server_options)