load

  1import boto3, logging
  2from botocore.exceptions import ClientError
  3from sqlalchemy.exc import SQLAlchemyError
  4from datetime import datetime, timezone
  5from typing import Any
  6from load_utils import (
  7    log_message,
  8    read_parquets_from_s3,
  9    write_df_to_warehouse,
 10)
 11
 12# Configure logging
 13logging.basicConfig(
 14    level=logging.INFO,  # Set the minimum logging level
 15    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 16    force=True,
 17    datefmt="%d/%m/%Y %I:%M:%S %p",
 18)
 19
 20# Create a logger instance
 21logger = logging.getLogger(__name__)
 22
 23
 24def lambda_handler(event: dict, context: Any):
 25    """
 26    AWS Lambda function entry point.
 27
 28    This function processes an event to load new files, transforms the data from
 29    parquet files, and uploads the processed files to its associated data warehouse table.
 30
 31    :param event: The event data passed to the Lambda function (as a dictionary).
 32    :param context: The runtime information of the Lambda function (e.g., function name, version).
 33    :return: A dictionary with the status of the operation.
 34    """
 35    log_message(__name__, 10, "Entered lambda_handler")
 36
 37    try:
 38        # Extract source bucket from event or use default
 39        source_bucket = event.get("source_bucket", "onyx-processed-data-bucket")
 40
 41        # Initialize the S3 client
 42        s3_client = boto3.client("s3")
 43
 44        # Call the load function
 45        load(source_bucket, s3_client=s3_client)
 46
 47        # Log completion and return success response
 48        log_message(__name__, 20, "Load process completed successfully")
 49        return {"statusCode": 200, "body": "Load process completed successfully"}
 50
 51    except Exception as e:
 52        # Log the error and return failure response
 53        log_message(__name__, 40, f"Error in lambda_handler: {str(e)}")
 54        return {"statusCode": 500, "body": f"Error in lambda_handler: {str(e)}"}
 55
 56
 57def load(bucket="onyx-processed-data-bucket", s3_client=None):
 58    """
 59    Load function to process parquet files from S3 and write the data to a data warehouse.
 60
 61    Args:
 62        bucket (str, optional): The S3 bucket name. Defaults to "onyx-processed-data-bucket".
 63        s3_client (boto3.client, optional): The S3 client instance. Defaults to None.
 64    """
 65    log_message(__name__, 10, "Entered load function")
 66
 67    # Initialize the S3 client if not provided
 68    if not s3_client:
 69        s3_client = boto3.client("s3")
 70
 71    # Retrieve the last load timestamp from S3 or set default if not present
 72    try:
 73        last_load_file = s3_client.get_object(Bucket=bucket, Key="last_load.txt")
 74        last_load = last_load_file["Body"].read().decode("utf-8")
 75        log_message(__name__, 20, f"Load function last ran at {last_load}")
 76    except s3_client.exceptions.NoSuchKey:
 77        last_load = "1900-01-01 00:00:00+0000"
 78        log_message(__name__, 20, "Load function running for the first time")
 79
 80    # Write the DataFrame(s) to the data warehouse
 81    try:
 82        tables = [
 83            "dim_staff",
 84            "dim_location",
 85            "dim_counterparty",
 86            "dim_currency",
 87            "dim_design",
 88            "dim_transaction",
 89            "dim_payment_type",
 90            "fact_sales_order",
 91            "fact_purchase_order",
 92            "fact_payment",
 93            "dim_date",
 94        ]
 95        for table in tables:
 96
 97            df_list = read_parquets_from_s3(s3_client, table, last_load, bucket)
 98            log_message(
 99                __name__,
100                20,
101                f"Parquet file(s) for {table} read from processed data bucket",
102            )
103            write_df_to_warehouse(
104                df_list, table, engine_string=None
105            )  # Pass engine_string if required
106            log_message(__name__, 20, f"Data written to {table} in data warehouse")
107
108    except SQLAlchemyError as e:  # Handle SQLAlchemy errors specifically
109        log_message(__name__, 40, f"Warehouse write error: {str(e)}")
110        raise e
111
112    except ClientError as e:
113        log_message(
114            __name__,
115            40,
116            f"Error: {e.response['Error']['Message']}",
117        )
118        raise e
119
120    # Update the last load timestamp in S3
121    try:
122        date = datetime.now(timezone.utc)
123        store_last_load = date.strftime("%Y-%m-%d %H:%M:%S%z")
124        s3_client.put_object(Bucket=bucket, Key="last_load.txt", Body=store_last_load)
125        log_message(__name__, 20, f"Updated last load timestamp to {store_last_load}")
126
127    except ClientError as e:
128        log_message(
129            __name__,
130            40,
131            f"Error updating last load timestamp: {e.response['Error']['Message']}",
132        )
logger = <Logger load (INFO)>
def lambda_handler(event: dict, context: Any):
25def lambda_handler(event: dict, context: Any):
26    """
27    AWS Lambda function entry point.
28
29    This function processes an event to load new files, transforms the data from
30    parquet files, and uploads the processed files to its associated data warehouse table.
31
32    :param event: The event data passed to the Lambda function (as a dictionary).
33    :param context: The runtime information of the Lambda function (e.g., function name, version).
34    :return: A dictionary with the status of the operation.
35    """
36    log_message(__name__, 10, "Entered lambda_handler")
37
38    try:
39        # Extract source bucket from event or use default
40        source_bucket = event.get("source_bucket", "onyx-processed-data-bucket")
41
42        # Initialize the S3 client
43        s3_client = boto3.client("s3")
44
45        # Call the load function
46        load(source_bucket, s3_client=s3_client)
47
48        # Log completion and return success response
49        log_message(__name__, 20, "Load process completed successfully")
50        return {"statusCode": 200, "body": "Load process completed successfully"}
51
52    except Exception as e:
53        # Log the error and return failure response
54        log_message(__name__, 40, f"Error in lambda_handler: {str(e)}")
55        return {"statusCode": 500, "body": f"Error in lambda_handler: {str(e)}"}

AWS Lambda function entry point.

This function processes an event to load new files, transforms the data from parquet files, and uploads the processed files to its associated data warehouse table.

Parameters
  • event: The event data passed to the Lambda function (as a dictionary).
  • context: The runtime information of the Lambda function (e.g., function name, version).
Returns

A dictionary with the status of the operation.

def load(bucket='onyx-processed-data-bucket', s3_client=None):
 58def load(bucket="onyx-processed-data-bucket", s3_client=None):
 59    """
 60    Load function to process parquet files from S3 and write the data to a data warehouse.
 61
 62    Args:
 63        bucket (str, optional): The S3 bucket name. Defaults to "onyx-processed-data-bucket".
 64        s3_client (boto3.client, optional): The S3 client instance. Defaults to None.
 65    """
 66    log_message(__name__, 10, "Entered load function")
 67
 68    # Initialize the S3 client if not provided
 69    if not s3_client:
 70        s3_client = boto3.client("s3")
 71
 72    # Retrieve the last load timestamp from S3 or set default if not present
 73    try:
 74        last_load_file = s3_client.get_object(Bucket=bucket, Key="last_load.txt")
 75        last_load = last_load_file["Body"].read().decode("utf-8")
 76        log_message(__name__, 20, f"Load function last ran at {last_load}")
 77    except s3_client.exceptions.NoSuchKey:
 78        last_load = "1900-01-01 00:00:00+0000"
 79        log_message(__name__, 20, "Load function running for the first time")
 80
 81    # Write the DataFrame(s) to the data warehouse
 82    try:
 83        tables = [
 84            "dim_staff",
 85            "dim_location",
 86            "dim_counterparty",
 87            "dim_currency",
 88            "dim_design",
 89            "dim_transaction",
 90            "dim_payment_type",
 91            "fact_sales_order",
 92            "fact_purchase_order",
 93            "fact_payment",
 94            "dim_date",
 95        ]
 96        for table in tables:
 97
 98            df_list = read_parquets_from_s3(s3_client, table, last_load, bucket)
 99            log_message(
100                __name__,
101                20,
102                f"Parquet file(s) for {table} read from processed data bucket",
103            )
104            write_df_to_warehouse(
105                df_list, table, engine_string=None
106            )  # Pass engine_string if required
107            log_message(__name__, 20, f"Data written to {table} in data warehouse")
108
109    except SQLAlchemyError as e:  # Handle SQLAlchemy errors specifically
110        log_message(__name__, 40, f"Warehouse write error: {str(e)}")
111        raise e
112
113    except ClientError as e:
114        log_message(
115            __name__,
116            40,
117            f"Error: {e.response['Error']['Message']}",
118        )
119        raise e
120
121    # Update the last load timestamp in S3
122    try:
123        date = datetime.now(timezone.utc)
124        store_last_load = date.strftime("%Y-%m-%d %H:%M:%S%z")
125        s3_client.put_object(Bucket=bucket, Key="last_load.txt", Body=store_last_load)
126        log_message(__name__, 20, f"Updated last load timestamp to {store_last_load}")
127
128    except ClientError as e:
129        log_message(
130            __name__,
131            40,
132            f"Error updating last load timestamp: {e.response['Error']['Message']}",
133        )

Load function to process parquet files from S3 and write the data to a data warehouse.

Args:
  • bucket (str, optional): The S3 bucket name. Defaults to "onyx-processed-data-bucket".
  • s3_client (boto3.client, optional): The S3 client instance. Defaults to None.