File size: 2,506 Bytes
f92858b
81ec8d6
f92858b
 
81ec8d6
 
d5684b3
 
 
0dbb03c
d5684b3
f92858b
d5684b3
 
f92858b
 
 
 
 
d5684b3
f92858b
 
81ec8d6
4761dd9
 
 
81ec8d6
 
a4ef0c7
0dbb03c
81ec8d6
 
 
 
0dbb03c
 
b905bd6
 
0dbb03c
 
 
81ec8d6
 
9c3303f
81ec8d6
 
 
 
 
 
 
 
 
 
a4ef0c7
 
 
 
 
 
 
4761dd9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from datetime import datetime
from typing import List
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, Mapped, mapped_column
from sqlalchemy.sql import expression as sql
from sqlalchemy.ext.asyncio import AsyncSession

from app.model.base import BaseModel
from app.engine.postgresdb import Base
from app.schema.index import TransactionCreate


class Transaction(Base, BaseModel):
    __tablename__ = "transactions"
    transaction_date: Mapped[datetime]
    category: Mapped[str]
    name_description: Mapped[str]
    amount: Mapped[float]
    type: Mapped[str]

    user_id = mapped_column(ForeignKey("users.id"))
    user = relationship("User", back_populates="transactions")

    def __str__(self) -> str:
        return f"{self.transaction_date}, {self.category}, {self.name_description}, {self.amount}, {self.type}"

    @classmethod
    async def create(cls: "type[Transaction]", db: AsyncSession, **kwargs) -> "Transaction":
        query = sql.insert(cls).values(**kwargs)
        transactions = await db.execute(query)
        transaction = transactions.first()
        await db.commit()
        return transaction

    @classmethod
    async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None:
        values = [transaction.model_dump() for transaction in transactions]
        query = sql.insert(cls).values(values)
        await db.execute(query)
        await db.commit()

    @classmethod
    async def update(cls: "type[Transaction]", db: AsyncSession, id: int, **kwargs) -> "Transaction":
        query = sql.update(cls).where(cls.id == id).values(**kwargs).execution_options(synchronize_session="fetch")
        transactions = await db.scalars(query)
        transaction = transactions.first()
        await db.commit()
        return transaction

    @classmethod
    async def get_by_user(cls: "type[Transaction]", db: AsyncSession, user_id: int) -> "List[Transaction]":
        query = sql.select(cls).where(cls.user_id == user_id)
        transactions = await db.scalars(query)
        return transactions

    @classmethod
    async def get_by_user_between_dates(
        cls: "type[Transaction]", db: AsyncSession, user_id: int, start_date: datetime, end_date: datetime
    ) -> "List[Transaction]":
        query = sql.select(cls).where(cls.user_id == user_id).where(cls.transaction_date.between(start_date, end_date))
        transactions = await db.scalars(query)
        return transactions.all()