Spaces:
Sleeping
Sleeping
File size: 7,218 Bytes
90537f3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 |
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Request
from sqlalchemy.orm import Session
from typing import Optional, List
import os
import shutil
import csv
from datetime import datetime, timezone
from ..database import (
get_db, Settings, Hotel, switch_database, get_current_database,
get_session_db, get_session_current_database, set_session_hotel_context,
get_session_hotel_id, authenticate_hotel_session
)
from ..models.settings import Settings as SettingsModel, SettingsUpdate
from ..models.database_config import DatabaseEntry, DatabaseList, DatabaseSelectRequest, DatabaseSelectResponse
from ..middleware import get_session_id
# Router for the settings endpoints defined in this module. Every route
# registered on it is served under the "/settings" prefix, grouped under the
# "settings" tag, and documents a 404 "Not found" response.
router = APIRouter(
prefix="/settings",
tags=["settings"],
responses={404: {"description": "Not found"}},
)
# Dependency that hands back the session-aware database for a request
def get_session_database(request: Request):
    """Return the database handle tied to the caller's session.

    The session id is read from ``request`` with ``get_session_id`` and passed
    to ``get_session_db``; the first item produced by that call is returned.
    """
    return next(get_session_db(get_session_id(request)))
# Get available hotels from hotels.csv
@router.get("/hotels", response_model=DatabaseList)
def get_hotels():
    """List the hotels configured in ``hotels.csv`` with passwords masked.

    Returns:
        A dict ``{"databases": [...]}`` holding one entry per CSV row. Each
        entry exposes the row's ``hotel_name`` column as ``database_name`` and
        the fixed placeholder ``"********"`` as ``password``.

    Raises:
        HTTPException: status 500 when the file cannot be read or a row cannot
            be turned into a ``DatabaseEntry`` (for example a missing column).
    """
    try:
        masked_entries = []
        # newline="" is what the csv module asks for, so that line breaks
        # embedded in quoted fields are parsed by the reader itself.
        with open("hotels.csv", "r", newline="") as csv_file:
            for row in csv.DictReader(csv_file):
                # Still build the entry from both columns so a malformed row
                # fails here exactly as before, but never keep the real
                # password beyond this iteration.
                entry = DatabaseEntry(
                    database_name=row["hotel_name"],  # Using hotel_name instead of hotel_database
                    password=row["password"],
                )
                masked_entries.append(
                    {"database_name": entry.database_name, "password": "********"}
                )
        # Return only hotel names, not passwords
        return {"databases": masked_entries}
    except Exception as e:
        # Chain the original error so the root cause stays visible in logs.
        raise HTTPException(
            status_code=500,
            detail=f"Error reading hotel configuration: {str(e)}",
        ) from e
# Legacy endpoint for backward compatibility
@router.get("/databases", response_model=DatabaseList)
def get_databases():
    """Serve the same listing as ``get_hotels`` under the older route name."""
    hotel_listing = get_hotels()
    return hotel_listing
# Get current hotel info
@router.get("/current-hotel")
def get_current_hotel(request: Request):
    """Report which hotel the caller's session is currently bound to.

    Args:
        request: Incoming request; its session id selects the hotel context.

    Returns:
        ``{"hotel_name": ..., "hotel_id": ...}`` for the session's hotel, or
        the same keys with ``None`` values when no hotel context is set or the
        hotel row cannot be found.
    """
    session_id = get_session_id(request)
    hotel_id = get_session_hotel_id(session_id)

    if hotel_id:
        # get_session_database() already returns the database object itself
        # (it unwraps the generator with next()), so it must not be wrapped in
        # another next() call -- doing so raised TypeError on every request
        # that had a hotel context.
        db = get_session_database(request)
        hotel = db.query(Hotel).filter(Hotel.id == hotel_id).first()
        if hotel:
            return {"hotel_name": hotel.hotel_name, "hotel_id": hotel.id}

    return {"hotel_name": None, "hotel_id": None}
# Legacy endpoint for backward compatibility
@router.get("/current-database")
def get_current_db(request: Request):
    """Return the database name recorded for the caller's session.

    The value comes from ``get_session_current_database`` and is wrapped in a
    dict under the ``database_name`` key.
    """
    active_database = get_session_current_database(get_session_id(request))
    return {"database_name": active_database}
# Switch hotel
@router.post("/switch-hotel", response_model=DatabaseSelectResponse)
def select_hotel(request_data: DatabaseSelectRequest, request: Request):
    """Authenticate against a hotel and bind it to the caller's session.

    Args:
        request_data: Carries the hotel name (``database_name``) and password.
        request: Incoming request; supplies the session id.

    Returns:
        A dict with ``success`` set to ``True`` and a confirmation ``message``.

    Raises:
        HTTPException: 401 when authentication yields no hotel id, 500 when the
            hotel context cannot be set or any other error occurs.
    """
    try:
        current_session = get_session_id(request)

        # Authenticate hotel using hotel_name and password
        authenticated_hotel_id = authenticate_hotel_session(
            request_data.database_name, request_data.password
        )
        if not authenticated_hotel_id:
            raise HTTPException(status_code=401, detail="Invalid hotel credentials")

        # Set hotel context for this session
        if not set_session_hotel_context(current_session, authenticated_hotel_id):
            raise HTTPException(status_code=500, detail="Failed to set hotel context")

        return {
            "success": True,
            "message": f"Successfully switched to hotel: {request_data.database_name}",
        }
    except HTTPException:
        # Deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error switching hotel: {str(e)}")
# Legacy endpoint for backward compatibility
@router.post("/switch-database", response_model=DatabaseSelectResponse)
def select_database(request_data: DatabaseSelectRequest, request: Request):
    """Handle the older route name by delegating to ``select_hotel``."""
    switch_result = select_hotel(request_data, request)
    return switch_result
# Get hotel settings
@router.get("/", response_model=SettingsModel)
def get_settings(request: Request, db: Session = Depends(get_session_database)):
    """Return the settings row for the session's hotel, creating it if absent.

    Args:
        request: Incoming request; its session id selects the hotel context.
        db: Database session supplied by the ``get_session_database`` dependency.

    Returns:
        The existing ``Settings`` row, or a newly committed one seeded with the
        hotel's name and placeholder address, contact number and email.

    Raises:
        HTTPException: 400 when the session has no hotel context, 404 when the
            hotel row does not exist.
    """
    current_hotel_id = get_session_hotel_id(get_session_id(request))
    if not current_hotel_id:
        raise HTTPException(status_code=400, detail="No hotel context set")

    # Settings already stored for the current hotel are returned as they are
    stored = db.query(Settings).filter(Settings.hotel_id == current_hotel_id).first()
    if stored:
        return stored

    # Nothing stored yet: look up the hotel so the defaults can carry its name
    hotel_row = db.query(Hotel).filter(Hotel.id == current_hotel_id).first()
    if not hotel_row:
        raise HTTPException(status_code=404, detail="Hotel not found")

    # Create default settings for this hotel
    default_settings = Settings(
        hotel_id=current_hotel_id,
        hotel_name=hotel_row.hotel_name,
        address="123 Main Street, City",
        contact_number="+1 123-456-7890",
        email="info@tabblehotel.com",
    )
    db.add(default_settings)
    db.commit()
    db.refresh(default_settings)
    return default_settings
# Update hotel settings
@router.put("/", response_model=SettingsModel)
async def update_settings(
    request: Request,
    hotel_name: str = Form(...),
    address: Optional[str] = Form(None),
    contact_number: Optional[str] = Form(None),
    email: Optional[str] = Form(None),
    tax_id: Optional[str] = Form(None),
    logo: Optional[UploadFile] = File(None),
    db: Session = Depends(get_session_database)
):
    """Create or update the settings of the session's hotel.

    Args:
        request: Incoming request; its session id selects the hotel context.
        hotel_name: New display name (required form field).
        address: Optional address; overwrites the stored value, even with None.
        contact_number: Optional contact number; overwrites the stored value.
        email: Optional email; overwrites the stored value.
        tax_id: Optional tax id; overwrites the stored value.
        logo: Optional uploaded logo. A logo without a filename is treated as
            "no logo supplied".
        db: Database session supplied by the ``get_session_database`` dependency.

    Returns:
        The committed and refreshed ``Settings`` row.

    Raises:
        HTTPException: 400 when the session has no hotel context or the logo
            filename has no usable final path component.
    """
    session_id = get_session_id(request)
    hotel_id = get_session_hotel_id(session_id)

    if not hotel_id:
        raise HTTPException(status_code=400, detail="No hotel context set")

    # The upload's filename is chosen by the client. Keep only its final path
    # component (treating backslashes as separators too) so that names such as
    # "../../x" cannot make the file land outside the logo directory. This is
    # validated before any database change so a rejection leaves nothing pending.
    logo_filename = None
    if logo and logo.filename:
        logo_filename = os.path.basename(logo.filename.replace("\\", "/"))
        if not logo_filename:
            raise HTTPException(status_code=400, detail="Invalid logo filename")

    # Get existing settings for this hotel or create new
    settings = db.query(Settings).filter(Settings.hotel_id == hotel_id).first()

    if not settings:
        settings = Settings(
            hotel_id=hotel_id,
            hotel_name=hotel_name,
            address=address,
            contact_number=contact_number,
            email=email,
            tax_id=tax_id,
        )
        db.add(settings)
    else:
        # Update fields
        settings.hotel_name = hotel_name
        settings.address = address
        settings.contact_number = contact_number
        settings.email = email
        settings.tax_id = tax_id

    # Handle logo upload if provided
    if logo_filename:
        # Get hotel info for organizing logos
        hotel = db.query(Hotel).filter(Hotel.id == hotel_id).first()
        # NOTE(review): the hotel name is used as a directory name unchanged;
        # confirm hotel names can never contain path separators or "..".
        hotel_name_for_path = hotel.hotel_name if hotel else f"hotel_{hotel_id}"

        # Create directory structure: app/static/images/logo/{hotel_name}
        hotel_logo_dir = f"app/static/images/logo/{hotel_name_for_path}"
        os.makedirs(hotel_logo_dir, exist_ok=True)

        # Save logo with hotel-specific path
        stored_name = f"hotel_logo_{logo_filename}"
        logo_path = f"{hotel_logo_dir}/{stored_name}"
        with open(logo_path, "wb") as buffer:
            shutil.copyfileobj(logo.file, buffer)

        # Update settings with logo path (URL path for serving)
        settings.logo_path = f"/static/images/logo/{hotel_name_for_path}/{stored_name}"

    # Update timestamp
    settings.updated_at = datetime.now(timezone.utc)

    db.commit()
    db.refresh(settings)

    return settings
|