Spaces:
Sleeping
Sleeping
from fastapi import FastAPI | |
from fastapi.responses import JSONResponse | |
from fastapi import HTTPException | |
import uvicorn | |
from demos.cam_service_example.start_frame_capturing import ImageCaptureService | |
from demos.cam_service_example.stop_frame_capturing import StopImageCaptureServiceExample | |
from demos.image_load_service_example.start_image_load_example import StartImageLoadExample | |
from demos.image_load_service_example.stop_image_load_example import StopImageLoadServiceExample | |
app = FastAPI() | |
def start_ipcam(): | |
flag_path = "resources/flag" | |
source = 'rtsp://ishwor:subedi@192.168.1.106:5555/h264_opus.sdp' | |
image_path_to_save = "images/cam_images" | |
image_hash_threshold = 5 | |
image_capture_start_example = ImageCaptureService(flag_path, source, image_path_to_save, | |
image_hash_threshold) | |
image_capture_start_example.start_service() | |
def stop_ipcam(): | |
flag_path = "resources/flag" | |
image_capture_stop_example = StopImageCaptureServiceExample(flag_path) | |
image_capture_stop_example.stop_load_image_example() | |
def start_detection(): | |
flag_path = "resources/flag_load_image" | |
image_dir_path = "images/cam_images" | |
start_load_image_example = StartImageLoadExample(flag_path, image_dir_path, model_path="resources/model/v1/best.pt") | |
start_load_image_example.start_service() | |
def stop_detection(): | |
flag_path = "resources/flag_load_image" | |
stop_load_image_example = StopImageLoadServiceExample(flag_path) | |
stop_load_image_example.stop_service() | |
# FastAPI Endpoints | |
def start_ipcam_server_api(): | |
try: | |
start_ipcam() | |
return JSONResponse(content={"message": "IP Cam Server started!"}, status_code=200) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
def stop_ipcam_server_api(): | |
try: | |
stop_ipcam() | |
return JSONResponse(content={"message": "IP Cam Server stopped!"}, status_code=200) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
def start_detection_service_api(): | |
try: | |
start_detection() | |
return JSONResponse(content={"message": "Image loading Server started!"}, status_code=200) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
def stop_detection_service_api(): | |
try: | |
stop_detection() | |
return JSONResponse(content={"message": "Image loading Server stopped!"}, status_code=200) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
if __name__ == "__main__": | |
uvicorn.run(app, host="localhost", port=8001) | |