Skip to content

Commit 1a67d0c

Browse files
authored
Merge pull request #1593 from safchain/finally-protobuf
Use protobuf between agent/analyzer as first level protocol
2 parents 2029c9b + ac37b81 commit 1a67d0c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+561
-454
lines changed

Makefile

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ endef
2929

3030
define PROTOC_GEN
3131
$(call VENDOR_RUN,${PROTOC_GEN_GOFAST_GITHUB})
32-
$(call VENDOR_RUN,${PROTOC_GEN_GO_GITHUB}) protoc -Ivendor -I. --plugin=${BUILD_TOOLS}/protoc-gen-gogofaster --gogofaster_out . $1
32+
$(call VENDOR_RUN,${PROTOC_GEN_GO_GITHUB}) protoc -Ivendor -I. --plugin=${BUILD_TOOLS}/protoc-gen-gogofaster --gogofaster_out $$GOPATH/src $1
3333
endef
3434

3535
VERSION?=$(shell $(VERSION_CMD))
@@ -211,8 +211,8 @@ debug.analyzer:
211211
%.pb.go: %.proto
212212
$(call PROTOC_GEN,$<)
213213

214-
flow/flow.pb.go: flow/flow.proto
215-
$(call PROTOC_GEN,$<)
214+
flow/flow.pb.go: flow/flow.proto filters/filters.proto
215+
$(call PROTOC_GEN,flow/flow.proto)
216216

217217
# always export flow.ParentUUID as we need to store this information to know
218218
# if it's a Outer or Inner packet.
@@ -232,10 +232,17 @@ flow/flow.pb.go: flow/flow.proto
232232
flow/layers/generated.proto: flow/layers/layers.go
233233
$(call VENDOR_RUN,${PROTEUS_GITHUB}) proteus proto -f $${GOPATH}/src -p github.com/skydive-project/skydive/flow/layers
234234
sed -e 's/^package .*;/package layers;/' -i $@
235+
sed -e 's/^option go_package = "layers"/option go_package = "github.com\/skydive-project\/skydive\/flow\/layers"/' -i $@
235236
sed -e 's/^message Layer/message /' -i $@
236237
sed -e 's/option (gogoproto.typedecl) = false;//' -i $@
237238
sed 's/\((gogoproto\.customname) = "\([^\"]*\)"\)/\1, (gogoproto.jsontag) = "\2,omitempty"/' -i $@
238239

240+
websocket/structmessage.pb.go: websocket/structmessage.proto
241+
$(call PROTOC_GEN,$<)
242+
243+
sed -e 's/type StructMessage struct {/type StructMessage struct { XXX_state structMessageState `json:"-"`/' -i websocket/structmessage.pb.go
244+
gofmt -s -w $@
245+
239246
.proto: govendor flow/layers/generated.pb.go flow/flow.pb.go filters/filters.pb.go websocket/structmessage.pb.go
240247

241248
.PHONY: .proto.clean

agent/agent.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import (
4545
"github.com/skydive-project/skydive/probe"
4646
"github.com/skydive-project/skydive/topology"
4747
"github.com/skydive-project/skydive/ui"
48+
"github.com/skydive-project/skydive/websocket"
4849
ws "github.com/skydive-project/skydive/websocket"
4950
)
5051

@@ -67,7 +68,7 @@ type Agent struct {
6768

6869
// NewAnalyzerStructClientPool creates a new http WebSocket client Pool
6970
// with authentification
70-
func NewAnalyzerStructClientPool(authOptions *shttp.AuthenticationOpts) (*ws.StructClientPool, error) {
71+
func NewAnalyzerStructClientPool(authOpts *shttp.AuthenticationOpts) (*ws.StructClientPool, error) {
7172
pool := ws.NewStructClientPool("AnalyzerClientPool")
7273

7374
addresses, err := config.GetAnalyzerServiceAddresses()
@@ -81,7 +82,8 @@ func NewAnalyzerStructClientPool(authOptions *shttp.AuthenticationOpts) (*ws.Str
8182
}
8283

8384
for _, sa := range addresses {
84-
c, err := config.NewWSClient(common.AgentService, config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent/topology"), authOptions, nil)
85+
url := config.GetURL("ws", sa.Addr, sa.Port, "/ws/agent/topology")
86+
c, err := config.NewWSClient(common.AgentService, url, websocket.ClientOpts{AuthOpts: authOpts, Protocol: websocket.ProtobufProtocol})
8587
if err != nil {
8688
return nil, err
8789
}

analyzer/flow_client.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func (c *FlowClientWebSocketConn) Close() error {
110110

111111
// Connect to the WebSocket flow server
112112
func (c *FlowClientWebSocketConn) Connect() (err error) {
113-
if c.wsClient, err = config.NewWSClient(common.AgentService, c.url, c.authOpts, nil); err != nil {
113+
if c.wsClient, err = config.NewWSClient(common.AgentService, c.url, ws.ClientOpts{AuthOpts: c.authOpts}); err != nil {
114114
return nil
115115
}
116116

@@ -146,7 +146,7 @@ func (c *FlowClient) close() {
146146

147147
// SendFlow sends a flow to the server
148148
func (c *FlowClient) SendFlow(f *flow.Flow) error {
149-
data, err := f.GetData()
149+
data, err := f.Marshal()
150150
if err != nil {
151151
return err
152152
}
@@ -164,10 +164,11 @@ retry:
164164
}
165165

166166
// SendFlows sends flows to the server
167-
func (c *FlowClient) SendFlows(flows []*flow.Flow) {
168-
for _, flow := range flows {
169-
err := c.SendFlow(flow)
170-
if err != nil {
167+
func (c *FlowClient) SendFlows(flowArray *flow.FlowArray) {
168+
// TODO(safchain) in case of websocket there is no size limitation we should send
169+
// bulk directly
170+
for _, flow := range flowArray.Flows {
171+
if err := c.SendFlow(flow); err != nil {
171172
logging.GetLogger().Errorf("Unable to send flow: %s", err)
172173
}
173174
}
@@ -240,7 +241,7 @@ func (p *FlowClientPool) OnDisconnected(c ws.Speaker) {
240241
}
241242

242243
// SendFlows sends flows using a random connection
243-
func (p *FlowClientPool) SendFlows(flows []*flow.Flow) {
244+
func (p *FlowClientPool) SendFlows(flowArray *flow.FlowArray) {
244245
p.RLock()
245246
defer p.RUnlock()
246247

@@ -249,7 +250,7 @@ func (p *FlowClientPool) SendFlows(flows []*flow.Flow) {
249250
}
250251

251252
fc := p.flowClients[rand.Intn(len(p.flowClients))]
252-
fc.SendFlows(flows)
253+
fc.SendFlows(flowArray)
253254
}
254255

255256
// Close all connections

analyzer/flow_server.go

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,15 @@ type FlowServer struct {
101101

102102
// OnMessage event
103103
func (c *FlowServerWebSocketConn) OnMessage(client ws.Speaker, m ws.Message) {
104-
f, err := flow.FromData(m.Bytes(client.GetClientProtocol()))
105-
if err != nil {
104+
// rawmessage at this point
105+
b, _ := m.Bytes(ws.RawProtocol)
106+
107+
var f flow.Flow
108+
if err := f.Unmarshal(b); err != nil {
106109
logging.GetLogger().Errorf("Error while parsing flow: %s", err)
107110
return
108111
}
112+
109113
logging.GetLogger().Debugf("New flow from Websocket connection: %+v", f)
110114
if len(c.ch) >= c.maxFlowBufferSize {
111115
c.numOfLostFlows++
@@ -118,7 +122,7 @@ func (c *FlowServerWebSocketConn) OnMessage(client ws.Speaker, m ws.Message) {
118122
return
119123
}
120124

121-
c.ch <- f
125+
c.ch <- &f
122126
}
123127

124128
// Serve starts a WebSocket flow server
@@ -161,8 +165,8 @@ func (c *FlowServerUDPConn) Serve(ch chan *flow.Flow, quit chan struct{}, wg *sy
161165
logging.GetLogger().Errorf("Error while reading: %s", err)
162166
}
163167

164-
f, err := flow.FromData(data[0:n])
165-
if err != nil {
168+
var f flow.Flow
169+
if err := f.Unmarshal(data[0:n]); err != nil {
166170
logging.GetLogger().Errorf("Error while parsing flow: %s", err)
167171
continue
168172
}
@@ -178,7 +182,7 @@ func (c *FlowServerUDPConn) Serve(ch chan *flow.Flow, quit chan struct{}, wg *sy
178182
}
179183
return
180184
}
181-
ch <- f
185+
ch <- &f
182186
}
183187
}
184188
}()
@@ -202,13 +206,13 @@ func NewFlowServerUDPConn(addr string, port int) (*FlowServerUDPConn, error) {
202206
return &FlowServerUDPConn{conn: conn, maxFlowBufferSize: flowsMax}, err
203207
}
204208

205-
func (s *FlowServer) storeFlows(flows []*flow.Flow) {
206-
if len(flows) > 0 {
209+
func (s *FlowServer) storeFlows(flows *flow.FlowArray) {
210+
if len(flows.Flows) > 0 {
207211
if s.storage != nil {
208-
if err := s.storage.StoreFlows(flows); err != nil {
212+
if err := s.storage.StoreFlows(flows.Flows); err != nil {
209213
logging.GetLogger().Error(err)
210214
} else {
211-
logging.GetLogger().Debugf("%d flows stored", len(flows))
215+
logging.GetLogger().Debugf("%d flows stored", len(flows.Flows))
212216
}
213217
}
214218

@@ -228,21 +232,21 @@ func (s *FlowServer) Start() {
228232
dlTimer := time.NewTicker(s.bulkInsertDeadline)
229233
defer dlTimer.Stop()
230234

231-
var flowBuffer []*flow.Flow
232-
defer s.storeFlows(flowBuffer)
235+
var flowArray flow.FlowArray
236+
defer s.storeFlows(&flowArray)
233237

234238
for {
235239
select {
236240
case <-s.quit:
237241
return
238242
case <-dlTimer.C:
239-
s.storeFlows(flowBuffer)
240-
flowBuffer = flowBuffer[:0]
243+
s.storeFlows(&flowArray)
244+
flowArray.Flows = flowArray.Flows[:0]
241245
case f := <-s.ch:
242-
flowBuffer = append(flowBuffer, f)
243-
if len(flowBuffer) >= s.bulkInsert {
244-
s.storeFlows(flowBuffer)
245-
flowBuffer = flowBuffer[:0]
246+
flowArray.Flows = append(flowArray.Flows, f)
247+
if len(flowArray.Flows) >= s.bulkInsert {
248+
s.storeFlows(&flowArray)
249+
flowArray.Flows = flowArray.Flows[:0]
246250
}
247251
}
248252
}

analyzer/flow_subscriber_endpoint.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ type FlowSubscriberEndpoint struct {
3333
}
3434

3535
// SendFlows sends flow to the subscribers
36-
func (fs *FlowSubscriberEndpoint) SendFlows(flows []*flow.Flow) {
37-
msg := ws.NewStructMessage("flow", "store", flows)
36+
func (fs *FlowSubscriberEndpoint) SendFlows(flowArray *flow.FlowArray) {
37+
msg := ws.NewStructMessage("flow", "store", flowArray)
3838
fs.pool.BroadcastMessage(msg)
3939
}
4040

api/server/pcap.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"net/http"
2727
"time"
2828

29-
"github.com/abbot/go-http-auth"
29+
auth "github.com/abbot/go-http-auth"
3030
"github.com/skydive-project/skydive/config"
3131
"github.com/skydive-project/skydive/flow"
3232
"github.com/skydive-project/skydive/flow/storage"
@@ -40,10 +40,10 @@ type PcapAPI struct {
4040
Storage storage.Storage
4141
}
4242

43-
func (p *PcapAPI) flowExpireUpdate(flows []*flow.Flow) {
44-
if p.Storage != nil && len(flows) > 0 {
45-
p.Storage.StoreFlows(flows)
46-
logging.GetLogger().Debugf("%d flows stored", len(flows))
43+
func (p *PcapAPI) flowExpireUpdate(flowArray *flow.FlowArray) {
44+
if p.Storage != nil && len(flowArray.Flows) > 0 {
45+
p.Storage.StoreFlows(flowArray.Flows)
46+
logging.GetLogger().Debugf("%d flows stored", len(flowArray.Flows))
4747
}
4848
}
4949

api/server/topology.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
"net/http"
3131
"strings"
3232

33-
"github.com/abbot/go-http-auth"
33+
auth "github.com/abbot/go-http-auth"
3434
"github.com/skydive-project/skydive/api/types"
3535
"github.com/skydive-project/skydive/flow"
3636
"github.com/skydive-project/skydive/graffiti/graph"
@@ -193,7 +193,7 @@ func (t *TopologyAPI) topologySearch(w http.ResponseWriter, r *auth.Authenticate
193193
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
194194
w.WriteHeader(http.StatusOK)
195195
if err := json.NewEncoder(w).Encode(res); err != nil {
196-
logging.GetLogger().Warningf("Error while writing response: %s", err)
196+
logging.GetLogger().Errorf("Error while writing response: %s", err)
197197
}
198198
}
199199
}

cmd/client/topology.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"errors"
2828
"fmt"
2929
"io/ioutil"
30-
"net/http"
3130
"os"
3231

3332
"github.com/skydive-project/skydive/common"
@@ -75,9 +74,9 @@ var TopologyImport = &cobra.Command{
7574
}
7675

7776
url := config.GetURL("ws", sa.Addr, sa.Port, "/ws/publisher")
78-
headers := http.Header{}
79-
headers.Add("X-Persistence-Policy", string(hub.Persistent))
80-
client, err := config.NewWSClient(common.UnknownService, url, &AuthenticationOpts, headers)
77+
opts := websocket.ClientOpts{AuthOpts: &AuthenticationOpts}
78+
opts.Headers.Add("X-Persistence-Policy", string(hub.Persistent))
79+
client, err := config.NewWSClient(common.UnknownService, url, opts)
8180
if err != nil {
8281
exitOnError(err)
8382
}

config/websocket.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
package config
2424

2525
import (
26-
"net/http"
2726
"net/url"
2827
"time"
2928

@@ -33,16 +32,19 @@ import (
3332
)
3433

3534
// NewWSClient creates a Client based on the configuration
36-
func NewWSClient(clientType common.ServiceType, url *url.URL, authOpts *shttp.AuthenticationOpts, headers http.Header) (*websocket.Client, error) {
35+
func NewWSClient(clientType common.ServiceType, url *url.URL, opts websocket.ClientOpts) (*websocket.Client, error) {
3736
host := GetString("host_id")
38-
queueSize := GetInt("http.ws.queue_size")
39-
writeCompression := GetBool("http.ws.enable_write_compression")
37+
38+
// override some of the options with config value
39+
opts.QueueSize = GetInt("http.ws.queue_size")
40+
opts.WriteCompression = GetBool("http.ws.enable_write_compression")
4041
tlsConfig, err := GetTLSClientConfig(true)
4142
if err != nil {
4243
return nil, err
4344
}
45+
opts.TLSConfig = tlsConfig
4446

45-
return websocket.NewClient(host, clientType, url, authOpts, headers, queueSize, writeCompression, tlsConfig), nil
47+
return websocket.NewClient(host, clientType, url, opts), nil
4648
}
4749

4850
// NewWSServer creates a Server based on the configuration

contrib/objectstore/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"net/http"
54
"net/url"
65
"os"
76
"os/signal"
@@ -12,6 +11,7 @@ import (
1211
"github.com/skydive-project/skydive/contrib/objectstore/subscriber"
1312
shttp "github.com/skydive-project/skydive/http"
1413
"github.com/skydive-project/skydive/logging"
14+
"github.com/skydive-project/skydive/websocket"
1515
)
1616

1717
const defaultConfigurationFile = "/etc/skydive/skydive-objectstore.yml"
@@ -40,7 +40,7 @@ func main() {
4040
subscriberPassword := cfg.GetString("subscriber_password")
4141
maxSecondsPerStream := cfg.GetInt("max_seconds_per_stream")
4242

43-
authOptions := &shttp.AuthenticationOpts{
43+
authOpts := &shttp.AuthenticationOpts{
4444
Username: subscriberUsername,
4545
Password: subscriberPassword,
4646
}
@@ -51,7 +51,7 @@ func main() {
5151
os.Exit(1)
5252
}
5353

54-
wsClient, err := config.NewWSClient(common.AnalyzerService, subscriberURL, authOptions, http.Header{})
54+
wsClient, err := config.NewWSClient(common.AnalyzerService, subscriberURL, websocket.ClientOpts{AuthOpts: authOpts})
5555
if err != nil {
5656
logging.GetLogger().Errorf("Failed to create websocket client: %s", err)
5757
os.Exit(1)

0 commit comments

Comments
 (0)