From 88df956165d175e9bfb8982c542574df2338bc0b Mon Sep 17 00:00:00 2001
From: cgpoh
Date: Mon, 21 Apr 2025 16:58:38 +0800
Subject: [PATCH 01/10] rfc: spark structured streaming sink and source
 platform
---
 ...00-spark-streaming-sink-source-platform.md | 71 +++++++++++++++++++
 1 file changed, 71 insertions(+)
 create mode 100644 active/000-spark-streaming-sink-source-platform.md

diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md
new file mode 100644
index 0000000..24cdf30
--- /dev/null
+++ b/active/000-spark-streaming-sink-source-platform.md
@@ -0,0 +1,71 @@
+- Start Date: 2025-04-21
+- RFC PR: (after opening the RFC PR, update this with a link to it and update the file name)
+- Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
+- Implementation PR(s): (leave this empty)
+
+# Spark Streaming Sink/Source Platform
+
+## Summary
+
+Allows configuration of Spark structured streaming sink and source platform.
+
+## Motivation
+
+The motivation for this RFC stems from an issue encountered while capturing data lineage using DataHub with Spark Structured Streaming. In the DataHub [code](https://github.com/datahub-project/datahub/blob/master/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java#L145), a regular expression matcher expects sources to be prefixed with identifiers like Kafka[…] to determine the data platform. However, since Iceberg tables lack such a prefix (e.g., iceberg[…]), DataHub fails to recognize the platform and thus shows no lineage.
+
+## Requirements
+
+- The proposal should be able to identify the data platform of a source or sink based on the configuration provided in the Spark job.
+
+## Detailed design
+
+It is proposed to introduce two new Spark configurations: `spark.datahub.streaming.source.platform` for specifying the streaming source platform, and `spark.datahub.streaming.sink.platform` for the streaming sink. Within the `generateUrnFromStreamingDescription` method in `SparkStreamingEventToDatahub.java`, these configurations will serve as fallbacks in cases where the regular expression matcher fails to extract the platform. If the configurations are set, their values will be used to determine the data platform.
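+
+For illustration, a job whose source and sink are both Iceberg tables might be submitted with the following settings (values here are hypothetical):
+
+```properties
+# Hypothetical values: fallback platforms used when a description has no Kafka[...]-style prefix
+spark.datahub.streaming.source.platform : iceberg
+spark.datahub.streaming.sink.platform : iceberg
+```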
An example implementation is shown below:
```java
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf, boolean isSink) {
  String pattern = "(.*?)\\[(.*)]";
  Pattern r = Pattern.compile(pattern);
  Matcher m = r.matcher(description);
  if (m.find()) {
    String namespace = m.group(1);
    String platform = getDatahubPlatform(namespace);
    String path = m.group(2);
    log.debug("Streaming description Platform: {}, Path: {}", platform, path);
    if (platform.equals(KAFKA_PLATFORM)) {
      path = getKafkaTopicFromPath(m.group(2));
    } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) {
      try {
        DatasetUrn urn =
            HdfsPathDataset.create(new URI(path), sparkLineageConf.getOpenLineageConf()).urn();
        return Optional.of(urn);
      } catch (InstantiationException e) {
        return Optional.empty();
      } catch (URISyntaxException e) {
        log.error("Failed to parse path {}", path, e);
        return Optional.empty();
      }
    }
    return Optional.of(
        new DatasetUrn(
            new DataPlatformUrn(platform),
            path,
            sparkLineageConf.getOpenLineageConf().getFabricType()));
  } else {
    if (sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform() != null && isSink) {
      return generateUrnFromStreamingDescription(
          description,
          sparkLineageConf,
          sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform()
      );
    } else if (sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform() != null && !isSink) {
      return generateUrnFromStreamingDescription(
          description,
          sparkLineageConf,
          sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform()
      );
    } else {
      return Optional.empty();
    }
  }
}
```
\ No newline at end of file

From eec512513d80ac0d550ba9a40866ae4223136568 Mon Sep 17 00:00:00 2001
From: cgpoh
Date: Mon, 21 Apr 2025 17:01:36 +0800
Subject: [PATCH 02/10] chore: update RFC PR link
---
 active/000-spark-streaming-sink-source-platform.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md
index 24cdf30..04a920f 100644
--- a/active/000-spark-streaming-sink-source-platform.md
+++ b/active/000-spark-streaming-sink-source-platform.md
@@ -1,5 +1,5 @@
 - Start Date: 2025-04-21
-- RFC PR: (after opening the RFC PR, update this with a link to it and update the file name)
+- RFC PR: https://github.com/datahub-project/rfcs/pull/11
 - Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
 - Implementation PR(s): (leave this empty)

From 442394251bff725493e4a853bf9cad55b9307e45 Mon Sep 17 00:00:00 2001
From: cgpoh
Date: Mon, 21 Apr 2025 17:53:58 +0800
Subject: [PATCH 03/10] chore: update example implementation
---
 active/000-spark-streaming-sink-source-platform.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md
index 04a920f..ba929c1 100644
--- a/active/000-spark-streaming-sink-source-platform.md
+++ b/active/000-spark-streaming-sink-source-platform.md
@@ -21,6 +21,12 @@ It is proposed to introduce two new Spark configurations: `spark.datahub.streami
 It is proposed to introduce two new Spark configurations: `spark.datahub.streaming.source.platform` for specifying the streaming source platform, and `spark.datahub.streaming.sink.platform` for the streaming sink.
Within the `generateUrnFromStreamingDescription` method in `SparkStreamingEventToDatahub.java`, these configurations will serve as fallbacks in cases where the regular expression matcher fails to extract the platform. If the configurations are set, their values will be used to determine the data platform. An example implementation is shown below: ```java + public static Optional generateUrnFromStreamingDescription( + String description, SparkLineageConf sparkLineageConf) { + return SparkStreamingEventToDatahub.generateUrnFromStreamingDescription( + description, sparkLineageConf, false); +} + public static Optional generateUrnFromStreamingDescription( String description, SparkLineageConf sparkLineageConf, boolean isSink) { String pattern = "(.*?)\\[(.*)]"; From d1e49a49003caaa30b89625812d85d7725fd5f5c Mon Sep 17 00:00:00 2001 From: cgpoh Date: Mon, 21 Apr 2025 18:02:29 +0800 Subject: [PATCH 04/10] chore: update example --- ...000-spark-streaming-sink-source-platform.md | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md index ba929c1..cd5d42a 100644 --- a/active/000-spark-streaming-sink-source-platform.md +++ b/active/000-spark-streaming-sink-source-platform.md @@ -1,5 +1,5 @@ - Start Date: 2025-04-21 -- RFC PR: https://github.com/datahub-project/rfcs/pull/11 +- RFC PR: [#11](https://github.com/datahub-project/rfcs/pull/11) - Discussion Issue: (GitHub issue this was discussed in before the RFC, if any) - Implementation PR(s): (leave this empty) @@ -26,7 +26,8 @@ It is proposed to introduce two new Spark configurations: `spark.datahub.streami return SparkStreamingEventToDatahub.generateUrnFromStreamingDescription( description, sparkLineageConf, false); } - +``` +```java public static Optional generateUrnFromStreamingDescription( String description, SparkLineageConf sparkLineageConf, boolean isSink) { String pattern = "(.*?)\\[(.*)]"; @@ -74,4 +75,17 @@ public static Optional generateUrnFromStreamingDescription( } } } +``` +```java +public static Optional generateUrnFromStreamingDescription( + String description, SparkLineageConf sparkLineageConf, String streamingPlatform) { + String platform = getDatahubPlatform(streamingPlatform); + log.debug("Streaming description Platform: {}, Path: {}, FabricType: {}", + platform, description, sparkLineageConf.getOpenLineageConf().getFabricType()); + return Optional.of( + new DatasetUrn( + new DataPlatformUrn(platform), + description, + sparkLineageConf.getOpenLineageConf().getFabricType())); +} ``` \ No newline at end of file From 1452d62811a08b6f04fd93319805cdbb2a93f68d Mon Sep 17 00:00:00 2001 From: cgpoh Date: Thu, 24 Apr 2025 16:48:02 +0800 Subject: [PATCH 05/10] chore: change design to use streaming spec --- ...00-spark-streaming-sink-source-platform.md | 132 +++++++++++------- 1 file changed, 80 insertions(+), 52 deletions(-) diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md index cd5d42a..04fa2df 100644 --- a/active/000-spark-streaming-sink-source-platform.md +++ b/active/000-spark-streaming-sink-source-platform.md @@ -19,7 +19,7 @@ The motivation for this RFC stems from an issue encountered while capturing data ## Detailed design -It is proposed to introduce two new Spark configurations: `spark.datahub.streaming.source.platform` for specifying the streaming source platform, and `spark.datahub.streaming.sink.platform` for the streaming sink. 
Within the `generateUrnFromStreamingDescription` method in `SparkStreamingEventToDatahub.java`, these configurations will serve as fallbacks in cases where the regular expression matcher fails to extract the platform. If the configurations are set, their values will be used to determine the data platform. An example implementation is shown below: +It is proposed to introduce two new Spark configurations: `spark.datahub.streaming_platform` for specifying the streaming platform, and streaming spec which is defined in the *Configuring Iceberg based dataset URNs* section below. Within the `generateUrnFromStreamingDescription` method in `SparkStreamingEventToDatahub.java`, these configurations will serve as fallbacks in cases where the regular expression matcher fails to extract the platform. If the configurations are set, their values will be used to determine the data platform. An example implementation is shown below: ```java public static Optional generateUrnFromStreamingDescription( String description, SparkLineageConf sparkLineageConf) { @@ -28,64 +28,92 @@ It is proposed to introduce two new Spark configurations: `spark.datahub.streami } ``` ```java -public static Optional generateUrnFromStreamingDescription( - String description, SparkLineageConf sparkLineageConf, boolean isSink) { + public static Optional generateUrnFromStreamingDescription(String description, + SparkLineageConf sparkLineageConf, boolean isSink) { String pattern = "(.*?)\\[(.*)]"; Pattern r = Pattern.compile(pattern); Matcher m = r.matcher(description); if (m.find()) { - String namespace = m.group(1); - String platform = getDatahubPlatform(namespace); - String path = m.group(2); - log.debug("Streaming description Platform: {}, Path: {}", platform, path); - if (platform.equals(KAFKA_PLATFORM)) { - path = getKafkaTopicFromPath(m.group(2)); - } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) { - try { - DatasetUrn urn = - HdfsPathDataset.create(new URI(path), sparkLineageConf.getOpenLineageConf()).urn(); - return Optional.of(urn); - } catch (InstantiationException e) { - return Optional.empty(); - } catch (URISyntaxException e) { - log.error("Failed to parse path {}", path, e); - return Optional.empty(); + String namespace = m.group(1); + String platform = getDatahubPlatform(namespace); + String path = m.group(2); + log.debug("Streaming description Platform: {}, Path: {}", platform, path); + if (platform.equals(KAFKA_PLATFORM)) { + path = getKafkaTopicFromPath(m.group(2)); + } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) { + try { + DatasetUrn urn = HdfsPathDataset.create(new URI(path), sparkLineageConf.getOpenLineageConf()).urn(); + return Optional.of(urn); + } catch (InstantiationException e) { + return Optional.empty(); + } catch (URISyntaxException e) { + log.error("Failed to parse path {}", path, e); + return Optional.empty(); + } } - } - return Optional.of( - new DatasetUrn( - new DataPlatformUrn(platform), - path, - sparkLineageConf.getOpenLineageConf().getFabricType())); + return Optional.of( + new DatasetUrn(new DataPlatformUrn(platform), path, sparkLineageConf.getOpenLineageConf().getFabricType())); } else { - if (sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform() != null && isSink) { - return generateUrnFromStreamingDescription( - description, - sparkLineageConf, - sparkLineageConf.getOpenLineageConf().getStreamingSinkPlatform() - ); - } else if (sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform() != null && 
!isSink) { - return generateUrnFromStreamingDescription( - description, - sparkLineageConf, - sparkLineageConf.getOpenLineageConf().getStreamingSourcePlatform() - ); - } else { - return Optional.empty(); - } + if (sparkLineageConf.getOpenLineageConf().getStreamingPlatform() != null) { + try { + CatalogTableDataset catalogTableDataset = + CatalogTableDataset.create(sparkLineageConf.getOpenLineageConf().getStreamingPlatform(), description, + sparkLineageConf.getOpenLineageConf(), isSink ? "sink" : "source"); + if (catalogTableDataset == null) { + return Optional.empty(); + } else { + DatasetUrn urn = catalogTableDataset.urn(); + return Optional.of(urn); + } + } catch (InstantiationException e) { + return Optional.empty(); + } + } else { + return Optional.empty(); + } } } ``` -```java -public static Optional generateUrnFromStreamingDescription( - String description, SparkLineageConf sparkLineageConf, String streamingPlatform) { - String platform = getDatahubPlatform(streamingPlatform); - log.debug("Streaming description Platform: {}, Path: {}, FabricType: {}", - platform, description, sparkLineageConf.getOpenLineageConf().getFabricType()); - return Optional.of( - new DatasetUrn( - new DataPlatformUrn(platform), - description, - sparkLineageConf.getOpenLineageConf().getFabricType())); -} +### Configuring Iceberg based dataset URNs + +This section follows the approach described in [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) + +Spark emits lineage between datasets. It has its own logic for generating urns. Python sources emit metadata of +datasets. To link these 2 things, urns generated by both have to match. +This section will help you to match urns to that of other ingestion sources. +By default, URNs are created using +template `urn:li:dataset:(urn:li:dataPlatform:<$platform>,<$platformInstance>.<$name>,<$env>)`. We can configure these 4 +things to generate the desired urn. + +**Platform**: +The platform is explicitly supported through the new Spark configuration key: + +- `spark.datahub.streaming_platform` + +Platforms that do not set this configuration will default to `null`. + +**Name**: +By default, the name is the complete path. + +**platform instance and env:** + +The default value for env is 'PROD' and the platform instance is None. env and platform instances can be set for all +datasets using configurations `spark.datahub.streaming.platform.<$platform>..platformInstance` and `spark.datahub.streaming.platform.<$platform>..env`. +If spark is processing data that belongs to a different env or platform instance, then 'streaming_alias' can be used to +specify `streaming_spec` specific values of these. 'streaming_alias' groups the env and platform instance +together. + +streaming_alias_list Example: + +The below example explains the configuration of the case, where data from 2 Iceberg tables are being processed in a single +spark application and data from my_table_1 are supposed to have "instance1" as platform instance and "PROD" as env, and +data from my_table_2 should have env "DEV" in their dataset URNs. 
+ +``` +spark.datahub.streaming.platform.iceberg.stream1.env : PROD +spark.datahub.streaming.platform.iceberg.stream1.streaming_io_platform_type : source +spark.datahub.streaming.platform.iceberg.stream1.platform_instance : instance1 +spark.datahub.streaming.platform.iceberg.stream2.env : DEV +spark.datahub.streaming.platform.iceberg.stream1.streaming_io_platform_type : sink +spark.datahub.streaming.platform.iceberg.stream2.platform_instance : instance2 ``` \ No newline at end of file From 8e7b9401c703e74694ead52e71d9ea4f907b4e4a Mon Sep 17 00:00:00 2001 From: cgpoh Date: Thu, 24 Apr 2025 16:52:26 +0800 Subject: [PATCH 06/10] chore: update document --- active/000-spark-streaming-sink-source-platform.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md index 04fa2df..7770838 100644 --- a/active/000-spark-streaming-sink-source-platform.md +++ b/active/000-spark-streaming-sink-source-platform.md @@ -114,6 +114,6 @@ spark.datahub.streaming.platform.iceberg.stream1.env : PROD spark.datahub.streaming.platform.iceberg.stream1.streaming_io_platform_type : source spark.datahub.streaming.platform.iceberg.stream1.platform_instance : instance1 spark.datahub.streaming.platform.iceberg.stream2.env : DEV -spark.datahub.streaming.platform.iceberg.stream1.streaming_io_platform_type : sink +spark.datahub.streaming.platform.iceberg.stream2.streaming_io_platform_type : sink spark.datahub.streaming.platform.iceberg.stream2.platform_instance : instance2 ``` \ No newline at end of file From 5a9a9377dd71fe717f1ce494b8344e2f570790fa Mon Sep 17 00:00:00 2001 From: cgpoh Date: Thu, 24 Apr 2025 17:01:05 +0800 Subject: [PATCH 07/10] chore: update document --- active/000-spark-streaming-sink-source-platform.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md index 7770838..2f3c001 100644 --- a/active/000-spark-streaming-sink-source-platform.md +++ b/active/000-spark-streaming-sink-source-platform.md @@ -112,8 +112,8 @@ data from my_table_2 should have env "DEV" in their dataset URNs. ``` spark.datahub.streaming.platform.iceberg.stream1.env : PROD spark.datahub.streaming.platform.iceberg.stream1.streaming_io_platform_type : source -spark.datahub.streaming.platform.iceberg.stream1.platform_instance : instance1 +spark.datahub.streaming.platform.iceberg.stream1.platformInstance : instance1 spark.datahub.streaming.platform.iceberg.stream2.env : DEV spark.datahub.streaming.platform.iceberg.stream2.streaming_io_platform_type : sink -spark.datahub.streaming.platform.iceberg.stream2.platform_instance : instance2 +spark.datahub.streaming.platform.iceberg.stream2.platformInstance : instance2 ``` \ No newline at end of file From 0c909d3598092d487a872b262f78859461d51ee3 Mon Sep 17 00:00:00 2001 From: cgpoh Date: Fri, 25 Apr 2025 10:34:38 +0800 Subject: [PATCH 08/10] chore: update document --- active/000-spark-streaming-sink-source-platform.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md index 2f3c001..0380aea 100644 --- a/active/000-spark-streaming-sink-source-platform.md +++ b/active/000-spark-streaming-sink-source-platform.md @@ -7,7 +7,7 @@ ## Summary -Allows configuration of Spark structured streaming sink and source platform. 
+Allows configuration of the data platform for Spark Structured Streaming sources and sinks. ## Motivation @@ -19,7 +19,7 @@ The motivation for this RFC stems from an issue encountered while capturing data ## Detailed design -It is proposed to introduce two new Spark configurations: `spark.datahub.streaming_platform` for specifying the streaming platform, and streaming spec which is defined in the *Configuring Iceberg based dataset URNs* section below. Within the `generateUrnFromStreamingDescription` method in `SparkStreamingEventToDatahub.java`, these configurations will serve as fallbacks in cases where the regular expression matcher fails to extract the platform. If the configurations are set, their values will be used to determine the data platform. An example implementation is shown below: +It is proposed to introduce two new Spark configurations: `spark.datahub.streaming_platform` for specifying the streaming platform, and alias‐based streaming configuration (see *Configuring Iceberg-based dataset URNs* below). Within the `generateUrnFromStreamingDescription` method in `SparkStreamingEventToDatahub.java`, these configurations will serve as fallbacks in cases where the regular expression matcher fails to extract the platform. If the configurations are set, their values will be used to determine the data platform. An example implementation is shown below: ```java public static Optional generateUrnFromStreamingDescription( String description, SparkLineageConf sparkLineageConf) { @@ -74,7 +74,7 @@ It is proposed to introduce two new Spark configurations: `spark.datahub.streami } } ``` -### Configuring Iceberg based dataset URNs +### Configuring Iceberg-based dataset URNs This section follows the approach described in [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) @@ -109,7 +109,7 @@ The below example explains the configuration of the case, where data from 2 Iceb spark application and data from my_table_1 are supposed to have "instance1" as platform instance and "PROD" as env, and data from my_table_2 should have env "DEV" in their dataset URNs. -``` +```properties spark.datahub.streaming.platform.iceberg.stream1.env : PROD spark.datahub.streaming.platform.iceberg.stream1.streaming_io_platform_type : source spark.datahub.streaming.platform.iceberg.stream1.platformInstance : instance1 From f65a1330b4ab7e26205d28a5a40ce0f937138962 Mon Sep 17 00:00:00 2001 From: cgpoh Date: Tue, 29 Apr 2025 14:35:09 +0800 Subject: [PATCH 09/10] chore: update document --- ...00-spark-streaming-sink-source-platform.md | 83 ++++++++++--------- 1 file changed, 45 insertions(+), 38 deletions(-) diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md index 0380aea..5312336 100644 --- a/active/000-spark-streaming-sink-source-platform.md +++ b/active/000-spark-streaming-sink-source-platform.md @@ -7,19 +7,26 @@ ## Summary -Allows configuration of the data platform for Spark Structured Streaming sources and sinks. +Introduce configurable support for specifying the data platform of Spark Structured Streaming sources and sinks. ## Motivation -The motivation for this RFC stems from an issue encountered while capturing data lineage using DataHub with Spark Structured Streaming. 
In the DataHub [code](https://github.com/datahub-project/datahub/blob/master/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java#L145), a regular expression matcher expects sources to be prefixed with identifiers like Kafka[…] to determine the data platform. However, since Iceberg tables lack such a prefix (e.g., iceberg[…]), DataHub fails to recognize the platform and thus shows no lineage. +This RFC addresses an issue encountered when capturing data lineage in DataHub with Spark Structured Streaming. In the DataHub [codebase](https://github.com/datahub-project/datahub/blob/master/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java#L145), a regular expression matcher expects source descriptions to contain identifiable prefixes, such as Kafka[…], in order to extract the data platform. However, platforms like Iceberg do not use such prefixes (e.g., iceberg[…]), leading to DataHub's failure to detect the platform, resulting in missing lineage. ## Requirements -- The proposal should be able to identify the data platform of a source or sink based on the configuration provided in the Spark job. +- Support configuration of the data platform for a streaming source or sink within a Spark job. +- Use these configurations as fallbacks when regex-based extraction fails. ## Detailed design -It is proposed to introduce two new Spark configurations: `spark.datahub.streaming_platform` for specifying the streaming platform, and alias‐based streaming configuration (see *Configuring Iceberg-based dataset URNs* below). Within the `generateUrnFromStreamingDescription` method in `SparkStreamingEventToDatahub.java`, these configurations will serve as fallbacks in cases where the regular expression matcher fails to extract the platform. If the configurations are set, their values will be used to determine the data platform. An example implementation is shown below: +Propose adding the following Spark configuration: + +- `spark.datahub.streaming.platform.instance` – explicitly specifies the data platform when automatic detection fails. + +This configuration will be checked in the `generateUrnFromStreamingDescription` method within `SparkStreamingEventToDatahub.java`. If the regex pattern fails to identify the platform, and this configuration is set, its value will be used to construct the dataset URN. 
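
For instance, a job whose otherwise-unrecognized sources and sinks are Iceberg tables might set (value hypothetical):

```properties
# Hypothetical value: consulted only when the regex matcher cannot extract a platform
spark.datahub.streaming.platform.instance : iceberg
```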
+ +Example implementation: ```java public static Optional generateUrnFromStreamingDescription( String description, SparkLineageConf sparkLineageConf) { @@ -28,8 +35,8 @@ It is proposed to introduce two new Spark configurations: `spark.datahub.streami } ``` ```java - public static Optional generateUrnFromStreamingDescription(String description, - SparkLineageConf sparkLineageConf, boolean isSink) { +public static Optional generateUrnFromStreamingDescription(String description, + SparkLineageConf sparkLineageConf, boolean isSink) { String pattern = "(.*?)\\[(.*)]"; Pattern r = Pattern.compile(pattern); Matcher m = r.matcher(description); @@ -54,11 +61,11 @@ It is proposed to introduce two new Spark configurations: `spark.datahub.streami return Optional.of( new DatasetUrn(new DataPlatformUrn(platform), path, sparkLineageConf.getOpenLineageConf().getFabricType())); } else { - if (sparkLineageConf.getOpenLineageConf().getStreamingPlatform() != null) { + if (sparkLineageConf.getOpenLineageConf().getStreamingPlatformInstance() != null) { try { CatalogTableDataset catalogTableDataset = - CatalogTableDataset.create(sparkLineageConf.getOpenLineageConf().getStreamingPlatform(), description, - sparkLineageConf.getOpenLineageConf(), isSink ? "sink" : "source"); + CatalogTableDataset.create(sparkLineageConf.getOpenLineageConf(), description, + isSink ? "sink" : "source"); if (catalogTableDataset == null) { return Optional.empty(); } else { @@ -76,44 +83,44 @@ It is proposed to introduce two new Spark configurations: `spark.datahub.streami ``` ### Configuring Iceberg-based dataset URNs -This section follows the approach described in [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) +This section is modeled after [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) -Spark emits lineage between datasets. It has its own logic for generating urns. Python sources emit metadata of -datasets. To link these 2 things, urns generated by both have to match. -This section will help you to match urns to that of other ingestion sources. -By default, URNs are created using -template `urn:li:dataset:(urn:li:dataPlatform:<$platform>,<$platformInstance>.<$name>,<$env>)`. We can configure these 4 -things to generate the desired urn. +Spark emits dataset lineage with its own logic for URN generation. Python ingestion sources emit metadata separately. For lineage to align correctly between these systems, the URNs generated by Spark and other ingestion tools must match. -**Platform**: -The platform is explicitly supported through the new Spark configuration key: +By default, dataset URNs follow this format: +`urn:li:dataset:(urn:li:dataPlatform:<$platform>,<$platformInstance>.<$name>,<$env>)` +Each of these fields can be configured to generate matching URNs across ingestion sources. -- `spark.datahub.streaming_platform` +**Platform**: +The platform can now be explicitly set using: +- `spark.datahub.streaming.platform.instance` -Platforms that do not set this configuration will default to `null`. +If not set, the platform will default to `null`. **Name**: -By default, the name is the complete path. - -**platform instance and env:** +Defaults to the full table path in the streaming description. -The default value for env is 'PROD' and the platform instance is None. 
env and platform instances can be set for all -datasets using configurations `spark.datahub.streaming.platform.<$platform>..platformInstance` and `spark.datahub.streaming.platform.<$platform>..env`. -If spark is processing data that belongs to a different env or platform instance, then 'streaming_alias' can be used to -specify `streaming_spec` specific values of these. 'streaming_alias' groups the env and platform instance -together. +**Platform Instance and Env:** +Defaults: +- `env`: `PROD` +- `platformInstance`: `null` -streaming_alias_list Example: - -The below example explains the configuration of the case, where data from 2 Iceberg tables are being processed in a single -spark application and data from my_table_1 are supposed to have "instance1" as platform instance and "PROD" as env, and -data from my_table_2 should have env "DEV" in their dataset URNs. +These can be overridden for specific platforms and aliases using: +```properties +spark.datahub.streaming.platform...platformInstance +spark.datahub.streaming.platform...env +``` +The alias (`streaming_alias`) groups values for datasets processed in the same Spark job but with different metadata contexts. +**Example:** ```properties spark.datahub.streaming.platform.iceberg.stream1.env : PROD -spark.datahub.streaming.platform.iceberg.stream1.streaming_io_platform_type : source -spark.datahub.streaming.platform.iceberg.stream1.platformInstance : instance1 +spark.datahub.streaming.platform.iceberg.stream1.streaming.io.platform.type : source +spark.datahub.streaming.platform.iceberg.stream1.platformInstance : stream1 + spark.datahub.streaming.platform.iceberg.stream2.env : DEV -spark.datahub.streaming.platform.iceberg.stream2.streaming_io_platform_type : sink -spark.datahub.streaming.platform.iceberg.stream2.platformInstance : instance2 -``` \ No newline at end of file +spark.datahub.streaming.platform.iceberg.stream2.streaming.io.platform.type : sink +spark.datahub.streaming.platform.iceberg.stream2.platformInstance : catalog +spark.datahub.streaming.platform.iceberg.stream2.usePlatformInstance : true +``` +In this example, `stream2.namespace.table` will be rewritten as `catalog.namespace.table` when `usePlatformInstance = true`, allowing lineage to reflect the correct platform instance. The default behavior is `false`, meaning the platform instance is not injected into the table name. \ No newline at end of file From a274a549c13dea13a5e0798d3fe07b411c4bfde1 Mon Sep 17 00:00:00 2001 From: cgpoh Date: Tue, 29 Apr 2025 14:57:43 +0800 Subject: [PATCH 10/10] chore: update document --- ...00-spark-streaming-sink-source-platform.md | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/active/000-spark-streaming-sink-source-platform.md b/active/000-spark-streaming-sink-source-platform.md index 5312336..6156c23 100644 --- a/active/000-spark-streaming-sink-source-platform.md +++ b/active/000-spark-streaming-sink-source-platform.md @@ -83,34 +83,42 @@ public static Optional generateUrnFromStreamingDescription(String de ``` ### Configuring Iceberg-based dataset URNs -This section is modeled after [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) +This section follows the approach described in [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns) -Spark emits dataset lineage with its own logic for URN generation. 
Python ingestion sources emit metadata separately. For lineage to align correctly between these systems, the URNs generated by Spark and other ingestion tools must match.
+Spark emits lineage between datasets. It has its own logic for generating URNs. Python sources emit metadata of datasets. To link these two, the URNs generated by both must match. This section helps ensure compatibility by aligning URN generation logic.

By default, URNs are created using the template:
`urn:li:dataset:(urn:li:dataPlatform:<$platform>,<$platformInstance>.<$name>,<$env>)`
Each of these fields can be configured to generate the desired URN.

**Platform**:
The platform is explicitly supported through the new Spark configuration key:
- `spark.datahub.streaming.platform.instance`

Platforms that do not set this configuration will default to `null`.

**Name**:
Defaults to the full table path in the streaming description.

**Platform Instance and Env:**
The default value for `env` is `'PROD'`, and `platformInstance` is `null`. These values can be overridden using:
```properties
spark.datahub.streaming.platform.<$platform>.<streaming_alias>.platformInstance
spark.datahub.streaming.platform.<$platform>.<streaming_alias>.env
```
If Spark is processing data from different environments or platform instances, the `streaming_alias` allows for alias-based overrides of platform-specific settings.

**Configuration Keys**
Below is a summary of configuration options for per-alias streaming platform customization:
- `spark.datahub.streaming.platform.<$platform>.<streaming_alias>.platformInstance`
Sets the platform instance name to be used in the URN (e.g., `stream1`, `catalog`).
- `spark.datahub.streaming.platform.<$platform>.<streaming_alias>.env`
Sets the environment name to be used in the URN (e.g., `PROD`, `DEV`).
- `spark.datahub.streaming.platform.<$platform>.<streaming_alias>.usePlatformInstance`
(Optional, default: `false`) +
If set to `true`, the platform instance will be injected into the table name. This is useful for ensuring that the lineage reflects the correct platform instance when using a shared catalog or namespace. For example, `stream2.namespace.table` will be rewritten as `catalog.namespace.table` in the URN if `platformInstance=catalog`.
- `spark.datahub.streaming.platform.<$platform>.<streaming_alias>.streaming.io.platform.type`
Indicates whether the alias represents a streaming `"source"` or `"sink"`.

**Example:**
```properties
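# Illustrative values; the aliases and platform instances below are hypothetical, matching the earlier example
spark.datahub.streaming.platform.iceberg.stream1.env : PROD
spark.datahub.streaming.platform.iceberg.stream1.streaming.io.platform.type : source
spark.datahub.streaming.platform.iceberg.stream1.platformInstance : stream1

spark.datahub.streaming.platform.iceberg.stream2.env : DEV
spark.datahub.streaming.platform.iceberg.stream2.streaming.io.platform.type : sink
spark.datahub.streaming.platform.iceberg.stream2.platformInstance : catalog
spark.datahub.streaming.platform.iceberg.stream2.usePlatformInstance : true
```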