Skip to content

Commit a094ef7

Browse files
authored
Merge pull request #379 from streamdal/blinktag/remove_streamdal_backend
#375 - Remove Streamdal Action
2 parents 21c852f + 3f29788 commit a094ef7

32 files changed

+594
-4968
lines changed

backends/gcppubsub/relay.go

Lines changed: 2 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,16 @@ package gcppubsub
22

33
import (
44
"context"
5-
"fmt"
65
"sync"
76
"time"
87

98
"cloud.google.com/go/pubsub"
10-
"github.com/pkg/errors"
11-
"github.com/sirupsen/logrus"
12-
sdk "github.com/streamdal/streamdal/sdks/go"
13-
149
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
1510
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
11+
"github.com/pkg/errors"
12+
"github.com/sirupsen/logrus"
1613

1714
"github.com/streamdal/plumber/backends/gcppubsub/types"
18-
"github.com/streamdal/plumber/util"
19-
2015
"github.com/streamdal/plumber/prometheus"
2116
"github.com/streamdal/plumber/validate"
2217
)
@@ -33,14 +28,6 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel
3328
"backend": "gcp-pubsub",
3429
})
3530

36-
// streamdal sdk BEGIN
37-
sc, err := util.SetupStreamdalSDK(relayOpts, llog)
38-
if err != nil {
39-
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
40-
}
41-
// defer sc.Close()
42-
// streamdal sdk END
43-
4431
var m sync.Mutex
4532

4633
var readFunc = func(ctx context.Context, msg *pubsub.Message) {
@@ -53,42 +40,6 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel
5340

5441
prometheus.Incr("gcp-pubsub-relay-consumer", 1)
5542

56-
// streamdal sdk BEGIN
57-
// If streamdal integration is enabled, process message via sdk
58-
if sc != nil {
59-
g.log.Debug("Processing message via streamdal SDK")
60-
61-
operationName := "relay"
62-
63-
if relayOpts != nil && relayOpts.GcpPubsub != nil && relayOpts.GcpPubsub.GetArgs() != nil {
64-
if relayOpts.GcpPubsub.GetArgs().SubscriptionId == "" {
65-
operationName = "relay-unknown-subid"
66-
} else {
67-
operationName = "relay-" + relayOpts.GcpPubsub.GetArgs().SubscriptionId
68-
}
69-
}
70-
71-
resp := sc.Process(ctx, &sdk.ProcessRequest{
72-
ComponentName: "gcp-pubsub",
73-
OperationType: sdk.OperationTypeConsumer,
74-
OperationName: operationName,
75-
Data: msg.Data,
76-
})
77-
78-
if resp.Status == sdk.ExecStatusError {
79-
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)
80-
81-
prometheus.IncrPromCounter("plumber_sdk_errors", 1)
82-
util.WriteError(llog, errorCh, wrappedErr)
83-
84-
return
85-
}
86-
87-
// Update msg value with processed data
88-
msg.Data = resp.Data
89-
}
90-
// streamdal sdk END
91-
9243
g.log.Debug("Writing message to relay channel")
9344

9445
relayCh <- &types.RelayMessage{

backends/kafka/relay.go

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"github.com/streamdal/plumber/prometheus"
1616
"github.com/streamdal/plumber/util"
1717
"github.com/streamdal/plumber/validate"
18-
19-
sdk "github.com/streamdal/streamdal/sdks/go"
2018
)
2119

2220
const (
@@ -36,14 +34,6 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh
3634

3735
defer reader.Close()
3836

39-
// streamdal sdk BEGIN
40-
sc, err := util.SetupStreamdalSDK(relayOpts, k.log)
41-
if err != nil {
42-
return errors.Wrap(err, "kafka.Relay(): unable to create new streamdal client")
43-
}
44-
// defer sc.Close()
45-
// streamdal sdk END
46-
4737
llog := k.log.WithFields(logrus.Fields{
4838
"relay-id": relayOpts.XRelayId,
4939
"backend": "kafka",
@@ -74,32 +64,6 @@ func (k *Kafka) Relay(ctx context.Context, relayOpts *opts.RelayOptions, relayCh
7464

7565
prometheus.Incr("kafka-relay-consumer", 1)
7666

77-
// streamdal sdk BEGIN
78-
// If streamdal integration is enabled, process message via sdk
79-
if sc != nil {
80-
k.log.Debug("Processing message via streamdal SDK")
81-
82-
resp := sc.Process(ctx, &sdk.ProcessRequest{
83-
ComponentName: "kafka",
84-
OperationType: sdk.OperationTypeConsumer,
85-
OperationName: "relay",
86-
Data: msg.Value,
87-
})
88-
89-
if resp.Status == sdk.ExecStatusError {
90-
wrappedErr := fmt.Errorf("unable to process message via streamdal: %v", resp.StatusMessage)
91-
92-
prometheus.IncrPromCounter("plumber_sdk_errors", 1)
93-
util.WriteError(llog, errorCh, wrappedErr)
94-
95-
continue
96-
}
97-
98-
// Update msg value with processed data
99-
msg.Value = resp.Data
100-
}
101-
// streamdal sdk END
102-
10367
k.log.Debugf("Writing Kafka message to relay channel: %s", msg.Value)
10468

10569
relayCh <- &types.RelayMessage{

backends/streamdal/auth.go

Lines changed: 0 additions & 154 deletions
This file was deleted.

backends/streamdal/auth_test.go

Lines changed: 0 additions & 63 deletions
This file was deleted.

0 commit comments

Comments
 (0)