From e75a4f249a36d6b6626e835ee8c554c1a2183d83 Mon Sep 17 00:00:00 2001
From: Ry Biesemeyer
Date: Thu, 3 Jul 2025 06:30:58 +0000
Subject: [PATCH] PQ: avoid deserialization cost (P-O-C)

Avoids deserialization cost by holding, on each PQ page, a list of soft
references to the page's live event objects, allowing the GC to reclaim
the referenced objects in the event of memory pressure.

While forming batches during reads, elements are returned from the cache
until one is not present, at which point we fall through to reading the
bytes from the memory-mapped file, returning an intermediate
`BoxedQueueable` in either case. The elements are then unboxed outside of
the queue locking mechanism.
---
 .../java/org/logstash/ackedqueue/Batch.java   |  17 ++-
 .../logstash/ackedqueue/BoxedQueueable.java   |  56 +++++++++
 .../java/org/logstash/ackedqueue/Page.java    | 117 ++++++++++++++++--
 .../org/logstash/ackedqueue/PageFactory.java  |   4 +-
 .../java/org/logstash/ackedqueue/Queue.java   |  46 ++++---
 .../org/logstash/ackedqueue/Queueable.java    |   1 +
 .../logstash/ackedqueue/SequencedList.java    |  29 ++++-
 .../logstash/ackedqueue/io/LongVector.java    |  84 -------------
 .../logstash/ackedqueue/io/MmapPageIOV1.java  |   4 +-
 .../logstash/ackedqueue/io/MmapPageIOV2.java  |   5 +-
 .../org/logstash/ackedqueue/HeadPageTest.java |  12 +-
 .../org/logstash/ackedqueue/QueueTest.java    |   2 +-
 .../ackedqueue/io/LongVectorTest.java         |  60 ---------
 13 files changed, 236 insertions(+), 201 deletions(-)
 create mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/BoxedQueueable.java
 delete mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/io/LongVector.java
 delete mode 100644 logstash-core/src/test/java/org/logstash/ackedqueue/io/LongVectorTest.java

diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java
index 9d08bb01a05..4da5a43f6f0 100644
--- a/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java
@@ -38,15 +38,8 @@ public class Batch implements Closeable {
     private final Queue queue;
     private final AtomicBoolean closed;

-    public Batch(SequencedList serialized, Queue q) {
-        this(
-            serialized.getElements(),
-            serialized.getSeqNums().size() == 0 ? -1L : serialized.getSeqNums().get(0), q
-        );
-    }
-
-    public Batch(List elements, long firstSeqNum, Queue q) {
-        this.elements = deserializeElements(elements, q);
+    Batch(List elements, long firstSeqNum, Queue q) {
+        this.elements = elements;
         this.firstSeqNum = elements.isEmpty() ? -1L : firstSeqNum;
         this.queue = q;
         this.closed = new AtomicBoolean(false);
@@ -90,4 +83,10 @@ private static List deserializeElements(List serialized, Queu
         }
         return deserialized;
     }
+
+    private static long minSeqNum(SequencedList sequencedList) {
+        return sequencedList.getMinSeqNum();
+    }
+
+
 }
diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/BoxedQueueable.java b/logstash-core/src/main/java/org/logstash/ackedqueue/BoxedQueueable.java
new file mode 100644
index 00000000000..82346c0fcf2
--- /dev/null
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/BoxedQueueable.java
@@ -0,0 +1,56 @@
+package org.logstash.ackedqueue;
+
+import java.util.function.Function;
+
+/**
+ * In its simplest form a {@code BoxedQueueable} is a box around a {@link Queueable},
+ * which can be useful for passing a mixture of live references and to-be-deserialized
+ * byte arrays. It is an internal implementation detail of the acked queue.
+ */ +interface BoxedQueueable { + Queueable unbox(); + + static BoxedQueueable fromLiveReference(final Queueable queueable) { + return new LiveReference(queueable); + } + + static BoxedQueueable fromSerializedBytes(final byte[] bytes, Function deserializer) { + return new SerializedBytes(bytes, deserializer); + } + + /** + * A {@code BoxedQueueable.LiveReference} is an implementation of {@link BoxedQueueable} that + * wraps a live object + */ + class LiveReference implements BoxedQueueable { + private final Queueable boxed; + + public LiveReference(Queueable boxed) { + this.boxed = boxed; + } + + @Override + public Queueable unbox() { + return this.boxed; + } + } + + /** + * A {@code BoxedQueueable.SerializedBytes} is an implementation of {@link BoxedQueueable} that + * wraps bytes and a deserializer + */ + class SerializedBytes implements BoxedQueueable { + private final byte[] bytes; + private final Function deserializer; + + public SerializedBytes(byte[] bytes, Function deserializer) { + this.bytes = bytes; + this.deserializer = deserializer; + } + + @Override + public Queueable unbox() { + return deserializer.apply(bytes); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Page.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Page.java index 2cc77c788dc..03b5a33b808 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Page.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Page.java @@ -23,7 +23,12 @@ import com.google.common.primitives.Ints; import java.io.Closeable; import java.io.IOException; +import java.lang.ref.SoftReference; +import java.util.ArrayList; import java.util.BitSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + import org.codehaus.commons.nullanalysis.NotNull; import org.logstash.ackedqueue.io.CheckpointIO; import org.logstash.ackedqueue.io.PageIO; @@ -41,6 +46,8 @@ public final class Page implements Closeable { protected PageIO pageIO; private boolean writable; + private final ElementCache elementCache; + // bit 0 is minSeqNum // TODO: go steal LocalCheckpointService in feature/seq_no from ES // TODO: https://github.com/elastic/elasticsearch/blob/feature/seq_no/core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointService.java @@ -59,6 +66,8 @@ public Page(int pageNum, Queue queue, long minSeqNum, int elementCount, long fir this.pageIO = pageIO; this.writable = writable; + this.elementCache = writable ? 
new ElementCache<>() : null; + assert this.pageIO != null : "invalid null pageIO"; } @@ -71,30 +80,55 @@ public String toString() { * @return {@link SequencedList} collection of serialized elements read * @throws IOException if an IO error occurs */ - public SequencedList read(int limit) throws IOException { + SequencedList read(int limit) throws IOException { // first make sure this page is activated, activating previously activated is harmless this.pageIO.activate(); - SequencedList serialized = this.pageIO.read(this.firstUnreadSeqNum, limit); - assert serialized.getSeqNums().get(0) == this.firstUnreadSeqNum : - String.format("firstUnreadSeqNum=%d != first result seqNum=%d", this.firstUnreadSeqNum, serialized.getSeqNums().get(0)); + // rescope limit + limit = Math.max(0, Math.min(limit, elementCount - Math.toIntExact(firstUnreadSeqNum-minSeqNum))); + + final ArrayList elements = new ArrayList<>(limit); + + int pageOffset = Math.toIntExact(this.firstUnreadSeqNum - this.minSeqNum); + + // first attempt to read from the event cache; as soon as we are unable + // to read events directly from the cache, fall through to the pageIO + if (this.elementCache != null) { + for (Queueable cachedElement : elementCache.releaseMany(this.firstUnreadSeqNum, limit)) { + elements.add(BoxedQueueable.fromLiveReference(cachedElement)); + } + } + + int limitRemaining = limit - elements.size(); + if (limitRemaining > 0 && pageOffset < this.elementCount) { + SequencedList serialized = this.pageIO.read(this.firstUnreadSeqNum, limitRemaining); + assert serialized.getMinSeqNum() == this.firstUnreadSeqNum + elements.size() : + String.format("firstUnreadSeqNum=%d != first result seqNum=%d + cached=%d", this.firstUnreadSeqNum, serialized.getMinSeqNum(), elements.size()); + for (SequencedList.Entry entry : serialized.entries()) { + elements.add(BoxedQueueable.fromSerializedBytes(entry.element, this.queue::deserialize)); + } + } + + final SequencedList result = new SequencedList<>(elements, this.firstUnreadSeqNum); - this.firstUnreadSeqNum += serialized.getElements().size(); + this.firstUnreadSeqNum += elements.size(); - return serialized; + return result; } - public void write(byte[] bytes, long seqNum, int checkpointMaxWrites) throws IOException { + public void write(Queueable element, byte[] bytes, long seqNum, int checkpointMaxWrites) throws IOException { if (! 
this.writable) { throw new IllegalStateException(String.format("page=%d is not writable", this.pageNum)); } this.pageIO.write(bytes, seqNum); + this.elementCache.stash(element, seqNum); if (this.minSeqNum <= 0) { this.minSeqNum = seqNum; this.firstUnreadSeqNum = seqNum; } + this.elementCount++; // force a checkpoint if we wrote checkpointMaxWrites elements since last checkpoint @@ -154,7 +188,7 @@ public boolean ack(long firstSeqNum, int count, int checkpointMaxAcks) throws IO this.minSeqNum, this.elementCount, this.minSeqNum + this.elementCount ); final int offset = Ints.checkedCast(firstSeqNum - this.minSeqNum); - ackedSeqNums.flip(offset, offset + count); + ackedSeqNums.set(offset, offset + count); // checkpoint if totally acked or we acked more than checkpointMaxAcks elements in this page since last checkpoint // note that fully acked pages cleanup is done at queue level in Queue.ack() final long firstUnackedSeqNum = firstUnackedSeqNum(); @@ -250,6 +284,9 @@ public void behead() throws IOException { */ public void deactivate() throws IOException { this.getPageIO().deactivate(); + if (this.elementCache != null) { + this.elementCache.releaseAll(); + } } public boolean hasSpace(int byteSize) { @@ -300,4 +337,68 @@ protected long firstUnackedSeqNum() { return this.ackedSeqNums.nextClearBit(0) + this.minSeqNum; } + static class ReleaseableSoftReference extends SoftReference { + public ReleaseableSoftReference(T referent) { + super(referent); + } + public T getAndReleaseElement() { + T element = super.get(); + super.clear(); + return element; + } + } + + static class ElementCache { + private long minSeqNum = Long.MIN_VALUE; + private int elementCount = 0; + private ArrayList> elementCache; + + private static final AtomicLong ID_GENERATOR = new AtomicLong(); + private final long id = ID_GENERATOR.incrementAndGet(); + + public void stash(T element, long seqNum) { + final int offset; + if (minSeqNum < 0) { + minSeqNum = seqNum; + elementCache = new ArrayList<>(); + offset = 0; + } else { + offset = Math.toIntExact(seqNum - minSeqNum); + } + + elementCache.add(offset, new ReleaseableSoftReference<>(element)); + elementCount++; + } + + public T release(long seqNum) { + if (elementCount <= 0 || seqNum < minSeqNum) { + return null; + } + final int offset = Math.toIntExact(seqNum - minSeqNum); + if (offset >= elementCount) { + return null; + } + final ReleaseableSoftReference releaseableSoftReference = elementCache.get(offset); + if (releaseableSoftReference == null) { + return null; + } + return releaseableSoftReference.getAndReleaseElement(); + } + + public List releaseMany(long seqNum, int limit) { + final List elements = new ArrayList<>(limit); + for (; elements.size() < limit; seqNum++) { + T released = release(seqNum); + if (released == null) { break; } + elements.add(released); + } + return elements; + } + + public void releaseAll() { + this.minSeqNum = Long.MIN_VALUE; + this.elementCache = null; + this.elementCount = 0; + } + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/PageFactory.java b/logstash-core/src/main/java/org/logstash/ackedqueue/PageFactory.java index 2240da30623..dcd78d245ff 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/PageFactory.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/PageFactory.java @@ -64,7 +64,7 @@ public static Page newHeadPage(Checkpoint checkpoint, Queue queue, PageIO pageIO // this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset if 
(checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) { - p.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum())); + p.ackedSeqNums.set(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum())); } return p; @@ -97,7 +97,7 @@ public static Page newTailPage(Checkpoint checkpoint, Queue queue, PageIO pageIO try { // this page ackedSeqNums bitset is a new empty bitset, if we have some acked elements, set them in the bitset if (checkpoint.getFirstUnackedSeqNum() > checkpoint.getMinSeqNum()) { - p.ackedSeqNums.flip(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum())); + p.ackedSeqNums.set(0, (int) (checkpoint.getFirstUnackedSeqNum() - checkpoint.getMinSeqNum())); } return p; diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 691987793c5..48bc2794041 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -130,6 +130,16 @@ public Queue(Settings settings) { } } + Batch createBatch(SequencedList boxedElements) { + return createBatch(boxedElements.getElements(), boxedElements.getMinSeqNum()); + } + + Batch createBatch(List boxes, long firstSeqNum) { + final ArrayList elements = new ArrayList<>(boxes.size()); + boxes.stream().map(BoxedQueueable::unbox).forEach(elements::add); + return new Batch(elements, firstSeqNum, this); + } + public String getDirPath() { return this.dirPath.toString(); } @@ -460,7 +470,7 @@ public long write(Queueable element) throws IOException { } long seqNum = this.seqNum += 1; - this.headPage.write(data, seqNum, this.checkpointMaxWrites); + this.headPage.write(element, data, seqNum, this.checkpointMaxWrites); this.unreadCount++; maybeSignalReadDemand(needsForceFlush); @@ -601,18 +611,18 @@ public void ensurePersistedUpto(long seqNum) throws IOException{ * @throws IOException if an IO error occurs */ public synchronized Batch nonBlockReadBatch(int limit) throws IOException { - final SerializedBatchHolder serializedBatchHolder; + final BoxedBatchHolder boxedBatchHolder; lock.lock(); try { Page p = nextReadPage(); if (isHeadPage(p) && p.isFullyRead()) { return null; } - serializedBatchHolder = readPageBatch(p, limit, 0L); + boxedBatchHolder = readPageBatch(p, limit, 0L); } finally { lock.unlock(); } - return serializedBatchHolder.deserialize(); + return boxedBatchHolder.unbox(); } /** @@ -624,10 +634,10 @@ public synchronized Batch nonBlockReadBatch(int limit) throws IOException { * @throws IOException if an IO error occurs */ public Batch readBatch(int limit, long timeout) throws IOException { - return readSerializedBatch(limit, timeout).deserialize(); + return readBoxedBatch(limit, timeout).unbox(); } - private synchronized SerializedBatchHolder readSerializedBatch(int limit, long timeout) throws IOException { + private synchronized BoxedBatchHolder readBoxedBatch(int limit, long timeout) throws IOException { lock.lock(); try { @@ -638,7 +648,7 @@ private synchronized SerializedBatchHolder readSerializedBatch(int limit, long t } /** - * read a {@link SerializedBatchHolder} from the given {@link Page}. If the page is a head page, try to maximize the + * read a {@link BoxedBatchHolder} from the given {@link Page}. If the page is a head page, try to maximize the * batch size by waiting for writes. * @param p the {@link Page} to read from. * @param limit size limit of the batch to read. 
@@ -646,9 +656,9 @@ private synchronized SerializedBatchHolder readSerializedBatch(int limit, long t * @return {@link Batch} with read elements or null if nothing was read * @throws IOException if an IO error occurs */ - private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException { + private BoxedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException { int left = limit; - final List elements = new ArrayList<>(limit); + final List elements = new ArrayList<>(limit); // NOTE: the tricky thing here is that upon entering this method, if p is initially a head page // it could become a tail page upon returning from the notEmpty.await call. @@ -673,12 +683,12 @@ private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) thr if (! p.isFullyRead()) { boolean wasFull = isMaxUnreadReached(); - final SequencedList serialized = p.read(left); + final SequencedList serialized = p.read(left); int n = serialized.getElements().size(); assert n > 0 : "page read returned 0 elements"; elements.addAll(serialized.getElements()); if (firstSeqNum == -1L) { - firstSeqNum = serialized.getSeqNums().get(0); + firstSeqNum = serialized.getMinSeqNum(); } this.unreadCount -= n; @@ -698,7 +708,7 @@ private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) thr removeUnreadPage(p); } - return new SerializedBatchHolder(elements, firstSeqNum); + return new BoxedBatchHolder(elements, firstSeqNum); } /** @@ -915,17 +925,17 @@ private static boolean containsSeq(final Page page, final long seqNum) { return seqNum >= pMinSeq && seqNum < pMaxSeq; } - class SerializedBatchHolder { - private final List elements; + class BoxedBatchHolder { + private final List boxedElements; private final long firstSeqNum; - private SerializedBatchHolder(List elements, long firstSeqNum) { - this.elements = elements; + BoxedBatchHolder(final List boxedElements, final long firstSeqNum) { + this.boxedElements = boxedElements; this.firstSeqNum = firstSeqNum; } - private Batch deserialize() { - return new Batch(elements, firstSeqNum, Queue.this); + Batch unbox() { + return Queue.this.createBatch(boxedElements, firstSeqNum); } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queueable.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queueable.java index 7cb203d256f..5c507a31bc8 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queueable.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queueable.java @@ -30,4 +30,5 @@ public interface Queueable { byte[] serialize() throws IOException; static Object deserialize(byte[] bytes) { throw new RuntimeException("please implement deserialize"); }; + } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/SequencedList.java b/logstash-core/src/main/java/org/logstash/ackedqueue/SequencedList.java index 4a26a47e6e9..5ce753c5fca 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/SequencedList.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/SequencedList.java @@ -20,26 +20,43 @@ package org.logstash.ackedqueue; +import java.util.ArrayList; import java.util.List; -import org.logstash.ackedqueue.io.LongVector; /** * Carries sequence numbers and items read from queue. 
* */ public class SequencedList { private final List elements; - private final LongVector seqNums; + private final long minSeqNum; - public SequencedList(List elements, LongVector seqNums) { + public SequencedList(List elements, long minSeqNum) { this.elements = elements; - this.seqNums = seqNums; + this.minSeqNum = minSeqNum; } public List getElements() { return elements; } - public LongVector getSeqNums() { - return seqNums; + public long getMinSeqNum() { + return this.minSeqNum; + } + + public List> entries() { + List> entries = new ArrayList<>(elements.size()); + for (int i = 0; i < elements.size(); i++) { + entries.add(new Entry<>(elements.get(i), this.minSeqNum + i)); + } + return entries; + } + + public static class Entry { + public final E element; + public final long seqNum; + public Entry(E element, long seqNum) { + this.element = element; + this.seqNum = seqNum; + } } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/LongVector.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/LongVector.java deleted file mode 100644 index 05343a8a5eb..00000000000 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/LongVector.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.logstash.ackedqueue.io; - -/** - * Internal class used in persistent queue implementation. - * - * It'a vector that stores primitives long (no autoboxing) expanding, by copy, the underling array when - * more space is needed. - * */ -public final class LongVector { - - private int count; - - private long[] data; - - public LongVector(final int size) { - data = new long[size]; - count = 0; - } - - /** - * Store the {@code long} to the underlying {@code long[]}, resizing it if necessary. - * @param num Long to store - */ - public void add(final long num) { - if (data.length < count + 1) { - final long[] old = data; - data = new long[(data.length << 1) + 1]; - System.arraycopy(old, 0, data, 0, old.length); - } - data[count++] = num; - } - - /** - * Store the {@code long[]} to the underlying {@code long[]}, resizing it if necessary. - * @param nums {@code long[]} to store - */ - public void add(final LongVector nums) { - if (data.length < count + nums.size()) { - final long[] old = data; - data = new long[(data.length << 1) + nums.size()]; - System.arraycopy(old, 0, data, 0, old.length); - } - for (int i = 0; i < nums.size(); i++) { - data[count + i] = nums.get(i); - } - count += nums.size(); - } - - /** - * Get value stored at given index. 
- * @param index Array index (only values smaller than {@link LongVector#count} are valid) - * @return Int - */ - public long get(final int index) { - return data[index]; - } - - /** - * @return Number of elements stored in this instance - */ - public int size() { - return count; - } -} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java index 25011d22b56..9040dc9d603 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV1.java @@ -106,7 +106,6 @@ public SequencedList read(long seqNum, int limit) throws IOException { String.format("seqNum=%d is > maxSeqNum=%d", seqNum, maxSeqNum()); List elements = new ArrayList<>(); - final LongVector seqNums = new LongVector(limit); int offset = this.offsetMap.get((int) (seqNum - this.minSeqNum)); @@ -128,14 +127,13 @@ public SequencedList read(long seqNum, int limit) throws IOException { } elements.add(readBytes); - seqNums.add(readSeqNum); if (seqNum + i >= maxSeqNum()) { break; } } - return new SequencedList<>(elements, seqNums); + return new SequencedList<>(elements, seqNum); } public void recover() { diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java index 736da422bbb..91f19b6906d 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/io/MmapPageIOV2.java @@ -116,7 +116,6 @@ public SequencedList read(long seqNum, int limit) throws IOException { String.format("seqNum=%d is > maxSeqNum=%d", seqNum, maxSeqNum()); List elements = new ArrayList<>(); - final LongVector seqNums = new LongVector(limit); int offset = this.offsetMap.get((int) (seqNum - this.minSeqNum)); @@ -138,14 +137,13 @@ public SequencedList read(long seqNum, int limit) throws IOException { } elements.add(readBytes); - seqNums.add(readSeqNum); if (seqNum + i >= maxSeqNum()) { break; } } - return new SequencedList<>(elements, seqNums); + return new SequencedList<>(elements, seqNum); } // recover will overwrite/update/set this object minSeqNum, capacity and elementCount attributes @@ -236,7 +234,6 @@ public void close() { if (this.buffer != null) { this.buffer.force(); BUFFER_CLEANER.clean(buffer); - } this.buffer = null; } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java index 810ad7f031b..77b675a060f 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java @@ -77,7 +77,7 @@ public void pageWrite() throws IOException { Page p = q.headPage; assertThat(p.hasSpace(element.serialize().length), is(true)); - p.write(element.serialize(), 0, 1); + p.write(element, element.serialize(), 0, 1); assertThat(p.hasSpace(element.serialize().length), is(false)); assertThat(p.isFullyRead(), is(false)); @@ -96,9 +96,9 @@ public void pageWriteAndReadSingle() throws IOException { Page p = q.headPage; assertThat(p.hasSpace(element.serialize().length), is(true)); - p.write(element.serialize(), seqNum, 1); + p.write(element, element.serialize(), seqNum, 1); - Batch b = new Batch(p.read(1), q); + Batch b = q.createBatch(p.read(1)); assertThat(b.getElements().size(), is(equalTo(1))); 
assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString()))); @@ -119,7 +119,7 @@ public void inEmpty() throws IOException { Page p = q.headPage; assertThat(p.isEmpty(), is(true)); - p.write(element.serialize(), 1, 1); + p.write(element, element.serialize(), 1, 1); assertThat(p.isEmpty(), is(false)); Batch b = q.readBatch(1, TimeUnit.SECONDS.toMillis(1)); assertThat(p.isEmpty(), is(false)); @@ -141,9 +141,9 @@ public void pageWriteAndReadMulti() throws IOException { Page p = q.headPage; assertThat(p.hasSpace(element.serialize().length), is(true)); - p.write(element.serialize(), seqNum, 1); + p.write(element, element.serialize(), seqNum, 1); - Batch b = new Batch(p.read(10), q); + Batch b = q.createBatch(p.read(10)); assertThat(b.getElements().size(), is(equalTo(1))); assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString()))); diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index 1b69fb1bec5..c0f418c84bd 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -981,7 +981,7 @@ public void testZeroByteFullyAckedPageOnOpen() throws IOException { // work directly on the tail page and not the queue to avoid having the queue purge the page // but make sure the tail page checkpoint marks it as fully acked Page tp = q.tailPages.get(0); - Batch b = new Batch(tp.read(1), q); + Batch b = q.createBatch(tp.read(1)); assertThat(b.getElements().get(0), is(element1)); tp.ack(firstSeq, 1, 1); assertThat(tp.isFullyAcked(), is(true)); diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/io/LongVectorTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/io/LongVectorTest.java deleted file mode 100644 index 10a690be66c..00000000000 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/io/LongVectorTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -package org.logstash.ackedqueue.io; - -import org.junit.Test; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -public class LongVectorTest { - - @Test - public void storesAndResizes() { - final int count = 10_000; - final LongVector vector = new LongVector(1000); - for (long i = 0L; i < count; ++i) { - vector.add(i); - } - assertThat(vector.size(), is(count)); - for (int i = 0; i < count; ++i) { - assertThat((long) i, is(vector.get(i))); - } - } - - @Test - public void storesVectorAndResizes() { - final int count = 1000; - final LongVector vector1 = new LongVector(count); - for (long i = 0L; i < count; ++i) { - vector1.add(i); - } - final LongVector vector2 = new LongVector(count); - for (long i = 0L + count; i < 2 * count; ++i) { - vector2.add(i); - } - vector1.add(vector2); - assertThat(vector1.size(), is(2 * count)); - for (int i = 0; i < 2 * count; ++i) { - assertThat((long) i, is(vector1.get(i))); - } - } -}
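
Reviewer note (illustration only; not part of the patch): the sketch below is a
minimal, self-contained rendering of the soft-reference caching pattern that the
new Page.ElementCache applies. Writes stash a SoftReference keyed by sequence
number; batch reads drain consecutive cached entries until one has been reclaimed
by the GC, at which point the real implementation falls back to reading and
deserializing bytes from the memory-mapped page. The wrapper class, the simplified
Cache type, and the main method here are hypothetical stand-ins, not the patch's
actual API.

    import java.lang.ref.SoftReference;
    import java.util.ArrayList;
    import java.util.List;

    // Minimal sketch of a per-page soft-reference element cache (simplified,
    // hypothetical types; the patch's ElementCache lives inside Page and also
    // tracks element counts and page offsets).
    public final class SoftCacheSketch {

        static final class Cache<T> {
            private long minSeqNum = -1L;
            private final List<SoftReference<T>> refs = new ArrayList<>();

            // Assumes sequence numbers are assigned consecutively, as the queue's
            // head page does when writing.
            void stash(T element, long seqNum) {
                if (minSeqNum < 0) { minSeqNum = seqNum; }
                refs.add(new SoftReference<>(element));
            }

            // Returns consecutive cached elements starting at seqNum, stopping at
            // the first entry the GC has reclaimed (or that was never cached).
            List<T> releaseMany(long seqNum, int limit) {
                final List<T> out = new ArrayList<>(limit);
                while (out.size() < limit && minSeqNum >= 0) {
                    final int offset = Math.toIntExact(seqNum - minSeqNum);
                    if (offset < 0 || offset >= refs.size()) { break; }
                    final T element = refs.get(offset).get();
                    if (element == null) { break; }   // reclaimed under memory pressure
                    refs.get(offset).clear();         // single use: the batch owns it now
                    out.add(element);
                    seqNum++;
                }
                return out;
            }
        }

        public static void main(String[] args) {
            final Cache<String> cache = new Cache<>();
            cache.stash("event-0", 10L);
            cache.stash("event-1", 11L);
            cache.stash("event-2", 12L);
            // A batch read of up to 5 elements starting at seqNum 10 drains the cache;
            // anything it cannot serve would instead be read (and deserialized) from
            // the page's backing file.
            System.out.println(cache.releaseMany(10L, 5));   // [event-0, event-1, event-2]
        }
    }

Because soft references are cleared only under heap pressure, this trades
otherwise-idle heap for skipped deserialization without changing the queue's
durability story: the bytes on disk remain the source of truth, and the unboxing
of each BoxedQueueable happens outside the queue lock.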