Skip to content

Commit cb38102

Browse files
committed
Merge branch '3.4' of https://github.com/apache/kafka into 3.4
2 parents 92ebf0d + 16d08e9 commit cb38102

File tree

5 files changed

+61
-11
lines changed

5 files changed

+61
-11
lines changed

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ protected void finalOffsetCommit(boolean failed) {
233233
@Override
234234
public void removeMetrics() {
235235
Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker");
236+
super.removeMetrics();
236237
}
237238

238239
@Override

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ public void putTargetState(String connector, TargetState state) {
637637
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V0, connectTargetState);
638638
log.debug("Writing target state {} for connector {}", state, connector);
639639
try {
640-
configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
640+
configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
641641
} catch (InterruptedException | ExecutionException | TimeoutException e) {
642642
log.error("Failed to write target state to Kafka", e);
643643
throw new ConnectException("Error writing target state to Kafka", e);
@@ -789,7 +789,7 @@ private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throw
789789
if (!usesFencableWriter) {
790790
List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
791791
keyValues.forEach(
792-
keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
792+
keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value))
793793
);
794794

795795
timer.update();

connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@
7777
* calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
7878
* and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
7979
* </p>
80+
* <p>
81+
* This is a useful utility that has been used outside of Connect. This isn't in Connect's public API,
82+
* but we've tried to maintain the method signatures and backward compatibility since early Kafka versions.
83+
* </p>
8084
*/
8185
public class KafkaBasedLog<K, V> {
8286
private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
@@ -351,6 +355,31 @@ public Future<Void> readToEnd() {
351355
return future;
352356
}
353357

358+
/**
359+
* Send a record asynchronously to the configured {@link #topic} without using a producer callback.
360+
* <p>
361+
* This method exists for backward compatibility reasons and delegates to the newer
362+
* {@link #sendWithReceipt(Object, Object)} method that returns a future.
363+
* @param key the key for the {@link ProducerRecord}
364+
* @param value the value for the {@link ProducerRecord}
365+
*/
366+
public void send(K key, V value) {
367+
sendWithReceipt(key, value);
368+
}
369+
370+
/**
371+
* Send a record asynchronously to the configured {@link #topic}.
372+
* <p>
373+
* This method exists for backward compatibility reasons and delegates to the newer
374+
* {@link #sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback)} method that returns a future.
375+
* @param key the key for the {@link ProducerRecord}
376+
* @param value the value for the {@link ProducerRecord}
377+
* @param callback the callback to invoke after completion; can be null if no callback is desired
378+
*/
379+
public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
380+
sendWithReceipt(key, value, callback);
381+
}
382+
354383
/**
355384
* Send a record asynchronously to the configured {@link #topic} without using a producer callback.
356385
* @param key the key for the {@link ProducerRecord}
@@ -359,20 +388,20 @@ public Future<Void> readToEnd() {
359388
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
360389
* future if synchronous behavior is desired.
361390
*/
362-
public Future<RecordMetadata> send(K key, V value) {
363-
return send(key, value, null);
391+
public Future<RecordMetadata> sendWithReceipt(K key, V value) {
392+
return sendWithReceipt(key, value, null);
364393
}
365394

366395
/**
367-
* Send a record asynchronously to the configured {@link #topic}
396+
* Send a record asynchronously to the configured {@link #topic}.
368397
* @param key the key for the {@link ProducerRecord}
369398
* @param value the value for the {@link ProducerRecord}
370399
* @param callback the callback to invoke after completion; can be null if no callback is desired
371400
*
372401
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
373402
* future if synchronous behavior is desired.
374403
*/
375-
public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
404+
public Future<RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
376405
return producer.orElseThrow(() ->
377406
new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
378407
).send(new ProducerRecord<>(topic, key, value), callback);

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.producer.Producer;
2020
import org.apache.kafka.clients.producer.RecordMetadata;
2121
import org.apache.kafka.common.KafkaException;
22+
import org.apache.kafka.common.MetricName;
2223
import org.apache.kafka.common.TopicPartition;
2324
import org.apache.kafka.common.errors.InvalidTopicException;
2425
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -82,7 +83,9 @@
8283
import java.util.concurrent.TimeoutException;
8384
import java.util.concurrent.atomic.AtomicReference;
8485
import java.util.function.Consumer;
86+
import java.util.stream.Collectors;
8587

88+
import static java.util.Collections.emptySet;
8689
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
8790
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
8891
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -291,6 +294,23 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter,
291294
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
292295
}
293296

297+
@Test
298+
public void testRemoveMetrics() {
299+
createWorkerTask();
300+
301+
workerTask.removeMetrics();
302+
303+
assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet()));
304+
}
305+
306+
private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
307+
return metricNames
308+
.stream()
309+
.filter(m -> metrics.registry().taskGroupName().equals(m.group())
310+
|| metrics.registry().sourceTaskGroupName().equals(m.group()))
311+
.collect(Collectors.toSet());
312+
}
313+
294314
@Test
295315
public void testStartPaused() throws Exception {
296316
createWorkerTask(TargetState.PAUSED);

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ public void testPutConnectorConfigProducerError() throws Exception {
344344

345345
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
346346

347-
storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
347+
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject());
348348
EasyMock.expectLastCall().andReturn(producerFuture);
349349

350350
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
@@ -379,13 +379,13 @@ public void testRemoveConnectorConfigSlowProducer() throws Exception {
379379
@SuppressWarnings("unchecked")
380380
Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
381381
// tombstone for the connector config
382-
storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
382+
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
383383
EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
384384

385385
@SuppressWarnings("unchecked")
386386
Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
387387
// tombstone for the connector target state
388-
storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
388+
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
389389
EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
390390

391391
connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
@@ -460,7 +460,7 @@ public void testWritePrivileges() throws Exception {
460460

461461
// In the meantime, write a target state (which doesn't require write privileges)
462462
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V0, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
463-
storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
463+
storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
464464
EasyMock.expectLastCall().andReturn(producerFuture);
465465
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
466466
EasyMock.expectLastCall().andReturn(null);
@@ -1601,7 +1601,7 @@ private void expectConvertWriteRead(final String configKey, final Schema valueSc
16011601
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
16021602
.andReturn(serialized);
16031603

1604-
storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
1604+
storeLog.sendWithReceipt(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
16051605
EasyMock.expectLastCall().andReturn(producerFuture);
16061606

16071607
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());

0 commit comments

Comments
 (0)