Commit 644f461

[FLINK-38746][table-runtime] Fix StackOverflowError in RecordsWindowBuffer.addElement

1 parent be6e7c5 commit 644f461

2 files changed: +285 -13 lines

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBuffer.java

Lines changed: 21 additions & 13 deletions

@@ -39,6 +39,7 @@
 import java.util.Iterator;
 
 import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An implementation of {@link WindowBuffer} that buffers input elements in a {@link
@@ -78,20 +79,27 @@ public RecordsWindowBuffer(
 
     @Override
     public void addElement(RowData key, long sliceEnd, RowData element) throws Exception {
-        // track the lowest trigger time, if watermark exceeds the trigger time,
-        // it means there are some elements in the buffer belong to a window going to be fired,
-        // and we need to flush the buffer into state for firing.
-        minSliceEnd = Math.min(sliceEnd, minSliceEnd);
-
         reuseWindowKey.replace(sliceEnd, key);
-        LookupInfo<WindowKey, Iterator<RowData>> lookup = recordsBuffer.lookup(reuseWindowKey);
-        try {
-            recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element));
-        } catch (EOFException e) {
-            // buffer is full, flush it to state
-            flush();
-            // remember to add the input element again
-            addElement(key, sliceEnd, element);
+        while (true) {
+            LookupInfo<WindowKey, Iterator<RowData>> lookup = recordsBuffer.lookup(reuseWindowKey);
+            try {
+                recordsBuffer.append(lookup, recordSerializer.toBinaryRow(element));
+                // Track the lowest trigger time. If watermark exceeds the trigger time,
+                // it means there are some elements in the buffer belonging to a window going
+                // to be fired, and we need to flush the buffer into state for firing.
+                minSliceEnd = Math.min(sliceEnd, minSliceEnd);
+                break;
+            } catch (EOFException e) {
+                if (recordsBuffer.getNumKeys() == 0) {
+                    // Buffer is empty, retry won't help (record is too large for the buffer)
+                    throw e;
+                }
+                // Buffer has data, flush and retry
+                flush();
+                checkState(
+                        recordsBuffer.getNumKeys() == 0,
+                        "The recordsBuffer should be empty after flushing.");
+            }
         }
     }
 
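Why the recursion overflowed the stack: if a single serialized record is larger than the whole buffer, flush() frees nothing that helps, so the old recursive addElement(key, sliceEnd, element) call repeated forever until the JVM threw StackOverflowError. Below is a minimal, self-contained Java sketch of the pattern, not Flink code; the class name, CAPACITY, and the int-based append are illustrative stand-ins for the real BytesMap-backed buffer.

import java.io.EOFException;

public class RetryPatternSketch {

    private static final int CAPACITY = 10; // stand-in for the buffer's memory limit
    private int used = 0;

    private void append(int recordSize) throws EOFException {
        if (used + recordSize > CAPACITY) {
            throw new EOFException("buffer full");
        }
        used += recordSize;
    }

    private void flush() {
        used = 0; // flushing empties the buffer, like RecordsWindowBuffer.flush()
    }

    // Old pattern: if recordSize > CAPACITY, every retry adds a stack frame
    // and never makes progress -> StackOverflowError.
    void addElementRecursive(int recordSize) throws EOFException {
        try {
            append(recordSize);
        } catch (EOFException e) {
            flush();
            addElementRecursive(recordSize);
        }
    }

    // New pattern: loop instead of recursion, and bail out when the buffer is
    // already empty, because retrying an oversized record can never succeed.
    void addElementLooping(int recordSize) throws EOFException {
        while (true) {
            try {
                append(recordSize);
                return;
            } catch (EOFException e) {
                if (used == 0) {
                    throw e; // fail fast instead of looping forever
                }
                flush();
            }
        }
    }

    public static void main(String[] args) throws EOFException {
        RetryPatternSketch sketch = new RetryPatternSketch();
        sketch.addElementLooping(4);
        sketch.addElementLooping(8); // takes the flush-and-retry path, succeeds
        try {
            sketch.addElementLooping(11); // larger than CAPACITY
        } catch (EOFException expected) {
            System.out.println("oversized record rejected: " + expected.getMessage());
        }
        // sketch.addElementRecursive(11); // would end in StackOverflowError
    }
}
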
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/RecordsWindowBufferTest.java (new file)

Lines changed: 264 additions & 0 deletions

@@ -0,0 +1,264 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.runtime.operators.aggregate.window.buffers;

import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.operators.window.tvf.combines.RecordsCombiner;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.WindowKey;
import org.apache.flink.table.runtime.util.collections.binary.BytesMap;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.EOFException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link RecordsWindowBuffer}. */
class RecordsWindowBufferTest {

    private static final int PAGE_SIZE = (int) MemorySize.parse("32 kb").getBytes();

    /** Minimum memory size must be > {@link BytesMap#INIT_BUCKET_MEMORY_IN_BYTES} (1 MB). */
    private static final long MIN_MEMORY_SIZE = MemorySize.parse("2 mb").getBytes();

    private MemoryManager memoryManager;
    private RowDataSerializer keySer;
    private RowDataSerializer valueSer;

    @BeforeEach
    void setUp() {
        memoryManager =
                MemoryManagerBuilder.newBuilder()
                        .setMemorySize(MIN_MEMORY_SIZE)
                        .setPageSize(PAGE_SIZE)
                        .build();

        RowType keyType = new RowType(Arrays.asList(new RowType.RowField("key", new IntType())));
        RowType valueType =
                new RowType(
                        Arrays.asList(
                                new RowType.RowField("key", new IntType()),
                                new RowType.RowField("data", new VarCharType(Integer.MAX_VALUE))));

        keySer = new RowDataSerializer(keyType);
        valueSer = new RowDataSerializer(valueType);
    }

    @AfterEach
    void tearDown() {
        if (memoryManager != null) {
            memoryManager.shutdown();
        }
    }

    private RecordsWindowBuffer createBuffer(RecordsCombiner combiner) {
        return new RecordsWindowBuffer(
                this,
                memoryManager,
                MIN_MEMORY_SIZE,
                combiner,
                keySer,
                valueSer,
                false,
                ZoneId.of("UTC"));
    }

    private static String createLargeString(int sizeInKB) {
        char[] chars = new char[sizeInKB * 1024];
        Arrays.fill(chars, 'x');
        return new String(chars);
    }

    /**
     * Recoverable scenario: buffer has data (numKeys > 0), EOFException triggers flush, retry
     * succeeds.
     */
    @Test
    void testFlushAndRetrySucceeds() throws Exception {
        List<List<RowData>> flushedRecords = new ArrayList<>();
        RecordsCombiner combiner =
                new RecordsCombiner() {
                    @Override
                    public void combine(WindowKey windowKey, Iterator<RowData> records) {
                        List<RowData> batch = new ArrayList<>();
                        while (records.hasNext()) {
                            batch.add(records.next());
                        }
                        flushedRecords.add(batch);
                    }

                    @Override
                    public void close() {}
                };

        try (RecordsWindowBuffer buffer = createBuffer(combiner)) {
            String largeData = createLargeString(100);

            int totalRecords = 50;
            for (int i = 0; i < totalRecords; i++) {
                GenericRowData key = GenericRowData.of(i);
                GenericRowData value = GenericRowData.of(i, StringData.fromString(largeData + i));
                buffer.addElement(key, 1000L, value);
            }

            buffer.flush();

            assertThat(flushedRecords).hasSizeGreaterThanOrEqualTo(2);

            int totalFlushedRecords = flushedRecords.stream().mapToInt(List::size).sum();
            assertThat(totalFlushedRecords).isEqualTo(totalRecords);
        }
    }

    /**
     * Unrecoverable scenario (1st attempt): empty buffer (numKeys == 0), single record too large.
     * Should throw EOFException immediately without flush.
     */
    @Test
    void testFirstUnrecoverableAttemptOnEmptyBuffer() throws Exception {
        final boolean[] flushCalled = {false};
        RecordsCombiner combiner =
                new RecordsCombiner() {
                    @Override
                    public void combine(WindowKey windowKey, Iterator<RowData> records) {
                        flushCalled[0] = true;
                    }

                    @Override
                    public void close() {}
                };

        try (RecordsWindowBuffer buffer = createBuffer(combiner)) {
            String largeString = createLargeString(4 * 1024); // 4 MB > 2 MB buffer

            GenericRowData key = GenericRowData.of(1);
            GenericRowData value = GenericRowData.of(1, StringData.fromString(largeString));

            assertThatThrownBy(() -> buffer.addElement(key, 1000L, value))
                    .isInstanceOf(EOFException.class);

            assertThat(flushCalled[0]).isFalse();
        }
    }

    /**
     * Tests that minSliceEnd is correctly tracked when an internal flush occurs during
     * addElement() due to buffer overflow.
     */
    @Test
    void testMinSliceEndPreservedAfterInternalFlush() throws Exception {
        List<Long> flushedSliceEnds = new ArrayList<>();
        RecordsCombiner combiner =
                new RecordsCombiner() {
                    @Override
                    public void combine(WindowKey windowKey, Iterator<RowData> records) {
                        flushedSliceEnds.add(windowKey.getWindow());
                        while (records.hasNext()) {
                            records.next();
                        }
                    }

                    @Override
                    public void close() {}
                };

        try (RecordsWindowBuffer buffer = createBuffer(combiner)) {
            String largeData = createLargeString(100);

            // Fill buffer to trigger internal flush on next large record
            int numRecordsToFillBuffer = 18;
            for (int i = 0; i < numRecordsToFillBuffer; i++) {
                GenericRowData key = GenericRowData.of(i);
                GenericRowData value = GenericRowData.of(i, StringData.fromString(largeData + i));
                buffer.addElement(key, 1000L, value);
            }

            flushedSliceEnds.clear();

            // Add record with smaller sliceEnd, triggers internal flush
            GenericRowData key = GenericRowData.of(999);
            GenericRowData value = GenericRowData.of(999, StringData.fromString(largeData + 999));
            buffer.addElement(key, 500L, value);

            flushedSliceEnds.clear();

            // Verify advanceProgress triggers flush for sliceEnd=500
            buffer.advanceProgress(500L);

            assertThat(flushedSliceEnds).contains(500L);
        }
    }

    /**
     * Unrecoverable scenario (after flush): buffer has small records, then oversized record. Flush
     * clears buffer (numKeys = 0), retry still fails. Should throw EOFException.
     */
    @Test
    void testUnrecoverableErrorAfterFlushRetryStillFails() throws Exception {
        List<Integer> flushedKeyIds = new ArrayList<>();
        RecordsCombiner combiner =
                new RecordsCombiner() {
                    @Override
                    public void combine(WindowKey windowKey, Iterator<RowData> records) {
                        while (records.hasNext()) {
                            RowData record = records.next();
                            flushedKeyIds.add(record.getInt(0));
                        }
                    }

                    @Override
                    public void close() {}
                };

        try (RecordsWindowBuffer buffer = createBuffer(combiner)) {
            for (int i = 0; i < 5; i++) {
                GenericRowData key = GenericRowData.of(i);
                GenericRowData value = GenericRowData.of(i, StringData.fromString("small" + i));
                buffer.addElement(key, 1000L, value);
            }

            String largeString = createLargeString(4 * 1024); // 4 MB > 2 MB buffer
            GenericRowData oversizedKey = GenericRowData.of(999);
            GenericRowData oversizedValue =
                    GenericRowData.of(999, StringData.fromString(largeString));

            assertThatThrownBy(() -> buffer.addElement(oversizedKey, 1000L, oversizedValue))
                    .isInstanceOf(EOFException.class);

            assertThat(flushedKeyIds).containsExactly(0, 1, 2, 3, 4);
        }
    }
}
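One design note on the fix: the checkState call after flush() guards against the loop's own failure mode. If flush() ever left keys behind, the while loop could spin without making progress, so the fix asserts the invariant and fails loudly instead. A minimal illustration of org.apache.flink.util.Preconditions.checkState follows; the numKeys variable is a hypothetical stand-in for recordsBuffer.getNumKeys().

import static org.apache.flink.util.Preconditions.checkState;

public class CheckStateSketch {
    public static void main(String[] args) {
        int numKeys = 0; // what the fix expects right after flush()
        checkState(numKeys == 0, "The recordsBuffer should be empty after flushing.");
        System.out.println("invariant holds");

        numKeys = 3; // simulate a broken flush() that left data behind
        // Throws IllegalStateException carrying the given message, instead of
        // letting the caller loop forever:
        checkState(numKeys == 0, "The recordsBuffer should be empty after flushing.");
    }
}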
