transform
import logging
from datetime import datetime
from typing import Any

import boto3
import pandas as pd
from botocore.exceptions import ClientError
from transform_utils import (
    create_df_from_json_in_bucket,
    create_dim_date,
    list_s3_files_by_prefix,
    log_message,
    process_table,
)

# Configure logging.  `force=True` removes any pre-installed handlers
# (the AWS Lambda runtime attaches its own), so this format is applied.
logging.basicConfig(
    level=logging.INFO,  # minimum logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    force=True,
    datefmt="%d/%m/%Y %I:%M:%S %p",
)

# Create a logger instance
logger = logging.getLogger(__name__)


def lambda_handler(event: dict, context: Any) -> None:
    """
    AWS Lambda function entry point.

    Extracts the bucket name and object key of the newly created file from
    the first S3 event record and hands them to :func:`transform`, which
    writes the processed output (and a ``dim_date`` parquet, when missing)
    to the hard-coded processed-data bucket.

    :param event: The event data passed to the Lambda function (as a dictionary).
    :param context: The runtime information of the Lambda function (e.g., function name, version).
    """
    log_message(__name__, 10, "Entered transform_lambda_handler")
    # Only the first record is processed — assumes one object per event;
    # NOTE(review): confirm the trigger never batches multiple records.
    record = event["Records"][0]["s3"]
    source_bucket = record["bucket"]["name"]
    new_file = record["object"]["key"]
    log_message(__name__, 10, "Calling transform with " + new_file)
    transform(source_bucket, new_file, "onyx-processed-data-bucket")


def transform(source_bucket: str, file: str, output_bucket: str, timer: int = 60) -> None:
    """
    Transform a JSON file from S3 and upload the processed parquet back to
    S3, generating the ``dim_date`` parquet first if it does not exist yet.

    Args:
        source_bucket (str): The name of the S3 bucket containing the source JSON files.
        file (str): The file path (key) within the source bucket.
        output_bucket (str): The name of the S3 bucket to upload processed files to.
        timer (int): Delay (passed to ``process_table``) allowing dependent
            files to be created before joining on another.
    """
    log_message(__name__, 20, "Transform started with " + file)

    s3_client = boto3.client("s3")

    output_bucket_contents = list_s3_files_by_prefix(output_bucket)
    date_str = datetime.now().strftime("%Y/%m/%d/%H-%M")

    # Create the dim_date parquet if it does not exist.
    # FIX: the original comprehension variable was also named `file`,
    # shadowing the parameter inside the comprehension; renamed to `key`
    # and the throwaway list dropped in favour of a generator expression.
    if not any(key.startswith("dim_date") for key in output_bucket_contents):
        dim_date_df = create_dim_date("2020-01-01", "2024-12-31")
        dim_date_df.to_parquet("/tmp/dim_date.parquet")
        s3_client.upload_file(
            "/tmp/dim_date.parquet", output_bucket, f"dim_date/{date_str}.parquet"
        )

    # .txt objects are skipped — presumably marker/manifest files from the
    # extract step; TODO confirm with the producer of the source bucket.
    if not file.endswith(".txt"):
        table = file.split("/")[0]
        df = create_df_from_json_in_bucket(source_bucket, file)
        df, output_table = process_table(df, table, output_bucket, timer=timer)

        # Save locally and upload the processed file.
        if output_table:
            s3_key = f"{output_table}/{date_str}.parquet"
            df.to_parquet(f"/tmp/{output_table}.parquet")
            s3_client.upload_file(f"/tmp/{output_table}.parquet", output_bucket, s3_key)
            log_message(__name__, 20, f"Uploaded {output_table} to {output_bucket}")
logger = logging.getLogger(__name__) — the module-level `Logger` instance named `transform`, effective level INFO.
def lambda_handler(event: dict, context: Any):
def lambda_handler(event: dict, context: Any):
    """
    AWS Lambda function entry point.

    Pulls the triggering bucket name and object key out of the first S3
    event record, then delegates to ``transform``, which processes the file
    and uploads the results (plus a ``dim_date`` parquet when one is
    missing) to the processed-data bucket.

    :param event: The event data passed to the Lambda function (as a dictionary).
    :param context: The runtime information of the Lambda function (e.g., function name, version).
    """
    log_message(__name__, 10, "Entered transform_lambda_handler")
    s3_record = event["Records"][0]["s3"]
    bucket_of_origin = s3_record["bucket"]["name"]
    incoming_key = s3_record["object"]["key"]
    log_message(__name__, 10, "Calling transform with " + incoming_key)
    transform(bucket_of_origin, incoming_key, "onyx-processed-data-bucket")
AWS Lambda function entry point.
This function processes an event to extract new files, transforms the data from JSON files and uploads the processed files to a specified S3 bucket. It also generates a 'dim_date' parquet file and uploads it to S3.
Parameters
- event: The event data passed to the Lambda function (as a dictionary).
- context: The runtime information of the Lambda function (e.g., function name, version).
def transform(source_bucket: str, file: str, output_bucket: str, timer: int = 60):
def transform(source_bucket: str, file: str, output_bucket: str, timer: int = 60):
    """
    Process one JSON file from S3 into parquet and upload it, creating the
    dim_date parquet beforehand when the output bucket lacks one.

    Args:
        source_bucket (str): The name of the S3 bucket containing the source JSON files.
        file (str): The file path (key) within the source bucket.
        output_bucket (str): The name of the S3 bucket to upload processed files to.
        timer (int): Delay passed through to ``process_table`` so dependent
            files can be created before joining on another.
    """
    log_message(__name__, 20, "Transform started with " + file)

    client = boto3.client("s3")
    existing_keys = list_s3_files_by_prefix(output_bucket)
    stamp = datetime.now().strftime("%Y/%m/%d/%H-%M")

    # dim_date is generated once: only when no key in the bucket carries it.
    dim_date_present = any(k.startswith("dim_date") for k in existing_keys)
    if not dim_date_present:
        create_dim_date("2020-01-01", "2024-12-31").to_parquet("/tmp/dim_date.parquet")
        client.upload_file(
            "/tmp/dim_date.parquet", output_bucket, f"dim_date/{stamp}.parquet"
        )

    # Guard clause: .txt objects are never transformed.
    if file.endswith(".txt"):
        return

    table = file.split("/")[0]
    df = create_df_from_json_in_bucket(source_bucket, file)
    df, output_table = process_table(df, table, output_bucket, timer=timer)

    # Save locally, then upload the processed file.
    if output_table:
        local_path = f"/tmp/{output_table}.parquet"
        df.to_parquet(local_path)
        client.upload_file(local_path, output_bucket, f"{output_table}/{stamp}.parquet")
        log_message(__name__, 20, f"Uploaded {output_table} to {output_bucket}")
Transforms JSON files from S3 and uploads the processed files back to S3, including generating dim_date separately.
Args:
- source_bucket (str): The name of the S3 bucket containing the source JSON files.
- file (str): The file path (key) within the source bucket.
- output_bucket (str): The name of the S3 bucket to upload processed files to.
- timer (int): Delay timer allowing files to be created before joining on another.