diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java index 8749c9cd3af46..7c6c1d232aa4b 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java @@ -39,6 +39,7 @@ import java.util.Iterator; import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired; +import static org.apache.flink.util.Preconditions.checkState; /** * An implementation of {@link WindowBuffer} that buffers input elements in a {@link @@ -78,20 +79,27 @@ public RecordsWindowBuffer( @Override public void addElement(RowData key, long sliceEnd, RowData element) throws Exception { - // track the lowest trigger time, if watermark exceeds the trigger time, - // it means there are some elements in the buffer belong to a window going to be fired, - // and we need to flush the buffer into state for firing. - minSliceEnd = Math.min(sliceEnd, minSliceEnd); - reuseWindowKey.replace(sliceEnd, key); - LookupInfo> lookup = recordsBuffer.lookup(reuseWindowKey); - try { - recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element)); - } catch (EOFException e) { - // buffer is full, flush it to state - flush(); - // remember to add the input element again - addElement(key, sliceEnd, element); + while (true) { + LookupInfo> lookup = recordsBuffer.lookup(reuseWindowKey); + try { + recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element)); + // Track the lowest trigger time. If watermark exceeds the trigger time, + // it means there are some elements in the buffer belong to a window going + // to be fired, and we need to flush the buffer into state for firing. + minSliceEnd = Math.min(sliceEnd, minSliceEnd); + break; + } catch (EOFException e) { + if (recordsBuffer.getNumKeys() == 0) { + // Buffer is empty, retry won't help (record is too large for the buffer) + throw e; + } + // Buffer has data, flush and retry + flush(); + checkState( + recordsBuffer.getNumKeys() == 0, + "The recordsBuffer should be empty after flushing."); + } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java index 9fc7d81d72a9a..044297a96d7e8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java @@ -36,7 +36,7 @@ * frequently accessing state, or flushes to output to reduce shuffling data. */ @Internal -public interface WindowBuffer { +public interface WindowBuffer extends AutoCloseable { /** * Adds an element with associated key into the buffer. The buffer may temporarily buffer the @@ -72,9 +72,6 @@ public interface WindowBuffer { */ void flush() throws Exception; - /** Release resources allocated by this buffer. */ - void close() throws Exception; - // ------------------------------------------------------------------------ /** A factory that creates a {@link WindowBuffer}. */ diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBufferTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBufferTest.java new file mode 100644 index 0000000000000..69061effcb52e --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBufferTest.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.aggregate.window.buffers; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.memory.MemoryManagerBuilder; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.runtime.util.WindowKey; +import org.apache.flink.table.runtime.util.collections.binary.BytesMap; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.EOFException; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link RecordsWindowBuffer}. */ +class RecordsWindowBufferTest { + + private static final int PAGE_SIZE = (int) MemorySize.parse("32 kb").getBytes(); + + /** Minimum memory size must be > {@link BytesMap#INIT_BUCKET_MEMORY_IN_BYTES}(1MB). */ + private static final long MIN_MEMORY_SIZE = MemorySize.parse("2 mb").getBytes(); + + private MemoryManager memoryManager; + private RowDataSerializer keySer; + private RowDataSerializer valueSer; + + @BeforeEach + void setUp() { + memoryManager = + MemoryManagerBuilder.newBuilder() + .setMemorySize(MIN_MEMORY_SIZE) + .setPageSize(PAGE_SIZE) + .build(); + + RowType keyType = new RowType(Arrays.asList(new RowType.RowField("key", new IntType()))); + RowType valueType = + new RowType( + Arrays.asList( + new RowType.RowField("key", new IntType()), + new RowType.RowField("data", new VarCharType(Integer.MAX_VALUE)))); + + keySer = new RowDataSerializer(keyType); + valueSer = new RowDataSerializer(valueType); + } + + @AfterEach + void tearDown() { + if (memoryManager != null) { + memoryManager.shutdown(); + } + } + + private RecordsWindowBuffer createBuffer(RecordsCombiner combiner) { + return new RecordsWindowBuffer( + this, + memoryManager, + MIN_MEMORY_SIZE, + combiner, + keySer, + valueSer, + false, + ZoneId.of("UTC")); + } + + private static String createLargeString(int sizeInKB) { + char[] chars = new char[sizeInKB * 1024]; + Arrays.fill(chars, 'x'); + return new String(chars); + } + + /** + * Recoverable scenario: buffer has data (numKeys > 0), EOFException triggers flush, retry + * succeeds. + */ + @Test + void testFlushAndRetrySucceeds() throws Exception { + List> flushedRecords = new ArrayList<>(); + RecordsCombiner combiner = + new RecordsCombiner() { + @Override + public void combine(WindowKey windowKey, Iterator records) { + List batch = new ArrayList<>(); + while (records.hasNext()) { + batch.add(records.next()); + } + flushedRecords.add(batch); + } + + @Override + public void close() {} + }; + + try (RecordsWindowBuffer buffer = createBuffer(combiner)) { + String largeData = createLargeString(100); + + int totalRecords = 50; + for (int i = 0; i < totalRecords; i++) { + GenericRowData key = GenericRowData.of(i); + GenericRowData value = GenericRowData.of(i, StringData.fromString(largeData + i)); + buffer.addElement(key, 1000L, value); + } + + buffer.flush(); + + assertThat(flushedRecords).hasSizeGreaterThanOrEqualTo(2); + + int totalFlushedRecords = flushedRecords.stream().mapToInt(List::size).sum(); + assertThat(totalFlushedRecords).isEqualTo(totalRecords); + } + } + + /** + * Unrecoverable scenario (1st attempt): empty buffer (numKeys == 0), single record too large. + * Should throw EOFException immediately without flush. + */ + @Test + void testFirstUnrecoverableAttemptOnEmptyBuffer() throws Exception { + final boolean[] flushCalled = {false}; + RecordsCombiner combiner = + new RecordsCombiner() { + @Override + public void combine(WindowKey windowKey, Iterator records) { + flushCalled[0] = true; + } + + @Override + public void close() {} + }; + + try (RecordsWindowBuffer buffer = createBuffer(combiner)) { + String largeString = createLargeString(4 * 1024); // 4MB > 2MB buffer + + GenericRowData key = GenericRowData.of(1); + GenericRowData value = GenericRowData.of(1, StringData.fromString(largeString)); + + assertThatThrownBy(() -> buffer.addElement(key, 1000L, value)) + .isInstanceOf(EOFException.class); + + assertThat(flushCalled[0]).isFalse(); + } + } + + /** + * Tests that minSliceEnd is correctly tracked when an internal flush occurs during addElement() + * due to buffer overflow. + */ + @Test + void testMinSliceEndPreservedAfterInternalFlush() throws Exception { + List flushedSliceEnds = new ArrayList<>(); + RecordsCombiner combiner = + new RecordsCombiner() { + @Override + public void combine(WindowKey windowKey, Iterator records) { + flushedSliceEnds.add(windowKey.getWindow()); + while (records.hasNext()) { + records.next(); + } + } + + @Override + public void close() {} + }; + + try (RecordsWindowBuffer buffer = createBuffer(combiner)) { + String largeData = createLargeString(100); + + // Fill buffer to trigger internal flush on next large record + int numRecordsToFillBuffer = 18; + for (int i = 0; i < numRecordsToFillBuffer; i++) { + GenericRowData key = GenericRowData.of(i); + GenericRowData value = GenericRowData.of(i, StringData.fromString(largeData + i)); + buffer.addElement(key, 1000L, value); + } + + flushedSliceEnds.clear(); + + // Add record with smaller sliceEnd, triggers internal flush + GenericRowData key = GenericRowData.of(999); + GenericRowData value = GenericRowData.of(999, StringData.fromString(largeData + 999)); + buffer.addElement(key, 500L, value); + + flushedSliceEnds.clear(); + + // Verify advanceProgress triggers flush for sliceEnd=500 + buffer.advanceProgress(500L); + + assertThat(flushedSliceEnds).contains(500L); + } + } + + /** + * Unrecoverable scenario (after flush): buffer has small records, then oversized record. Flush + * clears buffer (numKeys = 0), retry still fails. Should throw EOFException. + */ + @Test + void testUnrecoverableErrorAfterFlushRetryStillFails() throws Exception { + List flushedKeyIds = new ArrayList<>(); + RecordsCombiner combiner = + new RecordsCombiner() { + @Override + public void combine(WindowKey windowKey, Iterator records) { + while (records.hasNext()) { + RowData record = records.next(); + flushedKeyIds.add(record.getInt(0)); + } + } + + @Override + public void close() {} + }; + + try (RecordsWindowBuffer buffer = createBuffer(combiner)) { + for (int i = 0; i < 5; i++) { + GenericRowData key = GenericRowData.of(i); + GenericRowData value = GenericRowData.of(i, StringData.fromString("small" + i)); + buffer.addElement(key, 1000L, value); + } + + String largeString = createLargeString(4 * 1024); // 4MB > 2MB buffer + GenericRowData oversizedKey = GenericRowData.of(999); + GenericRowData oversizedValue = + GenericRowData.of(999, StringData.fromString(largeString)); + + assertThatThrownBy(() -> buffer.addElement(oversizedKey, 1000L, oversizedValue)) + .isInstanceOf(EOFException.class); + + assertThat(flushedKeyIds).containsExactly(0, 1, 2, 3, 4); + } + } +}