Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 19 additions & 32 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@ import (

"github.com/instill-ai/pipeline-backend/config"
"github.com/instill-ai/pipeline-backend/pkg/acl"
"github.com/instill-ai/pipeline-backend/pkg/external"
"github.com/instill-ai/pipeline-backend/pkg/handler"
"github.com/instill-ai/pipeline-backend/pkg/memory"
"github.com/instill-ai/pipeline-backend/pkg/middleware"
"github.com/instill-ai/pipeline-backend/pkg/pubsub"
"github.com/instill-ai/pipeline-backend/pkg/repository"
"github.com/instill-ai/pipeline-backend/pkg/service"
"github.com/instill-ai/pipeline-backend/pkg/usage"
"github.com/instill-ai/x/temporal"

componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
database "github.com/instill-ai/pipeline-backend/pkg/db"
Expand All @@ -53,6 +51,7 @@ import (
otelx "github.com/instill-ai/x/otel"
servergrpcx "github.com/instill-ai/x/server/grpc"
gatewayx "github.com/instill-ai/x/server/grpc/gateway"
temporalx "github.com/instill-ai/x/temporal"
)

const gracefulShutdownWaitPeriod = 15 * time.Second
Expand Down Expand Up @@ -120,16 +119,28 @@ func main() {
// Initialize all clients
pipelinePublicServiceClient, mgmtPublicServiceClient, mgmtPrivateServiceClient,
artifactPublicServiceClient, artifactPrivateServiceClient, redisClient, db,
minIOClient, minIOFileGetter, aclClient, temporalClient, closeClients := newClients(ctx, logger)
aclClient, temporalClient, closeClients := newClients(ctx, logger)
defer closeClients()

// Keep NewArtifactBinaryFetcher as requested
binaryFetcher := external.NewArtifactBinaryFetcher(artifactPrivateServiceClient, minIOFileGetter)
// Initialize MinIO client
minIOParams := miniox.ClientParams{
Config: config.Config.Minio,
Logger: logger,
ExpiryRules: service.NewRetentionHandler().ListExpiryRules(),
AppInfo: miniox.AppInfo{
Name: serviceName,
Version: serviceVersion,
},
}

minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams)
if err != nil {
logger.Fatal("failed to create MinIO client", zap.Error(err))
}

compStore := componentstore.Init(componentstore.InitParams{
Logger: logger,
Secrets: config.Config.Component.Secrets,
BinaryFetcher: binaryFetcher,
TemporalClient: temporalClient,
})

Expand All @@ -156,7 +167,6 @@ func main() {
compStore,
ms,
service.NewRetentionHandler(),
binaryFetcher,
artifactPublicServiceClient,
artifactPrivateServiceClient,
)
Expand Down Expand Up @@ -368,8 +378,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (
artifactpb.ArtifactPrivateServiceClient,
*redis.Client,
*gorm.DB,
miniox.Client,
*miniox.FileGetter,
acl.ACLClient,
temporalclient.Client,
func(),
Expand Down Expand Up @@ -437,7 +445,7 @@ func newClients(ctx context.Context, logger *zap.Logger) (
}

// Initialize Temporal client
temporalClientOptions, err := temporal.ClientOptions(config.Config.Temporal, logger)
temporalClientOptions, err := temporalx.ClientOptions(config.Config.Temporal, logger)
if err != nil {
logger.Fatal("Unable to build Temporal client options", zap.Error(err))
}
Expand Down Expand Up @@ -484,27 +492,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (

aclClient := acl.NewACLClient(fgaClient, fgaReplicaClient, redisClient)

// Initialize MinIO client
minIOParams := miniox.ClientParams{
Config: config.Config.Minio,
Logger: logger,
AppInfo: miniox.AppInfo{
Name: serviceName,
Version: serviceVersion,
},
}
minIOFileGetter, err := miniox.NewFileGetter(minIOParams)
if err != nil {
logger.Fatal("Failed to create MinIO file getter", zap.Error(err))
}

retentionHandler := service.NewRetentionHandler()
minIOParams.ExpiryRules = retentionHandler.ListExpiryRules()
minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams)
if err != nil {
logger.Fatal("failed to create MinIO client", zap.Error(err))
}

closer := func() {
for conn, fn := range closeFuncs {
if err := fn(); err != nil {
Expand All @@ -515,5 +502,5 @@ func newClients(ctx context.Context, logger *zap.Logger) (

return pipelinePublicServiceClient, mgmtPublicServiceClient, mgmtPrivateServiceClient,
artifactPublicServiceClient, artifactPrivateServiceClient, redisClient, db,
minIOClient, minIOFileGetter, aclClient, temporalClient, closer
aclClient, temporalClient, closer
}
52 changes: 19 additions & 33 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@ import (
temporalclient "go.temporal.io/sdk/client"

"github.com/instill-ai/pipeline-backend/config"
"github.com/instill-ai/pipeline-backend/pkg/external"
"github.com/instill-ai/pipeline-backend/pkg/memory"
"github.com/instill-ai/pipeline-backend/pkg/pubsub"
"github.com/instill-ai/pipeline-backend/pkg/repository"
"github.com/instill-ai/pipeline-backend/pkg/service"
"github.com/instill-ai/x/client"
"github.com/instill-ai/x/minio"
"github.com/instill-ai/x/temporal"

componentstore "github.com/instill-ai/pipeline-backend/pkg/component/store"
database "github.com/instill-ai/pipeline-backend/pkg/db"
Expand All @@ -37,7 +34,9 @@ import (
pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta"
clientgrpcx "github.com/instill-ai/x/client/grpc"
logx "github.com/instill-ai/x/log"
miniox "github.com/instill-ai/x/minio"
otelx "github.com/instill-ai/x/otel"
temporalx "github.com/instill-ai/x/temporal"
)

const gracefulShutdownWaitPeriod = 15 * time.Second
Expand Down Expand Up @@ -83,16 +82,27 @@ func main() {

// Initialize all clients
pipelinePublicServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient,
redisClient, db, minIOClient, minIOFileGetter, temporalClient, timeseries, closeClients := newClients(ctx, logger)
redisClient, db, temporalClient, timeseries, closeClients := newClients(ctx, logger)
defer closeClients()

// Keep NewArtifactBinaryFetcher as requested
binaryFetcher := external.NewArtifactBinaryFetcher(artifactPrivateServiceClient, minIOFileGetter)
minIOParams := miniox.ClientParams{
Config: config.Config.Minio,
Logger: logger,
ExpiryRules: service.NewRetentionHandler().ListExpiryRules(),
AppInfo: miniox.AppInfo{
Name: serviceName,
Version: serviceVersion,
},
}

minIOClient, err := miniox.NewMinIOClientAndInitBucket(ctx, minIOParams)
if err != nil {
logger.Fatal("failed to create MinIO client", zap.Error(err))
}

compStore := componentstore.Init(componentstore.InitParams{
Logger: logger,
Secrets: config.Config.Component.Secrets,
BinaryFetcher: binaryFetcher,
TemporalClient: temporalClient,
})

Expand All @@ -111,7 +121,6 @@ func main() {
MemoryStore: ms,
ArtifactPublicServiceClient: artifactPublicServiceClient,
ArtifactPrivateServiceClient: artifactPrivateServiceClient,
BinaryFetcher: binaryFetcher,
PipelinePublicServiceClient: pipelinePublicServiceClient,
},
)
Expand Down Expand Up @@ -195,8 +204,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (
artifactpb.ArtifactPrivateServiceClient,
*redis.Client,
*gorm.DB,
minio.Client,
*minio.FileGetter,
temporalclient.Client,
*repository.InfluxDB,
func(),
Expand Down Expand Up @@ -257,7 +264,7 @@ func newClients(ctx context.Context, logger *zap.Logger) (
}

// Initialize Temporal client
temporalClientOptions, err := temporal.ClientOptions(config.Config.Temporal, logger)
temporalClientOptions, err := temporalx.ClientOptions(config.Config.Temporal, logger)
if err != nil {
logger.Fatal("Unable to build Temporal client options", zap.Error(err))
}
Expand All @@ -283,27 +290,6 @@ func newClients(ctx context.Context, logger *zap.Logger) (
return nil
}

// Initialize MinIO client
minIOParams := minio.ClientParams{
Config: config.Config.Minio,
Logger: logger,
AppInfo: minio.AppInfo{
Name: serviceName,
Version: serviceVersion,
},
}
minIOFileGetter, err := minio.NewFileGetter(minIOParams)
if err != nil {
logger.Fatal("Failed to create MinIO file getter", zap.Error(err))
}

retentionHandler := service.NewRetentionHandler()
minIOParams.ExpiryRules = retentionHandler.ListExpiryRules()
minIOClient, err := minio.NewMinIOClientAndInitBucket(ctx, minIOParams)
if err != nil {
logger.Fatal("failed to create MinIO client", zap.Error(err))
}

closer := func() {
for conn, fn := range closeFuncs {
if err := fn(); err != nil {
Expand All @@ -313,5 +299,5 @@ func newClients(ctx context.Context, logger *zap.Logger) (
}

return pipelinePublicServiceClient, artifactPublicServiceClient, artifactPrivateServiceClient,
redisClient, db, minIOClient, minIOFileGetter, temporalClient, timeseries, closer
redisClient, db, temporalClient, timeseries, closer
}
34 changes: 17 additions & 17 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"github.com/knadh/koanf/providers/file"
"github.com/redis/go-redis/v9"

"github.com/instill-ai/x/client"
"github.com/instill-ai/x/minio"
"github.com/instill-ai/x/temporal"
clientx "github.com/instill-ai/x/client"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I wouldn't alias these unless there's a conflict with other package name (e.g. when we use the Temporal SDK and x/temporal, or when partially migrating a package like resource and we might need to use both the local and the x resource packages).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The references of x packages across all backend repos can indeed often encounter naming conflict, especially for client, temporal, etc. I found it can improve readability by appending x as suffix to indicate the use of the x package no matter how.

miniox "github.com/instill-ai/x/minio"
temporalx "github.com/instill-ai/x/temporal"
)

const (
Expand All @@ -31,20 +31,20 @@ var Config AppConfig

// AppConfig defines
type AppConfig struct {
Server ServerConfig `koanf:"server"`
Component ComponentConfig `koanf:"component"`
Database DatabaseConfig `koanf:"database"`
InfluxDB InfluxDBConfig `koanf:"influxdb"`
Temporal temporal.ClientConfig `koanf:"temporal"`
Cache CacheConfig `koanf:"cache"`
OTELCollector OTELCollectorConfig `koanf:"otelcollector"`
MgmtBackend client.ServiceConfig `koanf:"mgmtbackend"`
ModelBackend client.ServiceConfig `koanf:"modelbackend"`
OpenFGA OpenFGAConfig `koanf:"openfga"`
ArtifactBackend client.ServiceConfig `koanf:"artifactbackend"`
Minio minio.Config `koanf:"minio"`
AgentBackend client.ServiceConfig `koanf:"agentbackend"`
APIGateway APIGatewayConfig `koanf:"apigateway"`
Server ServerConfig `koanf:"server"`
Component ComponentConfig `koanf:"component"`
Database DatabaseConfig `koanf:"database"`
InfluxDB InfluxDBConfig `koanf:"influxdb"`
Temporal temporalx.ClientConfig `koanf:"temporal"`
Cache CacheConfig `koanf:"cache"`
OTELCollector OTELCollectorConfig `koanf:"otelcollector"`
MgmtBackend clientx.ServiceConfig `koanf:"mgmtbackend"`
ModelBackend clientx.ServiceConfig `koanf:"modelbackend"`
OpenFGA OpenFGAConfig `koanf:"openfga"`
ArtifactBackend clientx.ServiceConfig `koanf:"artifactbackend"`
Minio miniox.Config `koanf:"minio"`
AgentBackend clientx.ServiceConfig `koanf:"agentbackend"`
APIGateway APIGatewayConfig `koanf:"apigateway"`
}

// APIGatewayConfig related to API gateway
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ minio:
port: 9000
user: minioadmin
password: minioadmin
bucketname: instill-ai-vdp
bucketname: core-pipeline
secure: false
agentbackend:
host: agent-backend
Expand Down
5 changes: 3 additions & 2 deletions pkg/component/base/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
temporalclient "go.temporal.io/sdk/client"

"github.com/instill-ai/pipeline-backend/pkg/component/internal/jsonref"
"github.com/instill-ai/pipeline-backend/pkg/data/binary"
"github.com/instill-ai/pipeline-backend/pkg/data/format"
"github.com/instill-ai/pipeline-backend/pkg/external"

pipelinepb "github.com/instill-ai/protogen-go/pipeline/pipeline/v1beta"
)
Expand Down Expand Up @@ -161,14 +161,15 @@ type IdentifierResult struct {
}

// Component implements the common component methods.

type Component struct {
Logger *zap.Logger
NewUsageHandler UsageHandlerCreator

definition *pipelinepb.ComponentDefinition
secretFields []string

BinaryFetcher external.BinaryFetcher
BinaryFetcher binary.Fetcher
TemporalClient temporalclient.Client
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/instill-ai/pipeline-backend/pkg/component/base"
"github.com/instill-ai/pipeline-backend/pkg/component/internal/mock"
"github.com/instill-ai/pipeline-backend/pkg/data"
"github.com/instill-ai/pipeline-backend/pkg/external"
"github.com/instill-ai/pipeline-backend/pkg/data/binary"
)

func TestDetectActivity(t *testing.T) {
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestDetectActivity(t *testing.T) {
jsonValue, err := data.NewJSONValue(segmentsMap)
c.Assert(err, qt.IsNil)

binaryFetcher := external.NewBinaryFetcher()
binaryFetcher := binary.NewFetcher()
unmarshaler := data.NewUnmarshaler(binaryFetcher)
c.Assert(unmarshaler.Unmarshal(context.Background(), jsonValue, &expectedSegmentsStruct), qt.IsNil)
expectedSegments := expectedSegmentsStruct.Segments
Expand Down
4 changes: 2 additions & 2 deletions pkg/component/operator/audio/v0/task_segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/instill-ai/pipeline-backend/pkg/component/base"
"github.com/instill-ai/pipeline-backend/pkg/component/internal/mock"
"github.com/instill-ai/pipeline-backend/pkg/data"
"github.com/instill-ai/pipeline-backend/pkg/external"
"github.com/instill-ai/pipeline-backend/pkg/data/binary"
)

func TestSegment(t *testing.T) {
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestSegment(t *testing.T) {
jsonValue, err := data.NewJSONValue(segmentsMap)
c.Assert(err, qt.IsNil)

binaryFetcher := external.NewBinaryFetcher()
binaryFetcher := binary.NewFetcher()
unmarshaler := data.NewUnmarshaler(binaryFetcher)
c.Assert(unmarshaler.Unmarshal(context.Background(), jsonValue, &segmentsStruct), qt.IsNil)
segments := segmentsStruct.Segments
Expand Down
Loading
Loading