Skip to content

Commit fd9e81f

Browse files
committed
Catching SFlow Counter Samples into Metadata
1 parent bdcfba4 commit fd9e81f

File tree

3 files changed

+113
-38
lines changed

3 files changed

+113
-38
lines changed

sflow/agent.go

Lines changed: 102 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -109,14 +109,23 @@ func (sfa *Agent) feedFlowTable() {
109109
sfa.FlowTable.FeedWithSFlowSample(&sample, bpf)
110110
}
111111

112+
// SFlow Counter Samples
113+
var Countersamples []layers.SFlowCounterSample
112114
var gen layers.SFlowGenericInterfaceCounters
113115
var ovsdp layers.SFlowOVSDPCounters
114116
var app layers.SFlowAppresourcesCounters
115117
var vlan layers.SFlowVLANCounters
116118

117119
for _, sample := range sflowPacket.CounterSamples {
118120
records := sample.GetRecords()
119-
121+
var counter layers.SFlowCounterSample
122+
counter.EnterpriseID = sample.EnterpriseID
123+
counter.Format = sample.Format
124+
counter.SampleLength = sample.SampleLength
125+
counter.SourceIDClass = sample.SourceIDClass
126+
counter.SourceIDIndex = sample.SourceIDIndex
127+
counter.RecordCount = sample.RecordCount
128+
counter.SequenceNumber = sample.SequenceNumber
120129
maxuint64 := func(key uint64) uint64 {
121130
if key == math.MaxUint64 {
122131
key = 0
@@ -134,48 +143,109 @@ func (sfa *Agent) feedFlowTable() {
134143
switch record.(type) {
135144
case layers.SFlowGenericInterfaceCounters:
136145
gen1 := record.(layers.SFlowGenericInterfaceCounters)
137-
gen.IfInOctets += maxuint64(gen1.IfInOctets)
138-
gen.IfInUcastPkts += maxuint32(gen1.IfInUcastPkts)
139-
gen.IfInMulticastPkts += maxuint32(gen1.IfInMulticastPkts)
140-
gen.IfInBroadcastPkts += maxuint32(gen1.IfInBroadcastPkts)
141-
gen.IfInDiscards += maxuint32(gen1.IfInDiscards)
142-
gen.IfInErrors += maxuint32(gen1.IfInErrors)
143-
gen.IfInUnknownProtos += maxuint32(gen1.IfInUnknownProtos)
144-
gen.IfOutOctets += maxuint64(gen1.IfOutOctets)
145-
gen.IfOutUcastPkts += maxuint32(gen1.IfOutUcastPkts)
146-
gen.IfOutMulticastPkts += maxuint32(gen1.IfOutMulticastPkts)
147-
gen.IfOutBroadcastPkts += maxuint32(gen1.IfOutBroadcastPkts)
148-
gen.IfOutDiscards += maxuint32(gen1.IfOutDiscards)
149-
gen.IfOutErrors += maxuint32(gen1.IfOutErrors)
146+
gen1.IfInOctets = maxuint64(gen1.IfInOctets)
147+
gen.IfInOctets += gen1.IfInOctets
148+
gen1.IfInUcastPkts = maxuint32(gen1.IfInUcastPkts)
149+
gen.IfInUcastPkts += gen1.IfInUcastPkts
150+
gen1.IfInMulticastPkts = maxuint32(gen1.IfInMulticastPkts)
151+
gen.IfInMulticastPkts += gen1.IfInMulticastPkts
152+
gen1.IfInBroadcastPkts = maxuint32(gen1.IfInBroadcastPkts)
153+
gen.IfInBroadcastPkts += gen1.IfInBroadcastPkts
154+
gen1.IfInDiscards = maxuint32(gen1.IfInDiscards)
155+
gen.IfInDiscards += gen1.IfInDiscards
156+
gen1.IfInErrors = maxuint32(gen1.IfInErrors)
157+
gen.IfInErrors += gen1.IfInErrors
158+
gen1.IfInUnknownProtos = maxuint32(gen1.IfInUnknownProtos)
159+
gen.IfInUnknownProtos += gen1.IfInUnknownProtos
160+
gen1.IfOutOctets = maxuint64(gen1.IfOutOctets)
161+
gen.IfOutOctets += gen1.IfOutOctets
162+
gen1.IfOutUcastPkts = maxuint32(gen1.IfOutUcastPkts)
163+
gen.IfOutUcastPkts += gen1.IfOutUcastPkts
164+
gen1.IfOutMulticastPkts = maxuint32(gen1.IfOutMulticastPkts)
165+
gen.IfOutMulticastPkts += gen1.IfOutMulticastPkts
166+
gen1.IfOutBroadcastPkts = maxuint32(gen1.IfOutBroadcastPkts)
167+
gen.IfOutBroadcastPkts += gen1.IfOutBroadcastPkts
168+
gen1.IfOutDiscards = maxuint32(gen1.IfOutDiscards)
169+
gen.IfOutDiscards += gen1.IfOutDiscards
170+
gen1.IfOutErrors = maxuint32(gen1.IfOutErrors)
171+
gen.IfOutErrors += gen1.IfOutErrors
172+
counter.Records = append(counter.Records, gen1)
150173

151174
case layers.SFlowOVSDPCounters:
152175
ovsdp1 := record.(layers.SFlowOVSDPCounters)
153-
ovsdp.NHit += maxuint32(ovsdp1.NHit)
154-
ovsdp.NMissed += maxuint32(ovsdp1.NMissed)
155-
ovsdp.NLost += maxuint32(ovsdp1.NLost)
156-
ovsdp.NMaskHit += maxuint32(ovsdp1.NMaskHit)
157-
ovsdp.NFlows += maxuint32(ovsdp1.NFlows)
158-
ovsdp.NMasks += maxuint32(ovsdp1.NMasks)
176+
ovsdp1.NHit = maxuint32(ovsdp1.NHit)
177+
ovsdp.NHit += ovsdp1.NHit
178+
ovsdp1.NMissed = maxuint32(ovsdp1.NMissed)
179+
ovsdp.NMissed += ovsdp1.NMissed
180+
ovsdp1.NLost = maxuint32(ovsdp1.NLost)
181+
ovsdp.NLost += ovsdp1.NLost
182+
ovsdp1.NMaskHit = maxuint32(ovsdp1.NMaskHit)
183+
ovsdp.NMaskHit += ovsdp1.NMaskHit
184+
ovsdp1.NFlows = maxuint32(ovsdp1.NFlows)
185+
ovsdp.NFlows += ovsdp1.NFlows
186+
ovsdp1.NMasks = maxuint32(ovsdp1.NMasks)
187+
ovsdp.NMasks += ovsdp1.NMasks
188+
counter.Records = append(counter.Records, ovsdp1)
159189

160190
case layers.SFlowAppresourcesCounters:
161191
app1 := record.(layers.SFlowAppresourcesCounters)
162-
app.FdOpen += maxuint32(app1.FdOpen)
163-
app.FdMax += maxuint32(app1.FdMax)
164-
app.ConnOpen += maxuint32(app1.ConnOpen)
165-
app.ConnMax += maxuint32(app1.ConnMax)
166-
app.MemUsed += maxuint64(app1.MemUsed)
167-
app.MemMax += maxuint64(app1.MemMax)
192+
app1.FdOpen = maxuint32(app1.FdOpen)
193+
app.FdOpen += app1.FdOpen
194+
app1.FdMax = maxuint32(app1.FdMax)
195+
app.FdMax += app1.FdMax
196+
app1.ConnOpen = maxuint32(app1.ConnOpen)
197+
app.ConnOpen += app1.ConnOpen
198+
app1.ConnMax = maxuint32(app1.ConnMax)
199+
app.ConnMax += app1.ConnMax
200+
app1.MemUsed = maxuint64(app1.MemUsed)
201+
app.MemUsed += app1.MemUsed
202+
app1.MemMax = maxuint64(app1.MemMax)
203+
app.MemMax += app1.MemMax
204+
counter.Records = append(counter.Records, app1)
168205

169206
case layers.SFlowVLANCounters:
170207
vlan1 := record.(layers.SFlowVLANCounters)
171-
vlan.Octets += maxuint64(vlan1.Octets)
172-
vlan.UcastPkts += maxuint32(vlan1.UcastPkts)
173-
vlan.MulticastPkts += maxuint32(vlan1.MulticastPkts)
174-
vlan.BroadcastPkts += maxuint32(vlan1.BroadcastPkts)
175-
vlan.Discards += maxuint32(vlan1.Discards)
208+
vlan1.Octets = maxuint64(vlan1.Octets)
209+
vlan.Octets += vlan1.Octets
210+
vlan1.UcastPkts = maxuint32(vlan1.UcastPkts)
211+
vlan.UcastPkts += vlan1.UcastPkts
212+
vlan1.MulticastPkts = maxuint32(vlan1.MulticastPkts)
213+
vlan.MulticastPkts += vlan1.MulticastPkts
214+
vlan1.BroadcastPkts = maxuint32(vlan1.BroadcastPkts)
215+
vlan.BroadcastPkts += vlan1.BroadcastPkts
216+
vlan1.Discards = maxuint32(vlan1.Discards)
217+
vlan.Discards += vlan1.Discards
218+
counter.Records = append(counter.Records, vlan1)
219+
220+
case layers.SFlowOpenflowPortCounters:
221+
ofpc1 := record.(layers.SFlowOpenflowPortCounters)
222+
ofpc1.DatapathID = maxuint64(ofpc1.DatapathID)
223+
ofpc1.PortNo = maxuint32(ofpc1.PortNo)
224+
counter.Records = append(counter.Records, ofpc1)
225+
226+
case layers.SFlowEthernetCounters:
227+
eth1 := record.(layers.SFlowEthernetCounters)
228+
eth1.AlignmentErrors = maxuint32(eth1.AlignmentErrors)
229+
eth1.FCSErrors = maxuint32(eth1.FCSErrors)
230+
eth1.SingleCollisionFrames = maxuint32(eth1.SingleCollisionFrames)
231+
eth1.MultipleCollisionFrames = maxuint32(eth1.MultipleCollisionFrames)
232+
eth1.SQETestErrors = maxuint32(eth1.SQETestErrors)
233+
eth1.DeferredTransmissions = maxuint32(eth1.DeferredTransmissions)
234+
eth1.LateCollisions = maxuint32(eth1.LateCollisions)
235+
eth1.ExcessiveCollisions = maxuint32(eth1.ExcessiveCollisions)
236+
eth1.InternalMacReceiveErrors = maxuint32(eth1.InternalMacReceiveErrors)
237+
eth1.InternalMacTransmitErrors = maxuint32(eth1.InternalMacTransmitErrors)
238+
eth1.CarrierSenseErrors = maxuint32(eth1.CarrierSenseErrors)
239+
eth1.FrameTooLongs = maxuint32(eth1.FrameTooLongs)
240+
eth1.SymbolErrors = maxuint32(eth1.SymbolErrors)
241+
counter.Records = append(counter.Records, eth1)
242+
243+
case layers.SFlowPORTNAME, layers.SFlowLACPCounters:
244+
counter.RecordCount--
176245

177246
}
178247
}
248+
Countersamples = append(Countersamples, counter)
179249
}
180250

181251
sfa.Graph.Lock()
@@ -237,6 +307,7 @@ func (sfa *Agent) feedFlowTable() {
237307
}
238308

239309
sfl := &SFlow{
310+
Counters: Countersamples,
240311
Metric: totalMetric,
241312
LastUpdateMetric: lastUpdateMetric,
242313
}

sflow/sflowmetric.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ package sflow
2525
import (
2626
"encoding/json"
2727

28+
"github.com/google/gopacket/layers"
2829
"github.com/skydive-project/skydive/common"
2930
)
3031

3132
//SFlow all sflow information
3233
type SFlow struct {
33-
Metric *SFMetric `json:"Metric,omitempty"`
34-
LastUpdateMetric *SFMetric `json:"LastUpdateMetric,omitempty"`
34+
Counters []layers.SFlowCounterSample `json:"Counters,omitempty"`
35+
Metric *SFMetric `json:"Metric,omitempty"`
36+
LastUpdateMetric *SFMetric `json:"LastUpdateMetric,omitempty"`
3537
}
3638

3739
//SFMetadataDecoder implements a json message raw decoder
@@ -51,6 +53,8 @@ func (sf *SFlow) GetField(key string) (interface{}, error) {
5153
return sf.Metric, nil
5254
case "LastUpdateMetric":
5355
return sf.LastUpdateMetric, nil
56+
case "Counters":
57+
return sf.Counters, nil
5458
}
5559

5660
return nil, common.ErrFieldNotFound

tests/topology_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,13 +1055,13 @@ func TestSFlowMetric(t *testing.T) {
10551055
{"ip link set vm1-eth0 up", true},
10561056
{"ovs-vsctl add-port br-sfmt vm1-eth0", true},
10571057
{"ip netns exec vm1 ip link set eth0 up", true},
1058-
{"ip netns exec vm1 ip address add 192.168.0.1/24 dev eth0", true},
1058+
{"ip netns exec vm1 ip address add 192.168.0.11/24 dev eth0", true},
10591059
{"ip netns add vm2", true},
10601060
{"ip link add vm2-eth0 type veth peer name eth0 netns vm2", true},
10611061
{"ip link set vm2-eth0 up", true},
10621062
{"ovs-vsctl add-port br-sfmt vm2-eth0", true},
10631063
{"ip netns exec vm2 ip link set eth0 up", true},
1064-
{"ip netns exec vm2 ip address add 192.168.0.2/24 dev eth0", true},
1064+
{"ip netns exec vm2 ip address add 192.168.0.21/24 dev eth0", true},
10651065
},
10661066

10671067
injections: []TestInjection{{
@@ -1087,7 +1087,7 @@ func TestSFlowMetric(t *testing.T) {
10871087
mode: OneShot,
10881088

10891089
checks: []CheckFunction{func(c *CheckContext) error {
1090-
sfmetrics, err := c.gh.GetSFlowMetrics(c.gremlin.V().Metrics("SFlow").Aggregates())
1090+
sfmetrics, err := c.gh.GetSFlowMetrics(c.gremlin.V().Metrics("SFlow.LastUpdateMetric").Aggregates())
10911091
if err != nil {
10921092
return err
10931093
}
@@ -1118,9 +1118,9 @@ func TestSFlowMetric(t *testing.T) {
11181118
return fmt.Errorf("Expected at least IfInUcastPkts, got %d", totalInUc)
11191119
}
11201120

1121-
m, err := c.gh.GetSFlowMetric(c.gremlin.V().Metrics("SFlow").Aggregates().Sum())
1121+
m, err := c.gh.GetSFlowMetric(c.gremlin.V().Metrics("SFlow.LastUpdateMetric").Aggregates().Sum())
11221122
if err != nil {
1123-
return fmt.Errorf("Could not find metrics with: %s", "c.gremlin.V().Metrics('SFlow').Aggregates().Sum()")
1123+
return fmt.Errorf("Could not find metrics with: %s", "c.gremlin.V().Metrics('SFlow.LastUpdateMetric').Aggregates().Sum()")
11241124
}
11251125

11261126
if inUc, _ := m.GetFieldInt64("IfInUcastPkts"); inUc != totalInUc {

0 commit comments

Comments
 (0)