From 1eeb7851ce0a469bd3a74438366c4cef5f627b5f Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Thu, 31 Jul 2025 00:07:27 -0300 Subject: [PATCH 1/2] fix: race condition Signed-off-by: Marcos Tischer Vallim --- .../concurrent/RingBufferBlockingQueue.java | 163 +++++++++--------- .../lib/core/AbstractAmazonSnsConsumer.java | 19 +- .../lib/core/AbstractAmazonSnsProducer.java | 7 +- .../messaging/lib/core/ListenableFuture.java | 7 + .../lib/core/ListenableFutureImpl.java | 108 ++++++++++++ .../lib/core/ListenableFutureRegistry.java | 54 ------ .../RingBufferBlockingQueueTest.java | 82 ++++----- .../lib/core/ListenableFutureImplTest.java | 133 ++++++++++++++ .../core/ListenableFutureRegistryTest.java | 85 --------- .../messaging/lib/core/AmazonSnsConsumer.java | 12 +- .../messaging/lib/core/AmazonSnsProducer.java | 4 +- .../messaging/lib/core/AmazonSnsTemplate.java | 4 +- .../lib/core/AmazonSnsProducerAsyncTest.java | 23 ++- .../messaging/lib/core/AmazonSnsConsumer.java | 12 +- .../messaging/lib/core/AmazonSnsProducer.java | 4 +- .../messaging/lib/core/AmazonSnsTemplate.java | 4 +- .../lib/core/AmazonSnsProducerAsyncTest.java | 4 +- 17 files changed, 419 insertions(+), 306 deletions(-) create mode 100644 amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java delete mode 100644 amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistry.java create mode 100644 amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java delete mode 100644 amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java index cd617a1..1f34c5c 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueue.java @@ -29,186 +29,177 @@ import java.util.stream.IntStream; import lombok.Getter; +import lombok.Locked; import lombok.Setter; import lombok.SneakyThrows; public class RingBufferBlockingQueue extends AbstractQueue implements BlockingQueue { - + private static final int DEFAULT_CAPACITY = 2048; - + private final AtomicReferenceArray> buffer; - + private final int capacity; - + private final AtomicLong writeSequence = new AtomicLong(-1); - + private final AtomicLong readSequence = new AtomicLong(0); - + private final AtomicInteger size = new AtomicInteger(0); - + private final ReentrantLock reentrantLock; - + private final Condition waitingConsumer; - + private final Condition waitingProducer; - + public RingBufferBlockingQueue(final int capacity) { this.capacity = capacity; - this.buffer = new AtomicReferenceArray<>(capacity); - this.reentrantLock = new ReentrantLock(true); - this.waitingConsumer = this.reentrantLock.newCondition(); - this.waitingProducer = this.reentrantLock.newCondition(); - IntStream.range(0, capacity).forEach(idx -> this.buffer.set(idx, new Entry<>())); + buffer = new AtomicReferenceArray<>(capacity); + reentrantLock = new ReentrantLock(true); + waitingConsumer = reentrantLock.newCondition(); + waitingProducer = reentrantLock.newCondition(); + IntStream.range(0, capacity).forEach(idx -> buffer.set(idx, new Entry<>())); } - + public RingBufferBlockingQueue() { this(RingBufferBlockingQueue.DEFAULT_CAPACITY); } - + private long avoidSequenceOverflow(final long sequence) { return (sequence < Long.MAX_VALUE ? sequence : wrap(sequence)); } - + private int wrap(final long sequence) { - return Math.toIntExact(sequence % this.capacity); + return Math.toIntExact(sequence % capacity); } - + public int capacity() { - return this.capacity; + return capacity; } - + @Override public int size() { - return this.size.get(); + return size.get(); } - + @Override public boolean isEmpty() { - return this.size.get() == 0; + return size.get() == 0; } - + public boolean isFull() { - return this.size.get() >= this.capacity; + return size.get() >= capacity; } - + public long writeSequence() { - return this.writeSequence.get(); + return writeSequence.get(); } - + public long readSequence() { - return this.readSequence.get(); + return readSequence.get(); } - + @Override public E peek() { - return isEmpty() ? null : this.buffer.get(wrap(this.readSequence.get())).getValue(); + return isEmpty() ? null : buffer.get(wrap(readSequence.get())).getValue(); } - + @Override @SneakyThrows + @Locked("reentrantLock") public void put(final E element) { - try { - reentrantLock.lock(); - - while (isFull()) { - waitingProducer.await(); - } + while (isFull()) { + waitingProducer.await(); + } - final long prevWriteSeq = writeSequence.get(); - final long nextWriteSeq = avoidSequenceOverflow(prevWriteSeq) + 1; + final long prevWriteSeq = writeSequence.get(); + final long nextWriteSeq = avoidSequenceOverflow(prevWriteSeq) + 1; - buffer.get(wrap(nextWriteSeq)).setValue(element); + buffer.get(wrap(nextWriteSeq)).setValue(element); - writeSequence.compareAndSet(prevWriteSeq, nextWriteSeq); + writeSequence.compareAndSet(prevWriteSeq, nextWriteSeq); - size.incrementAndGet(); + size.incrementAndGet(); - waitingConsumer.signal(); - } finally { - reentrantLock.unlock(); - } + waitingConsumer.signal(); } - + @Override @SneakyThrows + @Locked("reentrantLock") public E take() { - try { - reentrantLock.lock(); - - while (isEmpty()) { - waitingConsumer.await(); - } - - final long prevReadSeq = readSequence.get(); - final long nextReadSeq = avoidSequenceOverflow(prevReadSeq) + 1; - - final E nextValue = buffer.get(wrap(prevReadSeq)).getValue(); - - buffer.get(wrap(prevReadSeq)).setValue(null); - - readSequence.compareAndSet(prevReadSeq, nextReadSeq); - - size.decrementAndGet(); - - waitingProducer.signal(); - - return nextValue; - } finally { - reentrantLock.unlock(); + while (isEmpty()) { + waitingConsumer.await(); } + + final long prevReadSeq = readSequence.get(); + final long nextReadSeq = avoidSequenceOverflow(prevReadSeq) + 1; + + final E nextValue = buffer.get(wrap(prevReadSeq)).getValue(); + + buffer.get(wrap(prevReadSeq)).setValue(null); + + readSequence.compareAndSet(prevReadSeq, nextReadSeq); + + size.decrementAndGet(); + + waitingProducer.signal(); + + return nextValue; } - + @Override public boolean offer(final E element) { throw new UnsupportedOperationException(); } - + @Override public boolean offer(final E element, final long timeout, final TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); } - + @Override public E poll() { throw new UnsupportedOperationException(); } - + @Override public E poll(final long timeout, final TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); } - + @Override public Iterator iterator() { throw new UnsupportedOperationException(); } - + @Override public boolean add(final E element) { throw new UnsupportedOperationException(); } - + @Override public int remainingCapacity() { throw new UnsupportedOperationException(); } - + @Override public int drainTo(final Collection collection) { throw new UnsupportedOperationException(); } - + @Override public int drainTo(final Collection collection, final int maxElements) { throw new UnsupportedOperationException(); } - + @Getter @Setter static class Entry { - + private E value; - + } - + } diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java index dac04c0..f056462 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsConsumer.java @@ -17,6 +17,7 @@ package com.amazon.sns.messaging.lib.core; import java.io.IOException; +import java.time.Duration; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -29,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; import java.util.function.BiFunction; import java.util.function.UnaryOperator; @@ -41,6 +43,8 @@ import com.amazon.sns.messaging.lib.core.RequestEntryInternalFactory.RequestEntryInternal; import com.amazon.sns.messaging.lib.model.PublishRequestBuilder; import com.amazon.sns.messaging.lib.model.RequestEntry; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -63,7 +67,7 @@ abstract class AbstractAmazonSnsConsumer implements Runnable { private final RequestEntryInternalFactory requestEntryInternalFactory; - protected final ConcurrentMap pendingRequests; + protected final ConcurrentMap> pendingRequests; private final BlockingQueue> topicRequests; @@ -75,7 +79,7 @@ protected AbstractAmazonSnsConsumer( final C amazonSnsClient, final TopicProperty topicProperty, final ObjectMapper objectMapper, - final ConcurrentMap pendingRequests, + final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests, final ExecutorService executorService, final UnaryOperator publishDecorator) { @@ -216,18 +220,11 @@ private Optional createBatch(final BlockingQueue> requests) { @SneakyThrows public CompletableFuture await() { return CompletableFuture.runAsync(() -> { - while ( - MapUtils.isNotEmpty(this.pendingRequests) || - CollectionUtils.isNotEmpty(this.topicRequests)) { - sleep(topicProperty.getLinger()); + while (MapUtils.isNotEmpty(this.pendingRequests) || CollectionUtils.isNotEmpty(this.topicRequests)) { + LockSupport.parkNanos(Duration.ofMillis(topicProperty.getLinger()).toNanos()); } }); } - @SneakyThrows - private static void sleep(final long millis) { - Thread.sleep(millis); - } - } // @formatter:on diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java index 1cac9e9..05af313 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/AbstractAmazonSnsProducer.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -40,7 +39,7 @@ abstract class AbstractAmazonSnsProducer { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAmazonSnsProducer.class); - private final ConcurrentMap pendingRequests; + private final ConcurrentMap> pendingRequests; private final BlockingQueue> topicRequests; @@ -48,7 +47,7 @@ abstract class AbstractAmazonSnsProducer { @SneakyThrows public ListenableFuture send(final RequestEntry requestEntry) { - return CompletableFuture.supplyAsync(() -> enqueueRequest(requestEntry), executorService).get(); + return enqueueRequest(requestEntry); } @SneakyThrows @@ -65,7 +64,7 @@ public void shutdown() { @SneakyThrows private ListenableFuture enqueueRequest(final RequestEntry requestEntry) { - final ListenableFutureRegistry trackPendingRequest = new ListenableFutureRegistry(); + final ListenableFuture trackPendingRequest = new ListenableFutureImpl(); pendingRequests.put(requestEntry.getId(), trackPendingRequest); topicRequests.put(requestEntry); return trackPendingRequest; diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java index 3da35a2..f5cdd9b 100644 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFuture.java @@ -18,6 +18,9 @@ import java.util.function.Consumer; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; + // @formatter:off public interface ListenableFuture { @@ -27,5 +30,9 @@ default void addCallback(final Consumer successCallback) { addCallback(successCallback, result -> { }); } + void success(final ResponseSuccessEntry entry); + + void fail(final ResponseFailEntry entry); + } // @formatter:on diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java new file mode 100644 index 0000000..c1f28e6 --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureImpl.java @@ -0,0 +1,108 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.core; + +import java.util.LinkedList; +import java.util.Objects; +import java.util.Queue; +import java.util.function.Consumer; + +import org.apache.commons.collections4.CollectionUtils; + +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; + +import lombok.AccessLevel; +import lombok.Getter; + +// @formatter:off +class ListenableFutureImpl implements ListenableFuture { + + private final Object mutex = new Object(); + + @Getter(value = AccessLevel.PACKAGE) + private State state = State.NEW; + + @Getter(value = AccessLevel.PACKAGE) + private ResponseSuccessEntry successResult; + + @Getter(value = AccessLevel.PACKAGE) + private ResponseFailEntry failureResult; + + private final Queue> successCallback = new LinkedList<>(); + + private final Queue> failureCallback = new LinkedList<>(); + + @Override + public void addCallback(final Consumer successCallback, final Consumer failureCallback) { + synchronized (mutex) { + final Consumer success = Objects.nonNull(successCallback) ? successCallback : result -> { }; + final Consumer failure = Objects.nonNull(failureCallback) ? failureCallback : result -> { }; + + switch (state) { + case NEW: + this.successCallback.add(success); + this.failureCallback.add(failure); + break; + case SUCCESS: + notifySuccess(success); + break; + case FAILURE: + notifyFailure(failure); + break; + } + } + } + + @Override + public void success(final ResponseSuccessEntry entry) { + synchronized (mutex) { + state = State.SUCCESS; + successResult = entry; + + while (CollectionUtils.isNotEmpty(successCallback)) { + notifySuccess(successCallback.poll()); + } + } + } + + @Override + public void fail(final ResponseFailEntry entry) { + synchronized (mutex) { + state = State.FAILURE; + failureResult = entry; + + while (CollectionUtils.isNotEmpty(failureCallback)) { + notifyFailure(failureCallback.poll()); + } + } + } + + private void notifySuccess(final Consumer callback) { + callback.accept(successResult); + } + + private void notifyFailure(final Consumer callback) { + callback.accept(failureResult); + } + + enum State { + NEW, SUCCESS, FAILURE + } + +} +// @formatter:on diff --git a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistry.java b/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistry.java deleted file mode 100644 index 7e02fed..0000000 --- a/amazon-sns-java-messaging-lib-template/src/main/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistry.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.sns.messaging.lib.core; - -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; - -import com.amazon.sns.messaging.lib.model.ResponseFailEntry; -import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; - -// @formatter:off -class ListenableFutureRegistry implements ListenableFuture { - - private final CompletableFuture completableFutureSuccess = new CompletableFuture<>(); - - private final CompletableFuture completableFutureFailure = new CompletableFuture<>(); - - @Override - public void addCallback(final Consumer successCallback, final Consumer failureCallback) { - final Consumer internalSuccessCallback = Objects.nonNull(successCallback) ? successCallback : result -> { }; - final Consumer internalFailureCallback = Objects.nonNull(failureCallback) ? failureCallback : result -> { }; - completableFutureSuccess.whenComplete((result, throwable) -> internalSuccessCallback.accept(result)); - completableFutureFailure.whenComplete((result, throwable) -> internalFailureCallback.accept(result)); - } - - public void success(final ResponseSuccessEntry entry) { - completableFutureSuccess.complete(entry); - } - - public void fail(final ResponseFailEntry entry) { - completableFutureFailure.complete(entry); - } - - public CompletableFuture completable() { - return CompletableFuture.anyOf(completableFutureSuccess, completableFutureFailure); - } - -} -// @formatter:on diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java index d94171c..c686473 100644 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/concurrent/RingBufferBlockingQueueTest.java @@ -38,164 +38,164 @@ @SuppressWarnings({ "java:S2925", "java:S5778" }) class RingBufferBlockingQueueTest { - + @Test void testSuccess() { final ExecutorService producer = ExecutorsProvider.getExecutorService(); - + final ScheduledExecutorService consumer = Executors.newSingleThreadScheduledExecutor(ThreadFactoryProvider.getThreadFactory()); - + final List> requestEntriesOut = new LinkedList<>(); - + final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); - + producer.submit(() -> { IntStream.range(0, 100_000).forEach(value -> { ringBlockingQueue.put(RequestEntry.builder().withValue(value).build()); }); }); - + consumer.scheduleAtFixedRate(() -> { while (!ringBlockingQueue.isEmpty()) { final List> requestEntries = new LinkedList<>(); - + while ((requestEntries.size() < 10) && Objects.nonNull(ringBlockingQueue.peek())) { final RequestEntry take = ringBlockingQueue.take(); requestEntries.add(take); } - + requestEntriesOut.addAll(requestEntries); } }, 0, 100L, TimeUnit.MILLISECONDS); - + await().pollInterval(5, TimeUnit.SECONDS).pollDelay(200, TimeUnit.MILLISECONDS).until(() -> { return (ringBlockingQueue.writeSequence() == 99_999) && (ringBlockingQueue.readSequence() == 100_000); }); - + producer.shutdown(); consumer.shutdown(); - + assertThat(ringBlockingQueue.size(), is(0)); assertThat(ringBlockingQueue.isEmpty(), is(true)); - + assertThat(requestEntriesOut, hasSize(100_000)); requestEntriesOut.sort((a, b) -> a.getValue() - b.getValue()); - + for (int i = 0; i < 100_000; i++) { assertThat(requestEntriesOut.get(i).getValue(), is(i)); } } - + @Test void testSuccessWhenIsEmpty() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); - + final ExecutorService producer = ExecutorsProvider.getExecutorService(); - + final ExecutorService consumer = ExecutorsProvider.getExecutorService(); - + consumer.submit(() -> { assertThat(ringBlockingQueue.take().getValue(), is(0)); assertThat(ringBlockingQueue.take().getValue(), is(1)); }); - + await().pollDelay(2, TimeUnit.SECONDS).until(() -> true); - + producer.submit(() -> { ringBlockingQueue.put(RequestEntry.builder().withValue(0).build()); ringBlockingQueue.put(RequestEntry.builder().withValue(1).build()); }); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 1); producer.shutdownNow(); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 2); consumer.shutdownNow(); - + assertThat(ringBlockingQueue.isEmpty(), is(true)); } - + @Test void testSuccessWhenIsFull() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(1); - + final ExecutorService producer = ExecutorsProvider.getExecutorService(); - + final ExecutorService consumer = ExecutorsProvider.getExecutorService(); - + producer.submit(() -> { ringBlockingQueue.put(RequestEntry.builder().withValue(0).build()); ringBlockingQueue.put(RequestEntry.builder().withValue(1).build()); }); - + await().pollDelay(2, TimeUnit.SECONDS).until(() -> true); - + consumer.submit(() -> { assertThat(ringBlockingQueue.take().getValue(), is(0)); assertThat(ringBlockingQueue.take().getValue(), is(1)); }); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.writeSequence() == 1); producer.shutdownNow(); - + await().atMost(1, TimeUnit.MINUTES).until(() -> ringBlockingQueue.readSequence() == 2); consumer.shutdownNow(); - + assertThat(ringBlockingQueue.isEmpty(), is(true)); } - + @Test void testFailOffer() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.offer(RequestEntry.builder().withValue(0).build())); } - + @Test void testFailOfferWithParams() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.offer(RequestEntry.builder().withValue(0).build(), 1, TimeUnit.MILLISECONDS)); } - + @Test void testFailPoll() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, ringBlockingQueue::poll); } - + @Test void testFailPollWithParams() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.poll(1, TimeUnit.MILLISECONDS)); } - + @Test void testFailIterator() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, ringBlockingQueue::iterator); } - + @Test void testFailAdd() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.add(RequestEntry.builder().withValue(0).build())); } - + @Test void testFailRemainingCapacity() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, ringBlockingQueue::remainingCapacity); } - + @Test void testFailDrainTo() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.drainTo(Collections.emptyList())); } - + @Test void testFailDrainToWithParams() { final RingBufferBlockingQueue> ringBlockingQueue = new RingBufferBlockingQueue<>(); assertThrows(UnsupportedOperationException.class, () -> ringBlockingQueue.drainTo(Collections.emptyList(), 1)); } - + } diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java new file mode 100644 index 0000000..e3c12de --- /dev/null +++ b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureImplTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazon.sns.messaging.lib.core; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +import java.util.function.Consumer; + +import org.junit.jupiter.api.Test; + +import com.amazon.sns.messaging.lib.core.ListenableFutureImpl.State; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; + +// @formatter:off +class ListenableFutureImplTest { + + @Test + void testSuccessWithCallbacksBefore() { + final Consumer successCallback = entry -> assertThat(entry, notNullValue()); + final Consumer failureCallback = entry -> assertThat(entry, notNullValue()); + + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.addCallback(successCallback, failureCallback); + + listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); + listenableFutureRegistry.fail(mock(ResponseFailEntry.class)); + } + + @Test + void testSuccessWithCallbacksAfter() { + final Consumer successCallback = entry -> assertThat(entry, notNullValue()); + final Consumer failureCallback = entry -> assertThat(entry, notNullValue()); + + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); + listenableFutureRegistry.fail(mock(ResponseFailEntry.class)); + + listenableFutureRegistry.addCallback(successCallback, failureCallback); + } + + @Test + void testSuccessWithCallbackSuccessBefore() { + final Consumer successCallback = entry -> assertThat(entry, notNullValue()); + + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.addCallback(successCallback, null); + + listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); + } + + @Test + void testSuccessWithCallbackSuccessAfter() { + final Consumer successCallback = entry -> assertThat(entry, notNullValue()); + + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); + + listenableFutureRegistry.addCallback(successCallback, null); + } + + @Test + void testSuccessWithCallbackFailBefore() { + final Consumer failureCallback = entry -> assertThat(entry, notNullValue()); + + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.addCallback(null, failureCallback); + + listenableFutureRegistry.fail(mock(ResponseFailEntry.class)); + } + + @Test + void testSuccessWithCallbackFailAfter() { + final Consumer failureCallback = entry -> assertThat(entry, notNullValue()); + + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.fail(mock(ResponseFailEntry.class)); + + listenableFutureRegistry.addCallback(null, failureCallback); + } + + @Test + void testSuccessWithoutCallbacksBefore() { + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.addCallback(null, null); + + listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); + + assertThat(listenableFutureRegistry.getState(), is(State.SUCCESS)); + assertThat(listenableFutureRegistry.getSuccessResult(), notNullValue()); + assertThat(listenableFutureRegistry.getFailureResult(), nullValue()); + } + + @Test + void testSuccessWithoutCallbacksAfter() { + final ListenableFutureImpl listenableFutureRegistry = new ListenableFutureImpl(); + + listenableFutureRegistry.fail(mock(ResponseFailEntry.class)); + + listenableFutureRegistry.addCallback(null, null); + + assertThat(listenableFutureRegistry.getState(), is(State.FAILURE)); + assertThat(listenableFutureRegistry.getFailureResult(), notNullValue()); + assertThat(listenableFutureRegistry.getSuccessResult(), nullValue()); + } + +} +// @formatter:on diff --git a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java b/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java deleted file mode 100644 index be0fcdc..0000000 --- a/amazon-sns-java-messaging-lib-template/src/test/java/com/amazon/sns/messaging/lib/core/ListenableFutureRegistryTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.sns.messaging.lib.core; - -import static org.hamcrest.CoreMatchers.notNullValue; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.mockito.Mockito.mock; - -import java.util.function.Consumer; - -import org.junit.jupiter.api.Test; - -import com.amazon.sns.messaging.lib.model.ResponseFailEntry; -import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; - -// @formatter:off -class ListenableFutureRegistryTest { - - @Test - void testSuccessWithCallbacks() { - final Consumer successCallback = entry -> assertThat(entry, notNullValue()); - final Consumer failureCallback = entry -> assertThat(entry, notNullValue()); - - final ListenableFutureRegistry listenableFutureRegistry = new ListenableFutureRegistry(); - - listenableFutureRegistry.addCallback(successCallback, failureCallback); - - listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); - listenableFutureRegistry.fail(mock(ResponseFailEntry.class)); - - assertThat(listenableFutureRegistry.completable(), notNullValue()); - } - - @Test - void testSuccessWithCallback() { - final Consumer successCallback = entry -> assertThat(entry, notNullValue()); - - final ListenableFutureRegistry listenableFutureRegistry = new ListenableFutureRegistry(); - - listenableFutureRegistry.addCallback(successCallback); - - listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); - - assertThat(listenableFutureRegistry.completable(), notNullValue()); - } - - @Test - void testSuccessWithouCallback() { - final ListenableFutureRegistry listenableFutureRegistry = new ListenableFutureRegistry(); - - listenableFutureRegistry.addCallback(null); - - listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); - - assertThat(listenableFutureRegistry.completable(), notNullValue()); - } - - @Test - void testSuccessWithouCallbacks() { - final ListenableFutureRegistry listenableFutureRegistry = new ListenableFutureRegistry(); - - listenableFutureRegistry.addCallback(null, null); - - listenableFutureRegistry.success(mock(ResponseSuccessEntry.class)); - listenableFutureRegistry.fail(mock(ResponseFailEntry.class)); - - assertThat(listenableFutureRegistry.completable(), notNullValue()); - } - -} -// @formatter:on diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java index 910c7f4..7e0ed41 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java @@ -52,7 +52,7 @@ public AmazonSnsConsumer( final AmazonSNS amazonSnsClient, final TopicProperty topicProperty, final ObjectMapper objectMapper, - final ConcurrentMap pendingRequests, + final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests, final ExecutorService executorService, final UnaryOperator publishDecorator) { @@ -73,7 +73,7 @@ protected BiFunction, PublishBatchRequest> su .withSubject(StringUtils.isNotBlank(entry.getSubject()) ? entry.getSubject() : null) .withMessageGroupId(StringUtils.isNotBlank(entry.getGroupId()) ? entry.getGroupId() : null) .withMessageDeduplicationId(StringUtils.isNotBlank(entry.getDeduplicationId()) ? entry.getDeduplicationId() : null) - .withMessageAttributes(messageAttributes.messageAttributes(entry.getMessageHeaders())) + .withMessageAttributes(AmazonSnsConsumer.messageAttributes.messageAttributes(entry.getMessageHeaders())) .withMessage(entry.getMessage())) .collect(Collectors.toList()); return new PublishBatchRequest().withPublishBatchRequestEntries(entries).withTopicArn(topicArn); @@ -85,10 +85,10 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final final String code = throwable instanceof AmazonServiceException ? AmazonServiceException.class.cast(throwable).getErrorCode() : "000"; final String message = throwable instanceof AmazonServiceException ? AmazonServiceException.class.cast(throwable).getErrorMessage() : throwable.getMessage(); - LOGGER.error(throwable.getMessage(), throwable); + AmazonSnsConsumer.LOGGER.error(throwable.getMessage(), throwable); publishBatchRequest.getPublishBatchRequestEntries().forEach(entry -> { - final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.getId()); + final ListenableFuture listenableFuture = pendingRequests.remove(entry.getId()); listenableFuture.fail(ResponseFailEntry.builder() .withId(entry.getId()) .withCode(code) @@ -101,7 +101,7 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final @Override protected void handleResponse(final PublishBatchResult publishBatchResult) { publishBatchResult.getSuccessful().forEach(entry -> { - final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.getId()); + final ListenableFuture listenableFuture = pendingRequests.remove(entry.getId()); listenableFuture.success(ResponseSuccessEntry.builder() .withId(entry.getId()) .withMessageId(entry.getMessageId()) @@ -110,7 +110,7 @@ protected void handleResponse(final PublishBatchResult publishBatchResult) { }); publishBatchResult.getFailed().forEach(entry -> { - final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.getId()); + final ListenableFuture listenableFuture = pendingRequests.remove(entry.getId()); listenableFuture.fail(ResponseFailEntry.builder() .withId(entry.getId()) .withCode(entry.getCode()) diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java index 241bd53..eb61f94 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java @@ -21,12 +21,14 @@ import java.util.concurrent.ExecutorService; import com.amazon.sns.messaging.lib.model.RequestEntry; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; // @formatter:off class AmazonSnsProducer extends AbstractAmazonSnsProducer { public AmazonSnsProducer( - final ConcurrentMap pendingRequests, + final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests, final ExecutorService executorService) { super(pendingRequests, topicRequests, executorService); diff --git a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index 5b10038..cd7430f 100644 --- a/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v1/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -24,6 +24,8 @@ import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; import com.amazon.sns.messaging.lib.model.RequestEntry; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.model.PublishBatchRequest; @@ -36,7 +38,7 @@ public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate pendingRequests, + final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests, final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java index e4e8869..9f196c2 100644 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java +++ b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java @@ -27,11 +27,13 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; @@ -71,6 +73,7 @@ void before() { .maximumPoolSize(10) .topicArn("arn:aws:sns:us-east-2:000000000000:topic") .build(); + snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty, new RingBufferBlockingQueue<>(2048)); } @@ -136,10 +139,16 @@ void testSuccessMultipleEntry() { assertThat(result, notNullValue()); })); + final List> listenableFutures = new LinkedList<>(); + entries(30000).forEach(entry -> { - snsTemplate.send(entry).addCallback(successCallback); + listenableFutures.add(snsTemplate.send(entry)); }); + LockSupport.parkNanos(Duration.ofMillis(500).toNanos()); + + listenableFutures.forEach(listenableFuture -> listenableFuture.addCallback(successCallback)); + snsTemplate.await().thenAccept(result -> { verify(successCallback, timeout(300000).times(30000)).accept(any()); verify(amazonSNS, atLeastOnce()).publishBatch(any()); @@ -208,12 +217,12 @@ void testFailRiseAwsServiceException() { @Test void testSuccessBlockingSubmissionPolicy() { final TopicProperty topicProperty = TopicProperty.builder() - .fifo(false) - .linger(50L) - .maxBatchSize(1) - .maximumPoolSize(1) - .topicArn("arn:aws:sns:us-east-2:000000000000:topic") - .build(); + .fifo(false) + .linger(50L) + .maxBatchSize(1) + .maximumPoolSize(1) + .topicArn("arn:aws:sns:us-east-2:000000000000:topic") + .build(); snsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java index 515177f..4bc0d42 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsConsumer.java @@ -53,7 +53,7 @@ public AmazonSnsConsumer( final SnsClient amazonSnsClient, final TopicProperty topicProperty, final ObjectMapper objectMapper, - final ConcurrentMap pendingRequests, + final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests, final ExecutorService executorService, final UnaryOperator publishDecorator) { @@ -74,7 +74,7 @@ protected BiFunction, PublishBatchRequest> su .subject(StringUtils.isNotBlank(entry.getSubject()) ? entry.getSubject() : null) .messageGroupId(StringUtils.isNotBlank(entry.getGroupId()) ? entry.getGroupId() : null) .messageDeduplicationId(StringUtils.isNotBlank(entry.getDeduplicationId()) ? entry.getDeduplicationId() : null) - .messageAttributes(messageAttributes.messageAttributes(entry.getMessageHeaders())) + .messageAttributes(AmazonSnsConsumer.messageAttributes.messageAttributes(entry.getMessageHeaders())) .message(entry.getMessage()) .build()) .collect(Collectors.toList()); @@ -87,10 +87,10 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final final String code = throwable instanceof AwsServiceException ? AwsServiceException.class.cast(throwable).awsErrorDetails().errorCode() : "000"; final String message = throwable instanceof AwsServiceException ? AwsServiceException.class.cast(throwable).awsErrorDetails().errorMessage() : throwable.getMessage(); - LOGGER.error(throwable.getMessage(), throwable); + AmazonSnsConsumer.LOGGER.error(throwable.getMessage(), throwable); publishBatchRequest.publishBatchRequestEntries().forEach(entry -> { - final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.id()); + final ListenableFuture listenableFuture = pendingRequests.remove(entry.id()); listenableFuture.fail(ResponseFailEntry.builder() .withId(entry.id()) .withCode(code) @@ -103,7 +103,7 @@ protected void handleError(final PublishBatchRequest publishBatchRequest, final @Override protected void handleResponse(final PublishBatchResponse publishBatchResult) { publishBatchResult.successful().forEach(entry -> { - final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.id()); + final ListenableFuture listenableFuture = pendingRequests.remove(entry.id()); listenableFuture.success(ResponseSuccessEntry.builder() .withId(entry.id()) .withMessageId(entry.messageId()) @@ -112,7 +112,7 @@ protected void handleResponse(final PublishBatchResponse publishBatchResult) { }); publishBatchResult.failed().forEach(entry -> { - final ListenableFutureRegistry listenableFuture = pendingRequests.remove(entry.id()); + final ListenableFuture listenableFuture = pendingRequests.remove(entry.id()); listenableFuture.fail(ResponseFailEntry.builder() .withId(entry.id()) .withCode(entry.code()) diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java index 241bd53..eb61f94 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducer.java @@ -21,12 +21,14 @@ import java.util.concurrent.ExecutorService; import com.amazon.sns.messaging.lib.model.RequestEntry; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; // @formatter:off class AmazonSnsProducer extends AbstractAmazonSnsProducer { public AmazonSnsProducer( - final ConcurrentMap pendingRequests, + final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests, final ExecutorService executorService) { super(pendingRequests, topicRequests, executorService); diff --git a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java index 7466ab9..5d2707c 100644 --- a/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java +++ b/amazon-sns-java-messaging-lib-v2/src/main/java/com/amazon/sns/messaging/lib/core/AmazonSnsTemplate.java @@ -24,6 +24,8 @@ import com.amazon.sns.messaging.lib.concurrent.ExecutorsProvider; import com.amazon.sns.messaging.lib.concurrent.RingBufferBlockingQueue; import com.amazon.sns.messaging.lib.model.RequestEntry; +import com.amazon.sns.messaging.lib.model.ResponseFailEntry; +import com.amazon.sns.messaging.lib.model.ResponseSuccessEntry; import com.amazon.sns.messaging.lib.model.TopicProperty; import com.fasterxml.jackson.databind.ObjectMapper; @@ -37,7 +39,7 @@ public class AmazonSnsTemplate extends AbstractAmazonSnsTemplate pendingRequests, + final ConcurrentMap> pendingRequests, final BlockingQueue> topicRequests, final ObjectMapper objectMapper, final UnaryOperator publishDecorator) { diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java index 04e62ea..2d309d6 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java @@ -163,12 +163,12 @@ void testFailureMultipleEntry() { assertThat(result, notNullValue()); })); - entries(30000).forEach(entry -> { + entries(3000000).forEach(entry -> { snsTemplate.send(entry).addCallback(null, failureCallback); }); snsTemplate.await().thenAccept(result -> { - verify(failureCallback, timeout(300000).times(30000)).accept(any()); + verify(failureCallback, timeout(300000).times(3000000)).accept(any()); verify(amazonSNS, atLeastOnce()).publishBatch(any(PublishBatchRequest.class)); }).join(); } From 89d8d3df9a12f111dff463e20b2ccf19cf625ded Mon Sep 17 00:00:00 2001 From: Marcos Tischer Vallim Date: Thu, 31 Jul 2025 00:29:12 -0300 Subject: [PATCH 2/2] fix: race condition Signed-off-by: Marcos Tischer Vallim --- .../core/jmh/AmazonSnsTemplateBenchmark.java | 127 ------------------ .../jmh/AmazonSnsTemplateBenchmarkRunner.java | 25 ---- .../lib/core/AmazonSnsProducerAsyncTest.java | 4 +- 3 files changed, 2 insertions(+), 154 deletions(-) delete mode 100644 amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java delete mode 100644 amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java deleted file mode 100644 index 806192b..0000000 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmark.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.sns.messaging.lib.core.jmh; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OperationsPerInvocation; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; - -import com.amazon.sns.messaging.lib.core.AmazonSnsTemplate; -import com.amazon.sns.messaging.lib.model.RequestEntry; -import com.amazon.sns.messaging.lib.model.TopicProperty; -import com.amazonaws.services.sns.AmazonSNS; -import com.amazonaws.services.sns.model.PublishBatchRequest; -import com.amazonaws.services.sns.model.PublishBatchResult; -import com.amazonaws.services.sns.model.PublishBatchResultEntry; - -// @formatter:off -@Fork(5) -@State(Scope.Benchmark) -@BenchmarkMode(Mode.Throughput) -@OutputTimeUnit(TimeUnit.SECONDS) -@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) -public class AmazonSnsTemplateBenchmark { - - private AmazonSnsTemplate amazonSnsTemplate; - - @Setup - public void setup() { - final AmazonSNS amazonSNS = mock(AmazonSNS.class); - - when(amazonSNS.publishBatch(any())).thenAnswer(invocation -> { - final PublishBatchRequest request = invocation.getArgument(0, PublishBatchRequest.class); - final List resultEntries = request.getPublishBatchRequestEntries().stream() - .map(entry -> new PublishBatchResultEntry().withId(entry.getId())) - .collect(Collectors.toList()); - return new PublishBatchResult().withSuccessful(resultEntries); - }); - - final TopicProperty topicProperty = TopicProperty.builder() - .fifo(false) - .linger(70) - .maxBatchSize(10) - .maximumPoolSize(512) - .topicArn("arn:aws:sns:us-east-2:000000000000:topic") - .build(); - - amazonSnsTemplate = new AmazonSnsTemplate<>(amazonSNS, topicProperty); - } - - @TearDown - public void tearDown() { - amazonSnsTemplate.await().join(); - amazonSnsTemplate.shutdown(); - } - - @Benchmark - @OperationsPerInvocation(1) - public void testSend_1() { - amazonSnsTemplate.send(RequestEntry.builder().withValue(1).build()); - } - - @Benchmark - @OperationsPerInvocation(10) - public void testSend_10() { - for (int i = 0; i < 10; i++) { - amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); - } - } - - @Benchmark - @OperationsPerInvocation(100) - public void testSend_100() { - for (int i = 0; i < 100; i++) { - amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); - } - } - - @Benchmark - @OperationsPerInvocation(1000) - public void testSend_1000() { - for (int i = 0; i < 1000; i++) { - amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); - } - } - - @Benchmark - @OperationsPerInvocation(10000) - public void testSend_10000() { - for (int i = 0; i < 10000; i++) { - amazonSnsTemplate.send(RequestEntry.builder().withValue(i).build()); - } - } - -} -// @formatter:on diff --git a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java b/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java deleted file mode 100644 index 2454333..0000000 --- a/amazon-sns-java-messaging-lib-v1/src/test/java/com/amazon/sns/messaging/lib/core/jmh/AmazonSnsTemplateBenchmarkRunner.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2024 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.amazon.sns.messaging.lib.core.jmh; - -public class AmazonSnsTemplateBenchmarkRunner { - - public static void main(final String[] args) throws Exception { - org.openjdk.jmh.Main.main(args); - } - -} diff --git a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java index 2d309d6..04e62ea 100644 --- a/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java +++ b/amazon-sns-java-messaging-lib-v2/src/test/java/com/amazon/sns/messaging/lib/core/AmazonSnsProducerAsyncTest.java @@ -163,12 +163,12 @@ void testFailureMultipleEntry() { assertThat(result, notNullValue()); })); - entries(3000000).forEach(entry -> { + entries(30000).forEach(entry -> { snsTemplate.send(entry).addCallback(null, failureCallback); }); snsTemplate.await().thenAccept(result -> { - verify(failureCallback, timeout(300000).times(3000000)).accept(any()); + verify(failureCallback, timeout(300000).times(30000)).accept(any()); verify(amazonSNS, atLeastOnce()).publishBatch(any(PublishBatchRequest.class)); }).join(); }