load_utils

  1import pandas as pd
  2import boto3, logging, json, os
  3from botocore.exceptions import ClientError
  4from sqlalchemy import create_engine, inspect, DateTime, Boolean, Date
  5from sqlalchemy.exc import SQLAlchemyError
  6from io import BytesIO
  7from datetime import datetime
  8from dotenv import load_dotenv
  9
 10load_dotenv()
 11
 12
 13def get_secret(
 14    secret_name: str = "project-onyx/warehouse-login",
 15    region_name: str = "eu-west-2",
 16):
 17    """
 18    Retrieves warehouse login credentials from AWS Secrets Manager and
 19    returns it as a sqlalchemy engine db string
 20
 21    :param secret_name: The name of the secret to retrieve.
 22    :param region_name: The AWS region where the secret is stored.
 23    :raises ClientError: If there is an error retrieving the secret.
 24    :return: The secret DB string
 25    """
 26    log_message(__name__, 10, "Entered get_secret_for_warehouse")
 27    try:
 28        client = boto3.client(service_name="secretsmanager", region_name=region_name)
 29        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
 30        secret_dict = get_secret_value_response["SecretString"]
 31        secret = json.loads(secret_dict)
 32        result = f"postgresql+pg8000://{secret['username']}:{secret['password']}@{secret['host']}:{secret['port']}/{secret['dbname']}"
 33        log_message(__name__, 20, "Secret retrieved")
 34        return result
 35
 36    except ClientError as e:
 37        # For a list of exceptions thrown, see
 38        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
 39        log_message(__name__, 40, e.response["Error"]["Message"])
 40        raise e
 41
 42
 43def log_message(name: str, level: int, message: str = ""):
 44    """
 45        Sends a message to the logger at a specified level.
 46
 47        :param name: The name of the logger.
 48        :param level: The logging level (one of 10, 20, 30, 40, 50).
 49    :param message: The message to log.
 50    """
 51    logger = logging.getLogger(name)
 52
 53    # Define a mapping of level integers to logging methods
 54    level_map = {
 55        10: logger.debug,
 56        20: logger.info,
 57        30: logger.warning,
 58        40: logger.error,
 59        50: logger.critical,
 60    }
 61
 62    # Get the logging method from the map
 63    log_method = level_map.get(level)
 64
 65    if log_method:
 66        log_method(message)
 67    else:
 68        logger.error("Invalid log level: %d", level)
 69
 70
 71def read_parquets_from_s3(
 72    s3_client, table, last_load, bucket="onyx-processed-data-bucket"
 73):
 74    """Reads parquet files from an s3 bucket and converts to pandas dataframe
 75
 76    Args:
 77        s3_client (boto3('s3')): Boto3 s3 client to access s3 bucket
 78        last_load (string): Read from .txt file containing timestamp
 79                            when load function last ran
 80        bucket (str, optional): The name of the s3 bucket to be read.
 81                                Defaults to "onyx-processed-data-bucket".
 82
 83    Returns:
 84        tuple: A tuple containing lists of dimension table names, fact table names,
 85               dimension dataframes, and fact dataframes.
 86    """
 87    try:
 88        log_message(__name__, 20, f"Entered read_parquet_from_s3 function for {table}.")
 89        bucket_contents = s3_client.list_objects_v2(Bucket=bucket, Prefix=table).get(
 90            "Contents", []
 91        )
 92        if not bucket_contents:
 93            log_message(__name__, 20, "No files found in the bucket.")
 94            return []
 95
 96        last_load = datetime.strptime(last_load, "%Y-%m-%d %H:%M:%S%z")
 97
 98        new_files = [
 99            file["Key"]
100            for file in bucket_contents
101            if last_load
102            and last_load < file["LastModified"]
103            and ".txt" not in file["Key"]
104        ]
105        if not new_files:
106            log_message(__name__, 20, "No new files to process.")
107            return []
108
109        df_list = []
110        for parquet_file_name in new_files:
111            log_message(__name__, 20, f"Converting {parquet_file_name} to dataframe.")
112            response = s3_client.get_object(Bucket=bucket, Key=parquet_file_name)
113            data = response["Body"].read()
114            df = pd.read_parquet(BytesIO(data))
115            df_list.append(df)
116            log_message(__name__, 20, f"Conversion of {parquet_file_name} complete.")
117
118        log_message(__name__, 20, f"Parquet {table} files read and dataframes created.")
119        return df_list
120
121    except ClientError as e:
122        log_message(
123            __name__, 40, f"Error reading from S3: {e.response['Error']['Message']}"
124        )
125        raise e
126    except Exception as e:
127        log_message(__name__, 40, f"Unexpected error: {str(e)}")
128        raise e
129
130
def write_df_to_warehouse(df_list, table, engine_string=None):
    """
    Writes the DataFrames to the associated table in a PostgreSQL database.

    Args:
        df_list (list): List of dataframes, as returned by
                        read_parquets_from_s3.
        table (str): Name of the warehouse table to write to.
        engine_string (str, optional): Database credentials in SQLAlchemy db
                                       string format. Falls back to the
                                       'TEST-ENGINE' env variable, then to
                                       the Secrets Manager secret.

    Raises:
        SQLAlchemyError: If the database write fails.
    """
    try:
        # Resolve credentials at call time. The previous default of
        # os.getenv("TEST-ENGINE") in the signature was evaluated once at
        # import time, so env changes made after import were never seen.
        if not engine_string:
            engine_string = os.getenv("TEST-ENGINE")
        if not engine_string:
            engine_string = get_secret()
        # NOTE(review): engine_string is resolved but never passed on —
        # upload_dataframe_to_table fetches its own credentials via
        # get_secret(). Confirm whether it should accept this value instead.

        if not df_list:
            log_message(__name__, 20, "No data to write to the warehouse.")
            return

        for df in df_list:
            log_message(__name__, 20, f"Writing data to {table}.")

            upload_dataframe_to_table(df, table)
            log_message(__name__, 20, f"Data written to {table} successfully.")

    except SQLAlchemyError as e:
        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
        raise e
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise e
161
162
def upload_dataframe_to_table(df, table):
    """
    Converts dataframe values to match those in the reference table and uploads it, ensuring no duplicates.

    Connects with credentials from get_secret(), coerces each dataframe column
    to the type of the matching warehouse column, drops rows whose primary-key
    value already exists in the table (dimension tables only — fact tables are
    appended in full), then appends the remainder inside one transaction.

    Parameters:
    df (pd.DataFrame): The dataframe to be processed and uploaded.
    table (str): The name of the target table in the database.

    Returns:
    None

    Raises:
    SQLAlchemyError: If any database operation fails.
    """

    engine_url = get_secret()
    log_message(__name__, 20, "Retrieved engine URL.")

    # NOTE(review): a new engine is created on every call and never disposed;
    # consider engine.dispose() or reusing one engine if call volume grows.
    engine = create_engine(engine_url)
    log_message(__name__, 20, f"Created engine for table: {table}.")

    try:
        # engine.begin() opens a transaction that commits on success and
        # rolls back if anything below raises.
        with engine.begin() as connection:
            # Use Inspector to get table schema
            inspector = inspect(connection)
            columns = inspector.get_columns(table)

            # Create a dictionary of column names and types
            # Ensure dataframe columns match table columns

            table_columns = {col["name"]: col["type"] for col in columns}

            log_message(__name__, 20, f"Table columns: {table_columns}")
            log_message(__name__, 20, f"Dataframe columns: {df.columns} ")

            # Assumes the dataframe's first column is the primary key —
            # TODO confirm this holds for every table loaded through here.
            primary_key_column = df.columns[0]
            log_message(
                __name__, 20, f"Primary key column identified: {primary_key_column}."
            )
            # Convert dataframe columns to the correct types
            for col_name, col_type in table_columns.items():
                # NOTE(review): .dt.date truncates any time component, so
                # DateTime columns lose their time-of-day here — confirm that
                # is intended for DateTime (not just Date) columns.
                if isinstance(col_type, DateTime) or isinstance(col_type, Date):
                    df[col_name] = pd.to_datetime(df[col_name], errors="coerce").dt.date
                elif col_type.__class__.__name__ == "Integer":
                    df[col_name] = pd.to_numeric(
                        df[col_name], errors="coerce", downcast="integer"
                    )
                elif col_type.__class__.__name__ == "Float":
                    df[col_name] = pd.to_numeric(
                        df[col_name], errors="coerce", downcast="float"
                    )
                elif col_type.__class__.__name__ == "String":
                    # NOTE(review): astype(str) renders missing values as the
                    # literal string "nan" rather than NULL — verify that is
                    # acceptable for the warehouse.
                    df[col_name] = df[col_name].astype(str)

                elif isinstance(col_type, Boolean):
                    # Convert to boolean
                    df[col_name] = df[col_name].astype(bool)

                log_message(
                    __name__, 20, f"Converted column {col_name} to type {col_type}."
                )

            # Pull the current table contents so duplicates can be filtered.
            existing_data = pd.read_sql_table(
                table, con=connection, schema="project_team_3"
            )
            log_message(__name__, 20, f"Retrieved existing data from {table}.")

            # Fact tables are append-only; only non-fact (dimension) tables
            # are deduplicated against existing primary-key values.
            if not table.startswith("fact"):
                df = df[~df[primary_key_column].isin(existing_data[primary_key_column])]
                log_message(__name__, 20, f"Removed duplicate rows from dataframe.")

            log_message(__name__, 10, f"Dataframe being uploaded: {df.head()}")

            df.to_sql(
                table,
                con=connection,
                schema="project_team_3",
                if_exists="append",
                index=False,
            )
            log_message(__name__, 20, f"Uploaded data to {table} successfully.")

    except SQLAlchemyError as e:
        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
        raise e
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise e
def get_secret( secret_name: str = 'project-onyx/warehouse-login', region_name: str = 'eu-west-2'):
14def get_secret(
15    secret_name: str = "project-onyx/warehouse-login",
16    region_name: str = "eu-west-2",
17):
18    """
19    Retrieves warehouse login credentials from AWS Secrets Manager and
20    returns it as a sqlalchemy engine db string
21
22    :param secret_name: The name of the secret to retrieve.
23    :param region_name: The AWS region where the secret is stored.
24    :raises ClientError: If there is an error retrieving the secret.
25    :return: The secret DB string
26    """
27    log_message(__name__, 10, "Entered get_secret_for_warehouse")
28    try:
29        client = boto3.client(service_name="secretsmanager", region_name=region_name)
30        get_secret_value_response = client.get_secret_value(SecretId=secret_name)
31        secret_dict = get_secret_value_response["SecretString"]
32        secret = json.loads(secret_dict)
33        result = f"postgresql+pg8000://{secret['username']}:{secret['password']}@{secret['host']}:{secret['port']}/{secret['dbname']}"
34        log_message(__name__, 20, "Secret retrieved")
35        return result
36
37    except ClientError as e:
38        # For a list of exceptions thrown, see
39        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
40        log_message(__name__, 40, e.response["Error"]["Message"])
41        raise e

Retrieves warehouse login credentials from AWS Secrets Manager and returns it as a sqlalchemy engine db string

Parameters
  • secret_name: The name of the secret to retrieve.
  • region_name: The AWS region where the secret is stored.
Raises
  • ClientError: If there is an error retrieving the secret.
Returns

The secret DB string

def log_message(name: str, level: int, message: str = ''):
44def log_message(name: str, level: int, message: str = ""):
45    """
46        Sends a message to the logger at a specified level.
47
48        :param name: The name of the logger.
49        :param level: The logging level (one of 10, 20, 30, 40, 50).
50    :param message: The message to log.
51    """
52    logger = logging.getLogger(name)
53
54    # Define a mapping of level integers to logging methods
55    level_map = {
56        10: logger.debug,
57        20: logger.info,
58        30: logger.warning,
59        40: logger.error,
60        50: logger.critical,
61    }
62
63    # Get the logging method from the map
64    log_method = level_map.get(level)
65
66    if log_method:
67        log_method(message)
68    else:
69        logger.error("Invalid log level: %d", level)

Sends a message to the logger at a specified level.

:param name: The name of the logger.
:param level: The logging level (one of 10, 20, 30, 40, 50).
Parameters
  • message: The message to log.
def read_parquets_from_s3(s3_client, table, last_load, bucket='onyx-processed-data-bucket'):
 72def read_parquets_from_s3(
 73    s3_client, table, last_load, bucket="onyx-processed-data-bucket"
 74):
 75    """Reads parquet files from an s3 bucket and converts to pandas dataframe
 76
 77    Args:
 78        s3_client (boto3('s3')): Boto3 s3 client to access s3 bucket
 79        last_load (string): Read from .txt file containing timestamp
 80                            when load function last ran
 81        bucket (str, optional): The name of the s3 bucket to be read.
 82                                Defaults to "onyx-processed-data-bucket".
 83
 84    Returns:
 85        tuple: A tuple containing lists of dimension table names, fact table names,
 86               dimension dataframes, and fact dataframes.
 87    """
 88    try:
 89        log_message(__name__, 20, f"Entered read_parquet_from_s3 function for {table}.")
 90        bucket_contents = s3_client.list_objects_v2(Bucket=bucket, Prefix=table).get(
 91            "Contents", []
 92        )
 93        if not bucket_contents:
 94            log_message(__name__, 20, "No files found in the bucket.")
 95            return []
 96
 97        last_load = datetime.strptime(last_load, "%Y-%m-%d %H:%M:%S%z")
 98
 99        new_files = [
100            file["Key"]
101            for file in bucket_contents
102            if last_load
103            and last_load < file["LastModified"]
104            and ".txt" not in file["Key"]
105        ]
106        if not new_files:
107            log_message(__name__, 20, "No new files to process.")
108            return []
109
110        df_list = []
111        for parquet_file_name in new_files:
112            log_message(__name__, 20, f"Converting {parquet_file_name} to dataframe.")
113            response = s3_client.get_object(Bucket=bucket, Key=parquet_file_name)
114            data = response["Body"].read()
115            df = pd.read_parquet(BytesIO(data))
116            df_list.append(df)
117            log_message(__name__, 20, f"Conversion of {parquet_file_name} complete.")
118
119        log_message(__name__, 20, f"Parquet {table} files read and dataframes created.")
120        return df_list
121
122    except ClientError as e:
123        log_message(
124            __name__, 40, f"Error reading from S3: {e.response['Error']['Message']}"
125        )
126        raise e
127    except Exception as e:
128        log_message(__name__, 40, f"Unexpected error: {str(e)}")
129        raise e

Reads parquet files from an s3 bucket and converts to pandas dataframe

Args
  • s3_client (boto3('s3')): Boto3 s3 client to access s3 bucket
  • last_load (string): Read from .txt file containing timestamp when load function last ran
  • bucket (str, optional): The name of the s3 bucket to be read. Defaults to "onyx-processed-data-bucket".

Returns: tuple: A tuple containing lists of dimension table names, fact table names, dimension dataframes, and fact dataframes.

def write_df_to_warehouse(df_list, table, engine_string=None):
132def write_df_to_warehouse(df_list, table, engine_string=os.getenv("TEST-ENGINE")):
133    """
134    Writes the DataFrames to the associated tables in a PostgreSQL database.
135
136    Args:
137        read_parquet (list): List of lists received via output of
138                             read_parquets_from_s3 function.
139        engine_string (str, optional): Database credentials in SQLAlchemy db string
140                                       format. Defaults to the 'TEST-ENGINE' env variable.
141    """
142    try:
143        if not engine_string:
144            engine_string = get_secret()
145
146        if not df_list:
147            log_message(__name__, 20, "No data to write to the warehouse.")
148            return
149
150        for df in df_list:
151            log_message(__name__, 20, f"Writing data to {table}.")
152
153            upload_dataframe_to_table(df, table)
154            log_message(__name__, 20, f"Data written to {table} successfully.")
155
156    except SQLAlchemyError as e:
157        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
158        raise e
159    except Exception as e:
160        log_message(__name__, 40, f"Unexpected error: {str(e)}")
161        raise e

Writes the DataFrames to the associated tables in a PostgreSQL database.

Args
  • read_parquet (list): List of lists received via output of read_parquets_from_s3 function.
  • engine_string (str, optional): Database credentials in SQLAlchemy db string format. Defaults to the 'TEST-ENGINE' env variable.

def upload_dataframe_to_table(df, table):
164def upload_dataframe_to_table(df, table):
165    """
166    Converts dataframe values to match those in the reference table and uploads it, ensuring no duplicates.
167
168    Parameters:
169    df (pd.DataFrame): The dataframe to be processed and uploaded.
170    table (str): The name of the target table in the database.
171
172    Returns:
173    None
174    """
175
176    engine_url = get_secret()
177    log_message(__name__, 20, "Retrieved engine URL.")
178
179    engine = create_engine(engine_url)
180    log_message(__name__, 20, f"Created engine for table: {table}.")
181
182    try:
183        with engine.begin() as connection:
184            # Use Inspector to get table schema
185            inspector = inspect(connection)
186            columns = inspector.get_columns(table)
187
188            # Create a dictionary of column names and types
189            # Ensure dataframe columns match table columns
190
191            table_columns = {col["name"]: col["type"] for col in columns}
192
193            log_message(__name__, 20, f"Table columns: {table_columns}")
194            log_message(__name__, 20, f"Dataframe columns: {df.columns} ")
195
196            primary_key_column = df.columns[0]
197            log_message(
198                __name__, 20, f"Primary key column identified: {primary_key_column}."
199            )
200            # Convert dataframe columns to the correct types
201            for col_name, col_type in table_columns.items():
202                if isinstance(col_type, DateTime) or isinstance(col_type, Date):
203                    df[col_name] = pd.to_datetime(df[col_name], errors="coerce").dt.date
204                elif col_type.__class__.__name__ == "Integer":
205                    df[col_name] = pd.to_numeric(
206                        df[col_name], errors="coerce", downcast="integer"
207                    )
208                elif col_type.__class__.__name__ == "Float":
209                    df[col_name] = pd.to_numeric(
210                        df[col_name], errors="coerce", downcast="float"
211                    )
212                elif col_type.__class__.__name__ == "String":
213                    df[col_name] = df[col_name].astype(str)
214
215                elif isinstance(col_type, Boolean):
216                    # Convert to boolean
217                    df[col_name] = df[col_name].astype(bool)
218
219                log_message(
220                    __name__, 20, f"Converted column {col_name} to type {col_type}."
221                )
222
223            existing_data = pd.read_sql_table(
224                table, con=connection, schema="project_team_3"
225            )
226            log_message(__name__, 20, f"Retrieved existing data from {table}.")
227
228            if not table.startswith("fact"):
229                df = df[~df[primary_key_column].isin(existing_data[primary_key_column])]
230                log_message(__name__, 20, f"Removed duplicate rows from dataframe.")
231
232            log_message(__name__, 10, f"Dataframe being uploaded: {df.head()}")
233
234            df.to_sql(
235                table,
236                con=connection,
237                schema="project_team_3",
238                if_exists="append",
239                index=False,
240            )
241            log_message(__name__, 20, f"Uploaded data to {table} successfully.")
242
243    except SQLAlchemyError as e:
244        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
245        raise e
246    except Exception as e:
247        log_message(__name__, 40, f"Unexpected error: {str(e)}")
248        raise e

Converts dataframe values to match those in the reference table and uploads it, ensuring no duplicates.

Parameters
  • df (pd.DataFrame): The dataframe to be processed and uploaded.
  • table (str): The name of the target table in the database.

Returns: None