"""Load stage of the ETL pipeline.

Reads parquet files from the processed-data S3 bucket and writes their
contents to the corresponding data warehouse tables, tracking the last
successful run via a ``last_load.txt`` marker object in the same bucket.
"""
import logging
from datetime import datetime, timezone
from typing import Any

import boto3
from botocore.exceptions import ClientError
from sqlalchemy.exc import SQLAlchemyError

from load_utils import (
    log_message,
    read_parquets_from_s3,
    write_df_to_warehouse,
)

# Configure logging. force=True replaces any pre-installed handlers (the
# Lambda runtime adds its own) so this format actually takes effect.
logging.basicConfig(
    level=logging.INFO,  # Set the minimum logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    force=True,
    datefmt="%d/%m/%Y %I:%M:%S %p",
)

# Create a logger instance
logger = logging.getLogger(__name__)


def lambda_handler(event: dict, context: Any) -> dict:
    """
    AWS Lambda function entry point.

    This function processes an event to load new files, transforms the data from
    parquet files, and uploads the processed files to its associated data warehouse table.

    :param event: The event data passed to the Lambda function (as a dictionary).
    :param context: The runtime information of the Lambda function (e.g., function name, version).
    :return: A dictionary with the status of the operation.
    """
    log_message(__name__, 10, "Entered lambda_handler")

    try:
        # Extract source bucket from event or use default
        source_bucket = event.get("source_bucket", "onyx-processed-data-bucket")

        # Initialize the S3 client
        s3_client = boto3.client("s3")

        # Call the load function
        load(source_bucket, s3_client=s3_client)

        # Log completion and return success response
        log_message(__name__, 20, "Load process completed successfully")
        return {"statusCode": 200, "body": "Load process completed successfully"}

    except Exception as e:
        # Lambda boundary: log the error and return failure response instead
        # of letting the exception escape the handler.
        log_message(__name__, 40, f"Error in lambda_handler: {str(e)}")
        return {"statusCode": 500, "body": f"Error in lambda_handler: {str(e)}"}


def load(bucket: str = "onyx-processed-data-bucket", s3_client=None) -> None:
    """
    Load function to process parquet files from S3 and write the data to a data warehouse.

    Args:
        bucket (str, optional): The S3 bucket name. Defaults to "onyx-processed-data-bucket".
        s3_client (boto3.client, optional): The S3 client instance. A new client
            is created when None. Defaults to None.

    Raises:
        SQLAlchemyError: If writing a DataFrame to the warehouse fails.
        ClientError: If reading the parquet files from S3 fails.
    """
    log_message(__name__, 10, "Entered load function")

    # Initialize the S3 client if not provided
    if not s3_client:
        s3_client = boto3.client("s3")

    # Capture the run start time BEFORE any data is read. Stamping the marker
    # with a post-processing time would leave a gap: files uploaded while this
    # run is in progress would be marked as already loaded and silently
    # skipped by every subsequent run.
    run_started_at = datetime.now(timezone.utc)

    # Retrieve the last load timestamp from S3, or fall back to a sentinel
    # date on the very first run (marker object does not exist yet).
    try:
        last_load_file = s3_client.get_object(Bucket=bucket, Key="last_load.txt")
        last_load = last_load_file["Body"].read().decode("utf-8")
        log_message(__name__, 20, f"Load function last ran at {last_load}")
    except s3_client.exceptions.NoSuchKey:
        last_load = "1900-01-01 00:00:00+0000"
        log_message(__name__, 20, "Load function running for the first time")

    # Write the DataFrame(s) to the data warehouse
    try:
        tables = [
            "dim_staff",
            "dim_location",
            "dim_counterparty",
            "dim_currency",
            "dim_design",
            "dim_transaction",
            "dim_payment_type",
            "fact_sales_order",
            "fact_purchase_order",
            "fact_payment",
            "dim_date",
        ]
        for table in tables:
            df_list = read_parquets_from_s3(s3_client, table, last_load, bucket)
            log_message(
                __name__,
                20,
                f"Parquet file(s) for {table} read from processed data bucket",
            )
            write_df_to_warehouse(
                df_list, table, engine_string=None
            )  # Pass engine_string if required
            log_message(__name__, 20, f"Data written to {table} in data warehouse")

    except SQLAlchemyError as e:  # Handle SQLAlchemy errors specifically
        log_message(__name__, 40, f"Warehouse write error: {str(e)}")
        raise  # bare raise preserves the original traceback

    except ClientError as e:
        log_message(
            __name__,
            40,
            f"Error: {e.response['Error']['Message']}",
        )
        raise

    # Persist the new marker only after every table loaded successfully.
    try:
        store_last_load = run_started_at.strftime("%Y-%m-%d %H:%M:%S%z")
        s3_client.put_object(Bucket=bucket, Key="last_load.txt", Body=store_last_load)
        log_message(__name__, 20, f"Updated last load timestamp to {store_last_load}")

    except ClientError as e:
        # Best-effort: a failed marker update is logged but does not fail the
        # load; the next run will simply reprocess this window.
        log_message(
            __name__,
            40,
            f"Error updating last load timestamp: {e.response['Error']['Message']}",
        )
logger =
<Logger load (INFO)>
def
lambda_handler(event: dict, context: Any):
def lambda_handler(event: dict, context: Any):
    """
    Entry point invoked by AWS Lambda.

    Resolves the source bucket from the incoming event (falling back to the
    default processed-data bucket), runs the load step against it, and maps
    the outcome to an HTTP-style status response.

    :param event: The event data passed to the Lambda function (as a dictionary).
    :param context: The runtime information of the Lambda function (e.g., function name, version).
    :return: A dictionary with the status of the operation.
    """
    log_message(__name__, 10, "Entered lambda_handler")

    try:
        # Bucket named in the event wins; otherwise use the default bucket.
        target_bucket = event.get("source_bucket", "onyx-processed-data-bucket")

        # Fresh S3 client for this invocation, handed down to load().
        client = boto3.client("s3")
        load(target_bucket, s3_client=client)

        log_message(__name__, 20, "Load process completed successfully")
        return {"statusCode": 200, "body": "Load process completed successfully"}

    except Exception as e:
        # Any failure is reported as a 500 response rather than re-raised.
        log_message(__name__, 40, f"Error in lambda_handler: {str(e)}")
        return {"statusCode": 500, "body": f"Error in lambda_handler: {str(e)}"}
AWS Lambda function entry point.
This function processes an event to load new files, transforms the data from parquet files, and uploads each processed file to its associated data warehouse table.
Parameters
- event: The event data passed to the Lambda function (as a dictionary).
- context: The runtime information of the Lambda function (e.g., function name, version).
Returns
A dictionary with the status of the operation.
def
load(bucket='onyx-processed-data-bucket', s3_client=None):
def load(bucket="onyx-processed-data-bucket", s3_client=None):
    """
    Read new parquet files from S3 and write their data to the warehouse.

    Fetches the timestamp of the previous run from ``last_load.txt`` (using a
    sentinel date on the first run), loads every parquet file newer than that
    timestamp into its warehouse table, then records the current time back to
    the marker object.

    Args:
        bucket (str, optional): The S3 bucket name. Defaults to "onyx-processed-data-bucket".
        s3_client (boto3.client, optional): The S3 client instance. Defaults to None.
    """
    log_message(__name__, 10, "Entered load function")

    # Fall back to a freshly created client when none was supplied.
    s3_client = s3_client or boto3.client("s3")

    # Determine when the previous run happened; a missing marker object means
    # this is the first ever run.
    try:
        marker = s3_client.get_object(Bucket=bucket, Key="last_load.txt")
        last_load = marker["Body"].read().decode("utf-8")
        log_message(__name__, 20, f"Load function last ran at {last_load}")
    except s3_client.exceptions.NoSuchKey:
        last_load = "1900-01-01 00:00:00+0000"
        log_message(__name__, 20, "Load function running for the first time")

    warehouse_tables = (
        "dim_staff",
        "dim_location",
        "dim_counterparty",
        "dim_currency",
        "dim_design",
        "dim_transaction",
        "dim_payment_type",
        "fact_sales_order",
        "fact_purchase_order",
        "fact_payment",
        "dim_date",
    )

    # Load each table in turn, re-raising on the first failure so the
    # last-load marker below is not advanced past unprocessed data.
    try:
        for table_name in warehouse_tables:
            frames = read_parquets_from_s3(s3_client, table_name, last_load, bucket)
            log_message(
                __name__,
                20,
                f"Parquet file(s) for {table_name} read from processed data bucket",
            )
            # Pass engine_string if required
            write_df_to_warehouse(frames, table_name, engine_string=None)
            log_message(__name__, 20, f"Data written to {table_name} in data warehouse")

    except SQLAlchemyError as e:  # Warehouse-side failures
        log_message(__name__, 40, f"Warehouse write error: {str(e)}")
        raise e

    except ClientError as e:  # S3-side failures
        log_message(__name__, 40, f"Error: {e.response['Error']['Message']}")
        raise e

    # Record this run's completion time for the next invocation.
    try:
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S%z")
        s3_client.put_object(Bucket=bucket, Key="last_load.txt", Body=stamp)
        log_message(__name__, 20, f"Updated last load timestamp to {stamp}")

    except ClientError as e:
        # Logged but not re-raised: the data itself was written successfully.
        log_message(
            __name__,
            40,
            f"Error updating last load timestamp: {e.response['Error']['Message']}",
        )
Load function to process parquet files from S3 and write the data to a data warehouse.
Args: bucket (str, optional): The S3 bucket name. Defaults to "onyx-processed-data-bucket". s3_client (boto3.client, optional): The S3 client instance. Defaults to None.