transform

 1import boto3, logging
 2from botocore.exceptions import ClientError
 3from typing import Any
 4from datetime import datetime
 5import pandas as pd
 6from transform_utils import (
 7    list_s3_files_by_prefix,
 8    log_message,
 9    create_df_from_json_in_bucket,
10    create_dim_date,
11    process_table,
12)
13
14
# Root logging setup: INFO and above, each record stamped with a
# day/month/year 12-hour clock time, the logger name and the level.
# force=True replaces any handlers already attached to the root logger.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    datefmt="%d/%m/%Y %I:%M:%S %p",
    level=logging.INFO,
    force=True,
)

# Module-level logger, named after this module
logger = logging.getLogger(__name__)
25
26
def lambda_handler(event: dict, context: Any):
    """
    AWS Lambda function entry point.

    Reads the bucket name and object key from every S3 record in the event and
    passes each one to `transform`, which writes its output to the
    "onyx-processed-data-bucket" bucket.

    :param event: The S3 event notification passed to the Lambda function. Must
        contain a "Records" list whose items carry ["s3"]["bucket"]["name"] and
        ["s3"]["object"]["key"].
    :param context: The runtime information of the Lambda function (unused).
    :raises KeyError: If the event or a record lacks one of the expected keys.
    """
    # Local import so this function does not depend on the module's import block.
    from urllib.parse import unquote_plus

    log_message(__name__, 10, "Entered transform_lambda_handler")
    # An event may carry more than one record; handle each rather than only the first.
    for record in event["Records"]:
        s3_info = record["s3"]
        source_bucket = s3_info["bucket"]["name"]
        # S3 URL-encodes object keys in event notifications (e.g. spaces arrive
        # as '+'), so decode before using the key to read the object.
        new_file = unquote_plus(s3_info["object"]["key"])
        log_message(__name__, 10, "Calling transform with " + new_file)
        transform(source_bucket, new_file, "onyx-processed-data-bucket")
43
44
def transform(source_bucket: str, file: str, output_bucket: str, timer: int = 60):
    """
    Transform one JSON file from S3 and upload the processed result as parquet.

    If no key in the output bucket starts with "dim_date", a dim_date parquet
    file is generated and uploaded first. Keys ending in ".txt" are not
    processed any further.

    Args:
        source_bucket (str): The name of the S3 bucket containing the source JSON file.
        file (str): Path (key) of the file within the source bucket. The part
            before the first "/" is used as the table name.
        output_bucket (str): The name of the S3 bucket to upload processed files to.
        timer (int): Delay timer forwarded to `process_table`, to allow files to
            be created before joining on another.

    Returns:
        None
    """
    log_message(__name__, 20, "Transform started with " + file)

    s3_client = boto3.client("s3")

    output_bucket_contents = list_s3_files_by_prefix(output_bucket)
    # Timestamped key suffix shared by every upload made during this call
    date_str = datetime.now().strftime("%Y/%m/%d/%H-%M")

    # Create the dim_date parquet if it does not exist
    if not any(key.startswith("dim_date") for key in output_bucket_contents):
        dim_date_df = create_dim_date("2020-01-01", "2024-12-31")
        dim_date_df.to_parquet("/tmp/dim_date.parquet")
        s3_client.upload_file(
            "/tmp/dim_date.parquet", output_bucket, f"dim_date/{date_str}.parquet"
        )

    # .txt keys are not table data. Returning here also avoids reading
    # `output_table` below before it has been assigned.
    if file.endswith(".txt"):
        return

    table = file.split("/")[0]
    df = create_df_from_json_in_bucket(source_bucket, file)
    df, output_table = process_table(df, table, output_bucket, timer=timer)

    # Save and upload the processed file; a falsy output_table means nothing to upload
    if output_table:
        local_path = f"/tmp/{output_table}.parquet"
        s3_key = f"{output_table}/{date_str}.parquet"
        df.to_parquet(local_path)
        s3_client.upload_file(local_path, output_bucket, s3_key)
        log_message(__name__, 20, f"Uploaded {output_table} to {output_bucket}")
logger = <Logger transform (INFO)>
def lambda_handler(event: dict, context: Any):
def lambda_handler(event: dict, context: Any):
    """
    AWS Lambda function entry point.

    Reads the bucket name and object key from every S3 record in the event and
    passes each one to `transform`, which writes its output to the
    "onyx-processed-data-bucket" bucket.

    :param event: The S3 event notification passed to the Lambda function. Must
        contain a "Records" list whose items carry ["s3"]["bucket"]["name"] and
        ["s3"]["object"]["key"].
    :param context: The runtime information of the Lambda function (unused).
    :raises KeyError: If the event or a record lacks one of the expected keys.
    """
    # Local import so this function does not depend on the module's import block.
    from urllib.parse import unquote_plus

    log_message(__name__, 10, "Entered transform_lambda_handler")
    # An event may carry more than one record; handle each rather than only the first.
    for record in event["Records"]:
        s3_info = record["s3"]
        source_bucket = s3_info["bucket"]["name"]
        # S3 URL-encodes object keys in event notifications (e.g. spaces arrive
        # as '+'), so decode before using the key to read the object.
        new_file = unquote_plus(s3_info["object"]["key"])
        log_message(__name__, 10, "Calling transform with " + new_file)
        transform(source_bucket, new_file, "onyx-processed-data-bucket")

AWS Lambda function entry point.

This function processes an event to extract new files, transforms the data from JSON files and uploads the processed files to a specified S3 bucket. It also generates a 'dim_date' parquet file and uploads it to S3.

Parameters
  • event: The event data passed to the Lambda function (as a dictionary).
  • context: The runtime information of the Lambda function (e.g., function name, version).
def transform(source_bucket: str, file: str, output_bucket: str, timer: int = 60):
def transform(source_bucket: str, file: str, output_bucket: str, timer: int = 60):
    """
    Transform one JSON file from S3 and upload the processed result as parquet.

    If no key in the output bucket starts with "dim_date", a dim_date parquet
    file is generated and uploaded first. Keys ending in ".txt" are not
    processed any further.

    Args:
        source_bucket (str): The name of the S3 bucket containing the source JSON file.
        file (str): Path (key) of the file within the source bucket. The part
            before the first "/" is used as the table name.
        output_bucket (str): The name of the S3 bucket to upload processed files to.
        timer (int): Delay timer forwarded to `process_table`, to allow files to
            be created before joining on another.

    Returns:
        None
    """
    log_message(__name__, 20, "Transform started with " + file)

    s3_client = boto3.client("s3")

    output_bucket_contents = list_s3_files_by_prefix(output_bucket)
    # Timestamped key suffix shared by every upload made during this call
    date_str = datetime.now().strftime("%Y/%m/%d/%H-%M")

    # Create the dim_date parquet if it does not exist
    if not any(key.startswith("dim_date") for key in output_bucket_contents):
        dim_date_df = create_dim_date("2020-01-01", "2024-12-31")
        dim_date_df.to_parquet("/tmp/dim_date.parquet")
        s3_client.upload_file(
            "/tmp/dim_date.parquet", output_bucket, f"dim_date/{date_str}.parquet"
        )

    # .txt keys are not table data. Returning here also avoids reading
    # `output_table` below before it has been assigned.
    if file.endswith(".txt"):
        return

    table = file.split("/")[0]
    df = create_df_from_json_in_bucket(source_bucket, file)
    df, output_table = process_table(df, table, output_bucket, timer=timer)

    # Save and upload the processed file; a falsy output_table means nothing to upload
    if output_table:
        local_path = f"/tmp/{output_table}.parquet"
        s3_key = f"{output_table}/{date_str}.parquet"
        df.to_parquet(local_path)
        s3_client.upload_file(local_path, output_bucket, s3_key)
        log_message(__name__, 20, f"Uploaded {output_table} to {output_bucket}")

Transforms JSON files from S3 and uploads the processed files back to S3, including generating dim_date separately.

Args
  • source_bucket (str): The name of the S3 bucket containing the source JSON files.
  • file (str): The file path (key) within the source bucket.
  • output_bucket (str): The name of the S3 bucket to upload processed files to.
  • timer (int): Delay timer to allow files to be created before joining on another.