from fastapi import FastAPI, Request, UploadFile, File,HTTPException from fastapi.responses import HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from ultralytics import YOLO import aiofiles import os from pathlib import Path import json from jinja2 import Environment, FileSystemLoader import uvicorn from fastapi.middleware.cors import CORSMiddleware # Import CORS middleware # add the fuzzy import requests #extract function from the video_processing path: from video_processing.video_frames_opt import extract_frames_and_detect_objects, lik, lik_prediction from video_processing.video_frames_new import extract_frames_and_detect_new_objects import pandas as pd import numpy as np import cv2 import logging import os from dotenv import load_dotenv import boto3 from dotenv import dotenv_values # Configure logging to write to a file logging.basicConfig(filename='uvicorn.log', level=logging.DEBUG) # Create a JSON file with tracking results # Define the Jinja2 environment jinja_env = Environment(loader=FileSystemLoader("templates")) # Set the environment variable os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE" # Initialize FastAPI app = FastAPI() # CORS policy origins = ["*"] # Allow requests from any origin app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], ) # Serve static files app.mount("/static", StaticFiles(directory="static"), name="static") # Templates directory templates = Jinja2Templates(directory="templates") # Load the exported ONNX model #onnx_model = YOLO('yolov8n.onnx') # Directory to save uploaded videos video_directory = Path("uploaded_videos") # Directory to save the new videos: video_directory_new = Path("uploaded_video_new") # Chunk size for reading video files CHUNK_SIZE = 1024 * 1024 # 1 MB ################################################################# # define new function that upload the files from s3 bucket # the part to load the env information # Load environment variables from .env import botocore # Initialize the S3 client with anonymous access s3 = boto3.client('s3', config=botocore.client.Config(signature_version=botocore.UNSIGNED)) @app.get("/download_s3_file/") async def download_and_process_s3_file(s3_url: str): try: # Parse the S3 URL to extract bucket name and object key bucket_name, object_key = s3_url.replace("s3://", "").split("/", 1) # Construct the public URL for downloading the S3 object public_url = f"https://{bucket_name}.s3.amazonaws.com/{object_key}" response = requests.get(public_url) file_name = object_key.split("/")[-1] # Extract the original filename file_path = os.path.join("uploaded_video", file_name) with open(file_path, "wb") as f: f.write(response.content) if file_path: results = extract_frames_and_detect_objects(file_path) json_data = lik(results) processed_results = lik_prediction(json_data) # Create a JSON file with tracking results video_name = os.path.basename(file_path) json_results_path = f"uploaded_videos/{video_name}.json" with open(json_results_path, "w") as json_file: json_file.write(processed_results) return processed_results else: return {"message": "File download failed"} except (requests.exceptions.RequestException, botocore.exceptions.ClientError) as e: raise HTTPException(status_code=400, detail=f"Error processing video: {str(e)}") # now get the access aws s3 bucket AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY") AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY") AWS_S3_BUCKET_NAME = os.environ.get("AWS_S3_BUCKET_NAME") FOLDER_NAME = os.environ.get("FOLDER_NAME") # print the data access loading print(FOLDER_NAME) print('_url_upload_video_s3_') def download_file_from_s3(bucket_name, access_key, secret_key, folder_name, name_for_s3): s3_client = boto3.client( service_name='s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key ) try: # Define the path where the downloaded file will be saved file_path = f"uploaded_video/{name_for_s3}" # Check if the file already exists if os.path.exists(file_path): return file_path s3_source_path = f"{FOLDER_NAME}/{name_for_s3}" s3_client.download_file(bucket_name, s3_source_path, file_path) return file_path except Exception as e: print(f"Error downloading file from S3: {e}") return None # Define a new endpoint to download files from S3, process them, and save JSON tracking results with changed URL names @app.get("/process_s3_save/{file_name}") async def process_and_save_file(file_name: str): video_file_path = download_file_from_s3(AWS_S3_BUCKET_NAME, AWS_ACCESS_KEY, AWS_SECRET_KEY, FOLDER_NAME, file_name) if video_file_path: results = extract_frames_and_detect_objects(video_file_path) json_data = lik(results) processed_results = lik_prediction(json_data) # Create a JSON file with tracking results video_name = os.path.basename(video_file_path) json_results_path = f"uploaded_videos/{video_name}.json" with open(json_results_path, "w") as json_file: json_file.write(processed_results) return processed_results else: return {"message": "File download failed"} ################################################################# @app.get("/", response_class=HTMLResponse) async def read_root(request: Request): return templates.TemplateResponse("index.html", {"request": request}) @app.get("/upload/image", response_class=HTMLResponse) async def upload_image_form(request: Request): return templates.TemplateResponse("upload_image.html", {"request": request}) @app.post("/upload/video", response_class=HTMLResponse) async def upload_video(video: UploadFile = File(...)): # Ensure the directory exists, create it if it doesn't video_directory.mkdir(parents=True, exist_ok=True) # Define the path where the video will be saved video_path = video_directory / video.filename # Open the video file in binary write mode asynchronously and write the video data async with aiofiles.open(video_path, "wb") as buffer: while True: # Read the video data in chunks asynchronously chunk = await video.read(CHUNK_SIZE) if not chunk: break # Write the chunk to the file await buffer.write(chunk) results = extract_frames_and_detect_objects(str(video_path)) ###################################################### json_date = lik(results) ##################################################### # Process the prediction processed_results = lik_prediction(json_date) ###################################################### # Create a JSON file with tracking results json_results_path = video_directory / f"{video.filename}.json" open_path = str(json_results_path) with open(open_path, "w") as json_file: json_file.write(processed_results) # Return the processed JSON data return processed_results # upload video and see the results: @app.get("/upload/video_new", response_class=HTMLResponse) async def upload_video_new(video: UploadFile = File(...)): # Ensure the directory exists, create it if it doesn't video_directory_new.mkdir(parents=True, exist_ok=True) # Define the path where the video will be saved video_path = video_directory_new / video.filename # Open the video file in binary write mode asynchronously and write the video data async with aiofiles.open(video_path, "wb") as buffer: while True: # Read the video data in chunks asynchronously chunk = await video.read(CHUNK_SIZE) if not chunk: break # Write the chunk to the file await buffer.write(chunk) results = extract_frames_and_detect_new_objects(str(video_path)) return results if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000)