Skip to content

Commit a40d549

Browse files
authored
core: Report marshaller error for uncompressed size too large back to the client
Report a status code of RESOURCE_EXHAUSTED instead of UNKNOWN, when the marshaller throws error for uncompressed message size being too large. Fixes : #11246
1 parent 589df4e commit a40d549

File tree

4 files changed

+80
-8
lines changed

4 files changed

+80
-8
lines changed

core/src/main/java/io/grpc/internal/ServerCallImpl.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,16 +327,20 @@ private void messagesAvailableInternal(final MessageProducer producer) {
327327
return;
328328
}
329329

330-
InputStream message;
331330
try {
331+
InputStream message;
332332
while ((message = producer.next()) != null) {
333-
try {
334-
listener.onMessage(call.method.parseRequest(message));
335-
} catch (Throwable t) {
333+
ReqT parsedMessage;
334+
try (InputStream ignored = message) {
335+
parsedMessage = call.method.parseRequest(message);
336+
} catch (StatusRuntimeException e) {
336337
GrpcUtil.closeQuietly(message);
337-
throw t;
338+
GrpcUtil.closeQuietly(producer);
339+
call.cancelled = true;
340+
call.close(e.getStatus(), new Metadata());
341+
return;
338342
}
339-
message.close();
343+
listener.onMessage(parsedMessage);
340344
}
341345
} catch (Throwable t) {
342346
GrpcUtil.closeQuietly(producer);

core/src/test/java/io/grpc/internal/ServerCallImplTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@
4848
import io.grpc.SecurityLevel;
4949
import io.grpc.ServerCall;
5050
import io.grpc.Status;
51+
import io.grpc.StatusRuntimeException;
5152
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
5253
import io.perfmark.PerfMark;
5354
import java.io.ByteArrayInputStream;
55+
import java.io.IOException;
5456
import java.io.InputStream;
5557
import java.io.InputStreamReader;
5658
import org.junit.Before;
@@ -69,6 +71,8 @@ public class ServerCallImplTest {
6971

7072
@Mock private ServerStream stream;
7173
@Mock private ServerCall.Listener<Long> callListener;
74+
@Mock private StreamListener.MessageProducer messageProducer;
75+
@Mock private InputStream message;
7276

7377
private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create();
7478
private ServerCallImpl<Long, Long> call;
@@ -493,6 +497,43 @@ public void streamListener_unexpectedRuntimeException() {
493497
assertThat(e).hasMessageThat().isEqualTo("unexpected exception");
494498
}
495499

500+
@Test
501+
public void streamListener_statusRuntimeException() throws IOException {
502+
MethodDescriptor<Long, Long> failingParseMethod = MethodDescriptor.<Long, Long>newBuilder()
503+
.setType(MethodType.UNARY)
504+
.setFullMethodName("service/method")
505+
.setRequestMarshaller(new LongMarshaller() {
506+
@Override
507+
public Long parse(InputStream stream) {
508+
throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED
509+
.withDescription("Decompressed gRPC message exceeds maximum size"));
510+
}
511+
})
512+
.setResponseMarshaller(new LongMarshaller())
513+
.build();
514+
515+
call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context,
516+
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
517+
serverCallTracer, PerfMark.createTag());
518+
519+
ServerStreamListenerImpl<Long> streamListener =
520+
new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context);
521+
522+
when(messageProducer.next()).thenReturn(message, (InputStream) null);
523+
streamListener.messagesAvailable(messageProducer);
524+
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
525+
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
526+
527+
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
528+
Status status = statusCaptor.getValue();
529+
assertEquals(Status.RESOURCE_EXHAUSTED.getCode(), status.getCode());
530+
assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription());
531+
532+
streamListener.halfClosed();
533+
verify(callListener, never()).onHalfClose();
534+
verify(callListener, never()).onMessage(any());
535+
}
536+
496537
private static class LongMarshaller implements Marshaller<Long> {
497538
@Override
498539
public InputStream stream(Long value) {

interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2030,7 +2030,7 @@ private void assertPayload(Payload expected, Payload actual) {
20302030
}
20312031
}
20322032

2033-
private static void assertCodeEquals(Status.Code expected, Status actual) {
2033+
protected static void assertCodeEquals(Status.Code expected, Status actual) {
20342034
assertWithMessage("Unexpected status: %s", actual).that(actual.getCode()).isEqualTo(expected);
20352035
}
20362036

interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.testing.integration;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertThrows;
2021
import static org.junit.Assert.assertTrue;
2122

2223
import com.google.protobuf.ByteString;
@@ -37,6 +38,8 @@
3738
import io.grpc.ServerCall.Listener;
3839
import io.grpc.ServerCallHandler;
3940
import io.grpc.ServerInterceptor;
41+
import io.grpc.Status.Code;
42+
import io.grpc.StatusRuntimeException;
4043
import io.grpc.internal.GrpcUtil;
4144
import io.grpc.netty.InternalNettyChannelBuilder;
4245
import io.grpc.netty.InternalNettyServerBuilder;
@@ -53,7 +56,9 @@
5356
import java.io.OutputStream;
5457
import org.junit.Before;
5558
import org.junit.BeforeClass;
59+
import org.junit.Rule;
5660
import org.junit.Test;
61+
import org.junit.rules.TestName;
5762
import org.junit.runner.RunWith;
5863
import org.junit.runners.JUnit4;
5964

@@ -84,10 +89,16 @@ public static void registerCompressors() {
8489
compressors.register(Codec.Identity.NONE);
8590
}
8691

92+
@Rule
93+
public final TestName currentTest = new TestName();
94+
8795
@Override
8896
protected ServerBuilder<?> getServerBuilder() {
8997
NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create())
90-
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
98+
.maxInboundMessageSize(
99+
DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName())
100+
? 1000
101+
: AbstractInteropTest.MAX_MESSAGE_SIZE)
91102
.compressorRegistry(compressors)
92103
.decompressorRegistry(decompressors)
93104
.intercept(new ServerInterceptor() {
@@ -126,6 +137,22 @@ public void compresses() {
126137
assertTrue(FZIPPER.anyWritten);
127138
}
128139

140+
private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME =
141+
"decompressedMessageTooLong";
142+
143+
@Test
144+
public void decompressedMessageTooLong() {
145+
assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName());
146+
final SimpleRequest bigRequest = SimpleRequest.newBuilder()
147+
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000])))
148+
.build();
149+
StatusRuntimeException e = assertThrows(StatusRuntimeException.class,
150+
() -> blockingStub.withCompression("gzip").unaryCall(bigRequest));
151+
assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus());
152+
assertEquals("Decompressed gRPC message exceeds maximum size 1000",
153+
e.getStatus().getDescription());
154+
}
155+
129156
@Override
130157
protected NettyChannelBuilder createChannelBuilder() {
131158
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())

0 commit comments

Comments
 (0)