Skip to content

Commit 118bea9

Browse files
authored
[Improve] add buffer-map read-write-lock in batch mode (#555)
dorisBatchStreamLoad.bufferMap get/remove operation add read-write lock.
1 parent efe53f3 commit 118bea9

File tree

1 file changed

+19
-3
lines changed

1 file changed

+19
-3
lines changed

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@
6363
import java.util.concurrent.atomic.AtomicReference;
6464
import java.util.concurrent.locks.Condition;
6565
import java.util.concurrent.locks.Lock;
66+
import java.util.concurrent.locks.ReadWriteLock;
6667
import java.util.concurrent.locks.ReentrantLock;
68+
import java.util.concurrent.locks.ReentrantReadWriteLock;
6769

6870
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
6971
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -111,6 +113,7 @@ public class DorisBatchStreamLoad implements Serializable {
111113
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
112114
private final Lock lock = new ReentrantLock();
113115
private final Condition block = lock.newCondition();
116+
private final Map<String, ReadWriteLock> bufferMapLock = new ConcurrentHashMap<>();
114117

115118
public DorisBatchStreamLoad(
116119
DorisOptions dorisOptions,
@@ -181,7 +184,7 @@ public DorisBatchStreamLoad(
181184
public void writeRecord(String database, String table, byte[] record) {
182185
checkFlushException();
183186
String bufferKey = getTableIdentifier(database, table);
184-
187+
getLock(bufferKey).readLock().lock();
185188
BatchRecordBuffer buffer =
186189
bufferMap.computeIfAbsent(
187190
bufferKey,
@@ -194,6 +197,7 @@ public void writeRecord(String database, String table, byte[] record) {
194197

195198
int bytes = buffer.insert(record);
196199
currentCacheBytes.addAndGet(bytes);
200+
getLock(bufferKey).readLock().unlock();
197201
if (currentCacheBytes.get() > maxBlockedBytes) {
198202
lock.lock();
199203
try {
@@ -283,11 +287,19 @@ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
283287
}
284288

285289
private synchronized void flushBuffer(String bufferKey) {
286-
BatchRecordBuffer buffer = bufferMap.get(bufferKey);
290+
BatchRecordBuffer buffer;
291+
try {
292+
getLock(bufferKey).writeLock().lock();
293+
buffer = bufferMap.remove(bufferKey);
294+
} finally {
295+
getLock(bufferKey).writeLock().unlock();
296+
}
297+
if (buffer == null) {
298+
return;
299+
}
287300
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
288301
LOG.debug("flush buffer for key {} with label {}", bufferKey, buffer.getLabelName());
289302
putRecordToFlushQueue(buffer);
290-
bufferMap.remove(bufferKey);
291303
}
292304

293305
private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
@@ -374,6 +386,10 @@ private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
374386
return true;
375387
}
376388

389+
private ReadWriteLock getLock(String bufferKey) {
390+
return bufferMapLock.computeIfAbsent(bufferKey, k -> new ReentrantReadWriteLock());
391+
}
392+
377393
class LoadAsyncExecutor implements Runnable {
378394

379395
private int flushQueueSize;

0 commit comments

Comments
 (0)