Skip to content

Commit 89d4437

Browse files
committed
feat: use input caching to avoid multiple executions within a short time window (3h)
1 parent 2c7cbea commit 89d4437

File tree

1 file changed

+74
-6
lines changed

1 file changed

+74
-6
lines changed

lambda/app.py

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,20 @@
2929
"app": "monitor-iceberg"
3030
}
3131

32+
# Path of the JSON execution-time cache. NOTE(review): /tmp is preserved
# across warm invocations of the same Lambda execution environment, so this
# acts as a best-effort per-container cache only — it does not survive cold
# starts and is not shared between concurrent containers.
CACHE_FILE_PATH = '/tmp/iceberg_monitoring_cache.json'
33+
34+
35+
def get_cache_expiry_hours():
    """Return the cache expiry window in hours.

    Reads the CACHE_EXPIRY_HOURS environment variable; falls back to 3
    hours when the variable is unset or cannot be parsed as a number.
    """
    raw = os.getenv('CACHE_EXPIRY_HOURS')
    if raw is None:
        return 3
    try:
        # Parse via float first so decimal strings like "2.5" are accepted,
        # then truncate to a whole number of hours.
        return int(float(raw))
    except (ValueError, TypeError):
        logger.warning(f"Invalid CACHE_EXPIRY_HOURS value '{raw}', using default of 3 hours")
        return 3
3246
def send_custom_metric( metric_name, dimensions, value, unit, namespace, timestamp=None):
3347
"""
3448
Send a custom metric to AWS CloudWatch.
@@ -326,27 +340,81 @@ def check_table_is_of_iceberg_format(event):
326340
except KeyError:
327341
logger.warning("check_table_is_of_iceberg_format() -> table_type is missing")
328342
return False
343+
344+
345+
def load_cache():
    """Return the execution-time cache loaded from CACHE_FILE_PATH.

    Returns:
        dict: mapping of "<database>.<table>" keys to epoch timestamps,
        or an empty dict when the cache file is missing, unreadable, or
        contains invalid JSON — callers never need to special-case a
        cold start.
    """
    # EAFP: open directly instead of exists()+open(), which has a race
    # window where the file could disappear between the two calls.
    try:
        with open(CACHE_FILE_PATH, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        # First invocation in this execution environment: no cache yet.
        # Silent on purpose, matching the previous exists() behavior.
        return {}
    except (json.JSONDecodeError, IOError) as e:
        logger.warning(f"Failed to load cache file: {e}")
        return {}
355+
356+
def save_cache(cache):
    """Persist the execution-time cache as JSON at CACHE_FILE_PATH.

    Write failures are logged and swallowed: caching is best-effort and
    must never break the monitoring flow itself.
    """
    try:
        with open(CACHE_FILE_PATH, 'w') as handle:
            json.dump(cache, handle)
    except IOError as err:
        logger.warning(f"Failed to save cache file: {err}")
364+
365+
def should_skip_execution(database_name, table_name):
    """Return True when this table was processed within the expiry window.

    Looks up "<database>.<table>" in the cache; if the recorded run is
    more recent than the configured expiry (hours), the caller should
    skip this execution.
    """
    cache_key = f"{database_name}.{table_name}"
    last_execution_time = load_cache().get(cache_key)
    if last_execution_time is None:
        # Never seen this table in the current execution environment.
        return False

    time_diff_hours = (time.time() - last_execution_time) / 3600
    cache_expiry_hours = get_cache_expiry_hours()
    if time_diff_hours >= cache_expiry_hours:
        # Cache entry has aged out — run normally.
        return False

    logger.info(f"Skipping execution for {cache_key}. Last execution was {time_diff_hours:.2f} hours ago (expiry: {cache_expiry_hours} hours).")
    return True
382+
383+
def update_cache(database_name, table_name):
    """Record the current time as the last execution of <database>.<table>."""
    entries = load_cache()
    entries[f"{database_name}.{table_name}"] = time.time()
    save_cache(entries)
329389

330390

331391
def lambda_handler(event, context):
332392
log_format = f"[{context.aws_request_id}:%(message)s"
333393
logging.basicConfig(format=log_format, level=logging.INFO)
334-
394+
395+
glue_db_name = event["detail"]["databaseName"]
396+
glue_table_name = event["detail"]["tableName"]
397+
398+
# Check cache to see if we should skip execution
399+
if should_skip_execution(glue_db_name, glue_table_name):
400+
logger.info(f"Skipping metrics generation for {glue_db_name}.{glue_table_name} due to recent execution")
401+
return
402+
403+
# Update cache with current execution time
404+
update_cache(glue_db_name, glue_table_name)
405+
335406
# Ensure Table is of Iceberg format.
336407
if not check_table_is_of_iceberg_format(event):
337408
logger.info("Table is not of Iceberg format, skipping metrics generation")
338409
return
339-
340-
glue_db_name = event["detail"]["databaseName"]
341-
glue_table_name = event["detail"]["tableName"]
342-
410+
343411
catalog = GlueCatalog(glue_db_name)
344412
table = catalog.load_table((glue_db_name, glue_table_name))
345413
logger.info(f"current snapshot id={table.metadata.current_snapshot_id}")
346414
snapshot = table.metadata.snapshot_by_id(table.metadata.current_snapshot_id)
347415
logger.info("Using glue IS to produce metrics")
348416
session_id = create_or_reuse_glue_session()
349-
417+
350418
send_snapshot_metrics(glue_db_name, glue_table_name, table.metadata.current_snapshot_id, session_id)
351419
send_partition_metrics(glue_db_name, glue_table_name, snapshot,session_id)
352420
send_files_metrics(glue_db_name, glue_table_name, snapshot,session_id)

0 commit comments

Comments
 (0)