From 491251a66530bd1ba800e5279a000ca39acf9bc4 Mon Sep 17 00:00:00 2001 From: guohao1 Date: Tue, 15 Feb 2022 13:33:55 +0800 Subject: [PATCH 1/2] RATIS-1524. Optional DataStreamManagement#startTransaction configuration --- .../ratis/netty/server/DataStreamManagement.java | 9 +++++++++ .../apache/ratis/server/RaftServerConfigKeys.java | 12 ++++++++++++ 2 files changed, 21 insertions(+) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 37cf90e541..37eaac5763 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -213,6 +213,8 @@ StreamInfo remove(ClientInvocationId key) { private final Executor requestExecutor; private final Executor writeExecutor; + private final boolean startTransactionEnable; + DataStreamManagement(RaftServer server) { this.server = server; this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass()); @@ -225,6 +227,7 @@ StreamInfo remove(ClientInvocationId key) { this.writeExecutor = ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool, RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties), name + "-write-"); + this.startTransactionEnable = RaftServerConfigKeys.DataStream.startTransactionEnable(properties); } private CompletableFuture computeDataStreamIfAbsent(RaftClientRequest request) throws IOException { @@ -327,6 +330,12 @@ static void sendReply(List> remoteWrites, private CompletableFuture startTransaction(StreamInfo info, DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) { try { + if (!startTransactionEnable) { + RaftClientReply reply = RaftClientReply.newBuilder().setSuccess(true).build(); + ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten, info.getCommitInfos())); + return null; + } + AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest() .getRaftGroupId()) .getRaftClient() diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 520620d707..b0c36e0a0a 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -567,6 +567,18 @@ static void setAsyncWriteThreadPoolSize(RaftProperties properties, int port) { setInt(properties::setInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY, port); } + String START_TRANSACTION_ENABLE = PREFIX + ".start-transaction.enable"; + boolean START_TRANSACTION_ENABLE_DEFAULT = true; + + static boolean startTransactionEnable(RaftProperties properties) { + return getBoolean(properties::getBoolean, START_TRANSACTION_ENABLE, + START_TRANSACTION_ENABLE_DEFAULT, getDefaultLog()); + } + + static void setStartTransactionEnable(RaftProperties properties, boolean enable) { + setBoolean(properties::setBoolean, START_TRANSACTION_ENABLE, enable); + } + String CLIENT_POOL_SIZE_KEY = PREFIX + ".client.pool.size"; int CLIENT_POOL_SIZE_DEFAULT = 10; From 5483b7603ae07c3244c151a9c42d39a430356e1a Mon Sep 17 00:00:00 2001 From: guohao1 Date: Tue, 15 Feb 2022 15:57:04 +0800 Subject: [PATCH 2/2] config --- .../ratis/netty/server/DataStreamManagement.java | 2 +- .../apache/ratis/server/RaftServerConfigKeys.java | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 37eaac5763..8353a88de2 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -227,7 +227,7 @@ StreamInfo remove(ClientInvocationId key) { this.writeExecutor = ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool, RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties), name + "-write-"); - this.startTransactionEnable = RaftServerConfigKeys.DataStream.startTransactionEnable(properties); + this.startTransactionEnable = RaftServerConfigKeys.DataStream.raftLogNeed(properties); } private CompletableFuture computeDataStreamIfAbsent(RaftClientRequest request) throws IOException { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index b0c36e0a0a..d9fe7652e6 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -567,16 +567,16 @@ static void setAsyncWriteThreadPoolSize(RaftProperties properties, int port) { setInt(properties::setInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY, port); } - String START_TRANSACTION_ENABLE = PREFIX + ".start-transaction.enable"; - boolean START_TRANSACTION_ENABLE_DEFAULT = true; + String RAFT_LOG_NEED_KEY = PREFIX + ".raft-log.need"; + boolean RAFT_LOG_NEED_DEFAULT = true; - static boolean startTransactionEnable(RaftProperties properties) { - return getBoolean(properties::getBoolean, START_TRANSACTION_ENABLE, - START_TRANSACTION_ENABLE_DEFAULT, getDefaultLog()); + static boolean raftLogNeed(RaftProperties properties) { + return getBoolean(properties::getBoolean, RAFT_LOG_NEED_KEY, + RAFT_LOG_NEED_DEFAULT, getDefaultLog()); } - static void setStartTransactionEnable(RaftProperties properties, boolean enable) { - setBoolean(properties::setBoolean, START_TRANSACTION_ENABLE, enable); + static void setRaftLogNeed(RaftProperties properties, boolean enable) { + setBoolean(properties::setBoolean, RAFT_LOG_NEED_KEY, enable); } String CLIENT_POOL_SIZE_KEY = PREFIX + ".client.pool.size";