Skip to content

Commit 0f44d60

Browse files
authored
feat(ingest): add structured log category (#14229)
1 parent aac5982 commit 0f44d60

File tree

4 files changed

+103
-20
lines changed

4 files changed

+103
-20
lines changed

metadata-ingestion/src/datahub/ingestion/api/source.py

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,24 @@ class StructuredLogLevel(Enum):
8181
ERROR = logging.ERROR
8282

8383

84+
class StructuredLogCategory(Enum):
    """
    Categorises structured log entries by the ingestion area they most affect.

    The category is surfaced so users can self-serve an assessment of a log
    entry's impact (e.g. whether lineage, usage, or profiling data is
    affected). More members will be added as log call sites are updated to
    supply a category.
    """

    LINEAGE = "LINEAGE"
    USAGE = "USAGE"
    PROFILING = "PROFILING"
94+
95+
8496
@dataclass
class StructuredLogEntry(Report):
    """A single user-facing log entry collected during an ingestion run."""

    # Optional heading presented for this entry in the UI.
    title: Optional[str]
    # Human-readable message describing the event.
    message: str
    # Additional context strings (e.g. where, how) for the entry.
    context: LossyList[str]
    # Impact-area category of the entry; None when the call site has not
    # yet been updated to supply one.
    log_category: Optional[StructuredLogCategory] = None
89102

90103

91104
@dataclass
@@ -108,16 +121,20 @@ def report_log(
108121
exc: Optional[BaseException] = None,
109122
log: bool = False,
110123
stacklevel: int = 1,
124+
log_category: Optional[StructuredLogCategory] = None,
111125
) -> None:
112126
"""
113-
Report a user-facing warning for the ingestion run.
127+
Report a user-facing log for the ingestion run.
114128
115129
Args:
116130
level: The level of the log entry.
117131
message: The main message associated with the report entry. This should be a human-readable message.
118132
title: The category / heading to present on for this message in the UI.
119133
context: Additional context (e.g. where, how) for the log entry.
120134
exc: The exception associated with the event. We'll show the stack trace when in debug mode.
135+
log_category: The impact-area category of the log entry (e.g. lineage, usage, profiling), used to group entries for self-serve analysis.
136+
log: Whether to log the entry to the console.
137+
stacklevel: The stack level to use for the log entry.
121138
"""
122139

123140
# One for this method, and one for the containing report_* call.
@@ -160,6 +177,7 @@ def report_log(
160177
title=title,
161178
message=message,
162179
context=context_list,
180+
log_category=log_category,
163181
)
164182
else:
165183
if context is not None:
@@ -219,9 +237,19 @@ def report_warning(
219237
context: Optional[str] = None,
220238
title: Optional[LiteralString] = None,
221239
exc: Optional[BaseException] = None,
240+
log_category: Optional[StructuredLogCategory] = None,
222241
) -> None:
242+
"""
243+
See docs of StructuredLogs.report_log for details of args
244+
"""
223245
self._structured_logs.report_log(
224-
StructuredLogLevel.WARN, message, title, context, exc, log=False
246+
StructuredLogLevel.WARN,
247+
message,
248+
title,
249+
context,
250+
exc,
251+
log=False,
252+
log_category=log_category,
225253
)
226254

227255
def warning(
@@ -231,9 +259,19 @@ def warning(
231259
title: Optional[LiteralString] = None,
232260
exc: Optional[BaseException] = None,
233261
log: bool = True,
262+
log_category: Optional[StructuredLogCategory] = None,
234263
) -> None:
264+
"""
265+
See docs of StructuredLogs.report_log for details of args
266+
"""
235267
self._structured_logs.report_log(
236-
StructuredLogLevel.WARN, message, title, context, exc, log=log
268+
StructuredLogLevel.WARN,
269+
message,
270+
title,
271+
context,
272+
exc,
273+
log=log,
274+
log_category=log_category,
237275
)
238276

239277
def report_failure(
@@ -243,9 +281,19 @@ def report_failure(
243281
title: Optional[LiteralString] = None,
244282
exc: Optional[BaseException] = None,
245283
log: bool = True,
284+
log_category: Optional[StructuredLogCategory] = None,
246285
) -> None:
286+
"""
287+
See docs of StructuredLogs.report_log for details of args
288+
"""
247289
self._structured_logs.report_log(
248-
StructuredLogLevel.ERROR, message, title, context, exc, log=log
290+
StructuredLogLevel.ERROR,
291+
message,
292+
title,
293+
context,
294+
exc,
295+
log=log,
296+
log_category=log_category,
249297
)
250298

251299
def failure(
@@ -255,9 +303,19 @@ def failure(
255303
title: Optional[LiteralString] = None,
256304
exc: Optional[BaseException] = None,
257305
log: bool = True,
306+
log_category: Optional[StructuredLogCategory] = None,
258307
) -> None:
308+
"""
309+
See docs of StructuredLogs.report_log for details of args
310+
"""
259311
self._structured_logs.report_log(
260-
StructuredLogLevel.ERROR, message, title, context, exc, log=log
312+
StructuredLogLevel.ERROR,
313+
message,
314+
title,
315+
context,
316+
exc,
317+
log=log,
318+
log_category=log_category,
261319
)
262320

263321
def info(
@@ -267,9 +325,19 @@ def info(
267325
title: Optional[LiteralString] = None,
268326
exc: Optional[BaseException] = None,
269327
log: bool = True,
328+
log_category: Optional[StructuredLogCategory] = None,
270329
) -> None:
330+
"""
331+
See docs of StructuredLogs.report_log for details of args
332+
"""
271333
self._structured_logs.report_log(
272-
StructuredLogLevel.INFO, message, title, context, exc, log=log
334+
StructuredLogLevel.INFO,
335+
message,
336+
title,
337+
context,
338+
exc,
339+
log=log,
340+
log_category=log_category,
273341
)
274342

275343
@contextlib.contextmanager
@@ -279,6 +347,7 @@ def report_exc(
279347
title: Optional[LiteralString] = None,
280348
context: Optional[str] = None,
281349
level: StructuredLogLevel = StructuredLogLevel.ERROR,
350+
log_category: Optional[StructuredLogCategory] = None,
282351
) -> Iterator[None]:
283352
# Convenience method that helps avoid boilerplate try/except blocks.
284353
# TODO: I'm not super happy with the naming here - it's not obvious that this
@@ -287,7 +356,12 @@ def report_exc(
287356
yield
288357
except Exception as exc:
289358
self._structured_logs.report_log(
290-
level, message=message, title=title, context=context, exc=exc
359+
level,
360+
message=message,
361+
title=title,
362+
context=context,
363+
exc=exc,
364+
log_category=log_category,
291365
)
292366

293367
def __post_init__(self) -> None:

metadata-ingestion/src/datahub/ingestion/autogenerated/capability_summary.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -534,8 +534,8 @@
534534
"datahub-mock-data": {
535535
"capabilities": [],
536536
"classname": "datahub.ingestion.source.mock_data.datahub_mock_data.DataHubMockDataSource",
537-
"platform_id": "datahubmockdata",
538-
"platform_name": "DataHubMockData",
537+
"platform_id": "fake",
538+
"platform_name": "fake",
539539
"support_status": "TESTING"
540540
},
541541
"dbt": {

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616
platform_name,
1717
support_status,
1818
)
19-
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
19+
from datahub.ingestion.api.source import (
20+
MetadataWorkUnitProcessor,
21+
SourceReport,
22+
StructuredLogCategory,
23+
)
2024
from datahub.ingestion.api.workunit import MetadataWorkUnit
2125
from datahub.ingestion.source.fivetran.config import (
2226
KNOWN_DATA_PLATFORM_MAPPING,
@@ -96,8 +100,10 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
96100
self.report.info(
97101
title="Guessing source platform for lineage",
98102
message="We encountered a connector type that we don't fully support yet. "
99-
"We will attempt to guess the platform based on the connector type.",
100-
context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
103+
"We will attempt to guess the platform based on the connector type. "
104+
"Note that we use connector_id as the key, not the connector_name that you may see in the Fivetran UI. ",
105+
context=f"connector_name: {connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
106+
log_category=StructuredLogCategory.LINEAGE,
101107
)
102108
source_details.platform = connector.connector_type
103109

metadata-ingestion/src/datahub/ingestion/source/mock_data/datahub_mock_data.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
platform_name,
1414
support_status,
1515
)
16-
from datahub.ingestion.api.source import Source, SourceReport
16+
from datahub.ingestion.api.source import Source, SourceReport, StructuredLogCategory
1717
from datahub.ingestion.api.workunit import MetadataWorkUnit
1818
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
1919
from datahub.ingestion.source.mock_data.datahub_mock_data_report import (
@@ -35,6 +35,8 @@
3535

3636
logger = logging.getLogger(__name__)
3737

38+
PLATFORM_NAME = "fake"
39+
3840

3941
class SubTypePattern(StrEnum):
4042
ALTERNATING = "alternating"
@@ -144,7 +146,7 @@ class DataHubMockDataConfig(ConfigModel):
144146
)
145147

146148

147-
@platform_name("DataHubMockData")
149+
@platform_name(PLATFORM_NAME)
148150
@config_class(DataHubMockDataConfig)
149151
@support_status(SupportStatus.TESTING)
150152
class DataHubMockDataSource(Source):
@@ -176,6 +178,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
176178
message="This is test warning",
177179
title="Test Warning",
178180
context=f"This is test warning {i}",
181+
log_category=StructuredLogCategory.LINEAGE,
179182
)
180183

181184
# We don't want any implicit aspects to be produced
@@ -309,7 +312,7 @@ def _get_subtypes_aspect(
309312
table_level, table_index, subtype_pattern, subtype_types, level_subtypes
310313
)
311314

312-
urn = make_dataset_urn(platform="fake", name=table_name)
315+
urn = make_dataset_urn(platform=PLATFORM_NAME, name=table_name)
313316
mcp = MetadataChangeProposalWrapper(
314317
entityUrn=urn,
315318
entityType="dataset",
@@ -433,7 +436,7 @@ def _generate_downstream_lineage(
433436

434437
def _get_status_aspect(self, table: str) -> MetadataWorkUnit:
435438
urn = make_dataset_urn(
436-
platform="fake",
439+
platform=PLATFORM_NAME,
437440
name=table,
438441
)
439442
mcp = MetadataChangeProposalWrapper(
@@ -448,15 +451,15 @@ def _get_upstream_aspect(
448451
) -> MetadataWorkUnit:
449452
mcp = MetadataChangeProposalWrapper(
450453
entityUrn=make_dataset_urn(
451-
platform="fake",
454+
platform=PLATFORM_NAME,
452455
name=downstream_table,
453456
),
454457
entityType="dataset",
455458
aspect=UpstreamLineageClass(
456459
upstreams=[
457460
UpstreamClass(
458461
dataset=make_dataset_urn(
459-
platform="fake",
462+
platform=PLATFORM_NAME,
460463
name=upstream_table,
461464
),
462465
type=DatasetLineageTypeClass.TRANSFORMED,
@@ -468,7 +471,7 @@ def _get_upstream_aspect(
468471

469472
def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:
470473
urn = make_dataset_urn(
471-
platform="fake",
474+
platform=PLATFORM_NAME,
472475
name=table,
473476
)
474477
mcp = MetadataChangeProposalWrapper(
@@ -485,7 +488,7 @@ def _get_profile_aspect(self, table: str) -> MetadataWorkUnit:
485488

486489
def _get_usage_aspect(self, table: str) -> MetadataWorkUnit:
487490
urn = make_dataset_urn(
488-
platform="fake",
491+
platform=PLATFORM_NAME,
489492
name=table,
490493
)
491494
mcp = MetadataChangeProposalWrapper(

0 commit comments

Comments
 (0)