film_mlmodule / steps /data_retrieving.py
root
first hf commit
6defa3d
raw
history blame contribute delete
No virus
1.05 kB
from zenml import step, pipeline
import psycopg2
from typing import Tuple
from typing_extensions import Annotated
import logging
import os
from dotenv import load_dotenv
import pandas as pd
from src.data_retrieving import Baseline
load_dotenv()
sql_folder_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "sql_files")
@step(enable_cache=False)
def retrieve_data() -> Tuple[
Annotated[pd.DataFrame, "df_movie"],
Annotated[pd.DataFrame, "df_user"],
Annotated[pd.DataFrame, "df_rating"],
]:
conn = psycopg2.connect(
dbname=os.environ.get('DB_NAME'),
user=os.environ.get('DB_USER'),
password=os.environ.get('DB_PASSWORD'),
host=os.environ.get('DB_HOST'),
port=os.environ.get("DB_PORT")
)
data_retrieve_class = Baseline()
df_movie, df_user, df_rating = data_retrieve_class.retrieve_data_sql(conn)
conn.close()
return df_movie, df_user, df_rating
@pipeline
def postgres_pipeline():
connect_to_postgres()
if __name__ == "__main__":
postgres_pipeline()