# kaggle-utils / main.py
# hahunavth
# add cron server
# c3ece9d
# raw
# history blame contribute delete
# No virus
# 3.16 kB
from typing import Annotated
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from starlette.middleware.cors import CORSMiddleware
from fastapi import FastAPI, Header, UploadFile, Depends, HTTPException, status
import base64
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from starlette.responses import JSONResponse
from collections import defaultdict
from pydantic import BaseModel
from threading import Lock
from logger import get_now
from run import main as run_main
# Value of get_now() when this module was imported; reported by /status.
START_AT = get_now()

# The application object that the routes and handlers below attach to.
app = FastAPI()

# Run bookkeeping. `scheduled_job` updates these while holding `lock`,
# because it can be entered both by the scheduler and by the /run endpoint.
lock = Lock()
is_running = False  # True while a run of the job is in progress
n_run = 0  # how many runs have finished so far
last_run = None  # get_now() value recorded when the latest run finished
def scheduled_job():
    """Run the job once, unless a run is already in progress.

    The `is_running` flag is claimed and released under `lock`, so the
    scheduler's interval trigger and a manual trigger cannot start two
    overlapping runs.

    Returns:
        False if another run was already in progress (nothing is started),
        True once `run_main()` has completed.

    Raises:
        Whatever `run_main()` raises. The `is_running` flag is cleared before
        the exception propagates, so one failed run does not block every
        later run. `n_run` and `last_run` are only updated on success.
    """
    global is_running, n_run, last_run

    with lock:
        if is_running:
            return False
        is_running = True

    print("Job is running!")
    completed = False
    try:
        # Executed outside the lock: holding it for the whole run would make
        # a competing trigger wait instead of returning False immediately.
        run_main()
        completed = True
    finally:
        with lock:
            # Release the flag first so that it is reset even if the
            # bookkeeping below fails.
            is_running = False
            if completed:
                n_run += 1
                last_run = get_now()
    return True
# Background scheduler that calls `scheduled_job` on a 30-minute interval.
scheduler = BackgroundScheduler()
scheduler.add_job(scheduled_job, trigger="interval", minutes=30)

# Started as a side effect of importing this module.
scheduler.start()
# Stop the scheduler together with the FastAPI application.
@app.on_event("shutdown")
def shutdown_event():
    """Shut the background scheduler down when the application stops."""
    # `wait=True` is the scheduler's default, spelled out here for clarity.
    scheduler.shutdown(wait=True)
# Common envelope used for the JSON bodies produced in this file, both by
# the endpoints and by the exception handlers.
class BaseResponse(BaseModel):
    # 1 unless stated otherwise; the exception handlers in this file send 0.
    status: int = 1

    # Free-text note about the outcome; empty when there is nothing to say.
    message: str = ""

    # Optional payload (e.g. the /status snapshot or per-field error lists).
    result: object = None
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    """Render an `HTTPException` as a `BaseResponse` envelope.

    The response is always sent with HTTP 400; `exc.status_code` is not
    used. The envelope carries `status=0` and the exception detail as its
    message.

    Args:
        request: The incoming request (unused).
        exc: The exception raised while handling the request.

    Returns:
        A `JSONResponse` whose body is the encoded `BaseResponse`.
    """
    # The module-level name `status` is rebound by the `/status` route
    # function defined further down in this file, so at request time it no
    # longer refers to `fastapi.status`. Import the constants under a
    # private alias instead of relying on that global.
    from fastapi import status as http_status

    # `detail` may be any JSON-compatible value, while `message` is declared
    # as `str`; convert explicitly so a non-string detail cannot make the
    # handler itself fail. For string details this is a no-op.
    envelope = BaseResponse(status=0, message=str(exc.detail))
    return JSONResponse(
        status_code=http_status.HTTP_400_BAD_REQUEST,
        content=jsonable_encoder(envelope),
    )
@app.exception_handler(RequestValidationError)
def validation_exception_handler(request, exc: RequestValidationError) -> JSONResponse:
    """Render request-validation errors as a `BaseResponse` envelope.

    Error messages are grouped by the dotted path of the offending field.
    A leading "body", "query" or "path" marker is removed from the path.

    Args:
        request: The incoming request (unused).
        exc: The validation error raised for the request.

    Returns:
        A `JSONResponse` with HTTP 400, `status=0`, the message
        "Invalid request", and a mapping of field path to list of messages
        as the result.
    """
    # The module-level name `status` is rebound by the `/status` route
    # function defined further down in this file, so at request time it no
    # longer refers to `fastapi.status`. Import the constants under a
    # private alias instead of relying on that global.
    from fastapi import status as http_status

    messages_by_field = defaultdict(list)
    for error in exc.errors():
        location = tuple(error["loc"])
        # Guard against an empty location before looking at its first item.
        if location and location[0] in ("body", "query", "path"):
            location = location[1:]
        # Locations can contain integer indexes (items inside a list), which
        # `str.join` rejects; stringify every part first.
        field_path = ".".join(str(part) for part in location)
        messages_by_field[field_path].append(error["msg"])

    envelope = BaseResponse(
        status=0,
        message="Invalid request",
        result=messages_by_field,
    )
    return JSONResponse(
        status_code=http_status.HTTP_400_BAD_REQUEST,
        content=jsonable_encoder(envelope),
    )
@app.get("/status", response_model=BaseResponse)
def status():
    # Report when the module was loaded, the current time, how many runs
    # have finished and when the latest one finished.
    #
    # NOTE(review): defining this function rebinds the module-level name
    # `status`, which until this point referred to the `status` object
    # imported from fastapi at the top of the file.
    snapshot = {
        "start_at": START_AT,
        "current": get_now(),
        "n_runs": n_run,
        "last_run": last_run,
    }
    return BaseResponse(result=snapshot)
@app.get("/run")
def run_once():
    # Trigger the job from an HTTP request. The call is synchronous: the
    # response is produced only after `scheduled_job()` has returned.
    print("Running the job once.")
    if scheduled_job():
        return BaseResponse(message="Job executed once.")
    # `scheduled_job()` returned False: a run was already in progress.
    return BaseResponse(message="Job is running, not start a new job")
# CORS configuration: every origin, method and header is accepted, with
# credentials enabled.
# NOTE(review): wildcard origins combined with credentials is very
# permissive - confirm this is intended for the deployment.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)