
Commit 072a5e6

properly add data platform instance to browsepath for DataFlow
1 parent 6b8b6aa commit 072a5e6

File tree

7 files changed: +92 −2 lines changed


metadata-ingestion/src/datahub/sdk/dataflow.py

Lines changed: 24 additions & 1 deletion

@@ -12,7 +12,7 @@
 from datahub.errors import (
     IngestionAttributionWarning,
 )
-from datahub.metadata.urns import DataFlowUrn, Urn
+from datahub.metadata.urns import DataFlowUrn, DataPlatformInstanceUrn, Urn
 from datahub.sdk._attribution import is_ingestion_attribution
 from datahub.sdk._shared import (
     DomainInputType,
@@ -123,6 +123,7 @@ def __init__(
         self._set_extra_aspects(extra_aspects)

         self._set_platform_instance(urn.orchestrator, platform_instance)
+        self._set_default_browse_path(platform_instance)

         # Initialize DataFlowInfoClass directly with name
         self._setdefault_aspect(models.DataFlowInfoClass(name=display_name or name))
@@ -307,3 +308,25 @@ def set_last_modified(self, last_modified: datetime) -> None:
     def env(self) -> Optional[Union[str, models.FabricTypeClass]]:
         """Get the environment of the dataflow."""
         return self._ensure_dataflow_props().env
+
+    def _set_default_browse_path(self, platform_instance: Optional[str]) -> None:
+        """Set a default browse path for the dataflow based on platform and instance.
+
+        This creates a browse path with the platform instance as the root if provided,
+        preventing the dataflow from appearing in a generic "Default" folder.
+
+        Args:
+            platform_instance: Optional platform instance identifier.
+        """
+        browse_path = []
+        if platform_instance:
+            platform_instance_urn = DataPlatformInstanceUrn(
+                self.urn.orchestrator, platform_instance
+            )
+            browse_path.append(
+                models.BrowsePathEntryClass(
+                    id=platform_instance_urn.urn(), urn=platform_instance_urn.urn()
+                )
+            )
+
+        self._set_aspect(models.BrowsePathsV2Class(path=browse_path))
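
Taken together with the tests further down, this means a DataFlow built through the SDK with a platform_instance now gets a browsePathsV2 aspect rooted at the platform instance URN. A minimal usage sketch, reusing the illustrative platform and instance names from the new tests in this commit:

from datahub.sdk.dataflow import DataFlow

# "fivetran", "my_connector", and "prod_instance" are example values taken
# from the tests added in this commit, not required names.
flow = DataFlow(
    platform="fivetran",
    name="my_connector",
    platform_instance="prod_instance",
)

# The default browse path is now rooted at the platform instance URN rather
# than being empty, so the flow no longer lands in a generic "Default" folder.
assert str(flow.browse_path[0]) == (
    "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,prod_instance)"
)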

metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_basic_golden.json

Lines changed: 11 additions & 0 deletions

@@ -10,6 +10,17 @@
     }
   }
 },
+{
+  "entityType": "dataFlow",
+  "entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)",
+  "changeType": "UPSERT",
+  "aspectName": "browsePathsV2",
+  "aspect": {
+    "json": {
+      "path": []
+    }
+  }
+},
 {
   "entityType": "dataFlow",
   "entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)",

metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_complex_golden.json

Lines changed: 16 additions & 0 deletions

@@ -11,6 +11,22 @@
     }
   }
 },
+{
+  "entityType": "dataFlow",
+  "entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
+  "changeType": "UPSERT",
+  "aspectName": "browsePathsV2",
+  "aspect": {
+    "json": {
+      "path": [
+        {
+          "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)",
+          "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
+        }
+      ]
+    }
+  }
+},
 {
   "entityType": "dataFlow",
   "entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",

metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_complex_golden.json

Lines changed: 4 additions & 0 deletions

@@ -19,6 +19,10 @@
   "aspect": {
     "json": {
       "path": [
+        {
+          "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)",
+          "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
+        },
         {
           "id": "my_instance.example_dag",
           "urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)"

metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_init_with_flow_urn_golden.json

Lines changed: 4 additions & 0 deletions

@@ -19,6 +19,10 @@
   "aspect": {
     "json": {
      "path": [
+        {
+          "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)",
+          "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
+        },
         {
           "id": "my_instance.example_dag",
           "urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)"

metadata-ingestion/tests/unit/sdk_v2/test_dataflow.py

Lines changed: 29 additions & 0 deletions

@@ -204,3 +204,32 @@ def test_dataflow_with_container() -> None:
     )
     assert flow.parent_container == container.urn
     assert flow.browse_path == [container.urn]
+
+
+def test_dataflow_browse_path_with_platform_instance() -> None:
+    """Test that DataFlow with platform_instance has a correct browse path."""
+    flow = DataFlow(
+        platform="fivetran",
+        name="my_connector",
+        platform_instance="prod_instance",
+    )
+
+    assert flow.platform_instance is not None
+    assert flow.browse_path is not None
+    assert len(flow.browse_path) == 1
+    assert (
+        str(flow.browse_path[0])
+        == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,prod_instance)"
+    )
+
+
+def test_dataflow_browse_path_without_platform_instance() -> None:
+    """Test that DataFlow without platform_instance has an empty browse path."""
+    flow = DataFlow(
+        platform="fivetran",
+        name="my_connector",
+    )
+
+    assert flow.platform_instance is None
+    assert flow.browse_path is not None
+    assert len(flow.browse_path) == 0

metadata-ingestion/tests/unit/sdk_v2/test_datajob.py

Lines changed: 4 additions & 1 deletion

@@ -117,7 +117,10 @@ def test_datajob_complex() -> None:
     assert job.platform == DataPlatformUrn("airflow")
     assert job.platform_instance == flow.platform_instance
     assert job.platform_instance == DataPlatformInstanceUrn("airflow", "my_instance")
-    assert job.browse_path == [flow.urn]
+    assert job.browse_path == [
+        DataPlatformInstanceUrn("airflow", "my_instance"),
+        flow.urn,
+    ]

     # Validate golden file
     assert_entity_golden(job, GOLDEN_DIR / "test_datajob_complex_golden.json")
