Skip to content

Commit 2b48b22

Browse files
authored
fix(ingestion/grafana): fix fails caused by text panels in ingestion (#15291)
1 parent 511ffb2 commit 2b48b22

File tree

12 files changed

+2331
-932
lines changed

12 files changed

+2331
-932
lines changed

metadata-ingestion/src/datahub/ingestion/source/grafana/field_utils.py

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,13 @@ def extract_raw_sql_fields(
6464
report: Optional[Any] = None,
6565
) -> List[SchemaFieldClass]:
6666
"""Extract fields from raw SQL queries using DataHub's SQL parsing."""
67-
raw_sql = target.get("rawSql", "")
67+
# Handle case variations: rawSql, rawSQL, etc.
68+
raw_sql = ""
69+
for key, value in target.items():
70+
if key.lower() == "rawsql" and value:
71+
raw_sql = value
72+
break
73+
6874
if not raw_sql:
6975
return []
7076

@@ -77,13 +83,15 @@ def extract_raw_sql_fields(
7783
schema_aware = False
7884

7985
if panel and panel.datasource_ref and connection_to_platform_map:
80-
ds_type = panel.datasource_ref.type or "unknown"
81-
ds_uid = panel.datasource_ref.uid or "unknown"
86+
ds_type = panel.datasource_ref.type
87+
ds_uid = panel.datasource_ref.uid
8288

83-
# Try to find mapping by datasource UID first, then by type
84-
platform_config = connection_to_platform_map.get(
85-
ds_uid
86-
) or connection_to_platform_map.get(ds_type)
89+
# Try to find mapping by datasource UID first (if it exists), then by type
90+
platform_config = None
91+
if ds_uid:
92+
platform_config = connection_to_platform_map.get(ds_uid)
93+
if not platform_config and ds_type:
94+
platform_config = connection_to_platform_map.get(ds_type)
8795

8896
if platform_config:
8997
platform = platform_config.platform
@@ -141,7 +149,13 @@ def extract_raw_sql_fields(
141149

142150
def _extract_raw_sql_fields_fallback(target: Dict[str, Any]) -> List[SchemaFieldClass]:
143151
"""Fallback basic SQL parsing for when sqlglot fails."""
144-
raw_sql = target.get("rawSql", "").lower()
152+
# Handle case variations: rawSql, rawSQL, etc.
153+
raw_sql = ""
154+
for key, value in target.items():
155+
if key.lower() == "rawsql" and value:
156+
raw_sql = value
157+
break
158+
145159
if not raw_sql:
146160
return []
147161

@@ -185,18 +199,20 @@ def _extract_raw_sql_fields_fallback(target: Dict[str, Any]) -> List[SchemaField
185199
# Clean up any remaining quotes or parentheses
186200
field_name = field_name.strip("\"'()")
187201

188-
fields.append(
189-
SchemaFieldClass(
190-
fieldPath=field_name,
191-
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
192-
nativeDataType="sql_column",
202+
# Only create field if field_name is not empty
203+
if field_name:
204+
fields.append(
205+
SchemaFieldClass(
206+
fieldPath=field_name,
207+
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
208+
nativeDataType="sql_column",
209+
)
193210
)
194-
)
195211

196212
return fields
197213

198214
except (IndexError, ValueError, StopIteration) as e:
199-
logger.warning(f"Failed to parse SQL: {target.get('rawSql')}", e)
215+
logger.warning(f"Failed to parse SQL: {raw_sql}", e)
200216
return []
201217

202218

@@ -208,13 +224,27 @@ def extract_fields_from_panel(
208224
) -> List[SchemaFieldClass]:
209225
"""Extract all fields from a panel."""
210226
fields = []
211-
fields.extend(
212-
extract_fields_from_targets(
213-
panel.query_targets, panel, connection_to_platform_map, graph, report
227+
228+
# Extract fields from targets (only if there are targets)
229+
if panel.safe_query_targets:
230+
target_fields = extract_fields_from_targets(
231+
panel.safe_query_targets, panel, connection_to_platform_map, graph, report
214232
)
215-
)
216-
fields.extend(get_fields_from_field_config(panel.field_config))
217-
fields.extend(get_fields_from_transformations(panel.transformations))
233+
if target_fields:
234+
fields.extend(target_fields)
235+
236+
# Extract fields from field config - use safe property to ensure non-None
237+
field_config_fields = get_fields_from_field_config(panel.safe_field_config)
238+
if field_config_fields:
239+
fields.extend(field_config_fields)
240+
241+
# Extract fields from transformations (only if there are transformations)
242+
if panel.safe_transformations:
243+
transformation_fields = get_fields_from_transformations(
244+
panel.safe_transformations
245+
)
246+
if transformation_fields:
247+
fields.extend(transformation_fields)
218248

219249
# Track schema field extraction
220250
if report:
@@ -234,6 +264,7 @@ def extract_fields_from_targets(
234264
report: Optional[Any] = None,
235265
) -> List[SchemaFieldClass]:
236266
"""Extract fields from panel targets."""
267+
237268
fields = []
238269
for target in targets:
239270
fields.extend(extract_sql_column_fields(target))
@@ -275,7 +306,9 @@ def get_fields_from_field_config(
275306
nativeDataType="value",
276307
)
277308
)
278-
for override in field_config.get("overrides", []):
309+
310+
overrides = field_config.get("overrides") or []
311+
for override in overrides:
279312
if override.get("matcher", {}).get("id") == "byName":
280313
field_name = override.get("matcher", {}).get("options")
281314
if field_name:
@@ -293,15 +326,18 @@ def get_fields_from_transformations(
293326
transformations: List[Dict[str, Any]],
294327
) -> List[SchemaFieldClass]:
295328
"""Extract fields from transformations."""
329+
296330
fields = []
297331
for transform in transformations:
298332
if transform.get("type") == "organize":
299333
for field_name in transform.get("options", {}).get("indexByName", {}):
300-
fields.append(
301-
SchemaFieldClass(
302-
fieldPath=field_name,
303-
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
304-
nativeDataType="transformed",
334+
# Only create field if field_name is not empty
335+
if field_name and field_name.strip():
336+
fields.append(
337+
SchemaFieldClass(
338+
fieldPath=field_name.strip(),
339+
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
340+
nativeDataType="transformed",
341+
)
305342
)
306-
)
307343
return fields

metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_api.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ def __init__(
2323
verify_ssl: bool,
2424
page_size: int,
2525
report: GrafanaSourceReport,
26+
skip_text_panels: bool = False,
2627
) -> None:
2728
self.base_url = base_url
2829
self.verify_ssl = verify_ssl
2930
self.page_size = page_size
3031
self.report = report
32+
self.skip_text_panels = skip_text_panels
3133
self.session = self._create_session(token)
3234

3335
def _create_session(self, token: SecretStr) -> requests.Session:
@@ -88,7 +90,10 @@ def get_dashboard(self, uid: str) -> Optional[Dashboard]:
8890
try:
8991
response = self.session.get(f"{self.base_url}/api/dashboards/uid/{uid}")
9092
response.raise_for_status()
91-
return Dashboard.model_validate(response.json())
93+
dashboard_data = response.json()
94+
if self.skip_text_panels:
95+
dashboard_data["_skip_text_panels"] = True
96+
return Dashboard.model_validate(dashboard_data)
9297
except requests.exceptions.RequestException as e:
9398
self.report.warning(
9499
title="Dashboard Fetch Error",

metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
EnvConfigMixin,
99
PlatformInstanceConfigMixin,
1010
)
11+
from datahub.ingestion.source.state.stale_entity_removal_handler import (
12+
StatefulStaleMetadataRemovalConfig,
13+
)
1114
from datahub.ingestion.source.state.stateful_ingestion_base import (
1215
StatefulIngestionConfigBase,
1316
)
@@ -80,6 +83,11 @@ class GrafanaSourceConfig(
8083
ingest_owners: bool = Field(
8184
default=True, description="Whether to ingest dashboard ownership information"
8285
)
86+
skip_text_panels: bool = Field(
87+
default=False,
88+
description="Whether to skip text panels during ingestion. "
89+
"Text panels don't contain data visualizations and may not be relevant for data lineage.",
90+
)
8391

8492
include_lineage: bool = Field(
8593
default=True,
@@ -99,6 +107,10 @@ class GrafanaSourceConfig(
99107
description="Map of Grafana datasource types/UIDs to platform connection configs for lineage extraction",
100108
)
101109

110+
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
111+
default=None, description="Stateful ingestion configuration"
112+
)
113+
102114
@field_validator("url", mode="after")
103115
@classmethod
104116
def remove_trailing_slash(cls, v: str) -> str:

metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_source.py

Lines changed: 50 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,9 @@
6161
DashboardInfoClass,
6262
DataPlatformInstanceClass,
6363
DatasetPropertiesClass,
64-
DatasetSnapshotClass,
6564
GlobalTagsClass,
6665
InputFieldClass,
6766
InputFieldsClass,
68-
MetadataChangeEventClass,
6967
OtherSchemaClass,
7068
SchemaFieldClass,
7169
SchemaMetadataClass,
@@ -154,6 +152,7 @@ def __init__(self, config: GrafanaSourceConfig, ctx: PipelineContext):
154152
verify_ssl=self.config.verify_ssl,
155153
page_size=self.config.page_size,
156154
report=self.report,
155+
skip_text_panels=self.config.skip_text_panels,
157156
)
158157

159158
# Initialize lineage extractor with graph
@@ -496,47 +495,58 @@ def _process_panel_dataset(
496495
env=self.env,
497496
)
498497

499-
# Create dataset snapshot
500-
dataset_snapshot = DatasetSnapshotClass(
501-
urn=dataset_urn,
502-
aspects=[
503-
DataPlatformInstanceClass(
504-
platform=make_data_platform_urn(self.platform),
505-
instance=make_dataplatform_instance_urn(
506-
platform=self.platform,
507-
instance=self.platform_instance,
508-
)
509-
if self.platform_instance
510-
else None,
511-
),
512-
DatasetPropertiesClass(
513-
name=f"{ds_uid} ({panel.title or panel.id})",
514-
description="",
515-
customProperties={
516-
"type": ds_type,
517-
"uid": ds_uid,
518-
"full_path": dataset_name,
519-
},
520-
),
521-
StatusClass(removed=False),
522-
],
523-
)
498+
# Platform instance aspect
499+
yield MetadataChangeProposalWrapper(
500+
entityUrn=dataset_urn,
501+
aspect=DataPlatformInstanceClass(
502+
platform=make_data_platform_urn(self.platform),
503+
instance=make_dataplatform_instance_urn(
504+
platform=self.platform,
505+
instance=self.platform_instance,
506+
)
507+
if self.platform_instance
508+
else None,
509+
),
510+
).as_workunit()
511+
512+
# Dataset properties aspect
513+
yield MetadataChangeProposalWrapper(
514+
entityUrn=dataset_urn,
515+
aspect=DatasetPropertiesClass(
516+
name=f"{ds_uid} ({panel.title or panel.id})",
517+
description="",
518+
customProperties={
519+
"type": ds_type,
520+
"uid": ds_uid,
521+
"full_path": dataset_name,
522+
},
523+
),
524+
).as_workunit()
525+
526+
# Status aspect
527+
yield MetadataChangeProposalWrapper(
528+
entityUrn=dataset_urn,
529+
aspect=StatusClass(removed=False),
530+
).as_workunit()
524531

525532
# Add schema metadata if available
526533
schema_fields = extract_fields_from_panel(
527534
panel, self.config.connection_to_platform_map, self.ctx.graph, self.report
528535
)
529536
if schema_fields:
530-
schema_metadata = SchemaMetadataClass(
531-
schemaName=f"{ds_type}.{ds_uid}.{panel.id}",
532-
platform=make_data_platform_urn(self.platform),
533-
version=0,
534-
fields=schema_fields,
535-
hash="",
536-
platformSchema=OtherSchemaClass(rawSchema=""),
537-
)
538-
dataset_snapshot.aspects.append(schema_metadata)
537+
yield MetadataChangeProposalWrapper(
538+
entityUrn=dataset_urn,
539+
aspect=SchemaMetadataClass(
540+
schemaName=f"{ds_type}.{ds_uid}.{panel.id}",
541+
platform=make_data_platform_urn(self.platform),
542+
version=0,
543+
fields=schema_fields,
544+
hash="",
545+
platformSchema=OtherSchemaClass(rawSchema=""),
546+
),
547+
).as_workunit()
539548

549+
# Add tags if available
540550
if dashboard_uid and self.config.ingest_tags:
541551
dashboard = self.api_client.get_dashboard(dashboard_uid)
542552
if dashboard and dashboard.tags:
@@ -545,13 +555,12 @@ def _process_panel_dataset(
545555
tags.append(TagAssociationClass(tag=make_tag_urn(tag)))
546556

547557
if tags:
548-
dataset_snapshot.aspects.append(GlobalTagsClass(tags=tags))
558+
yield MetadataChangeProposalWrapper(
559+
entityUrn=dataset_urn,
560+
aspect=GlobalTagsClass(tags=tags),
561+
).as_workunit()
549562

550563
self.report.report_dataset_scanned()
551-
yield MetadataWorkUnit(
552-
id=f"grafana-dataset-{ds_uid}-{panel.id}",
553-
mce=MetadataChangeEventClass(proposedSnapshot=dataset_snapshot),
554-
)
555564

556565
# Add dataset to dashboard container
557566
if dashboard_uid:

0 commit comments

Comments
 (0)