@@ -2,10 +2,10 @@
 
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.gms.factory.system_telemetry.OpenTelemetryBaseFactory;
+import com.linkedin.metadata.event.GenericProducer;
 import com.linkedin.metadata.utils.metrics.MetricUtils;
 import io.datahubproject.metadata.context.SystemTelemetryContext;
 import javax.annotation.Nullable;
-import org.apache.kafka.clients.producer.Producer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.context.annotation.Bean;
@@ -24,8 +24,8 @@ protected String getApplicationComponent() {
   protected SystemTelemetryContext traceContext(
       MetricUtils metricUtils,
       ConfigurationProvider configurationProvider,
-      @Autowired(required = false) @Qualifier("dataHubUsageProducer") @Nullable
-          Producer<String, String> dueProducer) {
+      @Autowired(required = false) @Qualifier("dataHubUsageEventProducer") @Nullable
+          GenericProducer<String> dueProducer) {
     return super.traceContext(metricUtils, configurationProvider, dueProducer);
   }
 }
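
For context on the signature change above: the GenericProducer contract itself is not part of this diff. A minimal sketch of what it presumably looks like, inferred from the GenericProducerImpl introduced later in this PR (the actual interface may carry more methods or a different package layout):

package com.linkedin.metadata.event;

import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: method set inferred from GenericProducerImpl in this PR.
public interface GenericProducer<T> {
  // Toggle read-only mode; when not writable, implementations drop sends.
  void setWritable(boolean writable);

  // Send a record, optionally with a delivery callback.
  Future<?> send(ProducerRecord<String, T> producerRecord, @Nullable Callback callback);

  // Flush any buffered records.
  void flush();
}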
@@ -8,7 +8,6 @@
 import com.linkedin.datahub.upgrade.shared.ElasticSearchUpgradeUtils;
 import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesStep;
 import com.linkedin.metadata.entity.AspectDao;
-import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.graph.GraphService;
 import com.linkedin.metadata.search.EntitySearchService;
 import com.linkedin.metadata.service.UpdateIndicesService;
@@ -36,7 +35,6 @@ public class LoadIndices implements Upgrade {
 
   public LoadIndices(
       @Nullable final Database server,
-      final EntityService<?> entityService,
       final UpdateIndicesService updateIndicesService,
       @Nullable final LoadIndicesIndexManager indexManager,
       @Nullable final SystemMetadataService systemMetadataService,
@@ -48,7 +46,6 @@
     _steps =
         buildSteps(
             server,
-            entityService,
             updateIndicesService,
             indexManager,
             systemMetadataService,
@@ -73,7 +70,6 @@ public List<UpgradeStep> steps() {
 
   private List<UpgradeStep> buildSteps(
       final Database server,
-      final EntityService<?> entityService,
       final UpdateIndicesService updateIndicesService,
       final LoadIndicesIndexManager indexManager,
       final SystemMetadataService systemMetadataService,
@@ -99,7 +95,7 @@ private List<UpgradeStep> buildSteps(
       steps.add(new BuildIndicesStep(indexedServices, structuredProperties));
     }
 
-    steps.add(new LoadIndicesStep(server, entityService, updateIndicesService, indexManager));
+    steps.add(new LoadIndicesStep(server, updateIndicesService, indexManager));
     return steps;
   }
@@ -12,7 +12,6 @@
 import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.metadata.config.EbeanConfiguration;
-import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.entity.ebean.EbeanAspectDao;
 import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
 import com.linkedin.metadata.entity.ebean.PartitionedStream;
@@ -41,17 +40,14 @@
 public class LoadIndicesStep implements UpgradeStep {
 
   private final Database server;
-  private final EntityService<?> entityService;
   private final UpdateIndicesService updateIndicesService;
   private final LoadIndicesIndexManager indexManager;
 
   public LoadIndicesStep(
       final Database server,
-      final EntityService<?> entityService,
       final UpdateIndicesService updateIndicesService,
       final LoadIndicesIndexManager indexManager) {
     this.server = server;
-    this.entityService = entityService;
     this.updateIndicesService = updateIndicesService;
     this.indexManager = indexManager;
   }
@@ -6,7 +6,6 @@
 import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
 import com.linkedin.metadata.dao.producer.KafkaEventProducer;
 import com.linkedin.metadata.entity.AspectDao;
-import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.graph.GraphService;
 import com.linkedin.metadata.search.EntitySearchService;
 import com.linkedin.metadata.service.UpdateIndicesService;
@@ -59,7 +58,6 @@ public LoadIndicesIndexManager createIndexManager(
   @Nonnull
   public LoadIndices createInstance(
       final Database ebeanServer,
-      final EntityService<?> entityService,
       final UpdateIndicesService updateIndicesService,
       @Qualifier("loadIndicesIndexManager") final LoadIndicesIndexManager indexManager,
       final SystemMetadataService systemMetadataService,
@@ -69,7 +67,6 @@ public LoadIndices createInstance(
       final AspectDao aspectDao) {
     return new LoadIndices(
         ebeanServer,
-        entityService,
         updateIndicesService,
         indexManager,
         systemMetadataService,
@@ -13,7 +13,6 @@
 import com.linkedin.datahub.upgrade.UpgradeStepResult;
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.metadata.EbeanTestUtils;
-import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
 import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
 import com.linkedin.metadata.models.AspectSpec;
@@ -46,7 +45,6 @@ public class LoadIndicesStepTest {
 
   private LoadIndicesStep loadIndicesStep;
   private Database database;
-  @Mock private EntityService<?> mockEntityService;
   @Mock private UpdateIndicesService mockUpdateIndicesService;
   @Mock private LoadIndicesIndexManager mockIndexManager;
   @Mock private UpgradeContext mockUpgradeContext;
@@ -67,9 +65,7 @@ public void setup() {
     // Setup test database with some sample data
     setupTestDatabase();
 
-    loadIndicesStep =
-        new LoadIndicesStep(
-            database, mockEntityService, mockUpdateIndicesService, mockIndexManager);
+    loadIndicesStep = new LoadIndicesStep(database, mockUpdateIndicesService, mockIndexManager);
 
     when(mockUpgradeContext.report()).thenReturn(mockUpgradeReport);
     when(mockUpgradeContext.opContext()).thenReturn(mockOperationContext);
@@ -8,7 +8,6 @@
 import com.linkedin.datahub.upgrade.UpgradeStep;
 import com.linkedin.datahub.upgrade.system.elasticsearch.steps.BuildIndicesStep;
 import com.linkedin.metadata.entity.AspectDao;
-import com.linkedin.metadata.entity.EntityService;
 import com.linkedin.metadata.graph.GraphService;
 import com.linkedin.metadata.search.EntitySearchService;
 import com.linkedin.metadata.service.UpdateIndicesService;
@@ -25,7 +24,6 @@ public class LoadIndicesTest {
 
   @Mock private OperationContext mockOperationContext;
   @Mock private Database mockDatabase;
-  @Mock private EntityService<?> mockEntityService;
   @Mock private UpdateIndicesService mockUpdateIndicesService;
   @Mock private LoadIndicesIndexManager mockIndexManager;
   @Mock private SystemMetadataService mockSystemMetadataService;
@@ -42,7 +40,6 @@ public void setUp() {
     loadIndices =
         new LoadIndices(
             mockDatabase,
-            mockEntityService,
             mockUpdateIndicesService,
             mockIndexManager,
             mockSystemMetadataService,
@@ -89,7 +86,7 @@ public void testLoadIndicesCleanupSteps() {
   public void testLoadIndicesWithNullDependencies() {
     // Test constructor with null dependencies (graceful degradation)
     LoadIndices loadIndicesWithoutDeps =
-        new LoadIndices(null, null, null, null, null, null, null, null, null);
+        new LoadIndices(null, null, null, null, null, null, null, null);
     assertNotNull(loadIndicesWithoutDeps);
     assertEquals("LoadIndices", loadIndicesWithoutDeps.id());
     // When server or indexManager is null, should return empty steps list
@@ -48,7 +48,7 @@ public void testLoadIndicesConfigClass() {
   @Test
   public void testLoadIndicesClass() {
     // Test that the LoadIndices class can be instantiated with null dependencies
-    LoadIndices loadIndices = new LoadIndices(null, null, null, null, null, null, null, null, null);
+    LoadIndices loadIndices = new LoadIndices(null, null, null, null, null, null, null, null);
     assertNotNull(loadIndices);
     assertEquals(loadIndices.id(), "LoadIndices");
     assertNotNull(loadIndices.steps());
li-utils/src/main/java/com/linkedin/metadata/Constants.java (4 additions & 0 deletions)
@@ -550,6 +550,10 @@ public class Constants {
   public static final String MDC_ENTITY_TYPE = "entityType";
   public static final String MDC_CHANGE_TYPE = "changeType";
 
+  // Log messages
+  public static final String READ_ONLY_LOG =
+      "DataHub is currently in read only mode and this write will be dropped.";
+
   public static final String RESTLI_SUCCESS = "success";
 
   // Wildcard entity urn, allows auth on unspecified subresources. Avoids issues with
@@ -0,0 +1,61 @@
+package com.linkedin.metadata.dao.producer;
+
+import static com.linkedin.metadata.Constants.READ_ONLY_LOG;
+
+import com.linkedin.metadata.event.GenericProducer;
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+@Slf4j
+public class GenericProducerImpl<T> implements GenericProducer<T> {
+  private final Producer<String, T> producer;
+  private final KafkaHealthChecker kafkaHealthChecker;
+  private final MetricUtils metricUtils;
+  private boolean canWrite = true;
+
+  public GenericProducerImpl(
+      Producer<String, T> producer,
+      KafkaHealthChecker kafkaHealthChecker,
+      MetricUtils metricUtils) {
+    this.producer = producer;
+    this.kafkaHealthChecker = kafkaHealthChecker;
+    this.metricUtils = metricUtils;
+  }
+
+  @Override
+  public void setWritable(boolean writable) {
+    canWrite = writable;
+  }
+
+  @Override
+  public Future<?> send(ProducerRecord<String, T> producerRecord, @Nullable Callback callback) {
+    if (!canWrite) {
+      log.warn(READ_ONLY_LOG);
+      return CompletableFuture.completedFuture(Optional.empty());
+    }
+    Callback finalCallback;
+    if (callback == null) {
+      finalCallback =
+          kafkaHealthChecker.getKafkaCallBack(
+              metricUtils,
+              "GENERIC",
+              producerRecord.key() != null ? producerRecord.key() : StringUtils.EMPTY);
+    } else {
+      finalCallback = callback;
+    }
+    return producer.send(producerRecord, finalCallback);
+  }
+
+  @Override
+  public void flush() {
+    producer.flush();
+  }
+}
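
A hypothetical wiring example, not part of this PR, showing how this implementation might be constructed and how read-only mode behaves. The bootstrap address, topic name, and the KafkaHealthChecker/MetricUtils instances are assumptions; the actual Spring factory wiring is not shown in this diff.

import com.linkedin.metadata.dao.producer.GenericProducerImpl;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.event.GenericProducer;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class GenericProducerWiringSketch {
  // Sketch: assumes existing KafkaHealthChecker and MetricUtils instances
  // from DataHub's wiring are passed in.
  static GenericProducer<String> buildUsageProducer(
      KafkaHealthChecker healthChecker, MetricUtils metricUtils) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new GenericProducerImpl<>(new KafkaProducer<>(props), healthChecker, metricUtils);
  }

  static void demo(GenericProducer<String> producer) {
    // Null callback: GenericProducerImpl substitutes the health-checker callback.
    producer.send(new ProducerRecord<>("DataHubUsageEvent_v1", "key", "{}"), null);

    // In read-only mode, sends warn with READ_ONLY_LOG and return a
    // completed future instead of producing to Kafka.
    producer.setWritable(false);
    producer.send(new ProducerRecord<>("DataHubUsageEvent_v1", "key", "{}"), null);
    producer.flush();
  }
}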
@@ -1,5 +1,7 @@
 package com.linkedin.metadata.dao.producer;
 
+import static com.linkedin.metadata.Constants.READ_ONLY_LOG;
+
 import com.datahub.util.exception.ModelConversionException;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.metadata.EventUtils;
@@ -16,6 +18,7 @@
 import io.datahubproject.metadata.context.OperationContext;
[Inline review thread on this file]
Collaborator: I thought there was another producer besides this one?
Author: There's the one for usage and platform events, which should still be relevant even in read mode to capture when someone is making read calls. Unless we think read-only should not produce any analytics about its usage.

 import io.opentelemetry.instrumentation.annotations.WithSpan;
 import java.io.IOException;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
@@ -40,6 +43,7 @@ public class KafkaEventProducer extends EventProducer {
   private final TopicConvention _topicConvention;
   private final KafkaHealthChecker _kafkaHealthChecker;
   private final MetricUtils metricUtils;
+  private boolean canWrite = true;
 
   @Override
   public void flush() {
@@ -64,12 +68,20 @@ public KafkaEventProducer(
     this.metricUtils = metricUtils;
   }
 
+  public void setWritable(boolean writable) {
+    canWrite = writable;
+  }
+
   @Override
   @WithSpan
   public Future<?> produceMetadataChangeLog(
       @Nonnull final Urn urn,
       @Nonnull AspectSpec aspectSpec,
       @Nonnull final MetadataChangeLog metadataChangeLog) {
+    if (!canWrite) {
+      log.warn(READ_ONLY_LOG);
+      return CompletableFuture.completedFuture(Optional.empty());
+    }
     GenericRecord record;
     try {
       log.debug(
@@ -101,6 +113,10 @@ public String getMetadataChangeLogTopicName(@Nonnull AspectSpec aspectSpec) {
   @WithSpan
   public Future<?> produceMetadataChangeProposal(
       @Nonnull final Urn urn, @Nonnull final MetadataChangeProposal metadataChangeProposal) {
+    if (!canWrite) {
+      log.warn(READ_ONLY_LOG);
+      return CompletableFuture.completedFuture(Optional.empty());
+    }
     GenericRecord record;
 
     try {
@@ -131,6 +147,10 @@ public Future<?> produceFailedMetadataChangeProposalAsync(
       @Nonnull OperationContext opContext,
       @Nonnull MetadataChangeProposal mcp,
       @Nonnull Set<Throwable> throwables) {
+    if (!canWrite) {
+      log.warn(READ_ONLY_LOG);
+      return CompletableFuture.completedFuture(Optional.empty());
+    }
 
     try {
       String topic = _topicConvention.getFailedMetadataChangeProposalTopicName();
@@ -160,6 +180,10 @@
   @Override
   public Future<?> producePlatformEvent(
       @Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event) {
+    if (!canWrite) {
+      log.warn(READ_ONLY_LOG);
+      return CompletableFuture.completedFuture(Optional.empty());
+    }
     GenericRecord record;
     try {
       log.debug(
@@ -183,6 +207,7 @@ public String getPlatformEventTopicName() {
 
   @Override
   public void produceDataHubUpgradeHistoryEvent(@Nonnull DataHubUpgradeHistoryEvent event) {
+    // We allow this to write even when not in writable mode, to allow DH to start up
     GenericRecord record;
     try {
       log.debug(String.format("Converting Pegasus Event to Avro Event\nEvent: %s", event));
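
The upshot for callers, as a minimal sketch: the kafkaEventProducer, urn, proposal, and upgradeEvent variables here are hypothetical, but setWritable and the drop-and-log behavior come from this PR.

// Hypothetical toggle, e.g. driven by a read-only configuration flag.
kafkaEventProducer.setWritable(false);

// MCL, MCP, failed-MCP, and platform-event writes now warn with
// READ_ONLY_LOG and return an already-completed future instead of
// producing to Kafka.
Future<?> result = kafkaEventProducer.produceMetadataChangeProposal(urn, proposal);

// Upgrade history events bypass the guard so DataHub can still start up.
kafkaEventProducer.produceDataHubUpgradeHistoryEvent(upgradeEvent);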