Merged
@@ -64,7 +64,13 @@ def extract_raw_sql_fields(
report: Optional[Any] = None,
) -> List[SchemaFieldClass]:
"""Extract fields from raw SQL queries using DataHub's SQL parsing."""
raw_sql = target.get("rawSql", "")
# Handle case variations: rawSql, rawSQL, etc.
raw_sql = ""
for key, value in target.items():
if key.lower() == "rawsql" and value:
raw_sql = value
break
Comment on lines +68 to +72

Contributor:
What about:

Suggested change
raw_sql = ""
for key, value in target.items():
if key.lower() == "rawsql" and value:
raw_sql = value
break
raw_sql = target.get("rawSql") or target.get("rawSQL") or target.get("rawsql")

Any other variants to be considered?

Collaborator Author:
The short answer is that this is dictated by the implementation of the individual connector. Although we've found Athena to differ from Postgres, there may be more variants, which is why I think it's best to leave the lookup flexible.
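A small helper along these lines would keep the lookup flexible without enumerating every variant (a sketch only; `get_case_insensitive` is a hypothetical name, not part of this PR):

```python
from typing import Any, Dict, Optional


def get_case_insensitive(d: Dict[str, Any], key: str) -> Optional[Any]:
    """Return the first truthy value whose key matches `key`, ignoring case."""
    wanted = key.lower()
    for k, v in d.items():
        if k.lower() == wanted and v:
            return v
    return None


# Covers rawSql, rawSQL, rawsql, RAWSQL, ... in one place.
raw_sql = get_case_insensitive({"rawSQL": "SELECT 1"}, "rawSql") or ""
```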


if not raw_sql:
return []

@@ -77,13 +83,15 @@ def extract_raw_sql_fields(
schema_aware = False

if panel and panel.datasource_ref and connection_to_platform_map:
ds_type = panel.datasource_ref.type or "unknown"
ds_uid = panel.datasource_ref.uid or "unknown"
ds_type = panel.datasource_ref.type
ds_uid = panel.datasource_ref.uid

# Try to find mapping by datasource UID first, then by type
platform_config = connection_to_platform_map.get(
ds_uid
) or connection_to_platform_map.get(ds_type)
# Try to find mapping by datasource UID first (if it exists), then by type
platform_config = None
if ds_uid:
platform_config = connection_to_platform_map.get(ds_uid)
if not platform_config and ds_type:
platform_config = connection_to_platform_map.get(ds_type)

if platform_config:
platform = platform_config.platform
@@ -141,7 +149,13 @@ def extract_raw_sql_fields(

def _extract_raw_sql_fields_fallback(target: Dict[str, Any]) -> List[SchemaFieldClass]:
"""Fallback basic SQL parsing for when sqlglot fails."""
raw_sql = target.get("rawSql", "").lower()
# Handle case variations: rawSql, rawSQL, etc.
Contributor:
Same as before.

raw_sql = ""
for key, value in target.items():
if key.lower() == "rawsql" and value:
raw_sql = value
break

if not raw_sql:
return []

@@ -185,18 +199,20 @@ def _extract_raw_sql_fields_fallback(target: Dict[str, Any]) -> List[SchemaFieldClass]:
# Clean up any remaining quotes or parentheses
field_name = field_name.strip("\"'()")

fields.append(
SchemaFieldClass(
fieldPath=field_name,
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
nativeDataType="sql_column",
# Only create field if field_name is not empty
if field_name:
fields.append(
SchemaFieldClass(
fieldPath=field_name,
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
nativeDataType="sql_column",
)
)
)

return fields

except (IndexError, ValueError, StopIteration) as e:
logger.warning(f"Failed to parse SQL: {target.get('rawSql')}", e)
logger.warning(f"Failed to parse SQL: {raw_sql}", e)
return []


@@ -208,13 +224,27 @@ def extract_fields_from_panel(
) -> List[SchemaFieldClass]:
"""Extract all fields from a panel."""
fields = []
fields.extend(
extract_fields_from_targets(
panel.query_targets, panel, connection_to_platform_map, graph, report

# Extract fields from targets (only if there are targets)
if panel.safe_query_targets:
target_fields = extract_fields_from_targets(
panel.safe_query_targets, panel, connection_to_platform_map, graph, report
)
)
fields.extend(get_fields_from_field_config(panel.field_config))
fields.extend(get_fields_from_transformations(panel.transformations))
if target_fields:
fields.extend(target_fields)

# Extract fields from field config - use safe property to ensure non-None
field_config_fields = get_fields_from_field_config(panel.safe_field_config)
if field_config_fields:
fields.extend(field_config_fields)

# Extract fields from transformations (only if there are transformations)
if panel.safe_transformations:
transformation_fields = get_fields_from_transformations(
panel.safe_transformations
)
if transformation_fields:
fields.extend(transformation_fields)

# Track schema field extraction
if report:
@@ -234,6 +264,7 @@ def extract_fields_from_targets(
report: Optional[Any] = None,
) -> List[SchemaFieldClass]:
"""Extract fields from panel targets."""

fields = []
for target in targets:
fields.extend(extract_sql_column_fields(target))
@@ -275,7 +306,9 @@ def get_fields_from_field_config(
nativeDataType="value",
)
)
for override in field_config.get("overrides", []):

overrides = field_config.get("overrides") or []
for override in overrides:
if override.get("matcher", {}).get("id") == "byName":
field_name = override.get("matcher", {}).get("options")
if field_name:
@@ -293,15 +326,18 @@ def get_fields_from_transformations(
transformations: List[Dict[str, Any]],
) -> List[SchemaFieldClass]:
"""Extract fields from transformations."""

fields = []
for transform in transformations:
if transform.get("type") == "organize":
for field_name in transform.get("options", {}).get("indexByName", {}):
fields.append(
SchemaFieldClass(
fieldPath=field_name,
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
nativeDataType="transformed",
# Only create field if field_name is not empty
if field_name and field_name.strip():
fields.append(
SchemaFieldClass(
fieldPath=field_name.strip(),
type=SchemaFieldDataTypeClass(type=StringTypeClass()),
nativeDataType="transformed",
)
)
)
return fields
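For reference, a standalone sketch of the whitespace guard added above, using hypothetical transform data (the imports are the same schema classes used throughout this module):

```python
from datahub.metadata.schema_classes import (
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
)

# An "organize" transform with one real field name and one blank entry.
transforms = [
    {"type": "organize", "options": {"indexByName": {"latency_ms": 0, "   ": 1}}}
]

fields = []
for transform in transforms:
    if transform.get("type") == "organize":
        for name in transform.get("options", {}).get("indexByName", {}):
            # Same guard as above: skip empty or whitespace-only names.
            if name and name.strip():
                fields.append(
                    SchemaFieldClass(
                        fieldPath=name.strip(),
                        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
                        nativeDataType="transformed",
                    )
                )

print([f.fieldPath for f in fields])  # ['latency_ms']; the blank entry is skipped
```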
@@ -23,11 +23,13 @@ def __init__(
verify_ssl: bool,
page_size: int,
report: GrafanaSourceReport,
skip_text_panels: bool = False,
) -> None:
self.base_url = base_url
self.verify_ssl = verify_ssl
self.page_size = page_size
self.report = report
self.skip_text_panels = skip_text_panels
self.session = self._create_session(token)

def _create_session(self, token: SecretStr) -> requests.Session:
@@ -88,7 +90,10 @@ def get_dashboard(self, uid: str) -> Optional[Dashboard]:
try:
response = self.session.get(f"{self.base_url}/api/dashboards/uid/{uid}")
response.raise_for_status()
return Dashboard.model_validate(response.json())
dashboard_data = response.json()
if self.skip_text_panels:
dashboard_data["_skip_text_panels"] = True
return Dashboard.model_validate(dashboard_data)
except requests.exceptions.RequestException as e:
self.report.warning(
title="Dashboard Fetch Error",
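The transient `_skip_text_panels` key implies the `Dashboard` model drops text panels during validation. A minimal sketch of that pattern, assuming a Pydantic v2 model (the actual `Dashboard` definition is not shown in this diff):

```python
from typing import Any, Dict, List

from pydantic import BaseModel, model_validator


class Dashboard(BaseModel):
    panels: List[Dict[str, Any]] = []

    @model_validator(mode="before")
    @classmethod
    def _maybe_skip_text_panels(cls, data: Any) -> Any:
        # Pop the transient flag so it never becomes a model field.
        if isinstance(data, dict) and data.pop("_skip_text_panels", False):
            data["panels"] = [
                p for p in data.get("panels", []) if p.get("type") != "text"
            ]
        return data
```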
@@ -8,6 +8,9 @@
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
@@ -80,6 +83,11 @@ class GrafanaSourceConfig(
ingest_owners: bool = Field(
default=True, description="Whether to ingest dashboard ownership information"
)
skip_text_panels: bool = Field(
default=False,
description="Whether to skip text panels during ingestion. "
"Text panels don't contain data visualizations and may not be relevant for data lineage.",
)

include_lineage: bool = Field(
default=True,
@@ -99,6 +107,10 @@ class GrafanaSourceConfig(
description="Map of Grafana datasource types/UIDs to platform connection configs for lineage extraction",
)

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description="Stateful ingestion configuration"
)

@field_validator("url", mode="after")
@classmethod
def remove_trailing_slash(cls, v: str) -> str:
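With the new field in place, stale-entity removal can be enabled like any other stateful source; a minimal sketch (assuming the standard fields on StatefulStaleMetadataRemovalConfig):

```python
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)

# Soft-deletes datasets/dashboards in DataHub when they disappear from Grafana
# between runs; requires a stateful ingestion provider to be configured.
stateful = StatefulStaleMetadataRemovalConfig(
    enabled=True,
    remove_stale_metadata=True,
)
```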
@@ -61,11 +61,9 @@
DashboardInfoClass,
DataPlatformInstanceClass,
DatasetPropertiesClass,
DatasetSnapshotClass,
GlobalTagsClass,
InputFieldClass,
InputFieldsClass,
MetadataChangeEventClass,
OtherSchemaClass,
SchemaFieldClass,
SchemaMetadataClass,
@@ -154,6 +152,7 @@ def __init__(self, config: GrafanaSourceConfig, ctx: PipelineContext):
verify_ssl=self.config.verify_ssl,
page_size=self.config.page_size,
report=self.report,
skip_text_panels=self.config.skip_text_panels,
)

# Initialize lineage extractor with graph
@@ -496,47 +495,58 @@ def _process_panel_dataset(
env=self.env,
)

# Create dataset snapshot
dataset_snapshot = DatasetSnapshotClass(
urn=dataset_urn,
aspects=[
DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
platform=self.platform,
instance=self.platform_instance,
)
if self.platform_instance
else None,
),
DatasetPropertiesClass(
name=f"{ds_uid} ({panel.title or panel.id})",
description="",
customProperties={
"type": ds_type,
"uid": ds_uid,
"full_path": dataset_name,
},
),
StatusClass(removed=False),
],
)
# Platform instance aspect
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
platform=self.platform,
instance=self.platform_instance,
)
if self.platform_instance
else None,
),
).as_workunit()

# Dataset properties aspect
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DatasetPropertiesClass(
name=f"{ds_uid} ({panel.title or panel.id})",
description="",
customProperties={
"type": ds_type,
"uid": ds_uid,
"full_path": dataset_name,
},
),
).as_workunit()

# Status aspect
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=StatusClass(removed=False),
).as_workunit()

# Add schema metadata if available
schema_fields = extract_fields_from_panel(
panel, self.config.connection_to_platform_map, self.ctx.graph, self.report
)
if schema_fields:
schema_metadata = SchemaMetadataClass(
schemaName=f"{ds_type}.{ds_uid}.{panel.id}",
platform=make_data_platform_urn(self.platform),
version=0,
fields=schema_fields,
hash="",
platformSchema=OtherSchemaClass(rawSchema=""),
)
dataset_snapshot.aspects.append(schema_metadata)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SchemaMetadataClass(
schemaName=f"{ds_type}.{ds_uid}.{panel.id}",
platform=make_data_platform_urn(self.platform),
version=0,
fields=schema_fields,
hash="",
platformSchema=OtherSchemaClass(rawSchema=""),
),
).as_workunit()

# Add tags if available
if dashboard_uid and self.config.ingest_tags:
dashboard = self.api_client.get_dashboard(dashboard_uid)
if dashboard and dashboard.tags:
@@ -545,13 +555,12 @@ def _process_panel_dataset(
tags.append(TagAssociationClass(tag=make_tag_urn(tag)))

if tags:
dataset_snapshot.aspects.append(GlobalTagsClass(tags=tags))
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=GlobalTagsClass(tags=tags),
).as_workunit()

self.report.report_dataset_scanned()
yield MetadataWorkUnit(
id=f"grafana-dataset-{ds_uid}-{panel.id}",
mce=MetadataChangeEventClass(proposedSnapshot=dataset_snapshot),
)

# Add dataset to dashboard container
if dashboard_uid: