gcharanteja
feat: Refactor SQLite server to support multi-project databases and update Express.js server
c3f4433 | import os | |
| import sqlite3 | |
| import json | |
| from pathlib import Path | |
| from fastapi import FastAPI, HTTPException, Query | |
| from pydantic import BaseModel | |
| from typing import List, Dict, Any | |
| import uvicorn | |
| app = FastAPI(title="SQLite Multi-Project Server") | |
| DB_DIR = "/data" | |
| def ensure_db_dir(): | |
| Path(DB_DIR).mkdir(parents=True, exist_ok=True) | |
| def get_db_path(project_name: str): | |
| # Sanitize project name | |
| project_name = project_name.replace("/", "_").replace("\\", "_") | |
| return os.path.join(DB_DIR, f"{project_name}.db") | |
| def create_db_if_not_exists(db_path: str): | |
| if not os.path.exists(db_path): | |
| conn = sqlite3.connect(db_path) | |
| conn.close() | |
| def get_connection(db_path: str): | |
| ensure_db_dir() | |
| create_db_if_not_exists(db_path) | |
| conn = sqlite3.connect(db_path) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| class QueryRequest(BaseModel): | |
| sql: str | |
| project: str = "default" | |
| class ExecuteRequest(BaseModel): | |
| sql: str | |
| project: str = "default" | |
| class ProjectRequest(BaseModel): | |
| name: str | |
| description: str = "" | |
| def heartbeat(): | |
| try: | |
| ensure_db_dir() | |
| return {"status": "ok", "message": "SQLite Multi-Project Server running"} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def list_projects(): | |
| """List all projects (databases)""" | |
| try: | |
| ensure_db_dir() | |
| files = [f[:-3] for f in os.listdir(DB_DIR) if f.endswith('.db')] | |
| projects = [] | |
| for name in files: | |
| db_path = get_db_path(name) | |
| size = os.path.getsize(db_path) | |
| projects.append({"name": name, "size_bytes": size}) | |
| return {"projects": projects, "total": len(projects)} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def create_project(request: ProjectRequest): | |
| """Create a new project database""" | |
| try: | |
| db_path = get_db_path(request.name) | |
| if os.path.exists(db_path): | |
| raise HTTPException(status_code=400, detail=f"Project '{request.name}' already exists") | |
| create_db_if_not_exists(db_path) | |
| return { | |
| "status": "created", | |
| "project": request.name, | |
| "path": db_path, | |
| "description": request.description | |
| } | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def delete_project(project_name: str): | |
| """Delete a project database""" | |
| try: | |
| db_path = get_db_path(project_name) | |
| if not os.path.exists(db_path): | |
| raise HTTPException(status_code=404, detail=f"Project '{project_name}' not found") | |
| os.remove(db_path) | |
| return {"status": "deleted", "project": project_name} | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def query(request: QueryRequest): | |
| """Execute SELECT query on a project""" | |
| try: | |
| db_path = get_db_path(request.project) | |
| conn = get_connection(db_path) | |
| cursor = conn.execute(request.sql) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| return { | |
| "project": request.project, | |
| "data": [dict(row) for row in rows], | |
| "rows": len(rows) | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| def execute(request: ExecuteRequest): | |
| """Execute INSERT/UPDATE/DELETE on a project""" | |
| try: | |
| db_path = get_db_path(request.project) | |
| conn = get_connection(db_path) | |
| cursor = conn.execute(request.sql) | |
| conn.commit() | |
| changes = cursor.rowcount | |
| conn.close() | |
| return { | |
| "project": request.project, | |
| "rows_affected": changes, | |
| "status": "success" | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| def list_tables(project: str = Query("default")): | |
| """List all tables in a project""" | |
| try: | |
| db_path = get_db_path(project) | |
| conn = get_connection(db_path) | |
| cursor = conn.execute( | |
| "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" | |
| ) | |
| tables = [row[0] for row in cursor.fetchall()] | |
| conn.close() | |
| return {"project": project, "tables": tables, "count": len(tables)} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| def get_schema(project: str = Query("default")): | |
| """Get database schema for a project""" | |
| try: | |
| db_path = get_db_path(project) | |
| conn = get_connection(db_path) | |
| cursor = conn.execute( | |
| "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'" | |
| ) | |
| tables = [row[0] for row in cursor.fetchall()] | |
| schema = {} | |
| for table in tables: | |
| cursor = conn.execute(f"PRAGMA table_info({table})") | |
| schema[table] = [ | |
| {"name": col[1], "type": col[2], "notnull": col[3], "pk": col[5]} | |
| for col in cursor.fetchall() | |
| ] | |
| conn.close() | |
| return {"project": project, "schema": schema} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| if __name__ == "__main__": | |
| ensure_db_dir() | |
| uvicorn.run(app, host="0.0.0.0", port=8000) | |