|
|
import html
import os
import random
import re
import secrets
import string
from datetime import datetime
from typing import Optional

import bcrypt
from fastapi import FastAPI, Request, Form, Depends, HTTPException
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
|
|
|
|
|
|
|
|
# Absolute origin used when building redirect URLs for notes.
BASE_URL = "https://triflix-notepad.hf.space"

app = FastAPI(title="Notepad App")

# Directory containing this module; template and static paths hang off it.
_HERE = os.path.dirname(__file__)

templates_dir = os.path.join(_HERE, "templates")
templates = Jinja2Templates(directory=templates_dir)

static_dir = os.path.join(_HERE, "static")
# Make sure the directory exists before it is mounted.
os.makedirs(static_dir, exist_ok=True)
app.mount("/static", StaticFiles(directory=static_dir), name="static")

# SQLite database file in the current working directory.
DATABASE_URL = "sqlite:///./notepad.db"
engine = create_engine(
    DATABASE_URL,
    # Passed through to the SQLite driver: turn off its same-thread check.
    connect_args={"check_same_thread": False},
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
|
|
|
|
|
|
|
|
class Note(Base):
    """ORM model for a single stored note, mapped to the ``notes`` table."""

    __tablename__ = "notes"

    # Integer primary key.
    id = Column(Integer, primary_key=True, index=True)
    # Unique short code the note is looked up by in the /{code} routes.
    code = Column(String, index=True, unique=True)
    # Note text exactly as it was submitted.
    content = Column(Text)
    # True when the note must be unlocked with a password before viewing.
    is_private = Column(Boolean, default=False)
    # bcrypt hash of the password as a UTF-8 string; NULL when none is set.
    password = Column(String, nullable=True)
    # Set from datetime.utcnow (naive UTC) when no value is supplied.
    created_at = Column(DateTime, default=datetime.utcnow)
|
|
|
|
|
|
|
|
# Create the tables declared on Base's metadata when this module is imported.
Base.metadata.create_all(engine)
|
|
|
|
|
|
|
|
def get_db():
    """Yield a database session and close it when the caller is done.

    Written as a generator so it can be used with ``Depends``: the session
    is handed over at the ``yield`` and closed in the ``finally`` clause,
    whether or not the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
|
|
|
|
def generate_random_code(length=6):
    """Return a random alphanumeric code of ``length`` characters.

    The code is all that is needed to open a public note's URL, so it is
    drawn from the operating system's CSPRNG via ``secrets`` instead of the
    seedable, predictable generator behind ``random``.

    Args:
        length: Number of characters to generate. Zero or a negative value
            yields an empty string.

    Returns:
        A string consisting only of ASCII letters and digits.
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))
|
|
|
|
|
|
|
|
def process_content(content):
    """Convert raw note text into an HTML fragment for display.

    The text is HTML-escaped first, then image URLs (http/https ending in
    jpg, jpeg, png, gif, webp or svg) are replaced by ``<img>`` tags and
    newlines by ``<br>``.

    The result contains markup of its own, so the template presumably emits
    it without auto-escaping (NOTE(review): confirm in view.html). Escaping
    the user's text up front keeps submitted tags or quotes from being
    interpreted as HTML or from breaking out of the ``src`` attribute.

    Args:
        content: The note text as submitted by the user.

    Returns:
        The escaped text with image tags and line breaks inserted.
    """
    # Escape before adding our own tags so only the markup generated below
    # is live; html.escape also escapes both quote characters by default.
    escaped = html.escape(content)

    image_pattern = r'(https?://\S+\.(?:jpg|jpeg|png|gif|webp|svg))'
    with_images = re.sub(
        image_pattern,
        r'<img src="\1" class="my-2 max-w-full h-auto rounded" alt="Image">',
        escaped,
    )

    return with_images.replace('\n', '<br>')
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Serve the landing page rendered from ``index.html``."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
|
|
|
|
|
|
|
|
@app.post("/create", response_class=HTMLResponse)
async def create_note(
    request: Request,
    content: str = Form(...),
    is_private: bool = Form(False),
    password: Optional[str] = Form(None),
    db: Session = Depends(get_db),
):
    """Store a new note and redirect to its page.

    Args:
        request: The incoming request (unused here, kept for the interface).
        content: Note text from the submitted form.
        is_private: Whether the note should be password protected.
        password: Plain-text password; required when ``is_private`` is true
            and ignored otherwise.
        db: Database session supplied by ``get_db``.

    Returns:
        A 303 redirect to ``{BASE_URL}/{code}`` for the newly created note.

    Raises:
        HTTPException: 400 when a private note is requested without a
            password.
    """
    # A private note without a password could never be opened: view_note
    # always shows the password form for private notes, and verify_password
    # redirects back to it when no hash is stored. Reject it up front
    # instead of saving an unreachable note.
    if is_private and not password:
        raise HTTPException(
            status_code=400,
            detail="A password is required for private notes",
        )

    # Draw codes until one is found that is not already taken.
    code = generate_random_code()
    while db.query(Note).filter(Note.code == code).first():
        code = generate_random_code()

    # Only the bcrypt hash is stored, never the plain-text password.
    hashed_password = None
    if is_private:
        hashed_password = bcrypt.hashpw(
            password.encode('utf-8'), bcrypt.gensalt()
        ).decode('utf-8')

    new_note = Note(
        code=code,
        content=content,
        is_private=is_private,
        password=hashed_password,
    )
    db.add(new_note)
    db.commit()

    return RedirectResponse(url=f"{BASE_URL}/{code}", status_code=303)
|
|
|
|
|
|
|
|
@app.get("/{code}", response_class=HTMLResponse)
async def view_note(request: Request, code: str, db: Session = Depends(get_db)):
    """Show the note identified by ``code``.

    Private notes are not rendered here; the password form is returned
    instead. Public notes are rendered through ``view.html`` with their
    content run through ``process_content``.

    Raises:
        HTTPException: 404 when no note has the given code.
    """
    note = db.query(Note).filter(Note.code == code).first()
    if not note:
        raise HTTPException(status_code=404, detail="Note not found")

    # Pick the template and its context, then render once at the end.
    if note.is_private:
        template_name = "password.html"
        context = {"request": request, "code": code}
    else:
        template_name = "view.html"
        rendered = process_content(note.content)
        context = {"request": request, "note": note, "content": rendered}

    return templates.TemplateResponse(template_name, context)
|
|
|
|
|
|
|
|
@app.post("/{code}/verify", response_class=HTMLResponse)
async def verify_password(
    request: Request,
    code: str,
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Check the submitted password for a private note and show it on success.

    Returns:
        The rendered note when the password matches the stored bcrypt hash,
        the password form with an error message when it does not, or a 303
        redirect to the note's page when the note is not private or has no
        stored hash.

    Raises:
        HTTPException: 404 when no note has the given code.
    """
    note = db.query(Note).filter(Note.code == code).first()
    if not note:
        raise HTTPException(status_code=404, detail="Note not found")

    # Nothing to verify against: send the client back to the note's page.
    if not (note.is_private and note.password):
        return RedirectResponse(url=f"{BASE_URL}/{code}", status_code=303)

    submitted = password.encode('utf-8')
    stored_hash = note.password.encode('utf-8')

    if bcrypt.checkpw(submitted, stored_hash):
        rendered = process_content(note.content)
        return templates.TemplateResponse(
            "view.html",
            {"request": request, "note": note, "content": rendered},
        )

    return templates.TemplateResponse(
        "password.html",
        {"request": request, "code": code, "error": "Invalid password"},
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn

    # Serve the app via the import string "main:app" on all interfaces,
    # port 7860, with auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )
|
|
|