Skip to content

Commit ab9412d

Browse files
authored
[Fix] improve batch mode when streamload failed (#560)
1 parent 118bea9 commit ab9412d

File tree

4 files changed

+36
-29
lines changed

4 files changed

+36
-29
lines changed

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ public DorisBatchStreamLoad(
184184
public void writeRecord(String database, String table, byte[] record) {
185185
checkFlushException();
186186
String bufferKey = getTableIdentifier(database, table);
187+
187188
getLock(bufferKey).readLock().lock();
188189
BatchRecordBuffer buffer =
189190
bufferMap.computeIfAbsent(
@@ -198,6 +199,7 @@ public void writeRecord(String database, String table, byte[] record) {
198199
int bytes = buffer.insert(record);
199200
currentCacheBytes.addAndGet(bytes);
200201
getLock(bufferKey).readLock().unlock();
202+
201203
if (currentCacheBytes.get() > maxBlockedBytes) {
202204
lock.lock();
203205
try {
@@ -258,8 +260,9 @@ private synchronized boolean doFlush(
258260
}
259261

260262
private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
261-
if (bufferMap.isEmpty()) {
263+
if (!waitUtilDone && bufferMap.isEmpty()) {
262264
// bufferMap may have been flushed by other threads
265+
LOG.info("bufferMap is empty, no need to flush {}", bufferKey);
263266
return false;
264267
}
265268
if (null == bufferKey) {
@@ -295,6 +298,7 @@ private synchronized void flushBuffer(String bufferKey) {
295298
getLock(bufferKey).writeLock().unlock();
296299
}
297300
if (buffer == null) {
301+
LOG.info("buffer key is not exist {}, skipped", bufferKey);
298302
return;
299303
}
300304
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
@@ -312,6 +316,9 @@ private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
312316
} catch (InterruptedException e) {
313317
throw new RuntimeException("Failed to put record buffer to flush queue");
314318
}
319+
// When the load thread reports an error, the flushQueue will be cleared,
320+
// and need to force a check for the exception.
321+
checkFlushException();
315322
}
316323

317324
private void checkFlushException() {
@@ -321,7 +328,9 @@ private void checkFlushException() {
321328
}
322329

323330
private void waitAsyncLoadFinish() {
324-
for (int i = 0; i < executionOptions.getFlushQueueSize() + 1; i++) {
331+
// Because the queue will have a drainTo operation, it needs to be multiplied by 2
332+
for (int i = 0; i < executionOptions.getFlushQueueSize() * 2 + 1; i++) {
333+
// eof buffer
325334
BatchRecordBuffer empty = new BatchRecordBuffer();
326335
putRecordToFlushQueue(empty);
327336
}
@@ -335,8 +344,6 @@ public void close() {
335344
// close async executor
336345
this.loadExecutorService.shutdown();
337346
this.started.set(false);
338-
// clear buffer
339-
this.flushQueue.clear();
340347
}
341348

342349
@VisibleForTesting
@@ -407,10 +414,14 @@ public void run() {
407414
recordList.clear();
408415
try {
409416
BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
410-
if (buffer == null || buffer.getLabelName() == null) {
411-
// label is empty and does not need to load. It is the flag of waitUtilDone
417+
if (buffer == null) {
412418
continue;
413419
}
420+
if (buffer.getLabelName() == null) {
421+
// When the label is empty, it is the eof buffer for checkpoint flush.
422+
continue;
423+
}
424+
414425
recordList.add(buffer);
415426
boolean merge = false;
416427
if (!flushQueue.isEmpty()) {
@@ -424,6 +435,7 @@ public void run() {
424435
if (!merge) {
425436
for (BatchRecordBuffer bf : recordList) {
426437
if (bf == null || bf.getLabelName() == null) {
438+
// When the label is empty, it's eof buffer for checkpointFlush.
427439
continue;
428440
}
429441
load(bf.getLabelName(), bf);

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.doris.flink.sink.batch;
1919

2020
import org.apache.flink.api.connector.sink2.Sink;
21+
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
2122
import org.apache.flink.util.Preconditions;
2223
import org.apache.flink.util.StringUtils;
2324
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -67,6 +68,12 @@ public DorisBatchWriter(
6768
DorisOptions dorisOptions,
6869
DorisReadOptions dorisReadOptions,
6970
DorisExecutionOptions executionOptions) {
71+
72+
long restoreCheckpointId =
73+
initContext
74+
.getRestoredCheckpointId()
75+
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
76+
LOG.info("restore from checkpointId {}", restoreCheckpointId);
7077
if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
7178
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
7279
Preconditions.checkState(
@@ -75,6 +82,7 @@ public DorisBatchWriter(
7582
this.database = tableInfo[0];
7683
this.table = tableInfo[1];
7784
}
85+
7886
LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
7987
this.subtaskId = initContext.getSubtaskId();
8088
this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
@@ -130,12 +138,13 @@ public void flush(boolean flush) throws IOException, InterruptedException {
130138

131139
@Override
132140
public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
133-
// nothing to commit
141+
checkFlushException();
134142
return Collections.emptyList();
135143
}
136144

137145
@Override
138146
public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
147+
checkFlushException();
139148
return new ArrayList<>();
140149
}
141150

flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.time.Duration;
4646
import java.util.ArrayList;
4747
import java.util.List;
48-
import java.util.concurrent.atomic.AtomicReference;
4948

5049
import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
5150
import static org.mockito.ArgumentMatchers.any;
@@ -124,17 +123,10 @@ public void testLoadFail() throws Exception {
124123
when(httpClientBuilder.build()).thenReturn(httpClient);
125124
when(httpClient.execute(any())).thenReturn(response);
126125
loader.writeRecord("db", "tbl", "1,data".getBytes());
127-
loader.checkpointFlush();
128126

129-
TestUtil.waitUntilCondition(
130-
() -> !loader.isLoadThreadAlive(),
131-
Deadline.fromNow(Duration.ofSeconds(20)),
132-
100L,
133-
"testLoadFail wait loader exit failed." + loader.isLoadThreadAlive());
134-
AtomicReference<Throwable> exception = loader.getException();
135-
Assert.assertTrue(exception.get() instanceof Exception);
136-
Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
137-
LOG.info("testLoadFail end");
127+
thrown.expect(Exception.class);
128+
thrown.expectMessage("stream load error");
129+
loader.checkpointFlush();
138130
}
139131

140132
@Test
@@ -175,17 +167,10 @@ public void testLoadError() throws Exception {
175167
when(httpClientBuilder.build()).thenReturn(httpClient);
176168
when(httpClient.execute(any())).thenReturn(response);
177169
loader.writeRecord("db", "tbl", "1,data".getBytes());
178-
loader.checkpointFlush();
179170

180-
TestUtil.waitUntilCondition(
181-
() -> !loader.isLoadThreadAlive(),
182-
Deadline.fromNow(Duration.ofSeconds(20)),
183-
100L,
184-
"testLoadError wait loader exit failed." + loader.isLoadThreadAlive());
185-
AtomicReference<Throwable> exception = loader.getException();
186-
Assert.assertTrue(exception.get() instanceof Exception);
187-
Assert.assertTrue(exception.get().getMessage().contains("stream load error"));
188-
LOG.info("testLoadError end");
171+
thrown.expect(Exception.class);
172+
thrown.expectMessage("stream load error");
173+
loader.checkpointFlush();
189174
}
190175

191176
@After

flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ public void testInit() {
5757
.build();
5858
thrown.expect(IllegalStateException.class);
5959
thrown.expectMessage("tableIdentifier input error");
60-
DorisBatchWriter batchWriter = new DorisBatchWriter(null, null, options, null, null);
60+
Sink.InitContext initContext = mock(Sink.InitContext.class);
61+
DorisBatchWriter batchWriter = new DorisBatchWriter(initContext, null, options, null, null);
6162
}
6263

6364
@Test

0 commit comments

Comments
 (0)