diff --git a/.changes/unreleased/Features-20250308-193830.yaml b/.changes/unreleased/Features-20250308-193830.yaml new file mode 100644 index 00000000000..d5e35a2a064 --- /dev/null +++ b/.changes/unreleased/Features-20250308-193830.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Allow configuring the format of EventTimeFilter +time: 2025-03-08T19:38:30.907199+09:00 +custom: + Author: logicoffee + Issue: "11331" diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index f253f091b89..2032761dba7 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -126,6 +126,7 @@ class NodeConfig(NodeAndTestConfig): metadata=MergeBehavior.Update.meta(), ) event_time: Any = None + event_time_filter_format: Any = None concurrent_batches: Any = None def __post_init__(self): diff --git a/core/dbt/artifacts/resources/v1/source_definition.py b/core/dbt/artifacts/resources/v1/source_definition.py index f827d114942..8e057e59cc6 100644 --- a/core/dbt/artifacts/resources/v1/source_definition.py +++ b/core/dbt/artifacts/resources/v1/source_definition.py @@ -20,6 +20,7 @@ class SourceConfig(BaseConfig): enabled: bool = True event_time: Any = None + event_time_filter_format: Any = None @dataclass diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 6b2c26a4f25..2eb4db546ad 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -258,7 +258,6 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF and target.config.event_time and isinstance(self.model, (ModelNode, SnapshotNode)) ): - # Handling of microbatch models if ( isinstance(self.model, ModelNode) @@ -281,6 +280,7 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF ) event_time_filter = EventTimeFilter( field_name=target.config.event_time, + filter_format=target.config.event_time_filter_format, start=start, end=end, ) @@ -289,6 +289,7 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF else: event_time_filter = EventTimeFilter( field_name=target.config.event_time, + filter_format=target.config.event_time_filter_format, start=self.model.batch.event_time_start, end=self.model.batch.event_time_end, ) @@ -297,6 +298,7 @@ def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeF elif sample_mode: event_time_filter = EventTimeFilter( field_name=target.config.event_time, + filter_format=target.config.event_time_filter_format, start=self.config.args.sample.start, end=self.config.args.sample.end, )