Skip to content

feat(ingest): add structured log category #14229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 1, 2025
88 changes: 81 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,24 @@ class StructuredLogLevel(Enum):
ERROR = logging.ERROR


class StructuredLogCategory(Enum):
    """
    Categorises a log entry by the ingestion area it impacts the most
    (e.g. lineage extraction, usage statistics, profiling).

    This is intended to help self-serve users understand the impact of any
    log entry. More members will be added as logs are updated to be
    self-serve friendly.
    """

    LINEAGE = "LINEAGE"
    USAGE = "USAGE"
    PROFILING = "PROFILING"


@dataclass
class StructuredLogEntry(Report):
    """A single structured log entry surfaced to users in an ingestion report."""

    # Optional category / heading presented for this message in the UI.
    title: Optional[str]
    # Main human-readable message for the entry.
    message: str
    # Additional context strings (e.g. where/how) for the entry; a LossyList
    # bounds how many context items are retained.
    context: LossyList[str]
    # Optional impact-area category of the entry; None when uncategorised.
    log_category: Optional[StructuredLogCategory] = None


@dataclass
Expand All @@ -108,16 +121,20 @@ def report_log(
exc: Optional[BaseException] = None,
log: bool = False,
stacklevel: int = 1,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
Report a user-facing warning for the ingestion run.
Report a user-facing log for the ingestion run.

Args:
level: The level of the log entry.
message: The main message associated with the report entry. This should be a human-readable message.
title: The category / heading to present on for this message in the UI.
context: Additional context (e.g. where, how) for the log entry.
exc: The exception associated with the event. We'll show the stack trace when in debug mode.
log_category: The type of the log entry. This is used to categorise the log entry.
log: Whether to log the entry to the console.
stacklevel: The stack level to use for the log entry.
"""

# One for this method, and one for the containing report_* call.
Expand Down Expand Up @@ -160,6 +177,7 @@ def report_log(
title=title,
message=message,
context=context_list,
log_category=log_category,
)
else:
if context is not None:
Expand Down Expand Up @@ -219,9 +237,19 @@ def report_warning(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.WARN, message, title, context, exc, log=False
StructuredLogLevel.WARN,
message,
title,
context,
exc,
log=False,
log_category=log_category,
)

def warning(
Expand All @@ -231,9 +259,19 @@ def warning(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.WARN, message, title, context, exc, log=log
StructuredLogLevel.WARN,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

def report_failure(
Expand All @@ -243,9 +281,19 @@ def report_failure(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=log
StructuredLogLevel.ERROR,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

def failure(
Expand All @@ -255,9 +303,19 @@ def failure(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=log
StructuredLogLevel.ERROR,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

def info(
Expand All @@ -267,9 +325,19 @@ def info(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.INFO, message, title, context, exc, log=log
StructuredLogLevel.INFO,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

@contextlib.contextmanager
Expand All @@ -279,6 +347,7 @@ def report_exc(
title: Optional[LiteralString] = None,
context: Optional[str] = None,
level: StructuredLogLevel = StructuredLogLevel.ERROR,
log_category: Optional[StructuredLogCategory] = None,
) -> Iterator[None]:
# Convenience method that helps avoid boilerplate try/except blocks.
# TODO: I'm not super happy with the naming here - it's not obvious that this
Expand All @@ -287,7 +356,12 @@ def report_exc(
yield
except Exception as exc:
self._structured_logs.report_log(
level, message=message, title=title, context=context, exc=exc
level,
message=message,
title=title,
context=context,
exc=exc,
log_category=log_category,
)

def __post_init__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,8 @@
"datahub-mock-data": {
"capabilities": [],
"classname": "datahub.ingestion.source.mock_data.datahub_mock_data.DataHubMockDataSource",
"platform_id": "datahubmockdata",
"platform_name": "DataHubMockData",
"platform_id": "fake",
"platform_name": "fake",
"support_status": "TESTING"
},
"dbt": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.source import (
MetadataWorkUnitProcessor,
SourceReport,
StructuredLogCategory,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.fivetran.config import (
KNOWN_DATA_PLATFORM_MAPPING,
Expand Down Expand Up @@ -96,8 +100,10 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
self.report.info(
title="Guessing source platform for lineage",
message="We encountered a connector type that we don't fully support yet. "
"We will attempt to guess the platform based on the connector type.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
"We will attempt to guess the platform based on the connector type. "
"Note that we use connector_id as the key not connector_name which you may see in the UI of Fivetran. ",
context=f"connector_name: {connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
log_category=StructuredLogCategory.LINEAGE,
)
source_details.platform = connector.connector_type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source, SourceReport, StructuredLogCategory
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.mock_data.datahub_mock_data_report import (
Expand All @@ -35,6 +35,8 @@

logger = logging.getLogger(__name__)

PLATFORM_NAME = "fake"


class SubTypePattern(StrEnum):
ALTERNATING = "alternating"
Expand Down Expand Up @@ -144,7 +146,7 @@ class DataHubMockDataConfig(ConfigModel):
)


@platform_name("DataHubMockData")
@platform_name(PLATFORM_NAME)
@config_class(DataHubMockDataConfig)
@support_status(SupportStatus.TESTING)
class DataHubMockDataSource(Source):
Expand Down Expand Up @@ -176,6 +178,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
message="This is test warning",
title="Test Warning",
context=f"This is test warning {i}",
log_category=StructuredLogCategory.LINEAGE,
)

# We don't want any implicit aspects to be produced
Expand Down Expand Up @@ -309,7 +312,7 @@ def _get_subtypes_aspect(
table_level, table_index, subtype_pattern, subtype_types, level_subtypes
)

urn = make_dataset_urn(platform="fake", name=table_name)
urn = make_dataset_urn(platform=PLATFORM_NAME, name=table_name)
mcp = MetadataChangeProposalWrapper(
entityUrn=urn,
entityType="dataset",
Expand Down Expand Up @@ -433,7 +436,7 @@ def _generate_downstream_lineage(

def _get_status_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand All @@ -448,15 +451,15 @@ def _get_upstream_aspect(
) -> MetadataWorkUnit:
mcp = MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=downstream_table,
),
entityType="dataset",
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=upstream_table,
),
type=DatasetLineageTypeClass.TRANSFORMED,
Expand All @@ -468,7 +471,7 @@ def _get_upstream_aspect(

def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand All @@ -485,7 +488,7 @@ def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:

def _get_usage_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand Down
Loading