Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, Depends, HTTPException, status, Request | |
from sqlalchemy.orm import Session | |
from typing import List | |
from datetime import datetime, timezone | |
from ..database import get_db, LoyaltyProgram as LoyaltyProgramModel, get_session_db, get_hotel_id_from_request | |
from ..models.loyalty import LoyaltyProgram, LoyaltyProgramCreate, LoyaltyProgramUpdate | |
from ..middleware import get_session_id | |
router = APIRouter( | |
prefix="/loyalty", | |
tags=["loyalty"], | |
responses={404: {"description": "Not found"}}, | |
) | |
# Dependency to get session-aware database | |
def get_session_database(request: Request): | |
session_id = get_session_id(request) | |
return next(get_session_db(session_id)) | |
# Get all loyalty program tiers | |
def get_all_loyalty_tiers(request: Request, db: Session = Depends(get_session_database)): | |
hotel_id = get_hotel_id_from_request(request) | |
return db.query(LoyaltyProgramModel).filter(LoyaltyProgramModel.hotel_id == hotel_id).order_by(LoyaltyProgramModel.visit_count).all() | |
# Get active loyalty program tiers | |
def get_active_loyalty_tiers(request: Request, db: Session = Depends(get_session_database)): | |
hotel_id = get_hotel_id_from_request(request) | |
return ( | |
db.query(LoyaltyProgramModel) | |
.filter( | |
LoyaltyProgramModel.hotel_id == hotel_id, | |
LoyaltyProgramModel.is_active == True | |
) | |
.order_by(LoyaltyProgramModel.visit_count) | |
.all() | |
) | |
# Get loyalty tier by ID | |
def get_loyalty_tier(tier_id: int, request: Request, db: Session = Depends(get_session_database)): | |
db_tier = ( | |
db.query(LoyaltyProgramModel).filter(LoyaltyProgramModel.id == tier_id).first() | |
) | |
if not db_tier: | |
raise HTTPException(status_code=404, detail="Loyalty tier not found") | |
return db_tier | |
# Create new loyalty tier | |
def create_loyalty_tier(tier: LoyaltyProgramCreate, request: Request, db: Session = Depends(get_session_database)): | |
hotel_id = get_hotel_id_from_request(request) | |
# Check if a tier with this visit count already exists for this hotel | |
existing_tier = ( | |
db.query(LoyaltyProgramModel) | |
.filter( | |
LoyaltyProgramModel.hotel_id == hotel_id, | |
LoyaltyProgramModel.visit_count == tier.visit_count | |
) | |
.first() | |
) | |
if existing_tier: | |
raise HTTPException( | |
status_code=400, | |
detail=f"Loyalty tier with visit count {tier.visit_count} already exists", | |
) | |
# Create new tier | |
db_tier = LoyaltyProgramModel( | |
hotel_id=hotel_id, | |
visit_count=tier.visit_count, | |
discount_percentage=tier.discount_percentage, | |
is_active=tier.is_active, | |
created_at=datetime.now(timezone.utc), | |
updated_at=datetime.now(timezone.utc), | |
) | |
db.add(db_tier) | |
db.commit() | |
db.refresh(db_tier) | |
return db_tier | |
# Update loyalty tier | |
def update_loyalty_tier( | |
tier_id: int, tier_update: LoyaltyProgramUpdate, request: Request, db: Session = Depends(get_session_database) | |
): | |
db_tier = ( | |
db.query(LoyaltyProgramModel).filter(LoyaltyProgramModel.id == tier_id).first() | |
) | |
if not db_tier: | |
raise HTTPException(status_code=404, detail="Loyalty tier not found") | |
# Check if updating visit count and if it already exists | |
if ( | |
tier_update.visit_count is not None | |
and tier_update.visit_count != db_tier.visit_count | |
): | |
existing_tier = ( | |
db.query(LoyaltyProgramModel) | |
.filter( | |
LoyaltyProgramModel.visit_count == tier_update.visit_count, | |
LoyaltyProgramModel.id != tier_id, | |
) | |
.first() | |
) | |
if existing_tier: | |
raise HTTPException( | |
status_code=400, | |
detail=f"Loyalty tier with visit count {tier_update.visit_count} already exists", | |
) | |
db_tier.visit_count = tier_update.visit_count | |
# Update other fields if provided | |
if tier_update.discount_percentage is not None: | |
db_tier.discount_percentage = tier_update.discount_percentage | |
if tier_update.is_active is not None: | |
db_tier.is_active = tier_update.is_active | |
db_tier.updated_at = datetime.now(timezone.utc) | |
db.commit() | |
db.refresh(db_tier) | |
return db_tier | |
# Delete loyalty tier | |
def delete_loyalty_tier(tier_id: int, request: Request, db: Session = Depends(get_session_database)): | |
db_tier = ( | |
db.query(LoyaltyProgramModel).filter(LoyaltyProgramModel.id == tier_id).first() | |
) | |
if not db_tier: | |
raise HTTPException(status_code=404, detail="Loyalty tier not found") | |
db.delete(db_tier) | |
db.commit() | |
return {"message": "Loyalty tier deleted successfully"} | |
# Get applicable discount for a visit count | |
def get_discount_for_visit_count(visit_count: int, request: Request, db: Session = Depends(get_session_database)): | |
hotel_id = get_hotel_id_from_request(request) | |
# Find the tier that exactly matches the visit count for this hotel | |
applicable_tier = ( | |
db.query(LoyaltyProgramModel) | |
.filter( | |
LoyaltyProgramModel.hotel_id == hotel_id, | |
LoyaltyProgramModel.visit_count == visit_count, | |
LoyaltyProgramModel.is_active == True, | |
) | |
.first() | |
) | |
if not applicable_tier: | |
return {"discount_percentage": 0, "message": "No applicable loyalty discount"} | |
return { | |
"discount_percentage": applicable_tier.discount_percentage, | |
"tier_id": applicable_tier.id, | |
"visit_count": applicable_tier.visit_count, | |
"message": f"Loyalty discount of {applicable_tier.discount_percentage}% applied for {visit_count} visits", | |
} | |