File size: 5,003 Bytes
fa59b8b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from typing import Union,Optional
from fastapi import FastAPI,Header,HTTPException
from pydantic import BaseModel
from typing_extensions import Annotated
from sqlmodel import SQLModel,create_engine,Field,Session,update

app = FastAPI()
#postgressql://<username>:<password>@localhost/fastapi, localhost -> ip addrress/domain name
#create user username with encryption password 'password'
# DATABASE_URL = 'postgresql://postgres:admin@localhost:5433/fastapi'
# engine = create_engine(DATABASE_URL)
DATABASE_URL = "sqlite:///./sql_app.db"
engine = create_engine(DATABASE_URL, connect_args={"check_same_thread":False})
# connect_args={"check_same_thread":False} only in sqlite not in postgres
def creat_db_tables():
    SQLModel.metadata.create_all(engine)

class Users(SQLModel, table=True):
    id : Optional[int] = Field(default=None,primary_key = True)
    name : str
    city : str
    email : str

users = {
    1 : {'id':1,
        "name":"John",
        "city":"Gandhinagar",
        "email":"john@gmail.com"},
        
    2 : {'id':2,
        "name":"Emili",
        "city":"Surat",
        "email":"emili@gmail.com"},
    
    3 : {'id':3,
        "name":"Miya",
        "city":"Bombay",
        "email":"miya@gmail.com"}
}

class User(BaseModel):
    id : int
    name : str
    city : str
    email : str

class UserUpdate(BaseModel):
    name : str
    city : str
    email : str

@app.on_event("startup")
def on_startup():
    creat_db_tables()

@app.get("/") # @ - decorator , "get('/hello')"--> http://localhost:8000/hello
def index(): # index is arbitary
    return {"Message" : "Hello World"}

@app.get("/test") # @ - decorator , "get('/test')"--> http://localhost:8000/test
def test():
    a = 5
    return {"Message" : "Testing World",
            "Value" : a}

@app.get("/users", status_code=200)
def get_users(x_api_key: Annotated[Union[str,None], Header()], city:str=None): # Annotated|str
    # if city is None or city == "":
    #     return {"message": "Users List", "data": users, "header" : x_api_key}
    # else:
    #     list = [x for x in users.values() if x.get('city').lower()==city.lower()]
    #     return {"message": "Users List", "data": list, "filter":city, "header": x_api_key}
    with Session(engine) as session:
        users = session.query(Users).all() # select * from users
        print(f'users list {users}')
        return {"message": "Users list", "data":users,"header":x_api_key}


# @app.get("/users/city")
# def get_users_by_filter(city : str = None):
#     list = [x for x in users.values() if x.get('city').lower()==city.lower()]
#     return {"message": "Users List", "data": list, "filter":city}

@app.get("/users/{user_id}") # GET baseURL/users/1
def get_user_by_id(user_id:int):
    with Session(engine) as session:
        user = session.query(Users).filter(User.id == user_id).one_or_none() 
    # return {"message": "Users List", "data": users[user_id]}
        return {"message": "Users Details", "data": user}

@app.post("/users",status_code=201)
# def create_user(user : User):
#     # users[user.id] = user
#     users.update({user.id: dict(user)})
#     return {"message" : "New User Added", "Users": users}
def create_user(user:Users):
    with Session(engine) as session:
        session.add(user)
        session.commit()
        session.refresh(user)
        return {"message":"New User", "data": user}

@app.put("/users/{user_id}")
# def update_user(user_id:int, user:UserUpdate):
#     if user_id in users.keys():
#         updated_user = users.get(user_id)
#         updated_user.update({"city": user.city})  
#         updated_user.update({"email": user.email})
#         updated_user.update({"name": user.name})
#         users.update({user_id : updated_user})
#         return {"message" : "User Updated", "Users":users}
#     else:
#         return "User ID Doesn't Exist"
def update_user(user_id:int, user:Users):
    with Session(engine) as session:
        user_exist = session.query(Users).filter(Users.id == user_id).one_or_none()
        if not user_exist:
            raise HTTPException(404, 'Invalid User ID')
        
        user_exist.name = user.name
        user_exist.city = user.city
        user_exist.email = user.email
        session.add(user_exist)
        session.commit()
        session.refresh(user_exist)
        return {"message":"User Updated Successfully","data":user_exist}
    


@app.delete("/users/{user_id}")
# def delete_user(user_id:int):
#     if user_id in users.keys():
#         del users[user_id]
#         return {"message":"User Deleted","Users":users}
#     else:
#         return {"message":"User_id is not valid"}
def delete_user(user_id:int):
    with Session(engine) as session:
        user_exist = session.query(Users).filter(Users.id == user_id).one_or_none()
        if not user_exist:
            raise HTTPException(404, 'Invalid User ID')

        session.delete(user_exist)
        session.commit()
        return {"message":"User Delted Successfully", "data":user_exist}