# ai/backend/app/routes/events_route.py
# Ahmed Tarek
# Add application file
# 61d9463
from fastapi import APIRouter, HTTPException
from typing import List
from fastapi.responses import JSONResponse
from backend.app.schemas.events import (
EventCreate,
EventUpdate
)
from backend.app.helper.dependencies import event_db , events_vector_db , embedding_model
# Router on which the event endpoints below are registered.
router = APIRouter()
@router.post("/get/")
async def get_events(item_ids: List[str]):
    """Look up events by id and return them in a JSON envelope.

    Args:
        item_ids: Identifiers handed unchanged to ``event_db.get_events``.

    Returns:
        A 200 ``JSONResponse`` whose body holds a ``message`` stating how
        many events were found and the events themselves under ``data``.

    Raises:
        HTTPException: Status 400, with the error text as detail, when the
            lookup or the response construction raises.
    """
    try:
        found = event_db.get_events(item_ids)
        payload = {
            "message": f"Found {len(found)} events",
            "data": found,
        }
        # Built inside the try so a serialization failure is also reported
        # as a 400 rather than escaping the handler.
        return JSONResponse(status_code=200, content=payload)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
@router.post("/add/")
async def add_events(items: List[EventCreate]):
    """Create the submitted events whose ids are not already stored.

    Ids reported by ``event_db.get_existing_event_ids`` are skipped, as are
    ids repeated within the same request (the first occurrence is kept).
    The remaining items are passed to ``event_db.create_events`` and then
    to ``events_vector_db.store_embeddings``.

    Args:
        items: Events to create; each must expose ``id`` and ``to_dict()``.

    Returns:
        A 201 ``JSONResponse`` listing the ids of the created events under
        ``created_events``, or a 400 ``JSONResponse`` when no submitted
        item is eligible for creation.

    Raises:
        HTTPException: Status 400, with the error text as detail, when any
            step raises.
    """
    try:
        input_ids = {str(item.id) for item in items}
        duplicates = event_db.get_existing_event_ids(input_ids)

        # Track ids accepted so far so that an id repeated inside one request
        # is created only once instead of reaching the store twice.
        accepted_ids = set()
        valid_items = []
        for item in items:
            item_id = str(item.id)
            if item_id in duplicates or item_id in accepted_ids:
                continue
            accepted_ids.add(item_id)
            valid_items.append(item.to_dict())

        if not valid_items:
            return JSONResponse(
                status_code=400,
                content={
                    "message": "No valid events to create",
                },
            )

        created_items = event_db.create_events(valid_items)
        # NOTE(review): if this call fails after create_events succeeded, the
        # two stores may disagree; there is no rollback here.
        events_vector_db.store_embeddings(valid_items, embedding_model)

        return JSONResponse(
            status_code=201,
            content={
                "message": "Successfully created events",
                "created_events": [item['id'] for item in created_items],
            },
        )
    except Exception as exc:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@router.put("/update/")
async def update_events(items: List[EventUpdate]):
    """Update the submitted events whose ids are already stored.

    Only items whose id is reported by ``event_db.get_existing_event_ids``
    are kept; they are passed to ``event_db.update_events`` and then to
    ``events_vector_db.update_embeddings``.

    Args:
        items: Event updates; each must expose ``id`` and ``to_dict()``.

    Returns:
        A 200 ``JSONResponse`` listing the ids of the updated events under
        ``updated_events``, or a 400 ``JSONResponse`` when none of the
        submitted ids exist.

    Raises:
        HTTPException: Status 400, with the error text as detail, when any
            step raises.
    """
    try:
        input_ids = {str(item.id) for item in items}
        existing_ids = event_db.get_existing_event_ids(input_ids)

        valid_items = [
            item.to_dict() for item in items if str(item.id) in existing_ids
        ]

        if not valid_items:
            return JSONResponse(
                status_code=400,
                content={
                    "message": "No valid events to update",
                },
            )

        updated_items = event_db.update_events(valid_items)
        # NOTE(review): if this call fails after update_events succeeded, the
        # two stores may disagree; there is no rollback here.
        events_vector_db.update_embeddings(valid_items, embedding_model)

        return JSONResponse(
            status_code=200,
            content={
                "message": "Successfully updated events",
                "updated_events": [item['id'] for item in updated_items],
            },
        )
    except Exception as exc:
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@router.delete("/delete/")
async def delete_events(item_ids: List[str]):
    """Delete the requested events that are actually stored.

    The ids are first resolved through ``event_db.get_events``; only the
    ids of the events it returns are passed to ``event_db.delete_events``
    and ``events_vector_db.delete_items``.

    Args:
        item_ids: Identifiers of the events to delete.

    Returns:
        A 200 ``JSONResponse`` listing the deleted ids under
        ``deleted_events``, or a 400 ``JSONResponse`` when none of the
        requested ids resolve to a stored event.

    Raises:
        HTTPException: Status 404, with the error text as detail, when any
            step raises.
    """
    try:
        items_to_delete = event_db.get_events(item_ids)
        # De-duplicate while keeping lookup order so the reported ids are
        # deterministic (a plain set would yield them in arbitrary order).
        valid_ids = list(dict.fromkeys(item['id'] for item in items_to_delete))

        if not valid_ids:
            return JSONResponse(
                status_code=400,
                content={
                    "message": "No valid events to delete",
                },
            )

        # Each store receives its own list, so neither call can affect the
        # ids reported in the response.
        event_db.delete_events(list(valid_ids))
        events_vector_db.delete_items(list(valid_ids))

        return JSONResponse(
            status_code=200,
            content={
                "message": "Successfully deleted events",
                "deleted_events": valid_ids,
            },
        )
    except Exception as exc:
        # NOTE(review): the other endpoints in this file report failures as
        # 400; 404 is kept here so existing clients see the same status.
        raise HTTPException(status_code=404, detail=str(exc)) from exc