Skip to content

Commit 538f1a9

Browse files
committed
Merge branch 'fix/auto-run-doc-section-crawler' into 'develop'
Fix/auto run doc section crawler See merge request genaiic-reusable-assets/engagement-artifacts/genaiic-idp-accelerator!237
2 parents 03d576f + a5f5dee commit 538f1a9

File tree

7 files changed

+672
-12
lines changed

7 files changed

+672
-12
lines changed

CHANGELOG.md

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,26 @@ SPDX-License-Identifier: MIT-0
55

66
## [Unreleased]
77

8+
## [0.3.10]
9+
10+
### Added
11+
12+
- **Automatic Glue Table Creation for Document Sections**
13+
- Added automatic creation of AWS Glue tables for each document section type (classification) during processing
14+
- Tables are created dynamically when new section types are encountered, eliminating manual table creation
15+
- Consistent lowercase naming convention for tables ensures compatibility with case-sensitive S3 paths
16+
- Tables are configured with partition projection for efficient date-based queries without manual partition management
17+
- Automatic schema evolution - tables update when new fields are detected in extraction results
18+
19+
20+
## [0.3.9]
21+
822
### Added
923
- **Optional Permissions Boundary Support for Enterprise Deployments**
1024
- Added `PermissionsBoundaryArn` parameter to all CloudFormation templates for organizations with Service Control Policies (SCPs) requiring permissions boundaries
1125
- Comprehensive support for both explicit IAM roles and implicit roles created by AWS SAM functions and statemachines`
1226
- Conditional implementation ensures backward compatibility - when no permissions boundary is provided, roles deploy normally
1327

14-
## [0.3.8]
15-
1628
### Added
1729
- IDP Configuration and Prompting Best Practices documentation [doc](./docs/idp-configuration-best-practices.md)
1830

docs/reporting-database.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,21 @@ The metering table is particularly valuable for:
107107

108108
## Document Sections Tables
109109

110-
The document sections tables store the actual extracted data from document sections in a structured format suitable for analytics. These tables are automatically discovered by AWS Glue Crawler and are organized by section type (classification).
110+
The document sections tables store the actual extracted data from document sections in a structured format suitable for analytics. These tables are automatically created when new section types are encountered during document processing, eliminating the need for manual table creation.
111+
112+
### Automatic Table Creation
113+
114+
When a document is processed and a new section type (classification) is detected, the system automatically:
115+
1. Creates a new Glue table for that section type (e.g., `document_sections_invoice`, `document_sections_receipt`, `document_sections_w2`)
116+
2. Configures the table with appropriate schema based on the extracted data
117+
3. Sets up partition projection for efficient date-based queries
118+
4. Updates the table schema if new fields are detected in subsequent documents
119+
120+
**Important:** Section type names are normalized to lowercase for consistency with case-sensitive S3 paths. For example, a section classified as "W2" will create a table named `document_sections_w2` with data stored in `document_sections/w2/`.
111121

112122
### Dynamic Section Tables
113123

114-
Document sections are stored in dynamically created tables based on the section classification. Each section type gets its own table (e.g., `document_sections_invoice`, `document_sections_receipt`, `document_sections_bank_statement`, etc.) with the following characteristics:
124+
Document sections are stored in dynamically created tables based on the section classification. Each section type gets its own table with the following characteristics:
115125

116126
**Common Metadata Columns:**
117127
| Column | Type | Description |

lib/idp_common_pkg/idp_common/reporting/README.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,21 @@ The reporting module is designed to work seamlessly with AWS Glue and Amazon Ath
199199

200200
### Automatic Table Creation
201201

202-
AWS Glue tables are automatically created via CloudFormation with the following features:
202+
The reporting module provides two types of automatic table creation:
203203

204-
- **Predefined Tables**: `document_evaluations`, `section_evaluations`, `attribute_evaluations`, and `metering` tables
205-
- **Dynamic Tables**: Document sections tables are automatically discovered by AWS Glue Crawler
206-
- **Partition Projection**: All tables use partition projection for efficient querying
204+
#### Predefined Tables (CloudFormation)
205+
- **Evaluation Tables**: `document_evaluations`, `section_evaluations`, `attribute_evaluations`
206+
- **Metering Table**: `metering`
207+
- Created during stack deployment via CloudFormation
208+
209+
#### Dynamic Section Tables (Runtime)
210+
When processing documents with new section types, the `SaveReportingData` class automatically:
211+
- **Creates New Tables**: Generates a Glue table for each unique section type (e.g., `document_sections_invoice`, `document_sections_w2`)
212+
- **Updates Schemas**: Adds new columns when new fields are detected in extraction results
213+
- **Configures Partitions**: Sets up partition projection for efficient date-based queries
214+
- **Normalizes Names**: Converts section types to lowercase for S3 path consistency (e.g., "W2" → "w2")
215+
216+
This automatic table creation eliminates manual table management and ensures data is immediately queryable in Athena.
207217

208218
### Partition Projection Configuration
209219

lib/idp_common_pkg/idp_common/reporting/save_reporting_data.py

Lines changed: 180 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,18 @@ class SaveReportingData:
3232
to a reporting bucket in Parquet format for analytics.
3333
"""
3434

35-
def __init__(self, reporting_bucket: str):
35+
def __init__(self, reporting_bucket: str, database_name: str = None):
3636
"""
3737
Initialize the SaveReportingData class.
3838
3939
Args:
4040
reporting_bucket: S3 bucket name for reporting data
41+
database_name: Glue database name for creating tables (optional)
4142
"""
4243
self.reporting_bucket = reporting_bucket
44+
self.database_name = database_name
4345
self.s3_client = boto3.client("s3")
46+
self.glue_client = boto3.client("glue") if database_name else None
4447

4548
def _serialize_value(self, value: Any) -> str:
4649
"""
@@ -305,6 +308,165 @@ def _sanitize_records_for_schema(
305308

306309
return sanitized_records
307310

311+
def _convert_schema_to_glue_columns(
312+
self, schema: pa.Schema
313+
) -> List[Dict[str, str]]:
314+
"""
315+
Convert PyArrow schema to Glue table columns format.
316+
317+
Args:
318+
schema: PyArrow schema
319+
320+
Returns:
321+
List of column definitions for Glue
322+
"""
323+
columns = []
324+
for field in schema:
325+
# Map PyArrow types to Glue/Hive types
326+
if field.type == pa.string():
327+
glue_type = "string"
328+
elif field.type == pa.bool_():
329+
glue_type = "boolean"
330+
elif field.type == pa.int64():
331+
glue_type = "bigint"
332+
elif field.type == pa.int32():
333+
glue_type = "int"
334+
elif field.type == pa.float64():
335+
glue_type = "double"
336+
elif field.type == pa.float32():
337+
glue_type = "float"
338+
elif field.type == pa.timestamp("ms"):
339+
glue_type = "timestamp"
340+
else:
341+
# Default to string for unknown types
342+
glue_type = "string"
343+
344+
columns.append({"Name": field.name, "Type": glue_type})
345+
346+
return columns
347+
348+
def _create_or_update_glue_table(
349+
self, section_type: str, schema: pa.Schema, new_section_created: bool = False
350+
) -> bool:
351+
"""
352+
Create or update a Glue table for a document section type.
353+
354+
Args:
355+
section_type: The document section type (e.g., 'invoice', 'receipt')
356+
schema: PyArrow schema for the table
357+
new_section_created: Whether this is a new section type
358+
359+
Returns:
360+
True if table was created or updated, False otherwise
361+
"""
362+
if not self.glue_client or not self.database_name:
363+
logger.debug(
364+
"Glue client or database name not configured, skipping table creation"
365+
)
366+
return False
367+
368+
# Escape section_type to make it table-name-safe
369+
escaped_section_type = re.sub(r"[/\\:*?\"<>|]", "_", section_type.lower())
370+
table_name = f"document_sections_{escaped_section_type}"
371+
372+
# Convert schema to Glue columns
373+
columns = self._convert_schema_to_glue_columns(schema)
374+
375+
# Table input for create/update
376+
table_input = {
377+
"Name": table_name,
378+
"Description": f"Document sections table for type: {section_type}",
379+
"StorageDescriptor": {
380+
"Columns": columns,
381+
"Location": f"s3://{self.reporting_bucket}/document_sections/{escaped_section_type}/",
382+
"InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
383+
"OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
384+
"Compressed": True,
385+
"SerdeInfo": {
386+
"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
387+
},
388+
},
389+
"PartitionKeys": [{"Name": "date", "Type": "string"}],
390+
"TableType": "EXTERNAL_TABLE",
391+
"Parameters": {
392+
"classification": "parquet",
393+
"typeOfData": "file",
394+
"projection.enabled": "true",
395+
"projection.date.type": "date",
396+
"projection.date.format": "yyyy-MM-dd",
397+
"projection.date.range": "2024-01-01,2030-12-31",
398+
"projection.date.interval": "1",
399+
"projection.date.interval.unit": "DAYS",
400+
"storage.location.template": f"s3://{self.reporting_bucket}/document_sections/{escaped_section_type}/date=${{date}}/",
401+
},
402+
}
403+
404+
try:
405+
# Try to get the existing table
406+
existing_table = self.glue_client.get_table(
407+
DatabaseName=self.database_name, Name=table_name
408+
)
409+
410+
# Check if schema has changed significantly
411+
existing_columns = (
412+
existing_table.get("Table", {})
413+
.get("StorageDescriptor", {})
414+
.get("Columns", [])
415+
)
416+
existing_column_names = {col["Name"] for col in existing_columns}
417+
new_column_names = {col["Name"] for col in columns}
418+
419+
# If there are new columns, update the table
420+
if new_column_names - existing_column_names:
421+
logger.info(f"Updating Glue table {table_name} with new columns")
422+
self.glue_client.update_table(
423+
DatabaseName=self.database_name, TableInput=table_input
424+
)
425+
return True
426+
else:
427+
logger.debug(
428+
f"Glue table {table_name} already exists with current schema"
429+
)
430+
return False
431+
432+
except Exception as get_table_error:
433+
# Check if it's an EntityNotFoundException or similar (table doesn't exist)
434+
error_str = str(get_table_error)
435+
if (
436+
"EntityNotFoundException" in error_str
437+
or "not found" in error_str.lower()
438+
):
439+
# Table doesn't exist, create it
440+
logger.info(
441+
f"Creating new Glue table {table_name} for section type: {section_type}"
442+
)
443+
try:
444+
self.glue_client.create_table(
445+
DatabaseName=self.database_name, TableInput=table_input
446+
)
447+
logger.info(f"Successfully created Glue table {table_name}")
448+
return True
449+
except Exception as create_error:
450+
# Check if it's an AlreadyExistsException
451+
if "AlreadyExistsException" in str(create_error):
452+
logger.debug(
453+
f"Glue table {table_name} already exists (race condition)"
454+
)
455+
return False
456+
logger.error(
457+
f"Error creating Glue table {table_name}: {str(create_error)}"
458+
)
459+
return False
460+
else:
461+
# Some other error occurred
462+
logger.error(
463+
f"Error checking Glue table {table_name}: {str(get_table_error)}"
464+
)
465+
return False
466+
except Exception as e:
467+
logger.error(f"Error checking/updating Glue table {table_name}: {str(e)}")
468+
return False
469+
308470
def save(self, document: Document, data_to_save: List[str]) -> List[Dict[str, Any]]:
309471
"""
310472
Save document data based on the data_to_save list.
@@ -747,6 +909,7 @@ def save_document_sections(self, document: Document) -> Optional[Dict[str, Any]]
747909
sections_processed = 0
748910
sections_with_errors = 0
749911
total_records_saved = 0
912+
section_types_processed = set() # Track unique section types
750913

751914
logger.info(
752915
f"Processing {len(document.sections)} sections for document {document_id}"
@@ -845,8 +1008,10 @@ def save_document_sections(self, document: Document) -> Optional[Dict[str, Any]]
8451008
section_type = (
8461009
section.classification if section.classification else "unknown"
8471010
)
848-
# Escape section_type to make it filesystem-safe
849-
escaped_section_type = re.sub(r"[/\\:*?\"<>|]", "_", section_type)
1011+
# Escape section_type to make it filesystem-safe and lowercase for consistency
1012+
escaped_section_type = re.sub(
1013+
r"[/\\:*?\"<>|]", "_", section_type.lower()
1014+
)
8501015

8511016
s3_key = (
8521017
f"document_sections/"
@@ -866,6 +1031,18 @@ def save_document_sections(self, document: Document) -> Optional[Dict[str, Any]]
8661031
f"to s3://{self.reporting_bucket}/{s3_key}"
8671032
)
8681033

1034+
# Track this section type and create/update Glue table if needed
1035+
if section_type not in section_types_processed:
1036+
section_types_processed.add(section_type)
1037+
# Try to create or update the Glue table for this section type
1038+
table_created = self._create_or_update_glue_table(
1039+
section_type, schema
1040+
)
1041+
if table_created:
1042+
logger.info(
1043+
f"Created/updated Glue table for section type: {section_type}"
1044+
)
1045+
8691046
except Exception as e:
8701047
logger.error(f"Error processing section {section.section_id}: {str(e)}")
8711048
sections_with_errors += 1

0 commit comments

Comments
 (0)