Commit d9224eb

acrylJonny authored and esteban committed
feat(metadata-integration/openlineage): support env and orchestrator in openlineage event emission (#15251)
1 parent 07c1422 commit d9224eb

File tree

8 files changed, +619 -15 lines changed


docs/lineage/openlineage.md

Lines changed: 82 additions & 9 deletions

@@ -88,15 +88,88 @@ The DataHub OpenLineage integration can be configured using environment variable

 ##### Environment Variables

-| Environment Variable | Property | Type | Default | Description |
-| ------------------------------------------------------ | ------------------------------------------------------ | ------- | ------- | --------------------------------------------------------------- |
-| `DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE` | `datahub.openlineage.platform-instance` | String | `null` | Specific platform instance identifier |
-| `DATAHUB_OPENLINEAGE_COMMON_DATASET_PLATFORM_INSTANCE` | `datahub.openlineage.common-dataset-platform-instance` | String | `null` | Common platform instance for datasets |
-| `DATAHUB_OPENLINEAGE_MATERIALIZE_DATASET` | `datahub.openlineage.materialize-dataset` | Boolean | `true` | Whether to materialize dataset entities |
-| `DATAHUB_OPENLINEAGE_INCLUDE_SCHEMA_METADATA` | `datahub.openlineage.include-schema-metadata` | Boolean | `true` | Whether to include schema metadata in lineage |
-| `DATAHUB_OPENLINEAGE_CAPTURE_COLUMN_LEVEL_LINEAGE` | `datahub.openlineage.capture-column-level-lineage` | Boolean | `true` | Whether to capture column-level lineage information |
-| `DATAHUB_OPENLINEAGE_FILE_PARTITION_REGEXP_PATTERN` | `datahub.openlineage.file-partition-regexp-pattern` | String | `null` | Regular expression pattern for file partition detection |
-| `DATAHUB_OPENLINEAGE_USE_PATCH` | `datahub.openlineage.use-patch` | Boolean | `false` | Whether to use patch operations for lineage/incremental lineage |
+| Environment Variable | Property | Type | Default | Description |
+| ------------------------------------------------------ | ------------------------------------------------------ | ------- | ------- | ----------------------------------------------------------------------------------------------------------------- |
+| `DATAHUB_OPENLINEAGE_ENV` | `datahub.openlineage.env` | String | `PROD` | Environment for DataFlow cluster and Dataset fabricType (see valid values below) |
+| `DATAHUB_OPENLINEAGE_ORCHESTRATOR` | `datahub.openlineage.orchestrator` | String | `null` | Orchestrator name for DataFlow entities. When set, takes precedence over processing_engine facet and producer URL |
+| `DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE` | `datahub.openlineage.platform-instance` | String | `null` | Override DataFlow cluster (defaults to env if not specified) |
+| `DATAHUB_OPENLINEAGE_COMMON_DATASET_ENV` | `datahub.openlineage.common-dataset-env` | String | `null` | Override Dataset environment independently from DataFlow cluster |
+| `DATAHUB_OPENLINEAGE_COMMON_DATASET_PLATFORM_INSTANCE` | `datahub.openlineage.common-dataset-platform-instance` | String | `null` | Common platform instance for dataset entities |
+| `DATAHUB_OPENLINEAGE_MATERIALIZE_DATASET` | `datahub.openlineage.materialize-dataset` | Boolean | `true` | Whether to materialize dataset entities |
+| `DATAHUB_OPENLINEAGE_INCLUDE_SCHEMA_METADATA` | `datahub.openlineage.include-schema-metadata` | Boolean | `true` | Whether to include schema metadata in lineage |
+| `DATAHUB_OPENLINEAGE_CAPTURE_COLUMN_LEVEL_LINEAGE` | `datahub.openlineage.capture-column-level-lineage` | Boolean | `true` | Whether to capture column-level lineage information |
+| `DATAHUB_OPENLINEAGE_USE_PATCH` | `datahub.openlineage.use-patch` | Boolean | `false` | Whether to use patch operations for lineage/incremental lineage |
+| `DATAHUB_OPENLINEAGE_FILE_PARTITION_REGEXP_PATTERN` | `datahub.openlineage.file-partition-regexp-pattern` | String | `null` | Regular expression pattern for file partition detection |
+
+> **Valid `env` values**: `PROD`, `DEV`, `TEST`, `QA`, `UAT`, `EI`, `PRE`, `STG`, `NON_PROD`, `CORP`, `RVW`, `PRD`, `TST`, `SIT`, `SBX`, `SANDBOX`
+>
+> **How `env` works**:
+>
+> - **By default**, `env` sets both the DataFlow cluster and Dataset fabricType for simplicity
+> - **For advanced scenarios**, use `platform-instance` to override the DataFlow cluster or `common-dataset-env` to override the Dataset environment independently
+>
+> **Note**: The `env` property naming matches DataHub SDK conventions where `env` is the user-facing parameter that internally maps to the URN `cluster` field.
+
+##### Usage Examples
+
+**Setting Environment and Orchestrator**
+
+_Simple Configuration (Recommended):_
+
+For most use cases, set `env` to configure both DataFlow and Datasets:
+
+```bash
+# Development environment - sets DataFlow cluster to "dev" and Dataset fabricType to DEV
+DATAHUB_OPENLINEAGE_ENV=DEV
+DATAHUB_OPENLINEAGE_ORCHESTRATOR=my-orchestrator
+
+# Production environment - sets DataFlow cluster to "prod" and Dataset fabricType to PROD
+DATAHUB_OPENLINEAGE_ENV=PROD
+DATAHUB_OPENLINEAGE_ORCHESTRATOR=dagster
+
+# Staging environment
+DATAHUB_OPENLINEAGE_ENV=STG
+DATAHUB_OPENLINEAGE_ORCHESTRATOR=custom-pipeline
+```
+
+_Advanced Configuration (Multi-Region/Complex Deployments):_
+
+Override DataFlow cluster or Dataset environment independently:
+
+```bash
+# DataFlow in specific regional cluster, but datasets marked as generic PROD
+DATAHUB_OPENLINEAGE_ENV=PROD
+DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE=prod-us-west-2 # DataFlow cluster override
+
+# Test pipeline against DEV data (cross-environment testing)
+DATAHUB_OPENLINEAGE_ENV=PROD               # DataFlow cluster: prod
+DATAHUB_OPENLINEAGE_COMMON_DATASET_ENV=DEV # Dataset fabricType: DEV
+
+# Blue-green deployment
+DATAHUB_OPENLINEAGE_ENV=PROD
+DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE=prod-blue # or prod-green
+```
+
+**Using Application Properties**
+
+Alternatively, configure via `application.yml`:
+
+```yaml
+datahub:
+  openlineage:
+    env: PROD
+    orchestrator: my-custom-orchestrator
+    platform-instance: us-west-2
+    capture-column-level-lineage: true
+```
+
+**Priority Order for Orchestrator Determination**
+
+The orchestrator name is determined in the following priority order:
+
+1. `DATAHUB_OPENLINEAGE_ORCHESTRATOR` environment variable (highest priority)
+2. `processing_engine` facet in the OpenLineage event
+3. Parsing the `producer` URL field with known patterns (Airflow, etc.)

 #### Known Limitations
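Taken together, `env`, `platform-instance`, and the orchestrator determine the DataFlow URN that DataHub emits for the pipeline. A minimal sketch of how those pieces compose, assuming DataHub's usual `urn:li:dataFlow:(orchestrator,flowName,cluster)` shape; the flow name used below is hypothetical:

```java
public class DataFlowUrnSketch {
  /**
   * Illustrative only: composes a DataFlow URN string. The cluster is the
   * platform-instance override when set, otherwise the lowercased env, matching
   * the resolution described in the docs above.
   */
  public static String flowUrn(
      String orchestrator, String flowName, String env, String platformInstance) {
    String cluster = platformInstance != null ? platformInstance : env.toLowerCase();
    return String.format("urn:li:dataFlow:(%s,%s,%s)", orchestrator, flowName, cluster);
  }
}
```

So `DATAHUB_OPENLINEAGE_ENV=PROD` alone yields a `prod` cluster, while adding `DATAHUB_OPENLINEAGE_PLATFORM_INSTANCE=prod-us-west-2` replaces it without affecting the Dataset fabricType.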

metadata-integration/java/openlineage-converter/build.gradle

Lines changed: 1 addition & 0 deletions

@@ -24,6 +24,7 @@ dependencies {
   implementation externalDependency.json

   testImplementation externalDependency.testng
+  testImplementation "io.openlineage:openlineage-java:$openLineageVersion"
 }

 test {

metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/config/DatahubOpenlineageConfig.java

Lines changed: 16 additions & 3 deletions

@@ -16,18 +16,31 @@
 @Getter
 @ToString
 public class DatahubOpenlineageConfig {
-  @Builder.Default private final boolean isSpark = false;
-  @Builder.Default private final boolean isStreaming = false;
+  // Pipeline/Flow configuration
   @Builder.Default private final String pipelineName = null;
+  private final String orchestrator;
+  @Builder.Default private final FabricType fabricType = FabricType.PROD;
+
+  // Platform configuration
   private final String platformInstance;
   private final String commonDatasetPlatformInstance;
+  private final String commonDatasetEnv;
   private final String platform;
+
+  // Spark-specific configuration
+  @Builder.Default private final boolean isSpark = false;
+  @Builder.Default private final boolean isStreaming = false;
+
+  // Dataset path configuration
   @Builder.Default private final Map<String, List<PathSpec>> pathSpecs = new HashMap<>();
   private final String filePartitionRegexpPattern;
-  @Builder.Default private final FabricType fabricType = FabricType.PROD;
+
+  // Metadata ingestion configuration
   private final boolean materializeDataset;
   private final boolean includeSchemaMetadata;
   @Builder.Default private final boolean captureColumnLevelLineage = true;
+
+  // Advanced configuration
   @Builder.Default private final DataJobUrn parentJobUrn = null;
   // This is disabled until column level patch support won't be fixed in GMS
   @Builder.Default private final boolean usePatch = true;
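For readers unfamiliar with Lombok, `@Builder.Default` on the fields above means an unset builder field falls back to its declared initializer (so `fabricType` is `PROD` unless explicitly set, while plain fields like `orchestrator` stay `null`). A hand-rolled sketch of that generated behavior, with types simplified to `String` and only two of the fields shown:

```java
public class ConfigDefaultsSketch {
  // Approximation of what Lombok's @Builder/@Builder.Default generates;
  // field names match the source, types are simplified for illustration.
  public static final class Config {
    private final String orchestrator; // no default: null unless set
    private final String fabricType;   // @Builder.Default equivalent: "PROD"

    private Config(Builder b) {
      this.orchestrator = b.orchestrator;
      this.fabricType = b.fabricTypeSet ? b.fabricType : "PROD";
    }

    public String getOrchestrator() { return orchestrator; }
    public String getFabricType() { return fabricType; }
    public static Builder builder() { return new Builder(); }

    public static final class Builder {
      private String orchestrator;
      private String fabricType;
      private boolean fabricTypeSet;

      public Builder orchestrator(String v) { this.orchestrator = v; return this; }
      public Builder fabricType(String v) { this.fabricType = v; this.fabricTypeSet = true; return this; }
      public Config build() { return new Config(this); }
    }
  }
}
```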

metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/converter/OpenLineageToDataHub.java

Lines changed: 9 additions & 2 deletions

@@ -1244,7 +1244,8 @@ public static DataFlowUrn getFlowUrn(
       producerName = producer.toString();
     }

-    String orchestrator = getOrchestrator(processingEngine, producerName);
+    String orchestrator =
+        getOrchestrator(processingEngine, producerName, datahubOpenlineageConfig.getOrchestrator());
     String flowName = datahubOpenlineageConfig.getPipelineName();
     if (datahubOpenlineageConfig.getPlatformInstance() != null) {
       namespace = datahubOpenlineageConfig.getPlatformInstance();
@@ -1259,7 +1260,13 @@ public static DataFlowInfo convertRunEventToDataFlowInfo(
     return dataFlowInfo;
   }

-  private static String getOrchestrator(String processingEngine, String producer) {
+  private static String getOrchestrator(
+      String processingEngine, String producer, String orchestratorConfig) {
+    // If orchestrator is configured, use it with highest priority
+    if (orchestratorConfig != null && !orchestratorConfig.isEmpty()) {
+      return orchestratorConfig;
+    }
+
     String regex = "https://github.com/OpenLineage/OpenLineage/.*/(.*)$";
     Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE);
     String orchestrator = null;
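With this change, orchestrator resolution becomes a three-tier fallback: the config override, then the `processing_engine` facet, then the producer URL. A standalone sketch of that chain; the regex is the one from the source, while the Airflow producer check is a simplified stand-in for the converter's known-pattern matching:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OrchestratorResolver {
  // Pattern from the converter: captures the integration name from an
  // OpenLineage producer URL such as .../OpenLineage/.../integration/spark
  private static final Pattern OPENLINEAGE_PRODUCER =
      Pattern.compile("https://github.com/OpenLineage/OpenLineage/.*/(.*)$", Pattern.MULTILINE);

  /** Resolves the orchestrator: config override > processing_engine facet > producer URL. */
  public static String resolve(String configOverride, String processingEngine, String producer) {
    if (configOverride != null && !configOverride.isEmpty()) {
      return configOverride; // highest priority: DATAHUB_OPENLINEAGE_ORCHESTRATOR
    }
    if (processingEngine != null && !processingEngine.isEmpty()) {
      return processingEngine; // e.g. "spark"
    }
    if (producer != null) {
      Matcher m = OPENLINEAGE_PRODUCER.matcher(producer);
      if (m.find() && !m.group(1).isEmpty()) {
        return m.group(1); // last path segment of the OpenLineage producer URL
      }
      // Simplified assumption: one known non-OpenLineage producer pattern
      if (producer.contains("github.com/apache/airflow")) {
        return "airflow";
      }
    }
    return null; // caller decides how to handle an unknown orchestrator
  }
}
```

Note that a bare `https://github.com/OpenLineage/OpenLineage/` producer matches neither branch, which is why the tests below this section set an explicit orchestrator when using that URL.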
OpenLineageConfigTest.java (new file)

Lines changed: 133 additions & 0 deletions

@@ -0,0 +1,133 @@
+package io.datahubproject.openlineage;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import com.linkedin.common.FabricType;
+import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
+import io.datahubproject.openlineage.converter.OpenLineageToDataHub;
+import io.datahubproject.openlineage.dataset.DatahubJob;
+import io.openlineage.client.OpenLineage;
+import java.net.URI;
+import java.time.ZonedDateTime;
+import java.util.UUID;
+import org.testng.annotations.Test;
+
+/** Tests for OpenLineage configuration including orchestrator and fabric type */
+public class OpenLineageConfigTest {
+
+  private OpenLineage.RunEvent createTestEvent(URI producerUri) {
+    OpenLineage openLineage = new OpenLineage(producerUri);
+    return openLineage
+        .newRunEventBuilder()
+        .eventTime(ZonedDateTime.now())
+        .eventType(OpenLineage.RunEvent.EventType.START)
+        .run(openLineage.newRunBuilder().runId(UUID.randomUUID()).build())
+        .job(
+            openLineage
+                .newJobBuilder()
+                .namespace("test-namespace")
+                .name("test-job")
+                .facets(openLineage.newJobFacetsBuilder().build())
+                .build())
+        .inputs(java.util.Collections.emptyList())
+        .outputs(java.util.Collections.emptyList())
+        .build();
+  }
+
+  @Test
+  public void testOrchestratorOverride() throws Exception {
+    // Create config with orchestrator override
+    DatahubOpenlineageConfig config =
+        DatahubOpenlineageConfig.builder()
+            .fabricType(FabricType.PROD)
+            .orchestrator("custom-orchestrator")
+            .build();
+
+    OpenLineage.RunEvent runEvent =
+        createTestEvent(URI.create("https://github.com/OpenLineage/OpenLineage/"));
+
+    DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);
+
+    assertNotNull(datahubJob);
+    assertEquals(
+        datahubJob.getFlowUrn().getOrchestratorEntity(),
+        "custom-orchestrator",
+        "Orchestrator should be overridden to custom-orchestrator");
+  }
+
+  @Test
+  public void testOrchestratorFromProducerUrl() throws Exception {
+    // Test with an Airflow producer URL and no override
+    DatahubOpenlineageConfig config =
+        DatahubOpenlineageConfig.builder().fabricType(FabricType.PROD).build();
+
+    URI producerUri = URI.create("https://github.com/apache/airflow/");
+    OpenLineage.RunEvent runEvent = createTestEvent(producerUri);
+
+    DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);
+
+    assertNotNull(datahubJob);
+    assertEquals(
+        datahubJob.getFlowUrn().getOrchestratorEntity(),
+        "airflow",
+        "Orchestrator should be derived from Airflow producer URL");
+  }
+
+  @Test
+  public void testOrchestratorOverrideTakesPrecedence() throws Exception {
+    // Even if producer URL suggests Airflow, override should win
+    DatahubOpenlineageConfig config =
+        DatahubOpenlineageConfig.builder()
+            .fabricType(FabricType.DEV)
+            .orchestrator("my-platform")
+            .build();
+
+    URI producerUri = URI.create("https://github.com/apache/airflow/");
+    OpenLineage.RunEvent runEvent = createTestEvent(producerUri);
+
+    DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);
+
+    assertNotNull(datahubJob);
+    assertEquals(
+        datahubJob.getFlowUrn().getOrchestratorEntity(),
+        "my-platform",
+        "Orchestrator override should take precedence over producer URL");
+  }
+
+  @Test
+  public void testFabricTypeConfiguration() throws Exception {
+    // Test that fabric type can be configured (fabric applies to datasets, not flows)
+    DatahubOpenlineageConfig config =
+        DatahubOpenlineageConfig.builder()
+            .fabricType(FabricType.QA)
+            .orchestrator("test-orchestrator")
+            .build();
+
+    OpenLineage.RunEvent runEvent =
+        createTestEvent(URI.create("https://github.com/OpenLineage/OpenLineage/"));
+
+    DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);
+
+    assertNotNull(datahubJob);
+    // Fabric type is applied to datasets not flows, which is tested in existing tests
+    assertEquals(config.getFabricType(), FabricType.QA, "Config should have QA fabric type");
+  }
+
+  @Test
+  public void testDefaultFabricTypeIsProd() throws Exception {
+    // Test that default fabric type is PROD when not specified
+    // Need to set orchestrator since no producer URL pattern will match
+    DatahubOpenlineageConfig config =
+        DatahubOpenlineageConfig.builder().orchestrator("test").build();
+
+    OpenLineage.RunEvent runEvent =
+        createTestEvent(URI.create("https://github.com/OpenLineage/OpenLineage/spark/"));
+
+    DatahubJob datahubJob = OpenLineageToDataHub.convertRunEventToJob(runEvent, config);
+
+    assertNotNull(datahubJob);
+    // Fabric type default is tested via the config
+    assertEquals(config.getFabricType(), FabricType.PROD, "Default fabric should be PROD");
+  }
+}

metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/openlineage/config/DatahubOpenlineageProperties.java

Lines changed: 12 additions & 0 deletions

@@ -9,13 +9,25 @@
 @ConfigurationProperties(prefix = "datahub.openlineage")
 public class DatahubOpenlineageProperties {

+  // Pipeline/Flow configuration
   private String pipelineName;
+  private String orchestrator;
+  private String env;
+
+  // Platform configuration
   private String platformInstance;
   private String commonDatasetPlatformInstance;
+  private String commonDatasetEnv;
   private String platform;
+
+  // Dataset path configuration
   private String filePartitionRegexpPattern;
+
+  // Metadata ingestion configuration
   private boolean materializeDataset = true;
   private boolean includeSchemaMetadata = true;
   private boolean captureColumnLevelLineage = true;
+
+  // Advanced configuration
   private boolean usePatch = false;
 }

metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/openlineage/config/OpenLineageServletConfig.java

Lines changed: 32 additions & 1 deletion

@@ -1,5 +1,6 @@
 package io.datahubproject.openapi.openlineage.config;

+import com.linkedin.common.FabricType;
 import io.datahubproject.openapi.openlineage.mapping.RunEventMapper;
 import io.datahubproject.openlineage.config.DatahubOpenlineageConfig;
 import lombok.extern.slf4j.Slf4j;
@@ -18,16 +19,46 @@ public OpenLineageServletConfig(DatahubOpenlineageProperties properties) {

   @Bean
   public RunEventMapper.MappingConfig mappingConfig() {
+    // Parse FabricType from string property
+    // Use commonDatasetEnv if specified, otherwise fall back to env
+    String envValue =
+        properties.getCommonDatasetEnv() != null
+            ? properties.getCommonDatasetEnv()
+            : properties.getEnv();
+
+    FabricType fabricType = FabricType.PROD; // default
+    if (envValue != null && !envValue.isEmpty()) {
+      try {
+        fabricType = FabricType.valueOf(envValue.toUpperCase());
+      } catch (IllegalArgumentException e) {
+        log.warn(
+            "Invalid env value '{}'. Using default PROD. Valid values: PROD, DEV, TEST, QA, UAT, EI, PRE, STG, NON_PROD, CORP, RVW, PRD, TST, SIT, SBX, SANDBOX",
+            envValue);
+      }
+    }
+
+    // Use platformInstance if specified, otherwise use env as the cluster
+    String platformInstance = properties.getPlatformInstance();
+    if (platformInstance == null && properties.getEnv() != null && !properties.getEnv().isEmpty()) {
+      // Default: use env as the DataFlow cluster
+      platformInstance = properties.getEnv().toLowerCase();
+      log.debug(
+          "Using env '{}' as DataFlow cluster (platformInstance not specified)", platformInstance);
+    }
+
     DatahubOpenlineageConfig datahubOpenlineageConfig =
         DatahubOpenlineageConfig.builder()
-            .platformInstance(properties.getPlatformInstance())
+            .platformInstance(platformInstance)
             .commonDatasetPlatformInstance(properties.getCommonDatasetPlatformInstance())
+            .commonDatasetEnv(properties.getCommonDatasetEnv())
             .platform(properties.getPlatform())
             .filePartitionRegexpPattern(properties.getFilePartitionRegexpPattern())
             .materializeDataset(properties.isMaterializeDataset())
             .includeSchemaMetadata(properties.isIncludeSchemaMetadata())
             .captureColumnLevelLineage(properties.isCaptureColumnLevelLineage())
             .usePatch(properties.isUsePatch())
+            .fabricType(fabricType)
+            .orchestrator(properties.getOrchestrator())
             .parentJobUrn(null)
             .build();
     log.info("Starting OpenLineage Endpoint with config: {}", datahubOpenlineageConfig);
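The resolution rules added here can be exercised in isolation. A minimal sketch mirroring the two fallbacks above, assuming a stand-in enum for `com.linkedin.common.FabricType` (subset of values only): `commonDatasetEnv` wins over `env` for the fabric type, and an explicit `platformInstance` wins over the lowercased `env` for the DataFlow cluster:

```java
public class EnvResolver {
  // Minimal stand-in for com.linkedin.common.FabricType (subset of values; assumption)
  public enum FabricType { PROD, DEV, TEST, QA, STG, NON_PROD }

  /** Mirrors the servlet config: commonDatasetEnv over env; invalid values fall back to PROD. */
  public static FabricType resolveFabricType(String env, String commonDatasetEnv) {
    String envValue = commonDatasetEnv != null ? commonDatasetEnv : env;
    if (envValue == null || envValue.isEmpty()) {
      return FabricType.PROD;
    }
    try {
      return FabricType.valueOf(envValue.toUpperCase());
    } catch (IllegalArgumentException e) {
      return FabricType.PROD; // the real code also logs a warning here
    }
  }

  /** DataFlow cluster: explicit platformInstance wins, else lowercased env. */
  public static String resolveCluster(String platformInstance, String env) {
    if (platformInstance != null) {
      return platformInstance;
    }
    return (env != null && !env.isEmpty()) ? env.toLowerCase() : null;
  }
}
```

This is why the cross-environment example in the docs works: `env=PROD` with `common-dataset-env=DEV` yields a `prod` DataFlow cluster but `DEV` Dataset fabricType.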

0 commit comments