Skip to content

Commit af9680a

Browse files
Merge pull request #310 from ava-labs/timeout-dont-include-get
dont include Get messages in request latency calculation
2 parents: 86885b8 + ed1b891; commit: af9680a

File tree

7 files changed

+39
-104
lines changed

7 files changed

+39
-104
lines changed

snow/networking/router/chain_router.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -161,7 +161,7 @@ func (cr *ChainRouter) RegisterRequest(
161161
cr.log.Error("expected message type to be one of GetMsg, PullQueryMsg, PushQueryMsg, GetAcceptedFrontierMsg, GetAcceptedMsg but got %s", msgType)
162162
return
163163
}
164-
cr.timeoutManager.RegisterRequest(validatorID, chainID, uniqueRequestID, timeoutHandler)
164+
cr.timeoutManager.RegisterRequest(validatorID, chainID, msgType, uniqueRequestID, timeoutHandler)
165165
}
166166

167167
// Shutdown shuts down this router

snow/networking/timeout/manager.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@ func (m *Manager) RegisterChain(ctx *snow.Context, namespace string) error {
6262
func (m *Manager) RegisterRequest(
6363
validatorID ids.ShortID,
6464
chainID ids.ID,
65+
msgType constants.MsgType,
6566
uniqueRequestID ids.ID,
6667
timeoutHandler func(),
6768
) (time.Time, bool) {
@@ -70,7 +71,7 @@ func (m *Manager) RegisterRequest(
7071
m.benchlistMgr.RegisterFailure(chainID, validatorID)
7172
timeoutHandler()
7273
}
73-
return m.tm.Put(uniqueRequestID, newTimeoutHandler), true
74+
return m.tm.Put(uniqueRequestID, msgType, newTimeoutHandler), true
7475
}
7576

7677
// RegisterResponse registers that we received a response from [validatorID]

snow/networking/timeout/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ func TestManagerFire(t *testing.T) {
3636
wg := sync.WaitGroup{}
3737
wg.Add(1)
3838

39-
manager.RegisterRequest(ids.ShortID{}, ids.ID{}, ids.GenerateTestID(), wg.Done)
39+
manager.RegisterRequest(ids.ShortID{}, ids.ID{}, constants.PullQueryMsg, ids.GenerateTestID(), wg.Done)
4040

4141
wg.Wait()
4242
}
@@ -64,11 +64,11 @@ func TestManagerCancel(t *testing.T) {
6464
fired := new(bool)
6565

6666
id := ids.GenerateTestID()
67-
manager.RegisterRequest(ids.ShortID{}, ids.ID{}, id, func() { *fired = true })
67+
manager.RegisterRequest(ids.ShortID{}, ids.ID{}, constants.PullQueryMsg, id, func() { *fired = true })
6868

6969
manager.RegisterResponse(ids.ShortID{}, ids.ID{}, id, constants.GetMsg, 1*time.Second)
7070

71-
manager.RegisterRequest(ids.ShortID{}, ids.ID{}, ids.GenerateTestID(), wg.Done)
71+
manager.RegisterRequest(ids.ShortID{}, ids.ID{}, constants.PullQueryMsg, ids.GenerateTestID(), wg.Done)
7272

7373
wg.Wait()
7474

utils/timer/adaptive_timeout_manager.go

Lines changed: 27 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/prometheus/client_golang/prometheus"
1414

1515
"github.com/ava-labs/avalanchego/ids"
16+
"github.com/ava-labs/avalanchego/utils/constants"
1617
"github.com/ava-labs/avalanchego/utils/math"
1718
"github.com/ava-labs/avalanchego/utils/wrappers"
1819
)
@@ -22,11 +23,12 @@ var (
2223
)
2324

2425
type adaptiveTimeout struct {
25-
index int // Index in the wait queue
26-
id ids.ID // Unique ID of this timeout
27-
handler func() // Function to execute if timed out
28-
duration time.Duration // How long this timeout was set for
29-
deadline time.Time // When this timeout should be fired
26+
index int // Index in the wait queue
27+
id ids.ID // Unique ID of this timeout
28+
handler func() // Function to execute if timed out
29+
duration time.Duration // How long this timeout was set for
30+
deadline time.Time // When this timeout should be fired
31+
msgType constants.MsgType // Type of this outstanding request
3032
}
3133

3234
// A timeoutQueue implements heap.Interface and holds adaptiveTimeouts.
@@ -78,6 +80,7 @@ type AdaptiveTimeoutManager struct {
7880
// Tells the time. Can be faked for testing.
7981
clock Clock
8082
networkTimeoutMetric, avgLatency prometheus.Gauge
83+
numTimeouts prometheus.Counter
8184
// Averages the response time from all peers
8285
averager math.Averager
8386
// Timeout is [timeoutCoefficient] * average response time
@@ -103,6 +106,11 @@ func (tm *AdaptiveTimeoutManager) Initialize(config *AdaptiveTimeoutConfig) erro
103106
Name: "avg_network_latency",
104107
Help: "Average network latency in nanoseconds",
105108
})
109+
tm.numTimeouts = prometheus.NewCounter(prometheus.CounterOpts{
110+
Namespace: config.MetricsNamespace,
111+
Name: "request_timeouts",
112+
Help: "Number of timed out requests",
113+
})
106114

107115
switch {
108116
case config.InitialTimeout > config.MaximumTimeout:
@@ -126,6 +134,7 @@ func (tm *AdaptiveTimeoutManager) Initialize(config *AdaptiveTimeoutConfig) erro
126134
errs := &wrappers.Errs{}
127135
errs.Add(config.Registerer.Register(tm.networkTimeoutMetric))
128136
errs.Add(config.Registerer.Register(tm.avgLatency))
137+
errs.Add(config.Registerer.Register(tm.numTimeouts))
129138
return errs.Err
130139
}
131140

@@ -145,14 +154,14 @@ func (tm *AdaptiveTimeoutManager) Stop() { tm.timer.Stop() }
145154
// Put registers a timeout for [id]. If the timeout occurs, [timeoutHandler] is called.
146155
// Returns the time at which the timeout will fire if it is not first
147156
// removed by calling [tm.Remove].
148-
func (tm *AdaptiveTimeoutManager) Put(id ids.ID, timeoutHandler func()) time.Time {
157+
func (tm *AdaptiveTimeoutManager) Put(id ids.ID, msgType constants.MsgType, timeoutHandler func()) time.Time {
149158
tm.lock.Lock()
150159
defer tm.lock.Unlock()
151-
return tm.put(id, timeoutHandler)
160+
return tm.put(id, msgType, timeoutHandler)
152161
}
153162

154163
// Assumes [tm.lock] is held
155-
func (tm *AdaptiveTimeoutManager) put(id ids.ID, handler func()) time.Time {
164+
func (tm *AdaptiveTimeoutManager) put(id ids.ID, msgType constants.MsgType, handler func()) time.Time {
156165
currentTime := tm.clock.Time()
157166
tm.remove(id, currentTime)
158167

@@ -161,6 +170,7 @@ func (tm *AdaptiveTimeoutManager) put(id ids.ID, handler func()) time.Time {
161170
handler: handler,
162171
duration: tm.currentTimeout,
163172
deadline: currentTime.Add(tm.currentTimeout),
173+
msgType: msgType,
164174
}
165175
tm.timeoutMap[id] = timeout
166176
heap.Push(&tm.timeoutQueue, timeout)
@@ -185,9 +195,14 @@ func (tm *AdaptiveTimeoutManager) remove(id ids.ID, now time.Time) {
185195
}
186196

187197
// Observe the response time to update average network response time
188-
timeoutRegisteredAt := timeout.deadline.Add(-1 * timeout.duration)
189-
latency := now.Sub(timeoutRegisteredAt)
190-
tm.observeLatencyAndUpdateTimeout(latency, now)
198+
// Don't include Get requests in calculation, since an adversary
199+
// can cause you to issue a Get request and then cause it to timeout,
200+
// increasing your timeout.
201+
if timeout.msgType != constants.GetMsg {
202+
timeoutRegisteredAt := timeout.deadline.Add(-1 * timeout.duration)
203+
latency := now.Sub(timeoutRegisteredAt)
204+
tm.observeLatencyAndUpdateTimeout(latency, now)
205+
}
191206

192207
// Remove the timeout from the map
193208
delete(tm.timeoutMap, id)
@@ -213,6 +228,7 @@ func (tm *AdaptiveTimeoutManager) timeout() {
213228
if timeoutHandler == nil {
214229
break
215230
}
231+
tm.numTimeouts.Inc()
216232

217233
// Don't execute a callback with a lock held
218234
tm.lock.Unlock()

utils/timer/adaptive_timeout_manager_test.go

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -135,6 +135,7 @@ func TestAdaptiveTimeoutManager(t *testing.T) {
135135
timeoutZeroCalled := utils.AtomicBool{}
136136
tm.Put(
137137
id0,
138+
constants.PullQueryMsg,
138139
func() { timeoutZeroCalled.SetValue(true) },
139140
)
140141

@@ -171,6 +172,7 @@ func TestAdaptiveTimeoutManager(t *testing.T) {
171172
// This should overwrite the first Put for id0
172173
tm.Put(
173174
id0,
175+
constants.PullQueryMsg,
174176
func() { wg.Done() },
175177
)
176178

@@ -208,10 +210,12 @@ func TestAdaptiveTimeoutManager(t *testing.T) {
208210
wg.Add(2)
209211
tm.Put(
210212
id1,
213+
constants.PullQueryMsg,
211214
func() { wg.Done() },
212215
)
213216
tm.Put(
214217
id2,
218+
constants.PullQueryMsg,
215219
func() { wg.Done() },
216220
)
217221

@@ -270,14 +274,14 @@ func TestAdaptiveTimeoutManager2(t *testing.T) {
270274

271275
numSuccessful--
272276
if numSuccessful > 0 {
273-
tm.Put(ids.ID{byte(numSuccessful)}, *callback)
277+
tm.Put(ids.ID{byte(numSuccessful)}, constants.PullQueryMsg, *callback)
274278
}
275279
if numSuccessful >= 0 {
276280
wg.Done()
277281
}
278282
if numSuccessful%2 == 0 {
279283
tm.Remove(ids.ID{byte(numSuccessful)})
280-
tm.Put(ids.ID{byte(numSuccessful)}, *callback)
284+
tm.Put(ids.ID{byte(numSuccessful)}, constants.PullQueryMsg, *callback)
281285
}
282286
}
283287
(*callback)()

utils/timer/executor.go

Lines changed: 0 additions & 64 deletions
This file was deleted.

utils/timer/executor_test.go

Lines changed: 0 additions & 22 deletions
This file was deleted.

0 commit comments

Comments
 (0)