File size: 3,455 Bytes
8b50706
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
from pydantic import BaseModel
from peewee import *
from playhouse.shortcuts import model_to_dict
from typing import List, Union, Optional
import time

from utils.utils import decode_token
from utils.misc import get_gravatar_url

from apps.web.internal.db import DB

import json

####################
# Modelfile DB Schema
####################


class Modelfile(Model):
    tag_name = CharField(unique=True)
    user_id = CharField()
    modelfile = TextField()
    timestamp = BigIntegerField()

    class Meta:
        database = DB


class ModelfileModel(BaseModel):
    tag_name: str
    user_id: str
    modelfile: str
    timestamp: int  # timestamp in epoch


####################
# Forms
####################


class ModelfileForm(BaseModel):
    modelfile: dict


class ModelfileTagNameForm(BaseModel):
    tag_name: str


class ModelfileUpdateForm(ModelfileForm, ModelfileTagNameForm):
    pass


class ModelfileResponse(BaseModel):
    tag_name: str
    user_id: str
    modelfile: dict
    timestamp: int  # timestamp in epoch


class ModelfilesTable:

    def __init__(self, db):
        self.db = db
        self.db.create_tables([Modelfile])

    def insert_new_modelfile(
        self, user_id: str, form_data: ModelfileForm
    ) -> Optional[ModelfileModel]:
        if "tagName" in form_data.modelfile:
            modelfile = ModelfileModel(
                **{
                    "user_id": user_id,
                    "tag_name": form_data.modelfile["tagName"],
                    "modelfile": json.dumps(form_data.modelfile),
                    "timestamp": int(time.time()),
                }
            )

            try:
                result = Modelfile.create(**modelfile.model_dump())
                if result:
                    return modelfile
                else:
                    return None
            except:
                return None

        else:
            return None

    def get_modelfile_by_tag_name(self, tag_name: str) -> Optional[ModelfileModel]:
        try:
            modelfile = Modelfile.get(Modelfile.tag_name == tag_name)
            return ModelfileModel(**model_to_dict(modelfile))
        except:
            return None

    def get_modelfiles(self, skip: int = 0, limit: int = 50) -> List[ModelfileResponse]:
        return [
            ModelfileResponse(
                **{
                    **model_to_dict(modelfile),
                    "modelfile": json.loads(modelfile.modelfile),
                }
            )
            for modelfile in Modelfile.select()
            # .limit(limit).offset(skip)
        ]

    def update_modelfile_by_tag_name(
        self, tag_name: str, modelfile: dict
    ) -> Optional[ModelfileModel]:
        try:
            query = Modelfile.update(
                modelfile=json.dumps(modelfile),
                timestamp=int(time.time()),
            ).where(Modelfile.tag_name == tag_name)

            query.execute()

            modelfile = Modelfile.get(Modelfile.tag_name == tag_name)
            return ModelfileModel(**model_to_dict(modelfile))
        except:
            return None

    def delete_modelfile_by_tag_name(self, tag_name: str) -> bool:
        try:
            query = Modelfile.delete().where((Modelfile.tag_name == tag_name))
            query.execute()  # Remove the rows, return number of rows removed.

            return True
        except:
            return False


Modelfiles = ModelfilesTable(DB)