diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 37cf90e541..8353a88de2 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -213,6 +213,8 @@ StreamInfo remove(ClientInvocationId key) { private final Executor requestExecutor; private final Executor writeExecutor; + private final boolean startTransactionEnable; + DataStreamManagement(RaftServer server) { this.server = server; this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass()); @@ -225,6 +227,7 @@ StreamInfo remove(ClientInvocationId key) { this.writeExecutor = ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool, RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties), name + "-write-"); + this.startTransactionEnable = RaftServerConfigKeys.DataStream.raftLogNeed(properties); } private CompletableFuture computeDataStreamIfAbsent(RaftClientRequest request) throws IOException { @@ -327,6 +330,12 @@ static void sendReply(List> remoteWrites, private CompletableFuture startTransaction(StreamInfo info, DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) { try { + if (!startTransactionEnable) { + RaftClientReply reply = RaftClientReply.newBuilder().setSuccess(true).build(); + ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten, info.getCommitInfos())); + return null; + } + AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest() .getRaftGroupId()) .getRaftClient() diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 520620d707..d9fe7652e6 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -567,6 +567,18 @@ static void setAsyncWriteThreadPoolSize(RaftProperties properties, int port) { setInt(properties::setInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY, port); } + String RAFT_LOG_NEED_KEY = PREFIX + ".raft-log.need"; + boolean RAFT_LOG_NEED_DEFAULT = true; + + static boolean raftLogNeed(RaftProperties properties) { + return getBoolean(properties::getBoolean, RAFT_LOG_NEED_KEY, + RAFT_LOG_NEED_DEFAULT, getDefaultLog()); + } + + static void setRaftLogNeed(RaftProperties properties, boolean enable) { + setBoolean(properties::setBoolean, RAFT_LOG_NEED_KEY, enable); + } + String CLIENT_POOL_SIZE_KEY = PREFIX + ".client.pool.size"; int CLIENT_POOL_SIZE_DEFAULT = 10;