transform_utils
"""Utilities for the transform step of the data pipeline.

Reads raw JSON extracts from S3, reshapes them into star-schema dimension
and fact tables, and combines intermediate parquet output held in S3.
"""
import json
import logging
import time
from datetime import datetime
from io import BytesIO
from typing import Optional

import boto3
import pandas as pd
from botocore.exceptions import ClientError

# Show full DataFrames when they are printed/logged during debugging.
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)


def log_message(name: str, level: int, message: str = ""):
    """
    Send a message to the named logger at the specified level.

    :param name: The name of the logger.
    :param level: The logging level (one of 10, 20, 30, 40, 50).
    :param message: The message to log.
    """
    logger = logging.getLogger(name)

    # Map the numeric level onto the matching logger method.
    level_map = {
        10: logger.debug,
        20: logger.info,
        30: logger.warning,
        40: logger.error,
        50: logger.critical,
    }

    log_method = level_map.get(level)
    if log_method:
        log_method(message)
    else:
        # Unknown level: report the problem rather than dropping it silently.
        logger.error("Invalid log level: %d", level)


def list_s3_files_by_prefix(bucket: str, prefix: str = "", s3_client=None) -> list:
    """
    List object keys in an S3 bucket, optionally filtered by a prefix.

    Args:
        bucket (str): The name of the S3 bucket.
        prefix (str, optional): Prefix to filter keys by. Defaults to an
            empty string, which lists all files.
        s3_client (boto3.client, optional): The S3 client. If not provided,
            a new client will be created.

    Returns:
        list: All matching keys, or an empty list when none match or the
        listing fails (the failure is logged at error level).
    """
    if not s3_client:
        s3_client = boto3.client("s3")

    try:
        # list_objects_v2 caps each response at 1000 keys, so paginate to
        # guarantee the complete listing is returned for large prefixes.
        file_list = []
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            file_list.extend(obj["Key"] for obj in page.get("Contents", []))

        if file_list:
            log_message(
                __name__,
                20,
                f"Found {len(file_list)} files in bucket '{bucket}' with prefix '{prefix}'",
            )
        else:
            log_message(
                __name__,
                20,
                f"No files found in bucket '{bucket}' with prefix '{prefix}'",
            )
        return file_list

    except Exception as e:
        log_message(
            __name__,
            40,
            f"Failed to list files in S3 bucket '{bucket}' with prefix '{prefix}': {e}",
        )
        return []


def create_df_from_json_in_bucket(
    source_bucket: str, file_name: str, s3_client=None
) -> Optional[pd.DataFrame]:
    """
    Read a JSON file from an S3 bucket and convert it to a pandas DataFrame.

    Args:
        source_bucket (str): The name of the S3 bucket.
        file_name (str): The key (path) of the JSON file within the bucket.
        s3_client (client): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[pd.DataFrame]: A DataFrame containing the data from the
        JSON file, or None if the file is not JSON, holds no rows for its
        table, or cannot be read/parsed (errors are logged).
    """
    if not s3_client:
        s3_client = boto3.client("s3")

    if not file_name.endswith(".json"):
        log_message(__name__, 20, f"File {file_name} is not a JSON file.")
        return None

    try:
        # Retrieve and decode the JSON payload from S3.
        json_file = s3_client.get_object(Bucket=source_bucket, Key=file_name)
        json_contents = json_file["Body"].read().decode("utf-8")
        json_data = json.loads(json_contents)

        # The first path segment of the key names the table the data
        # belongs to; the payload is keyed by that table name.
        table = file_name.split("/")[0]
        data = json_data.get(table, [])

        if not data:
            log_message(
                __name__, 30, f"No data found for table {table} in file {file_name}."
            )
            return None

        return pd.DataFrame(data)

    except json.JSONDecodeError as e:
        log_message(__name__, 40, f"JSON decoding error for file {file_name}: {e}")
        return None
    except ClientError as e:
        log_message(
            __name__,
            40,
            f"Client error when accessing {file_name} in bucket {source_bucket}: {e}",
        )
        return None
    except Exception as e:
        log_message(
            __name__,
            40,
            f"Unexpected error reading or processing file {file_name}: {e}",
        )
        return None


def create_dim_date(start_date: str, end_date: str) -> pd.DataFrame:
    """
    Create a dimension date DataFrame for a given inclusive range of dates.

    Args:
        start_date (str): Start date in "YYYY-MM-DD" format.
        end_date (str): End date in "YYYY-MM-DD" format.

    Returns:
        pd.DataFrame: DataFrame containing the date dimension table, one
        row per calendar day with year/month/day/weekday/quarter columns.

    Raises:
        Exception: Re-raises anything that fails during construction, after
        logging it (e.g. ValueError for malformed date strings).
    """
    try:
        start = datetime.strptime(start_date, "%Y-%m-%d")
        end = datetime.strptime(end_date, "%Y-%m-%d")
        date_range = pd.date_range(start=start, end=end)

        dim_date = pd.DataFrame(date_range, columns=["date_id"])
        dim_date["year"] = dim_date["date_id"].dt.year
        dim_date["month"] = dim_date["date_id"].dt.month
        dim_date["day"] = dim_date["date_id"].dt.day
        # pandas weekday is 0-based (Mon=0); shift to Mon=1 .. Sun=7.
        dim_date["day_of_week"] = dim_date["date_id"].dt.weekday + 1
        dim_date["day_name"] = dim_date["date_id"].dt.day_name()
        dim_date["month_name"] = dim_date["date_id"].dt.month_name()
        dim_date["quarter"] = dim_date["date_id"].dt.quarter

        # Store plain datetime.date values rather than pandas Timestamps.
        dim_date["date_id"] = dim_date["date_id"].dt.date

        return dim_date

    except Exception as e:
        log_message(__name__, 40, f"Error creating dim_date DataFrame: {e}")
        raise


def _split_event_timestamps(df: pd.DataFrame) -> pd.DataFrame:
    """Split 'created_at'/'last_updated' into *_date/*_time columns and drop the originals."""
    df[["created_date", "created_time"]] = df["created_at"].str.split(
        " ", expand=True
    )
    df[["last_updated_date", "last_updated_time"]] = df["last_updated"].str.split(
        " ", expand=True
    )
    return df.drop(["created_at", "last_updated"], axis=1)


def process_table(
    df: pd.DataFrame, file: str, bucket: str, timer: int = 60, s3_client=None
):
    """
    Transform a raw table DataFrame into its dimension or fact table.

    The target table is inferred from the first path segment of *file*.

    Args:
        df (pd.DataFrame): Raw table data loaded from the source bucket.
        file (str): File path (key) within the source bucket; the leading
            directory names the source table.
        bucket (str): Bucket holding already-processed parquet output, read
            when a table must be joined onto a previously built dimension.
        timer (int): Delay in seconds before reading dependent dimension
            files, to allow them to be created first.
        s3_client (optional): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[tuple]: (processed DataFrame, output table name), or None
        when the table is unknown or processing fails (errors are logged).
    """
    if not s3_client:
        s3_client = boto3.client("s3")

    table = file.split("/")[0]
    output_table = ""

    try:
        if table == "address":
            df = df.rename(columns={"address_id": "location_id"}).drop(
                ["created_at", "last_updated"], axis=1
            )
            output_table = "dim_location"

        elif table == "design":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_design"

        elif table == "department":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_department"

        elif table == "payment_type":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_payment_type"

        elif table == "transaction":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_transaction"

        elif table == "currency":
            currency_mapping = {
                "GBP": "British Pound Sterling",
                "USD": "United States Dollar",
                "EUR": "Euros",
            }
            df = df.drop(["created_at", "last_updated"], axis=1).assign(
                currency_name=df["currency_code"].map(currency_mapping)
            )
            # Codes outside the mapping become NaN; surface them for review.
            if df["currency_name"].isnull().any():
                log_message(__name__, 30, f"Unmapped currency codes found in {file}.")
            output_table = "dim_currency"

        elif table == "counterparty":
            # Depends on dim_location: wait for it to be written, then join
            # each counterparty onto its legal address.
            log_message(__name__, 20, "Entered counterparty inside process_table")
            time.sleep(timer)
            dim_location_df = combine_parquet_from_s3(bucket, "dim_location")
            df = df.merge(
                dim_location_df,
                how="inner",
                left_on="legal_address_id",
                right_on="location_id",
            )
            df = df.drop(
                [
                    "commercial_contact",
                    "delivery_contact",
                    "created_at",
                    "last_updated",
                    "legal_address_id",
                    "location_id",
                ],
                axis=1,
            )
            df = df.rename(
                columns={
                    "address_line_1": "counterparty_legal_address_line_1",
                    "address_line_2": "counterparty_legal_address_line_2",
                    "district": "counterparty_legal_district",
                    "city": "counterparty_legal_city",
                    "postal_code": "counterparty_legal_postal_code",
                    "country": "counterparty_legal_country",
                    "phone": "counterparty_legal_phone_number",
                }
            )
            output_table = "dim_counterparty"

        elif table == "staff":
            # Depends on dim_department: wait for it, then join on
            # department_id to pull in department attributes.
            log_message(__name__, 10, "Entered staff inside process_table")
            time.sleep(timer)
            dim_department_df = combine_parquet_from_s3(bucket, "dim_department")
            df = df.merge(dim_department_df, how="inner", on="department_id")
            log_message(__name__, 10, "merged staff data frame with dim_department")
            df = df.drop(
                [
                    "department_id",
                    "created_at",
                    "last_updated",
                ],
                axis=1,
            )
            log_message(__name__, 10, "created staff data frame")
            output_table = "dim_staff"

        elif table == "sales_order":
            df = _split_event_timestamps(df).rename(
                columns={"staff_id": "sales_staff_id"}
            )
            output_table = "fact_sales_order"

        elif table == "purchase_order":
            df = _split_event_timestamps(df)
            output_table = "fact_purchase_order"

        elif table == "payment":
            df = _split_event_timestamps(df).drop(
                ["company_ac_number", "counterparty_ac_number"], axis=1
            )
            output_table = "fact_payment"

        else:
            log_message(
                __name__, 20, f"Unknown table encountered: {table}, skipping..."
            )
            return None

        return (df, output_table)

    except Exception as e:
        log_message(__name__, 40, f"Error processing table {table}: {e}")
        return None


def combine_parquet_from_s3(bucket: str, directory: str, s3_client=None):
    """
    Read every parquet file under *directory* in an S3 bucket and combine
    them into a single DataFrame, dropping duplicate rows (keeping the last).

    Args:
        bucket (str): The name of the S3 bucket containing the parquet files.
        directory (str): The directory (table name) containing the files.
        s3_client (optional): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[pd.DataFrame]: The combined, de-duplicated DataFrame, or
        None when no parquet files exist under the directory.
    """
    log_message(__name__, 20, "Entered combine_parquet_from_s3")
    if not s3_client:
        s3_client = boto3.client("s3")

    # Sort keys so that later files win when duplicate rows are dropped.
    dfs = []
    for file in sorted(list_s3_files_by_prefix(bucket, directory)):
        response = s3_client.get_object(Bucket=bucket, Key=file)
        dfs.append(pd.read_parquet(BytesIO(response["Body"].read())))
    log_message(__name__, 10, "appended dfs in combine_parquet_from_s3")

    # pd.concat raises ValueError on an empty list; honour the documented
    # Optional return instead of crashing when the directory is empty.
    if not dfs:
        log_message(
            __name__, 30, f"No parquet files found in '{bucket}/{directory}'"
        )
        return None

    combined_df = pd.concat(dfs, ignore_index=True)
    combined_df.drop_duplicates(keep="last", inplace=True)
    log_message(__name__, 10, "concatted and dropped dups")
    return combined_df
def log_message(name: str, level: int, message: str = ""):
    """
    Emit *message* on the named logger at the given numeric level.

    :param name: The name of the logger to use.
    :param level: Numeric logging level (10, 20, 30, 40 or 50).
    :param message: The text to log.
    """
    logger = logging.getLogger(name)

    # Translate the numeric level into the name of the logger method to
    # call; unsupported levels map to None.
    method_name = {
        10: "debug",
        20: "info",
        30: "warning",
        40: "error",
        50: "critical",
    }.get(level)

    if method_name is None:
        # Unsupported level: record the problem on the same logger.
        logger.error("Invalid log level: %d", level)
    else:
        getattr(logger, method_name)(message)
Sends a message to the logger at a specified level.
Parameters
- name: The name of the logger.
- level: The logging level (one of 10, 20, 30, 40, 50).
- message: The message to log.
def list_s3_files_by_prefix(bucket: str, prefix: str = "", s3_client=None) -> list:
    """
    Lists files in an S3 bucket. If a prefix is provided, it filters the
    files by that prefix; otherwise all files in the bucket are listed.

    Args:
        bucket (str): The name of the S3 bucket.
        prefix (str, optional): The prefix to filter the S3 objects.
            Defaults to an empty string, which lists all files.
        s3_client (boto3.client, optional): The S3 client. If not provided,
            a new client will be created.

    Returns:
        list: All keys (file paths) in the bucket matching the prefix, or
        an empty list when none match or the listing fails (logged).
    """
    if not s3_client:
        s3_client = boto3.client("s3")

    try:
        # A single list_objects_v2 call returns at most 1000 keys, which
        # would silently truncate large listings; paginate instead.
        file_list = []
        paginator = s3_client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            file_list.extend(obj["Key"] for obj in page.get("Contents", []))

        if file_list:
            log_message(
                __name__,
                20,
                f"Found {len(file_list)} files in bucket '{bucket}' with prefix '{prefix}'",
            )
        else:
            log_message(
                __name__,
                20,
                f"No files found in bucket '{bucket}' with prefix '{prefix}'",
            )
        return file_list

    except Exception as e:
        log_message(
            __name__,
            40,
            f"Failed to list files in S3 bucket '{bucket}' with prefix '{prefix}': {e}",
        )
        return []
Lists files in an S3 bucket. If a prefix is provided, it filters the files by that prefix. If no prefix is provided, it lists all files in the bucket.
Args: bucket (str): The name of the S3 bucket. prefix (str, optional): The prefix to filter the S3 objects. Defaults to an empty string, which lists all files. s3_client (boto3.client, optional): The S3 client. If not provided, a new client will be created.
Returns: list: A list of keys (file paths) in the S3 bucket that match the prefix or all files if no prefix is provided.
def create_df_from_json_in_bucket(
    source_bucket: str, file_name: str, s3_client=None
) -> Optional[pd.DataFrame]:
    """
    Load a JSON file from an S3 bucket as a pandas DataFrame.

    Args:
        source_bucket (str): The name of the S3 bucket.
        file_name (str): The key (path) of the JSON file within the bucket.
        s3_client (client): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[pd.DataFrame]: The file's table data as a DataFrame, or
        None when the file is not JSON, holds no rows, or cannot be
        read/parsed (all such cases are logged).
    """
    client = s3_client if s3_client else boto3.client("s3")

    if not file_name.endswith(".json"):
        log_message(__name__, 20, f"File {file_name} is not a JSON file.")
        return None

    # The leading path segment of the key names the table whose rows the
    # JSON payload is keyed by.
    table = file_name.split("/")[0]

    try:
        s3_object = client.get_object(Bucket=source_bucket, Key=file_name)
        payload = json.loads(s3_object["Body"].read().decode("utf-8"))
        rows = payload.get(table, [])

        if rows:
            return pd.DataFrame(rows)

        log_message(
            __name__, 30, f"No data found for table {table} in file {file_name}."
        )
        return None

    except json.JSONDecodeError as e:
        log_message(__name__, 40, f"JSON decoding error for file {file_name}: {e}")
    except ClientError as e:
        log_message(
            __name__,
            40,
            f"Client error when accessing {file_name} in bucket {source_bucket}: {e}",
        )
    except Exception as e:
        log_message(
            __name__,
            40,
            f"Unexpected error reading or processing file {file_name}: {e}",
        )
Reads a JSON file from an S3 bucket and converts it to a pandas DataFrame.
Args: source_bucket (str): The name of the S3 bucket. file_name (str): The key (path) of the JSON file within the S3 bucket. s3_client (client): AWS S3 client.
Returns: Optional[pd.DataFrame]: A DataFrame containing the data from the JSON file, or None if there are issues with the file or its content.
def create_dim_date(start_date: str, end_date: str) -> pd.DataFrame:
    """
    Build a date-dimension DataFrame covering an inclusive date range.

    Args:
        start_date (str): First date of the range, "YYYY-MM-DD" format.
        end_date (str): Last date of the range, "YYYY-MM-DD" format.

    Returns:
        pd.DataFrame: One row per calendar day with year, month, day,
        weekday, name and quarter attributes.
    """
    try:
        lower = datetime.strptime(start_date, "%Y-%m-%d")
        upper = datetime.strptime(end_date, "%Y-%m-%d")

        dim_date = pd.DataFrame({"date_id": pd.date_range(start=lower, end=upper)})
        dt_accessor = dim_date["date_id"].dt

        dim_date["year"] = dt_accessor.year
        dim_date["month"] = dt_accessor.month
        dim_date["day"] = dt_accessor.day
        # pandas weekday is 0-based (Mon=0); shift so Monday=1 .. Sunday=7.
        dim_date["day_of_week"] = dt_accessor.weekday + 1
        dim_date["day_name"] = dt_accessor.day_name()
        dim_date["month_name"] = dt_accessor.month_name()
        dim_date["quarter"] = dt_accessor.quarter

        # Keep plain datetime.date values rather than pandas Timestamps.
        dim_date["date_id"] = dt_accessor.date

        return dim_date

    except Exception as e:
        log_message(__name__, 40, f"Error creating dim_date DataFrame: {e}")
        raise
Creates a dimension date DataFrame for a given range of dates.
Args: start_date (str): Start date in "YYYY-MM-DD" format. end_date (str): End date in "YYYY-MM-DD" format.
Returns: pd.DataFrame: DataFrame containing the date dimension table.
def process_table(
    df: pd.DataFrame, file: str, bucket: str, timer: int = 60, s3_client=None
):
    """
    Transform a raw table DataFrame into its dimension or fact table.

    The target table is inferred from the first path segment of *file*.

    Args:
        df (pd.DataFrame): Raw table data loaded from the source bucket.
        file (str): File path (key) within the source bucket; the leading
            directory names the source table.
        bucket (str): Bucket holding already-processed parquet output, read
            when a table must be joined onto a previously built dimension.
        timer (int): Delay in seconds before reading dependent dimension
            files, to allow them to be created first.
        s3_client (optional): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[tuple]: (processed DataFrame, output table name), or None
        when the table is unknown or processing fails (errors are logged).
    """
    if not s3_client:
        s3_client = boto3.client("s3")

    table = file.split("/")[0]
    output_table = ""

    try:
        if table == "address":
            df = df.rename(columns={"address_id": "location_id"}).drop(
                ["created_at", "last_updated"], axis=1
            )
            output_table = "dim_location"

        elif table == "design":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_design"

        elif table == "department":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_department"

        elif table == "payment_type":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_payment_type"

        elif table == "transaction":
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = "dim_transaction"

        elif table == "currency":
            currency_mapping = {
                "GBP": "British Pound Sterling",
                "USD": "United States Dollar",
                "EUR": "Euros",
            }
            df = df.drop(["created_at", "last_updated"], axis=1).assign(
                currency_name=df["currency_code"].map(currency_mapping)
            )
            # Codes outside the mapping become NaN; surface them for review.
            if df["currency_name"].isnull().any():
                log_message(__name__, 30, f"Unmapped currency codes found in {file}.")
            output_table = "dim_currency"

        elif table == "counterparty":  # combine counterparty with address table
            # Depends on dim_location: wait for it to be written, then join
            # each counterparty onto its legal address.
            log_message(__name__, 20, "Entered counterparty inside process_table")
            time.sleep(timer)
            dim_location_df = combine_parquet_from_s3(bucket, "dim_location")
            df = df.merge(
                dim_location_df,
                how="inner",
                left_on="legal_address_id",
                right_on="location_id",
            )
            df = df.drop(
                [
                    "commercial_contact",
                    "delivery_contact",
                    "created_at",
                    "last_updated",
                    "legal_address_id",
                    "location_id",
                ],
                axis=1,
            )
            df = df.rename(
                columns={
                    "address_line_1": "counterparty_legal_address_line_1",
                    "address_line_2": "counterparty_legal_address_line_2",
                    "district": "counterparty_legal_district",
                    "city": "counterparty_legal_city",
                    "postal_code": "counterparty_legal_postal_code",
                    "country": "counterparty_legal_country",
                    "phone": "counterparty_legal_phone_number",
                }
            )
            output_table = "dim_counterparty"

        elif table == "staff":
            # Depends on dim_department: wait for it, then join on
            # department_id to pull in department attributes.
            log_message(__name__, 10, "Entered staff inside process_table")
            time.sleep(timer)
            dim_department_df = combine_parquet_from_s3(bucket, "dim_department")
            df = df.merge(dim_department_df, how="inner", on="department_id")
            log_message(__name__, 10, "merged staff data frame with dim_department")
            df = df.drop(
                [
                    "department_id",
                    "created_at",
                    "last_updated",
                ],
                axis=1,
            )

            log_message(__name__, 10, "created staff data frame")
            output_table = "dim_staff"

        elif table == "sales_order":
            # Split created_at/last_updated timestamps into date and time
            # columns, then drop the originals.
            df[["created_date", "created_time"]] = df["created_at"].str.split(
                " ", expand=True
            )
            df[["last_updated_date", "last_updated_time"]] = df[
                "last_updated"
            ].str.split(" ", expand=True)
            df = df.drop(
                [
                    "created_at",
                    "last_updated",
                ],
                axis=1,
            )
            df = df.rename(
                columns={
                    "staff_id": "sales_staff_id",
                }
            )
            output_table = "fact_sales_order"

        elif table == "purchase_order":
            # Split created_at/last_updated timestamps into date and time
            # columns, then drop the originals.
            df[["created_date", "created_time"]] = df["created_at"].str.split(
                " ", expand=True
            )
            df[["last_updated_date", "last_updated_time"]] = df[
                "last_updated"
            ].str.split(" ", expand=True)
            df = df.drop(
                [
                    "created_at",
                    "last_updated",
                ],
                axis=1,
            )
            output_table = "fact_purchase_order"

        elif table == "payment":
            # Split created_at/last_updated timestamps into date and time
            # columns, then drop them along with the account numbers.
            df[["created_date", "created_time"]] = df["created_at"].str.split(
                " ", expand=True
            )
            df[["last_updated_date", "last_updated_time"]] = df[
                "last_updated"
            ].str.split(" ", expand=True)
            df = df.drop(
                [
                    "company_ac_number",
                    "counterparty_ac_number",
                    "created_at",
                    "last_updated",
                ],
                axis=1,
            )
            output_table = "fact_payment"

        else:
            log_message(
                __name__, 20, f"Unknown table encountered: {table}, skipping..."
            )
            return

        return (df, output_table)

    except Exception as e:
        log_message(__name__, 40, f"Error processing table {table}: {e}")
Process specific table based on the file name, converts it to a dataframe.
Args: source_bucket (str): The name of the S3 bucket containing the source JSON files. file (str): str of file path (key) within the source bucket. output_bucket (str): The name of the S3 bucket to upload processed files to. timer (int): delay timer in order to allow files to be created before joining on another.
Returns: pd.DataFrame: DataFrame containing the processed dim or fact table.
def combine_parquet_from_s3(bucket: str, directory: str, s3_client=None):
    """
    Read every parquet file under *directory* in an S3 bucket and combine
    them into one DataFrame, dropping duplicate rows (keeping the last).

    Args:
        bucket (str): The name of the S3 bucket containing the parquet files.
        directory (str): The directory (table name) containing the files.
        s3_client (optional): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[pd.DataFrame]: The combined, de-duplicated DataFrame, or
        None when no parquet files exist under the directory.
    """
    log_message(__name__, 20, "Entered combine_parquet_from_s3")
    if not s3_client:
        s3_client = boto3.client("s3")

    # Sort keys so that rows from later files win when duplicates are
    # dropped with keep="last".
    dfs = []
    for file in sorted(list_s3_files_by_prefix(bucket, directory)):
        response = s3_client.get_object(Bucket=bucket, Key=file)
        dfs.append(pd.read_parquet(BytesIO(response["Body"].read())))
    log_message(__name__, 10, "appended dfs in combine_parquet_from_s3")

    # pd.concat raises ValueError on an empty list; honour the documented
    # Optional return instead of crashing when the directory is empty.
    if not dfs:
        log_message(
            __name__, 30, f"No parquet files found in '{bucket}/{directory}'"
        )
        return None

    combined_df = pd.concat(dfs, ignore_index=True)
    combined_df.drop_duplicates(keep="last", inplace=True)
    log_message(__name__, 10, "concatted and dropped dups")
    return combined_df
Reads an S3 bucket containing parquet files and combines them into a dataframe. Removes duplicate rows and keeps the last modification.
Args: bucket (str): The name of the S3 bucket containing the source parquet files. directory (str): The directory (table name) containing the parquet files. s3_client (optional): AWS S3 client.
Returns: Optional[pd.DataFrame]: A DataFrame containing the data from the combined parquet files, or None if there are issues with the files or its content.