tebakaja commited on
Commit
89eb9ba
1 Parent(s): 48df144

[ Feat ]: Implementation Server Side Caching \u Redis (Aiven Cloud)

Browse files
.github/workflows/gru_pipeline.yaml CHANGED
@@ -186,10 +186,12 @@ jobs:
186
  run: |
187
  rm models.zip
188
  rm pickles.zip
 
189
  rm posttrained.zip
190
 
191
  rm -rf models
192
  rm -rf pickles
 
193
  rm -rf posttrained
194
 
195
  - name: Commit changes
@@ -205,7 +207,15 @@ jobs:
205
  uses: ad-m/github-push-action@master
206
  with:
207
  github_token: ${{ secrets.GH_TOKEN }}
208
- branch: main
 
 
 
 
 
 
 
 
209
 
210
 
211
  tebakaja_crypto_space-0:
 
186
  run: |
187
  rm models.zip
188
  rm pickles.zip
189
+ rm datasets.zip
190
  rm posttrained.zip
191
 
192
  rm -rf models
193
  rm -rf pickles
194
+ rm -rf datasets
195
  rm -rf posttrained
196
 
197
  - name: Commit changes
 
207
  uses: ad-m/github-push-action@master
208
  with:
209
  github_token: ${{ secrets.GH_TOKEN }}
210
+ branch: main
211
+
212
+ - name: Flush All Caching Data (Redis)
213
+ if: env.match != 'true'
214
+ run: |
215
+ echo "FLUSHALL" | redis-cli --tls \
216
+ -h ${{ secrets.REDIS_HOST }} \
217
+ -p ${{ secrets.REDIS_PORT }} \
218
+ -a ${{ secrets.REDIS_TOKEN }}
219
 
220
 
221
  tebakaja_crypto_space-0:
.github/workflows/lstm_gru_pipeline.yaml CHANGED
@@ -186,10 +186,12 @@ jobs:
186
  run: |
187
  rm models.zip
188
  rm pickles.zip
 
189
  rm posttrained.zip
190
 
191
  rm -rf models
192
  rm -rf pickles
 
193
  rm -rf posttrained
194
 
195
  - name: Commit changes
 
186
  run: |
187
  rm models.zip
188
  rm pickles.zip
189
+ rm datasets.zip
190
  rm posttrained.zip
191
 
192
  rm -rf models
193
  rm -rf pickles
194
+ rm -rf datasets
195
  rm -rf posttrained
196
 
197
  - name: Commit changes
.github/workflows/lstm_pipeline.yaml CHANGED
@@ -186,10 +186,12 @@ jobs:
186
  run: |
187
  rm models.zip
188
  rm pickles.zip
 
189
  rm posttrained.zip
190
 
191
  rm -rf models
192
  rm -rf pickles
 
193
  rm -rf posttrained
194
 
195
  - name: Commit changes
 
186
  run: |
187
  rm models.zip
188
  rm pickles.zip
189
+ rm datasets.zip
190
  rm posttrained.zip
191
 
192
  rm -rf models
193
  rm -rf pickles
194
+ rm -rf datasets
195
  rm -rf posttrained
196
 
197
  - name: Commit changes
.gitignore CHANGED
@@ -9,6 +9,8 @@
9
  /Include
10
  /Scripts
11
 
 
 
12
  # Pycache
13
  /__pycache__
14
  /restful/__pycache__
 
9
  /Include
10
  /Scripts
11
 
12
+ .env
13
+
14
  # Pycache
15
  /__pycache__
16
  /restful/__pycache__
app.py CHANGED
@@ -2,6 +2,7 @@ from fastapi import FastAPI
2
  from restful.routes import route
3
  from fastapi.responses import RedirectResponse
4
  from fastapi.middleware.cors import CORSMiddleware
 
5
 
6
  app = FastAPI(
7
  title = "Cryptocurency Prediction Service",
@@ -24,5 +25,10 @@ app.include_router(
24
  )
25
 
26
  @app.get("/", tags = ['Main'])
27
- def root():
28
  return RedirectResponse(url="/docs")
 
 
 
 
 
 
2
  from restful.routes import route
3
  from fastapi.responses import RedirectResponse
4
  from fastapi.middleware.cors import CORSMiddleware
5
+ from restful.cachings import close_caching_connector
6
 
7
  app = FastAPI(
8
  title = "Cryptocurency Prediction Service",
 
25
  )
26
 
27
  @app.get("/", tags = ['Main'])
28
+ def root() -> None:
29
  return RedirectResponse(url="/docs")
30
+
31
+
32
+ @app.on_event("shutdown")
33
+ async def on_shutdown() -> None:
34
+ await close_caching_connector()
datasets.zip DELETED
Binary file (351 kB)
 
requirements.txt CHANGED
@@ -7,7 +7,9 @@ pandas==2.2.2
7
  protobuf==4.25.3
8
  pydantic==2.7.2
9
  pydantic_core==2.18.3
 
10
  scikit-learn==1.5.0
11
  scipy==1.13.1
12
  tensorflow==2.16.1
13
  uvicorn==0.30.1
 
 
7
  protobuf==4.25.3
8
  pydantic==2.7.2
9
  pydantic_core==2.18.3
10
+ redis==3.5.3
11
  scikit-learn==1.5.0
12
  scipy==1.13.1
13
  tensorflow==2.16.1
14
  uvicorn==0.30.1
15
+ valkey==6.0.0b1
restful/cachings.py ADDED
@@ -0,0 +1,39 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from redis import Redis
2
+ from valkey import Valkey
3
+ from typing import _ProtocolMeta
4
+ from restful.configs import settings, Settings
5
+
6
+
7
+ """ caching_connector """
8
+ def _caching_connector(db_type: str, env_var: Settings) -> _ProtocolMeta:
9
+ if db_type == 'redis':
10
+ return Redis(
11
+ host = env_var.CACHING_HOST,
12
+ port = env_var.CACHING_PORT,
13
+ password = env_var.CACHING_PASS,
14
+ ssl = True
15
+ )
16
+
17
+ elif db_type == 'valkey':
18
+ return Valkey(
19
+ host = env_var.CACHING_HOST,
20
+ port = env_var.CACHING_PORT,
21
+ password = env_var.CACHING_PASS,
22
+ ssl = True
23
+ )
24
+
25
+ # default
26
+ else:
27
+ return Redis(
28
+ host = env_var.CACHING_HOST,
29
+ port = env_var.CACHING_PORT,
30
+ password = env_var.CACHING_PASS,
31
+ ssl = True
32
+ )
33
+
34
+ connector: _ProtocolMeta = _caching_connector(
35
+ db_type = settings.CACHING_TYPE, env_var = settings)
36
+
37
+ async def close_caching_connector() -> None:
38
+ global connector
39
+ if connector: await connector.close()
restful/configs.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pydantic_settings import BaseSettings
2
+
3
+
4
+ class Settings(BaseSettings):
5
+ CACHING_TYPE: str
6
+ CACHING_HOST: str
7
+ CACHING_PORT: int
8
+ CACHING_PASS: str
9
+
10
+ class Config:
11
+ env_file = ".env"
12
+
13
+ settings = Settings()
restful/controllers.py CHANGED
@@ -1,22 +1,23 @@
1
  import os
2
  from http import HTTPStatus
 
3
  from fastapi.responses import JSONResponse
4
  from restful.services import ForecastingService
5
  from restful.schemas import ForecastingServiceSchema
6
 
7
 
8
- """ Forecasting Controller """
9
  class ForecastingControllers:
10
 
11
  __SERVICE: ForecastingService = ForecastingService()
12
 
13
 
14
- """
15
- Algorithms Controller
16
- """
17
  async def algorithms_controller(self) -> JSONResponse:
18
  try:
19
- algorithms: list = sorted(os.listdir("resources/algorithms"))
 
 
20
  return JSONResponse(
21
  content = {
22
  'message': 'Success',
@@ -39,16 +40,15 @@ class ForecastingControllers:
39
 
40
 
41
 
42
- """
43
- Currency Controller
44
- """
45
  async def currencies_controller(self) -> JSONResponse:
46
  try:
47
  path: str = 'resources/datasets'
48
  datasets: list = sorted(
49
  [
50
  item.replace(".csv", "") for item in os.listdir(path)
51
- if os.path.isfile(os.path.join(path, item)) and item.endswith('.csv')
 
52
  ]
53
  )
54
 
@@ -74,10 +74,9 @@ class ForecastingControllers:
74
 
75
 
76
 
77
- """
78
- Forecasting Controller
79
- """
80
- async def forecasting_controller(self, payload: ForecastingServiceSchema) -> JSONResponse:
81
  try:
82
  path: str = 'resources/datasets'
83
  datasets: list = sorted(
@@ -120,7 +119,8 @@ class ForecastingControllers:
120
  )
121
 
122
 
123
- prediction: dict = await self.__SERVICE.forecasting(payload)
 
124
 
125
  if not prediction :
126
  return JSONResponse(
 
1
  import os
2
  from http import HTTPStatus
3
+ from typing import _ProtocolMeta
4
  from fastapi.responses import JSONResponse
5
  from restful.services import ForecastingService
6
  from restful.schemas import ForecastingServiceSchema
7
 
8
 
9
+ # Forecasting Controller
10
  class ForecastingControllers:
11
 
12
  __SERVICE: ForecastingService = ForecastingService()
13
 
14
 
15
+ # Algorithms Controller
 
 
16
  async def algorithms_controller(self) -> JSONResponse:
17
  try:
18
+ algorithms: list = sorted(
19
+ os.listdir("resources/algorithms"))
20
+
21
  return JSONResponse(
22
  content = {
23
  'message': 'Success',
 
40
 
41
 
42
 
43
+ # Currency Controller
 
 
44
  async def currencies_controller(self) -> JSONResponse:
45
  try:
46
  path: str = 'resources/datasets'
47
  datasets: list = sorted(
48
  [
49
  item.replace(".csv", "") for item in os.listdir(path)
50
+ if os.path.isfile(os.path.join(path, item))
51
+ and item.endswith('.csv')
52
  ]
53
  )
54
 
 
74
 
75
 
76
 
77
+ # Forecasting Controller
78
+ async def forecasting_controller(self, payload: ForecastingServiceSchema,
79
+ caching: _ProtocolMeta) -> JSONResponse:
 
80
  try:
81
  path: str = 'resources/datasets'
82
  datasets: list = sorted(
 
119
  )
120
 
121
 
122
+ prediction: dict = await self.__SERVICE.forecasting(
123
+ payload = payload, caching = caching)
124
 
125
  if not prediction :
126
  return JSONResponse(
restful/cutils/build/lib.linux-x86_64-3.10/utilities.cpython-310-x86_64-linux-gnu.so CHANGED
Binary files a/restful/cutils/build/lib.linux-x86_64-3.10/utilities.cpython-310-x86_64-linux-gnu.so and b/restful/cutils/build/lib.linux-x86_64-3.10/utilities.cpython-310-x86_64-linux-gnu.so differ
 
restful/cutils/build/temp.linux-x86_64-3.10/utilities.o CHANGED
Binary files a/restful/cutils/build/temp.linux-x86_64-3.10/utilities.o and b/restful/cutils/build/temp.linux-x86_64-3.10/utilities.o differ
 
restful/cutils/utilities.c CHANGED
The diff for this file is too large to render. See raw diff
 
restful/cutils/utilities.cpython-310-x86_64-linux-gnu.so CHANGED
Binary files a/restful/cutils/utilities.cpython-310-x86_64-linux-gnu.so and b/restful/cutils/utilities.cpython-310-x86_64-linux-gnu.so differ
 
restful/cutils/utilities.pyx CHANGED
@@ -1,13 +1,14 @@
1
  import os
 
2
  from joblib import load
3
  from numpy import append, expand_dims
4
  from pandas import read_json, to_datetime, Timedelta
5
  from tensorflow.keras.models import load_model
6
- import cython
7
 
8
  cdef class Utilities:
9
  async def forecasting_utils(self, int sequence_length,
10
- int days, str model_name, str algorithm) -> tuple:
11
  cdef str model_path = os.path.join(f'./resources/algorithms/{algorithm}/models',
12
  f'{model_name}.keras')
13
  model = load_model(model_path)
@@ -21,27 +22,30 @@ cdef class Utilities:
21
  f'{model_name}_minmax_scaler.pickle'))
22
  standard_scaler = load(os.path.join(f'./resources/algorithms/{algorithm}/pickles',
23
  f'{model_name}_standard_scaler.pickle'))
24
-
25
- # Prediction
26
- lst_seq = dataframe[-sequence_length:].values
27
- lst_seq = expand_dims(lst_seq, axis=0)
28
 
29
- cdef dict predicted_prices = {}
30
- last_date = to_datetime(dataframe.index[-1])
 
 
31
 
32
- for _ in range(days):
33
- predicted_price = model.predict(lst_seq)
34
- last_date = last_date + Timedelta(days=1)
35
 
36
- predicted_prices[last_date] = minmax_scaler.inverse_transform(predicted_price)
37
- predicted_prices[last_date] = standard_scaler.inverse_transform(predicted_prices[last_date])
 
38
 
39
- lst_seq = append(lst_seq[:, 1:, :], [predicted_price], axis=1)
 
40
 
41
- predictions = [
42
- {'date': date.strftime('%Y-%m-%d'), 'price': float(price)} \
43
- for date, price in predicted_prices.items()
44
- ]
 
 
 
 
45
 
46
  # Actual
47
  df_date = dataframe.index[-sequence_length:].values
 
1
  import os
2
+ import cython
3
  from joblib import load
4
  from numpy import append, expand_dims
5
  from pandas import read_json, to_datetime, Timedelta
6
  from tensorflow.keras.models import load_model
7
+
8
 
9
  cdef class Utilities:
10
  async def forecasting_utils(self, int sequence_length,
11
+ int days, str model_name, str algorithm, bint with_pred) -> tuple:
12
  cdef str model_path = os.path.join(f'./resources/algorithms/{algorithm}/models',
13
  f'{model_name}.keras')
14
  model = load_model(model_path)
 
22
  f'{model_name}_minmax_scaler.pickle'))
23
  standard_scaler = load(os.path.join(f'./resources/algorithms/{algorithm}/pickles',
24
  f'{model_name}_standard_scaler.pickle'))
 
 
 
 
25
 
26
+ if with_pred:
27
+ # Prediction
28
+ lst_seq = dataframe[-sequence_length:].values
29
+ lst_seq = expand_dims(lst_seq, axis=0)
30
 
31
+ predicted_prices = {}
32
+ last_date = to_datetime(dataframe.index[-1])
 
33
 
34
+ for _ in range(days):
35
+ predicted_price = model.predict(lst_seq)
36
+ last_date = last_date + Timedelta(days=1)
37
 
38
+ predicted_prices[last_date] = minmax_scaler.inverse_transform(predicted_price)
39
+ predicted_prices[last_date] = standard_scaler.inverse_transform(predicted_prices[last_date])
40
 
41
+ lst_seq = append(lst_seq[:, 1:, :], [predicted_price], axis=1)
42
+
43
+ predictions = [
44
+ {'date': date.strftime('%Y-%m-%d'), 'price': float(price)} \
45
+ for date, price in predicted_prices.items()
46
+ ]
47
+
48
+ else: predictions = []
49
 
50
  # Actual
51
  df_date = dataframe.index[-sequence_length:].values
restful/routes.py CHANGED
@@ -1,13 +1,22 @@
 
1
  from fastapi import APIRouter, Body
2
  from fastapi.responses import JSONResponse
3
- from restful.controllers import ForecastingControllers
 
4
  from restful.schemas import ForecastingServiceSchema
 
 
5
 
6
  """ API Router """
7
  route = APIRouter()
8
 
 
9
  """ Forecasting Controller """
10
- __CONTROLLER = ForecastingControllers()
 
 
 
 
11
 
12
 
13
  """ Algorithms Route """
@@ -27,5 +36,5 @@ async def currencies_route() -> JSONResponse:
27
  async def forecasting_route(
28
  payload: ForecastingServiceSchema = Body(...)
29
  ) -> JSONResponse:
30
- return await __CONTROLLER.forecasting_controller(payload = payload)
31
 
 
1
+ from typing import _ProtocolMeta
2
  from fastapi import APIRouter, Body
3
  from fastapi.responses import JSONResponse
4
+
5
+ from restful.cachings import connector
6
  from restful.schemas import ForecastingServiceSchema
7
+ from restful.controllers import ForecastingControllers
8
+
9
 
10
  """ API Router """
11
  route = APIRouter()
12
 
13
+
14
  """ Forecasting Controller """
15
+ __CONTROLLER: ForecastingControllers = ForecastingControllers()
16
+
17
+
18
+ """ Caching Connector """
19
+ __CONNECTOR: _ProtocolMeta = connector
20
 
21
 
22
  """ Algorithms Route """
 
36
  async def forecasting_route(
37
  payload: ForecastingServiceSchema = Body(...)
38
  ) -> JSONResponse:
39
+ return await __CONTROLLER.forecasting_controller(payload = payload, caching = __CONNECTOR)
40
 
restful/services.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  from restful.cutils.utilities import Utilities
2
  from restful.schemas import ForecastingServiceSchema
3
 
@@ -7,17 +9,27 @@ class ForecastingService:
7
 
8
  __FORECAST_UTILS = Utilities()
9
 
10
- async def forecasting(self, payload: ForecastingServiceSchema) -> dict:
11
- days: int = payload.days
12
- currency: str = payload.currency
13
- algorithm: str = payload.algorithm
14
 
15
  actuals, predictions = await self.__FORECAST_UTILS.forecasting_utils(
16
- days = days,
17
- algorithm = algorithm,
18
- model_name = currency,
19
 
 
20
  sequence_length = 60
21
  )
22
 
 
 
 
 
 
 
 
 
 
 
23
  return {'actuals': actuals, 'predictions': predictions}
 
1
+ import json
2
+ from typing import _ProtocolMeta
3
  from restful.cutils.utilities import Utilities
4
  from restful.schemas import ForecastingServiceSchema
5
 
 
9
 
10
  __FORECAST_UTILS = Utilities()
11
 
12
+ async def forecasting(self, payload: ForecastingServiceSchema, caching: _ProtocolMeta) -> dict:
13
+ caching_data = caching.get(
14
+ f'{payload.algorithm}_{payload.currency}_{payload.days}')
 
15
 
16
  actuals, predictions = await self.__FORECAST_UTILS.forecasting_utils(
17
+ days = payload.days,
18
+ algorithm = payload.algorithm,
19
+ model_name = payload.currency,
20
 
21
+ with_pred = (caching_data == None),
22
  sequence_length = 60
23
  )
24
 
25
+ if caching_data != None:
26
+ predictions = json.loads(caching_data.decode('utf-8'))
27
+
28
+ else:
29
+ caching.set(
30
+ f'{payload.algorithm}_{payload.currency}_{payload.days}',
31
+ json.dumps(predictions)
32
+ )
33
+
34
+
35
  return {'actuals': actuals, 'predictions': predictions}