From 298edba3440f962e3574c2375c9beff175039ea3 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 6 Jun 2025 15:24:27 +0000 Subject: [PATCH 01/29] handled cardinality violation and added tests --- server.go | 1 + stream.go | 4 + test/end2end_test.go | 262 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 267 insertions(+) diff --git a/server.go b/server.go index 976e70ae068e..b238a1bdf720 100644 --- a/server.go +++ b/server.go @@ -1572,6 +1572,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv s: stream, p: &parser{r: stream, bufferPool: s.opts.bufferPool}, codec: s.getCodec(stream.ContentSubtype()), + desc: sd, maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, trInfo: trInfo, diff --git a/stream.go b/stream.go index d58bb6471a8a..441ac728b589 100644 --- a/stream.go +++ b/stream.go @@ -1580,6 +1580,7 @@ type serverStream struct { s *transport.ServerStream p *parser codec baseCodec + desc *StreamDesc compressorV0 Compressor compressorV1 encoding.Compressor @@ -1774,6 +1775,9 @@ func (ss *serverStream) RecvMsg(m any) (err error) { binlog.Log(ss.ctx, chc) } } + if !ss.desc.ClientStreams { + return status.Errorf(codes.Internal, "RecvMsg is called twice") + } return err } if err == io.ErrUnexpectedEOF { diff --git a/test/end2end_test.go b/test/end2end_test.go index 75b27f4c224d..424ba229f983 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3740,6 +3740,268 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } +// Tests the behavior for server-side streaming when server call +// RecvMsg twice. Second call to RecvMsg should fail with Internal +// error. +func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "FullDuplexCall", + Handler: func(srv interface{}, stream grpc.ServerStream) error { + err := stream.RecvMsg(&testpb.Empty{}) + if err != nil { + t.Errorf("stream.RecvMsg() = %v, want ", err) + } + + if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } + return nil + }, + ClientStreams: false, + ServerStreams: true, + }, + }, + Metadata: "grpc/testing/test.proto", + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + desc := &grpc.StreamDesc{ + StreamName: "FullDuplexCall", + ServerStreams: true, + ClientStreams: false, // This is the test case: client is *not* allowed to stream + } + + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/FullDuplexCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests the behavior for server-side streaming when client call +// SendMsg twice. Second call to SendMsg should fail with Internal +// error. +func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "FullDuplexCall", + Handler: func(srv interface{}, stream grpc.ServerStream) error { + err := stream.RecvMsg(&testpb.Empty{}) + if err != nil { + t.Errorf("stream.RecvMsg() = %v, want ", err) + } + + return nil + }, + ClientStreams: false, + ServerStreams: true, + }, + }, + Metadata: "grpc/testing/test.proto", + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + desc := &grpc.StreamDesc{ + StreamName: "FullDuplexCall", + ServerStreams: true, + ClientStreams: false, + } + + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/FullDuplexCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests the behavior for unary RPC when server call +// RecvMsg twice. Second call to RecvMsg should fail with Internal +// error. +func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "UnaryCall", + Handler: func(srv interface{}, stream grpc.ServerStream) error { + err := stream.RecvMsg(&testpb.Empty{}) + if err != nil { + t.Errorf("stream.RecvMsg() = %v, want ", err) + } + + if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } + return nil + }, + ClientStreams: false, + ServerStreams: false, + }, + }, + Metadata: "grpc/testing/test.proto", + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: false, + ClientStreams: false, + } + + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + +// Tests the behavior for unary RPC when client call +// SendMsg twice. Second call to SendMsg should fail with Internal +// error. +func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "UnaryCall", + Handler: func(srv interface{}, stream grpc.ServerStream) error { + err := stream.RecvMsg(&testpb.Empty{}) + if err != nil { + t.Errorf("stream.RecvMsg() = %v, want ", err) + } + + return nil + }, + ClientStreams: false, + ServerStreams: false, + }, + }, + Metadata: "grpc/testing/test.proto", + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: false, + ClientStreams: false, + } + + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + func (s) TestExceedMaxStreamsLimit(t *testing.T) { for _, e := range listTestEnv() { testExceedMaxStreamsLimit(t, e) From fd5d614e1cc7b23be1792eb330cc462dc25e7a50 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 6 Jun 2025 18:11:22 +0000 Subject: [PATCH 02/29] remove vet errors --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 7154cd0052c8..3f13e68682f7 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3999,7 +3999,7 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal) - } + } } // Tests that a client receives a cardinality violation error for client-streaming From 7e4206cda94310f02b678f1844372b457fbb0102 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 6 Jun 2025 18:58:01 +0000 Subject: [PATCH 03/29] modified tests --- test/end2end_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 3f13e68682f7..2200c25ebcfd 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3826,11 +3826,6 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { { StreamName: "FullDuplexCall", Handler: func(srv interface{}, stream grpc.ServerStream) error { - err := stream.RecvMsg(&testpb.Empty{}) - if err != nil { - t.Errorf("stream.RecvMsg() = %v, want ", err) - } - return nil }, ClientStreams: false, @@ -3957,11 +3952,6 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { { StreamName: "UnaryCall", Handler: func(srv interface{}, stream grpc.ServerStream) error { - err := stream.RecvMsg(&testpb.Empty{}) - if err != nil { - t.Errorf("stream.RecvMsg() = %v, want ", err) - } - return nil }, ClientStreams: false, From 324566b93bdd7c3a2c3faa544c50f63ac9516c71 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 6 Jun 2025 19:51:31 +0000 Subject: [PATCH 04/29] modified tests --- test/end2end_test.go | 46 ++++++++++---------------------------------- 1 file changed, 10 insertions(+), 36 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 2200c25ebcfd..40465e154b16 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3817,24 +3817,11 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { } defer lis.Close() - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "FullDuplexCall", - Handler: func(srv interface{}, stream grpc.ServerStream) error { - return nil - }, - ClientStreams: false, - ServerStreams: true, - }, - }, - Metadata: "grpc/testing/test.proto", - } - s.RegisterService(&serviceDesc, &testServer{}) + ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { + return nil + }) + + s := grpc.NewServer(ss) go s.Serve(lis) defer s.Stop() @@ -3943,24 +3930,11 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { } defer lis.Close() - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "UnaryCall", - Handler: func(srv interface{}, stream grpc.ServerStream) error { - return nil - }, - ClientStreams: false, - ServerStreams: false, - }, - }, - Metadata: "grpc/testing/test.proto", - } - s.RegisterService(&serviceDesc, &testServer{}) + ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { + return nil + }) + + s := grpc.NewServer(ss) go s.Serve(lis) defer s.Stop() From 9b9cddffbd72cedf4c3ad835a4b35e7eba66b8d7 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 23 Jun 2025 20:34:47 +0000 Subject: [PATCH 05/29] change server.recvmsg() to catch cardinality violation --- stream.go | 6 ++++ test/end2end_test.go | 71 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 1adfc7a328a0..b2492083e1de 100644 --- a/stream.go +++ b/stream.go @@ -1812,6 +1812,12 @@ func (ss *serverStream) RecvMsg(m any) (err error) { binlog.Log(ss.ctx, cm) } } + // Special handling for non-client-stream rpcs. + if !ss.desc.ClientStreams { + if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != io.EOF { + return status.Errorf(codes.Internal, "cardinality violation: expected for non client-streaming RPCs, but received another message") + } + } return nil } diff --git a/test/end2end_test.go b/test/end2end_test.go index 4392889329a8..1aa47a86f477 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3863,7 +3863,7 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { Streams: []grpc.StreamDesc{ { StreamName: "FullDuplexCall", - Handler: func(srv interface{}, stream grpc.ServerStream) error { + Handler: func(srv any, stream grpc.ServerStream) error { err := stream.RecvMsg(&testpb.Empty{}) if err != nil { t.Errorf("stream.RecvMsg() = %v, want ", err) @@ -3976,7 +3976,7 @@ func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { Streams: []grpc.StreamDesc{ { StreamName: "UnaryCall", - Handler: func(srv interface{}, stream grpc.ServerStream) error { + Handler: func(srv any, stream grpc.ServerStream) error { err := stream.RecvMsg(&testpb.Empty{}) if err != nil { t.Errorf("stream.RecvMsg() = %v, want ", err) @@ -4071,6 +4071,73 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { } } +func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "UnaryCall", + Handler: func(srv any, stream grpc.ServerStream) error { + if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } + return nil + }, + ClientStreams: false, + ServerStreams: true, + }, + }, + Metadata: "grpc/testing/test.proto", + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + // s := grpc.NewServer(ss) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + desc := &grpc.StreamDesc{ + StreamName: "UnaryCall", + ServerStreams: true, + ClientStreams: true, + } + + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err := stream.SendMsg(&testpb.Empty{}); err != nil { + t.Errorf("stream.SendMsg() = %v, want ", err) + } + + if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server call SendMsg multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { From 67549e723e5e6dcf2109e3adb1aefc2664bef69f Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 23 Jun 2025 21:16:33 +0000 Subject: [PATCH 06/29] replace srv with _ --- test/end2end_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 853d4f765a70..18f6e13cd2e7 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3758,7 +3758,7 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { Streams: []grpc.StreamDesc{ { StreamName: "FullDuplexCall", - Handler: func(srv any, stream grpc.ServerStream) error { + Handler: func(_ any, stream grpc.ServerStream) error { err := stream.RecvMsg(&testpb.Empty{}) if err != nil { t.Errorf("stream.RecvMsg() = %v, want ", err) @@ -3871,7 +3871,7 @@ func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { Streams: []grpc.StreamDesc{ { StreamName: "UnaryCall", - Handler: func(srv any, stream grpc.ServerStream) error { + Handler: func(_ any, stream grpc.ServerStream) error { err := stream.RecvMsg(&testpb.Empty{}) if err != nil { t.Errorf("stream.RecvMsg() = %v, want ", err) @@ -3981,7 +3981,7 @@ func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { Streams: []grpc.StreamDesc{ { StreamName: "UnaryCall", - Handler: func(srv any, stream grpc.ServerStream) error { + Handler: func(_ any, stream grpc.ServerStream) error { if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) } From 6030b90f19bc0649c4ad795b9d497bce7592ff66 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 1 Jul 2025 07:23:04 +0000 Subject: [PATCH 07/29] resolving comments --- stream.go | 18 ++++--- test/end2end_test.go | 119 +++++++++++++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 39 deletions(-) diff --git a/stream.go b/stream.go index 2ce85f5ff010..f2698d47f216 100644 --- a/stream.go +++ b/stream.go @@ -1776,7 +1776,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { } } if !ss.desc.ClientStreams { - return status.Errorf(codes.Internal, "RecvMsg is called twice") + return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-stream RPC") } return err } @@ -1804,13 +1804,19 @@ func (ss *serverStream) RecvMsg(m any) (err error) { binlog.Log(ss.ctx, cm) } } + + if ss.desc.ClientStreams { + // Subsequent messages should be received by subsequent RecvMsg calls. + return nil + } // Special handling for non-client-stream rpcs. - if !ss.desc.ClientStreams { - if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, payInfo, ss.decompressorV1, true); err != io.EOF { - return status.Errorf(codes.Internal, "cardinality violation: expected for non client-streaming RPCs, but received another message") - } + // This recv expects EOF or errors, so we don't collect inPayload. + if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF { + return nil + } else if err != nil { + return err } - return nil + return status.Error(codes.Internal, "cardinality violation: expected for non client-streaming RPCs, but received another message") } // MethodFromServerStream returns the method string for the input stream. diff --git a/test/end2end_test.go b/test/end2end_test.go index 18f6e13cd2e7..95898fe84bcc 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3740,10 +3740,9 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests the behavior for server-side streaming when server call -// RecvMsg twice. Second call to RecvMsg should fail with Internal -// error. -func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { +// Tests the behavior for server-side streaming when server calls RecvMsg twice. +// Second call to RecvMsg should fail with Internal error. +func TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3757,7 +3756,7 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { - StreamName: "FullDuplexCall", + StreamName: "ServerStreaming", Handler: func(_ any, stream grpc.ServerStream) error { err := stream.RecvMsg(&testpb.Empty{}) if err != nil { @@ -3773,7 +3772,6 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { ServerStreams: true, }, }, - Metadata: "grpc/testing/test.proto", } s.RegisterService(&serviceDesc, &testServer{}) go s.Serve(lis) @@ -3788,12 +3786,12 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { defer cc.Close() desc := &grpc.StreamDesc{ - StreamName: "FullDuplexCall", + StreamName: "ServerStreaming", ServerStreams: true, - ClientStreams: false, // This is the test case: client is *not* allowed to stream + ClientStreams: false, } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/FullDuplexCall") + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") if err != nil { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } @@ -3807,10 +3805,9 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { } } -// Tests the behavior for server-side streaming when client call -// SendMsg twice. Second call to SendMsg should fail with Internal -// error. -func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { +// Tests the behavior for server-side streaming when client calls SendMsg twice. +// Second call to SendMsg should fail with Internal error. +func TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3834,12 +3831,12 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { defer cc.Close() desc := &grpc.StreamDesc{ - StreamName: "FullDuplexCall", + StreamName: "ServerStreaming", ServerStreams: true, ClientStreams: false, } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/FullDuplexCall") + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") if err != nil { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } @@ -3853,10 +3850,9 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { } } -// Tests the behavior for unary RPC when server call -// RecvMsg twice. Second call to RecvMsg should fail with Internal -// error. -func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { +// Tests the behavior for unary RPC when server calls RecvMsg twice. Second call +// to RecvMsg should fail with Internal error. +func TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3886,7 +3882,6 @@ func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { ServerStreams: false, }, }, - Metadata: "grpc/testing/test.proto", } s.RegisterService(&serviceDesc, &testServer{}) go s.Serve(lis) @@ -3920,10 +3915,9 @@ func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { } } -// Tests the behavior for unary RPC when client call -// SendMsg twice. Second call to SendMsg should fail with Internal -// error. -func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { +// Tests the behavior for unary RPC when client calls SendMsg twice. Second call +// to SendMsg should fail with Internal error. +func TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3966,7 +3960,9 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { } } -func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { +// Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming +// and calls SendMsg twice. +func TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3980,7 +3976,7 @@ func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { - StreamName: "UnaryCall", + StreamName: "ServerStreaming", Handler: func(_ any, stream grpc.ServerStream) error { if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) @@ -3991,16 +3987,11 @@ func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { ServerStreams: true, }, }, - Metadata: "grpc/testing/test.proto", } s.RegisterService(&serviceDesc, &testServer{}) go s.Serve(lis) defer s.Stop() - // s := grpc.NewServer(ss) - go s.Serve(lis) - defer s.Stop() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -4010,12 +4001,12 @@ func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { defer cc.Close() desc := &grpc.StreamDesc{ - StreamName: "UnaryCall", + StreamName: "ServerStreaming", ServerStreams: true, ClientStreams: true, } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") if err != nil { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } @@ -4033,6 +4024,68 @@ func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { } } +// Tests the behavior for server-side streaming RPC when client sends zero request message. +func TestServerStreaming_ClientSendsZeroRequest(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + defer lis.Close() + + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "ServerStreaming", + Handler: func(_ any, stream grpc.ServerStream) error { + if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } + return nil + }, + ClientStreams: false, + ServerStreams: true, + }, + }, + } + s.RegisterService(&serviceDesc, &testServer{}) + go s.Serve(lis) + defer s.Stop() + + // s := grpc.NewServer(ss) + go s.Serve(lis) + defer s.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + } + defer cc.Close() + + desc := &grpc.StreamDesc{ + StreamName: "ServerStreaming", + ServerStreams: true, + ClientStreams: false, + } + + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.CloseSend(); err != nil { + t.Errorf("stream.CloseSend() = %v, want ", err) + } + if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } +} + // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server call SendMsg multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { From dff08b346458456e677040dd162170340244e010 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 1 Jul 2025 07:33:20 +0000 Subject: [PATCH 08/29] resolving vets --- test/end2end_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 95898fe84bcc..ddfbad80341a 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3742,7 +3742,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Tests the behavior for server-side streaming when server calls RecvMsg twice. // Second call to RecvMsg should fail with Internal error. -func TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { +func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3807,7 +3807,7 @@ func TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { // Tests the behavior for server-side streaming when client calls SendMsg twice. // Second call to SendMsg should fail with Internal error. -func TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { +func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3852,7 +3852,7 @@ func TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { // Tests the behavior for unary RPC when server calls RecvMsg twice. Second call // to RecvMsg should fail with Internal error. -func TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { +func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3917,7 +3917,7 @@ func TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. -func TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { +func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3962,7 +3962,7 @@ func TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { // Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming // and calls SendMsg twice. -func TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { +func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -4025,7 +4025,7 @@ func TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { } // Tests the behavior for server-side streaming RPC when client sends zero request message. -func TestServerStreaming_ClientSendsZeroRequest(t *testing.T) { +func (s) TestServerStreaming_ClientSendsZeroRequest(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) From f4f1b61ac960d0f5971dd316bd5c06b430a965ce Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 8 Jul 2025 07:04:55 +0000 Subject: [PATCH 09/29] addressed comments --- test/end2end_test.go | 111 ++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 59 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index ddfbad80341a..58cb61b60afb 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3749,55 +3749,26 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { } defer lis.Close() - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "ServerStreaming", - Handler: func(_ any, stream grpc.ServerStream) error { - err := stream.RecvMsg(&testpb.Empty{}) - if err != nil { - t.Errorf("stream.RecvMsg() = %v, want ", err) - } - - if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } - return nil - }, - ClientStreams: false, - ServerStreams: true, - }, + ss := stubserver.StubServer{ + StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { + // This is second call to RecvMsg(), the initial call having been performed by the server handler. + if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + } + return nil }, } - s.RegisterService(&serviceDesc, &testServer{}) - go s.Serve(lis) - defer s.Stop() + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) - } - defer cc.Close() - desc := &grpc.StreamDesc{ - StreamName: "ServerStreaming", - ServerStreams: true, - ClientStreams: false, - } - - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") + stream, err := ss.Client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{}) if err != nil { - t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) - } - - if err := stream.SendMsg(&testpb.Empty{}); err != nil { - t.Errorf("stream.SendMsg() = %v, want ", err) + t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) } if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { @@ -3814,11 +3785,23 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { } defer lis.Close() - ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { - return nil - }) - - s := grpc.NewServer(ss) + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "ServerStreaming", + Handler: func(_ any, stream grpc.ServerStream) error { + return nil + }, + ClientStreams: false, + ServerStreams: true, + }, + }, + } + s.RegisterService(&serviceDesc, &testServer{}) go s.Serve(lis) defer s.Stop() @@ -3924,11 +3907,23 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { } defer lis.Close() - ss := grpc.UnknownServiceHandler(func(any, grpc.ServerStream) error { - return nil - }) - - s := grpc.NewServer(ss) + s := grpc.NewServer() + serviceDesc := grpc.ServiceDesc{ + ServiceName: "grpc.testing.TestService", + HandlerType: (*any)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "UnaryCall", + Handler: func(_ any, stream grpc.ServerStream) error { + return nil + }, + ClientStreams: false, + ServerStreams: false, + }, + }, + } + s.RegisterService(&serviceDesc, &testServer{}) go s.Serve(lis) defer s.Stop() @@ -3961,8 +3956,8 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { } // Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming -// and calls SendMsg twice. -func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { +// and sends multiple nessages. +func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -4000,6 +3995,8 @@ func (s) TestServerStreaming_ClientBehaveAsBidiStreaming(t *testing.T) { } defer cc.Close() + // Making the client bi-di to bypass the client side checks that stop a non-streaming client + // from sending multiple messages. desc := &grpc.StreamDesc{ StreamName: "ServerStreaming", ServerStreams: true, @@ -4055,10 +4052,6 @@ func (s) TestServerStreaming_ClientSendsZeroRequest(t *testing.T) { go s.Serve(lis) defer s.Stop() - // s := grpc.NewServer(ss) - go s.Serve(lis) - defer s.Stop() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -6466,7 +6459,7 @@ func streamingInterceptorVerifyConn(_ any, ss grpc.ServerStream, _ *grpc.StreamS // TestStreamingServerInterceptorGetsConnection tests whether the accepted conn on // the server gets to any streaming interceptors on the server side. -func (s) TestStreamingServerInterceptorGetsConnection(t *testing.T) { +func TestStreamingServerInterceptorGetsConnection(t *testing.T) { ss := &stubserver.StubServer{} if err := ss.Start([]grpc.ServerOption{grpc.StreamInterceptor(streamingInterceptorVerifyConn)}); err != nil { t.Fatalf("Error starting endpoint server: %v", err) From 83e86647ea7bee8d3921fdad4a337e98ff87ede2 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 8 Jul 2025 07:09:21 +0000 Subject: [PATCH 10/29] resolving vets --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 58cb61b60afb..8a94b94792af 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -6459,7 +6459,7 @@ func streamingInterceptorVerifyConn(_ any, ss grpc.ServerStream, _ *grpc.StreamS // TestStreamingServerInterceptorGetsConnection tests whether the accepted conn on // the server gets to any streaming interceptors on the server side. -func TestStreamingServerInterceptorGetsConnection(t *testing.T) { +func (s) TestStreamingServerInterceptorGetsConnection(t *testing.T) { ss := &stubserver.StubServer{} if err := ss.Start([]grpc.ServerOption{grpc.StreamInterceptor(streamingInterceptorVerifyConn)}); err != nil { t.Fatalf("Error starting endpoint server: %v", err) From 5acd9abda24516ca818b80900e13d5eae7aae735 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 8 Jul 2025 07:17:02 +0000 Subject: [PATCH 11/29] resolving vets --- test/end2end_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 8a94b94792af..c2f3d5456f2b 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3793,7 +3793,7 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { Streams: []grpc.StreamDesc{ { StreamName: "ServerStreaming", - Handler: func(_ any, stream grpc.ServerStream) error { + Handler: func(_ any, _ grpc.ServerStream) error { return nil }, ClientStreams: false, @@ -3915,7 +3915,7 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { Streams: []grpc.StreamDesc{ { StreamName: "UnaryCall", - Handler: func(_ any, stream grpc.ServerStream) error { + Handler: func(_ any, _ grpc.ServerStream) error { return nil }, ClientStreams: false, From 8f4f39b33ad3654e9cfcddb3268c86f556ffdd83 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 15 Jul 2025 09:16:39 +0000 Subject: [PATCH 12/29] minor change --- test/end2end_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index c2f3d5456f2b..5d0ea429dd02 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3742,7 +3742,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Tests the behavior for server-side streaming when server calls RecvMsg twice. // Second call to RecvMsg should fail with Internal error. -func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { +func TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) @@ -3752,7 +3752,7 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { ss := stubserver.StubServer{ StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { // This is second call to RecvMsg(), the initial call having been performed by the server handler. - if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { + if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) } return nil @@ -3768,7 +3768,7 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { stream, err := ss.Client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{}) if err != nil { - t.Fatalf(".StreamingInputCall(_) = _, %v, want ", err) + t.Fatalf(".StreamingOutputCall(_) = _, %v, want ", err) } if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { From 6de922dd088919d7f2a8236fe729ec5ab7847c03 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 15 Jul 2025 09:44:35 +0000 Subject: [PATCH 13/29] minor change --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 5d0ea429dd02..416ea459a2cf 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3742,7 +3742,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Tests the behavior for server-side streaming when server calls RecvMsg twice. // Second call to RecvMsg should fail with Internal error. -func TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { +func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatal(err) From 5f6c715cf1cba5ba997b914e889c1e4e9899e7e9 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 22 Jul 2025 05:53:30 +0000 Subject: [PATCH 14/29] minor change --- stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream.go b/stream.go index f2698d47f216..6de8fff998cc 100644 --- a/stream.go +++ b/stream.go @@ -1776,7 +1776,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { } } if !ss.desc.ClientStreams { - return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-stream RPC") + return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC") } return err } From 990efe17ffafb2c888a1a1638e8025c1abb01617 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 24 Jul 2025 08:34:35 +0000 Subject: [PATCH 15/29] resolving comments --- interop/grpc_testing/test_grpc.pb.go | 1 + stream.go | 5 +- test/end2end_test.go | 240 ++++----------------------- 3 files changed, 37 insertions(+), 209 deletions(-) diff --git a/interop/grpc_testing/test_grpc.pb.go b/interop/grpc_testing/test_grpc.pb.go index 32d683433d2c..a480b59c30e5 100644 --- a/interop/grpc_testing/test_grpc.pb.go +++ b/interop/grpc_testing/test_grpc.pb.go @@ -25,6 +25,7 @@ package grpc_testing import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/stream.go b/stream.go index 6de8fff998cc..a8f3be8a55e7 100644 --- a/stream.go +++ b/stream.go @@ -1589,6 +1589,8 @@ type serverStream struct { sendCompressorName string + recvFirstMsg bool // recv frist msg from client + maxReceiveMessageSize int maxSendMessageSize int trInfo *traceInfo @@ -1775,7 +1777,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { binlog.Log(ss.ctx, chc) } } - if !ss.desc.ClientStreams { + if !ss.desc.ClientStreams && !ss.recvFirstMsg { return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC") } return err @@ -1785,6 +1787,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { } return toRPCErr(err) } + ss.recvFirstMsg = true if len(ss.statsHandler) != 0 { for _, sh := range ss.statsHandler { sh.HandleRPC(ss.s.Context(), &stats.InPayload{ diff --git a/test/end2end_test.go b/test/end2end_test.go index 416ea459a2cf..6d22ae606067 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3740,18 +3740,11 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests the behavior for server-side streaming when server calls RecvMsg twice. -// Second call to RecvMsg should fail with Internal error. -func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - +// Tests the behavior for server-side streaming when client calls SendMsg twice. +// Second call to SendMsg should fail with Internal error. +func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { ss := stubserver.StubServer{ StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - // This is second call to RecvMsg(), the initial call having been performed by the server handler. if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) } @@ -3765,61 +3758,19 @@ func (s) TestServerStreaming_ServerCallRecvMsgTwice(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - - stream, err := ss.Client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{}) - if err != nil { - t.Fatalf(".StreamingOutputCall(_) = _, %v, want ", err) - } - - if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } -} - -// Tests the behavior for server-side streaming when client calls SendMsg twice. -// Second call to SendMsg should fail with Internal error. -func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "ServerStreaming", - Handler: func(_ any, _ grpc.ServerStream) error { - return nil - }, - ClientStreams: false, - ServerStreams: true, - }, - }, - } - s.RegisterService(&serviceDesc, &testServer{}) - go s.Serve(lis) - defer s.Stop() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } defer cc.Close() desc := &grpc.StreamDesc{ - StreamName: "ServerStreaming", + StreamName: "StreamingOutputCall", ServerStreams: true, ClientStreams: false, } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") if err != nil { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } @@ -3833,105 +3784,24 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { } } -// Tests the behavior for unary RPC when server calls RecvMsg twice. Second call -// to RecvMsg should fail with Internal error. -func (s) TestUnaryRPC_ServerCallRecvMsgTwice(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "UnaryCall", - Handler: func(_ any, stream grpc.ServerStream) error { - err := stream.RecvMsg(&testpb.Empty{}) - if err != nil { - t.Errorf("stream.RecvMsg() = %v, want ", err) - } - - if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } - return nil - }, - ClientStreams: false, - ServerStreams: false, - }, - }, - } - s.RegisterService(&serviceDesc, &testServer{}) - go s.Serve(lis) - defer s.Stop() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) - } - defer cc.Close() - - desc := &grpc.StreamDesc{ - StreamName: "UnaryCall", - ServerStreams: false, - ClientStreams: false, - } - - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") - if err != nil { - t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) - } - - if err := stream.SendMsg(&testpb.Empty{}); err != nil { - t.Errorf("stream.SendMsg() = %v, want ", err) - } - - if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } -} - // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "UnaryCall", - Handler: func(_ any, _ grpc.ServerStream) error { - return nil - }, - ClientStreams: false, - ServerStreams: false, - }, + ss := stubserver.StubServer{ + UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil }, } - s.RegisterService(&serviceDesc, &testServer{}) - go s.Serve(lis) - defer s.Stop() + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } defer cc.Close() @@ -3958,52 +3828,29 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { // Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming // and sends multiple nessages. func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "ServerStreaming", - Handler: func(_ any, stream grpc.ServerStream) error { - if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } - return nil - }, - ClientStreams: false, - ServerStreams: true, - }, - }, + ss := stubserver.StubServer{} + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) } - s.RegisterService(&serviceDesc, &testServer{}) - go s.Serve(lis) - defer s.Stop() + defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } defer cc.Close() // Making the client bi-di to bypass the client side checks that stop a non-streaming client // from sending multiple messages. desc := &grpc.StreamDesc{ - StreamName: "ServerStreaming", + StreamName: "StreamingOutputCall", ServerStreams: true, ClientStreams: true, } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") if err != nil { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } @@ -4023,50 +3870,27 @@ func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { // Tests the behavior for server-side streaming RPC when client sends zero request message. func (s) TestServerStreaming_ClientSendsZeroRequest(t *testing.T) { - lis, err := testutils.LocalTCPListener() - if err != nil { - t.Fatal(err) - } - defer lis.Close() - - s := grpc.NewServer() - serviceDesc := grpc.ServiceDesc{ - ServiceName: "grpc.testing.TestService", - HandlerType: (*any)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "ServerStreaming", - Handler: func(_ any, stream grpc.ServerStream) error { - if err = stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } - return nil - }, - ClientStreams: false, - ServerStreams: true, - }, - }, + ss := stubserver.StubServer{} + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) } - s.RegisterService(&serviceDesc, &testServer{}) - go s.Serve(lis) - defer s.Stop() + defer ss.Stop() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", lis.Addr(), err) + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } defer cc.Close() desc := &grpc.StreamDesc{ - StreamName: "ServerStreaming", + StreamName: "StreamingOutputCall", ServerStreams: true, ClientStreams: false, } - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/ServerStreaming") + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") if err != nil { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } From 7f6d31f787853913e3eb92a45a847dbcb8efcc43 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 24 Jul 2025 09:00:45 +0000 Subject: [PATCH 16/29] resolving nits --- test/end2end_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 6d22ae606067..22e982f38c5f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3745,9 +3745,6 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { ss := stubserver.StubServer{ StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) - } return nil }, } From 41d8328a06f1a5b11df88986cfdf748ec2723a5a Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 24 Jul 2025 09:09:51 +0000 Subject: [PATCH 17/29] vet changes --- interop/grpc_testing/test_grpc.pb.go | 1 - test/end2end_test.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/interop/grpc_testing/test_grpc.pb.go b/interop/grpc_testing/test_grpc.pb.go index a480b59c30e5..32d683433d2c 100644 --- a/interop/grpc_testing/test_grpc.pb.go +++ b/interop/grpc_testing/test_grpc.pb.go @@ -25,7 +25,6 @@ package grpc_testing import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/test/end2end_test.go b/test/end2end_test.go index 22e982f38c5f..3043722e9d51 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3744,7 +3744,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Second call to SendMsg should fail with Internal error. func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { ss := stubserver.StubServer{ - StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { + StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, _ testgrpc.TestService_StreamingOutputCallServer) error { return nil }, } From 59ad1226802fe5a3603baf7f5d534ef81495915f Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 24 Jul 2025 11:43:13 +0000 Subject: [PATCH 18/29] added comment --- test/end2end_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/end2end_test.go b/test/end2end_test.go index 3043722e9d51..03075a75f2d9 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3744,6 +3744,7 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Second call to SendMsg should fail with Internal error. func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { ss := stubserver.StubServer{ + // The initial call to recvMsg made by the generated code, will return the error. StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, _ testgrpc.TestService_StreamingOutputCallServer) error { return nil }, @@ -3825,6 +3826,7 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { // Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming // and sends multiple nessages. func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { + // The initial call to recvMsg made by the generated code, will return the error. ss := stubserver.StubServer{} if err := ss.Start(nil); err != nil { t.Fatal("Error starting server:", err) @@ -3867,6 +3869,7 @@ func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { // Tests the behavior for server-side streaming RPC when client sends zero request message. func (s) TestServerStreaming_ClientSendsZeroRequest(t *testing.T) { + // The initial call to recvMsg made by the generated code, will return the error. ss := stubserver.StubServer{} if err := ss.Start(nil); err != nil { t.Fatal("Error starting server:", err) From 0f5248a9343140606e86ca94027ab0a721f36407 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Fri, 25 Jul 2025 11:18:48 +0000 Subject: [PATCH 19/29] added comment --- stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stream.go b/stream.go index a8f3be8a55e7..acb6287a589a 100644 --- a/stream.go +++ b/stream.go @@ -1777,6 +1777,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { binlog.Log(ss.ctx, chc) } } + // Received no request msg for non-client streaming rpcs. if !ss.desc.ClientStreams && !ss.recvFirstMsg { return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC") } From 1ff88683c1a79fd1facd544a4add176809beb979 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 28 Jul 2025 18:35:30 +0000 Subject: [PATCH 20/29] resolving comments --- test/end2end_test.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 03075a75f2d9..5b30cc5d91e4 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3741,11 +3741,23 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } // Tests the behavior for server-side streaming when client calls SendMsg twice. -// Second call to SendMsg should fail with Internal error. +// Second call to SendMsg should fail with Internal error and result in closing +// the connection with a RST_STREAM. func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { + // To ensure server.recvMsg() is successfully completed. + recvDoneOnServer := make(chan struct{}) + // To ensure goroutine for test does not end before RPC handler performs error + // checking. + handlerDone := make(chan struct{}) ss := stubserver.StubServer{ - // The initial call to recvMsg made by the generated code, will return the error. - StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, _ testgrpc.TestService_StreamingOutputCallServer) error { + StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { + // The initial call to recvMsg is made by the generated code. + close(recvDoneOnServer) + <-stream.Context().Done() + if err := stream.SendMsg(&testpb.StreamingOutputCallRequest{}); status.Code(err) != codes.Canceled { + t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled) + } + close(handlerDone) return nil }, } @@ -3776,10 +3788,11 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { if err := stream.SendMsg(&testpb.Empty{}); err != nil { t.Errorf("stream.SendMsg() = %v, want ", err) } - + <-recvDoneOnServer if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal) + t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Internal) } + <-handlerDone } // Tests the behavior for unary RPC when client calls SendMsg twice. Second call From 68fd0f830288f321bd1f4b0ece7f4696c6a68fe2 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 28 Jul 2025 20:16:17 +0000 Subject: [PATCH 21/29] update comment --- stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index acb6287a589a..2a7d56a2e3a0 100644 --- a/stream.go +++ b/stream.go @@ -1589,7 +1589,7 @@ type serverStream struct { sendCompressorName string - recvFirstMsg bool // recv frist msg from client + recvFirstMsg bool // set after the first message is received maxReceiveMessageSize int maxSendMessageSize int @@ -1820,7 +1820,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { } else if err != nil { return err } - return status.Error(codes.Internal, "cardinality violation: expected for non client-streaming RPCs, but received another message") + return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-server-streaming RPC") } // MethodFromServerStream returns the method string for the input stream. From 19e9e7192b2ea9c8415c13900649734f6c8bd572 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Tue, 29 Jul 2025 11:21:33 +0000 Subject: [PATCH 22/29] nits --- test/end2end_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 5b30cc5d91e4..13d653357817 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3837,7 +3837,7 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { } // Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming -// and sends multiple nessages. +// and sends multiple messages. func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { // The initial call to recvMsg made by the generated code, will return the error. ss := stubserver.StubServer{} @@ -3880,8 +3880,8 @@ func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { } } -// Tests the behavior for server-side streaming RPC when client sends zero request message. -func (s) TestServerStreaming_ClientSendsZeroRequest(t *testing.T) { +// Tests the behavior for server-side streaming RPC when client sends zero request messages. +func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { // The initial call to recvMsg made by the generated code, will return the error. ss := stubserver.StubServer{} if err := ss.Start(nil); err != nil { From 0d30b1854af1c4ae3e19a3b0ff1f48ca19aa836e Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 30 Jul 2025 19:28:10 +0000 Subject: [PATCH 23/29] modifying test --- stream.go | 2 +- test/end2end_test.go | 87 ++++++++++++++++++++++++++++++-------------- 2 files changed, 61 insertions(+), 28 deletions(-) diff --git a/stream.go b/stream.go index 2a7d56a2e3a0..626af2349904 100644 --- a/stream.go +++ b/stream.go @@ -1820,7 +1820,7 @@ func (ss *serverStream) RecvMsg(m any) (err error) { } else if err != nil { return err } - return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-server-streaming RPC") + return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC") } // MethodFromServerStream returns the method string for the input stream. diff --git a/test/end2end_test.go b/test/end2end_test.go index 13d653357817..4164098d7cae 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3881,38 +3881,71 @@ func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { } // Tests the behavior for server-side streaming RPC when client sends zero request messages. -func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { - // The initial call to recvMsg made by the generated code, will return the error. - ss := stubserver.StubServer{} - if err := ss.Start(nil); err != nil { - t.Fatal("Error starting server:", err) +// Server runs against multiple StreamDesc configurations, including server-streaming, +// bidi-streaming and client-streaming. +func TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { + testCases := []struct { + name string + desc *grpc.StreamDesc + wantCode codes.Code // expected error code from RecvMsg + }{ + { + name: "ServerStreaming", + desc: &grpc.StreamDesc{ + StreamName: "StreamingOutputCall", + ServerStreams: true, + ClientStreams: false, + }, + wantCode: codes.Internal, + }, + { + name: "BidiStreaming", + desc: &grpc.StreamDesc{ + StreamName: "StreamingOutputCall", + ServerStreams: true, + ClientStreams: true, + }, + wantCode: codes.Internal, + }, + { + name: "ClientStreaming", + desc: &grpc.StreamDesc{ + StreamName: "StreamingOutputCall", + ServerStreams: false, + ClientStreams: true, + }, + wantCode: codes.Internal, + }, } - defer ss.Stop() - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) - } - defer cc.Close() + for _, tc := range testCases { + // The initial call to recvMsg made by the generated code, will return the error. + ss := stubserver.StubServer{} + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() - desc := &grpc.StreamDesc{ - StreamName: "StreamingOutputCall", - ServerStreams: true, - ClientStreams: false, - } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) + } + defer cc.Close() - stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") - if err != nil { - t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) - } + stream, err := cc.NewStream(ctx, tc.desc, "/grpc.testing.TestService/StreamingOutputCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } - if err := stream.CloseSend(); err != nil { - t.Errorf("stream.CloseSend() = %v, want ", err) - } - if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { - t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) + if err := stream.CloseSend(); err != nil { + t.Errorf("stream.CloseSend() = %v, want ", err) + } + + if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != tc.wantCode { + t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), tc.wantCode) + } } } From 83fd598a1ba157fb0279c021b0a446f69753a5d4 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 30 Jul 2025 19:32:11 +0000 Subject: [PATCH 24/29] resolving vet --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 4164098d7cae..e4e2087146cb 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3883,7 +3883,7 @@ func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { // Tests the behavior for server-side streaming RPC when client sends zero request messages. // Server runs against multiple StreamDesc configurations, including server-streaming, // bidi-streaming and client-streaming. -func TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { +func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { testCases := []struct { name string desc *grpc.StreamDesc From efa8e5add494a0adae1d1c438067c94a0536eadc Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Wed, 30 Jul 2025 19:43:15 +0000 Subject: [PATCH 25/29] remove comment --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index e4e2087146cb..1bdb842d3faf 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3887,7 +3887,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { testCases := []struct { name string desc *grpc.StreamDesc - wantCode codes.Code // expected error code from RecvMsg + wantCode codes.Code }{ { name: "ServerStreaming", From 29f66575f3378434515d37ce3937a80c243e368b Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Thu, 31 Jul 2025 19:39:26 +0000 Subject: [PATCH 26/29] modifying tests --- test/end2end_test.go | 49 ++++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 1bdb842d3faf..d32e9ac62161 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3880,24 +3880,13 @@ func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { } } -// Tests the behavior for server-side streaming RPC when client sends zero request messages. -// Server runs against multiple StreamDesc configurations, including server-streaming, -// bidi-streaming and client-streaming. -func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { +// Tests the behavior of server for server-side streaming RPC when client sends zero request messages. +func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) { testCases := []struct { name string desc *grpc.StreamDesc wantCode codes.Code }{ - { - name: "ServerStreaming", - desc: &grpc.StreamDesc{ - StreamName: "StreamingOutputCall", - ServerStreams: true, - ClientStreams: false, - }, - wantCode: codes.Internal, - }, { name: "BidiStreaming", desc: &grpc.StreamDesc{ @@ -3949,6 +3938,40 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { } } +// Tests the behavior of client for server-side streaming RPC when client sends zero request messages. +func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { + t.Skip() + // The initial call to recvMsg made by the generated code, will return the error. + ss := stubserver.StubServer{} + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) + } + defer cc.Close() + + desc := &grpc.StreamDesc{ + StreamName: "StreamingOutputCall", + ServerStreams: true, + ClientStreams: false, + } + + stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") + if err != nil { + t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) + } + + if err := stream.CloseSend(); status.Code(err) != codes.Internal { + t.Errorf("stream.CloseSend() = %v, want error %v", status.Code(err), codes.Internal) + } +} + // Tests that a client receives a cardinality violation error for client-streaming // RPCs if the server call SendMsg multiple times. func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { From 183b1dae879acb5c53174bda0245f9118d50ccb5 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Sun, 3 Aug 2025 18:07:58 +0000 Subject: [PATCH 27/29] resolving comments --- test/end2end_test.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index d32e9ac62161..077633194f0a 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -51,6 +51,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/credentials/local" "google.golang.org/grpc/health" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/binarylog" @@ -3740,9 +3741,13 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests the behavior for server-side streaming when client calls SendMsg twice. -// Second call to SendMsg should fail with Internal error and result in closing -// the connection with a RST_STREAM. +// Tests the behavior for server-side streaming RPCs when client calls SendMsg twice. +// The first client.SendMsg() sends EOF along with the message. When client calls a +// second SendMsg, it triggers a RST_STREAM which cancels the stream context on the +// server. There would be a race, the RST_STREAM could cause the server’s first RecvMsg +// to fail, even if the request message was already delivered. By synchronizing, we +// ensure that the server has read the first message before the client triggers RST_STREAM +// and validating expected error codes. func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { // To ensure server.recvMsg() is successfully completed. recvDoneOnServer := make(chan struct{}) @@ -3768,7 +3773,7 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials())) if err != nil { t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } @@ -3795,6 +3800,8 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { <-handlerDone } +// TODO : https://github.com/grpc/grpc-go/issues/7286 - Add tests to check +// server-side behavior for Unary RPC. // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { @@ -3810,7 +3817,7 @@ func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials())) if err != nil { t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } @@ -3848,7 +3855,7 @@ func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials())) if err != nil { t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } @@ -3917,7 +3924,7 @@ func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials())) if err != nil { t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } @@ -3940,6 +3947,8 @@ func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) { // Tests the behavior of client for server-side streaming RPC when client sends zero request messages. func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { + // TODO : https://github.com/grpc/grpc-go/issues/7286 - remove `t.Skip()` + // after this is fixed. t.Skip() // The initial call to recvMsg made by the generated code, will return the error. ss := stubserver.StubServer{} @@ -3950,7 +3959,7 @@ func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials())) if err != nil { t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) } From 749a52c8d8db2a21910e4bf2d3304bf74533a49c Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 4 Aug 2025 18:08:27 +0000 Subject: [PATCH 28/29] resolving comments --- test/end2end_test.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 077633194f0a..b9b93f3887d1 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3741,13 +3741,9 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { } } -// Tests the behavior for server-side streaming RPCs when client calls SendMsg twice. -// The first client.SendMsg() sends EOF along with the message. When client calls a -// second SendMsg, it triggers a RST_STREAM which cancels the stream context on the -// server. There would be a race, the RST_STREAM could cause the server’s first RecvMsg -// to fail, even if the request message was already delivered. By synchronizing, we -// ensure that the server has read the first message before the client triggers RST_STREAM -// and validating expected error codes. +// Tests the behavior for server-side streaming when client calls SendMsg twice. +// Second call to SendMsg should fail with Internal error and result in closing +// the connection with a RST_STREAM. func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { // To ensure server.recvMsg() is successfully completed. recvDoneOnServer := make(chan struct{}) @@ -3756,8 +3752,9 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { handlerDone := make(chan struct{}) ss := stubserver.StubServer{ StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - // The initial call to recvMsg is made by the generated code. + // The initial call to recvMsg is made by the generated code. Signal test when done. close(recvDoneOnServer) + // Block until the stream’s context is done (cancelled by client). <-stream.Context().Done() if err := stream.SendMsg(&testpb.StreamingOutputCallRequest{}); status.Code(err) != codes.Canceled { t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled) @@ -3790,9 +3787,12 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } + // First SendMsg sends EOF along with the message. if err := stream.SendMsg(&testpb.Empty{}); err != nil { t.Errorf("stream.SendMsg() = %v, want ", err) } + + // To ensure that the server has read the first message before client triggers RST_STREAM. <-recvDoneOnServer if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Internal) @@ -3800,8 +3800,7 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { <-handlerDone } -// TODO : https://github.com/grpc/grpc-go/issues/7286 - Add tests to check -// server-side behavior for Unary RPC. +// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC. // Tests the behavior for unary RPC when client calls SendMsg twice. Second call // to SendMsg should fail with Internal error. func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { @@ -3947,9 +3946,7 @@ func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) { // Tests the behavior of client for server-side streaming RPC when client sends zero request messages. func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { - // TODO : https://github.com/grpc/grpc-go/issues/7286 - remove `t.Skip()` - // after this is fixed. - t.Skip() + t.Skip("blocked on i/7286") // The initial call to recvMsg made by the generated code, will return the error. ss := stubserver.StubServer{} if err := ss.Start(nil); err != nil { From 1b1800da0e299d4d200df87f90e31562b10d79d4 Mon Sep 17 00:00:00 2001 From: Pranjali-2501 Date: Mon, 4 Aug 2025 18:48:56 +0000 Subject: [PATCH 29/29] resolving comments --- test/end2end_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index b9b93f3887d1..ce1c5dbec70f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3745,16 +3745,20 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { // Second call to SendMsg should fail with Internal error and result in closing // the connection with a RST_STREAM. func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { - // To ensure server.recvMsg() is successfully completed. + // To ensure initial call to server.recvMsg() made by the generated code is successfully + // completed. Otherwise, if the client attempts to send a second request message, that + // will trigger a RST_STREAM from the client due to the application violating the RPC's + // protocol. The RST_STREAM could cause the server’s first RecvMsg to fail and will prevent + // the method handler from being called. recvDoneOnServer := make(chan struct{}) // To ensure goroutine for test does not end before RPC handler performs error // checking. handlerDone := make(chan struct{}) ss := stubserver.StubServer{ StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { - // The initial call to recvMsg is made by the generated code. Signal test when done. close(recvDoneOnServer) - // Block until the stream’s context is done (cancelled by client). + // Block until the stream’s context is done. Second call to client.SendMsg + // triggers a RST_STREAM which cancels the stream context on the server. <-stream.Context().Done() if err := stream.SendMsg(&testpb.StreamingOutputCallRequest{}); status.Code(err) != codes.Canceled { t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled) @@ -3787,12 +3791,10 @@ func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) } - // First SendMsg sends EOF along with the message. if err := stream.SendMsg(&testpb.Empty{}); err != nil { t.Errorf("stream.SendMsg() = %v, want ", err) } - // To ensure that the server has read the first message before client triggers RST_STREAM. <-recvDoneOnServer if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Internal)