Skip to content

Commit cfc39c2

Browse files
committed
sflow: fix metrics not having start set and protect aggregates
1 parent c73a0ea commit cfc39c2

File tree

3 files changed

+26
-22
lines changed

3 files changed

+26
-22
lines changed

gremlin/traversal/metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333

3434
const (
3535
defaultAggregatesSliceLength = int64(30000) // 30 seconds
36+
aggregatesMaxSlices = 10000
3637
)
3738

3839
// MetricsTraversalExtension describes a new extension to enhance the topology
@@ -268,6 +269,10 @@ func (m *MetricsTraversalStep) Aggregates(ctx traversal.StepContext, s ...interf
268269
steps++
269270
}
270271

272+
if steps > aggregatesMaxSlices {
273+
return NewMetricsTraversalStepFromError(fmt.Errorf("Aggregates available slices exceeded: %d/%d", steps, aggregatesMaxSlices))
274+
}
275+
271276
aggregated := make([]common.Metric, steps, steps)
272277
for _, metrics := range m.metrics {
273278
aggregateMetrics(metrics, start, last, sliceLength, aggregated)

sflow/agent.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func (sfa *Agent) feedFlowTable() {
109109
}
110110

111111
// SFlow Counter Samples
112-
var Countersamples []layers.SFlowCounterSample
112+
var counterSamples []layers.SFlowCounterSample
113113
var gen layers.SFlowGenericInterfaceCounters
114114
var ovsdp layers.SFlowOVSDPCounters
115115
var app layers.SFlowAppresourcesCounters
@@ -244,12 +244,9 @@ func (sfa *Agent) feedFlowTable() {
244244

245245
}
246246
}
247-
Countersamples = append(Countersamples, counter)
247+
counterSamples = append(counterSamples, counter)
248248
}
249249

250-
sfa.Graph.Lock()
251-
tr := sfa.Graph.StartMetadataTransaction(sfa.Node)
252-
253250
currMetric := &SFMetric{
254251
IfInOctets: int64(gen.IfInOctets),
255252
IfInUcastPkts: int64(gen.IfInUcastPkts),
@@ -282,37 +279,36 @@ func (sfa *Agent) feedFlowTable() {
282279
VlanBroadcastPkts: int64(vlan.BroadcastPkts),
283280
VlanDiscards: int64(vlan.Discards),
284281
}
285-
now := int64(common.UnixMillis(time.Now()))
286282

283+
if currMetric.IsZero() {
284+
continue
285+
}
286+
287+
now := int64(common.UnixMillis(time.Now()))
287288
currMetric.Last = now
288289

289-
var prevMetric, lastUpdateMetric, totalMetric *SFMetric
290+
sfa.Graph.Lock()
290291

292+
var totalMetric *SFMetric
291293
if metric, err := sfa.Node.GetField("SFlow.Metric"); err == nil {
292-
prevMetric = metric.(*SFMetric)
293-
lastUpdateMetric = currMetric
294-
totalMetric = currMetric.Add(prevMetric).(*SFMetric)
294+
prevMetric := metric.(*SFMetric)
295+
currMetric.Start = prevMetric.Last
296+
297+
totalMetric = prevMetric.Add(currMetric).(*SFMetric)
298+
totalMetric.Start = prevMetric.Start
295299
totalMetric.Last = now
296300
} else {
301+
currMetric.Start = now
297302
totalMetric = currMetric
298303
}
299304

300-
// nothing changed since last update
301-
if lastUpdateMetric != nil && !lastUpdateMetric.IsZero() {
302-
lastUpdateMetric.Start = prevMetric.Last
303-
lastUpdateMetric.Last = now
304-
} else {
305-
lastUpdateMetric = currMetric
306-
}
307-
308305
sfl := &SFlow{
309-
Counters: Countersamples,
306+
Counters: counterSamples,
310307
Metric: totalMetric,
311-
LastUpdateMetric: lastUpdateMetric,
308+
LastUpdateMetric: currMetric,
312309
}
313310

314-
tr.AddMetadata("SFlow", sfl)
315-
tr.Commit()
311+
sfa.Graph.AddMetadata(sfa.Node, "SFlow", sfl)
316312
sfa.Graph.Unlock()
317313
}
318314
}

tests/tests.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,9 @@ func RunTest(t *testing.T, test *Test) {
513513
t.Fatalf("Failed to setup captures: %s", err)
514514
}
515515

516+
// wait a bit after the capture creation
517+
time.Sleep(2 * time.Second)
518+
516519
retries := test.retries
517520
if retries <= 0 {
518521
retries = 30

0 commit comments

Comments
 (0)