
Commit bf35689

v0.0.2-preview
* Lambda changed to work with EventBridge rule
* Template.yaml updated to include Lambda resource-based policy for EventBridge
* README.md updated

Authored-by: Michael Greenshtein <greensht@amazon.com>
1 parent faa1ce6 commit bf35689

7 files changed: +124 −101 lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -3,4 +3,5 @@ temp
 assets/cloudwatch-dashboard.rendered.json
 samconfig.toml
 .aws-sam
-.env.local.json
+.env.local.json
+events/my.event.json

README.md

Lines changed: 41 additions & 51 deletions
@@ -1,6 +1,6 @@
 ## Monitoring Apache Iceberg Table metadata layer using AWS Lambda, AWS Glue and AWS CloudWatch
 
-This repository provides you with sample code on how to collect metrics of an existing Apache Iceberg table managed in Amazon S3. The code consists of AWS Lambda deployment package that collects and submits metrics into AWS CloudWatch. Repository also includes helper scripts for deploying CloudWatch monitoring dashboard to visualize collected metrics.
+This repository provides sample code that collects metrics from existing Apache Iceberg tables stored in Amazon S3 and catalogued in the AWS Glue Data Catalog. The code consists of an AWS Lambda deployment package that collects and submits metrics to Amazon CloudWatch. The repository also includes a helper script for deploying a CloudWatch monitoring dashboard to visualize the collected metrics.
 
 ### Table of Contents
 - [Technical implementation](#technical-implementation)
@@ -20,9 +20,9 @@ This repository provides you with sample code on how to collect metrics of an ex
 
 ![Architectural diagram of the solution](assets/arch.png)
 
-* AWS Lambda triggered on every Iceberg snapshot creation to collect and send metrics to CloudWatch. This achieved with [S3 event notification](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html). See [Setting up S3 event notification](#3-setting-up-s3-event-notification) section.
+* An Amazon EventBridge rule triggers AWS Lambda on every *Glue Data Catalog Table State Change* event. This event is emitted every time a transaction is committed to an Apache Iceberg table.
+* The triggered AWS Lambda code aggregates information retrieved from the metadata tables to create [metrics](#metrics-collected) and submits them to Amazon CloudWatch.
 * AWS Lambda code includes `pyiceberg` library and [AWS Glue interactive Sessions](https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions-overview.html) with minimal compute to read `snapshots`, `partitions` and `files` Apache Iceberg metadata tables with Apache Spark.
-* AWS Lambda code aggregates information retrieved from metadata tables to create metrics and submits those to AWS CloudWatch.
 
 
 ### Metrics collected
@@ -58,7 +58,6 @@ This repository provides you with sample code on how to collect metrics of an ex
 * files.max_record_count
 * files.min_record_count
 * files.deviation_record_count
-* files.skew_record_count
 * files.avg_file_size
 * files.max_file_size
 * files.min_file_size
@@ -112,67 +111,59 @@ sam deploy --guided
 ##### Parameters
 
 - `CWNamespace` - A namespace is a container for CloudWatch metrics.
-- `DBName` - Glue Data Catalog Database Name.
-- `TableName` - Apache Iceberg Table name as it appears in the Glue Data Catalog.
 - `GlueServiceRole` - AWS Glue Role arn you created [earlier](#configuring-iam-permissions-for-aws-glue).
 - `Warehouse` - Required catalog property to determine the root path of the data warehouse on S3. This can be any path on your S3 bucket. Not critical for the solution.
-- `IcebergTableS3BucketName` - S3 bucket name is required to allow S3 bucket event notification. SAM will add resource-based permission to allow S3 bucket to invoke AWS Lambda.
 
 
-#### 3. Setting up S3 event notification
+#### 3. Configure EventBridge Trigger
 
-You need to setup an automatic trigger that will activate AWS Lambda metrics collection on every Apache Iceberg commit. This solution is relying on S3 event notification feature to trigger AWS Lambda every time new `metadata.json` is written to S3 `metadata` folder of the table.
-
-You can follow AWS Documentation on how to [enable and configuring event notifications using the Amazon S3 console](https://docs.aws.amazon.com/AmazonS3/latest/userguide/enable-event-notifications.html).
-
-or use the Python Boto3 sample code below. Replace with your bucket name and path to metadata.
+In this section you will configure an EventBridge rule that triggers the Lambda function on every transaction committed to an Apache Iceberg table.
+The default rule listens to the `Glue Data Catalog Table State Change` event from all tables in the Glue Data Catalog; the Lambda code skips non-Iceberg tables.
+If you want to scope the trigger to specific Iceberg tables instead of collecting metrics from all of them, uncomment `glue_table_names = ["<<REPLACE TABLE 1>>", "<<REPLACE TABLE 2>>"]` and add the relevant table names.
 
 ```python
 import boto3
-s3_client = boto3.client('s3')
-lambda_arn = "<REPLACE WITH YOUR ARN>"
-bucket_name = "<REPLACE WITH YOUR S3 BUCKET NAME>"
-path_to_metadata_folder = "<REPLACE WITH YOUR S3 PATH>"
+import json
+
+# Initialize boto3 clients
+lambda_client = boto3.client('lambda')
+events_client = boto3.client('events')
+
+# Parameters
+lambda_function_arn = '<<REPLACE WITH LAMBDA FUNCTION ARN>>'
+glue_table_names = None
+# glue_table_names = ["<<REPLACE TABLE 1>>", "<<REPLACE TABLE 2>>"]
+
+# Create EventBridge Rule
+event_pattern = {
+    "source": ["aws.glue"],
+    "detail-type": ["Glue Data Catalog Table State Change"]
+}
 
-notification_configuration = {
-    'LambdaFunctionConfigurations': [
+if glue_table_names:
+    event_pattern["detail"] = {
+        "tableName": glue_table_names
+    }
+event_pattern_dump = json.dumps(event_pattern)
+rule_response = events_client.put_rule(
+    Name='IcebergTablesUpdateRule',
+    EventPattern=event_pattern_dump,
+    State='ENABLED'
+)
+# Add Lambda as a target to the EventBridge Rule
+events_client.put_targets(
+    Rule='IcebergTablesUpdateRule',
+    Targets=[
         {
-            'LambdaFunctionArn': lambda_arn,
-            'Events': [
-                's3:ObjectCreated:Put'
-            ],
-            'Filter': {
-                'Key': {
-                    'FilterRules': [
-                        {
-                            'Name': 'Prefix',
-                            'Value': path_to_metadata_folder
-                        },
-                        {
-                            'Name': 'Suffix',
-                            'Value': '.json'
-                        }
-                    ]
-                }
-            }
+            'Id': '1',
+            'Arn': lambda_function_arn
         }
     ]
-}
-response = s3_client.put_bucket_notification_configuration(
-    Bucket=bucket_name,
-    NotificationConfiguration=notification_configuration
 )
-if response['ResponseMetadata']['HTTPStatusCode'] == 200:
-    print("Success")
-else:
-    print("Something went wrong")
-
+print(f"Pattern updated = {event_pattern_dump}")
 ```
 
-The final result should look like this
-
-![S3 to AWS Lambda trigger example](assets/trigger.png)
-
 #### 4. (Optional) Create CloudWatch Dashboard
 Once your Iceberg Table metrics are submitted to CloudWatch you can start using them to monitor and create alarms. CloudWatch also let you visualize metrics using CloudWatch Dashboards.
 
@@ -255,7 +246,6 @@ https://docs.docker.com/get-docker/
 
 1. Delete AWS Lambda `sam delete`.
 2. Delete CloudWatch Dashboard.
-3. Remove S3 event notification.
 
 ## Security
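Note that the `IcebergTablesUpdateRule` created by the boto3 snippet in step 3 is not part of the SAM stack, so `sam delete` will not remove it. A minimal cleanup sketch, assuming the rule name and target id used in that snippet:

```python
import boto3

# Remove the EventBridge rule created in step 3; targets must be
# removed before the rule itself can be deleted.
events_client = boto3.client('events')
events_client.remove_targets(Rule='IcebergTablesUpdateRule', Ids=['1'])
events_client.delete_rule(Name='IcebergTablesUpdateRule')
```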

assets/arch.png

13.8 KB

assets/trigger.png

-60.5 KB
Binary file not shown.

events/event.json

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+{
+    "version": "0",
+    "id": "f9155a75-bc7c-d7b8-e240-asd234fdfg",
+    "detail-type": "Glue Data Catalog Table State Change",
+    "source": "aws.glue",
+    "account": "12345678910",
+    "time": "2024-04-29T19:11:21Z",
+    "region": "us-east-1",
+    "resources": [
+        "arn:aws:glue:us-east-1:12345678910:table/sample_db_name/sample_iceberg_table"
+    ],
+    "detail": {
+        "databaseName": "sample_db_name",
+        "changedPartitions": [],
+        "typeOfChange": "UpdateTable",
+        "tableName": "sample_iceberg_table"
+    }
+}
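This sample event can be used to smoke-test the function without waiting for a real Glue Data Catalog change. A minimal sketch, assuming the function is already deployed and `events/event.json` is available locally (the function ARN is a placeholder to be taken from the stack output):

```python
import json
import boto3

# Invoke the deployed Lambda with the sample Glue Data Catalog event.
lambda_client = boto3.client('lambda')

with open('events/event.json') as f:
    sample_event = json.load(f)

response = lambda_client.invoke(
    FunctionName='<<REPLACE WITH LAMBDA FUNCTION ARN>>',
    InvocationType='RequestResponse',
    Payload=json.dumps(sample_event),
)
print(response['StatusCode'])
```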

lambda/app.py

Lines changed: 57 additions & 29 deletions
@@ -14,16 +14,14 @@
 
 glue_client = boto3.client('glue')
 
-required_vars = ['DBNAME', 'TABLENAME', 'CW_NAMESPACE', 'GLUE_SERVICE_ROLE', 'SPARK_CATALOG_S3_WAREHOUSE']
+required_vars = ['CW_NAMESPACE', 'GLUE_SERVICE_ROLE', 'SPARK_CATALOG_S3_WAREHOUSE']
 for var in required_vars:
     # Retrieve the environment variable value
     if os.getenv(var) is None:
         # If any variable is not set, raise an exception
         raise EnvironmentError(f"Required environment variable '{var}' is not set.")
 
 cw_namespace = os.environ.get('CW_NAMESPACE')
-glue_db_name = os.environ.get('DBNAME')
-glue_table_name = os.environ.get('TABLENAME')
 glue_service_role = os.environ.get('GLUE_SERVICE_ROLE')
 warehouse_path = os.environ.get('SPARK_CATALOG_S3_WAREHOUSE')
 
@@ -83,18 +81,26 @@ def wait_for_statement(session_id,statement_id,interval=1):
             logger.info(f"Statement status={status}")
             return response
         time.sleep(interval)
-
-def parse_statement_result(data_str, columns):
-    # Split the string into lines and filter out the irrelevant ones
-    lines = data_str.split('\n')[3:-3]  # Ignore the header and footer lines
-    # Split each line into components and strip the whitespace
-    data = [line.split('|')[1:-1] for line in lines]  # Remove empty strings at start and end
-    data = [[item.strip() for item in row] for row in data]  # Strip whitespace from each item
-    df = pd.DataFrame(data, columns=columns)
-    return df
-
 
-def send_files_metrics(snapshot,session_id):
+
+
+def parse_spark_show_output(output):
+    lines = output.strip().split('\n')
+    header = lines[1]  # Column names are typically in the second line
+    columns = [col.strip() for col in header.split('|') if col.strip()]  # Clean and split by '|'
+
+    data = []
+    # Start reading data from the third line and skip the last line which is a border
+    for row in lines[3:-1]:
+        # Remove border and split
+        row_data = [cell.strip() for cell in row.split('|') if cell.strip()]
+        if row_data:
+            data.append(row_data)
+
+    # Create DataFrame
+    return pd.DataFrame(data, columns=columns)
+
+def send_files_metrics(glue_db_name, glue_table_name, snapshot, session_id):
     sql_stmt = f"select file_path,record_count,file_size_in_bytes from glue_catalog.{glue_db_name}.{glue_table_name}.files"
     run_stmt_response = glue_client.run_statement(
         SessionId=session_id,
@@ -104,15 +110,13 @@ def send_files_metrics(snapshot,session_id):
     logger.info(f"select files statement_id={stmt_id}")
     stmt_response = wait_for_statement(session_id, run_stmt_response["Id"])
     data_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"]
-    files_metrics_columns = ["file_path","record_count", "file_size_in_bytes"]
-    df = parse_statement_result(data_str,files_metrics_columns)
-
+    logger.info(stmt_response)
+    df = parse_spark_show_output(data_str)
     file_metrics = {
         "avg_record_count": df["record_count"].astype(int).mean().astype(int),
         "max_record_count": df["record_count"].astype(int).max(),
         "min_record_count": df["record_count"].astype(int).min(),
         "deviation_record_count": df['record_count'].astype(int).std().round(2),
-        "skew_record_count": df['record_count'].astype(int).skew().round(2),
         "avg_file_size": df['file_size_in_bytes'].astype(int).mean().astype(int),
         "max_file_size": df['file_size_in_bytes'].astype(int).max(),
         "min_file_size": df['file_size_in_bytes'].astype(int).min(),
@@ -135,7 +139,7 @@ def send_files_metrics(snapshot,session_id):
     )
 
 
-def send_partition_metrics(snapshot,session_id):
+def send_partition_metrics(glue_db_name, glue_table_name, snapshot, session_id):
     sql_stmt = f"select partition,record_count,file_count from glue_catalog.{glue_db_name}.{glue_table_name}.partitions"
     run_stmt_response = glue_client.run_statement(
         SessionId=session_id,
@@ -146,8 +150,12 @@ def send_partition_metrics(snapshot,session_id):
     logger.info(f"send_partition_metrics() -> statement_id={stmt_id}")
     stmt_response = wait_for_statement(session_id, stmt_id)
     data_str = stmt_response["Statement"]["Output"]["Data"]["TextPlain"]
-    partition_metrics_columns = ['partition', 'record_count', 'file_count']
-    df = parse_statement_result(data_str,partition_metrics_columns)
+
+    if data_str == "":
+        logger.info("No partitions found")
+        return
+
+    df = parse_spark_show_output(data_str)
     partition_metrics = {
         "avg_record_count": df["record_count"].astype(int).mean().astype(int),
         "max_record_count": df["record_count"].astype(int).max(),
@@ -249,9 +257,11 @@ def dt_to_ts(dt_str):
     timestamp_seconds = dt_obj.timestamp()
     return int(timestamp_seconds * 1000)
 
-def send_snapshot_metrics(snapshot_id, session_id):
+
+def send_snapshot_metrics(glue_db_name, glue_table_name, snapshot_id, session_id):
     logger.info("send_snapshot_metrics")
     sql_stmt = f"select committed_at,snapshot_id,operation,summary from glue_catalog.{glue_db_name}.{glue_table_name}.snapshots where snapshot_id={snapshot_id}"
+    logger.debug(sql_stmt)
     run_stmt_response = glue_client.run_statement(
         SessionId=session_id,
         Code=f"df=spark.sql(\"{sql_stmt}\");json_rdd=df.toJSON();json_strings=json_rdd.collect();print(json_strings)"
@@ -288,20 +298,38 @@ def send_snapshot_metrics(snapshot_id, session_id):
         timestamp = timestamp_ms,
     )
 
+# check if glue table is of iceberg format, return boolean
+def check_table_is_of_iceberg_format(event):
+    response = glue_client.get_table(
+        DatabaseName=event["detail"]["databaseName"],
+        Name=event["detail"]["tableName"],
+    )
+    try:
+        return response["Table"]["Parameters"]["table_type"] == "ICEBERG"
+    except KeyError:
+        logger.warning("check_table_is_of_iceberg_format() -> table_type is missing")
+        return False
+
 
 def lambda_handler(event, context):
-    log_format = f"[{context.aws_request_id}:%(asctime)s.%(msecs)03d] %(message)s"
-    logging.basicConfig(format=log_format, datefmt='%Y-%m-%d %H:%M:%S', level=logging.INFO)
+    log_format = f"[{context.aws_request_id}] %(message)s"
+    logging.basicConfig(format=log_format, level=logging.INFO)
 
+    # Ensure Table is of Iceberg format.
+    if not check_table_is_of_iceberg_format(event):
+        logger.info("Table is not of Iceberg format, skipping metrics generation")
+        return
 
-    catalog = GlueCatalog("default")
+    glue_db_name = event["detail"]["databaseName"]
+    glue_table_name = event["detail"]["tableName"]
+
+    catalog = GlueCatalog(glue_db_name)
     table = catalog.load_table((glue_db_name, glue_table_name))
     logger.info(f"current snapshot id={table.metadata.current_snapshot_id}")
     snapshot = table.metadata.snapshot_by_id(table.metadata.current_snapshot_id)
-
     logger.info("Using glue IS to produce metrics")
     session_id = create_or_reuse_glue_session()
 
-    send_snapshot_metrics(table.metadata.current_snapshot_id, session_id)
-    send_partition_metrics(snapshot,session_id)
-    send_files_metrics(snapshot,session_id)
+    send_snapshot_metrics(glue_db_name, glue_table_name, table.metadata.current_snapshot_id, session_id)
+    send_partition_metrics(glue_db_name, glue_table_name, snapshot, session_id)
+    send_files_metrics(glue_db_name, glue_table_name, snapshot, session_id)
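For reference, the new `parse_spark_show_output` expects the plain-text table that Spark's `show()` prints: column names on the second line and data rows between the third line and the closing border. A minimal sketch with made-up sample output (the parsing logic mirrors the function above):

```python
import pandas as pd

def parse_spark_show_output(output):
    # Same parsing logic as in lambda/app.py.
    lines = output.strip().split('\n')
    header = lines[1]
    columns = [col.strip() for col in header.split('|') if col.strip()]
    data = []
    for row in lines[3:-1]:
        row_data = [cell.strip() for cell in row.split('|') if cell.strip()]
        if row_data:
            data.append(row_data)
    return pd.DataFrame(data, columns=columns)

sample = """+--------------------+------------+------------------+
|           file_path|record_count|file_size_in_bytes|
+--------------------+------------+------------------+
|s3://bucket/data/f1 |        1200|            524288|
|s3://bucket/data/f2 |         800|            262144|
+--------------------+------------+------------------+"""

df = parse_spark_show_output(sample)
print(df["record_count"].astype(int).mean())  # 1000.0
```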

template.yaml

Lines changed: 6 additions & 20 deletions
@@ -4,21 +4,12 @@ Parameters:
   CWNamespace:
     Description: Amazon CloudWatch custom metric namespace
     Type: String
-  DBName:
-    Description: AWS Glue Database Name
-    Type: String
-  TableName:
-    Description: AWS Glue Table Name
-    Type: String
   GlueServiceRole:
     Description: AWS Glue Service Role with permissions to create session.
     Type: String
   Warehouse:
     Description: warehouse is a required catalog property to determine the root path of the data warehouse in storage.
     Type: String
-  IcebergTableS3BucketName:
-    Description: this property is required to allow S3 bucket event notification. SAM will add resource-based permission to allow S3 bucket to invoke AWS Lambda.
-    Type: String
 Globals:
   Function:
     Timeout: 360
@@ -32,9 +23,8 @@ Resources:
       Runtime: python3.9
       Policies:
         - CloudWatchPutMetricPolicy: {}
-        - S3ReadPolicy:
-            BucketName: !Ref IcebergTableS3BucketName
         - AWSLambdaBasicExecutionRole
+        - AmazonS3ReadOnlyAccess
         - Statement:
           - Sid: GlueInteractiveSessionPolicy
             Effect: Allow
@@ -68,20 +58,16 @@ Resources:
         - x86_64
       Environment:
         Variables:
-          DBNAME: !Ref DBName
-          TABLENAME: !Ref TableName
          CW_NAMESPACE: !Ref CWNamespace
          GLUE_SERVICE_ROLE: !Ref GlueServiceRole
          SPARK_CATALOG_S3_WAREHOUSE: !Ref Warehouse
-  S3LambdaInvokeResourceBasedPermission:
+  PermissionForEventBridgeToInvokeLambda:
     Type: AWS::Lambda::Permission
     Properties:
-      FunctionName: !GetAtt IcebergMetricsLambda.Arn
-      Action: lambda:InvokeFunction
-      Principal: s3.amazonaws.com
-      SourceAccount: !Ref 'AWS::AccountId'
-      SourceArn: !Sub 'arn:aws:s3:::${IcebergTableS3BucketName}'
-
+      FunctionName: !Ref IcebergMetricsLambda
+      Action: 'lambda:InvokeFunction'
+      Principal: 'events.amazonaws.com'
+      SourceArn: !Sub 'arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/IcebergTablesUpdateRule*'
 Outputs:
   IcebergMetricsLambda:
     Description: Lambda Function ARN
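Once the stack is deployed, the resource-based policy added by `PermissionForEventBridgeToInvokeLambda` can be inspected on the function. A minimal sketch, assuming the deployed function name is taken from the stack output:

```python
import json
import boto3

# Print the statements of the Lambda resource-based policy to confirm
# that events.amazonaws.com is allowed to invoke the function.
lambda_client = boto3.client('lambda')

policy = lambda_client.get_policy(FunctionName='<<REPLACE WITH LAMBDA FUNCTION NAME>>')
for statement in json.loads(policy['Policy'])['Statement']:
    print(statement['Sid'], statement['Principal'], statement.get('Condition', {}))
```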
