Skip to content
88 changes: 81 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,24 @@ class StructuredLogLevel(Enum):
ERROR = logging.ERROR


class StructuredLogCategory(Enum):
    """
    Category tag for a structured log entry, identifying the ingestion
    area that the logged event most affects.

    This is used to categorize log entries mainly by their biggest impact
    area, so that self-serve users can understand the impact of any log
    entry. More members are expected to be added as existing log call
    sites are updated to be self-serve friendly.
    """

    LINEAGE = "LINEAGE"
    USAGE = "USAGE"
    PROFILING = "PROFILING"


@dataclass
class StructuredLogEntry(Report):
    """A single user-facing entry in the structured ingestion log report."""

    # Optional short heading presented for this entry (e.g. in a UI).
    title: Optional[str]
    # Human-readable message describing the event.
    message: str
    # Context strings associated with the entry; LossyList presumably bounds
    # how many are retained — confirm against LossyList's implementation.
    context: LossyList[str]
    # High-level impact area of the entry; None when uncategorized.
    log_category: Optional[StructuredLogCategory] = None


@dataclass
Expand All @@ -108,16 +121,20 @@ def report_log(
exc: Optional[BaseException] = None,
log: bool = False,
stacklevel: int = 1,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
Report a user-facing warning for the ingestion run.
Report a user-facing log for the ingestion run.

Args:
level: The level of the log entry.
message: The main message associated with the report entry. This should be a human-readable message.
title: The category / heading to present on for this message in the UI.
context: Additional context (e.g. where, how) for the log entry.
exc: The exception associated with the event. We'll show the stack trace when in debug mode.
log_category: The type of the log entry. This is used to categorise the log entry.
log: Whether to log the entry to the console.
stacklevel: The stack level to use for the log entry.
"""

# One for this method, and one for the containing report_* call.
Expand Down Expand Up @@ -160,6 +177,7 @@ def report_log(
title=title,
message=message,
context=context_list,
log_category=log_category,
)
else:
if context is not None:
Expand Down Expand Up @@ -219,9 +237,19 @@ def report_warning(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.WARN, message, title, context, exc, log=False
StructuredLogLevel.WARN,
message,
title,
context,
exc,
log=False,
log_category=log_category,
)

def warning(
Expand All @@ -231,9 +259,19 @@ def warning(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.WARN, message, title, context, exc, log=log
StructuredLogLevel.WARN,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

def report_failure(
Expand All @@ -243,9 +281,19 @@ def report_failure(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=log
StructuredLogLevel.ERROR,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

def failure(
Expand All @@ -255,9 +303,19 @@ def failure(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=log
StructuredLogLevel.ERROR,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

def info(
Expand All @@ -267,9 +325,19 @@ def info(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_category: Optional[StructuredLogCategory] = None,
) -> None:
"""
See docs of StructuredLogs.report_log for details of args
"""
self._structured_logs.report_log(
StructuredLogLevel.INFO, message, title, context, exc, log=log
StructuredLogLevel.INFO,
message,
title,
context,
exc,
log=log,
log_category=log_category,
)

@contextlib.contextmanager
Expand All @@ -279,6 +347,7 @@ def report_exc(
title: Optional[LiteralString] = None,
context: Optional[str] = None,
level: StructuredLogLevel = StructuredLogLevel.ERROR,
log_category: Optional[StructuredLogCategory] = None,
) -> Iterator[None]:
# Convenience method that helps avoid boilerplate try/except blocks.
# TODO: I'm not super happy with the naming here - it's not obvious that this
Expand All @@ -287,7 +356,12 @@ def report_exc(
yield
except Exception as exc:
self._structured_logs.report_log(
level, message=message, title=title, context=context, exc=exc
level,
message=message,
title=title,
context=context,
exc=exc,
log_category=log_category,
)

def __post_init__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,8 @@
"datahub-mock-data": {
"capabilities": [],
"classname": "datahub.ingestion.source.mock_data.datahub_mock_data.DataHubMockDataSource",
"platform_id": "datahubmockdata",
"platform_name": "DataHubMockData",
"platform_id": "fake",
"platform_name": "fake",
"support_status": "TESTING"
},
"dbt": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.source import (
MetadataWorkUnitProcessor,
SourceReport,
StructuredLogCategory,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.fivetran.config import (
KNOWN_DATA_PLATFORM_MAPPING,
Expand Down Expand Up @@ -96,8 +100,10 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
self.report.info(
title="Guessing source platform for lineage",
message="We encountered a connector type that we don't fully support yet. "
"We will attempt to guess the platform based on the connector type.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
"We will attempt to guess the platform based on the connector type. "
"Note that we use connector_id as the key not connector_name which you may see in the UI of Fivetran. ",
context=f"connector_name: {connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
log_category=StructuredLogCategory.LINEAGE,
)
source_details.platform = connector.connector_type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source, SourceReport, StructuredLogCategory
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.mock_data.datahub_mock_data_report import (
Expand All @@ -35,6 +35,8 @@

logger = logging.getLogger(__name__)

PLATFORM_NAME = "fake"


class SubTypePattern(StrEnum):
ALTERNATING = "alternating"
Expand Down Expand Up @@ -144,7 +146,7 @@ class DataHubMockDataConfig(ConfigModel):
)


@platform_name("DataHubMockData")
@platform_name(PLATFORM_NAME)
@config_class(DataHubMockDataConfig)
@support_status(SupportStatus.TESTING)
class DataHubMockDataSource(Source):
Expand Down Expand Up @@ -176,6 +178,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
message="This is test warning",
title="Test Warning",
context=f"This is test warning {i}",
log_category=StructuredLogCategory.LINEAGE,
)

# We don't want any implicit aspects to be produced
Expand Down Expand Up @@ -309,7 +312,7 @@ def _get_subtypes_aspect(
table_level, table_index, subtype_pattern, subtype_types, level_subtypes
)

urn = make_dataset_urn(platform="fake", name=table_name)
urn = make_dataset_urn(platform=PLATFORM_NAME, name=table_name)
mcp = MetadataChangeProposalWrapper(
entityUrn=urn,
entityType="dataset",
Expand Down Expand Up @@ -433,7 +436,7 @@ def _generate_downstream_lineage(

def _get_status_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand All @@ -448,15 +451,15 @@ def _get_upstream_aspect(
) -> MetadataWorkUnit:
mcp = MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=downstream_table,
),
entityType="dataset",
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=upstream_table,
),
type=DatasetLineageTypeClass.TRANSFORMED,
Expand All @@ -468,7 +471,7 @@ def _get_upstream_aspect(

def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand All @@ -485,7 +488,7 @@ def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:

def _get_usage_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand Down
Loading