# backend/app/model/transaction.py
# Last commit: "fix income statement endpoints (#16)" (4761dd9, verified) by praneethys
# File size: 2.51 kB
from datetime import datetime
from typing import List
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
from sqlalchemy.sql import expression as sql
from sqlalchemy.ext.asyncio import AsyncSession
from app.model.base import BaseModel
from app.engine.postgresdb import Base
from app.schema.index import TransactionCreate
class Transaction(Base, BaseModel):
    """ORM model mapped to the ``transactions`` table.

    Besides the column mapping, the class exposes async class-method helpers
    that build INSERT / UPDATE / SELECT statements and run them on a
    caller-supplied ``AsyncSession``.
    """

    __tablename__ = "transactions"

    # Mapped columns; further columns (e.g. the ``id`` used by ``update``)
    # are presumably contributed by ``BaseModel`` -- confirm against
    # app.model.base.
    transaction_date: Mapped[datetime]
    category: Mapped[str]
    name_description: Mapped[str]
    amount: Mapped[float]
    type: Mapped[str]

    # Foreign key to ``users.id`` plus the ORM relationship to ``User``
    # (paired with ``User.transactions`` through ``back_populates``).
    user_id = mapped_column(ForeignKey("users.id"))
    user = relationship("User", back_populates="transactions")

    def __str__(self) -> str:
        """Return the main column values as a comma-separated string."""
        return f"{self.transaction_date}, {self.category}, {self.name_description}, {self.amount}, {self.type}"

    @classmethod
    async def create(cls: "type[Transaction]", db: AsyncSession, **kwargs) -> "Transaction":
        """Insert one row built from ``kwargs``, commit, and return it.

        Args:
            db: Session the statement is executed and committed on.
            **kwargs: Column name / value pairs for the new row.

        Returns:
            The inserted ``Transaction`` as returned by the database.
        """
        # RETURNING is required: a plain INSERT result carries no rows, so
        # calling ``.first()`` on it raises instead of yielding the new row.
        query = sql.insert(cls).values(**kwargs).returning(cls)
        inserted = await db.scalars(query)
        # Read the row before committing, while the result is still open.
        transaction = inserted.first()
        await db.commit()
        return transaction

    @classmethod
    async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None:
        """Insert all ``transactions`` with one multi-row INSERT and commit.

        Args:
            db: Session the statement is executed and committed on.
            transactions: Schema objects; each is converted to a dict of
                column values with ``model_dump()``.
        """
        # An empty list handed to ``values()`` is not treated as "no rows";
        # skip the statement entirely so empty input is a no-op.
        if not transactions:
            return

        values = [transaction.model_dump() for transaction in transactions]
        query = sql.insert(cls).values(values)
        await db.execute(query)
        await db.commit()

    @classmethod
    async def update(cls: "type[Transaction]", db: AsyncSession, id: int, **kwargs) -> "Transaction":
        """Update the row whose ``id`` matches, commit, and return it.

        Args:
            db: Session the statement is executed and committed on.
            id: Value compared against ``cls.id`` in the WHERE clause.
            **kwargs: Column name / value pairs to set.

        Returns:
            The updated ``Transaction``, or ``None`` when no row matched.
        """
        # RETURNING is required for ``scalars(...).first()`` to have a row
        # to hand back; an UPDATE without it produces no result rows.
        query = (
            sql.update(cls)
            .where(cls.id == id)
            .values(**kwargs)
            .returning(cls)
            .execution_options(synchronize_session="fetch")
        )
        updated = await db.scalars(query)
        transaction = updated.first()
        await db.commit()
        return transaction

    @classmethod
    async def get_by_user(cls: "type[Transaction]", db: AsyncSession, user_id: int) -> "List[Transaction]":
        """Select the transactions whose ``user_id`` equals ``user_id``.

        Args:
            db: Session the query is executed on.
            user_id: Value matched against the ``user_id`` column.

        Returns:
            The result object produced by ``db.scalars()``.
        """
        query = sql.select(cls).where(cls.user_id == user_id)
        transactions = await db.scalars(query)
        # NOTE(review): unlike get_by_user_between_dates, this hands back the
        # un-materialised scalars result rather than a list, despite the
        # annotation. Left as-is because callers may call ``.all()`` on it;
        # confirm the call sites before aligning the two methods.
        return transactions

    @classmethod
    async def get_by_user_between_dates(
        cls: "type[Transaction]", db: AsyncSession, user_id: int, start_date: datetime, end_date: datetime
    ) -> "List[Transaction]":
        """Select a user's transactions dated between two datetimes.

        Args:
            db: Session the query is executed on.
            user_id: Value matched against the ``user_id`` column.
            start_date: Lower bound passed to SQL ``BETWEEN``.
            end_date: Upper bound passed to SQL ``BETWEEN``.

        Returns:
            All matching rows, materialised with ``.all()``.
        """
        query = (
            sql.select(cls)
            .where(cls.user_id == user_id)
            .where(cls.transaction_date.between(start_date, end_date))
        )
        transactions = await db.scalars(query)
        return transactions.all()