Description
I've been refactoring some code to RxJava and was suddenly having the weird issue of downloaded files being corrupt. After some investigation, I found that the issue disappears when I remove reactiverse:aws-sdk.
I've created a test, which reliably reproduces the issue:
dependencies:
vertxVersion = '4.4.3'
implementation 'software.amazon.awssdk:s3:2.17.167'
implementation 'com.github.akarnokd:rxjava2-jdk8-interop:0.3.7'
implementation 'io.reactiverse:aws-sdk:1.2.2'
also tested with:
implementation 'io.reactiverse:aws-sdk:1.0.0'
implementation 'io.reactiverse:aws-sdk:1.1.0'
and with the versions from the compatibility matrix:
vertxVersion = '4.4.0'
implementation 'software.amazon.awssdk:s3:2.20.2'
implementation 'io.reactiverse:aws-sdk:1.2.0'
The Spock test uploads a file to S3, downloads it again, and then compares the download to the original. The result of the comparison is written to the response body, which the test then evaluates.
import hu.akarnokd.rxjava2.interop.SingleInterop
import io.reactiverse.awssdk.VertxSdkClient
import io.reactivex.Single
import io.vertx.core.Vertx
import io.vertx.core.http.HttpServer
import io.vertx.core.http.HttpServerOptions
import io.vertx.ext.web.Router
import io.vertx.ext.web.client.WebClient
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.conditions.RetryCondition
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder
import software.amazon.awssdk.services.s3.model.GetObjectRequest
import software.amazon.awssdk.services.s3.model.GetObjectResponse
import software.amazon.awssdk.services.s3.model.PutObjectRequest
import software.amazon.awssdk.services.s3.model.PutObjectResponse
import spock.lang.Shared
import spock.lang.Specification
import io.vertx.ext.web.client.HttpResponse
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
class S3Test extends Specification {
    @Shared S3AsyncClient client
    @Shared Vertx vertx

    def setupSpec() {
        vertx = Vertx.vertx()
        Router router = Router.router(vertx)
        router.get().handler({ context -> {
            S3AsyncClientBuilder builder = S3AsyncClient.builder();
            builder.overrideConfiguration(ClientOverrideConfiguration.builder()
                .retryPolicy(RetryPolicy.builder().numRetries(6)
                    .retryCondition(RetryCondition.none()).build()
                ).build())
            builder.endpointOverride(URI.create("http://127.0.0.1:8002"))
            builder.credentialsProvider(StaticCredentialsProvider
                .create(AwsBasicCredentials.create("local-identity", "local-credential")))
            // This line breaks the test
            client = VertxSdkClient.withVertx(builder.region(Region.EU_CENTRAL_1), vertx.getOrCreateContext()).build()
            // Uncomment this line and the test runs fine
            // client = builder.region(Region.EU_CENTRAL_1).build()
            Files.deleteIfExists(Paths.get("/tmp/test.mp3"))
            this.upload("1", Paths.get("src/test/resources/img/test.JPG"), "mp3", "audio/mp3")
                .flatMap({ PutObjectResponse resp ->
                    this.download("1", Paths.get("/tmp/test.mp3"))
                }).subscribe({ GetObjectResponse resp1 ->
                    byte[] file1 = Files.readAllBytes(Paths.get("src/test/resources/img/test.JPG"));
                    byte[] file2 = Files.readAllBytes(Paths.get("/tmp/test.mp3"));
                    context.response().end(Arrays.equals(file1, file2).toString())
                }, { t -> t.printStackTrace()})
        }})
        HttpServer server = vertx.createHttpServer(new HttpServerOptions()
            .setMaxFormAttributeSize(128 * 1024)
        ).requestHandler(router)
        server.listen(8088).toCompletionStage().toCompletableFuture().get()
    }

    def 'upload download'() {
        when:
        HttpResponse response = WebClient.create(vertx).get(8088, "localhost", "/").send().toCompletionStage().toCompletableFuture().get()
        then:
        response.bodyAsString() == "true"
    }

    public Single<GetObjectResponse> download(String id, Path path) {
        return SingleInterop.fromFuture(
            client.getObject(GetObjectRequest.builder()
                .bucket("test")
                .key(id).build(), path)
        )
    }

    public Single<PutObjectResponse> upload(String id, Path path, String format, String type) {
        return SingleInterop.fromFuture(
            client.putObject(PutObjectRequest.builder()
                .bucket("test")
                .key(id)
                .metadata(Map.of("format", format, "type", type))
                .contentType(type)
                .build(), path)
        )
    }
}
When I replace this line:
client = VertxSdkClient.withVertx(builder.region(Region.EU_CENTRAL_1), vertx.getOrCreateContext()).build()
with:
client = builder.region(Region.EU_CENTRAL_1).build()
the test succeeds repeatedly.
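The test only reports whether the two files are equal. To see how the download differs from the original (truncated, or altered somewhere in the middle), a small diagnostic along these lines might help. This is only a sketch: the class name `FileDiff` and the method `describe` are made up for illustration, and it relies on `Files.mismatch`, which requires Java 12 or newer.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the original test.
final class FileDiff {

    // Reports whether two files are identical and, if not, their sizes
    // and the offset of the first byte that differs.
    static String describe(Path expected, Path actual) throws IOException {
        // Files.mismatch returns -1 when the contents are identical. If one file
        // is a prefix of the other, it returns the size of the shorter file.
        long offset = Files.mismatch(expected, actual);
        if (offset < 0) {
            return "identical";
        }
        return "sizes " + Files.size(expected) + " vs " + Files.size(actual)
                + ", first difference at byte " + offset;
    }

    public static void main(String[] args) throws IOException {
        Path original = Files.createTempFile("original", ".bin");
        Path download = Files.createTempFile("download", ".bin");
        try {
            Files.write(original, new byte[] {1, 2, 3, 4});
            Files.write(download, new byte[] {1, 2, 9, 4});
            System.out.println(describe(original, download));
            // prints "sizes 4 vs 4, first difference at byte 2"
        } finally {
            Files.deleteIfExists(original);
            Files.deleteIfExists(download);
        }
    }
}
```

In the test above, the same call could be made with `src/test/resources/img/test.JPG` and `/tmp/test.mp3` in place of the temporary files.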
Another thing that puzzles me: if I use Single.fromFuture instead of SingleInterop.fromFuture, the S3 call blocks indefinitely. I confirmed this by adding logging statements: the log line before the upload call is written, but the flatMap never resolves. The blocked thread checker reports:
io.vertx.core.impl.BlockedThreadChecker - Thread Thread[#41,vert.x-eventloop-thread-0,5,main] has been blocked for 91557 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
at jdk.internal.misc.Unsafe.park(Native Method) ~[?:?]
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:221) ~[?:?]
at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1864) ~[?:?]
at java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3745) ~[?:?]
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3690) ~[?:?]
at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072) ~[?:?]
at io.reactivex.internal.operators.flowable.FlowableFromFuture.subscribeActual(FlowableFromFuture.java:42) ~[rxjava-2.2.21.jar:?]
at io.reactivex.Flowable.subscribe(Flowable.java:14935) ~[rxjava-2.2.21.jar:?]
at io.reactivex.internal.operators.flowable.FlowableSingleSingle.subscribeActual(FlowableSingleSingle.java:39) ~[rxjava-2.2.21.jar:?]
at io.reactivex.Single.subscribe(Single.java:3666) ~[rxjava-2.2.21.jar:?]
at io.reactivex.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:36) ~[rxjava-2.2.21.jar:?]
at io.reactivex.Single.subscribe(Single.java:3666) ~[rxjava-2.2.21.jar:?]
at io.reactivex.Single.subscribe(Single.java:3652) ~[rxjava-2.2.21.jar:?]
at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321) ~[groovy-4.0.12.jar:4.0.12]
at com.test.S3Test$_setupSpec_closure1.doCall(S3Test.groovy:55) ~[test/:?]
at jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[?:?]
at java.lang.reflect.Method.invoke(Method.java:578) ~[?:?]
at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343) ~[groovy-4.0.12.jar:4.0.12]
at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328) ~[groovy-4.0.12.jar:4.0.12]
at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:279) ~[groovy-4.0.12.jar:4.0.12]
at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1006) ~[groovy-4.0.12.jar:4.0.12]
at groovy.lang.Closure.call(Closure.java:418) ~[groovy-4.0.12.jar:4.0.12]
at org.codehaus.groovy.runtime.ConvertedClosure.invokeCustom(ConvertedClosure.java:50) ~[groovy-4.0.12.jar:4.0.12]
at org.codehaus.groovy.runtime.ConversionHandler.invoke(ConversionHandler.java:113) ~[groovy-4.0.12.jar:4.0.12]
at jdk.proxy3.$Proxy39.handle(Unknown Source) ~[?:?]
at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284) ~[vertx-web-4.4.3.jar:4.4.3]
at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177) ~[vertx-web-4.4.3.jar:4.4.3]
at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:141) ~[vertx-web-4.4.3.jar:4.4.3]
at io.vertx.ext.web.impl.RouterImpl.handle(RouterImpl.java:68) ~[vertx-web-4.4.3.jar:4.4.3]
at io.vertx.ext.web.impl.RouterImpl.handle(RouterImpl.java:37) ~[vertx-web-4.4.3.jar:4.4.3]
at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:67) ~[vertx-core-4.4.5.jar:4.4.5]
at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:30) ~[vertx-core-4.4.5.jar:4.4.5]
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55) ~[vertx-core-4.4.5.jar:4.4.5]
at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:179) ~[vertx-core-4.4.5.jar:4.4.5]
at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:174) ~[vertx-core-4.4.5.jar:4.4.5]
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:159) ~[vertx-core-4.4.5.jar:4.4.5]
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153) ~[vertx-core-4.4.5.jar:4.4.5]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.onHttpRequestChannelRead(WebSocketServerExtensionHandler.java:160) ~[netty-codec-http-4.1.97.Final.jar:4.1.97.Final]
at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:83) ~[netty-codec-http-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.vertx.core.http.impl.Http1xUpgradeToH2CHandler.channelRead(Http1xUpgradeToH2CHandler.java:124) ~[vertx-core-4.4.5.jar:4.4.5]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.97.Final.jar:4.1.97.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61) ~[vertx-core-4.4.5.jar:4.4.5]
at io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:38) ~[vertx-core-4.4.5.jar:4.4.5]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
at java.lang.Thread.run(Thread.java:1589) ~[?:?]
However, if I do not use reactiverse:aws-sdk, I can just use Single.fromFuture and everything works fine. The download method then looks like this:
public Single<GetObjectResponse> download(String id, Path path) {
    return Single.fromFuture(
        client.getObject(GetObjectRequest.builder()
            .bucket("test")
            .key(id).build(), path)
    )
}
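One possible explanation for the blocking, offered as an assumption and not something confirmed here: the stack trace shows Single.fromFuture calling CompletableFuture.get on vert.x-eventloop-thread-0. If the Vert.x-backed client delivers its result on that same event loop, the thread waiting in get() is the thread that would have to complete the future. The JDK-only sketch below reproduces that pattern, with a single-thread executor standing in for the event loop. The names `EventLoopBlockingDemo`, `fetchOn`, `blockingOnLoop` and `callbackOnLoop` are hypothetical, and a timeout is used so the demo terminates instead of hanging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo; the executor stands in for a Vert.x event loop.
final class EventLoopBlockingDemo {

    // Stand-in for an async client: the work happens on another thread,
    // but the completion is dispatched back onto the given loop.
    static CompletableFuture<String> fetchOn(ExecutorService loop) {
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> loop.execute(() -> result.complete("done")));
        return result;
    }

    // Blocking style: wait for the result while occupying the loop thread.
    // The completion task is queued behind this one, so the wait times out.
    static String blockingOnLoop(ExecutorService loop) throws Exception {
        return loop.submit(() -> {
            try {
                return fetchOn(loop).get(500, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                return "timed out";
            }
        }).get();
    }

    // Callback style: register a callback and return, which frees the loop
    // thread to run the queued completion.
    static String callbackOnLoop(ExecutorService loop) throws Exception {
        CompletableFuture<String> seen = new CompletableFuture<>();
        loop.execute(() -> fetchOn(loop).whenComplete((value, error) -> {
            if (error != null) {
                seen.completeExceptionally(error);
            } else {
                seen.complete(value);
            }
        }));
        return seen.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            System.out.println("blocking get: " + blockingOnLoop(loop)); // timed out
            System.out.println("callback:     " + callbackOnLoop(loop)); // done
        } finally {
            loop.shutdown();
        }
    }
}
```

If this is what happens, it would also fit the observation that SingleInterop.fromFuture works, assuming it subscribes to the CompletionStage through a callback instead of blocking on it. It does not explain the corrupt downloads, which may be a separate problem.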