# rfc: spark structured streaming sink and source platform #11
**Open**: cgpoh wants to merge 10 commits into `datahub-project:main` from `cgpoh:spark-streaming-source-sink`.
## Commits (10)

- `88df956` rfc: spark structured streaming sink and source platform (cgpoh)
- `eec5125` chore: update RFC PR link (cgpoh)
- `4423942` chore: update example implementation (cgpoh)
- `d1e49a4` chore: update example (cgpoh)
- `1452d62` chore: change design to use streaming spec (cgpoh)
- `8e7b940` chore: update document (cgpoh)
- `5a9a937` chore: update document (cgpoh)
- `0c909d3` chore: update document (cgpoh)
- `f65a133` chore: update document (cgpoh)
- `a274a54` chore: update document (cgpoh)
---

- Start Date: 2025-04-21
- RFC PR: [#11](https://github.com/datahub-project/rfcs/pull/11)
- Discussion Issue: (GitHub issue this was discussed in before the RFC, if any)
- Implementation PR(s): (leave this empty)

# Spark Streaming Sink/Source Platform

## Summary

Introduce configurable support for specifying the data platform of Spark Structured Streaming sources and sinks.

## Motivation

This RFC addresses an issue encountered when capturing data lineage in DataHub with Spark Structured Streaming. In the DataHub [codebase](https://github.com/datahub-project/datahub/blob/master/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/converter/SparkStreamingEventToDatahub.java#L145), a regular expression matcher expects source descriptions to carry an identifiable platform prefix, such as Kafka[…], from which the data platform is extracted. Platforms like Iceberg do not use such a prefix (there is no iceberg[…]), so DataHub fails to detect the platform and the lineage is missing.
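A minimal sketch of the failure mode (both description strings below are illustrative, not captured from a real job):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DescriptionRegexDemo {
  public static void main(String[] args) {
    // The "<platform>[<path>]" shape the DataHub matcher expects.
    Pattern p = Pattern.compile("(.*?)\\[(.*)]");

    // Kafka-style descriptions match, so the platform prefix can be extracted.
    Matcher kafka = p.matcher("Kafka[Subscribe[my-topic]]");
    System.out.println(kafka.find()); // true; group(1) == "Kafka"

    // An Iceberg table identifier has no such prefix, so matching fails
    // and no platform (hence no lineage) is emitted.
    Matcher iceberg = p.matcher("my_catalog.db.events");
    System.out.println(iceberg.find()); // false
  }
}
```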
## Requirements

- Support configuring the data platform of a streaming source or sink within a Spark job.
- Use these configurations as a fallback when regex-based extraction fails.

## Detailed design

We propose adding the following Spark configuration:

- `spark.datahub.streaming.platform.instance` – explicitly specifies the data platform when automatic detection fails.
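For example, a job that reads from or writes to Iceberg might set (the value shown is illustrative):

```properties
# Fallback data platform used when the streaming description has no
# recognizable "<platform>[...]" prefix (illustrative value).
spark.datahub.streaming.platform.instance=iceberg
```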
This configuration will be checked in the `generateUrnFromStreamingDescription` method of `SparkStreamingEventToDatahub.java`: if the regex pattern fails to identify the platform and this configuration is set, its value will be used to construct the dataset URN.

Example implementation:

```java
// Two-argument overload retained for compatibility; delegates to the
// three-argument variant with isSink = false.
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(
    String description, SparkLineageConf sparkLineageConf) {
  return SparkStreamingEventToDatahub.generateUrnFromStreamingDescription(
      description, sparkLineageConf, false);
}
```
```java
public static Optional<DatasetUrn> generateUrnFromStreamingDescription(String description,
    SparkLineageConf sparkLineageConf, boolean isSink) {
  // Expect descriptions of the shape "<platform>[<path>]".
  String pattern = "(.*?)\\[(.*)]";
  Pattern r = Pattern.compile(pattern);
  Matcher m = r.matcher(description);
  if (m.find()) {
    String namespace = m.group(1);
    String platform = getDatahubPlatform(namespace);
    String path = m.group(2);
    log.debug("Streaming description Platform: {}, Path: {}", platform, path);
    if (platform.equals(KAFKA_PLATFORM)) {
      path = getKafkaTopicFromPath(m.group(2));
    } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) {
      try {
        DatasetUrn urn = HdfsPathDataset.create(new URI(path), sparkLineageConf.getOpenLineageConf()).urn();
        return Optional.of(urn);
      } catch (InstantiationException e) {
        return Optional.empty();
      } catch (URISyntaxException e) {
        log.error("Failed to parse path {}", path, e);
        return Optional.empty();
      }
    }
    return Optional.of(
        new DatasetUrn(new DataPlatformUrn(platform), path, sparkLineageConf.getOpenLineageConf().getFabricType()));
  } else {
    // Fallback: the regex did not match (e.g., Iceberg); use the configured
    // streaming platform instance, if one is set.
    if (sparkLineageConf.getOpenLineageConf().getStreamingPlatformInstance() != null) {
      try {
        CatalogTableDataset catalogTableDataset =
            CatalogTableDataset.create(sparkLineageConf.getOpenLineageConf(), description,
                isSink ? "sink" : "source");
        if (catalogTableDataset == null) {
          return Optional.empty();
        } else {
          DatasetUrn urn = catalogTableDataset.urn();
          return Optional.of(urn);
        }
      } catch (InstantiationException e) {
        return Optional.empty();
      }
    } else {
      return Optional.empty();
    }
  }
}
```
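A hypothetical call site (the `sinkDescription` variable is assumed for illustration):

```java
// Resolve a sink URN from a streaming query's sink description;
// isSink = true lets the fallback path pick sink-specific alias config.
Optional<DatasetUrn> sinkUrn =
    SparkStreamingEventToDatahub.generateUrnFromStreamingDescription(
        sinkDescription, sparkLineageConf, true);
sinkUrn.ifPresent(urn -> log.info("Resolved sink URN: {}", urn));
```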
### Configuring Iceberg-based dataset URNs

This section follows the approach described in [Configuring Hdfs based dataset URNs](https://datahubproject.io/docs/metadata-integration/java/acryl-spark-lineage/#configuring-hdfs-based-dataset-urns).

Spark emits lineage between datasets using its own logic for generating URNs, while Python sources emit dataset metadata separately. To link the two, the URNs generated by both must match; this section helps align the URN generation logic so they stay compatible.

By default, URNs are created using the template:
`urn:li:dataset:(urn:li:dataPlatform:<$platform>,<$platformInstance>.<$name>,<$env>)`
Each of these fields can be configured to generate the desired URN.
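For example, with `platform = iceberg`, `platformInstance = stream1`, `name = db.table`, and `env = PROD` (illustrative values), the template yields:

```
urn:li:dataset:(urn:li:dataPlatform:iceberg,stream1.db.table,PROD)
```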
**Platform**:
The platform is explicitly supported through the new Spark configuration key:
- `spark.datahub.streaming.platform.instance`

If this configuration is not set, the platform defaults to `null`.

**Name**:
Defaults to the full table path in the streaming description.

**Platform Instance and Env:**
The default value for `env` is `'PROD'`, and `platformInstance` defaults to `null`. These values can be overridden using:
```properties
spark.datahub.streaming.platform.<platform>.<alias>.platformInstance
spark.datahub.streaming.platform.<platform>.<alias>.env
```
If Spark is processing data from different environments or platform instances, the `streaming_alias` (the `<alias>` segment in the keys above) allows for alias-based overrides of platform-specific settings.
**Configuration Keys**
Below is a summary of configuration options for per-alias streaming platform customization:
- `spark.datahub.streaming.platform.<platform>.<alias>.platformInstance`
<br>Sets the platform instance name to be used in the URN (e.g., `stream1`, `catalog`).
- `spark.datahub.streaming.platform.<platform>.<alias>.env`
<br>Sets the environment name to be used in the URN (e.g., `PROD`, `DEV`).
- `spark.datahub.streaming.platform.<platform>.<alias>.usePlatformInstance`
<br>(Optional, default: `false`)
<br>If set to `true`, the platform instance will be injected into the table name. This is useful for ensuring that the lineage reflects the correct platform instance when using a shared catalog or namespace. For example, `stream2.namespace.table` will be rewritten as `catalog.namespace.table` in the URN if `platformInstance=catalog`.
- `spark.datahub.streaming.platform.<platform>.<alias>.streaming.io.platform.type`
<br>Indicates whether the alias represents a streaming `"source"` or `"sink"`.
**Example:**
```properties
spark.datahub.streaming.platform.iceberg.stream1.env : PROD
spark.datahub.streaming.platform.iceberg.stream1.streaming.io.platform.type : source
spark.datahub.streaming.platform.iceberg.stream1.platformInstance : stream1

spark.datahub.streaming.platform.iceberg.stream2.env : DEV
spark.datahub.streaming.platform.iceberg.stream2.streaming.io.platform.type : sink
spark.datahub.streaming.platform.iceberg.stream2.platformInstance : catalog
spark.datahub.streaming.platform.iceberg.stream2.usePlatformInstance : true
```
In this example, `stream2.namespace.table` will be rewritten as `catalog.namespace.table` when `usePlatformInstance = true`, allowing lineage to reflect the correct platform instance. The default behavior is `false`, meaning the platform instance is not injected into the table name.
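A minimal sketch of the intended rewrite, using a hypothetical helper (not the actual implementation):

```java
// Swap the alias prefix of a table name for the configured platform
// instance, e.g. "stream2.namespace.table" -> "catalog.namespace.table".
static String injectPlatformInstance(String tableName, String alias, String platformInstance) {
  if (tableName.startsWith(alias + ".")) {
    return platformInstance + tableName.substring(alias.length());
  }
  return tableName; // no alias prefix (or usePlatformInstance=false): unchanged
}
```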
---

**🛠️ Refactor suggestion:** Anchor the regex to prevent over-matching.

The pattern `"(.*?)\\[(.*)]"` is unanchored and could over-match when there are multiple or nested brackets in the description. To ensure you only match the full string and capture minimal groups, anchor the expression, as sketched below. This change guards against unintended partial matches.
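One possible anchored variant (illustrative; the reviewer's exact code suggestion was not captured on this page):

```java
// Anchors force the entire description to match the "<platform>[...]"
// shape, preventing partial matches on surrounding text.
Pattern anchored = Pattern.compile("^(.*?)\\[(.*)]$");
```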