extract

  1import json, os, boto3, logging
  2from datetime import datetime
  3from botocore.exceptions import ClientError
  4from typing import Any
  5from extract_utils import format_response, log_message
  6from connection import connect_to_db
  7
  8
  9# Configure logging
 10logging.basicConfig(
 11    level=logging.INFO,  # Set the minimum logging level
 12    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 13    force=True,
 14    datefmt="%d/%m/%Y %I:%M:%S %p",
 15)
 16
 17# Create a logger instance
 18logger = logging.getLogger(__name__)
 19
 20
 21def lambda_handler(event: dict, context: Any):
 22    """
 23    AWS Lambda function entry point.
 24
 25    This function is triggered by an event and context, extracts data from a database,
 26    and stores the data in an S3 bucket. The bucket name is retrieved from environment variables.
 27
 28    :param event: The event data passed to the Lambda function (as a dictionary).
 29    :param context: The runtime information of the Lambda function (e.g., function name, version).
 30    """
 31    log_message(__name__, 10, "Entered lambda_handler")
 32    bucket_name = os.environ.get("S3_BUCKET_NAME")
 33    extract(bucket_name)
 34
 35
 36def extract(bucket: str, s3_client=None):
 37    """
 38    Extracts data from a database and stores it in an S3 bucket.
 39
 40    Connects to a database, retrieves data from specified tables, formats the data,
 41    and uploads it to an S3 bucket. If a last extract timestamp is found, only new
 42    or updated records are retrieved. Updates the timestamp of the last extraction in S3.
 43
 44    :param bucket: The name of the S3 bucket where data will be stored.
 45    :param s3_client: Optional S3 client to use for interactions with S3. If not provided, a new client is created.
 46
 47    :raises ClientError: If there is an issue with S3 operations or if the specified bucket or object does not exist.
 48    """
 49    log_message(__name__, 10, "Entered extract function")
 50    if not s3_client:
 51        s3_client = boto3.client("s3")
 52    conn = None
 53
 54    try:
 55        conn = connect_to_db()
 56        log_message(__name__, 20, "Connection to DB made")
 57
 58        date = datetime.now()
 59        date_str = date.strftime("%Y/%m/%d/%H-%M")
 60
 61        try:
 62            last_extract_file = s3_client.get_object(
 63                Bucket=bucket, Key="last_extract.txt"
 64            )
 65            last_extract = last_extract_file["Body"].read().decode("utf-8")
 66            log_message(__name__, 20, f"Extract function last ran at {last_extract}")
 67        except s3_client.exceptions.NoSuchKey:
 68            last_extract = None
 69            log_message(__name__, 20, "Extract function running for the first time")
 70
 71        totesys_table_list = [
 72            "purchase_order",
 73            "payment",
 74            "currency",
 75            "address",
 76            "design",
 77            "transaction",
 78            "sales_order",
 79            "counterparty",
 80            "staff",
 81            "payment_type",
 82            "department",
 83        ]
 84        for table in totesys_table_list:
 85            query = f"SELECT * FROM {table} "
 86            if last_extract:
 87                # Add check to compare new data with old data and update if there are updates.
 88                query += f"WHERE last_updated > '{last_extract}'"
 89            query += ";"
 90
 91            response = conn.run(query)
 92            # If response doesn't have modified data, don't upload file.
 93            if len(response):
 94                columns = [col["name"] for col in conn.columns]
 95                formatted_response = {table: format_response(columns, response)}
 96                extracted_json = json.dumps(formatted_response, indent=4)
 97                s3_key = f"{table}/{date_str}.json"
 98                s3_client.put_object(Bucket=bucket, Key=s3_key, Body=extracted_json)
 99
100                log_message(__name__, 20, f"{s3_key} was written to {bucket}")
101
102                # # compile all tables to add to updated-data.json
103                # if not last_extract:
104                #     compiled_data = formatted_response
105
106                # # if new data found, update the updated-data.json
107                # if "WHERE" in query:
108
109                #     pass
110
111        # # keep a json file to add updated data to.
112        # if not last_extract:
113        #     updated_data_key = "updated-data.json"
114        #     s3_client.put_object(Bucket=bucket, Key=updated_data_key, Body=extracted_json)
115
116        store_last_extract = date.strftime("%Y-%m-%d %H:%M:%S")
117        s3_client.put_object(
118            Bucket=bucket, Key="last_extract.txt", Body=store_last_extract
119        )
120
121    except ClientError as e:
122        log_message(__name__, 40, f"Error: {e.response['Error']['Message']}")
123
124    finally:
125        if conn:
126            conn.close()
127            log_message(__name__, 20, "DB connection closed")
logger = <Logger extract (INFO)>
def lambda_handler(event: dict, context: Any):
def lambda_handler(event: dict, context: Any):
    """
    AWS Lambda entry point: run ``extract`` against the configured bucket.

    The destination bucket name is read from the ``S3_BUCKET_NAME``
    environment variable. ``event`` and ``context`` are accepted to match the
    Lambda handler signature but are not used.

    :param event: The event data passed to the Lambda function (as a dictionary).
    :param context: The runtime information of the Lambda function (e.g., function name, version).
    :raises ValueError: If ``S3_BUCKET_NAME`` is unset or empty.
    """
    log_message(__name__, logging.DEBUG, "Entered lambda_handler")
    bucket_name = os.environ.get("S3_BUCKET_NAME")
    # Fail fast with a clear message: without a bucket name, extract() would
    # first open a database connection and only then fail inside an S3 call.
    if not bucket_name:
        raise ValueError("S3_BUCKET_NAME environment variable is not set")
    extract(bucket_name)

AWS Lambda function entry point.

Invoked by AWS Lambda with an event and a context object, this function extracts data from a database and stores it in an S3 bucket. The bucket name is read from the `S3_BUCKET_NAME` environment variable.

Parameters
  • event: The event data passed to the Lambda function (as a dictionary).
  • context: The runtime information of the Lambda function (e.g., function name, version).
def extract(bucket: str, s3_client=None):
 37def extract(bucket: str, s3_client=None):
 38    """
 39    Extracts data from a database and stores it in an S3 bucket.
 40
 41    Connects to a database, retrieves data from specified tables, formats the data,
 42    and uploads it to an S3 bucket. If a last extract timestamp is found, only new
 43    or updated records are retrieved. Updates the timestamp of the last extraction in S3.
 44
 45    :param bucket: The name of the S3 bucket where data will be stored.
 46    :param s3_client: Optional S3 client to use for interactions with S3. If not provided, a new client is created.
 47
 48    :raises ClientError: If there is an issue with S3 operations or if the specified bucket or object does not exist.
 49    """
 50    log_message(__name__, 10, "Entered extract function")
 51    if not s3_client:
 52        s3_client = boto3.client("s3")
 53    conn = None
 54
 55    try:
 56        conn = connect_to_db()
 57        log_message(__name__, 20, "Connection to DB made")
 58
 59        date = datetime.now()
 60        date_str = date.strftime("%Y/%m/%d/%H-%M")
 61
 62        try:
 63            last_extract_file = s3_client.get_object(
 64                Bucket=bucket, Key="last_extract.txt"
 65            )
 66            last_extract = last_extract_file["Body"].read().decode("utf-8")
 67            log_message(__name__, 20, f"Extract function last ran at {last_extract}")
 68        except s3_client.exceptions.NoSuchKey:
 69            last_extract = None
 70            log_message(__name__, 20, "Extract function running for the first time")
 71
 72        totesys_table_list = [
 73            "purchase_order",
 74            "payment",
 75            "currency",
 76            "address",
 77            "design",
 78            "transaction",
 79            "sales_order",
 80            "counterparty",
 81            "staff",
 82            "payment_type",
 83            "department",
 84        ]
 85        for table in totesys_table_list:
 86            query = f"SELECT * FROM {table} "
 87            if last_extract:
 88                # Add check to compare new data with old data and update if there are updates.
 89                query += f"WHERE last_updated > '{last_extract}'"
 90            query += ";"
 91
 92            response = conn.run(query)
 93            # If response doesn't have modified data, don't upload file.
 94            if len(response):
 95                columns = [col["name"] for col in conn.columns]
 96                formatted_response = {table: format_response(columns, response)}
 97                extracted_json = json.dumps(formatted_response, indent=4)
 98                s3_key = f"{table}/{date_str}.json"
 99                s3_client.put_object(Bucket=bucket, Key=s3_key, Body=extracted_json)
100
101                log_message(__name__, 20, f"{s3_key} was written to {bucket}")
102
103                # # compile all tables to add to updated-data.json
104                # if not last_extract:
105                #     compiled_data = formatted_response
106
107                # # if new data found, update the updated-data.json
108                # if "WHERE" in query:
109
110                #     pass
111
112        # # keep a json file to add updated data to.
113        # if not last_extract:
114        #     updated_data_key = "updated-data.json"
115        #     s3_client.put_object(Bucket=bucket, Key=updated_data_key, Body=extracted_json)
116
117        store_last_extract = date.strftime("%Y-%m-%d %H:%M:%S")
118        s3_client.put_object(
119            Bucket=bucket, Key="last_extract.txt", Body=store_last_extract
120        )
121
122    except ClientError as e:
123        log_message(__name__, 40, f"Error: {e.response['Error']['Message']}")
124
125    finally:
126        if conn:
127            conn.close()
128            log_message(__name__, 20, "DB connection closed")

Extracts data from a database and stores it in an S3 bucket.

Connects to a database, retrieves data from specified tables, formats the data, and uploads it to an S3 bucket. If a last extract timestamp is found, only new or updated records are retrieved. Updates the timestamp of the last extraction in S3.

Parameters
  • bucket: The name of the S3 bucket where data will be stored.
  • s3_client: Optional S3 client to use for interactions with S3. If not provided, a new client is created.
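Errors
  • ClientError: Not raised. S3 errors (for example, a missing bucket) are caught and logged at ERROR level, and the last-extract timestamp is not updated for that run. A missing `last_extract.txt` is treated as a first run.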