diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py
index 9ddda6d58c627..bf1d29f7a5945 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -81,11 +81,24 @@ class StructuredLogLevel(Enum):
     ERROR = logging.ERROR
 
 
+class StructuredLogCategory(Enum):
+    """
+    Categorises log entries mainly by the area of ingestion they impact most.
+    This is meant to help self-serve users understand the impact of any log entry.
+    More values will be added as more logs are updated to be self-serve friendly.
+    """
+
+    LINEAGE = "LINEAGE"
+    USAGE = "USAGE"
+    PROFILING = "PROFILING"
+
+
 @dataclass
 class StructuredLogEntry(Report):
     title: Optional[str]
     message: str
     context: LossyList[str]
+    log_category: Optional[StructuredLogCategory] = None
 
 
 @dataclass
@@ -108,9 +121,10 @@ def report_log(
         exc: Optional[BaseException] = None,
         log: bool = False,
         stacklevel: int = 1,
+        log_category: Optional[StructuredLogCategory] = None,
     ) -> None:
         """
-        Report a user-facing warning for the ingestion run.
+        Report a user-facing log for the ingestion run.
 
         Args:
             level: The level of the log entry.
@@ -118,6 +132,9 @@ def report_log(
             title: The category / heading to present on for this message in the UI.
             context: Additional context (e.g. where, how) for the log entry.
             exc: The exception associated with the event. We'll show the stack trace when in debug mode.
+            log_category: The category of the log entry, used to group entries by their impact area.
+            log: Whether to also log the entry to the console.
+            stacklevel: The stack level to use for the log entry.
         """
 
         # One for this method, and one for the containing report_* call.
@@ -160,6 +177,7 @@ def report_log(
                 title=title,
                 message=message,
                 context=context_list,
+                log_category=log_category,
             )
         else:
             if context is not None:
@@ -219,9 +237,19 @@ def report_warning(
         context: Optional[str] = None,
         title: Optional[LiteralString] = None,
         exc: Optional[BaseException] = None,
+        log_category: Optional[StructuredLogCategory] = None,
     ) -> None:
+        """
+        See docs of StructuredLogs.report_log for details of the args.
+        """
         self._structured_logs.report_log(
-            StructuredLogLevel.WARN, message, title, context, exc, log=False
+            StructuredLogLevel.WARN,
+            message,
+            title,
+            context,
+            exc,
+            log=False,
+            log_category=log_category,
         )
 
     def warning(
@@ -231,9 +259,19 @@ def warning(
         title: Optional[LiteralString] = None,
         exc: Optional[BaseException] = None,
         log: bool = True,
+        log_category: Optional[StructuredLogCategory] = None,
     ) -> None:
+        """
+        See docs of StructuredLogs.report_log for details of the args.
+        """
         self._structured_logs.report_log(
-            StructuredLogLevel.WARN, message, title, context, exc, log=log
+            StructuredLogLevel.WARN,
+            message,
+            title,
+            context,
+            exc,
+            log=log,
+            log_category=log_category,
         )
 
     def report_failure(
@@ -243,9 +281,19 @@ def report_failure(
         title: Optional[LiteralString] = None,
         exc: Optional[BaseException] = None,
         log: bool = True,
+        log_category: Optional[StructuredLogCategory] = None,
     ) -> None:
+        """
+        See docs of StructuredLogs.report_log for details of the args.
+        """
         self._structured_logs.report_log(
-            StructuredLogLevel.ERROR, message, title, context, exc, log=log
+            StructuredLogLevel.ERROR,
+            message,
+            title,
+            context,
+            exc,
+            log=log,
+            log_category=log_category,
         )
 
     def failure(
@@ -255,9 +303,19 @@ def failure(
         title: Optional[LiteralString] = None,
         exc: Optional[BaseException] = None,
         log: bool = True,
+        log_category: Optional[StructuredLogCategory] = None,
     ) -> None:
+        """
+        See docs of StructuredLogs.report_log for details of the args.
+        """
         self._structured_logs.report_log(
-            StructuredLogLevel.ERROR, message, title, context, exc, log=log
+            StructuredLogLevel.ERROR,
+            message,
+            title,
+            context,
+            exc,
+            log=log,
+            log_category=log_category,
         )
 
     def info(
@@ -267,9 +325,19 @@ def info(
         title: Optional[LiteralString] = None,
         exc: Optional[BaseException] = None,
         log: bool = True,
+        log_category: Optional[StructuredLogCategory] = None,
     ) -> None:
+        """
+        See docs of StructuredLogs.report_log for details of the args.
+        """
         self._structured_logs.report_log(
-            StructuredLogLevel.INFO, message, title, context, exc, log=log
+            StructuredLogLevel.INFO,
+            message,
+            title,
+            context,
+            exc,
+            log=log,
+            log_category=log_category,
         )
 
     @contextlib.contextmanager
@@ -279,6 +347,7 @@ def report_exc(
         title: Optional[LiteralString] = None,
         context: Optional[str] = None,
         level: StructuredLogLevel = StructuredLogLevel.ERROR,
+        log_category: Optional[StructuredLogCategory] = None,
     ) -> Iterator[None]:
         # Convenience method that helps avoid boilerplate try/except blocks.
         # TODO: I'm not super happy with the naming here - it's not obvious that this
@@ -287,7 +356,12 @@ def report_exc(
             yield
         except Exception as exc:
             self._structured_logs.report_log(
-                level, message=message, title=title, context=context, exc=exc
+                level,
+                message=message,
+                title=title,
+                context=context,
+                exc=exc,
+                log_category=log_category,
             )
 
     def __post_init__(self) -> None:
diff --git a/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json b/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json
index c38a3fdce7cf4..1fd14a3af6e7b 100644
--- a/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json
+++ b/metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json
@@ -534,8 +534,8 @@
   "datahub-mock-data": {
     "capabilities": [],
     "classname": "datahub.ingestion.source.mock_data.datahub_mock_data.DataHubMockDataSource",
-    "platform_id": "datahubmockdata",
-    "platform_name": "DataHubMockData",
+    "platform_id": "fake",
+    "platform_name": "fake",
     "support_status": "TESTING"
   },
   "dbt": {
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
index fe98c26e335fd..eb4dee3201efc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -16,7 +16,11 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import (
+    MetadataWorkUnitProcessor,
+    SourceReport,
+    StructuredLogCategory,
+)
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.fivetran.config import (
     KNOWN_DATA_PLATFORM_MAPPING,
@@ -96,8 +100,10 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
             self.report.info(
                 title="Guessing source platform for lineage",
                 message="We encountered a connector type that we don't fully support yet. "
-                "We will attempt to guess the platform based on the connector type.",
-                context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
+                "We will attempt to guess the platform based on the connector type. "
+                "Note that we use connector_id as the key, not the connector_name that you may see in the Fivetran UI.",
", + context=f"connector_name: {connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})", + log_category=StructuredLogCategory.LINEAGE, ) source_details.platform = connector.connector_type diff --git a/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py b/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py index 6522f8222acdf..9077536a7172f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py @@ -13,7 +13,7 @@ platform_name, support_status, ) -from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.source import Source, SourceReport, StructuredLogCategory from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.common.subtypes import DatasetSubTypes from datahub.ingestion.source.mock_data.datahub_mock_data_report import ( @@ -35,6 +35,8 @@ logger = logging.getLogger(__name__) +PLATFORM_NAME = "fake" + class SubTypePattern(StrEnum): ALTERNATING = "alternating" @@ -144,7 +146,7 @@ class DataHubMockDataConfig(ConfigModel): ) -@platform_name("DataHubMockData") +@platform_name(PLATFORM_NAME) @config_class(DataHubMockDataConfig) @support_status(SupportStatus.TESTING) class DataHubMockDataSource(Source): @@ -176,6 +178,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: message="This is test warning", title="Test Warning", context=f"This is test warning {i}", + log_category=StructuredLogCategory.LINEAGE, ) # We don't want any implicit aspects to be produced @@ -309,7 +312,7 @@ def _get_subtypes_aspect( table_level, table_index, subtype_pattern, subtype_types, level_subtypes ) - urn = make_dataset_urn(platform="fake", name=table_name) + urn = make_dataset_urn(platform=PLATFORM_NAME, name=table_name) mcp = MetadataChangeProposalWrapper( entityUrn=urn, entityType="dataset", @@ -433,7 +436,7 @@ def _generate_downstream_lineage( def _get_status_aspect(self, table: str) -> MetadataWorkUnit: urn = make_dataset_urn( - platform="fake", + platform=PLATFORM_NAME, name=table, ) mcp = MetadataChangeProposalWrapper( @@ -448,7 +451,7 @@ def _get_upstream_aspect( ) -> MetadataWorkUnit: mcp = MetadataChangeProposalWrapper( entityUrn=make_dataset_urn( - platform="fake", + platform=PLATFORM_NAME, name=downstream_table, ), entityType="dataset", @@ -456,7 +459,7 @@ def _get_upstream_aspect( upstreams=[ UpstreamClass( dataset=make_dataset_urn( - platform="fake", + platform=PLATFORM_NAME, name=upstream_table, ), type=DatasetLineageTypeClass.TRANSFORMED, @@ -468,7 +471,7 @@ def _get_upstream_aspect( def _get_profile_aspect(self, table: str) -> MetadataWorkUnit: urn = make_dataset_urn( - platform="fake", + platform=PLATFORM_NAME, name=table, ) mcp = MetadataChangeProposalWrapper( @@ -485,7 +488,7 @@ def _get_profile_aspect(self, table: str) -> MetadataWorkUnit: def _get_usage_aspect(self, table: str) -> MetadataWorkUnit: urn = make_dataset_urn( - platform="fake", + platform=PLATFORM_NAME, name=table, ) mcp = MetadataChangeProposalWrapper(