From e8129a8f479d8bd274aa4f63da95fe9feff133a2 Mon Sep 17 00:00:00 2001 From: Dimitris Aloupis Date: Fri, 29 Aug 2025 18:58:45 +0300 Subject: [PATCH] Apply workable modification to lambda code --- lambda/app.py | 14 ++++++++++---- template.yaml | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/lambda/app.py b/lambda/app.py index f0de489..de0a949 100644 --- a/lambda/app.py +++ b/lambda/app.py @@ -70,7 +70,7 @@ def wait_for_session(session_id,interval=1): break time.sleep(interval) -def wait_for_statement(session_id,statement_id,interval=1): +def wait_for_statement(session_id,statement_id,interval=2): while True: response = glue_client.get_statement( SessionId=session_id, @@ -88,7 +88,6 @@ def parse_spark_show_output(output): lines = output.strip().split('\n') header = lines[1] # Column names are typically in the second line columns = [col.strip() for col in header.split('|') if col.strip()] # Clean and split by '|' - data = [] # Start reading data from the third line and skip the last line which is a border for row in lines[3:-1]: @@ -111,6 +110,9 @@ def send_files_metrics(glue_db_name, glue_table_name, snapshot,session_id): stmt_response = wait_for_statement(session_id, run_stmt_response["Id"]) data_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"] logger.info(stmt_response) + if data_str == "": + logger.info("No files found") + return df = parse_spark_show_output(data_str) df = df.applymap(int) file_metrics = { @@ -262,7 +264,7 @@ def create_or_reuse_glue_session(): GlueVersion="4.0", NumberOfWorkers=2, WorkerType="G.1X", - IdleTimeout=30, + IdleTimeout=6, Tags=glue_session_tags, ) wait_for_session(session_id) @@ -287,6 +289,9 @@ def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id logger.info(f"send_snapshot_metrics() -> statement_id={stmt_id}") stmt_response = wait_for_statement(session_id, stmt_id) json_list_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"].replace("\'", "") + if json_list_str.strip() == '': + logger.info("No snapshots info found") + return snapshots = json.loads(json_list_str) logger.info("send_snapshot_metrics()->response") logger.info(json.dumps(snapshots, indent=4)) @@ -307,7 +312,8 @@ def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id send_custom_metric( metric_name=f"snapshot.{normalized_metric_name}", dimensions=[ - {'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"} + {'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"}, + {'Name': 'snapshot_id', 'Value': str(snapshot_id)} ], value=int(metric_value), unit='Bytes' if "size" in normalized_metric_name else "Count", diff --git a/template.yaml b/template.yaml index 12a0981..c234125 100644 --- a/template.yaml +++ b/template.yaml @@ -20,7 +20,7 @@ Resources: Properties: CodeUri: lambda/ Handler: app.lambda_handler - Runtime: python3.9 + Runtime: python3.12 Policies: - CloudWatchPutMetricPolicy: {} - AWSLambdaBasicExecutionRole