Skip to content

Commit 008d5ac

Browse files
authored
Merge pull request #32 from rsocket/feature/latest-rsocket-java
Feature/latest rsocket java
2 parents 5948ac6 + d08cabe commit 008d5ac

File tree

9 files changed

+68
-40
lines changed

9 files changed

+68
-40
lines changed

build.gradle

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
plugins {
2-
id 'com.gradle.build-scan' version '1.16'
2+
id 'com.gradle.build-scan' version '2.1'
33
id 'com.google.osdetector' version '1.4.0'
4-
id 'com.github.sherter.google-java-format' version '0.7.1' apply false
5-
id 'com.github.johnrengelman.shadow' version '2.0.1' apply false
4+
id 'com.github.sherter.google-java-format' version '0.8' apply false
5+
id 'com.github.johnrengelman.shadow' version '4.0.2' apply false
66
id 'com.jfrog.artifactory' version '4.7.3' apply false
77
id 'com.jfrog.bintray' version '1.8.4' apply false
88
id 'me.champeau.gradle.jmh' version '0.4.7' apply false
@@ -13,7 +13,10 @@ apply from: 'artifactory.gradle'
1313
apply from: 'bintray.gradle'
1414
apply from: 'publication.gradle'
1515

16-
buildScan { licenseAgreementUrl = 'https://gradle.com/terms-of-service'; licenseAgree = 'yes' }
16+
buildScan {
17+
termsOfServiceUrl = 'https://gradle.com/terms-of-service'
18+
termsOfServiceAgree = 'yes'
19+
}
1720

1821
subprojects {
1922
apply plugin: 'idea'
@@ -30,7 +33,7 @@ subprojects {
3033
sourceCompatibility = 1.8
3134
targetCompatibility = 1.8
3235

33-
ext['reactor-bom.version'] = 'Californium-SR1'
36+
ext['reactor-bom.version'] = 'Californium-SR5'
3437
ext['rsocket.version'] = '0.11.16'
3538

3639
ext['protobuf.version'] = '3.6.1'

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
group=io.rsocket.rpc
2-
version=0.2.12
2+
version=0.2.13
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-5.2.1-bin.zip
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists

rsocket-rpc-core/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ description = 'RSocket RPC Library'
99

1010
dependencies {
1111
implementation 'com.google.protobuf:protobuf-java'
12+
implementation 'org.slf4j:slf4j-api'
1213

1314
api 'io.opentracing:opentracing-api'
1415
api 'javax.inject:javax.inject'
@@ -54,6 +55,8 @@ dependencyManagement {
5455
entry "log4j-core"
5556
entry "log4j-slf4j-impl"
5657
}
58+
59+
dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}"
5760
dependency "org.slf4j:slf4j-simple:${ext['slf4j.version']}"
5861

5962
dependencySet(group: 'io.rsocket', version: ext['rsocket.version']) {

rsocket-rpc-core/src/main/java/io/rsocket/rpc/AbstractRSocketService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.rsocket.AbstractRSocket;
44
import io.rsocket.Payload;
5+
import org.reactivestreams.Publisher;
56
import reactor.core.publisher.Flux;
67

78
public abstract class AbstractRSocketService extends AbstractRSocket implements RSocketRpcService {
@@ -11,7 +12,7 @@ public String getService() {
1112
}
1213

1314
@Override
14-
public Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher) {
15+
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> publisher) {
1516
return Flux.error(new UnsupportedOperationException("Request-Channel not implemented."));
1617
}
1718

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
package io.rsocket.rpc;
22

3-
import io.rsocket.Payload;
4-
import io.rsocket.RSocket;
5-
import reactor.core.publisher.Flux;
3+
import io.rsocket.ResponderRSocket;
64

7-
public interface RSocketRpcService extends RSocket {
5+
public interface RSocketRpcService extends ResponderRSocket {
86
String getService();
9-
10-
Flux<Payload> requestChannel(Payload payload, Flux<Payload> publisher);
117
}

rsocket-rpc-core/src/main/java/io/rsocket/rpc/rsocket/RequestHandlingRSocket.java

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.netty.util.ReferenceCountUtil;
55
import io.rsocket.AbstractRSocket;
66
import io.rsocket.Payload;
7-
import io.rsocket.internal.SwitchTransformFlux;
7+
import io.rsocket.ResponderRSocket;
88
import io.rsocket.rpc.RSocketRpcService;
99
import io.rsocket.rpc.exception.ServiceNotFound;
1010
import io.rsocket.rpc.frames.Metadata;
@@ -14,7 +14,7 @@
1414
import reactor.core.publisher.Flux;
1515
import reactor.core.publisher.Mono;
1616

17-
public class RequestHandlingRSocket extends AbstractRSocket {
17+
public class RequestHandlingRSocket extends AbstractRSocket implements ResponderRSocket {
1818
private final ConcurrentMap<String, RSocketRpcService> registeredServices =
1919
new ConcurrentHashMap<>();
2020

@@ -102,25 +102,50 @@ public Flux<Payload> requestStream(Payload payload) {
102102

103103
@Override
104104
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
105-
return new SwitchTransformFlux<>(
106-
payloads,
107-
(payload, flux) -> {
108-
try {
109-
ByteBuf metadata = payload.sliceMetadata();
110-
String service = Metadata.getService(metadata);
111-
112-
RSocketRpcService rsocketService = registeredServices.get(service);
113-
114-
if (rsocketService == null) {
115-
ReferenceCountUtil.safeRelease(payload);
116-
return Flux.error(new ServiceNotFound(service));
117-
}
118-
119-
return rsocketService.requestChannel(payload, flux);
120-
} catch (Throwable t) {
121-
ReferenceCountUtil.safeRelease(payload);
122-
return Flux.error(t);
123-
}
124-
});
105+
return Flux.from(payloads)
106+
.switchOnFirst(
107+
(firstSignal, flux) -> {
108+
if (firstSignal.hasValue()) {
109+
Payload payload = firstSignal.get();
110+
try {
111+
ByteBuf metadata = payload.sliceMetadata();
112+
String service = Metadata.getService(metadata);
113+
114+
RSocketRpcService rsocketService = registeredServices.get(service);
115+
116+
if (rsocketService == null) {
117+
ReferenceCountUtil.safeRelease(payload);
118+
return Flux.error(new ServiceNotFound(service));
119+
}
120+
121+
return rsocketService.requestChannel(payload, flux);
122+
} catch (Throwable t) {
123+
ReferenceCountUtil.safeRelease(payload);
124+
return Flux.error(t);
125+
}
126+
}
127+
128+
return flux;
129+
});
130+
}
131+
132+
@Override
133+
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
134+
try {
135+
ByteBuf metadata = payload.sliceMetadata();
136+
String service = Metadata.getService(metadata);
137+
138+
RSocketRpcService rsocketService = registeredServices.get(service);
139+
140+
if (rsocketService == null) {
141+
ReferenceCountUtil.safeRelease(payload);
142+
return Flux.error(new ServiceNotFound(service));
143+
}
144+
145+
return rsocketService.requestChannel(payload, payloads);
146+
} catch (Throwable t) {
147+
ReferenceCountUtil.safeRelease(payload);
148+
return Flux.error(t);
149+
}
125150
}
126151
}

rsocket-rpc-protobuf/src/java_plugin/cpp/blocking_java_generator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ static void PrintServer(const ServiceDescriptor* service,
924924
p->Print(
925925
*vars,
926926
"@$Override$\n"
927-
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Flux$<$Payload$> publisher) {\n");
927+
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Publisher$<$Payload$> publisher) {\n");
928928
p->Indent();
929929
if (request_channel.empty()) {
930930
p->Print(
@@ -956,7 +956,7 @@ static void PrintServer(const ServiceDescriptor* service,
956956
p->Indent();
957957
p->Print(
958958
*vars,
959-
"publisher.map(deserializer($input_type$.parser()));\n");
959+
"$Flux$.from(publisher).map(deserializer($input_type$.parser()));\n");
960960
p->Outdent();
961961
if (method->server_streaming()) {
962962
p->Print(

rsocket-rpc-protobuf/src/java_plugin/cpp/java_generator.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,7 +1311,7 @@ static void PrintServer(const ServiceDescriptor* service,
13111311
p->Print(
13121312
*vars,
13131313
"@$Override$\n"
1314-
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Flux$<$Payload$> publisher) {\n");
1314+
"public $Flux$<$Payload$> requestChannel($Payload$ payload, $Publisher$<$Payload$> publisher) {\n");
13151315
p->Indent();
13161316
if (request_channel.empty()) {
13171317
p->Print(
@@ -1344,7 +1344,7 @@ static void PrintServer(const ServiceDescriptor* service,
13441344
p->Indent();
13451345
p->Print(
13461346
*vars,
1347-
"publisher.map(deserializer($input_type$.parser()));\n");
1347+
"$Flux$.from(publisher).map(deserializer($input_type$.parser()));\n");
13481348
p->Outdent();
13491349
if (method->server_streaming()) {
13501350
p->Print(

0 commit comments

Comments
 (0)