@@ -70,7 +70,7 @@ def wait_for_session(session_id,interval=1):
7070 break
7171 time .sleep (interval )
7272
73- def wait_for_statement (session_id ,statement_id ,interval = 1 ):
73+ def wait_for_statement (session_id ,statement_id ,interval = 2 ):
7474 while True :
7575 response = glue_client .get_statement (
7676 SessionId = session_id ,
@@ -88,7 +88,6 @@ def parse_spark_show_output(output):
8888 lines = output .strip ().split ('\n ' )
8989 header = lines [1 ] # Column names are typically in the second line
9090 columns = [col .strip () for col in header .split ('|' ) if col .strip ()] # Clean and split by '|'
91-
9291 data = []
9392 # Start reading data from the third line and skip the last line which is a border
9493 for row in lines [3 :- 1 ]:
@@ -111,6 +110,9 @@ def send_files_metrics(glue_db_name, glue_table_name, snapshot,session_id):
111110 stmt_response = wait_for_statement (session_id , run_stmt_response ["Id" ])
112111 data_str = stmt_response ["Statement" ]["Output" ]["Data" ]["TextPlain" ]
113112 logger .info (stmt_response )
113+ if data_str == "" :
114+ logger .info ("No files found" )
115+ return
114116 df = parse_spark_show_output (data_str )
115117 df = df .applymap (int )
116118 file_metrics = {
@@ -262,7 +264,7 @@ def create_or_reuse_glue_session():
262264 GlueVersion = "4.0" ,
263265 NumberOfWorkers = 2 ,
264266 WorkerType = "G.1X" ,
265- IdleTimeout = 30 ,
267+ IdleTimeout = 6 ,
266268 Tags = glue_session_tags ,
267269 )
268270 wait_for_session (session_id )
@@ -287,6 +289,9 @@ def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id
287289 logger .info (f"send_snapshot_metrics() -> statement_id={ stmt_id } " )
288290 stmt_response = wait_for_statement (session_id , stmt_id )
289291 json_list_str = stmt_response ["Statement" ]["Output" ]["Data" ]["TextPlain" ].replace ("\' " , "" )
292+ if json_list_str .strip () == '' :
293+ logger .info ("No snapshots info found" )
294+ return
290295 snapshots = json .loads (json_list_str )
291296 logger .info ("send_snapshot_metrics()->response" )
292297 logger .info (json .dumps (snapshots , indent = 4 ))
@@ -307,7 +312,8 @@ def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id
307312 send_custom_metric (
308313 metric_name = f"snapshot.{ normalized_metric_name } " ,
309314 dimensions = [
310- {'Name' : 'table_name' , 'Value' : f"{ glue_db_name } .{ glue_table_name } " }
315+ {'Name' : 'table_name' , 'Value' : f"{ glue_db_name } .{ glue_table_name } " },
316+ {'Name' : 'snapshot_id' , 'Value' : str (snapshot_id )}
311317 ],
312318 value = int (metric_value ),
313319 unit = 'Bytes' if "size" in normalized_metric_name else "Count" ,
0 commit comments