Skip to content
Binary file added .DS_Store
Binary file not shown.
27 changes: 19 additions & 8 deletions api/src/main/java/io/minio/MinioAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Matcher;
import okhttp3.HttpUrl;
import okhttp3.MultipartBody;
Expand Down Expand Up @@ -149,7 +151,8 @@ private MinioAsyncClient(
String region,
Provider provider,
OkHttpClient httpClient,
boolean closeHttpClient) {
boolean closeHttpClient,
ExecutorService executorService) {
super(
baseUrl,
awsS3Prefix,
Expand All @@ -159,7 +162,8 @@ private MinioAsyncClient(
region,
provider,
httpClient,
closeHttpClient);
closeHttpClient,
executorService);
}

protected MinioAsyncClient(MinioAsyncClient client) {
Expand Down Expand Up @@ -462,7 +466,7 @@ public CompletableFuture<ObjectWriteResponse> copyObject(CopyObjectArgs args)
args.validateSse(this.baseUrl);

return CompletableFuture.supplyAsync(
() -> args.source().offset() != null && args.source().length() != null)
() -> args.source().offset() != null && args.source().length() != null, executorService)
.thenCompose(
condition -> {
if (condition) {
Expand Down Expand Up @@ -677,7 +681,7 @@ public CompletableFuture<ObjectWriteResponse> composeObject(ComposeObjectArgs ar
Multimap<String, String> headers = newMultimap(args.extraHeaders());
headers.putAll(args.genHeaders());
return headers;
})
}, executorService)
.thenCompose(
headers -> {
try {
Expand Down Expand Up @@ -715,7 +719,7 @@ public CompletableFuture<ObjectWriteResponse> composeObject(ComposeObjectArgs ar
CompletableFuture.supplyAsync(
() -> {
return new Part[partCount[0]];
});
}, executorService);
for (ComposeSource src : sources) {
long size = 0;
try {
Expand Down Expand Up @@ -3491,7 +3495,7 @@ public CompletableFuture<ObjectWriteResponse> uploadSnowballObjects(
}
}
return baos;
})
}, executorService)
.thenCompose(
baos -> {
Multimap<String, String> headers = newMultimap(args.extraHeaders());
Expand Down Expand Up @@ -3633,7 +3637,7 @@ public CompletableFuture<PutObjectFanOutResponse> putObjectFanOut(PutObjectFanOu
| XmlParserException e) {
throw new CompletionException(e);
}
})
}, executorService)
.thenCompose(
body -> {
try {
Expand Down Expand Up @@ -3721,6 +3725,7 @@ public static final class Builder {
private Provider provider;
private OkHttpClient httpClient;
private boolean closeHttpClient;
private ExecutorService executorService = ForkJoinPool.commonPool();

private void setAwsInfo(String host, boolean https) {
this.awsS3Prefix = null;
Expand Down Expand Up @@ -3834,6 +3839,11 @@ public Builder httpClient(OkHttpClient httpClient, boolean close) {
return this;
}

public Builder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}

public MinioAsyncClient build() {
HttpUtils.validateNotNull(this.baseUrl, "endpoint");

Expand Down Expand Up @@ -3861,7 +3871,8 @@ public MinioAsyncClient build() {
region,
provider,
httpClient,
closeHttpClient);
closeHttpClient,
executorService);
}
}
}
31 changes: 27 additions & 4 deletions api/src/main/java/io/minio/S3Base.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -147,6 +149,7 @@ public abstract class S3Base implements AutoCloseable {
protected Provider provider;
protected OkHttpClient httpClient;
protected boolean closeHttpClient;
protected final ExecutorService executorService;

/** @deprecated This method is no longer supported. */
@Deprecated
Expand All @@ -171,6 +174,19 @@ protected S3Base(
false);
}

protected S3Base(
HttpUrl baseUrl,
String awsS3Prefix,
String awsDomainSuffix,
boolean awsDualstack,
boolean useVirtualStyle,
String region,
Provider provider,
OkHttpClient httpClient,
boolean closeHttpClient) {
this(baseUrl, awsS3Prefix, awsDomainSuffix, awsDualstack, useVirtualStyle, region, provider, httpClient, closeHttpClient, ForkJoinPool.commonPool());
}

protected S3Base(
HttpUrl baseUrl,
String awsS3Prefix,
Expand All @@ -180,7 +196,8 @@ protected S3Base(
String region,
Provider provider,
OkHttpClient httpClient,
boolean closeHttpClient) {
boolean closeHttpClient,
ExecutorService executorService) {
this.baseUrl = baseUrl;
this.awsS3Prefix = awsS3Prefix;
this.awsDomainSuffix = awsDomainSuffix;
Expand All @@ -190,6 +207,7 @@ protected S3Base(
this.provider = provider;
this.httpClient = httpClient;
this.closeHttpClient = closeHttpClient;
this.executorService = executorService;
}

/** @deprecated This method is no longer supported. */
Expand Down Expand Up @@ -219,6 +237,7 @@ protected S3Base(
this.provider = provider;
this.httpClient = httpClient;
this.closeHttpClient = false;
this.executorService = ForkJoinPool.commonPool();
}

protected S3Base(S3Base client) {
Expand All @@ -231,6 +250,7 @@ protected S3Base(S3Base client) {
this.provider = client.provider;
this.httpClient = client.httpClient;
this.closeHttpClient = client.closeHttpClient;
this.executorService = client.executorService;
}

/** Check whether argument is valid or not. */
Expand Down Expand Up @@ -1204,7 +1224,8 @@ protected CompletableFuture<Integer> calculatePartCountAsync(List<ComposeSource>
long[] objectSize = {0};
int index = 0;

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 0);
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(() -> 0, executorService);
for (ComposeSource src : sources) {
index++;
final int i = index;
Expand Down Expand Up @@ -2947,7 +2968,8 @@ private CompletableFuture<ObjectWriteResponse> putMultipartObjectAsync(
throw new CompletionException(throwable);
}
return response;
});
},
executorService);
}

/**
Expand Down Expand Up @@ -2993,7 +3015,8 @@ protected CompletableFuture<ObjectWriteResponse> putObjectAsync(
} catch (NoSuchAlgorithmException | IOException e) {
throw new CompletionException(e);
}
})
},
executorService)
.thenCompose(
partSource -> {
try {
Expand Down