Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ The following environment variables are available to configure the NetObserv eBP
* `TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow or packet collector.
* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC
message. Messages larger than that number will be split and submitted sequentially.
* `GRPC_RECONNECT_TIMER` specifies a period after which the GRPC connection is re-established. This is
useful for load rebalancing across receivers. Disabled by default, which means
connections are not actively re-established.
* `GRPC_RECONNECT_TIMER_RANDOMIZATION` specifies how much `GRPC_RECONNECT_TIMER` should be randomized,
to avoid several agents reconnecting all at the same time. The value must be lower than `GRPC_RECONNECT_TIMER`.
For instance, if `GRPC_RECONNECT_TIMER` is 5m and `GRPC_RECONNECT_TIMER_RANDOMIZATION` is 30s,
the randomization yields a value between 4m30s and 5m30s.
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
* `AGENT_IP_IFACE` (default: `external`). Specifies which interface should the agent pick the IP
address from in order to report it in the AgentIP field on each flow. Accepted values are:
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func buildGRPCExporter(cfg *config.Agent, m *metrics.Metrics) (node.TerminalFunc
return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort)
}
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.TargetTLSCACertPath, cfg.TargetTLSUserCertPath, cfg.TargetTLSUserKeyPath, cfg.GRPCMessageMaxFlows, m)
grpcExporter, err := exporter.StartGRPCProto(cfg, m)
if err != nil {
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,15 @@ type Agent struct {
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
// larger than that number will be split and submitted sequentially.
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
// GRPCReconnectTimer specifies a period after which the GRPC connection is re-established. This is
// useful for load rebalancing across receivers. Disabled by default, which means
// connections are not actively re-established.
GRPCReconnectTimer time.Duration `env:"GRPC_RECONNECT_TIMER"`
// GRPCReconnectTimerRandomization specifies how much GRPCReconnectTimer should be randomized,
// to avoid several agents reconnecting all at the same time. The value must be lower than GRPCReconnectTimer.
// For instance, if GRPCReconnectTimer is 5m and GRPCReconnectTimerRandomization is 30s,
// the randomization yields a value between 4m30s and 5m30s.
GRPCReconnectTimerRandomization time.Duration `env:"GRPC_RECONNECT_TIMER_RANDOMIZATION"`
// Interfaces contains the interface names from where flows will be collected. If empty, the agent
// will fetch all the interfaces in the system, excepting the ones listed in ExcludeInterfaces.
// If an entry is enclosed by slashes (e.g. `/br-/`), it will match as regular expression,
Expand Down
123 changes: 98 additions & 25 deletions pkg/exporter/grpc_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package exporter

import (
"context"
"math/rand/v2"
"sync"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/config"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
Expand All @@ -21,51 +25,120 @@ const componentGRPC = "grpc"
// by its input channel, converts them to *pbflow.Records instances, and submits
// them to the collector.
type GRPCProto struct {
hostIP string
hostPort int
clientConn *grpc.ClientConnection
hostIP string
hostPort int
caCertPath string
userCertPath string
userKeyPath string
m sync.RWMutex
clientConn *grpc.ClientConnection
// maxFlowsPerMessage limits the maximum number of flows per GRPC message.
// If a message contains more flows than this number, the GRPC message will be split into
// multiple messages.
maxFlowsPerMessage int
reconnectTimer time.Duration
metrics *metrics.Metrics
batchCounter prometheus.Counter
batchCounterMetric prometheus.Counter
}

func StartGRPCProto(hostIP string, hostPort int, caPath, userCertPath, userKeyPath string, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
clientConn, err := grpc.ConnectClient(hostIP, hostPort, caPath, userCertPath, userKeyPath)
if err != nil {
func StartGRPCProto(cfg *config.Agent, m *metrics.Metrics) (*GRPCProto, error) {
exporter := GRPCProto{
hostIP: cfg.TargetHost,
hostPort: cfg.TargetPort,
caCertPath: cfg.TargetTLSCACertPath,
userCertPath: cfg.TargetTLSUserCertPath,
userKeyPath: cfg.TargetTLSUserKeyPath,
maxFlowsPerMessage: cfg.GRPCMessageMaxFlows,
reconnectTimer: randomizeTimer(cfg),
metrics: m,
batchCounterMetric: m.CreateBatchCounter(componentGRPC),
}
if err := exporter.reconnect(); err != nil {
return nil, err
}
return &GRPCProto{
hostIP: hostIP,
hostPort: hostPort,
clientConn: clientConn,
maxFlowsPerMessage: maxFlowsPerMessage,
metrics: m,
batchCounter: m.CreateBatchCounter(componentGRPC),
}, nil
return &exporter, nil
}

func (g *GRPCProto) reconnect() error {
g.m.Lock()
defer g.m.Unlock()
if g.clientConn != nil {
if err := g.clientConn.Close(); err != nil {
return err
}
}
clientConn, err := grpc.ConnectClient(g.hostIP, g.hostPort, g.caCertPath, g.userCertPath, g.userKeyPath)
if err != nil {
return err
}
g.clientConn = clientConn
return nil
}

// ExportFlows accepts slices of *model.Record by its input channel, converts them
// to *pbflow.Records instances, and submits them to the collector.
func (g *GRPCProto) ExportFlows(input <-chan []*model.Record) {
socket := utils.GetSocket(g.hostIP, g.hostPort)
log := glog.WithField("collector", socket)
for inputRecords := range input {
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
for _, pbRecords := range pbflow.FlowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage", metrics.HighSeverity).Inc()
log.WithError(err).Error("couldn't send flow records to collector")

if g.reconnectTimer > 0 {
ticker := time.NewTicker(g.reconnectTimer)
log.Infof("Reconnect timer set to: %v", g.reconnectTimer)
done := make(chan bool)
defer func() {
ticker.Stop()
done <- true
}()
go func() {
for {
select {
case <-done:
return
case <-ticker.C:
// Re-establish the connection
if err := g.reconnect(); err != nil {
log.WithError(err).Warn("couldn't reconnect the GRPC export client")
g.metrics.Errors.WithErrorName(componentGRPC, "CannotReconnectClient", metrics.HighSeverity).Inc()
}
}
}
g.batchCounter.Inc()
g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries)))
}
}()
}
for inputRecords := range input {
g.sendBatch(inputRecords, log)
}
if err := g.clientConn.Close(); err != nil {
log.WithError(err).Warn("couldn't close flow export client")
g.metrics.Errors.WithErrorName(componentGRPC, "CannotCloseClient", metrics.MediumSeverity).Inc()
}
}

func (g *GRPCProto) sendBatch(batch []*model.Record, log *logrus.Entry) {
g.m.RLock()
defer g.m.RUnlock()
g.metrics.EvictionCounter.WithSource(componentGRPC).Inc()
for _, pbRecords := range pbflow.FlowsToPB(batch, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.metrics.Errors.WithErrorName(componentGRPC, "CannotWriteMessage", metrics.HighSeverity).Inc()
log.WithError(err).Error("couldn't send flow records to collector")
}
g.batchCounterMetric.Inc()
g.metrics.EvictedFlowsCounter.WithSource(componentGRPC).Add(float64(len(pbRecords.Entries)))
}
}

func randomizeTimer(cfg *config.Agent) time.Duration {
if cfg.GRPCReconnectTimer <= 0 {
return 0
}
timer := cfg.GRPCReconnectTimer
if cfg.GRPCReconnectTimerRandomization <= 0 || cfg.GRPCReconnectTimerRandomization >= timer {
return timer
}
timer += time.Duration(rand.Int64N(2*int64(cfg.GRPCReconnectTimerRandomization)) - int64(cfg.GRPCReconnectTimerRandomization))
if timer < 0 {
return time.Second
}
return timer
}
126 changes: 116 additions & 10 deletions pkg/exporter/grpc_proto_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package exporter

import (
"context"
"fmt"
"net"
"testing"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/config"
grpcflow "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/netobserv/netobserv-ebpf-agent/pkg/model"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
test2 "github.com/netobserv/netobserv-ebpf-agent/pkg/test"

"github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/peer"
)

const timeout = 2 * time.Second
Expand All @@ -24,12 +28,17 @@ func TestIPv4GRPCProto_ExportFlows_AgentIP(t *testing.T) {
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
coll, err := grpc.StartCollector(port, serverOut)
coll, err := grpcflow.StartCollector(port, serverOut)
require.NoError(t, err)
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", 1000, metrics.NoOp())
cfg := config.Agent{
TargetHost: "127.0.0.1",
TargetPort: port,
GRPCMessageMaxFlows: 1000,
}
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -66,12 +75,17 @@ func TestIPv6GRPCProto_ExportFlows_AgentIP(t *testing.T) {
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
coll, err := grpc.StartCollector(port, serverOut)
coll, err := grpcflow.StartCollector(port, serverOut)
require.NoError(t, err)
defer coll.Close()

// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("::1", port, "", "", "", 1000, metrics.NoOp())
cfg := config.Agent{
TargetHost: "::1",
TargetPort: port,
GRPCMessageMaxFlows: 1000,
}
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
require.NoError(t, err)

// Send some flows to the input of the exporter stage
Expand Down Expand Up @@ -108,13 +122,17 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)
coll, err := grpc.StartCollector(port, serverOut)
coll, err := grpcflow.StartCollector(port, serverOut)
require.NoError(t, err)
defer coll.Close()

const msgMaxLen = 10000
// Start GRPCProto exporter stage
exporter, err := StartGRPCProto("127.0.0.1", port, "", "", "", msgMaxLen, metrics.NoOp())
cfg := config.Agent{
TargetHost: "127.0.0.1",
TargetPort: port,
GRPCMessageMaxFlows: 10_000,
}
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
require.NoError(t, err)

// Send a message much longer than the limit length
Expand All @@ -130,9 +148,9 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {

// expect that the submitted message is split in chunks no longer than msgMaxLen
rs := test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, msgMaxLen)
assert.Len(t, rs.Entries, cfg.GRPCMessageMaxFlows)
rs = test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, msgMaxLen)
assert.Len(t, rs.Entries, cfg.GRPCMessageMaxFlows)
rs = test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, 5000)

Expand All @@ -144,3 +162,91 @@ func TestGRPCProto_SplitLargeMessages(t *testing.T) {
// ok!
}
}

type collectorAPI struct {
pbflow.UnimplementedCollectorServer
recordForwarder chan<- *pbflow.Records
lastClient string
clientOccurrences []int
}

func (c *collectorAPI) Send(ctx context.Context, records *pbflow.Records) (*pbflow.CollectorReply, error) {
if p, ok := peer.FromContext(ctx); ok {
addr := p.Addr.String()
if len(c.clientOccurrences) == 0 || addr != c.lastClient {
c.clientOccurrences = append(c.clientOccurrences, 1)
c.lastClient = addr
} else {
c.clientOccurrences[len(c.clientOccurrences)-1]++
}
}
c.recordForwarder <- records
return &pbflow.CollectorReply{}, nil
}

func TestConnectionReset(t *testing.T) {
// start remote ingestor
port, err := test.FreeTCPPort()
require.NoError(t, err)
serverOut := make(chan *pbflow.Records)

api := collectorAPI{
recordForwarder: serverOut,
clientOccurrences: []int{},
}
coll, err := grpcflow.StartCollectorWithAPI(port, &api)
require.NoError(t, err)
defer coll.Close()

// Start GRPCProto exporter stage
cfg := config.Agent{
TargetHost: "127.0.0.1",
TargetPort: port,
GRPCMessageMaxFlows: 1000,
GRPCReconnectTimer: 500 * time.Millisecond,
}
exporter, err := StartGRPCProto(&cfg, metrics.NoOp())
require.NoError(t, err)

// Send some flows to the input of the exporter stage
nFlows := 5
flows := make(chan []*model.Record, nFlows+1)
go exporter.ExportFlows(flows)
go func() {
for i := range nFlows {
flows <- []*model.Record{
{AgentIP: net.ParseIP(fmt.Sprintf("10.9.8.%d", i)), Metrics: model.BpfFlowContent{BpfFlowMetrics: &ebpf.BpfFlowMetrics{}}},
}
time.Sleep(300 * time.Millisecond)
}
}()

for i := 0; i < nFlows; i++ {
rs := test2.ReceiveTimeout(t, serverOut, timeout)
assert.Len(t, rs.Entries, 1)
r := rs.Entries[0]
assert.EqualValues(t, 0x0a090800+i, r.GetAgentIp().GetIpv4())
}

// Expect flows sent at +0ms, +300ms | +600ms, +900ms | +1200ms ("|" means reconnect event / new client detected)
// If it ends up too flaky, we can increase the durations, e.g. over 5s, or relax the assertion
assert.Equal(t, []int{2, 2, 1}, api.clientOccurrences)

select {
case rs := <-serverOut:
assert.Failf(t, "shouldn't have received any flow", "Got: %#v", rs)
default:
// ok!
}
}

func TestRandomizeMaxMessages(t *testing.T) {
for i := 0; i < 1000; i++ {
v := randomizeTimer(&config.Agent{
GRPCReconnectTimer: 5 * time.Minute,
GRPCReconnectTimerRandomization: 30 * time.Second,
})
assert.GreaterOrEqual(t, v, 270*time.Second /*4m30s*/)
assert.LessOrEqual(t, v, 330*time.Second /*5m30s*/)
}
}
Loading
Loading