Skip to content

Using with RxJava leads to corrupt data #71

@st-h

Description

@st-h

I've been refactoring some code to RxJava and was suddenly having the weird issue of downloaded files being corrupt. After some investigation, I found that the issue disappears when I remove reactiverse:aws-sdk.

I've created a test, which reliably reproduces the issue:

dependencies:

vertxVersion = '4.4.3'
implementation 'software.amazon.awssdk:s3:2.17.167'
implementation 'com.github.akarnokd:rxjava2-jdk8-interop:0.3.7'

implementation 'io.reactiverse:aws-sdk:1.2.2'
also tested with:
implementation 'io.reactiverse:aws-sdk:1.0.0'
implementation 'io.reactiverse:aws-sdk:1.1.0'

and according to compatibility matrix:
vertxVersion = '4.4.0'
implementation 'software.amazon.awssdk:s3:2.20.2'
implementation 'io.reactiverse:aws-sdk:1.2.0'

The Spock test uploads and downloads a file to s3 and the compares it to the original. The result of the comparison is written to the response body which is then evaluated.

import hu.akarnokd.rxjava2.interop.SingleInterop
import io.reactiverse.awssdk.VertxSdkClient
import io.reactivex.Single
import io.vertx.core.Vertx
import io.vertx.core.http.HttpServer
import io.vertx.core.http.HttpServerOptions
import io.vertx.ext.web.Router
import io.vertx.ext.web.client.WebClient
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.conditions.RetryCondition
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder
import software.amazon.awssdk.services.s3.model.GetObjectRequest
import software.amazon.awssdk.services.s3.model.GetObjectResponse
import software.amazon.awssdk.services.s3.model.PutObjectRequest
import software.amazon.awssdk.services.s3.model.PutObjectResponse
import spock.lang.Shared
import spock.lang.Specification
import io.vertx.ext.web.client.HttpResponse
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

class S3Test extends Specification {

    @Shared S3AsyncClient client
    @Shared Vertx vertx

    def setupSpec() {
        vertx = Vertx.vertx()
        Router router = Router.router(vertx)

        router.get().handler({ context -> {
            S3AsyncClientBuilder builder = S3AsyncClient.builder();
            builder.overrideConfiguration(ClientOverrideConfiguration.builder()
                    .retryPolicy(RetryPolicy.builder().numRetries(6)
                            .retryCondition(RetryCondition.none()).build()
                    ).build())
            builder.endpointOverride(URI.create("http://127.0.0.1:8002"))
            builder.credentialsProvider(StaticCredentialsProvider
                    .create(AwsBasicCredentials.create("local-identity", "local-credential")))

            // This line breaks the test
            client = VertxSdkClient.withVertx(builder.region(Region.EU_CENTRAL_1), vertx.getOrCreateContext()).build()
            // Uncomment this line and the test runs fine
//                client = builder.region(Region.EU_CENTRAL_1).build()

            Files.deleteIfExists(Paths.get("/tmp/test.mp3"))
            this.upload("1", Paths.get("src/test/resources/img/test.JPG"), "mp3", "audio/mp3")
                    .flatMap({ PutObjectResponse resp ->
                        this.download("1", Paths.get("/tmp/test.mp3"))
                    }).subscribe({ GetObjectResponse resp1 ->
                            byte[] file1 = Files.readAllBytes(Paths.get("src/test/resources/img/test.JPG"));
                            byte[] file2 = Files.readAllBytes(Paths.get("/tmp/test.mp3"));
                            context.response().end(Arrays.equals(file1, file2).toString())
                    }, { t -> t.printStackTrace()})
        }})
        HttpServer server = vertx.createHttpServer(new HttpServerOptions()
                .setMaxFormAttributeSize(128 * 1024)
        ).requestHandler(router)

        server.listen(8088).toCompletionStage().toCompletableFuture().get()
    }

    def 'upload download'() {
        when:
        HttpResponse response = WebClient.create(vertx).get(8088, "localhost", "/").send().toCompletionStage().toCompletableFuture().get()

        then:
        response.bodyAsString() == "true"
    }

    public Single<GetObjectResponse> download(String id, Path path) {
        return SingleInterop.fromFuture(
                client.getObject(GetObjectRequest.builder()
                        .bucket("test")
                        .key(id).build(), path)
        )
    }

    public Single<PutObjectResponse> upload(String id, Path path, String format, String type) {
        return SingleInterop.fromFuture(
                client.putObject(PutObjectRequest.builder()
                        .bucket("test")
                        .key(id)
                        .metadata(Map.of("format", format, "type", type))
                        .contentType(type)
                        .build(), path)
        )
    }
}

When I replace this line:

client = VertxSdkClient.withVertx(builder.region(Region.EU_CENTRAL_1), vertx.getOrCreateContext()).build()`

with:

client = builder.region(Region.EU_CENTRAL_1).build()

the test succeeds repeatedly.

Another thing that puzzles me: If I do not use SingleInterop.fromFuture, but just Single.fromFuture, the s3 function blocks infinitely. (confirmed by adding logging statements. The log before the upload method gets called, but the flatMap never is resolved).

io.vertx.core.impl.BlockedThreadChecker - Thread Thread[#41,vert.x-eventloop-thread-0,5,main] has been blocked for 91557 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
	at jdk.internal.misc.Unsafe.park(Native Method) ~[?:?]
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:221) ~[?:?]
	at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1864) ~[?:?]
	at java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3745) ~[?:?]
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3690) ~[?:?]
	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1898) ~[?:?]
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2072) ~[?:?]
	at io.reactivex.internal.operators.flowable.FlowableFromFuture.subscribeActual(FlowableFromFuture.java:42) ~[rxjava-2.2.21.jar:?]
	at io.reactivex.Flowable.subscribe(Flowable.java:14935) ~[rxjava-2.2.21.jar:?]
	at io.reactivex.internal.operators.flowable.FlowableSingleSingle.subscribeActual(FlowableSingleSingle.java:39) ~[rxjava-2.2.21.jar:?]
	at io.reactivex.Single.subscribe(Single.java:3666) ~[rxjava-2.2.21.jar:?]
	at io.reactivex.internal.operators.single.SingleFlatMap.subscribeActual(SingleFlatMap.java:36) ~[rxjava-2.2.21.jar:?]
	at io.reactivex.Single.subscribe(Single.java:3666) ~[rxjava-2.2.21.jar:?]
	at io.reactivex.Single.subscribe(Single.java:3652) ~[rxjava-2.2.21.jar:?]
	at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321) ~[groovy-4.0.12.jar:4.0.12]
	at com.test.S3Test$_setupSpec_closure1.doCall(S3Test.groovy:55) ~[test/:?]
	at jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:578) ~[?:?]
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343) ~[groovy-4.0.12.jar:4.0.12]
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328) ~[groovy-4.0.12.jar:4.0.12]
	at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:279) ~[groovy-4.0.12.jar:4.0.12]
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1006) ~[groovy-4.0.12.jar:4.0.12]
	at groovy.lang.Closure.call(Closure.java:418) ~[groovy-4.0.12.jar:4.0.12]
	at org.codehaus.groovy.runtime.ConvertedClosure.invokeCustom(ConvertedClosure.java:50) ~[groovy-4.0.12.jar:4.0.12]
	at org.codehaus.groovy.runtime.ConversionHandler.invoke(ConversionHandler.java:113) ~[groovy-4.0.12.jar:4.0.12]
	at jdk.proxy3.$Proxy39.handle(Unknown Source) ~[?:?]
	at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284) ~[vertx-web-4.4.3.jar:4.4.3]
	at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:177) ~[vertx-web-4.4.3.jar:4.4.3]
	at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:141) ~[vertx-web-4.4.3.jar:4.4.3]
	at io.vertx.ext.web.impl.RouterImpl.handle(RouterImpl.java:68) ~[vertx-web-4.4.3.jar:4.4.3]
	at io.vertx.ext.web.impl.RouterImpl.handle(RouterImpl.java:37) ~[vertx-web-4.4.3.jar:4.4.3]
	at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:67) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.vertx.core.http.impl.Http1xServerRequestHandler.handle(Http1xServerRequestHandler.java:30) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:179) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:174) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:159) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.onHttpRequestChannelRead(WebSocketServerExtensionHandler.java:160) ~[netty-codec-http-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:83) ~[netty-codec-http-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.vertx.core.http.impl.Http1xUpgradeToH2CHandler.channelRead(Http1xUpgradeToH2CHandler.java:124) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:38) ~[vertx-core-4.4.5.jar:4.4.5]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.97.Final.jar:4.1.97.Final]
	at java.lang.Thread.run(Thread.java:1589) ~[?:?]

However, if I do not use reactivers:aws-sdk, I can just use Single.fromFuture and everything works fine. The download method then looks like this:

    public Single<GetObjectResponse> download(String id, Path path) {
        return Single.fromFuture(
                client.getObject(GetObjectRequest.builder()
                        .bucket("test")
                        .key(id).build(), path)
        )
    }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions