Skip to content

Commit 30e9688

Browse files
author
thomasva
committed
Add injection of custom executor service to S3Base supplyAsync calls
1 parent 8de1be7 commit 30e9688

File tree

3 files changed

+44
-11
lines changed

3 files changed

+44
-11
lines changed

.DS_Store

8 KB
Binary file not shown.

api/src/main/java/io/minio/MinioAsyncClient.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@
7676
import java.util.concurrent.CompletableFuture;
7777
import java.util.concurrent.CompletionException;
7878
import java.util.concurrent.ExecutionException;
79+
import java.util.concurrent.ExecutorService;
80+
import java.util.concurrent.ForkJoinPool;
7981
import java.util.regex.Matcher;
8082
import okhttp3.HttpUrl;
8183
import okhttp3.OkHttpClient;
@@ -140,7 +142,8 @@ private MinioAsyncClient(
140142
String region,
141143
Provider provider,
142144
OkHttpClient httpClient,
143-
boolean closeHttpClient) {
145+
boolean closeHttpClient,
146+
ExecutorService executorService) {
144147
super(
145148
baseUrl,
146149
awsS3Prefix,
@@ -150,7 +153,8 @@ private MinioAsyncClient(
150153
region,
151154
provider,
152155
httpClient,
153-
closeHttpClient);
156+
closeHttpClient,
157+
executorService);
154158
}
155159

156160
protected MinioAsyncClient(MinioAsyncClient client) {
@@ -453,7 +457,7 @@ public CompletableFuture<ObjectWriteResponse> copyObject(CopyObjectArgs args)
453457
args.validateSse(this.baseUrl);
454458

455459
return CompletableFuture.supplyAsync(
456-
() -> args.source().offset() != null && args.source().length() != null)
460+
() -> args.source().offset() != null && args.source().length() != null, executorService)
457461
.thenCompose(
458462
condition -> {
459463
if (condition) {
@@ -667,7 +671,7 @@ public CompletableFuture<ObjectWriteResponse> composeObject(ComposeObjectArgs ar
667671
Multimap<String, String> headers = newMultimap(args.extraHeaders());
668672
headers.putAll(args.genHeaders());
669673
return headers;
670-
})
674+
}, executorService)
671675
.thenCompose(
672676
headers -> {
673677
try {
@@ -705,7 +709,7 @@ public CompletableFuture<ObjectWriteResponse> composeObject(ComposeObjectArgs ar
705709
CompletableFuture.supplyAsync(
706710
() -> {
707711
return new Part[partCount[0]];
708-
});
712+
}, executorService);
709713
for (ComposeSource src : sources) {
710714
long size = 0;
711715
try {
@@ -3155,7 +3159,7 @@ public CompletableFuture<ObjectWriteResponse> uploadSnowballObjects(
31553159
}
31563160
}
31573161
return baos;
3158-
})
3162+
}, executorService)
31593163
.thenCompose(
31603164
baos -> {
31613165
Multimap<String, String> headers = newMultimap(args.extraHeaders());
@@ -3224,6 +3228,7 @@ public static final class Builder {
32243228
private Provider provider;
32253229
private OkHttpClient httpClient;
32263230
private boolean closeHttpClient;
3231+
private ExecutorService executorService = ForkJoinPool.commonPool();
32273232

32283233
private void setAwsInfo(String host, boolean https) {
32293234
this.awsS3Prefix = null;
@@ -3337,6 +3342,11 @@ public Builder httpClient(OkHttpClient httpClient, boolean close) {
33373342
return this;
33383343
}
33393344

3345+
public Builder executorService(ExecutorService executorService) {
3346+
this.executorService = executorService;
3347+
return this;
3348+
}
3349+
33403350
public MinioAsyncClient build() {
33413351
HttpUtils.validateNotNull(this.baseUrl, "endpoint");
33423352

@@ -3364,7 +3374,8 @@ public MinioAsyncClient build() {
33643374
region,
33653375
provider,
33663376
httpClient,
3367-
closeHttpClient);
3377+
closeHttpClient,
3378+
executorService());
33683379
}
33693380
}
33703381
}

api/src/main/java/io/minio/S3Base.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import java.util.concurrent.CompletionException;
8585
import java.util.concurrent.ConcurrentHashMap;
8686
import java.util.concurrent.ExecutionException;
87+
import java.util.concurrent.ExecutorService;
8788
import java.util.concurrent.TimeUnit;
8889
import java.util.logging.Logger;
8990
import java.util.stream.Collectors;
@@ -137,6 +138,7 @@ public abstract class S3Base implements AutoCloseable {
137138
protected Provider provider;
138139
protected OkHttpClient httpClient;
139140
protected boolean closeHttpClient;
141+
protected final ExecutorService executorService;
140142

141143
/** @deprecated This method is no longer supported. */
142144
@Deprecated
@@ -161,6 +163,20 @@ protected S3Base(
161163
false);
162164
}
163165

166+
protected S3Base(
167+
HttpUrl baseUrl,
168+
String awsS3Prefix,
169+
String awsDomainSuffix,
170+
boolean awsDualstack,
171+
boolean useVirtualStyle,
172+
String region,
173+
Provider provider,
174+
OkHttpClient httpClient,
175+
boolean closeHttpClient) {
176+
this(baseUrl, awsS3Prefix, awsDomainSuffix, awsDualstack, useVirtualStyle, region, provider, httpClient, closeHttpClient);
177+
this.executorService = ForkJoinPool.commonPool();
178+
}
179+
164180
protected S3Base(
165181
HttpUrl baseUrl,
166182
String awsS3Prefix,
@@ -170,7 +186,8 @@ protected S3Base(
170186
String region,
171187
Provider provider,
172188
OkHttpClient httpClient,
173-
boolean closeHttpClient) {
189+
boolean closeHttpClient,
190+
ExecutorService executorService) {
174191
this.baseUrl = baseUrl;
175192
this.awsS3Prefix = awsS3Prefix;
176193
this.awsDomainSuffix = awsDomainSuffix;
@@ -180,6 +197,7 @@ protected S3Base(
180197
this.provider = provider;
181198
this.httpClient = httpClient;
182199
this.closeHttpClient = closeHttpClient;
200+
this.executorService = executorService;
183201
}
184202

185203
/** @deprecated This method is no longer supported. */
@@ -221,6 +239,7 @@ protected S3Base(S3Base client) {
221239
this.provider = client.provider;
222240
this.httpClient = client.httpClient;
223241
this.closeHttpClient = client.closeHttpClient;
242+
this.executorService = client.executorService;
224243
}
225244

226245
/** Check whether argument is valid or not. */
@@ -1163,7 +1182,8 @@ protected CompletableFuture<Integer> calculatePartCountAsync(List<ComposeSource>
11631182
long[] objectSize = {0};
11641183
int index = 0;
11651184

1166-
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 0);
1185+
CompletableFuture<Integer> completableFuture =
1186+
CompletableFuture.supplyAsync(() -> 0, executorService);
11671187
for (ComposeSource src : sources) {
11681188
index++;
11691189
final int i = index;
@@ -2882,7 +2902,8 @@ private CompletableFuture<ObjectWriteResponse> putMultipartObjectAsync(
28822902
}
28832903
}
28842904
return response;
2885-
});
2905+
},
2906+
executorService);
28862907
}
28872908

28882909
/**
@@ -2928,7 +2949,8 @@ protected CompletableFuture<ObjectWriteResponse> putObjectAsync(
29282949
} catch (NoSuchAlgorithmException | IOException e) {
29292950
throw new CompletionException(e);
29302951
}
2931-
})
2952+
},
2953+
executorService)
29322954
.thenCompose(
29332955
partSource -> {
29342956
try {

0 commit comments

Comments
 (0)