from datetime import datetime from typing import List from sqlalchemy import ForeignKey from sqlalchemy.orm import relationship, Mapped, mapped_column from sqlalchemy.sql import expression as sql from sqlalchemy.ext.asyncio import AsyncSession from app.model.base import BaseModel from app.engine.postgresdb import Base from app.schema.index import TransactionCreate class Transaction(Base, BaseModel): __tablename__ = "transactions" transaction_date: Mapped[datetime] category: Mapped[str] name_description: Mapped[str] amount: Mapped[float] type: Mapped[str] user_id = mapped_column(ForeignKey("users.id")) user = relationship("User", back_populates="transactions") def __str__(self) -> str: return f"{self.transaction_date}, {self.category}, {self.name_description}, {self.amount}, {self.type}" @classmethod async def create(cls: "type[Transaction]", db: AsyncSession, **kwargs) -> "Transaction": query = sql.insert(cls).values(**kwargs) transactions = await db.execute(query) transaction = transactions.first() await db.commit() return transaction @classmethod async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None: values = [transaction.model_dump() for transaction in transactions] query = sql.insert(cls).values(values) await db.execute(query) await db.commit() @classmethod async def update(cls: "type[Transaction]", db: AsyncSession, id: int, **kwargs) -> "Transaction": query = sql.update(cls).where(cls.id == id).values(**kwargs).execution_options(synchronize_session="fetch") transactions = await db.scalars(query) transaction = transactions.first() await db.commit() return transaction @classmethod async def get_by_user(cls: "type[Transaction]", db: AsyncSession, user_id: int) -> "List[Transaction]": query = sql.select(cls).where(cls.user_id == user_id) transactions = await db.scalars(query) return transactions @classmethod async def get_by_user_between_dates( cls: "type[Transaction]", db: AsyncSession, user_id: int, start_date: datetime, end_date: datetime ) -> "List[Transaction]": query = sql.select(cls).where(cls.user_id == user_id).where(cls.transaction_date.between(start_date, end_date)) transactions = await db.scalars(query) return transactions.all()