load_utils
import json
import logging
import os
from datetime import datetime
from io import BytesIO

import boto3
import pandas as pd
from botocore.exceptions import ClientError
from dotenv import load_dotenv
from sqlalchemy import Boolean, Date, DateTime, create_engine, inspect
from sqlalchemy.exc import SQLAlchemyError

load_dotenv()


def get_secret(
    secret_name: str = "project-onyx/warehouse-login",
    region_name: str = "eu-west-2",
):
    """
    Retrieve warehouse login credentials from AWS Secrets Manager and
    return them as a SQLAlchemy engine db string.

    :param secret_name: The name of the secret to retrieve.
    :param region_name: The AWS region where the secret is stored.
    :raises ClientError: If there is an error retrieving the secret.
    :return: The secret DB string (postgresql+pg8000 dialect).
    """
    # Fixed stale message: this function is named get_secret, not
    # get_secret_for_warehouse.
    log_message(__name__, 10, "Entered get_secret")
    try:
        client = boto3.client(service_name="secretsmanager", region_name=region_name)
        response = client.get_secret_value(SecretId=secret_name)
        secret = json.loads(response["SecretString"])
        result = (
            f"postgresql+pg8000://{secret['username']}:{secret['password']}"
            f"@{secret['host']}:{secret['port']}/{secret['dbname']}"
        )
        log_message(__name__, 20, "Secret retrieved")
        return result

    except ClientError as e:
        # For a list of exceptions thrown, see
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        log_message(__name__, 40, e.response["Error"]["Message"])
        raise


def log_message(name: str, level: int, message: str = ""):
    """
    Send a message to the named logger at a specified numeric level.

    :param name: The name of the logger.
    :param level: The logging level (one of 10, 20, 30, 40, 50).
    :param message: The message to log.
    """
    logger = logging.getLogger(name)

    # Map the supported numeric levels to the matching logger methods.
    level_map = {
        10: logger.debug,
        20: logger.info,
        30: logger.warning,
        40: logger.error,
        50: logger.critical,
    }

    log_method = level_map.get(level)
    if log_method is not None:
        log_method(message)
    else:
        logger.error("Invalid log level: %d", level)


def read_parquets_from_s3(
    s3_client, table, last_load, bucket="onyx-processed-data-bucket"
):
    """Read parquet files for *table* from an s3 bucket into pandas dataframes.

    Args:
        s3_client (boto3('s3')): Boto3 s3 client to access the s3 bucket.
        table (str): Key prefix identifying which table's parquet files to read.
        last_load (string): Timestamp string ("%Y-%m-%d %H:%M:%S%z") read from
            a .txt file recording when the load function last ran.
        bucket (str, optional): The name of the s3 bucket to be read.
            Defaults to "onyx-processed-data-bucket".

    Returns:
        list: A list of pandas dataframes, one per new parquet file.
        Empty when there are no files or nothing new to process.
    """
    try:
        log_message(__name__, 20, f"Entered read_parquet_from_s3 function for {table}.")
        bucket_contents = s3_client.list_objects_v2(Bucket=bucket, Prefix=table).get(
            "Contents", []
        )
        if not bucket_contents:
            log_message(__name__, 20, "No files found in the bucket.")
            return []

        last_load = datetime.strptime(last_load, "%Y-%m-%d %H:%M:%S%z")

        # Only files modified since the previous load; skip the timestamp
        # marker file (suffix check — a substring test would also drop
        # legitimate keys merely containing ".txt").
        new_files = [
            file["Key"]
            for file in bucket_contents
            if last_load
            and last_load < file["LastModified"]
            and not file["Key"].endswith(".txt")
        ]
        if not new_files:
            log_message(__name__, 20, "No new files to process.")
            return []

        df_list = []
        for parquet_file_name in new_files:
            log_message(__name__, 20, f"Converting {parquet_file_name} to dataframe.")
            response = s3_client.get_object(Bucket=bucket, Key=parquet_file_name)
            data = response["Body"].read()
            df_list.append(pd.read_parquet(BytesIO(data)))
            log_message(__name__, 20, f"Conversion of {parquet_file_name} complete.")

        log_message(__name__, 20, f"Parquet {table} files read and dataframes created.")
        return df_list

    except ClientError as e:
        log_message(
            __name__, 40, f"Error reading from S3: {e.response['Error']['Message']}"
        )
        raise
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise


def write_df_to_warehouse(df_list, table, engine_string=None):
    """
    Write each dataframe in *df_list* to the associated table in a
    PostgreSQL database.

    Args:
        df_list (list): List of dataframes received via output of the
            read_parquets_from_s3 function.
        table (str): Name of the destination warehouse table.
        engine_string (str, optional): Database credentials in SQLAlchemy db
            string format. Defaults to the 'TEST-ENGINE' env variable, then
            to the secret from AWS Secrets Manager.

    :raises SQLAlchemyError: On database errors during upload.
    """
    try:
        # Resolve credentials at call time: a def-time os.getenv default is
        # frozen at import and would ignore later environment changes.
        if engine_string is None:
            engine_string = os.getenv("TEST-ENGINE")
        if not engine_string:
            engine_string = get_secret()

        if not df_list:
            log_message(__name__, 20, "No data to write to the warehouse.")
            return

        for df in df_list:
            log_message(__name__, 20, f"Writing data to {table}.")
            # Thread the resolved credentials through so the upload helper
            # does not make a second Secrets Manager call per dataframe.
            upload_dataframe_to_table(df, table, engine_string)
            log_message(__name__, 20, f"Data written to {table} successfully.")

    except SQLAlchemyError as e:
        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
        raise
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise


def upload_dataframe_to_table(df, table, engine_url=None):
    """
    Coerce dataframe columns to the target table's column types and append
    the rows, de-duplicating on the primary key for dimension tables.

    Parameters:
        df (pd.DataFrame): The dataframe to be processed and uploaded.
        table (str): The name of the target table in the database.
        engine_url (str, optional): SQLAlchemy db string; fetched from AWS
            Secrets Manager when omitted (backward-compatible default).

    Returns:
        None
    """
    if engine_url is None:
        engine_url = get_secret()
        log_message(__name__, 20, "Retrieved engine URL.")

    engine = create_engine(engine_url)
    log_message(__name__, 20, f"Created engine for table: {table}.")

    try:
        with engine.begin() as connection:
            # Read the live table schema so the dataframe can be coerced to it.
            inspector = inspect(connection)
            table_columns = {
                col["name"]: col["type"] for col in inspector.get_columns(table)
            }

            log_message(__name__, 20, f"Table columns: {table_columns}")
            log_message(__name__, 20, f"Dataframe columns: {df.columns} ")

            # assumes the first dataframe column is the primary key — TODO confirm
            primary_key_column = df.columns[0]
            log_message(
                __name__, 20, f"Primary key column identified: {primary_key_column}."
            )

            for col_name, col_type in table_columns.items():
                if col_name not in df.columns:
                    # Don't touch table columns the dataframe doesn't carry
                    # (previously this raised KeyError).
                    continue
                if isinstance(col_type, DateTime):
                    # Keep the full timestamp; truncating to .date() lost the
                    # time component for TIMESTAMP columns.
                    df[col_name] = pd.to_datetime(df[col_name], errors="coerce")
                elif isinstance(col_type, Date):
                    df[col_name] = pd.to_datetime(df[col_name], errors="coerce").dt.date
                elif col_type.__class__.__name__ == "Integer":
                    df[col_name] = pd.to_numeric(
                        df[col_name], errors="coerce", downcast="integer"
                    )
                elif col_type.__class__.__name__ == "Float":
                    df[col_name] = pd.to_numeric(
                        df[col_name], errors="coerce", downcast="float"
                    )
                elif col_type.__class__.__name__ == "String":
                    # NOTE(review): astype(str) renders missing values as the
                    # literal "nan" — confirm that is acceptable downstream.
                    df[col_name] = df[col_name].astype(str)
                elif isinstance(col_type, Boolean):
                    df[col_name] = df[col_name].astype(bool)

                log_message(
                    __name__, 20, f"Converted column {col_name} to type {col_type}."
                )

            existing_data = pd.read_sql_table(
                table, con=connection, schema="project_team_3"
            )
            log_message(__name__, 20, f"Retrieved existing data from {table}.")

            # Fact tables are append-only; dimension tables are de-duplicated
            # against rows already present in the warehouse.
            if not table.startswith("fact"):
                df = df[~df[primary_key_column].isin(existing_data[primary_key_column])]
                log_message(__name__, 20, "Removed duplicate rows from dataframe.")

            log_message(__name__, 10, f"Dataframe being uploaded: {df.head()}")

            df.to_sql(
                table,
                con=connection,
                schema="project_team_3",
                if_exists="append",
                index=False,
            )
            log_message(__name__, 20, f"Uploaded data to {table} successfully.")

    except SQLAlchemyError as e:
        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
        raise
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise
    finally:
        # The engine is created per call; release its pooled connections.
        engine.dispose()
def get_secret(
    secret_name: str = "project-onyx/warehouse-login",
    region_name: str = "eu-west-2",
):
    """
    Fetch the warehouse login secret from AWS Secrets Manager and build a
    SQLAlchemy engine db string from it.

    :param secret_name: The name of the secret to retrieve.
    :param region_name: The AWS region where the secret is stored.
    :raises ClientError: If there is an error retrieving the secret.
    :return: The secret DB string
    """
    log_message(__name__, 10, "Entered get_secret_for_warehouse")
    try:
        sm_client = boto3.client(service_name="secretsmanager", region_name=region_name)
        raw_secret = sm_client.get_secret_value(SecretId=secret_name)["SecretString"]
        creds = json.loads(raw_secret)
        db_url = (
            "postgresql+pg8000://"
            f"{creds['username']}:{creds['password']}"
            f"@{creds['host']}:{creds['port']}/{creds['dbname']}"
        )
        log_message(__name__, 20, "Secret retrieved")
        return db_url

    except ClientError as e:
        # Possible exceptions are listed at:
        # https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
        log_message(__name__, 40, e.response["Error"]["Message"])
        raise e
Retrieves warehouse login credentials from AWS Secrets Manager and returns it as a sqlalchemy engine db string
Parameters
- secret_name: The name of the secret to retrieve.
- region_name: The AWS region where the secret is stored.
Raises
- ClientError: If there is an error retrieving the secret.
Returns
The secret DB string
def log_message(name: str, level: int, message: str = ""):
    """
    Send *message* to the logger called *name* at the given numeric level.

    :param name: The name of the logger.
    :param level: The logging level (one of 10, 20, 30, 40, 50).
    :param message: The message to log.
    """
    logger = logging.getLogger(name)

    # Dispatch table: supported numeric level -> bound logger method.
    dispatch = {
        10: logger.debug,
        20: logger.info,
        30: logger.warning,
        40: logger.error,
        50: logger.critical,
    }

    handler = dispatch.get(level)
    if handler is None:
        # Unsupported level: report it as an error rather than dropping it.
        logger.error("Invalid log level: %d", level)
    else:
        handler(message)
Sends a message to the logger at a specified level.
:param name: The name of the logger.
:param level: The logging level (one of 10, 20, 30, 40, 50).
Parameters
- message: The message to log.
def read_parquets_from_s3(
    s3_client, table, last_load, bucket="onyx-processed-data-bucket"
):
    """Read new parquet files for *table* from an s3 bucket as dataframes.

    Args:
        s3_client (boto3('s3')): Boto3 s3 client used to access the bucket.
        table (str): Key prefix identifying the table's parquet files.
        last_load (string): Timestamp string ("%Y-%m-%d %H:%M:%S%z") recording
            when the load function last ran.
        bucket (str, optional): Name of the s3 bucket to read from.
            Defaults to "onyx-processed-data-bucket".

    Returns:
        list: One pandas dataframe per parquet file modified since
        *last_load*; empty when there is nothing to process.
    """
    try:
        log_message(__name__, 20, f"Entered read_parquet_from_s3 function for {table}.")
        listing = s3_client.list_objects_v2(Bucket=bucket, Prefix=table)
        objects = listing.get("Contents", [])
        if not objects:
            log_message(__name__, 20, "No files found in the bucket.")
            return []

        cutoff = datetime.strptime(last_load, "%Y-%m-%d %H:%M:%S%z")

        # Keep only keys modified after the previous load, ignoring the
        # timestamp marker (.txt) entries.
        fresh_keys = []
        for obj in objects:
            if cutoff and cutoff < obj["LastModified"] and ".txt" not in obj["Key"]:
                fresh_keys.append(obj["Key"])

        if not fresh_keys:
            log_message(__name__, 20, "No new files to process.")
            return []

        frames = []
        for parquet_file_name in fresh_keys:
            log_message(__name__, 20, f"Converting {parquet_file_name} to dataframe.")
            payload = s3_client.get_object(Bucket=bucket, Key=parquet_file_name)
            frames.append(pd.read_parquet(BytesIO(payload["Body"].read())))
            log_message(__name__, 20, f"Conversion of {parquet_file_name} complete.")

        log_message(__name__, 20, f"Parquet {table} files read and dataframes created.")
        return frames

    except ClientError as e:
        log_message(
            __name__, 40, f"Error reading from S3: {e.response['Error']['Message']}"
        )
        raise e
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise e
Reads parquet files from an s3 bucket and converts to pandas dataframe
Args: s3_client (boto3('s3')): Boto3 s3 client to access s3 bucket table (str): Key prefix identifying which table's parquet files to read last_load (string): Read from .txt file containing timestamp when load function last ran bucket (str, optional): The name of the s3 bucket to be read. Defaults to "onyx-processed-data-bucket".
Returns: list: A list of pandas dataframes, one per new parquet file read from the bucket; empty when there are no new files to process.
def write_df_to_warehouse(df_list, table, engine_string=None):
    """
    Write each dataframe in *df_list* to the associated table in a
    PostgreSQL database.

    Args:
        df_list (list): List of dataframes received via output of the
            read_parquets_from_s3 function.
        table (str): Name of the destination warehouse table.
        engine_string (str, optional): Database credentials in SQLAlchemy db
            string format. Defaults to the 'TEST-ENGINE' env variable, then
            to the secret from AWS Secrets Manager.

    :raises SQLAlchemyError: On database errors during upload.
    """
    try:
        # Resolve credentials at call time: the previous def-time
        # os.getenv default was frozen at import and ignored later
        # environment changes.
        if engine_string is None:
            engine_string = os.getenv("TEST-ENGINE")
        if not engine_string:
            engine_string = get_secret()
        # NOTE(review): engine_string is not used below —
        # upload_dataframe_to_table fetches its own credentials, causing a
        # duplicate Secrets Manager call. Consider threading it through.

        if not df_list:
            log_message(__name__, 20, "No data to write to the warehouse.")
            return

        for df in df_list:
            log_message(__name__, 20, f"Writing data to {table}.")
            upload_dataframe_to_table(df, table)
            log_message(__name__, 20, f"Data written to {table} successfully.")

    except SQLAlchemyError as e:
        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
        raise e
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise e
Writes the DataFrames to the associated tables in a PostgreSQL database.
Args: df_list (list): List of dataframes received via output of read_parquets_from_s3 function. table (str): Name of the destination warehouse table. engine_string (str, optional): Database credentials in SQLAlchemy db string format. Defaults to the 'TEST-ENGINE' env variable.
def upload_dataframe_to_table(df, table, engine_url=None):
    """
    Coerce dataframe columns to the target table's column types and append
    the rows, de-duplicating on the primary key for dimension tables.

    Parameters:
        df (pd.DataFrame): The dataframe to be processed and uploaded.
        table (str): The name of the target table in the database.
        engine_url (str, optional): SQLAlchemy db string; fetched from AWS
            Secrets Manager when omitted (backward-compatible default).

    Returns:
        None
    """
    if engine_url is None:
        engine_url = get_secret()
        log_message(__name__, 20, "Retrieved engine URL.")

    engine = create_engine(engine_url)
    log_message(__name__, 20, f"Created engine for table: {table}.")

    try:
        with engine.begin() as connection:
            # Read the live table schema so the dataframe can be coerced to it.
            inspector = inspect(connection)
            table_columns = {
                col["name"]: col["type"] for col in inspector.get_columns(table)
            }

            log_message(__name__, 20, f"Table columns: {table_columns}")
            log_message(__name__, 20, f"Dataframe columns: {df.columns} ")

            # assumes the first dataframe column is the primary key — TODO confirm
            primary_key_column = df.columns[0]
            log_message(
                __name__, 20, f"Primary key column identified: {primary_key_column}."
            )

            for col_name, col_type in table_columns.items():
                if col_name not in df.columns:
                    # Don't touch table columns the dataframe doesn't carry
                    # (previously this raised KeyError).
                    continue
                if isinstance(col_type, DateTime):
                    # Keep the full timestamp; truncating to .date() lost the
                    # time component for TIMESTAMP columns.
                    df[col_name] = pd.to_datetime(df[col_name], errors="coerce")
                elif isinstance(col_type, Date):
                    df[col_name] = pd.to_datetime(df[col_name], errors="coerce").dt.date
                elif col_type.__class__.__name__ == "Integer":
                    df[col_name] = pd.to_numeric(
                        df[col_name], errors="coerce", downcast="integer"
                    )
                elif col_type.__class__.__name__ == "Float":
                    df[col_name] = pd.to_numeric(
                        df[col_name], errors="coerce", downcast="float"
                    )
                elif col_type.__class__.__name__ == "String":
                    # NOTE(review): astype(str) renders missing values as the
                    # literal "nan" — confirm that is acceptable downstream.
                    df[col_name] = df[col_name].astype(str)
                elif isinstance(col_type, Boolean):
                    df[col_name] = df[col_name].astype(bool)

                log_message(
                    __name__, 20, f"Converted column {col_name} to type {col_type}."
                )

            existing_data = pd.read_sql_table(
                table, con=connection, schema="project_team_3"
            )
            log_message(__name__, 20, f"Retrieved existing data from {table}.")

            # Fact tables are append-only; dimension tables are de-duplicated
            # against rows already present in the warehouse.
            if not table.startswith("fact"):
                df = df[~df[primary_key_column].isin(existing_data[primary_key_column])]
                log_message(__name__, 20, "Removed duplicate rows from dataframe.")

            log_message(__name__, 10, f"Dataframe being uploaded: {df.head()}")

            df.to_sql(
                table,
                con=connection,
                schema="project_team_3",
                if_exists="append",
                index=False,
            )
            log_message(__name__, 20, f"Uploaded data to {table} successfully.")

    except SQLAlchemyError as e:
        log_message(__name__, 40, f"SQLAlchemy error: {str(e)}")
        raise e
    except Exception as e:
        log_message(__name__, 40, f"Unexpected error: {str(e)}")
        raise e
    finally:
        # The engine is created per call; release its pooled connections.
        engine.dispose()
Converts dataframe values to match those in the reference table and uploads it, ensuring no duplicates.
Parameters: df (pd.DataFrame): The dataframe to be processed and uploaded. table (str): The name of the target table in the database.
Returns: None