Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions lambda/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def wait_for_session(session_id,interval=1):
break
time.sleep(interval)

def wait_for_statement(session_id,statement_id,interval=1):
def wait_for_statement(session_id,statement_id,interval=2):
while True:
response = glue_client.get_statement(
SessionId=session_id,
Expand All @@ -88,7 +88,6 @@ def parse_spark_show_output(output):
lines = output.strip().split('\n')
header = lines[1] # Column names are typically in the second line
columns = [col.strip() for col in header.split('|') if col.strip()] # Clean and split by '|'

data = []
# Start reading data from the third line and skip the last line which is a border
for row in lines[3:-1]:
Expand All @@ -111,6 +110,9 @@ def send_files_metrics(glue_db_name, glue_table_name, snapshot,session_id):
stmt_response = wait_for_statement(session_id, run_stmt_response["Id"])
data_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"]
logger.info(stmt_response)
if data_str == "":
logger.info("No files found")
return
df = parse_spark_show_output(data_str)
df = df.applymap(int)
file_metrics = {
Expand Down Expand Up @@ -262,7 +264,7 @@ def create_or_reuse_glue_session():
GlueVersion="4.0",
NumberOfWorkers=2,
WorkerType="G.1X",
IdleTimeout=30,
IdleTimeout=6,
Tags=glue_session_tags,
)
wait_for_session(session_id)
Expand All @@ -287,6 +289,9 @@ def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id
logger.info(f"send_snapshot_metrics() -> statement_id={stmt_id}")
stmt_response = wait_for_statement(session_id, stmt_id)
json_list_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"].replace("\'", "")
if json_list_str.strip() == '':
logger.info("No snapshots info found")
return
snapshots = json.loads(json_list_str)
logger.info("send_snapshot_metrics()->response")
logger.info(json.dumps(snapshots, indent=4))
Expand All @@ -307,7 +312,8 @@ def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id
send_custom_metric(
metric_name=f"snapshot.{normalized_metric_name}",
dimensions=[
{'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"}
{'Name': 'table_name', 'Value': f"{glue_db_name}.{glue_table_name}"},
{'Name': 'snapshot_id', 'Value': str(snapshot_id)}
],
value=int(metric_value),
unit='Bytes' if "size" in normalized_metric_name else "Count",
Expand Down
2 changes: 1 addition & 1 deletion template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Resources:
Properties:
CodeUri: lambda/
Handler: app.lambda_handler
Runtime: python3.9
Runtime: python3.12
Policies:
- CloudWatchPutMetricPolicy: {}
- AWSLambdaBasicExecutionRole
Expand Down