transform_utils

  1import pandas as pd
  2import logging, json, boto3, time
  3from datetime import datetime
  4from typing import Optional
  5from botocore.exceptions import ClientError
  6from io import BytesIO
  7
  8pd.set_option("display.max_rows", None)
  9pd.set_option("display.max_columns", None)
 10
 11
 12def log_message(name: str, level: int, message: str = ""):
 13    """
 14    Sends a message to the logger at a specified level.
 15
 16    :param name: The name of the logger.
 17    :param level: The logging level (one of 10, 20, 30, 40, 50).
 18    :param message: The message to log.
 19    """
 20    logger = logging.getLogger(name)
 21
 22    # Define a mapping of level integers to logging methods
 23    level_map = {
 24        10: logger.debug,
 25        20: logger.info,
 26        30: logger.warning,
 27        40: logger.error,
 28        50: logger.critical,
 29    }
 30
 31    # Get the logging method from the map
 32    log_method = level_map.get(level)
 33
 34    if log_method:
 35        log_method(message)
 36    else:
 37        logger.error("Invalid log level: %d", level)
 38
 39
 40def list_s3_files_by_prefix(bucket: str, prefix: str = "", s3_client=None) -> list:
 41    """
 42    Lists files in an S3 bucket. If a prefix is provided, it filters the files by that prefix.
 43    If no prefix is provided, it lists all files in the bucket.
 44
 45    Args:
 46        bucket (str): The name of the S3 bucket.
 47        prefix (str, optional): The prefix to filter the S3 objects. Defaults to an empty string, which lists all files.
 48        s3_client (boto3.client, optional): The S3 client. If not provided, a new client will be created.
 49
 50    Returns:
 51        list: A list of keys (file paths) in the S3 bucket that match the prefix or all files if no prefix is provided.
 52    """
 53    if not s3_client:
 54        s3_client = boto3.client("s3")
 55
 56    try:
 57        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
 58
 59        # Check if 'Contents' is in the response
 60        if "Contents" in response:
 61            file_list = [content["Key"] for content in response["Contents"]]
 62            log_message(
 63                __name__,
 64                20,
 65                f"Found {len(file_list)} files in bucket '{bucket}' with prefix '{prefix}'",
 66            )
 67            return file_list
 68        else:
 69            log_message(
 70                __name__,
 71                20,
 72                f"No files found in bucket '{bucket}' with prefix '{prefix}'",
 73            )
 74            return []
 75
 76    except Exception as e:
 77        log_message(
 78            __name__,
 79            40,
 80            f"Failed to list files in S3 bucket '{bucket}' with prefix '{prefix}': {e}",
 81        )
 82        return []
 83
 84
 85def create_df_from_json_in_bucket(
 86    source_bucket: str, file_name: str, s3_client=None
 87) -> Optional[pd.DataFrame]:
 88    """
 89    Reads a JSON file from an S3 bucket and converts it to a pandas DataFrame.
 90
 91    Args:
 92        source_bucket (str): The name of the S3 bucket.
 93        file_name (str): The key (path) of the JSON file within the S3 bucket.
 94        s3_client (client): AWS S3 client.
 95
 96    Returns:
 97        Optional[pd.DataFrame]: A DataFrame containing the data from the JSON file,
 98        or None if there are issues with the file or its content.
 99    """
100    if not s3_client:
101        s3_client = boto3.client("s3")
102
103    if not file_name.endswith(".json"):
104        log_message(__name__, 20, f"File {file_name} is not a JSON file.")
105        return None
106
107    try:
108        # Retrieve the JSON file from S3
109        json_file = s3_client.get_object(Bucket=source_bucket, Key=file_name)
110        json_contents = json_file["Body"].read().decode("utf-8")
111        json_data = json.loads(json_contents)
112
113        # Determine the table name from the file path
114        table = file_name.split("/")[0]
115        data = json_data.get(table, [])
116
117        if not data:
118            log_message(
119                __name__, 30, f"No data found for table {table} in file {file_name}."
120            )
121            return None
122
123        df = pd.DataFrame(data)
124        return df
125
126    except json.JSONDecodeError as e:
127        log_message(__name__, 40, f"JSON decoding error for file {file_name}: {e}")
128    except ClientError as e:
129        log_message(
130            __name__,
131            40,
132            f"Client error when accessing {file_name} in bucket {source_bucket}: {e}",
133        )
134    except Exception as e:
135        log_message(
136            __name__,
137            40,
138            f"Unexpected error reading or processing file {file_name}: {e}",
139        )
140
141
def create_dim_date(start_date: str, end_date: str) -> pd.DataFrame:
    """
    Build a date-dimension DataFrame covering an inclusive range of days.

    Args:
        start_date (str): Start date in "YYYY-MM-DD" format.
        end_date (str): End date in "YYYY-MM-DD" format.

    Returns:
        pd.DataFrame: One row per calendar day with date_id plus year, month,
        day, day_of_week (Monday=1), day_name, month_name and quarter columns.
    """
    try:
        first_day = datetime.strptime(start_date, "%Y-%m-%d")
        last_day = datetime.strptime(end_date, "%Y-%m-%d")
        days = pd.Series(pd.date_range(start=first_day, end=last_day))

        return pd.DataFrame(
            {
                "date_id": days.dt.date,
                "year": days.dt.year,
                "month": days.dt.month,
                "day": days.dt.day,
                # pandas weekday is 0-based (Monday=0); shift to 1-based.
                "day_of_week": days.dt.weekday + 1,
                "day_name": days.dt.day_name(),
                "month_name": days.dt.month_name(),
                "quarter": days.dt.quarter,
            }
        )

    except Exception as e:
        log_message(__name__, 40, f"Error creating dim_date DataFrame: {e}")
        raise
174
175
def _split_event_timestamps(df: pd.DataFrame) -> pd.DataFrame:
    """Split 'created_at'/'last_updated' "date time" strings into separate
    *_date / *_time columns and drop the original timestamp columns."""
    df[["created_date", "created_time"]] = df["created_at"].str.split(
        " ", expand=True
    )
    df[["last_updated_date", "last_updated_time"]] = df[
        "last_updated"
    ].str.split(" ", expand=True)
    return df.drop(["created_at", "last_updated"], axis=1)


def process_table(
    df: pd.DataFrame, file: str, bucket: str, timer: int = 60, s3_client=None
):
    """
    Transform a raw ingested table into its star-schema dim/fact counterpart.

    Args:
        df (pd.DataFrame): Raw table data already loaded from the source file.
        file (str): File path (key) within the source bucket; the leading path
            segment names the source table and selects the transformation.
        bucket (str): Processed-data bucket, read when a transformation must
            join on an already-written dim table (counterparty, staff).
        timer (int): Delay in seconds before joining on another table, to give
            that table's parquet files time to be created first.
        s3_client (optional): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[tuple]: (transformed DataFrame, output table name), or None
        when the table is unknown or an error occurs during processing.
    """
    if not s3_client:
        s3_client = boto3.client("s3")

    table = file.split("/")[0]
    output_table = ""

    # Dim tables that only need their audit columns removed.
    simple_dims = {
        "design": "dim_design",
        "department": "dim_department",
        "payment_type": "dim_payment_type",
        "transaction": "dim_transaction",
    }

    try:
        if table == "address":
            df = df.rename(columns={"address_id": "location_id"}).drop(
                ["created_at", "last_updated"], axis=1
            )
            output_table = "dim_location"

        elif table in simple_dims:
            df = df.drop(["created_at", "last_updated"], axis=1)
            output_table = simple_dims[table]

        elif table == "currency":
            currency_mapping = {
                "GBP": "British Pound Sterling",
                "USD": "United States Dollar",
                "EUR": "Euros",
            }
            df = df.drop(["created_at", "last_updated"], axis=1).assign(
                currency_name=df["currency_code"].map(currency_mapping)
            )
            if df["currency_name"].isnull().any():
                log_message(__name__, 30, f"Unmapped currency codes found in {file}.")
            output_table = "dim_currency"

        elif table == "counterparty":  # combine counterparty with address table
            log_message(__name__, 20, "Entered counterparty inside process_table")
            # Wait so dim_location's parquet files exist before joining on them.
            time.sleep(timer)
            dim_location_df = combine_parquet_from_s3(bucket, "dim_location")
            df = df.merge(
                dim_location_df,
                how="inner",
                left_on="legal_address_id",
                right_on="location_id",
            )
            df = df.drop(
                [
                    "commercial_contact",
                    "delivery_contact",
                    "created_at",
                    "last_updated",
                    "legal_address_id",
                    "location_id",
                ],
                axis=1,
            )
            df = df.rename(
                columns={
                    "address_line_1": "counterparty_legal_address_line_1",
                    "address_line_2": "counterparty_legal_address_line_2",
                    "district": "counterparty_legal_district",
                    "city": "counterparty_legal_city",
                    "postal_code": "counterparty_legal_postal_code",
                    "country": "counterparty_legal_country",
                    "phone": "counterparty_legal_phone_number",
                }
            )
            output_table = "dim_counterparty"

        elif table == "staff":
            log_message(__name__, 10, "Entered staff inside process_table")
            # Wait so dim_department's parquet files exist before joining.
            time.sleep(timer)
            dim_department_df = combine_parquet_from_s3(bucket, "dim_department")
            df = df.merge(dim_department_df, how="inner", on="department_id")
            log_message(__name__, 10, "merged staff data frame with dim_department")
            df = df.drop(
                [
                    "department_id",
                    "created_at",
                    "last_updated",
                ],
                axis=1,
            )
            log_message(__name__, 10, "created staff data frame")
            output_table = "dim_staff"

        elif table == "sales_order":
            df = _split_event_timestamps(df)
            df = df.rename(
                columns={
                    "staff_id": "sales_staff_id",
                }
            )
            output_table = "fact_sales_order"

        elif table == "purchase_order":
            df = _split_event_timestamps(df)
            output_table = "fact_purchase_order"

        elif table == "payment":
            df = _split_event_timestamps(df)
            df = df.drop(
                ["company_ac_number", "counterparty_ac_number"],
                axis=1,
            )
            output_table = "fact_payment"

        else:
            log_message(
                __name__, 20, f"Unknown table encountered: {table}, skipping..."
            )
            return

        return (df, output_table)

    except Exception as e:
        log_message(__name__, 40, f"Error processing table {table}: {e}")
353
354
def combine_parquet_from_s3(bucket: str, directory: str, s3_client=None):
    """
    Reads an S3 directory of parquet files and combines them into one DataFrame.
    Removes duplicate rows and keeps the last modification.

    Args:
        bucket (str): The name of the S3 bucket containing the source parquet files.
        directory (str): The directory (table name) containing the parquet files.
        s3_client (optional): AWS S3 client; created on demand when omitted.

    Returns:
        Optional[pd.DataFrame]: A DataFrame containing the combined,
        de-duplicated data, or None when the directory holds no parquet files.
    """
    log_message(__name__, 20, "Entered combine_parquet_from_s3")
    if not s3_client:
        s3_client = boto3.client("s3")

    dfs = []
    # Sort keys so later files win when duplicate rows are dropped below.
    for key in sorted(list_s3_files_by_prefix(bucket, directory)):
        response = s3_client.get_object(Bucket=bucket, Key=key)
        dfs.append(pd.read_parquet(BytesIO(response["Body"].read())))
    log_message(__name__, 10, "appended dfs in combine_parquet_from_s3")

    if not dfs:
        # pd.concat raises ValueError on an empty list; honour the documented
        # None return instead of crashing when the directory is empty.
        log_message(
            __name__,
            30,
            f"No parquet files found under '{directory}' in bucket '{bucket}'",
        )
        return None

    combined_df = pd.concat(dfs, ignore_index=True)
    combined_df.drop_duplicates(keep="last", inplace=True)
    log_message(__name__, 10, "concatted and dropped dups")
    return combined_df
def log_message(name: str, level: int, message: str = ''):
13def log_message(name: str, level: int, message: str = ""):
14    """
15    Sends a message to the logger at a specified level.
16
17    :param name: The name of the logger.
18    :param level: The logging level (one of 10, 20, 30, 40, 50).
19    :param message: The message to log.
20    """
21    logger = logging.getLogger(name)
22
23    # Define a mapping of level integers to logging methods
24    level_map = {
25        10: logger.debug,
26        20: logger.info,
27        30: logger.warning,
28        40: logger.error,
29        50: logger.critical,
30    }
31
32    # Get the logging method from the map
33    log_method = level_map.get(level)
34
35    if log_method:
36        log_method(message)
37    else:
38        logger.error("Invalid log level: %d", level)

Sends a message to the logger at a specified level.

Parameters
  • name: The name of the logger.
  • level: The logging level (one of 10, 20, 30, 40, 50).
  • message: The message to log.
def list_s3_files_by_prefix(bucket: str, prefix: str = '', s3_client=None) -> list:
41def list_s3_files_by_prefix(bucket: str, prefix: str = "", s3_client=None) -> list:
42    """
43    Lists files in an S3 bucket. If a prefix is provided, it filters the files by that prefix.
44    If no prefix is provided, it lists all files in the bucket.
45
46    Args:
47        bucket (str): The name of the S3 bucket.
48        prefix (str, optional): The prefix to filter the S3 objects. Defaults to an empty string, which lists all files.
49        s3_client (boto3.client, optional): The S3 client. If not provided, a new client will be created.
50
51    Returns:
52        list: A list of keys (file paths) in the S3 bucket that match the prefix or all files if no prefix is provided.
53    """
54    if not s3_client:
55        s3_client = boto3.client("s3")
56
57    try:
58        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
59
60        # Check if 'Contents' is in the response
61        if "Contents" in response:
62            file_list = [content["Key"] for content in response["Contents"]]
63            log_message(
64                __name__,
65                20,
66                f"Found {len(file_list)} files in bucket '{bucket}' with prefix '{prefix}'",
67            )
68            return file_list
69        else:
70            log_message(
71                __name__,
72                20,
73                f"No files found in bucket '{bucket}' with prefix '{prefix}'",
74            )
75            return []
76
77    except Exception as e:
78        log_message(
79            __name__,
80            40,
81            f"Failed to list files in S3 bucket '{bucket}' with prefix '{prefix}': {e}",
82        )
83        return []

Lists files in an S3 bucket. If a prefix is provided, it filters the files by that prefix. If no prefix is provided, it lists all files in the bucket.

Args: bucket (str): The name of the S3 bucket. prefix (str, optional): The prefix to filter the S3 objects. Defaults to an empty string, which lists all files. s3_client (boto3.client, optional): The S3 client. If not provided, a new client will be created.

Returns: list: A list of keys (file paths) in the S3 bucket that match the prefix or all files if no prefix is provided.

def create_df_from_json_in_bucket( source_bucket: str, file_name: str, s3_client=None) -> Optional[pandas.core.frame.DataFrame]:
 86def create_df_from_json_in_bucket(
 87    source_bucket: str, file_name: str, s3_client=None
 88) -> Optional[pd.DataFrame]:
 89    """
 90    Reads a JSON file from an S3 bucket and converts it to a pandas DataFrame.
 91
 92    Args:
 93        source_bucket (str): The name of the S3 bucket.
 94        file_name (str): The key (path) of the JSON file within the S3 bucket.
 95        s3_client (client): AWS S3 client.
 96
 97    Returns:
 98        Optional[pd.DataFrame]: A DataFrame containing the data from the JSON file,
 99        or None if there are issues with the file or its content.
100    """
101    if not s3_client:
102        s3_client = boto3.client("s3")
103
104    if not file_name.endswith(".json"):
105        log_message(__name__, 20, f"File {file_name} is not a JSON file.")
106        return None
107
108    try:
109        # Retrieve the JSON file from S3
110        json_file = s3_client.get_object(Bucket=source_bucket, Key=file_name)
111        json_contents = json_file["Body"].read().decode("utf-8")
112        json_data = json.loads(json_contents)
113
114        # Determine the table name from the file path
115        table = file_name.split("/")[0]
116        data = json_data.get(table, [])
117
118        if not data:
119            log_message(
120                __name__, 30, f"No data found for table {table} in file {file_name}."
121            )
122            return None
123
124        df = pd.DataFrame(data)
125        return df
126
127    except json.JSONDecodeError as e:
128        log_message(__name__, 40, f"JSON decoding error for file {file_name}: {e}")
129    except ClientError as e:
130        log_message(
131            __name__,
132            40,
133            f"Client error when accessing {file_name} in bucket {source_bucket}: {e}",
134        )
135    except Exception as e:
136        log_message(
137            __name__,
138            40,
139            f"Unexpected error reading or processing file {file_name}: {e}",
140        )

Reads a JSON file from an S3 bucket and converts it to a pandas DataFrame.

Args: source_bucket (str): The name of the S3 bucket. file_name (str): The key (path) of the JSON file within the S3 bucket. s3_client (client): AWS S3 client.

Returns: Optional[pd.DataFrame]: A DataFrame containing the data from the JSON file, or None if there are issues with the file or its content.

def create_dim_date(start_date: str, end_date: str) -> pandas.core.frame.DataFrame:
143def create_dim_date(start_date: str, end_date: str) -> pd.DataFrame:
144    """
145    Creates a dimension date DataFrame for a given range of dates.
146
147    Args:
148        start_date (str): Start date in "YYYY-MM-DD" format.
149        end_date (str): End date in "YYYY-MM-DD" format.
150
151    Returns:
152        pd.DataFrame: DataFrame containing the date dimension table.
153    """
154    try:
155        start = datetime.strptime(start_date, "%Y-%m-%d")
156        end = datetime.strptime(end_date, "%Y-%m-%d")
157        date_range = pd.date_range(start=start, end=end)
158
159        dim_date = pd.DataFrame(date_range, columns=["date_id"])
160        dim_date["year"] = dim_date["date_id"].dt.year
161        dim_date["month"] = dim_date["date_id"].dt.month
162        dim_date["day"] = dim_date["date_id"].dt.day
163        dim_date["day_of_week"] = dim_date["date_id"].dt.weekday + 1
164        dim_date["day_name"] = dim_date["date_id"].dt.day_name()
165        dim_date["month_name"] = dim_date["date_id"].dt.month_name()
166        dim_date["quarter"] = dim_date["date_id"].dt.quarter
167
168        dim_date["date_id"] = dim_date["date_id"].dt.date
169
170        return dim_date
171
172    except Exception as e:
173        log_message(__name__, 40, f"Error creating dim_date DataFrame: {e}")
174        raise

Creates a dimension date DataFrame for a given range of dates.

Args: start_date (str): Start date in "YYYY-MM-DD" format. end_date (str): End date in "YYYY-MM-DD" format.

Returns: pd.DataFrame: DataFrame containing the date dimension table.

def process_table( df: pandas.core.frame.DataFrame, file: str, bucket: str, timer: int = 60, s3_client=None):
177def process_table(
178    df: pd.DataFrame, file: str, bucket: str, timer: int = 60, s3_client=None
179):
180    """
181    Process specific table based on the file name, converts it to a dataframe.
182
183    Args:
184        source_bucket (str): The name of the S3 bucket containing the source JSON files.
185        file (str): str of file path (key) within the source bucket.
186        output_bucket (str): The name of the S3 bucket to upload processed files to.
187        timer (int): delay timer in order to allow files to be created before joining on another.
188
189    Returns:
190        pd.DataFrame: DataFrame containing the processed dim or fact table.
191    """
192    if not s3_client:
193        s3_client = boto3.client("s3")
194
195    table = file.split("/")[0]
196    output_table = ""
197
198    try:
199        if table == "address":
200            df = df.rename(columns={"address_id": "location_id"}).drop(
201                ["created_at", "last_updated"], axis=1
202            )
203            output_table = "dim_location"
204
205        elif table == "design":
206            df = df.drop(["created_at", "last_updated"], axis=1)
207            output_table = "dim_design"
208
209        elif table == "department":
210            df = df.drop(["created_at", "last_updated"], axis=1)
211            output_table = "dim_department"
212
213        elif table == "payment_type":
214            df = df.drop(["created_at", "last_updated"], axis=1)
215            output_table = "dim_payment_type"
216
217        elif table == "transaction":
218            df = df.drop(["created_at", "last_updated"], axis=1)
219            output_table = "dim_transaction"
220
221        elif table == "currency":
222            currency_mapping = {
223                "GBP": "British Pound Sterling",
224                "USD": "United States Dollar",
225                "EUR": "Euros",
226            }
227            df = df.drop(["created_at", "last_updated"], axis=1).assign(
228                currency_name=df["currency_code"].map(currency_mapping)
229            )
230            if df["currency_name"].isnull().any():
231                log_message(__name__, 30, f"Unmapped currency codes found in {file}.")
232            output_table = "dim_currency"
233
234        elif table == "counterparty":  # combine counterparty with address table
235            log_message(__name__, 20, "Entered counterparty inside process_table")
236            time.sleep(timer)
237            dim_location_df = combine_parquet_from_s3(bucket, "dim_location")
238            df = df.merge(
239                dim_location_df,
240                how="inner",
241                left_on="legal_address_id",
242                right_on="location_id",
243            )
244            df = df.drop(
245                [
246                    "commercial_contact",
247                    "delivery_contact",
248                    "created_at",
249                    "last_updated",
250                    "legal_address_id",
251                    "location_id",
252                ],
253                axis=1,
254            )
255            df = df.rename(
256                columns={
257                    "address_line_1": "counterparty_legal_address_line_1",
258                    "address_line_2": "counterparty_legal_address_line_2",
259                    "district": "counterparty_legal_district",
260                    "city": "counterparty_legal_city",
261                    "postal_code": "counterparty_legal_postal_code",
262                    "country": "counterparty_legal_country",
263                    "phone": "counterparty_legal_phone_number",
264                }
265            )
266            output_table = "dim_counterparty"
267
268        elif table == "staff":
269            log_message(__name__, 10, "Entered staff inside process_table")
270            time.sleep(timer)
271            dim_department_df = combine_parquet_from_s3(bucket, "dim_department")
272            df = df.merge(dim_department_df, how="inner", on="department_id")
273            log_message(__name__, 10, "merged staff data frame with dim_department")
274            df = df.drop(
275                [
276                    "department_id",
277                    "created_at",
278                    "last_updated",
279                ],
280                axis=1,
281            )
282
283            log_message(__name__, 10, "created staff data frame")
284            output_table = "dim_staff"
285
286        elif table == "sales_order":
287            # split the Name column into two columns using pd.Series.str.split()
288            df[["created_date", "created_time"]] = df["created_at"].str.split(
289                " ", expand=True
290            )
291            df[["last_updated_date", "last_updated_time"]] = df[
292                "last_updated"
293            ].str.split(" ", expand=True)
294            df = df.drop(
295                [
296                    "created_at",
297                    "last_updated",
298                ],
299                axis=1,
300            )
301            df = df.rename(
302                columns={
303                    "staff_id": "sales_staff_id",
304                }
305            )
306            output_table = "fact_sales_order"
307
308        elif table == "purchase_order":
309            # split the Name column into two columns using pd.Series.str.split()
310            df[["created_date", "created_time"]] = df["created_at"].str.split(
311                " ", expand=True
312            )
313            df[["last_updated_date", "last_updated_time"]] = df[
314                "last_updated"
315            ].str.split(" ", expand=True)
316            df = df.drop(
317                [
318                    "created_at",
319                    "last_updated",
320                ],
321                axis=1,
322            )
323            output_table = "fact_purchase_order"
324
325        elif table == "payment":
326            # split the Name column into two columns using pd.Series.str.split()
327            df[["created_date", "created_time"]] = df["created_at"].str.split(
328                " ", expand=True
329            )
330            df[["last_updated_date", "last_updated_time"]] = df[
331                "last_updated"
332            ].str.split(" ", expand=True)
333            df = df.drop(
334                [
335                    "company_ac_number",
336                    "counterparty_ac_number",
337                    "created_at",
338                    "last_updated",
339                ],
340                axis=1,
341            )
342            output_table = "fact_payment"
343
344        else:
345            log_message(
346                __name__, 20, f"Unknown table encountered: {table}, skipping..."
347            )
348            return
349
350        return (df, output_table)
351
352    except Exception as e:
353        log_message(__name__, 40, f"Error processing table {table}: {e}")

Process specific table based on the file name, converts it to a dataframe.

Args: source_bucket (str): The name of the S3 bucket containing the source JSON files. file (str): str of file path (key) within the source bucket. output_bucket (str): The name of the S3 bucket to upload processed files to. timer (int): delay timer in order to allow files to be created before joining on another.

Returns: pd.DataFrame: DataFrame containing the processed dim or fact table.

def combine_parquet_from_s3(bucket: str, directory: str, s3_client=None):
356def combine_parquet_from_s3(bucket: str, directory: str, s3_client=None):
357    """
358    Reads a S3 bucket containing parquet files and combines them into a dataframe
359    Removes duplicate rows and keeps the last modification.
360
361    Args:
362        bucket (str): The name of the S3 bucket containing the source parquet files.
363        directory (str): The directory (table name) containing the parquet files.
364        s3_client (optional): AWS S3 client.
365
366    Returns:
367        Optional[pd.DataFrame]: A DataFrame containing the data from the combined parquet files,
368        or None if there are issues with the files or its content.
369    """
370    log_message(__name__, 20, "Entered combine_parquet_from_s3")
371    if not s3_client:
372        s3_client = boto3.client("s3")
373    directory_files = list_s3_files_by_prefix(bucket, directory)
374    sorted_directory_files = sorted(directory_files)
375    dfs = []
376    for file in sorted_directory_files:
377        response = s3_client.get_object(Bucket=bucket, Key=file)
378        data = response["Body"].read()
379        df = pd.read_parquet(BytesIO(data))
380        dfs.append(df)
381    log_message(__name__, 10, "appended dfs in combine_parquet_from_s3")
382    combined_df = pd.concat(dfs, ignore_index=True)
383    combined_df.drop_duplicates(keep="last", inplace=True)
384    log_message(__name__, 10, "concatted and dropped dups")
385    return combined_df

Reads a S3 bucket containing parquet files and combines them into a dataframe Removes duplicate rows and keeps the last modification.

Args: bucket (str): The name of the S3 bucket containing the source parquet files. directory (str): The directory (table name) containing the parquet files. s3_client (optional): AWS S3 client.

Returns: Optional[pd.DataFrame]: A DataFrame containing the data from the combined parquet files, or None if there are issues with the files or its content.