Skip to content

Commit ab1ca08

Browse files
authored
stats: Re-use objects while calling multiple Handlers (#8639)
This PR improves performance by eliminating heap allocations when multiple stats handlers are configured. Previously, iterating through a list of handlers caused one heap allocation per handler for each RPC. This change introduces a Handler that combines multiple Handlers and implements the `Handler` interface. The combined handler delegates calls to the handlers it contains. This approach allows gRPC clients and servers to operate as if there were only a single `Handler` registered, simplifying the internal logic and removing the per-RPC allocation overhead. To avoid any performance impact when stats are disabled, the combined `Handler` is only created when at least one handler is registered. # Tested Since existing benchmarks don't register stats handler, I modified the benchmark to add 2 stats handlers each on the server and client (36ba616). ```sh # test command go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \ -compression=off -maxConcurrentCalls=200 -trace=off \ -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}" # results go run benchmark/benchresult/main.go unary-before unary-after Title Before After Percentage TotalOps 7336128 7638892 4.13% SendOps 0 0 NaN% RecvOps 0 0 NaN% Bytes/op 12382.19 11467.03 -7.39% Allocs/op 173.93 165.91 -4.60% ReqT/op 97815040.00 101851893.33 4.13% RespT/op 97815040.00 101851893.33 4.13% 50th-Lat 1.463345ms 1.403011ms -4.12% 90th-Lat 2.557136ms 2.46828ms -3.47% 99th-Lat 3.073264ms 3.080081ms 0.22% Avg-Lat 1.634153ms 1.569391ms -3.96% GoVersion go1.24.7 go1.24.7 GrpcVersion 1.77.0-dev 1.77.0-dev ``` RELEASE NOTES: * stats: Reduce heap allocations when multiple stats Handlers are registered.
1 parent b8a0fc9 commit ab1ca08

File tree

9 files changed

+203
-159
lines changed

9 files changed

+203
-159
lines changed

clientconn.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,12 @@ import (
4040
"google.golang.org/grpc/internal/grpcsync"
4141
"google.golang.org/grpc/internal/idle"
4242
iresolver "google.golang.org/grpc/internal/resolver"
43-
"google.golang.org/grpc/internal/stats"
43+
istats "google.golang.org/grpc/internal/stats"
4444
"google.golang.org/grpc/internal/transport"
4545
"google.golang.org/grpc/keepalive"
4646
"google.golang.org/grpc/resolver"
4747
"google.golang.org/grpc/serviceconfig"
48+
"google.golang.org/grpc/stats"
4849
"google.golang.org/grpc/status"
4950

5051
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
@@ -210,7 +211,8 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
210211
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
211212
cc.pickerWrapper = newPickerWrapper()
212213

213-
cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
214+
cc.metricsRecorderList = istats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
215+
cc.statsHandler = istats.NewCombinedHandler(cc.dopts.copts.StatsHandlers...)
214216

215217
cc.initIdleStateLocked() // Safe to call without the lock, since nothing else has a reference to cc.
216218
cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
@@ -621,7 +623,8 @@ type ClientConn struct {
621623
channelz *channelz.Channel // Channelz object.
622624
resolverBuilder resolver.Builder // See initParsedTargetAndResolverBuilder().
623625
idlenessMgr *idle.Manager
624-
metricsRecorderList *stats.MetricsRecorderList
626+
metricsRecorderList *istats.MetricsRecorderList
627+
statsHandler stats.Handler
625628

626629
// The following provide their own synchronization, and therefore don't
627630
// require cc.mu to be held to access them.

internal/stats/stats.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
*
3+
* Copyright 2025 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package stats
20+
21+
import (
22+
"context"
23+
24+
"google.golang.org/grpc/stats"
25+
)
26+
27+
type combinedHandler struct {
28+
handlers []stats.Handler
29+
}
30+
31+
// NewCombinedHandler combines multiple stats.Handlers into a single handler.
32+
//
33+
// It returns nil if no handlers are provided. If only one handler is
34+
// provided, it is returned directly without wrapping.
35+
func NewCombinedHandler(handlers ...stats.Handler) stats.Handler {
36+
switch len(handlers) {
37+
case 0:
38+
return nil
39+
case 1:
40+
return handlers[0]
41+
default:
42+
return &combinedHandler{handlers: handlers}
43+
}
44+
}
45+
46+
func (ch *combinedHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
47+
for _, h := range ch.handlers {
48+
ctx = h.TagRPC(ctx, info)
49+
}
50+
return ctx
51+
}
52+
53+
func (ch *combinedHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) {
54+
for _, h := range ch.handlers {
55+
h.HandleRPC(ctx, stats)
56+
}
57+
}
58+
59+
func (ch *combinedHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
60+
for _, h := range ch.handlers {
61+
ctx = h.TagConn(ctx, info)
62+
}
63+
return ctx
64+
}
65+
66+
func (ch *combinedHandler) HandleConn(ctx context.Context, stats stats.ConnStats) {
67+
for _, h := range ch.handlers {
68+
h.HandleConn(ctx, stats)
69+
}
70+
}

internal/transport/handler_server.go

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ import (
5050
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
5151
// inside an http.Handler, or writes an HTTP error to w and returns an error.
5252
// It requires that the http Server supports HTTP/2.
53-
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
53+
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
5454
if r.Method != http.MethodPost {
5555
w.Header().Set("Allow", http.MethodPost)
5656
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
@@ -170,7 +170,7 @@ type serverHandlerTransport struct {
170170
// TODO make sure this is consistent across handler_server and http2_server
171171
contentSubtype string
172172

173-
stats []stats.Handler
173+
stats stats.Handler
174174
logger *grpclog.PrefixLogger
175175

176176
bufferPool mem.BufferPool
@@ -274,15 +274,13 @@ func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status
274274
}
275275
})
276276

277-
if err == nil { // transport has not been closed
277+
if err == nil && ht.stats != nil { // transport has not been closed
278278
// Note: The trailer fields are compressed with hpack after this call returns.
279279
// No WireLength field is set here.
280280
s.hdrMu.Lock()
281-
for _, sh := range ht.stats {
282-
sh.HandleRPC(s.Context(), &stats.OutTrailer{
283-
Trailer: s.trailer.Copy(),
284-
})
285-
}
281+
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
282+
Trailer: s.trailer.Copy(),
283+
})
286284
s.hdrMu.Unlock()
287285
}
288286
ht.Close(errors.New("finished writing status"))
@@ -374,15 +372,13 @@ func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) e
374372
ht.rw.(http.Flusher).Flush()
375373
})
376374

377-
if err == nil {
378-
for _, sh := range ht.stats {
379-
// Note: The header fields are compressed with hpack after this call returns.
380-
// No WireLength field is set here.
381-
sh.HandleRPC(s.Context(), &stats.OutHeader{
382-
Header: md.Copy(),
383-
Compression: s.sendCompress,
384-
})
385-
}
375+
if err == nil && ht.stats != nil {
376+
// Note: The header fields are compressed with hpack after this call returns.
377+
// No WireLength field is set here.
378+
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
379+
Header: md.Copy(),
380+
Compression: s.sendCompress,
381+
})
386382
}
387383
return err
388384
}

internal/transport/handler_server_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (h *mockStatsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) co
266266
func (h *mockStatsHandler) HandleConn(context.Context, stats.ConnStats) {
267267
}
268268

269-
func newHandleStreamTest(t *testing.T, statsHandlers []stats.Handler) *handleStreamTest {
269+
func newHandleStreamTest(t *testing.T, statsHandler stats.Handler) *handleStreamTest {
270270
bodyr, bodyw := io.Pipe()
271271
req := &http.Request{
272272
ProtoMajor: 2,
@@ -280,7 +280,7 @@ func newHandleStreamTest(t *testing.T, statsHandlers []stats.Handler) *handleStr
280280
Body: bodyr,
281281
}
282282
rw := newTestHandlerResponseWriter().(testHandlerResponseWriter)
283-
ht, err := NewServerHandlerTransport(rw, req, statsHandlers, mem.DefaultBufferPool())
283+
ht, err := NewServerHandlerTransport(rw, req, statsHandler, mem.DefaultBufferPool())
284284
if err != nil {
285285
t.Fatal(err)
286286
}
@@ -555,7 +555,7 @@ func (s) TestHandlerTransport_HandleStreams_StatsHandlers(t *testing.T) {
555555
statsHandler := &mockStatsHandler{
556556
rpcStatsCh: make(chan stats.RPCStats, 2),
557557
}
558-
hst := newHandleStreamTest(t, []stats.Handler{statsHandler})
558+
hst := newHandleStreamTest(t, statsHandler)
559559
handleStream := func(s *ServerStream) {
560560
if err := s.SendHeader(metadata.New(map[string]string{})); err != nil {
561561
t.Error(err)

internal/transport/http2_client.go

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"google.golang.org/grpc/internal/grpcutil"
4545
imetadata "google.golang.org/grpc/internal/metadata"
4646
"google.golang.org/grpc/internal/proxyattributes"
47+
istats "google.golang.org/grpc/internal/stats"
4748
istatus "google.golang.org/grpc/internal/status"
4849
isyscall "google.golang.org/grpc/internal/syscall"
4950
"google.golang.org/grpc/internal/transport/networktype"
@@ -105,7 +106,7 @@ type http2Client struct {
105106
kp keepalive.ClientParameters
106107
keepaliveEnabled bool
107108

108-
statsHandlers []stats.Handler
109+
statsHandler stats.Handler
109110

110111
initialWindowSize int32
111112

@@ -342,7 +343,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
342343
isSecure: isSecure,
343344
perRPCCreds: perRPCCreds,
344345
kp: kp,
345-
statsHandlers: opts.StatsHandlers,
346+
statsHandler: istats.NewCombinedHandler(opts.StatsHandlers...),
346347
initialWindowSize: initialWindowSize,
347348
nextID: 1,
348349
maxConcurrentStreams: defaultMaxStreamsClient,
@@ -386,15 +387,14 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
386387
updateFlowControl: t.updateFlowControl,
387388
}
388389
}
389-
for _, sh := range t.statsHandlers {
390-
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
390+
if t.statsHandler != nil {
391+
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
391392
RemoteAddr: t.remoteAddr,
392393
LocalAddr: t.localAddr,
393394
})
394-
connBegin := &stats.ConnBegin{
395+
t.statsHandler.HandleConn(t.ctx, &stats.ConnBegin{
395396
Client: true,
396-
}
397-
sh.HandleConn(t.ctx, connBegin)
397+
})
398398
}
399399
if t.keepaliveEnabled {
400400
t.kpDormancyCond = sync.NewCond(&t.mu)
@@ -901,27 +901,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
901901
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
902902
}
903903
}
904-
if len(t.statsHandlers) != 0 {
904+
if t.statsHandler != nil {
905905
header, ok := metadata.FromOutgoingContext(ctx)
906906
if ok {
907907
header.Set("user-agent", t.userAgent)
908908
} else {
909909
header = metadata.Pairs("user-agent", t.userAgent)
910910
}
911-
for _, sh := range t.statsHandlers {
912-
// Note: The header fields are compressed with hpack after this call returns.
913-
// No WireLength field is set here.
914-
// Note: Creating a new stats object to prevent pollution.
915-
outHeader := &stats.OutHeader{
916-
Client: true,
917-
FullMethod: callHdr.Method,
918-
RemoteAddr: t.remoteAddr,
919-
LocalAddr: t.localAddr,
920-
Compression: callHdr.SendCompress,
921-
Header: header,
922-
}
923-
sh.HandleRPC(s.ctx, outHeader)
924-
}
911+
// Note: The header fields are compressed with hpack after this call returns.
912+
// No WireLength field is set here.
913+
t.statsHandler.HandleRPC(s.ctx, &stats.OutHeader{
914+
Client: true,
915+
FullMethod: callHdr.Method,
916+
RemoteAddr: t.remoteAddr,
917+
LocalAddr: t.localAddr,
918+
Compression: callHdr.SendCompress,
919+
Header: header,
920+
})
925921
}
926922
if transportDrainRequired {
927923
if t.logger.V(logLevel) {
@@ -1062,11 +1058,10 @@ func (t *http2Client) Close(err error) {
10621058
for _, s := range streams {
10631059
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
10641060
}
1065-
for _, sh := range t.statsHandlers {
1066-
connEnd := &stats.ConnEnd{
1061+
if t.statsHandler != nil {
1062+
t.statsHandler.HandleConn(t.ctx, &stats.ConnEnd{
10671063
Client: true,
1068-
}
1069-
sh.HandleConn(t.ctx, connEnd)
1064+
})
10701065
}
10711066
}
10721067

@@ -1598,22 +1593,20 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
15981593
}
15991594
}
16001595

1601-
for _, sh := range t.statsHandlers {
1596+
if t.statsHandler != nil {
16021597
if !endStream {
1603-
inHeader := &stats.InHeader{
1598+
t.statsHandler.HandleRPC(s.ctx, &stats.InHeader{
16041599
Client: true,
16051600
WireLength: int(frame.Header().Length),
16061601
Header: metadata.MD(mdata).Copy(),
16071602
Compression: s.recvCompress,
1608-
}
1609-
sh.HandleRPC(s.ctx, inHeader)
1603+
})
16101604
} else {
1611-
inTrailer := &stats.InTrailer{
1605+
t.statsHandler.HandleRPC(s.ctx, &stats.InTrailer{
16121606
Client: true,
16131607
WireLength: int(frame.Header().Length),
16141608
Trailer: metadata.MD(mdata).Copy(),
1615-
}
1616-
sh.HandleRPC(s.ctx, inTrailer)
1609+
})
16171610
}
16181611
}
16191612

internal/transport/http2_server.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ type http2Server struct {
8787
// updates, reset streams, and various settings) to the controller.
8888
controlBuf *controlBuffer
8989
fc *trInFlow
90-
stats []stats.Handler
90+
stats stats.Handler
9191
// Keepalive and max-age parameters for the server.
9292
kp keepalive.ServerParameters
9393
// Keepalive enforcement policy.
@@ -261,7 +261,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
261261
fc: &trInFlow{limit: uint32(icwz)},
262262
state: reachable,
263263
activeStreams: make(map[uint32]*ServerStream),
264-
stats: config.StatsHandlers,
264+
stats: config.StatsHandler,
265265
kp: kp,
266266
idle: time.Now(),
267267
kep: kep,
@@ -1055,14 +1055,13 @@ func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
10551055
t.closeStream(s, true, http2.ErrCodeInternal, false)
10561056
return ErrHeaderListSizeLimitViolation
10571057
}
1058-
for _, sh := range t.stats {
1058+
if t.stats != nil {
10591059
// Note: Headers are compressed with hpack after this call returns.
10601060
// No WireLength field is set here.
1061-
outHeader := &stats.OutHeader{
1061+
t.stats.HandleRPC(s.Context(), &stats.OutHeader{
10621062
Header: s.header.Copy(),
10631063
Compression: s.sendCompress,
1064-
}
1065-
sh.HandleRPC(s.Context(), outHeader)
1064+
})
10661065
}
10671066
return nil
10681067
}
@@ -1130,10 +1129,10 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
11301129
// Send a RST_STREAM after the trailers if the client has not already half-closed.
11311130
rst := s.getState() == streamActive
11321131
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
1133-
for _, sh := range t.stats {
1132+
if t.stats != nil {
11341133
// Note: The trailer fields are compressed with hpack after this call returns.
11351134
// No WireLength field is set here.
1136-
sh.HandleRPC(s.Context(), &stats.OutTrailer{
1135+
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
11371136
Trailer: s.trailer.Copy(),
11381137
})
11391138
}

internal/transport/transport.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ type ServerConfig struct {
478478
ConnectionTimeout time.Duration
479479
Credentials credentials.TransportCredentials
480480
InTapHandle tap.ServerInHandle
481-
StatsHandlers []stats.Handler
481+
StatsHandler stats.Handler
482482
KeepaliveParams keepalive.ServerParameters
483483
KeepalivePolicy keepalive.EnforcementPolicy
484484
InitialWindowSize int32

0 commit comments

Comments
 (0)