diff --git a/ICEBERG_INTEGRATION_SUMMARY.md b/ICEBERG_INTEGRATION_SUMMARY.md new file mode 100644 index 0000000..de321bb --- /dev/null +++ b/ICEBERG_INTEGRATION_SUMMARY.md @@ -0,0 +1,30 @@ +# Iceberg Integration Summary + +## Core Integration Files + +### Essential Library +- `libs/glue_functions/iceberg_glue_functions.py` - Core Iceberg functions for Glue Catalog integration + +### Production Code +- `sparkLambdaHandler.py` - Main Lambda handler with Iceberg support + +### Key Examples (Kept) +- `examples/advanced-iceberg-features.py` - Time travel and metadata queries +- `examples/lambda-handler-templates.py` - Production Lambda templates +- `examples/production-etl-pipeline.py` - Complete ETL pipeline +- `examples/USAGE_GUIDE.md` - Usage documentation + +### Infrastructure (Kept) +- `test-infrastructure/iceberg-test-setup.yaml` - CloudFormation template +- `test-infrastructure/create-sample-iceberg-table.py` - Table creation +- `test-infrastructure/deploy-test-environment.sh` - Environment deployment +- `test-infrastructure/cleanup-test-environment.sh` - Cleanup script + +## Removed Files +- All redundant example scripts +- Test and demo scripts +- Duplicate functionality files +- Temporary files and guides + +## Usage +Your Iceberg integration is now streamlined with only essential files for production use. \ No newline at end of file diff --git a/examples/USAGE_GUIDE.md b/examples/USAGE_GUIDE.md new file mode 100644 index 0000000..c32d3f8 --- /dev/null +++ b/examples/USAGE_GUIDE.md @@ -0,0 +1,337 @@ +# Iceberg Integration Usage Guide + +This guide shows you how to use the Iceberg integration code in your Spark on AWS Lambda applications. + +## ๐Ÿš€ Quick Start + +### 1. Basic Setup + +```python +# In your Lambda function +import sys +sys.path.append('/home/glue_functions') + +from iceberg_glue_functions import read_iceberg_table_with_spark +from pyspark.sql import SparkSession + +# Create Spark session (use the provided helper) +spark = create_iceberg_spark_session() + +# Read Iceberg table +df = spark.read.format("iceberg").load("glue_catalog.your_database.your_table") +``` + +### 2. 
Environment Variables + +Set these in your Lambda function: + +```bash +SCRIPT_BUCKET=your-s3-bucket +SPARK_SCRIPT=your-script.py +DATABASE_NAME=your_database +TABLE_NAME=your_table +AWS_REGION=us-east-1 +``` + +## ๐Ÿ“– Usage Examples + +### Example 1: Simple Data Reading + +```python +def lambda_handler(event, context): + spark = create_iceberg_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load("glue_catalog.analytics.customer_data") + + # Basic operations + print(f"Row count: {df.count()}") + df.show(10) + + # Filter data + recent_data = df.filter(col("created_date") >= "2024-01-01") + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'total_rows': df.count(), + 'recent_rows': recent_data.count() + }) + } + finally: + spark.stop() +``` + +### Example 2: Time Travel Queries + +```python +def lambda_handler(event, context): + spark = create_iceberg_spark_session() + + try: + # Current data + current_df = spark.read.format("iceberg").load("glue_catalog.sales.transactions") + + # Historical data (yesterday) + historical_df = spark.read.format("iceberg") \ + .option("as-of-timestamp", "2024-01-20 00:00:00.000") \ + .load("glue_catalog.sales.transactions") + + # Compare + current_count = current_df.count() + historical_count = historical_df.count() + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'current_transactions': current_count, + 'historical_transactions': historical_count, + 'new_transactions': current_count - historical_count + }) + } + finally: + spark.stop() +``` + +### Example 3: Data Processing Pipeline + +```python +def lambda_handler(event, context): + spark = create_iceberg_spark_session() + + try: + # Read source data + raw_df = spark.read.format("iceberg").load("glue_catalog.raw.events") + + # Process data + processed_df = raw_df \ + .filter(col("event_type") == "purchase") \ + .withColumn("processing_date", current_date()) \ + .groupBy("customer_id", "product_category") \ + .agg( + sum("amount").alias("total_spent"), + count("*").alias("purchase_count") + ) + + # Write to target table + processed_df.write \ + .format("iceberg") \ + .mode("overwrite") \ + .save("glue_catalog.analytics.customer_purchases") + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'processed_customers': processed_df.count(), + 'message': 'Processing completed successfully' + }) + } + finally: + spark.stop() +``` + +## ๐Ÿ”ง Configuration Options + +### Spark Session Configuration + +```python +spark = SparkSession.builder \ + .appName("Your-App-Name") \ + .master("local[*]") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .getOrCreate() +``` + +### Lambda Function Settings + +- **Memory**: 3008 MB (recommended for Spark workloads) +- **Timeout**: 15 minutes (maximum) +- **Runtime**: Use container image with Iceberg support +- **Environment Variables**: Set database and table names + +## ๐Ÿ“Š Available Functions + +### Basic Operations + +```python +# Read table +df = read_iceberg_table_with_spark(spark, "database", "table") + +# Get table metadata +metadata = get_iceberg_table_metadata("database", "table", "us-east-1") + +# Get 
table location +location = get_iceberg_table_location(metadata) +``` + +### Advanced Operations + +```python +# Time travel +historical_df = read_iceberg_table_at_timestamp(spark, "db", "table", "2024-01-01 00:00:00") + +# Snapshot queries +snapshot_df = read_iceberg_table_at_snapshot(spark, "db", "table", "snapshot_id") + +# Table history +history_df = query_iceberg_table_history(spark, "db", "table") + +# Table snapshots +snapshots_df = query_iceberg_table_snapshots(spark, "db", "table") +``` + +## ๐ŸŽฏ Event Formats + +### Simple Read Event + +```json +{ + "handler_type": "simple_reader", + "database": "analytics", + "table": "customer_data", + "limit": 100 +} +``` + +### Analytics Event + +```json +{ + "handler_type": "analytics", + "database": "sales", + "table": "transactions", + "filters": ["date >= '2024-01-01'", "amount > 100"], + "aggregations": { + "group_by": ["product_category"], + "metrics": ["sum(amount) as total_sales", "count(*) as transaction_count"] + } +} +``` + +### Time Travel Event + +```json +{ + "handler_type": "time_travel", + "database": "analytics", + "table": "customer_data", + "timestamp": "2024-01-15 10:00:00.000", + "compare_with_current": true +} +``` + +## ๐Ÿ” Error Handling + +```python +def lambda_handler(event, context): + spark = None + + try: + spark = create_iceberg_spark_session() + + # Your processing logic here + df = spark.read.format("iceberg").load("glue_catalog.db.table") + + return {'statusCode': 200, 'body': 'Success'} + + except Exception as e: + print(f"Error: {e}") + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + if spark: + spark.stop() +``` + +## ๐Ÿ“ˆ Performance Tips + +1. **Use appropriate filters** to reduce data volume +2. **Set proper memory allocation** (3008 MB recommended) +3. **Enable adaptive query execution** +4. **Use columnar operations** when possible +5. **Consider partitioning** for large tables + +## ๐Ÿ”— Integration Patterns + +### Event-Driven Processing + +```python +# Triggered by S3 events +def process_new_data(event, context): + for record in event['Records']: + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + + # Process new file and update Iceberg table + process_file(f"s3a://{bucket}/{key}") +``` + +### Scheduled Processing + +```python +# Triggered by CloudWatch Events +def daily_aggregation(event, context): + yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d') + + # Process yesterday's data + df = spark.read.format("iceberg") \ + .load("glue_catalog.raw.events") \ + .filter(f"date = '{yesterday}'") + + # Aggregate and save + aggregated = df.groupBy("category").agg(sum("amount")) + aggregated.write.format("iceberg").mode("append").save("glue_catalog.analytics.daily_summary") +``` + +## ๐Ÿ› ๏ธ Troubleshooting + +### Common Issues + +1. **"Table not found"** + - Check database and table names + - Verify Glue Catalog permissions + +2. **"Access Denied"** + - Check S3 permissions for table location + - Verify IAM role has Glue access + +3. **Memory errors** + - Increase Lambda memory allocation + - Add filters to reduce data volume + +4. 
**Timeout errors** + - Optimize queries with filters + - Consider breaking into smaller chunks + +### Debug Commands + +```python +# Check table exists +metadata = get_iceberg_table_metadata("db", "table", "us-east-1") +print(f"Table type: {metadata['Table']['Parameters'].get('table_type')}") + +# Check table location +location = get_iceberg_table_location(metadata) +print(f"Location: {location}") + +# Test S3 access +s3_client = boto3.client('s3') +response = s3_client.list_objects_v2(Bucket='bucket', Prefix='prefix', MaxKeys=1) +print(f"S3 accessible: {response.get('KeyCount', 0) >= 0}") +``` + +## ๐Ÿ“š Next Steps + +1. **Start with simple examples** and gradually add complexity +2. **Test with small datasets** before scaling up +3. **Monitor CloudWatch logs** for debugging +4. **Set up proper error handling** and retry logic +5. **Consider cost optimization** with appropriate filtering \ No newline at end of file diff --git a/examples/advanced-iceberg-features.py b/examples/advanced-iceberg-features.py new file mode 100644 index 0000000..0b06f8e --- /dev/null +++ b/examples/advanced-iceberg-features.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +""" +Advanced example: Using Iceberg's time travel and metadata features +""" + +import os +import sys +from datetime import datetime, timedelta +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +sys.path.append('/home/glue_functions') +from iceberg_glue_functions import ( + read_iceberg_table_at_timestamp, + read_iceberg_table_at_snapshot, + query_iceberg_table_history, + query_iceberg_table_snapshots, + get_iceberg_table_metadata +) + +def create_spark_session(): + """Create Spark session for advanced Iceberg features""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + + return SparkSession.builder \ + .appName("Advanced-Iceberg-Features") \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + +def time_travel_example(): + """Example: Time travel queries""" + + print("โฐ Time Travel Example") + print("=" * 30) + + spark = create_spark_session() + database_name = "analytics" + table_name = "customer_transactions" + + try: + # 1. Read current data + current_df = spark.read.format("iceberg").load(f"glue_catalog.{database_name}.{table_name}") + print(f"Current data count: {current_df.count()}") + + # 2. 
Read data as it was yesterday + yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d %H:%M:%S.000') + + historical_df = spark.read.format("iceberg") \ + .option("as-of-timestamp", yesterday) \ + .load(f"glue_catalog.{database_name}.{table_name}") + + print(f"Yesterday's data count: {historical_df.count()}") + + # 3. Compare changes + current_ids = set([row.customer_id for row in current_df.select("customer_id").collect()]) + historical_ids = set([row.customer_id for row in historical_df.select("customer_id").collect()]) + + new_customers = current_ids - historical_ids + print(f"New customers since yesterday: {len(new_customers)}") + + return current_df, historical_df + + finally: + spark.stop() + +def metadata_analysis_example(): + """Example: Analyzing table metadata and history""" + + print("๐Ÿ“Š Metadata Analysis Example") + print("=" * 35) + + spark = create_spark_session() + database_name = "analytics" + table_name = "customer_transactions" + + try: + # 1. Get table history + print("๐Ÿ“š Table History:") + history_df = query_iceberg_table_history(spark, database_name, table_name) + + # 2. Get snapshots + print("๐Ÿ“ธ Table Snapshots:") + snapshots_df = query_iceberg_table_snapshots(spark, database_name, table_name) + + # 3. Analyze table evolution + print("๐Ÿ“ˆ Table Evolution Analysis:") + + # Count operations by type + operation_counts = history_df.groupBy("operation").count().collect() + for row in operation_counts: + print(f" {row.operation}: {row.count} times") + + # Show recent changes + print("๐Ÿ• Recent Changes (last 5):") + recent_changes = history_df.orderBy(desc("made_current_at")).limit(5) + recent_changes.select("made_current_at", "operation", "snapshot_id").show(truncate=False) + + return history_df, snapshots_df + + finally: + spark.stop() + +def snapshot_comparison_example(): + """Example: Compare data between specific snapshots""" + + print("๐Ÿ” Snapshot Comparison Example") + print("=" * 40) + + spark = create_spark_session() + database_name = "analytics" + table_name = "customer_transactions" + + try: + # Get available snapshots + snapshots_df = spark.read.format("iceberg") \ + .load(f"glue_catalog.{database_name}.{table_name}.snapshots") + + snapshots = snapshots_df.select("snapshot_id", "committed_at").orderBy(desc("committed_at")).collect() + + if len(snapshots) >= 2: + # Compare latest two snapshots + latest_snapshot = snapshots[0].snapshot_id + previous_snapshot = snapshots[1].snapshot_id + + print(f"Comparing snapshots:") + print(f" Latest: {latest_snapshot}") + print(f" Previous: {previous_snapshot}") + + # Read data from both snapshots + latest_df = spark.read.format("iceberg") \ + .option("snapshot-id", latest_snapshot) \ + .load(f"glue_catalog.{database_name}.{table_name}") + + previous_df = spark.read.format("iceberg") \ + .option("snapshot-id", previous_snapshot) \ + .load(f"glue_catalog.{database_name}.{table_name}") + + # Compare counts + print(f"Latest snapshot count: {latest_df.count()}") + print(f"Previous snapshot count: {previous_df.count()}") + + # Find differences (example for transactions table) + if "transaction_id" in latest_df.columns: + latest_ids = latest_df.select("transaction_id").distinct() + previous_ids = previous_df.select("transaction_id").distinct() + + new_transactions = latest_ids.subtract(previous_ids) + print(f"New transactions: {new_transactions.count()}") + + return snapshots_df + + finally: + spark.stop() + +if __name__ == "__main__": + # Run advanced examples + time_travel_example() + 
metadata_analysis_example() + snapshot_comparison_example() \ No newline at end of file diff --git a/examples/lambda-handler-templates.py b/examples/lambda-handler-templates.py new file mode 100644 index 0000000..1ab2d75 --- /dev/null +++ b/examples/lambda-handler-templates.py @@ -0,0 +1,431 @@ +#!/usr/bin/env python3 +""" +Lambda Handler Templates for different Iceberg use cases +""" + +import json +import os +import sys +from datetime import datetime +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +sys.path.append('/home/glue_functions') +from iceberg_glue_functions import ( + read_iceberg_table_with_spark, + read_iceberg_table_at_timestamp, + query_iceberg_table_history +) + +def create_spark_session(): + """Standard Spark session for Lambda""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + + return SparkSession.builder \ + .appName("Lambda-Iceberg-Handler") \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + +# Template 1: Simple Data Reader +def simple_reader_handler(event, context): + """ + Template for simple Iceberg table reading + + Event format: + { + "database": "your_database", + "table": "your_table", + "limit": 100 + } + """ + + database = event.get('database') + table = event.get('table') + limit = event.get('limit', 100) + + if not database or not table: + return { + 'statusCode': 400, + 'body': json.dumps({'error': 'database and table are required'}) + } + + spark = create_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") + + # Get sample data + sample_data = df.limit(limit).collect() + + # Convert to JSON-serializable format + result = [] + for row in sample_data: + result.append(row.asDict()) + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'database': database, + 'table': table, + 'row_count': df.count(), + 'sample_data': result + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 2: Filtered Analytics +def analytics_handler(event, context): + """ + Template for analytical queries on Iceberg tables + + Event format: + { + "database": "analytics", + "table": "sales_data", + "filters": ["date >= '2024-01-01'", "region = 'US'"], + "aggregations": { + "group_by": ["product_category"], + "metrics": ["sum(sales_amount) as total_sales", "count(*) as transaction_count"] + } + } + """ + + database = event.get('database') + table = event.get('table') + filters = event.get('filters', []) + aggregations = event.get('aggregations', {}) + + 
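+    # Optional guard (mirrors simple_reader_handler): fail fast if required fields are
+    # missing rather than letting Spark raise a less clear error later.
+    if not database or not table:
+        return {
+            'statusCode': 400,
+            'body': json.dumps({'error': 'database and table are required'})
+        }
+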
spark = create_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") + + # Apply filters + for filter_condition in filters: + df = df.filter(filter_condition) + + # Apply aggregations if specified + if aggregations: + group_by = aggregations.get('group_by', []) + metrics = aggregations.get('metrics', []) + + if group_by and metrics: + df = df.groupBy(*group_by).agg(*[expr(metric) for metric in metrics]) + + # Collect results + results = df.collect() + + # Convert to JSON + result_data = [] + for row in results: + result_data.append(row.asDict()) + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'database': database, + 'table': table, + 'filters_applied': filters, + 'result_count': len(result_data), + 'results': result_data + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 3: Time Travel Query +def time_travel_handler(event, context): + """ + Template for time travel queries + + Event format: + { + "database": "analytics", + "table": "customer_data", + "timestamp": "2024-01-15 10:00:00.000", + "compare_with_current": true + } + """ + + database = event.get('database') + table = event.get('table') + timestamp = event.get('timestamp') + compare_with_current = event.get('compare_with_current', False) + + spark = create_spark_session() + + try: + table_identifier = f"glue_catalog.{database}.{table}" + + # Read historical data + historical_df = spark.read.format("iceberg") \ + .option("as-of-timestamp", timestamp) \ + .load(table_identifier) + + historical_count = historical_df.count() + + result = { + 'database': database, + 'table': table, + 'timestamp': timestamp, + 'historical_count': historical_count + } + + # Compare with current if requested + if compare_with_current: + current_df = spark.read.format("iceberg").load(table_identifier) + current_count = current_df.count() + + result.update({ + 'current_count': current_count, + 'difference': current_count - historical_count + }) + + return { + 'statusCode': 200, + 'body': json.dumps(result) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 4: Data Quality Checker +def data_quality_handler(event, context): + """ + Template for data quality checks on Iceberg tables + + Event format: + { + "database": "data_lake", + "table": "customer_records", + "checks": [ + {"type": "row_count", "min": 1000}, + {"type": "null_check", "columns": ["customer_id", "email"]}, + {"type": "duplicate_check", "columns": ["customer_id"]}, + {"type": "value_range", "column": "age", "min": 0, "max": 120} + ] + } + """ + + database = event.get('database') + table = event.get('table') + checks = event.get('checks', []) + + spark = create_spark_session() + + try: + # Read table + df = spark.read.format("iceberg").load(f"glue_catalog.{database}.{table}") + + quality_results = [] + + for check in checks: + check_result = {'type': check['type'], 'status': 'passed'} + + if check['type'] == 'row_count': + count = df.count() + min_count = check.get('min', 0) + max_count = check.get('max', float('inf')) + + if not (min_count <= count <= max_count): + check_result['status'] = 'failed' + check_result['message'] = f"Row count {count} outside range [{min_count}, {max_count}]" + else: + check_result['message'] = f"Row count: {count}" + + elif check['type'] == 'null_check': + columns = check['columns'] + null_counts = 
{} + + for column in columns: + null_count = df.filter(col(column).isNull()).count() + null_counts[column] = null_count + + if null_count > 0: + check_result['status'] = 'failed' + + check_result['null_counts'] = null_counts + + elif check['type'] == 'duplicate_check': + columns = check['columns'] + total_count = df.count() + distinct_count = df.select(*columns).distinct().count() + + if total_count != distinct_count: + check_result['status'] = 'failed' + check_result['message'] = f"Found {total_count - distinct_count} duplicates" + else: + check_result['message'] = "No duplicates found" + + elif check['type'] == 'value_range': + column = check['column'] + min_val = check.get('min') + max_val = check.get('max') + + out_of_range = df.filter( + (col(column) < min_val) | (col(column) > max_val) + ).count() + + if out_of_range > 0: + check_result['status'] = 'failed' + check_result['message'] = f"{out_of_range} values outside range [{min_val}, {max_val}]" + else: + check_result['message'] = f"All values within range [{min_val}, {max_val}]" + + quality_results.append(check_result) + + # Overall status + overall_status = 'passed' if all(r['status'] == 'passed' for r in quality_results) else 'failed' + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'database': database, + 'table': table, + 'overall_status': overall_status, + 'checks': quality_results + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Template 5: Event-Driven Processing +def event_driven_handler(event, context): + """ + Template for event-driven processing (e.g., S3 trigger) + + Event format (S3 event): + { + "Records": [ + { + "s3": { + "bucket": {"name": "my-bucket"}, + "object": {"key": "data/new-file.parquet"} + } + } + ], + "processing_config": { + "target_database": "processed_data", + "target_table": "aggregated_metrics" + } + } + """ + + spark = create_spark_session() + + try: + # Process S3 events + if 'Records' in event: + for record in event['Records']: + if 's3' in record: + bucket = record['s3']['bucket']['name'] + key = record['s3']['object']['key'] + + print(f"Processing file: s3://{bucket}/{key}") + + # Read the new file + file_df = spark.read.parquet(f"s3a://{bucket}/{key}") + + # Process the data (example: simple aggregation) + processed_df = file_df.groupBy("category") \ + .agg( + count("*").alias("record_count"), + sum("amount").alias("total_amount") + ) \ + .withColumn("processed_at", current_timestamp()) + + # Write to target Iceberg table + config = event.get('processing_config', {}) + target_db = config.get('target_database', 'processed_data') + target_table = config.get('target_table', 'processed_metrics') + + processed_df.write \ + .format("iceberg") \ + .mode("append") \ + .save(f"glue_catalog.{target_db}.{target_table}") + + print(f"Processed {processed_df.count()} records") + + return { + 'statusCode': 200, + 'body': json.dumps({ + 'message': 'Event processing completed', + 'processed_files': len(event.get('Records', [])) + }) + } + + except Exception as e: + return { + 'statusCode': 500, + 'body': json.dumps({'error': str(e)}) + } + finally: + spark.stop() + +# Main handler router +def lambda_handler(event, context): + """ + Main handler that routes to different templates based on event type + """ + + handler_type = event.get('handler_type', 'simple_reader') + + handlers = { + 'simple_reader': simple_reader_handler, + 'analytics': analytics_handler, + 'time_travel': time_travel_handler, + 
'data_quality': data_quality_handler, + 'event_driven': event_driven_handler + } + + if handler_type in handlers: + return handlers[handler_type](event, context) + else: + return { + 'statusCode': 400, + 'body': json.dumps({ + 'error': f'Unknown handler type: {handler_type}', + 'available_types': list(handlers.keys()) + }) + } \ No newline at end of file diff --git a/lambda_function.py b/lambda_function.py new file mode 100644 index 0000000..5a4c845 --- /dev/null +++ b/lambda_function.py @@ -0,0 +1,108 @@ +import json +import boto3 +import logging + +# Set up logging +logger = logging.getLogger() +logger.setLevel(logging.INFO) + +def lambda_handler(event, context): + """ + Simple Lambda function to test Glue Catalog access for Iceberg tables + """ + + logger.info("๐Ÿš€ Starting Iceberg Glue Catalog test") + + # Get parameters from event or environment + database_name = event.get('DATABASE_NAME', 'iceberg_test_db') + table_name = event.get('TABLE_NAME', 'sample_customers') + + logger.info(f"๐Ÿ“‹ Testing table: {database_name}.{table_name}") + + try: + # Initialize Glue client + glue_client = boto3.client('glue') + + # Test 1: Get database + logger.info("1๏ธโƒฃ Testing database access...") + db_response = glue_client.get_database(Name=database_name) + logger.info(f"โœ… Database found: {db_response['Database']['Name']}") + + # Test 2: Get table + logger.info("2๏ธโƒฃ Testing table access...") + table_response = glue_client.get_table(DatabaseName=database_name, Name=table_name) + table = table_response['Table'] + + logger.info(f"โœ… Table found: {table['Name']}") + + # Test 3: Check if it's an Iceberg table + logger.info("3๏ธโƒฃ Validating Iceberg table...") + table_type = table.get('Parameters', {}).get('table_type', '').upper() + + if table_type == 'ICEBERG': + logger.info("โœ… Confirmed: This is an Iceberg table") + else: + logger.warning(f"โš ๏ธ Warning: Table type is '{table_type}', not 'ICEBERG'") + + # Test 4: Get table schema + logger.info("4๏ธโƒฃ Checking table schema...") + storage_descriptor = table.get('StorageDescriptor', {}) + columns = storage_descriptor.get('Columns', []) + location = storage_descriptor.get('Location', 'N/A') + + logger.info(f"๐Ÿ“ Location: {location}") + logger.info(f"๐Ÿ“Š Column count: {len(columns)}") + + # Test 5: Check S3 access + logger.info("5๏ธโƒฃ Testing S3 location access...") + if location and location.startswith('s3://'): + s3_client = boto3.client('s3') + + # Parse S3 location + location_parts = location.replace('s3://', '').split('/', 1) + bucket_name = location_parts[0] + prefix = location_parts[1] if len(location_parts) > 1 else '' + + try: + response = s3_client.list_objects_v2( + Bucket=bucket_name, + Prefix=prefix, + MaxKeys=10 + ) + + object_count = response.get('KeyCount', 0) + logger.info(f"๐Ÿ“ S3 bucket accessible: {bucket_name}") + logger.info(f"๐Ÿ“„ Objects found: {object_count}") + + except Exception as s3_error: + logger.error(f"โŒ S3 access failed: {s3_error}") + + # Prepare response + result = { + 'statusCode': 200, + 'body': json.dumps({ + 'message': 'Iceberg Glue Catalog test completed successfully', + 'database': database_name, + 'table': table_name, + 'table_type': table_type, + 'location': location, + 'column_count': len(columns), + 'columns': [{'name': col['Name'], 'type': col['Type']} for col in columns] + }) + } + + logger.info("๐ŸŽ‰ Test completed successfully!") + return result + + except Exception as e: + logger.error(f"โŒ Test failed: {str(e)}") + + error_result = { + 'statusCode': 500, + 'body': json.dumps({ + 
'error': str(e), + 'message': 'Iceberg Glue Catalog test failed' + }) + } + + return error_result \ No newline at end of file diff --git a/libs/glue_functions/__init__.py b/libs/glue_functions/__init__.py index 19d722f..3729099 100644 --- a/libs/glue_functions/__init__.py +++ b/libs/glue_functions/__init__.py @@ -1 +1,12 @@ from .glue_catalog_functions import get_table, build_schema_for_table, query_table +from .iceberg_glue_functions import ( + get_iceberg_table_metadata, + get_iceberg_table_location, + get_iceberg_table_properties, + read_iceberg_table_with_spark, + read_iceberg_table_by_location, + query_iceberg_table_history, + query_iceberg_table_snapshots, + read_iceberg_table_at_timestamp, + read_iceberg_table_at_snapshot +) diff --git a/libs/glue_functions/iceberg_glue_functions.py b/libs/glue_functions/iceberg_glue_functions.py new file mode 100644 index 0000000..f1f77ed --- /dev/null +++ b/libs/glue_functions/iceberg_glue_functions.py @@ -0,0 +1,281 @@ +import logging +import sys +import boto3 +from typing import Dict, Optional, Any + +from pyspark.sql import SparkSession +from pyspark.sql.types import * + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +handler = logging.StreamHandler(sys.stdout) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def get_iceberg_table_metadata(db_name: str, table_name: str, aws_region: str) -> Optional[Dict[str, Any]]: + """ + Fetches Iceberg table metadata from AWS Glue Catalog. + + Parameters: + - db_name (str): The name of the database in Glue Catalog. + - table_name (str): The name of the Iceberg table in Glue Database. + - aws_region (str): AWS region for Glue client. + + Returns: + - dict: The response from the Glue `get_table` API call, or None if error. + """ + try: + glue = boto3.client('glue', region_name=aws_region) + response = glue.get_table(DatabaseName=db_name, Name=table_name) + + # Validate that this is an Iceberg table + table_params = response.get('Table', {}).get('Parameters', {}) + table_type = table_params.get('table_type', '').upper() + + if table_type != 'ICEBERG': + logger.warning(f"Table {table_name} is not an Iceberg table (type: {table_type})") + + return response + except Exception as e: + logger.error(f"Error fetching Iceberg table {table_name} from database {db_name}: {e}") + return None + + +def get_iceberg_table_location(glue_table: Dict[str, Any]) -> Optional[str]: + """ + Extracts the S3 location of an Iceberg table from Glue metadata. + + Parameters: + - glue_table (dict): The table metadata from AWS Glue. + + Returns: + - str: The S3 location of the Iceberg table, or None if not found. + """ + try: + if not glue_table or 'Table' not in glue_table: + return None + + # For Iceberg tables, location is in StorageDescriptor + storage_descriptor = glue_table['Table'].get('StorageDescriptor', {}) + location = storage_descriptor.get('Location', '') + + if location: + # Convert s3:// to s3a:// for Spark compatibility + if location.startswith("s3://"): + location = location.replace("s3://", "s3a://") + return location + + return None + except Exception as e: + logger.error(f"Error extracting table location: {e}") + return None + + +def get_iceberg_table_properties(glue_table: Dict[str, Any]) -> Dict[str, str]: + """ + Extracts Iceberg-specific table properties from Glue metadata. + + Parameters: + - glue_table (dict): The table metadata from AWS Glue. 
+ + Returns: + - dict: Dictionary of Iceberg table properties. + """ + try: + if not glue_table or 'Table' not in glue_table: + return {} + + table_params = glue_table['Table'].get('Parameters', {}) + + # Extract Iceberg-specific properties + iceberg_props = {} + for key, value in table_params.items(): + if key.startswith('iceberg.') or key in ['table_type', 'metadata_location']: + iceberg_props[key] = value + + return iceberg_props + except Exception as e: + logger.error(f"Error extracting Iceberg table properties: {e}") + return {} + + +def read_iceberg_table_with_spark(spark: SparkSession, db_name: str, table_name: str, + catalog_name: str = "glue_catalog"): + """ + Reads an Iceberg table using Spark with Glue Catalog integration. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - catalog_name (str): The catalog name configured in Spark (default: "glue_catalog"). + + Returns: + - DataFrame: Spark DataFrame containing the Iceberg table data. + """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Reading Iceberg table: {table_identifier}") + + df = spark.read.format("iceberg").load(table_identifier) + + logger.info(f"Successfully loaded Iceberg table with {df.count()} rows") + logger.info("Table schema:") + df.printSchema() + + return df + except Exception as e: + logger.error(f"Error reading Iceberg table {table_identifier}: {e}") + raise + + +def read_iceberg_table_by_location(spark: SparkSession, table_location: str): + """ + Reads an Iceberg table directly from its S3 location. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - table_location (str): The S3 location of the Iceberg table. + + Returns: + - DataFrame: Spark DataFrame containing the Iceberg table data. + """ + try: + # Ensure s3a:// protocol + if table_location.startswith("s3://"): + table_location = table_location.replace("s3://", "s3a://") + + logger.info(f"Reading Iceberg table from location: {table_location}") + + df = spark.read.format("iceberg").load(table_location) + + logger.info(f"Successfully loaded Iceberg table with {df.count()} rows") + logger.info("Table schema:") + df.printSchema() + + return df + except Exception as e: + logger.error(f"Error reading Iceberg table from location {table_location}: {e}") + raise + + +def query_iceberg_table_history(spark: SparkSession, db_name: str, table_name: str, + catalog_name: str = "glue_catalog"): + """ + Queries the history of an Iceberg table to see snapshots and changes. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: DataFrame containing the table history. 
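+
+    Example (database/table names are illustrative):
+        history_df = query_iceberg_table_history(spark, "analytics", "customer_transactions")
+        history_df.select("snapshot_id", "made_current_at").show(truncate=False)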
+ """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Querying history for Iceberg table: {table_identifier}") + + history_df = spark.read.format("iceberg").load(f"{table_identifier}.history") + + logger.info("Table history:") + history_df.show(truncate=False) + + return history_df + except Exception as e: + logger.error(f"Error querying table history for {table_identifier}: {e}") + raise + + +def query_iceberg_table_snapshots(spark: SparkSession, db_name: str, table_name: str, + catalog_name: str = "glue_catalog"): + """ + Queries the snapshots of an Iceberg table. + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: DataFrame containing the table snapshots. + """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Querying snapshots for Iceberg table: {table_identifier}") + + snapshots_df = spark.read.format("iceberg").load(f"{table_identifier}.snapshots") + + logger.info("Table snapshots:") + snapshots_df.show(truncate=False) + + return snapshots_df + except Exception as e: + logger.error(f"Error querying table snapshots for {table_identifier}: {e}") + raise + + +def read_iceberg_table_at_timestamp(spark: SparkSession, db_name: str, table_name: str, + timestamp: str, catalog_name: str = "glue_catalog"): + """ + Reads an Iceberg table as it existed at a specific timestamp (time travel). + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - timestamp (str): Timestamp in format 'YYYY-MM-DD HH:MM:SS.SSS' + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: Spark DataFrame containing the table data at the specified timestamp. + """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Reading Iceberg table {table_identifier} at timestamp: {timestamp}") + + df = spark.read.format("iceberg") \ + .option("as-of-timestamp", timestamp) \ + .load(table_identifier) + + logger.info(f"Successfully loaded table at timestamp with {df.count()} rows") + + return df + except Exception as e: + logger.error(f"Error reading table at timestamp {timestamp}: {e}") + raise + + +def read_iceberg_table_at_snapshot(spark: SparkSession, db_name: str, table_name: str, + snapshot_id: str, catalog_name: str = "glue_catalog"): + """ + Reads an Iceberg table at a specific snapshot ID (time travel). + + Parameters: + - spark (SparkSession): The active SparkSession configured for Iceberg. + - db_name (str): The database name in Glue Catalog. + - table_name (str): The table name in Glue Catalog. + - snapshot_id (str): The snapshot ID to read from. + - catalog_name (str): The catalog name configured in Spark. + + Returns: + - DataFrame: Spark DataFrame containing the table data at the specified snapshot. 
+ """ + try: + table_identifier = f"{catalog_name}.{db_name}.{table_name}" + logger.info(f"Reading Iceberg table {table_identifier} at snapshot: {snapshot_id}") + + df = spark.read.format("iceberg") \ + .option("snapshot-id", snapshot_id) \ + .load(table_identifier) + + logger.info(f"Successfully loaded table at snapshot with {df.count()} rows") + + return df + except Exception as e: + logger.error(f"Error reading table at snapshot {snapshot_id}: {e}") + raise \ No newline at end of file diff --git a/spark-scripts/.DS_Store b/spark-scripts/.DS_Store deleted file mode 100644 index c927a13..0000000 Binary files a/spark-scripts/.DS_Store and /dev/null differ diff --git a/sparkLambdaHandler.py b/sparkLambdaHandler.py index 990b44f..2dc57ca 100644 --- a/sparkLambdaHandler.py +++ b/sparkLambdaHandler.py @@ -1,73 +1,309 @@ -import boto3 -import sys -import os -import subprocess -import logging +#!/usr/bin/env python3 +""" +Production Lambda function for reading Iceberg tables from Glue Catalog +This is the actual handler that will run in the Lambda container +""" + import json +import logging +import os +import sys +from datetime import datetime + +from pyspark.sql import SparkSession +from pyspark.sql.functions import * + +# Add glue functions to path +sys.path.append('/home/glue_functions') # Set up logging logger = logging.getLogger() logger.setLevel(logging.INFO) -handler = logging.StreamHandler(sys.stdout) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -logger.addHandler(handler) -def s3_script_download(s3_bucket_script: str,input_script: str)-> None: - """ - """ - s3_client = boto3.resource("s3") +def create_iceberg_spark_session(): + """Create Spark session optimized for Lambda with Iceberg support""" + + aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID'] + aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY'] + session_token = os.environ['AWS_SESSION_TOKEN'] + aws_region = os.environ.get('AWS_REGION', 'us-east-1') + + logger.info("Creating Spark session with Iceberg configuration...") + + spark = SparkSession.builder \ + .appName("Lambda-Iceberg-Reader") \ + .master("local[*]") \ + .config("spark.driver.bindAddress", "127.0.0.1") \ + .config("spark.driver.memory", "5g") \ + .config("spark.executor.memory", "5g") \ + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ + .config("spark.sql.adaptive.enabled", "true") \ + .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ + .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \ + .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \ + .config("spark.hadoop.fs.s3a.session.token", session_token) \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \ + .getOrCreate() + + logger.info("Spark session created successfully") + return spark +def read_iceberg_table(spark, database_name, table_name, limit=None, filters=None): + """Read Iceberg table with optional filters and limit""" + + 
+    table_identifier = f"glue_catalog.{database_name}.{table_name}"
+    logger.info(f"Reading Iceberg table: {table_identifier}")
+
+    try:
+        # Read the table
+        df = spark.read.format("iceberg").load(table_identifier)
+
+        # Apply filters if provided
+        if filters:
+            for filter_condition in filters:
+                df = df.filter(filter_condition)
+                logger.info(f"Applied filter: {filter_condition}")
+
+        # Apply limit if provided
+        if limit:
+            df = df.limit(limit)
+
+        return df
+
+    except Exception as e:
+        logger.error(f"Error reading table {table_identifier}: {e}")
+        raise
+
+def get_table_analytics(df):
+    """Get basic analytics from the DataFrame"""
+
+    logger.info("Computing table analytics...")
+
+    try:
+        # Basic stats
+        total_count = df.count()
+
+        # Get numeric columns for analytics
+        numeric_columns = []
+        for field in df.schema.fields:
+            if field.dataType.typeName() in ['integer', 'long', 'double', 'decimal', 'float']:
+                numeric_columns.append(field.name)
+
+        analytics = {
+            'total_rows': total_count,
+            'columns': len(df.columns),
+            'column_names': df.columns
+        }
+
+        # Add numeric analytics if available
+        if numeric_columns:
+            for col_name in numeric_columns:
+                try:
+                    stats = df.agg(
+                        avg(col_name).alias('avg'),
+                        min(col_name).alias('min'),
+                        max(col_name).alias('max')
+                    ).collect()[0]
+
+                    analytics[f'{col_name}_stats'] = {
+                        'avg': float(stats['avg']) if stats['avg'] else None,
+                        'min': float(stats['min']) if stats['min'] else None,
+                        'max': float(stats['max']) if stats['max'] else None
+                    }
+                except Exception as e:
+                    logger.warning(f"Could not compute stats for {col_name}: {e}")
+
+        return analytics
+
+    except Exception as e:
+        logger.error(f"Error computing analytics: {e}")
+        return {'total_rows': 0, 'error': str(e)}
+
+def convert_rows_to_json(rows):
+    """Convert Spark rows to JSON-serializable format"""
+
+    results = []
+    for row in rows:
+        row_dict = {}
+        for field in row.__fields__:
+            value = getattr(row, field)
+
+            # Handle different data types
+            if value is None:
+                row_dict[field] = None
+            elif hasattr(value, 'isoformat'):  # datetime
+                row_dict[field] = value.isoformat()
+            elif str(type(value)) == "<class 'decimal.Decimal'>":  # Decimal -> float for JSON
+                row_dict[field] = float(value)
+            elif str(type(value)) == "<class 'datetime.date'>":  # date -> ISO string
+                row_dict[field] = value.isoformat()
else: + row_dict[field] = value + + results.append(row_dict) + + return results +def lambda_handler(event, context): """ - Lambda_handler is called when the AWS Lambda - is triggered. The function is downloading file - from Amazon S3 location and spark submitting - the script in AWS Lambda + Main Lambda handler for Iceberg table operations + + Event format: + { + "operation": "read_table", + "database": "iceberg_test_db", + "table": "sample_customers", + "limit": 10, + "filters": ["total_spent > 300"], + "include_analytics": true + } """ + + logger.info("๐Ÿš€ Starting Iceberg Lambda handler") + logger.info(f"Event: {json.dumps(event)}") + + # Parse event parameters + operation = event.get('operation', 'read_table') + database_name = event.get('database', os.environ.get('DATABASE_NAME', 'iceberg_test_db')) + table_name = event.get('table', os.environ.get('TABLE_NAME', 'sample_customers')) + limit = event.get('limit', 10) + filters = event.get('filters', []) + include_analytics = event.get('include_analytics', True) + + logger.info(f"Operation: {operation}") + logger.info(f"Target table: {database_name}.{table_name}") + + spark = None + + try: + # Create Spark session + spark = create_iceberg_spark_session() + + if operation == 'read_table': + # Read table data + df = read_iceberg_table(spark, database_name, table_name, limit, filters) + + # Get sample data + sample_rows = df.collect() + sample_data = convert_rows_to_json(sample_rows) + + # Prepare response + response_body = { + 'message': 'Successfully read Iceberg table', + 'database': database_name, + 'table': table_name, + 'operation': operation, + 'filters_applied': filters, + 'sample_data': sample_data, + 'sample_count': len(sample_data), + 'timestamp': datetime.now().isoformat() + } + + # Add analytics if requested + if include_analytics: + # Read full table for analytics (without limit) + full_df = read_iceberg_table(spark, database_name, table_name, filters=filters) + analytics = get_table_analytics(full_df) + response_body['analytics'] = analytics + + logger.info(f"โœ… Successfully processed {len(sample_data)} rows") + + return { + 'statusCode': 200, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps(response_body) + } + + elif operation == 'table_info': + # Get table information only + df = read_iceberg_table(spark, database_name, table_name, limit=1) + + schema_info = [] + for field in df.schema.fields: + schema_info.append({ + 'name': field.name, + 'type': str(field.dataType), + 'nullable': field.nullable + }) + + # Get full count + full_df = read_iceberg_table(spark, database_name, table_name) + total_count = full_df.count() + + response_body = { + 'message': 'Table information retrieved', + 'database': database_name, + 'table': table_name, + 'total_rows': total_count, + 'schema': schema_info, + 'timestamp': datetime.now().isoformat() + } + + return { + 'statusCode': 200, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps(response_body) + } + + else: + return { + 'statusCode': 400, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({ + 'error': f'Unknown operation: {operation}', + 'supported_operations': ['read_table', 'table_info'] + }) + } + + except Exception as e: + logger.error(f"โŒ Lambda execution failed: {str(e)}") + + return { + 'statusCode': 500, + 'headers': { + 'Content-Type': 'application/json' + }, + 'body': json.dumps({ + 'error': str(e), + 'message': 'Iceberg table operation failed', + 'timestamp': datetime.now().isoformat() + }) + 
} + + finally: + if spark: + logger.info("๐Ÿ”ง Stopping Spark session") + spark.stop() - logger.info("******************Start AWS Lambda Handler************") - s3_bucket_script = os.environ['SCRIPT_BUCKET'] - input_script = os.environ['SPARK_SCRIPT'] - os.environ['INPUT_PATH'] = event.get('INPUT_PATH','') - os.environ['OUTPUT_PATH'] = event.get('OUTPUT_PATH', '') - - s3_script_download(s3_bucket_script,input_script) +# For testing locally +if __name__ == "__main__": + # Test event + test_event = { + "operation": "read_table", + "database": "iceberg_test_db", + "table": "sample_customers", + "limit": 5, + "include_analytics": True + } + + # Mock context + class MockContext: + def __init__(self): + self.function_name = "test-iceberg-reader" + self.memory_limit_in_mb = 3008 + self.invoked_function_arn = "arn:aws:lambda:us-east-1:123456789012:function:test" - # Set the environment variables for the Spark application - spark_submit(s3_bucket_script,input_script, event) - + result = lambda_handler(test_event, MockContext()) + print(json.dumps(result, indent=2)) \ No newline at end of file diff --git a/test-infrastructure/cleanup-test-environment.sh b/test-infrastructure/cleanup-test-environment.sh new file mode 100755 index 0000000..4828b46 --- /dev/null +++ b/test-infrastructure/cleanup-test-environment.sh @@ -0,0 +1,106 @@ +#!/bin/bash + +# Cleanup Test Environment for Iceberg Glue Catalog Integration + +set -e + +# Configuration +STACK_NAME="spark-lambda-iceberg-test" +AWS_REGION="us-east-1" +LAMBDA_FUNCTION_NAME="spark-iceberg-test" +ECR_REPO_NAME="sparkonlambda-iceberg" + +# Colors for output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +echo -e "${BLUE}๐Ÿงน Starting Test Environment Cleanup${NC}" +echo "========================================" + +# Get AWS Account ID +AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) +BUCKET_NAME="spark-lambda-iceberg-test-${AWS_ACCOUNT_ID}" + +echo -e "${BLUE}๐Ÿ“‹ Configuration:${NC}" +echo " Stack Name: $STACK_NAME" +echo " AWS Region: $AWS_REGION" +echo " AWS Account: $AWS_ACCOUNT_ID" +echo " S3 Bucket: $BUCKET_NAME" +echo " Lambda Function: $LAMBDA_FUNCTION_NAME" +echo " ECR Repository: $ECR_REPO_NAME" +echo "" + +# Step 1: Delete Lambda function +echo -e "${YELLOW}โšก Step 1: Deleting Lambda function...${NC}" +if aws lambda get-function --function-name $LAMBDA_FUNCTION_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws lambda delete-function --function-name $LAMBDA_FUNCTION_NAME --region $AWS_REGION + echo -e "${GREEN}โœ… Lambda function deleted${NC}" +else + echo -e "${YELLOW}โš ๏ธ Lambda function not found${NC}" +fi + +# Step 2: Empty and delete S3 bucket contents +echo -e "${YELLOW}๐Ÿ—‘๏ธ Step 2: Emptying S3 bucket...${NC}" +if aws s3 ls s3://$BUCKET_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws s3 rm s3://$BUCKET_NAME --recursive --region $AWS_REGION + echo -e "${GREEN}โœ… S3 bucket emptied${NC}" +else + echo -e "${YELLOW}โš ๏ธ S3 bucket not found or already empty${NC}" +fi + +# Step 3: Delete ECR repository +echo -e "${YELLOW}๐Ÿณ Step 3: Deleting ECR repository...${NC}" +if aws ecr describe-repositories --repository-names $ECR_REPO_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws ecr delete-repository --repository-name $ECR_REPO_NAME --force --region $AWS_REGION + echo -e "${GREEN}โœ… ECR repository deleted${NC}" +else + echo -e "${YELLOW}โš ๏ธ ECR repository not found${NC}" +fi + +# Step 4: Delete CloudFormation stack +echo -e 
"${YELLOW}๐Ÿ“ฆ Step 4: Deleting CloudFormation stack...${NC}" +if aws cloudformation describe-stacks --stack-name $STACK_NAME --region $AWS_REGION > /dev/null 2>&1; then + aws cloudformation delete-stack --stack-name $STACK_NAME --region $AWS_REGION + + echo -e "${YELLOW}โณ Waiting for stack deletion to complete...${NC}" + aws cloudformation wait stack-delete-complete --stack-name $STACK_NAME --region $AWS_REGION + + if [ $? -eq 0 ]; then + echo -e "${GREEN}โœ… CloudFormation stack deleted successfully${NC}" + else + echo -e "${RED}โŒ CloudFormation stack deletion failed or timed out${NC}" + echo -e "${YELLOW}โš ๏ธ Please check the AWS Console for stack status${NC}" + fi +else + echo -e "${YELLOW}โš ๏ธ CloudFormation stack not found${NC}" +fi + +# Step 5: Clean up local Docker images (optional) +echo -e "${YELLOW}๐Ÿณ Step 5: Cleaning up local Docker images...${NC}" +if docker images | grep -q $ECR_REPO_NAME; then + docker rmi $(docker images | grep $ECR_REPO_NAME | awk '{print $3}') 2>/dev/null || true + echo -e "${GREEN}โœ… Local Docker images cleaned up${NC}" +else + echo -e "${YELLOW}โš ๏ธ No local Docker images found${NC}" +fi + +echo "" +echo -e "${BLUE}๐ŸŽ‰ Cleanup Complete!${NC}" +echo "====================" +echo "" +echo -e "${GREEN}โœ… All test resources have been cleaned up${NC}" +echo "" +echo -e "${YELLOW}๐Ÿ“‹ Resources Removed:${NC}" +echo " โ€ข Lambda Function: $LAMBDA_FUNCTION_NAME" +echo " โ€ข S3 Bucket: $BUCKET_NAME (emptied)" +echo " โ€ข ECR Repository: $ECR_REPO_NAME" +echo " โ€ข CloudFormation Stack: $STACK_NAME" +echo " โ€ข IAM Role: SparkLambdaIcebergRole-$STACK_NAME" +echo " โ€ข Glue Database: iceberg_test_db" +echo " โ€ข Glue Table: sample_customers" +echo "" +echo -e "${BLUE}๐Ÿ’ก Note: The S3 bucket itself will be deleted by CloudFormation${NC}" +echo -e "${BLUE}๐Ÿ’ก Note: Check AWS Console to verify all resources are removed${NC}" \ No newline at end of file diff --git a/test-infrastructure/create-sample-iceberg-table.py b/test-infrastructure/create-sample-iceberg-table.py new file mode 100644 index 0000000..f7423f0 --- /dev/null +++ b/test-infrastructure/create-sample-iceberg-table.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +""" +Script to create a sample Iceberg table with test data for Lambda testing. +This script should be run locally or on an EC2 instance with Spark and Iceberg configured. 
+""" + +import os +import sys +from datetime import datetime, date +from decimal import Decimal + +from pyspark.sql import SparkSession +from pyspark.sql.types import * +from pyspark.sql.functions import * + +def create_spark_session(bucket_name, aws_region='us-east-1'): + """Create Spark session configured for Iceberg with Glue Catalog""" + + spark = SparkSession.builder \ + .appName("CreateSampleIcebergTable") \ + .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ + .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \ + .config("spark.sql.catalog.glue_catalog.warehouse", f"s3a://{bucket_name}/iceberg-warehouse/") \ + .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ + .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \ + .config("spark.sql.catalog.glue_catalog.glue.region", aws_region) \ + .getOrCreate() + + return spark + +def create_sample_data(spark): + """Create sample customer data""" + + # Define schema + schema = StructType([ + StructField("customer_id", LongType(), False), + StructField("customer_name", StringType(), False), + StructField("email", StringType(), False), + StructField("registration_date", DateType(), False), + StructField("total_orders", IntegerType(), False), + StructField("total_spent", DecimalType(10, 2), False), + StructField("last_updated", TimestampType(), False) + ]) + + # Sample data + sample_data = [ + (1, "John Doe", "john.doe@email.com", date(2023, 1, 15), 5, Decimal("299.99"), datetime(2024, 1, 15, 10, 30, 0)), + (2, "Jane Smith", "jane.smith@email.com", date(2023, 2, 20), 8, Decimal("599.50"), datetime(2024, 1, 16, 14, 45, 0)), + (3, "Bob Johnson", "bob.johnson@email.com", date(2023, 3, 10), 3, Decimal("149.75"), datetime(2024, 1, 17, 9, 15, 0)), + (4, "Alice Brown", "alice.brown@email.com", date(2023, 4, 5), 12, Decimal("899.25"), datetime(2024, 1, 18, 16, 20, 0)), + (5, "Charlie Wilson", "charlie.wilson@email.com", date(2023, 5, 12), 7, Decimal("449.80"), datetime(2024, 1, 19, 11, 10, 0)), + (6, "Diana Davis", "diana.davis@email.com", date(2023, 6, 8), 15, Decimal("1299.99"), datetime(2024, 1, 20, 13, 25, 0)), + (7, "Frank Miller", "frank.miller@email.com", date(2023, 7, 22), 4, Decimal("199.95"), datetime(2024, 1, 21, 8, 40, 0)), + (8, "Grace Lee", "grace.lee@email.com", date(2023, 8, 18), 9, Decimal("679.30"), datetime(2024, 1, 22, 15, 55, 0)), + (9, "Henry Taylor", "henry.taylor@email.com", date(2023, 9, 3), 6, Decimal("359.60"), datetime(2024, 1, 23, 12, 5, 0)), + (10, "Ivy Anderson", "ivy.anderson@email.com", date(2023, 10, 14), 11, Decimal("799.85"), datetime(2024, 1, 24, 17, 30, 0)) + ] + + df = spark.createDataFrame(sample_data, schema) + return df + +def create_iceberg_table(spark, database_name, table_name, df): + """Create Iceberg table in Glue Catalog""" + + table_identifier = f"glue_catalog.{database_name}.{table_name}" + + print(f"Creating Iceberg table: {table_identifier}") + + # Write DataFrame as Iceberg table + df.writeTo(table_identifier) \ + .using("iceberg") \ + .tableProperty("format-version", "2") \ + .tableProperty("write.parquet.compression-codec", "snappy") \ + .create() + + print(f"Successfully created Iceberg table: {table_identifier}") + + # Verify the table 
+    verify_df = spark.read.format("iceberg").load(table_identifier)
+    print(f"Table verification - Row count: {verify_df.count()}")
+    verify_df.show()
+
+    return table_identifier
+
+def add_more_data(spark, table_identifier):
+    """Add more data to demonstrate table evolution"""
+
+    print(f"Adding more data to: {table_identifier}")
+
+    # Additional data
+    additional_schema = StructType([
+        StructField("customer_id", LongType(), False),
+        StructField("customer_name", StringType(), False),
+        StructField("email", StringType(), False),
+        StructField("registration_date", DateType(), False),
+        StructField("total_orders", IntegerType(), False),
+        StructField("total_spent", DecimalType(10, 2), False),
+        StructField("last_updated", TimestampType(), False)
+    ])
+
+    additional_data = [
+        (11, "Kevin White", "kevin.white@email.com", date(2023, 11, 5), 2, Decimal("99.99"), datetime(2024, 1, 25, 10, 0, 0)),
+        (12, "Laura Green", "laura.green@email.com", date(2023, 12, 1), 13, Decimal("999.75"), datetime(2024, 1, 26, 14, 30, 0)),
+        (13, "Mike Black", "mike.black@email.com", date(2024, 1, 10), 1, Decimal("49.99"), datetime(2024, 1, 27, 9, 45, 0))
+    ]
+
+    additional_df = spark.createDataFrame(additional_data, additional_schema)
+
+    # Append to existing table
+    additional_df.writeTo(table_identifier).using("iceberg").append()
+
+    print("Successfully added more data")
+
+    # Verify updated table
+    updated_df = spark.read.format("iceberg").load(table_identifier)
+    print(f"Updated table - Row count: {updated_df.count()}")
+
+def main():
+    if len(sys.argv) != 3:
+        print("Usage: python create-sample-iceberg-table.py <bucket-name> <database-name>")
+        print("Example: python create-sample-iceberg-table.py my-test-bucket-123456789 iceberg_test_db")
+        sys.exit(1)
+
+    bucket_name = sys.argv[1]
+    database_name = sys.argv[2]
+    table_name = "sample_customers"
+
+    print(f"Creating sample Iceberg table in bucket: {bucket_name}")
+    print(f"Database: {database_name}, Table: {table_name}")
+
+    # Create Spark session
+    spark = create_spark_session(bucket_name)
+
+    try:
+        # Create sample data
+        df = create_sample_data(spark)
+
+        # Create Iceberg table
+        table_identifier = create_iceberg_table(spark, database_name, table_name, df)
+
+        # Add more data to demonstrate table evolution
+        add_more_data(spark, table_identifier)
+
+        print("\n" + "="*50)
+        print("SUCCESS: Sample Iceberg table created successfully!")
+        print(f"Table: {table_identifier}")
+        print(f"Location: s3://{bucket_name}/iceberg-warehouse/{database_name}/{table_name}/")
+        print("="*50)
+
+    except Exception as e:
+        print(f"ERROR: Failed to create sample table: {e}")
+        raise
+    finally:
+        spark.stop()
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/test-infrastructure/deploy-test-environment.sh b/test-infrastructure/deploy-test-environment.sh
new file mode 100755
index 0000000..24e66eb
--- /dev/null
+++ b/test-infrastructure/deploy-test-environment.sh
@@ -0,0 +1,198 @@
+#!/bin/bash
+
+# Deploy Test Environment for Iceberg Glue Catalog Integration
+# This script sets up the complete test environment
+
+set -e
+
+# Configuration
+STACK_NAME="spark-lambda-iceberg-test"
+AWS_REGION="us-east-1"
+BUCKET_PREFIX="spark-lambda-iceberg-test"
+DATABASE_NAME="iceberg_test_db"
+
+# Colors for output
+RED='\033[0;31m'
+GREEN='\033[0;32m'
+YELLOW='\033[1;33m'
+BLUE='\033[0;34m'
+NC='\033[0m' # No Color
+
+echo -e "${BLUE}🚀 Starting Iceberg Test Environment Deployment${NC}"
+echo "=================================================="
+
+# Check if AWS CLI is configured
+if ! aws sts get-caller-identity > /dev/null 2>&1; then
+    echo -e "${RED}❌ AWS CLI not configured. Please run 'aws configure' first.${NC}"
+    exit 1
+fi
+
+# Get AWS Account ID
+AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
+BUCKET_NAME="${BUCKET_PREFIX}-${AWS_ACCOUNT_ID}"
+
+echo -e "${BLUE}📋 Configuration:${NC}"
+echo "  Stack Name: $STACK_NAME"
+echo "  AWS Region: $AWS_REGION"
+echo "  AWS Account: $AWS_ACCOUNT_ID"
+echo "  S3 Bucket: $BUCKET_NAME"
+echo "  Database: $DATABASE_NAME"
+echo ""
+
+# Step 1: Deploy CloudFormation stack
+echo -e "${YELLOW}📦 Step 1: Deploying CloudFormation stack...${NC}"
+aws cloudformation deploy \
+    --template-file test-infrastructure/iceberg-test-setup.yaml \
+    --stack-name $STACK_NAME \
+    --parameter-overrides \
+        BucketName=$BUCKET_PREFIX \
+        DatabaseName=$DATABASE_NAME \
+    --capabilities CAPABILITY_NAMED_IAM \
+    --region $AWS_REGION
+
+if [ $? -eq 0 ]; then
+    echo -e "${GREEN}✅ CloudFormation stack deployed successfully${NC}"
+else
+    echo -e "${RED}❌ CloudFormation deployment failed${NC}"
+    exit 1
+fi
+
+# Step 2: Upload test scripts to S3
+echo -e "${YELLOW}📤 Step 2: Uploading test scripts to S3...${NC}"
+
+# Create scripts directory in S3
+aws s3 cp spark-scripts/test-iceberg-integration.py s3://$BUCKET_NAME/scripts/ --region $AWS_REGION
+aws s3 cp spark-scripts/simple-iceberg-reader.py s3://$BUCKET_NAME/scripts/ --region $AWS_REGION
+
+echo -e "${GREEN}✅ Test scripts uploaded to S3${NC}"
+
+# Step 3: Build and push Docker image (if ECR repo exists)
+echo -e "${YELLOW}🐳 Step 3: Checking for Docker image...${NC}"
+
+# Check if ECR repository exists
+ECR_REPO_NAME="sparkonlambda-iceberg"
+if aws ecr describe-repositories --repository-names $ECR_REPO_NAME --region $AWS_REGION > /dev/null 2>&1; then
+    echo -e "${GREEN}✅ ECR repository exists: $ECR_REPO_NAME${NC}"
+
+    # Get ECR login
+    aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
+
+    # Build and push image
+    echo -e "${YELLOW}🔨 Building Docker image with Iceberg support...${NC}"
+    docker build --build-arg FRAMEWORK=ICEBERG -t $ECR_REPO_NAME .
+
+    docker tag $ECR_REPO_NAME:latest ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest
+    docker push ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest
+
+    echo -e "${GREEN}✅ Docker image built and pushed${NC}"
+else
+    echo -e "${YELLOW}⚠️ ECR repository not found. Creating it...${NC}"
+    aws ecr create-repository --repository-name $ECR_REPO_NAME --region $AWS_REGION
+
+    # Get ECR login
+    aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
+
+    # Build and push image
+    echo -e "${YELLOW}🔨 Building Docker image with Iceberg support...${NC}"
+    docker build --build-arg FRAMEWORK=ICEBERG -t $ECR_REPO_NAME .
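+    # Tag and push the freshly built image to the newly created ECR repository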
+
+    docker tag $ECR_REPO_NAME:latest ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest
+    docker push ${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest
+
+    echo -e "${GREEN}✅ ECR repository created and Docker image pushed${NC}"
+fi
+
+# Step 4: Get outputs from CloudFormation
+echo -e "${YELLOW}📋 Step 4: Getting CloudFormation outputs...${NC}"
+
+LAMBDA_ROLE_ARN=$(aws cloudformation describe-stacks \
+    --stack-name $STACK_NAME \
+    --query 'Stacks[0].Outputs[?OutputKey==`LambdaRoleArn`].OutputValue' \
+    --output text \
+    --region $AWS_REGION)
+
+echo -e "${GREEN}✅ Lambda Role ARN: $LAMBDA_ROLE_ARN${NC}"
+
+# Step 5: Create Lambda function
+echo -e "${YELLOW}⚡ Step 5: Creating Lambda function...${NC}"
+
+LAMBDA_FUNCTION_NAME="spark-iceberg-test"
+IMAGE_URI="${AWS_ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/$ECR_REPO_NAME:latest"
+
+# Note: AWS_REGION is a reserved Lambda environment variable that the runtime sets
+# automatically, so it is intentionally not included in the variable maps below.
+
+# Check if Lambda function exists
+if aws lambda get-function --function-name $LAMBDA_FUNCTION_NAME --region $AWS_REGION > /dev/null 2>&1; then
+    echo -e "${YELLOW}⚠️ Lambda function exists. Updating...${NC}"
+    aws lambda update-function-code \
+        --function-name $LAMBDA_FUNCTION_NAME \
+        --image-uri $IMAGE_URI \
+        --region $AWS_REGION
+
+    # Wait for the code update to finish before changing the configuration,
+    # otherwise the next call can fail with a ResourceConflictException
+    aws lambda wait function-updated \
+        --function-name $LAMBDA_FUNCTION_NAME \
+        --region $AWS_REGION
+
+    aws lambda update-function-configuration \
+        --function-name $LAMBDA_FUNCTION_NAME \
+        --role $LAMBDA_ROLE_ARN \
+        --timeout 900 \
+        --memory-size 3008 \
+        --environment "Variables={SCRIPT_BUCKET=$BUCKET_NAME,SPARK_SCRIPT=test-iceberg-integration.py,DATABASE_NAME=$DATABASE_NAME,TABLE_NAME=sample_customers}" \
+        --region $AWS_REGION
+else
+    echo -e "${YELLOW}🆕 Creating new Lambda function...${NC}"
+    aws lambda create-function \
+        --function-name $LAMBDA_FUNCTION_NAME \
+        --role $LAMBDA_ROLE_ARN \
+        --code ImageUri=$IMAGE_URI \
+        --package-type Image \
+        --timeout 900 \
+        --memory-size 3008 \
+        --environment "Variables={SCRIPT_BUCKET=$BUCKET_NAME,SPARK_SCRIPT=test-iceberg-integration.py,DATABASE_NAME=$DATABASE_NAME,TABLE_NAME=sample_customers}" \
+        --region $AWS_REGION
+fi
+
+echo -e "${GREEN}✅ Lambda function created/updated: $LAMBDA_FUNCTION_NAME${NC}"
+
+# Step 6: Display next steps
+echo ""
+echo -e "${BLUE}🎉 Test Environment Deployed Successfully!${NC}"
+echo "=================================================="
+echo ""
+echo -e "${YELLOW}📋 Next Steps:${NC}"
+echo ""
+echo "1. Create sample Iceberg table (run on EC2 or local machine with Spark):"
+echo "   python test-infrastructure/create-sample-iceberg-table.py $BUCKET_NAME $DATABASE_NAME"
+echo ""
+echo "2. Test the Lambda function:"
+echo "   aws lambda invoke \\"
+echo "     --function-name $LAMBDA_FUNCTION_NAME \\"
+echo "     --payload '{\"DATABASE_NAME\":\"$DATABASE_NAME\",\"TABLE_NAME\":\"sample_customers\",\"TEST_TYPE\":\"comprehensive\"}' \\"
+echo "     --region $AWS_REGION \\"
+echo "     response.json"
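+echo "   (If you use AWS CLI v2, add --cli-binary-format raw-in-base64-out so the inline JSON payload is accepted)"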
View the results:" +echo " cat response.json" +echo "" +echo -e "${YELLOW}๐Ÿ“Š Resources Created:${NC}" +echo " โ€ข S3 Bucket: $BUCKET_NAME" +echo " โ€ข Glue Database: $DATABASE_NAME" +echo " โ€ข Lambda Function: $LAMBDA_FUNCTION_NAME" +echo " โ€ข ECR Repository: $ECR_REPO_NAME" +echo " โ€ข IAM Role: SparkLambdaIcebergRole-$STACK_NAME" +echo "" +echo -e "${YELLOW}๐Ÿ”— Useful Commands:${NC}" +echo " โ€ข View CloudWatch logs:" +echo " aws logs tail /aws/lambda/$LAMBDA_FUNCTION_NAME --follow --region $AWS_REGION" +echo "" +echo " โ€ข Clean up resources:" +echo " ./test-infrastructure/cleanup-test-environment.sh" +echo "" +echo -e "${GREEN}โœ… Deployment Complete!${NC}" \ No newline at end of file diff --git a/test-infrastructure/iceberg-test-setup.yaml b/test-infrastructure/iceberg-test-setup.yaml new file mode 100644 index 0000000..554fb99 --- /dev/null +++ b/test-infrastructure/iceberg-test-setup.yaml @@ -0,0 +1,140 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: 'Test infrastructure for Iceberg Glue Catalog integration with Spark on Lambda' + +Parameters: + BucketName: + Type: String + Default: 'spark-lambda-iceberg-test' + Description: 'S3 bucket name for test data and scripts' + + DatabaseName: + Type: String + Default: 'iceberg_test_db' + Description: 'Glue database name for Iceberg tables' + +Resources: + # S3 Bucket for test data and scripts + TestDataBucket: + Type: AWS::S3::Bucket + Properties: + BucketName: !Sub '${BucketName}-${AWS::AccountId}' + VersioningConfiguration: + Status: Enabled + PublicAccessBlockConfiguration: + BlockPublicAcls: true + BlockPublicPolicy: true + IgnorePublicAcls: true + RestrictPublicBuckets: true + + # Glue Database + IcebergTestDatabase: + Type: AWS::Glue::Database + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseInput: + Name: !Ref DatabaseName + Description: 'Test database for Iceberg tables' + + # Sample Iceberg Table + SampleIcebergTable: + Type: AWS::Glue::Table + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseName: !Ref IcebergTestDatabase + TableInput: + Name: 'sample_customers' + Description: 'Sample Iceberg table for testing' + TableType: 'EXTERNAL_TABLE' + Parameters: + table_type: 'ICEBERG' + metadata_location: !Sub 's3://${BucketName}-${AWS::AccountId}/iceberg-tables/sample_customers/metadata/00000-409702ba-4735-4645-8f14-09537cc0b2c8.metadata.json' + StorageDescriptor: + Location: !Sub 's3://${BucketName}-${AWS::AccountId}/iceberg-tables/sample_customers/' + InputFormat: 'org.apache.iceberg.mr.hive.HiveIcebergInputFormat' + OutputFormat: 'org.apache.iceberg.mr.hive.HiveIcebergOutputFormat' + SerdeInfo: + SerializationLibrary: 'org.apache.iceberg.mr.hive.HiveIcebergSerDe' + Columns: + - Name: 'customer_id' + Type: 'bigint' + - Name: 'customer_name' + Type: 'string' + - Name: 'email' + Type: 'string' + - Name: 'registration_date' + Type: 'date' + - Name: 'total_orders' + Type: 'int' + - Name: 'total_spent' + Type: 'decimal(10,2)' + - Name: 'last_updated' + Type: 'timestamp' + + # Lambda Execution Role + LambdaExecutionRole: + Type: AWS::IAM::Role + Properties: + RoleName: !Sub 'SparkLambdaIcebergRole-${AWS::StackName}' + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + Policies: + - PolicyName: GlueCatalogAccess + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - glue:GetDatabase + - 
+                  - glue:GetTable
+                  - glue:GetTables
+                  - glue:GetPartition
+                  - glue:GetPartitions
+                Resource:
+                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog'
+                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${DatabaseName}'
+                  - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${DatabaseName}/*'
+        - PolicyName: S3Access
+          PolicyDocument:
+            Version: '2012-10-17'
+            Statement:
+              - Effect: Allow
+                Action:
+                  - s3:GetObject
+                  - s3:ListBucket
+                  - s3:GetBucketLocation
+                  - s3:PutObject
+                Resource:
+                  - !GetAtt TestDataBucket.Arn
+                  - !Sub '${TestDataBucket.Arn}/*'
+
+Outputs:
+  BucketName:
+    Description: 'S3 bucket name for test data'
+    Value: !Ref TestDataBucket
+    Export:
+      Name: !Sub '${AWS::StackName}-BucketName'
+
+  DatabaseName:
+    Description: 'Glue database name'
+    Value: !Ref IcebergTestDatabase
+    Export:
+      Name: !Sub '${AWS::StackName}-DatabaseName'
+
+  TableName:
+    Description: 'Iceberg table name'
+    Value: 'sample_customers'
+    Export:
+      Name: !Sub '${AWS::StackName}-TableName'
+
+  LambdaRoleArn:
+    Description: 'Lambda execution role ARN'
+    Value: !GetAtt LambdaExecutionRole.Arn
+    Export:
+      Name: !Sub '${AWS::StackName}-LambdaRoleArn'
\ No newline at end of file