# S3 warehouse location for the Spark/Glue Iceberg catalog; None if the
# environment variable is unset (downstream code presumably requires it —
# TODO confirm a missing value fails loudly rather than at first use).
warehouse_path = os.environ.get('SPARK_CATALOG_S3_WAREHOUSE')

# Tags applied to every Glue interactive session this script creates, and
# used as the filter when looking for an existing reusable session.
# NOTE(review): the tag value carries a trailing space ("monitor-iceberg ").
# If that is accidental it is harmless only because the same dict is used
# for both tagging and filtering — confirm and strip it in a coordinated
# change (existing sessions tagged with the space would stop matching).
glue_session_tags = {
    "app": "monitor-iceberg "
}
3232def send_custom_metric ( metric_name , dimensions , value , unit , namespace , timestamp = None ):
@@ -101,7 +101,7 @@ def parse_spark_show_output(output):
101101 return pd .DataFrame (data , columns = columns )
102102
103103def send_files_metrics (glue_db_name , glue_table_name , snapshot ,session_id ):
104- sql_stmt = f"select file_path, record_count, file_size_in_bytes from glue_catalog.{ glue_db_name } .{ glue_table_name } .files"
104+ sql_stmt = f"SELECT CAST(AVG(record_count) as INT) as avg_record_count, MAX( record_count) as max_record_count, MIN(record_count) as min_record_count, CAST(AVG( file_size_in_bytes) as INT) as avg_file_size, MAX(file_size_in_bytes) as max_file_size, MIN(file_size_in_bytes) as min_file_size FROM glue_catalog.{ glue_db_name } .{ glue_table_name } .files"
105105 run_stmt_response = glue_client .run_statement (
106106 SessionId = session_id ,
107107 Code = f"df = spark.sql(\" { sql_stmt } \" );df.show(df.count(),truncate=False)"
@@ -112,14 +112,14 @@ def send_files_metrics(glue_db_name, glue_table_name, snapshot,session_id):
112112 data_str = stmt_response ["Statement" ]["Output" ]["Data" ]["TextPlain" ]
113113 logger .info (stmt_response )
114114 df = parse_spark_show_output (data_str )
115+ df = df .applymap (int )
115116 file_metrics = {
116- "avg_record_count" : df ["record_count" ].astype (int ).mean ().astype (int ),
117- "max_record_count" : df ["record_count" ].astype (int ).max (),
118- "min_record_count" : df ["record_count" ].astype (int ).min (),
119- "deviation_record_count" : df ['record_count' ].astype (int ).std ().round (2 ),
120- "avg_file_size" : df ['file_size_in_bytes' ].astype (int ).mean ().astype (int ),
121- "max_file_size" : df ['file_size_in_bytes' ].astype (int ).max (),
122- "min_file_size" : df ['file_size_in_bytes' ].astype (int ).min (),
117+ "avg_record_count" : df .iloc [0 ]["avg_record_count" ],
118+ "max_record_count" : df .iloc [0 ]["max_record_count" ],
119+ "min_record_count" : df .iloc [0 ]["min_record_count" ],
120+ "avg_file_size" : df .iloc [0 ]['avg_file_size' ],
121+ "max_file_size" : df .iloc [0 ]['max_file_size' ],
122+ "min_file_size" : df .iloc [0 ]['min_file_size' ],
123123 }
124124 logger .info ("file_metrics=" )
125125 logger .info (file_metrics )
@@ -215,13 +215,30 @@ def send_partition_metrics(glue_db_name, glue_table_name, snapshot,session_id):
215215 timestamp = snapshot .timestamp_ms ,
216216 )
217217
def get_all_sessions():
    """Return every Glue interactive session carrying glue_session_tags.

    Follows ListSessions pagination: keeps requesting pages, threading the
    NextToken through, until the service stops returning one.

    Returns:
        list: all session dicts from every page of the ListSessions response.
    """
    collected = []
    token = None

    while True:
        # Only include NextToken once the first page has produced one.
        request = {"Tags": glue_session_tags}
        if token:
            request["NextToken"] = token

        page = glue_client.list_sessions(**request)
        collected.extend(page["Sessions"])

        token = page.get("NextToken")
        if not token:
            return collected
235+
218236def create_or_reuse_glue_session ():
219237 session_id = None
220238
221- glue_sessions = glue_client .list_sessions (
222- Tags = glue_session_tags ,
223- )
224- for session in glue_sessions ["Sessions" ]:
239+ glue_sessions = get_all_sessions ()
240+
241+ for session in glue_sessions :
225242 if (session ["Status" ] == "READY" ):
226243 session_id = session ["Id" ]
227244 logger .info (f"Found existing session_id={ session_id } " )
@@ -235,7 +252,7 @@ def create_or_reuse_glue_session():
235252 Id = session_id ,
236253 Role = glue_service_role ,
237254 Command = {'Name' : 'glueetl' , "PythonVersion" : "3" },
238- Timeout = 60 ,
255+ Timeout = 120 ,
239256 DefaultArguments = {
240257 "--enable-glue-datacatalog" : "true" ,
241258 "--enable-observability-metrics" : "true" ,
@@ -245,7 +262,7 @@ def create_or_reuse_glue_session():
245262 GlueVersion = "4.0" ,
246263 NumberOfWorkers = 2 ,
247264 WorkerType = "G.1X" ,
248- IdleTimeout = 120 ,
265+ IdleTimeout = 30 ,
249266 Tags = glue_session_tags ,
250267 )
251268 wait_for_session (session_id )
0 commit comments