extract
import json
import logging
import os
from datetime import datetime
from typing import Any

import boto3
from botocore.exceptions import ClientError

from extract_utils import format_response, log_message
from connection import connect_to_db


# Configure root logging once at import time so every module logger inherits it.
logging.basicConfig(
    level=logging.INFO,  # Set the minimum logging level
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    force=True,
    datefmt="%d/%m/%Y %I:%M:%S %p",
)

# Create a logger instance
logger = logging.getLogger(__name__)

# Source tables polled on every extract run. Hoisted to module level so the
# list is built once per Lambda container, not once per invocation.
TOTESYS_TABLES = [
    "purchase_order",
    "payment",
    "currency",
    "address",
    "design",
    "transaction",
    "sales_order",
    "counterparty",
    "staff",
    "payment_type",
    "department",
]


def lambda_handler(event: dict, context: Any):
    """
    AWS Lambda function entry point.

    Reads the destination bucket name from the S3_BUCKET_NAME environment
    variable and runs the extract job.

    :param event: The event data passed to the Lambda function (unused).
    :param context: The runtime information of the Lambda function (unused).
    """
    log_message(__name__, 10, "Entered lambda_handler")
    bucket_name = os.environ.get("S3_BUCKET_NAME")
    extract(bucket_name)


def extract(bucket: str, s3_client=None):
    """
    Extracts data from a database and stores it in an S3 bucket.

    Connects to a database, retrieves data from the totesys tables, formats
    the rows, and uploads one JSON object per non-empty table to S3 under
    ``<table>/<YYYY/MM/DD/HH-MM>.json``. If a ``last_extract.txt`` marker is
    found in the bucket, only rows with ``last_updated`` after that timestamp
    are retrieved; the marker is rewritten with this run's start time.

    :param bucket: The name of the S3 bucket where data will be stored.
    :param s3_client: Optional S3 client to use for interactions with S3.
        If not provided, a new client is created.

    :raises ClientError: If there is an issue with S3 operations or if the
        specified bucket or object does not exist.
    """
    log_message(__name__, 10, "Entered extract function")
    if s3_client is None:
        # Identity check rather than truthiness: a caller-supplied client
        # object should never be discarded because it happens to be falsy.
        s3_client = boto3.client("s3")
    conn = None

    try:
        conn = connect_to_db()
        log_message(__name__, 20, "Connection to DB made")

        # Run start time; also persisted as the next run's low-water mark so
        # rows updated mid-run are re-extracted rather than missed.
        # NOTE(review): naive local time — if the DB's last_updated column is
        # in a different timezone the incremental filter drifts; confirm.
        date = datetime.now()
        date_str = date.strftime("%Y/%m/%d/%H-%M")

        try:
            last_extract_file = s3_client.get_object(
                Bucket=bucket, Key="last_extract.txt"
            )
            last_extract = last_extract_file["Body"].read().decode("utf-8")
            log_message(__name__, 20, f"Extract function last ran at {last_extract}")
        except s3_client.exceptions.NoSuchKey:
            # No marker yet: first run, so do a full extract of every table.
            last_extract = None
            log_message(__name__, 20, "Extract function running for the first time")

        for table in TOTESYS_TABLES:
            query = f"SELECT * FROM {table} "
            if last_extract:
                # Incremental pull: only rows modified since the last run.
                # NOTE(review): the timestamp is interpolated into the SQL
                # string; the value is self-written to S3 so risk is low, but
                # a bound parameter would be safer — confirm driver support.
                query += f"WHERE last_updated > '{last_extract}'"
            query += ";"

            response = conn.run(query)
            # Skip the upload entirely when the table has no new rows.
            if response:
                columns = [col["name"] for col in conn.columns]
                formatted_response = {table: format_response(columns, response)}
                extracted_json = json.dumps(formatted_response, indent=4)
                s3_key = f"{table}/{date_str}.json"
                s3_client.put_object(Bucket=bucket, Key=s3_key, Body=extracted_json)

                log_message(__name__, 20, f"{s3_key} was written to {bucket}")

        # Persist this run's start time as the marker for the next run.
        store_last_extract = date.strftime("%Y-%m-%d %H:%M:%S")
        s3_client.put_object(
            Bucket=bucket, Key="last_extract.txt", Body=store_last_extract
        )

    except ClientError as e:
        log_message(__name__, 40, f"Error: {e.response['Error']['Message']}")

    finally:
        if conn:
            conn.close()
            log_message(__name__, 20, "DB connection closed")
logger =
<Logger extract (INFO)>
def
lambda_handler(event: dict, context: Any):
22def lambda_handler(event: dict, context: Any): 23 """ 24 AWS Lambda function entry point. 25 26 This function is triggered by an event and context, extracts data from a database, 27 and stores the data in an S3 bucket. The bucket name is retrieved from environment variables. 28 29 :param event: The event data passed to the Lambda function (as a dictionary). 30 :param context: The runtime information of the Lambda function (e.g., function name, version). 31 """ 32 log_message(__name__, 10, "Entered lambda_handler") 33 bucket_name = os.environ.get("S3_BUCKET_NAME") 34 extract(bucket_name)
AWS Lambda function entry point.
This function is triggered by an event and context, extracts data from a database, and stores the data in an S3 bucket. The bucket name is retrieved from environment variables.
Parameters
- event: The event data passed to the Lambda function (as a dictionary).
- context: The runtime information of the Lambda function (e.g., function name, version).
def
extract(bucket: str, s3_client=None):
def extract(bucket: str, s3_client=None):
    """
    Extracts data from a database and stores it in an S3 bucket.

    Connects to a database, retrieves data from the totesys tables, formats
    the rows, and uploads one JSON object per non-empty table to S3 under
    ``<table>/<YYYY/MM/DD/HH-MM>.json``. If a ``last_extract.txt`` marker is
    found in the bucket, only rows with ``last_updated`` after that timestamp
    are retrieved; the marker is rewritten with this run's start time.

    :param bucket: The name of the S3 bucket where data will be stored.
    :param s3_client: Optional S3 client to use for interactions with S3.
        If not provided, a new client is created.

    :raises ClientError: If there is an issue with S3 operations or if the
        specified bucket or object does not exist.
    """
    log_message(__name__, 10, "Entered extract function")
    if s3_client is None:
        # Identity check rather than truthiness: a caller-supplied client
        # object should never be discarded because it happens to be falsy.
        s3_client = boto3.client("s3")
    conn = None

    try:
        conn = connect_to_db()
        log_message(__name__, 20, "Connection to DB made")

        # Run start time; also persisted as the next run's low-water mark so
        # rows updated mid-run are re-extracted rather than missed.
        # NOTE(review): naive local time — if the DB's last_updated column is
        # in a different timezone the incremental filter drifts; confirm.
        date = datetime.now()
        date_str = date.strftime("%Y/%m/%d/%H-%M")

        try:
            last_extract_file = s3_client.get_object(
                Bucket=bucket, Key="last_extract.txt"
            )
            last_extract = last_extract_file["Body"].read().decode("utf-8")
            log_message(__name__, 20, f"Extract function last ran at {last_extract}")
        except s3_client.exceptions.NoSuchKey:
            # No marker yet: first run, so do a full extract of every table.
            last_extract = None
            log_message(__name__, 20, "Extract function running for the first time")

        totesys_table_list = [
            "purchase_order",
            "payment",
            "currency",
            "address",
            "design",
            "transaction",
            "sales_order",
            "counterparty",
            "staff",
            "payment_type",
            "department",
        ]
        for table in totesys_table_list:
            query = f"SELECT * FROM {table} "
            if last_extract:
                # Incremental pull: only rows modified since the last run.
                # NOTE(review): the timestamp is interpolated into the SQL
                # string; the value is self-written to S3 so risk is low, but
                # a bound parameter would be safer — confirm driver support.
                query += f"WHERE last_updated > '{last_extract}'"
            query += ";"

            response = conn.run(query)
            # Skip the upload entirely when the table has no new rows.
            if response:
                columns = [col["name"] for col in conn.columns]
                formatted_response = {table: format_response(columns, response)}
                extracted_json = json.dumps(formatted_response, indent=4)
                s3_key = f"{table}/{date_str}.json"
                s3_client.put_object(Bucket=bucket, Key=s3_key, Body=extracted_json)

                log_message(__name__, 20, f"{s3_key} was written to {bucket}")

        # Persist this run's start time as the marker for the next run.
        store_last_extract = date.strftime("%Y-%m-%d %H:%M:%S")
        s3_client.put_object(
            Bucket=bucket, Key="last_extract.txt", Body=store_last_extract
        )

    except ClientError as e:
        log_message(__name__, 40, f"Error: {e.response['Error']['Message']}")

    finally:
        if conn:
            conn.close()
            log_message(__name__, 20, "DB connection closed")
Extracts data from a database and stores it in an S3 bucket.
Connects to a database, retrieves data from specified tables, formats the data, and uploads it to an S3 bucket. If a last extract timestamp is found, only new or updated records are retrieved. Updates the timestamp of the last extraction in S3.
Parameters
- bucket: The name of the S3 bucket where data will be stored.
- s3_client: Optional S3 client to use for interactions with S3. If not provided, a new client is created.
Raises
- ClientError: If there is an issue with S3 operations or if the specified bucket or object does not exist.