Skip to content

feat(ingest): add structured log type #14229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,16 @@ class StructuredLogLevel(Enum):
ERROR = logging.ERROR


class StructuredLogType(Enum):
LINEAGE = "LINEAGE"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make this clearer, can we use LINEAGE, USAGE, and PROFILING as the values?



@dataclass
class StructuredLogEntry(Report):
title: Optional[str]
message: str
context: LossyList[str]
log_type: Optional[StructuredLogType] = None


@dataclass
Expand All @@ -108,6 +113,7 @@ def report_log(
exc: Optional[BaseException] = None,
log: bool = False,
stacklevel: int = 1,
log_type: Optional[StructuredLogType] = None,
) -> None:
"""
Report a user-facing warning for the ingestion run.
Expand Down Expand Up @@ -160,6 +166,7 @@ def report_log(
title=title,
message=message,
context=context_list,
log_type=log_type,
)
else:
if context is not None:
Expand Down Expand Up @@ -219,9 +226,16 @@ def report_warning(
context: Optional[str] = None,
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log_type: Optional[StructuredLogType] = None,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.WARN, message, title, context, exc, log=False
StructuredLogLevel.WARN,
message,
title,
context,
exc,
log=False,
log_type=log_type,
)

def warning(
Expand All @@ -231,9 +245,16 @@ def warning(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_type: Optional[StructuredLogType] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rename this to log_category, since "type" would make me think it refers to the level (warn/error/etc.).

) -> None:
self._structured_logs.report_log(
StructuredLogLevel.WARN, message, title, context, exc, log=log
StructuredLogLevel.WARN,
message,
title,
context,
exc,
log=log,
log_type=log_type,
)

def report_failure(
Expand All @@ -243,9 +264,16 @@ def report_failure(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_type: Optional[StructuredLogType] = None,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.ERROR, message, title, context, exc, log=log
StructuredLogLevel.ERROR,
message,
title,
context,
exc,
log=log,
log_type=log_type,
)

def failure(
Expand All @@ -267,9 +295,16 @@ def info(
title: Optional[LiteralString] = None,
exc: Optional[BaseException] = None,
log: bool = True,
log_type: Optional[StructuredLogType] = None,
) -> None:
self._structured_logs.report_log(
StructuredLogLevel.INFO, message, title, context, exc, log=log
StructuredLogLevel.INFO,
message,
title,
context,
exc,
log=log,
log_type=log_type,
)

@contextlib.contextmanager
Expand All @@ -279,6 +314,7 @@ def report_exc(
title: Optional[LiteralString] = None,
context: Optional[str] = None,
level: StructuredLogLevel = StructuredLogLevel.ERROR,
log_type: Optional[StructuredLogType] = None,
) -> Iterator[None]:
# Convenience method that helps avoid boilerplate try/except blocks.
# TODO: I'm not super happy with the naming here - it's not obvious that this
Expand All @@ -287,7 +323,12 @@ def report_exc(
yield
except Exception as exc:
self._structured_logs.report_log(
level, message=message, title=title, context=context, exc=exc
level,
message=message,
title=title,
context=context,
exc=exc,
log_type=log_type,
)

def __post_init__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
from datahub.ingestion.api.source import (
MetadataWorkUnitProcessor,
SourceReport,
StructuredLogType,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.fivetran.config import (
KNOWN_DATA_PLATFORM_MAPPING,
Expand Down Expand Up @@ -96,8 +100,10 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
self.report.info(
title="Guessing source platform for lineage",
message="We encountered a connector type that we don't fully support yet. "
"We will attempt to guess the platform based on the connector type.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
"We will attempt to guess the platform based on the connector type. "
"Note that we use connector_id as the key not connector_name which you may see in the UI of Fivetran. ",
context=f"connector_name: {connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
log_type=StructuredLogType.LINEAGE,
)
source_details.platform = connector.connector_type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source, SourceReport, StructuredLogType
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.mock_data.datahub_mock_data_report import (
Expand All @@ -35,6 +35,8 @@

logger = logging.getLogger(__name__)

PLATFORM_NAME = "fake"


class SubTypePattern(StrEnum):
ALTERNATING = "alternating"
Expand Down Expand Up @@ -144,7 +146,7 @@ class DataHubMockDataConfig(ConfigModel):
)


@platform_name("DataHubMockData")
@platform_name(PLATFORM_NAME)
@config_class(DataHubMockDataConfig)
@support_status(SupportStatus.TESTING)
class DataHubMockDataSource(Source):
Expand Down Expand Up @@ -176,6 +178,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
message="This is test warning",
title="Test Warning",
context=f"This is test warning {i}",
log_type=StructuredLogType.LINEAGE,
)

# We don't want any implicit aspects to be produced
Expand Down Expand Up @@ -309,7 +312,7 @@ def _get_subtypes_aspect(
table_level, table_index, subtype_pattern, subtype_types, level_subtypes
)

urn = make_dataset_urn(platform="fake", name=table_name)
urn = make_dataset_urn(platform=PLATFORM_NAME, name=table_name)
mcp = MetadataChangeProposalWrapper(
entityUrn=urn,
entityType="dataset",
Expand Down Expand Up @@ -433,7 +436,7 @@ def _generate_downstream_lineage(

def _get_status_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand All @@ -448,15 +451,15 @@ def _get_upstream_aspect(
) -> MetadataWorkUnit:
mcp = MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=downstream_table,
),
entityType="dataset",
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=upstream_table,
),
type=DatasetLineageTypeClass.TRANSFORMED,
Expand All @@ -468,7 +471,7 @@ def _get_upstream_aspect(

def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand All @@ -485,7 +488,7 @@ def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:

def _get_usage_aspect(self, table: str) -> MetadataWorkUnit:
urn = make_dataset_urn(
platform="fake",
platform=PLATFORM_NAME,
name=table,
)
mcp = MetadataChangeProposalWrapper(
Expand Down
Loading