Skip to content

Commit 7de7b5e

Browse files
authored
Merge pull request #1539 from dvandra/OVS_SFlow_Metric
Adding OVS-SFlow Metric to Metadata
2 parents 34005d6 + fd9e81f commit 7de7b5e

File tree

16 files changed

+1073
-48
lines changed

16 files changed

+1073
-48
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ EASYJSON_FILES_TAG=\
5252
flow/storage/elasticsearch/elasticsearch.go \
5353
flow/storage/orientdb/orientdb.go \
5454
graffiti/graph/elasticsearch.go \
55+
sflow/sflowmetric.go \
5556
topology/metrics.go \
5657
topology/probes/netlink/route.go \
5758
topology/probes/netlink/neighbor.go

analyzer/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/skydive-project/skydive/logging"
4848
"github.com/skydive-project/skydive/packetinjector"
4949
"github.com/skydive-project/skydive/probe"
50+
"github.com/skydive-project/skydive/sflow"
5051
"github.com/skydive-project/skydive/topology"
5152
usertopology "github.com/skydive-project/skydive/topology/enhancers"
5253
"github.com/skydive-project/skydive/topology/probes/netlink"
@@ -223,6 +224,7 @@ func NewServerFromConfig() (*Server, error) {
223224
uiServer.AddGlobalVar("ui", config.Get("ui"))
224225
uiServer.AddGlobalVar("flow-metric-keys", (&flow.FlowMetric{}).GetFieldKeys())
225226
uiServer.AddGlobalVar("interface-metric-keys", (&topology.InterfaceMetric{}).GetFieldKeys())
227+
uiServer.AddGlobalVar("sflow-metric-keys", (&sflow.SFMetric{}).GetFieldKeys())
226228
uiServer.AddGlobalVar("probes", config.Get("analyzer.topology.probes"))
227229

228230
persistent, err := newGraphBackendFromConfig(etcdClient)
@@ -383,4 +385,5 @@ func init() {
383385
graph.NodeMetadataDecoders["Neighbors"] = netlink.NeighborMetadataDecoder
384386
graph.NodeMetadataDecoders["Metric"] = topology.InterfaceMetricMetadataDecoder
385387
graph.NodeMetadataDecoders["LastUpdateMetric"] = topology.InterfaceMetricMetadataDecoder
388+
graph.NodeMetadataDecoders["SFlow"] = sflow.SFMetadataDecoder
386389
}

api/client/gremlin.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/skydive-project/skydive/graffiti/graph"
3636
"github.com/skydive-project/skydive/gremlin"
3737
shttp "github.com/skydive-project/skydive/http"
38+
"github.com/skydive-project/skydive/sflow"
3839
"github.com/skydive-project/skydive/topology"
3940
"github.com/skydive-project/skydive/topology/probes/socketinfo"
4041
)
@@ -178,6 +179,25 @@ func (g *GremlinQueryHelper) GetInterfaceMetrics(query interface{}) (map[string]
178179
return result[0], nil
179180
}
180181

182+
// GetSFlowMetrics from Gremlin query
183+
func (g *GremlinQueryHelper) GetSFlowMetrics(query interface{}) (map[string][]*sflow.SFMetric, error) {
184+
data, err := g.Query(query)
185+
if err != nil {
186+
return nil, err
187+
}
188+
189+
var result []map[string][]*sflow.SFMetric
190+
if err := json.Unmarshal(data, &result); err != nil {
191+
return nil, err
192+
}
193+
194+
if len(result) == 0 {
195+
return nil, nil
196+
}
197+
198+
return result[0], nil
199+
}
200+
181201
// GetFlowMetrics from Gremlin query
182202
func (g *GremlinQueryHelper) GetFlowMetrics(query interface{}) (map[string][]*flow.FlowMetric, error) {
183203
data, err := g.Query(query)
@@ -227,6 +247,21 @@ func (g *GremlinQueryHelper) GetInterfaceMetric(query interface{}) (*topology.In
227247
return &result, nil
228248
}
229249

250+
// GetSFlowMetric from Gremlin query
251+
func (g *GremlinQueryHelper) GetSFlowMetric(query interface{}) (*sflow.SFMetric, error) {
252+
data, err := g.Query(query)
253+
if err != nil {
254+
return nil, err
255+
}
256+
257+
var result sflow.SFMetric
258+
if err := json.Unmarshal(data, &result); err != nil {
259+
return nil, err
260+
}
261+
262+
return &result, nil
263+
}
264+
230265
// GetSockets from the Gremlin query
231266
func (g *GremlinQueryHelper) GetSockets(query interface{}) (sockets map[string][]*socketinfo.ConnectionInfo, err error) {
232267
data, err := g.Query(query)

flow/probes/ovssflow.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type OvsSFlowProbesHandler struct {
5656
probes map[string]OvsSFlowProbe
5757
probesLock common.RWMutex
5858
Graph *graph.Graph
59+
Node *graph.Node
5960
fpta *FlowProbeTableAllocator
6061
ovsClient *ovsdb.OvsClient
6162
allocator *sflow.AgentAllocator
@@ -178,7 +179,7 @@ func (o *OvsSFlowProbesHandler) UnregisterSFlowProbeFromBridge(bridgeUUID string
178179
}
179180

180181
// RegisterProbeOnBridge registers a new probe on the OVS bridge
181-
func (o *OvsSFlowProbesHandler) RegisterProbeOnBridge(bridgeUUID string, tid string, capture *types.Capture) error {
182+
func (o *OvsSFlowProbesHandler) RegisterProbeOnBridge(bridgeUUID string, tid string, capture *types.Capture, n *graph.Node) error {
182183
headerSize := flow.DefaultCaptureLength
183184
if capture.HeaderSize != 0 {
184185
headerSize = uint32(capture.HeaderSize)
@@ -192,7 +193,7 @@ func (o *OvsSFlowProbesHandler) RegisterProbeOnBridge(bridgeUUID string, tid str
192193
Interface: "lo",
193194
HeaderSize: headerSize,
194195
Sampling: 1,
195-
Polling: 0,
196+
Polling: 10,
196197
flowTable: ft,
197198
}
198199

@@ -202,7 +203,7 @@ func (o *OvsSFlowProbesHandler) RegisterProbeOnBridge(bridgeUUID string, tid str
202203
}
203204

204205
addr := common.ServiceAddress{Addr: address, Port: 0}
205-
agent, err := o.allocator.Alloc(bridgeUUID, probe.flowTable, capture.BPFFilter, headerSize, &addr)
206+
agent, err := o.allocator.Alloc(bridgeUUID, probe.flowTable, capture.BPFFilter, headerSize, &addr, n, o.Graph)
206207
if err != nil && err != sflow.ErrAgentAlreadyAllocated {
207208
return err
208209
}
@@ -231,7 +232,7 @@ func (o *OvsSFlowProbesHandler) registerProbe(n *graph.Node, capture *types.Capt
231232

232233
if isOvsBridge(n) {
233234
if uuid, _ := n.GetFieldString("UUID"); uuid != "" {
234-
if err := o.RegisterProbeOnBridge(uuid, tid, capture); err != nil {
235+
if err := o.RegisterProbeOnBridge(uuid, tid, capture, n); err != nil {
235236
return err
236237
}
237238
go e.OnStarted()

flow/probes/sflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (d *SFlowProbesHandler) registerProbe(n *graph.Node, capture *types.Capture
107107
ft := d.fpta.Alloc(tid, opts)
108108

109109
addr := common.ServiceAddress{Addr: address, Port: capture.Port}
110-
if _, err := d.allocator.Alloc(tid, ft, capture.BPFFilter, headerSize, &addr); err != nil {
110+
if _, err := d.allocator.Alloc(tid, ft, capture.BPFFilter, headerSize, &addr, n, d.Graph); err != nil {
111111
return err
112112
}
113113

gremlin/query.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,8 @@ func (q QueryString) BothV() QueryString {
188188
}
189189

190190
// Metrics append a Metrics() operation to query
191-
func (q QueryString) Metrics() QueryString {
192-
return q.newQueryString("Metrics")
191+
func (q QueryString) Metrics(key ...interface{}) QueryString {
192+
return q.newQueryString("Metrics", key...)
193193
}
194194

195195
// Sum append a Sum() operation to query

gremlin/traversal/metrics.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type MetricsTraversalExtension struct {
4343
// MetricsGremlinTraversalStep describes the Metrics gremlin traversal step
4444
type MetricsGremlinTraversalStep struct {
4545
traversal.GremlinTraversalContext
46+
key string
4647
}
4748

4849
// NewMetricsTraversalExtension returns a new graph traversal extension
@@ -65,16 +66,38 @@ func (e *MetricsTraversalExtension) ScanIdent(s string) (traversal.Token, bool)
6566
func (e *MetricsTraversalExtension) ParseStep(t traversal.Token, p traversal.GremlinTraversalContext) (traversal.GremlinTraversalStep, error) {
6667
switch t {
6768
case e.MetricsToken:
68-
return &MetricsGremlinTraversalStep{GremlinTraversalContext: p}, nil
69+
default:
70+
return nil, nil
6971
}
70-
return nil, nil
72+
var key string
73+
switch len(p.Params) {
74+
case 0:
75+
key = "LastUpdateMetric"
76+
case 1:
77+
k, ok := p.Params[0].(string)
78+
if !ok {
79+
return nil, errors.New("Metrics parameter have to be a string")
80+
}
81+
switch k {
82+
case "LastUpdateMetric":
83+
key = k
84+
case "SFlow.LastUpdateMetric", "SFlow":
85+
key = "SFlow.LastUpdateMetric"
86+
default:
87+
return nil, fmt.Errorf("Metric field unknown : %v", p.Params)
88+
}
89+
default:
90+
return nil, fmt.Errorf("Metrics accepts one parameter : %v", p.Params)
91+
}
92+
93+
return &MetricsGremlinTraversalStep{GremlinTraversalContext: p, key: key}, nil
7194
}
7295

7396
// Exec executes the metrics step
7497
func (s *MetricsGremlinTraversalStep) Exec(last traversal.GraphTraversalStep) (traversal.GraphTraversalStep, error) {
7598
switch tv := last.(type) {
7699
case *traversal.GraphTraversalV:
77-
return InterfaceMetrics(s.StepContext, tv), nil
100+
return InterfaceMetrics(s.StepContext, tv, s.key), nil
78101
case *FlowTraversalStep:
79102
return tv.FlowMetrics(s.StepContext), nil
80103
}

gremlin/traversal/topology.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,19 @@ import (
2929
"github.com/skydive-project/skydive/common"
3030
"github.com/skydive-project/skydive/graffiti/graph"
3131
"github.com/skydive-project/skydive/graffiti/graph/traversal"
32-
"github.com/skydive-project/skydive/topology"
3332
"github.com/skydive-project/skydive/topology/probes/socketinfo"
3433
)
3534

3635
// InterfaceMetrics returns a Metrics step from interface metric metadata
37-
func InterfaceMetrics(ctx traversal.StepContext, tv *traversal.GraphTraversalV) *MetricsTraversalStep {
36+
func InterfaceMetrics(ctx traversal.StepContext, tv *traversal.GraphTraversalV, key string) *MetricsTraversalStep {
3837
if tv.Error() != nil {
3938
return NewMetricsTraversalStepFromError(tv.Error())
4039
}
4140

42-
tv = tv.Dedup(ctx, "ID", "LastUpdateMetric.Start").Sort(ctx, common.SortAscending, "LastUpdateMetric.Start")
41+
startField := key + ".Start"
42+
43+
tv = tv.Dedup(ctx, "ID", startField).Sort(ctx, common.SortAscending, startField)
44+
4345
if tv.Error() != nil {
4446
return NewMetricsTraversalStepFromError(tv.Error())
4547
}
@@ -57,18 +59,18 @@ nodeloop:
5759
break nodeloop
5860
}
5961

60-
m, _ := n.GetField("LastUpdateMetric")
62+
m, _ := n.GetField(key)
6163
if m == nil {
6264
continue
6365
}
6466

65-
lastMetric, ok := m.(*topology.InterfaceMetric)
67+
lastmetric, ok := m.(common.Metric)
6668
if !ok {
6769
return NewMetricsTraversalStepFromError(errors.New("wrong interface metric type"))
6870
}
6971

70-
if gslice == nil || (lastMetric.Start > gslice.Start && lastMetric.Last < gslice.Last) && it.Next() {
71-
metrics[string(n.ID)] = append(metrics[string(n.ID)], lastMetric)
72+
if gslice == nil || (lastmetric.GetStart() > gslice.Start && lastmetric.GetLast() < gslice.Last) && it.Next() {
73+
metrics[string(n.ID)] = append(metrics[string(n.ID)], lastmetric)
7274
}
7375
}
7476

0 commit comments

Comments
 (0)