Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,34 @@ public final Stream<AspectValidationException> validateProposed(
@Nonnull Collection<? extends BatchItem> mcpItems,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach is not secure because the validateProposed method is executed outside of a transaction and the aspect lookup is subject to caching. The validation of a non-existent domain can only be done safely within a database transaction; otherwise, it is subject to race conditions that could lead to privilege escalation.

@Nonnull RetrieverContext retrieverContext,
@Nullable AuthorizationSession session) {
return validateProposedAspects(
// Keep original batch for cross-aspect lookups (e.g., domain lookups in
// getEntityDomainsFromBatchOrDB)
Collection<? extends BatchItem> originalBatch = mcpItems;

// Filter to only items this validator should actually process
// Domains are NOT included here - they're only used for lookups via originalBatch
Collection<? extends BatchItem> filteredBatch =
mcpItems.stream()
.filter(i -> shouldApply(i.getChangeType(), i.getUrn(), i.getAspectName()))
.collect(Collectors.toList()),
retrieverContext,
session);
.collect(Collectors.toList());

return validateProposedAspectsWithOriginalBatch(
filteredBatch, originalBatch, retrieverContext, session);
}

/**
 * Validation hook that receives both the filtered batch and the untouched original batch.
 *
 * <p>{@code filteredBatch} holds only the aspects this validator should process, while {@code
 * originalBatch} holds ALL aspects so that overriding validators can perform cross-aspect lookups
 * (e.g., domain lookups).
 *
 * <p>This default implementation does not use {@code originalBatch}; it forwards the filtered
 * batch to {@code validateProposedAspects} for backward compatibility.
 *
 * @param filteredBatch items this validator should process
 * @param originalBatch every item of the batch, available for lookups
 * @param retrieverContext retriever context passed through to the delegate
 * @param session authorization session passed through to the delegate, may be null
 * @return the validation exceptions produced by {@code validateProposedAspects}
 */
protected Stream<AspectValidationException> validateProposedAspectsWithOriginalBatch(
    @Nonnull Collection<? extends BatchItem> filteredBatch,
    @Nonnull Collection<? extends BatchItem> originalBatch,
    @Nonnull RetrieverContext retrieverContext,
    @Nullable AuthorizationSession session) {
  final Stream<AspectValidationException> delegated =
      validateProposedAspects(filteredBatch, retrieverContext, session);
  return delegated;
}

/**
Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,12 @@ def _create_iceberg_table_aspects(
yield self._create_browse_paths_aspect(dpi.instance, str(namespace_urn))
yield ContainerClass(container=str(namespace_urn))

# Support for default_domain config option
if self.config.default_domain:
from datahub.metadata.schema_classes import DomainsClass

yield DomainsClass(domains=[self.config.default_domain])

self.report.report_table_processing_time(
timer.elapsed_seconds(), dataset_name, table.metadata_location
)
Expand Down Expand Up @@ -687,6 +693,12 @@ def _create_iceberg_namespace_aspects(
yield dpi
yield self._create_browse_paths_aspect(dpi.instance)

# Support for default_domain config option for containers
if self.config.default_domain:
from datahub.metadata.schema_classes import DomainsClass

yield DomainsClass(domains=[self.config.default_domain])


class ToAvroSchemaIcebergVisitor(SchemaVisitorPerPrimitiveType[Dict[str, Any]]):
"""Implementation of a visitor to build an Avro schema as a dictionary from an Iceberg schema."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin)
processing_threads: int = Field(
default=1, description="How many threads will be processing tables"
)
default_domain: Optional[str] = Field(
default=None,
description="Optional domain URN to associate with all ingested entities (tables, namespaces). "
"If specified, enables domain-scoped permission checks on the backend. "
"Example: 'urn:li:domain:engineering'",
)

@field_validator("catalog", mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.metadata.aspect.validation;

import static com.linkedin.metadata.Constants.APP_SOURCE;
import static com.linkedin.metadata.Constants.DOMAINS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.EDITABLE_SCHEMA_METADATA_ASPECT_NAME;
import static com.linkedin.metadata.Constants.GLOBAL_TAGS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.SCHEMA_METADATA_ASPECT_NAME;
Expand All @@ -15,6 +16,7 @@
import com.linkedin.common.TagAssociationArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.GetMode;
import com.linkedin.domain.Domains;
import com.linkedin.entity.Aspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.AspectRetriever;
Expand All @@ -35,6 +37,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -54,22 +57,26 @@
@Accessors(chain = true)
public class PrivilegeConstraintsValidator extends AspectPayloadValidator {
@Nonnull private AspectPluginConfig config;
private boolean domainBasedAuthorizationEnabled = false;

@Override
protected Stream<AspectValidationException> validateProposedAspectsWithAuth(
@Nonnull Collection<? extends BatchItem> mcpItems,
protected Stream<AspectValidationException> validateProposedAspectsWithOriginalBatch(
@Nonnull Collection<? extends BatchItem> filteredBatch,
@Nonnull Collection<? extends BatchItem> originalBatch,
@Nonnull RetrieverContext retrieverContext,
@Nullable AuthorizationSession session) {
ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();

if (session == null) {
exceptions.addException(
mcpItems.stream().findFirst().orElseThrow(IllegalStateException::new),
filteredBatch.stream().findFirst().orElseThrow(IllegalStateException::new),
"No authentication details found, cannot authorize change.");
return exceptions.streamAllExceptions();
}

for (BatchItem item : mcpItems) {
// Process only the filtered batch (aspects this validator should handle)
// Use originalBatch for cross-aspect lookups (e.g., finding domains)
for (BatchItem item : filteredBatch) {
if (item.getSystemMetadata() != null
&& item.getSystemMetadata().getProperties() != null
&& UI_SOURCE.equals(item.getSystemMetadata().getProperties().get(APP_SOURCE))) {
Expand All @@ -82,6 +89,7 @@ protected Stream<AspectValidationException> validateProposedAspectsWithAuth(
validateGlobalTags(
session,
item,
originalBatch, // Use originalBatch for domain lookups
aspectRetriever,
aspectRetriever.getLatestAspectObject(item.getUrn(), GLOBAL_TAGS_ASPECT_NAME))
.forEach(exceptions::addException);
Expand All @@ -90,6 +98,7 @@ protected Stream<AspectValidationException> validateProposedAspectsWithAuth(
validateSchemaMetadata(
session,
item,
originalBatch, // Use originalBatch for domain lookups
aspectRetriever,
aspectRetriever.getLatestAspectObject(item.getUrn(), SCHEMA_METADATA_ASPECT_NAME))
.forEach(exceptions::addException);
Expand All @@ -98,6 +107,7 @@ protected Stream<AspectValidationException> validateProposedAspectsWithAuth(
validateEditableSchemaMetadata(
session,
item,
originalBatch, // Use originalBatch for domain lookups
aspectRetriever,
aspectRetriever.getLatestAspectObject(
item.getUrn(), EDITABLE_SCHEMA_METADATA_ASPECT_NAME))
Expand All @@ -114,6 +124,7 @@ protected Stream<AspectValidationException> validateProposedAspectsWithAuth(
private List<AspectValidationException> validateGlobalTags(
AuthorizationSession session,
BatchItem item,
Collection<? extends BatchItem> allBatchItems,
AspectRetriever aspectRetriever,
@Nullable Aspect currentTagsAspect) {
GlobalTags newTags;
Expand All @@ -136,11 +147,22 @@ private List<AspectValidationException> validateGlobalTags(
}
if (newTags != null) {
Set<Urn> tagDifference = extractTagDifference(newTags, currentTags);

// Combine tags + domains (if enabled) as subResources for authorization check
Set<Urn> subResources = new HashSet<>(tagDifference);

// Only collect domain information if domain-based authorization is enabled
if (domainBasedAuthorizationEnabled) {
Set<Urn> domainUrns =
getEntityDomainsFromBatchOrDB(item.getUrn(), allBatchItems, aspectRetriever);
subResources.addAll(domainUrns);
}

if (!AuthUtil.isAPIAuthorizedEntityUrnsWithSubResources(
session,
ApiOperation.fromChangeType(item.getChangeType()),
List.of(item.getUrn()),
tagDifference)) {
subResources)) {
return List.of(
AspectValidationException.forItem(
item, "Unauthorized to modify one or more tag Urns: " + tagDifference));
Expand Down Expand Up @@ -181,6 +203,7 @@ private Set<Urn> extractTagDifference(GlobalTags newTags, @Nullable GlobalTags c
private List<AspectValidationException> validateSchemaMetadata(
AuthorizationSession session,
BatchItem item,
Collection<? extends BatchItem> allBatchItems,
AspectRetriever aspectRetriever,
@Nullable Aspect currentSchemaAspect) {
SchemaMetadata schemaMetadata;
Expand Down Expand Up @@ -222,11 +245,22 @@ private List<AspectValidationException> validateSchemaMetadata(
existingTagsMap.get(schemaField.getFieldPath())))
.flatMap(Set::stream)
.collect(Collectors.toSet());

// Combine tags + domains (if enabled) as subResources for authorization check
Set<Urn> subResources = new HashSet<>(tagDifference);

// Only collect domain information if domain-based authorization is enabled
if (domainBasedAuthorizationEnabled) {
Set<Urn> domainUrns =
getEntityDomainsFromBatchOrDB(item.getUrn(), allBatchItems, aspectRetriever);
subResources.addAll(domainUrns);
}

if (!AuthUtil.isAPIAuthorizedEntityUrnsWithSubResources(
session,
ApiOperation.fromChangeType(item.getChangeType()),
List.of(item.getUrn()),
tagDifference)) {
subResources)) {
return List.of(
AspectValidationException.forItem(
item, "Unauthorized to modify one or more tag Urns: " + tagDifference));
Expand All @@ -238,6 +272,7 @@ private List<AspectValidationException> validateSchemaMetadata(
private List<AspectValidationException> validateEditableSchemaMetadata(
AuthorizationSession session,
BatchItem item,
Collection<? extends BatchItem> allBatchItems,
AspectRetriever aspectRetriever,
@Nullable Aspect currentSchemaAspect) {
EditableSchemaMetadata editableSchemaMetadata;
Expand All @@ -261,9 +296,10 @@ private List<AspectValidationException> validateEditableSchemaMetadata(
} else {
editableSchemaMetadata = item.getAspect(EditableSchemaMetadata.class);
}
if (editableSchemaMetadata != null) {
if (editableSchemaMetadata != null
&& editableSchemaMetadata.getEditableSchemaFieldInfo() != null) {
final Map<String, GlobalTags> existingTagsMap = new HashMap<>();
if (currentSchema != null) {
if (currentSchema != null && currentSchema.getEditableSchemaFieldInfo() != null) {
existingTagsMap.putAll(
currentSchema.getEditableSchemaFieldInfo().stream()
.collect(
Expand All @@ -282,19 +318,90 @@ private List<AspectValidationException> validateEditableSchemaMetadata(
existingTagsMap.get(schemaField.getFieldPath())))
.flatMap(Set::stream)
.collect(Collectors.toSet());

// Combine tags + domains (if enabled) as subResources for authorization check
Set<Urn> subResources = new HashSet<>(tagDifference);

// Only collect domain information if domain-based authorization is enabled
if (domainBasedAuthorizationEnabled) {
Set<Urn> domainUrns =
getEntityDomainsFromBatchOrDB(item.getUrn(), allBatchItems, aspectRetriever);
subResources.addAll(domainUrns);
}

if (!AuthUtil.isAPIAuthorizedEntityUrnsWithSubResources(
session,
ApiOperation.fromChangeType(item.getChangeType()),
List.of(item.getUrn()),
tagDifference)) {
subResources)) {
return List.of(
AspectValidationException.forItem(
item, "Unauthorized to modify one or more tag Urns: " + tagDifference));
item,
"Unauthorized to modify editable schema field tags on entity: " + item.getUrn()));
}
}
return Collections.emptyList();
}

/**
* Get the domain URNs for an entity to include as subResources in authorization checks. This
* allows policies to filter based on both entity domains AND other subResources like tags.
*
* <p>The PolicyEngine is designed to handle heterogeneous subResources and has special logic to
* extract domain information from the entire subResource collection when evaluating DOMAIN field
* criteria.
*/
private Set<Urn> getEntityDomains(Urn entityUrn, AspectRetriever aspectRetriever) {
try {
Aspect domainsAspect = aspectRetriever.getLatestAspectObject(entityUrn, DOMAINS_ASPECT_NAME);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the read that is not protected by a transaction and is subject to caching.

if (domainsAspect != null) {
Domains domains = RecordUtils.toRecordTemplate(Domains.class, domainsAspect.data());
return new HashSet<>(domains.getDomains());
}
} catch (Exception e) {
log.warn("Failed to retrieve domains for entity {}: {}", entityUrn, e.getMessage());
}
return Collections.emptySet();
}

/**
 * Collects the domain URNs for an entity as a UNION of two sources.
 *
 * <p>Union-based authorization means permissions are checked against ALL of the following:
 *
 * <ul>
 *   <li>the domains returned by {@code getEntityDomains} for the entity, and
 *   <li>the domains carried by any item in the batch that targets the same entity URN with the
 *       domains aspect.
 * </ul>
 *
 * <p>A batch item whose domains cannot be extracted is logged at warn level and skipped; it does
 * not abort the collection.
 *
 * @param entityUrn the entity URN to get domains for
 * @param allBatchItems all items in the current batch
 * @param aspectRetriever used to look up the entity's existing domains
 * @return set of domain URNs (union of existing + batch domains)
 */
private Set<Urn> getEntityDomainsFromBatchOrDB(
    Urn entityUrn,
    Collection<? extends BatchItem> allBatchItems,
    AspectRetriever aspectRetriever) {

  // Seed the result with the domains already known for the entity.
  Set<Urn> combined = new HashSet<>(getEntityDomains(entityUrn, aspectRetriever));

  for (BatchItem candidate : allBatchItems) {
    // Only domains-aspect items that target this exact entity contribute.
    if (!entityUrn.equals(candidate.getUrn())
        || !DOMAINS_ASPECT_NAME.equals(candidate.getAspectName())) {
      continue;
    }
    try {
      Domains proposed = candidate.getAspect(Domains.class);
      if (proposed == null || proposed.getDomains() == null) {
        continue;
      }
      combined.addAll(proposed.getDomains());
    } catch (Exception e) {
      log.warn("Failed to extract domains from batch item: {}", e.getMessage());
    }
  }

  return combined;
}

@Override
protected Stream<AspectValidationException> validateProposedAspects(
@Nonnull Collection<? extends BatchItem> changeMCPs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,10 @@ public class DefaultAuthorizerConfiguration {

/** The duration between policies cache refreshes. */
private int cacheRefreshIntervalSecs;

/**
* Whether domain-based authorization is enabled. When enabled, policies can filter by entity
* domains.
*/
private boolean domainBasedAuthorizationEnabled;
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,29 @@ public DataHubAuthorizer getDefaultAuthorizer() {
return (DataHubAuthorizer) defaultAuthorizer;
}

/**
 * Reports whether domain-based authorization is enabled for the given authorizer.
 *
 * <p>For an {@code AuthorizerChain} the flag is read from the chain's default {@code
 * DataHubAuthorizer}; for a plain {@code DataHubAuthorizer} it is read from the instance itself.
 * Any other authorizer, or {@code null} (authorization disabled), yields {@code false}.
 *
 * @param authorizer the authorizer instance to check (can be null)
 * @return true if domain-based authorization is enabled, false otherwise
 */
public static boolean isDomainBasedAuthorizationEnabled(@Nullable Authorizer authorizer) {
  // Resolve the DataHubAuthorizer that carries the flag, if there is one.
  final DataHubAuthorizer resolved;
  if (authorizer instanceof AuthorizerChain) {
    resolved = ((AuthorizerChain) authorizer).getDefaultAuthorizer();
  } else if (authorizer instanceof DataHubAuthorizer) {
    resolved = (DataHubAuthorizer) authorizer;
  } else {
    // Covers null as well as unrelated Authorizer implementations.
    resolved = null;
  }
  return resolved != null && resolved.isDomainBasedAuthorizationEnabled();
}

@Override
public Set<DataHubPolicyInfo> getActorPolicies(@Nonnull Urn actorUrn) {
return authorizers.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public enum AuthorizationMode {
private EntitySpecResolver entitySpecResolver;
private AuthorizationMode mode;
@Getter private final OperationContext systemOpContext;
@Getter private final boolean domainBasedAuthorizationEnabled;

public static final String ALL = "ALL";

Expand All @@ -76,10 +77,13 @@ public DataHubAuthorizer(
final int delayIntervalSeconds,
final int refreshIntervalSeconds,
final AuthorizationMode mode,
final int policyFetchSize) {
final int policyFetchSize,
final boolean domainBasedAuthorizationEnabled) {
this.systemOpContext = systemOpContext;
this.mode = Objects.requireNonNull(mode);
policyEngine = new PolicyEngine(Objects.requireNonNull(entityClient));
this.domainBasedAuthorizationEnabled = domainBasedAuthorizationEnabled;
policyEngine =
new PolicyEngine(Objects.requireNonNull(entityClient), domainBasedAuthorizationEnabled);
if (refreshIntervalSeconds > 0) {
policyRefreshRunnable =
new PolicyRefreshRunnable(
Expand Down
Loading