Skip to content
18 changes: 15 additions & 3 deletions api/v1/search/mongodbsearch_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import (
)

const (
MongotDefaultPort = 27027
MongotDefaultWireprotoPort = 27027
MongotDefaultGrpcPort = 27028
MongotDefaultMetricsPort = 9946
MongotDefautHealthCheckPort = 8080
MongotDefaultSyncSourceUsername = "search-sync-source"

ForceWireprotoTransportAnnotation = "mongodb.com/v1.force-wireproto-transport"
)

func init() {
Expand Down Expand Up @@ -207,8 +210,12 @@ func (s *MongoDBSearch) GetMongoDBResourceRef() *userv1.MongoDBResourceRef {
return &mdbResourceRef
}

func (s *MongoDBSearch) GetMongotPort() int32 {
return MongotDefaultPort
func (s *MongoDBSearch) GetMongotWireprotoPort() int32 {
return MongotDefaultWireprotoPort
}

func (s *MongoDBSearch) GetMongotGrpcPort() int32 {
return MongotDefaultGrpcPort
}

func (s *MongoDBSearch) GetMongotMetricsPort() int32 {
Expand Down Expand Up @@ -241,3 +248,8 @@ func (s *MongoDBSearch) GetLogLevel() mdb.LogLevel {

return s.Spec.LogLevel
}

func (s *MongoDBSearch) IsWireprotoForced() bool {
val, ok := s.Annotations[ForceWireprotoTransportAnnotation]
return ok && val == "true"
}
8 changes: 6 additions & 2 deletions api/v1/search/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion controllers/om/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,7 @@ func (d Deployment) GetAllProcessNames() (names []string) {
for _, p := range d.getProcesses() {
names = append(names, p.Name())
}
return
return names
}

func (d Deployment) getProcesses() []Process {
Expand Down
6 changes: 5 additions & 1 deletion controllers/operator/mongodbsearch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func (r *MongoDBSearchReconciler) Reconcile(ctx context.Context, request reconci
return reconcile.Result{RequeueAfter: time.Second * util.RetryTimeSec}, err
}

r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
if mdbSearch.IsWireprotoForced() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of all these if statements, would it make sense to abstract the wireproto stuff away in a struct and then it's also easier to just remove it.

log.Info("Enabling the mongot wireproto server as required by annotation")
// the keyfile secret is necessary for wireproto authentication
r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName())
}

// Watch for changes in database source CA certificate secrets or configmaps
tlsSourceConfig := searchSource.TLSConfig()
Expand Down
10 changes: 3 additions & 7 deletions controllers/operator/mongodbsearch_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,9 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong
DataPath: searchcontroller.MongotDataPath,
},
Server: mongot.ConfigServer{
Wireproto: &mongot.ConfigWireproto{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we still keep a wireproto test as long as it is possible to enabled it?

Address: "0.0.0.0:27027",
Authentication: &mongot.ConfigAuthentication{
Mode: "keyfile",
KeyFile: searchcontroller.TempKeyfilePath,
},
TLS: mongot.ConfigTLS{Mode: mongot.ConfigTLSModeDisabled},
Grpc: &mongot.ConfigGrpc{
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()),
TLS: &mongot.ConfigGrpcTLS{Mode: mongot.ConfigTLSModeDisabled},
},
},
Metrics: mongot.ConfigMetrics{
Expand Down
121 changes: 70 additions & 51 deletions controllers/searchcontroller/mongodbsearch_reconcile_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,15 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S
return workflow.Failed(err)
}

keyfileStsModification, err := r.ensureSourceKeyfile(ctx, log)
if apierrors.IsNotFound(err) {
return workflow.Pending("Waiting for keyfile secret to be created")
} else if err != nil {
return workflow.Failed(err)
keyfileStsModification := statefulset.NOOP()
if r.mdbSearch.IsWireprotoForced() {
var err error
keyfileStsModification, err = r.ensureSourceKeyfile(ctx, log)
if apierrors.IsNotFound(err) {
return workflow.Pending("Waiting for keyfile secret to be created")
} else if err != nil {
return workflow.Failed(err)
}
}

if err := r.ensureSearchService(ctx, r.mdbSearch); err != nil {
Expand All @@ -113,11 +117,9 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S
return workflow.Failed(err)
}

egressTlsMongotModification, egressTlsStsModification, err := r.ensureEgressTlsConfig(ctx)
if err != nil {
return workflow.Failed(err)
}
egressTlsMongotModification, egressTlsStsModification := r.ensureEgressTlsConfig(ctx)

// the egress TLS modification needs to always be applied after the ingress one, because it toggles mTLS based on the mode set by the ingress modification
configHash, err := r.ensureMongotConfig(ctx, log, createMongotConfig(r.mdbSearch, r.db), ingressTlsMongotModification, egressTlsMongotModification)
if err != nil {
return workflow.Failed(err)
Expand All @@ -140,6 +142,7 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S
return workflow.OK()
}

// This is called only if the wireproto server is enabled, to set up they keyfile necessary for authentication.
func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context, log *zap.SugaredLogger) (statefulset.Modification, error) {
keyfileSecretName := kube.ObjectKey(r.mdbSearch.GetNamespace(), r.db.KeyfileSecretName())
keyfileSecret := &corev1.Secret{}
Expand All @@ -154,6 +157,7 @@ func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context,
"keyfileHash": hashBytes(keyfileSecret.Data[MongotKeyfileFilename]),
},
)),
CreateKeyfileModificationFunc(r.db.KeyfileSecretName()),
), nil
}

Expand Down Expand Up @@ -229,10 +233,7 @@ func (r *MongoDBSearchReconcileHelper) ensureMongotConfig(ctx context.Context, l

func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) {
if r.mdbSearch.Spec.Security.TLS == nil {
mongotModification := func(config *mongot.Config) {
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeDisabled
}
return mongotModification, statefulset.NOOP(), nil
return mongot.NOOP(), statefulset.NOOP(), nil
}

// TODO: validate that the certificate in the user-provided Secret in .spec.security.tls.certificateKeySecret is issued by the CA in the operator's CA Secret
Expand All @@ -244,8 +245,12 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex

mongotModification := func(config *mongot.Config) {
certPath := tls.OperatorSecretMountPath + certFileName
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS
config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath)
config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeTLS
config.Server.Grpc.TLS.CertificateKeyFile = ptr.To(certPath)
if config.Server.Wireproto != nil {
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS
config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath)
}
}

tlsSecret := r.mdbSearch.TLSOperatorSecretNamespacedName()
Expand All @@ -261,46 +266,34 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex
return mongotModification, statefulsetModification, nil
}

func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) {
func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification) {
tlsSourceConfig := r.db.TLSConfig()
if tlsSourceConfig == nil {
return mongot.NOOP(), statefulset.NOOP(), nil
return mongot.NOOP(), statefulset.NOOP()
}

mongotModification := func(config *mongot.Config) {
config.SyncSource.ReplicaSet.TLS = ptr.To(true)
config.SyncSource.CertificateAuthorityFile = ptr.To(tls.CAMountPath + "/" + tlsSourceConfig.CAFileName)

// if the gRPC server is configured to accept TLS connections then toggle mTLS as well
if config.Server.Grpc.TLS.Mode == mongot.ConfigTLSModeTLS {
config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeMTLS
config.Server.Grpc.TLS.CertificateAuthorityFile = config.SyncSource.CertificateAuthorityFile
}
}

_, containerSecurityContext := podtemplatespec.WithDefaultSecurityContextsModifications()
caVolume := tlsSourceConfig.CAVolume
trustStoreVolume := statefulset.CreateVolumeFromEmptyDir("cacerts")
statefulsetModification := statefulset.WithPodSpecTemplate(podtemplatespec.Apply(
podtemplatespec.WithVolume(caVolume),
podtemplatespec.WithVolume(trustStoreVolume),
podtemplatespec.WithInitContainer("init-cacerts", container.Apply(
container.WithImage(r.buildImageString()),
containerSecurityContext,
container.WithVolumeMounts([]corev1.VolumeMount{
statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)),
statefulset.CreateVolumeMount(trustStoreVolume.Name, "/java/trust-store", statefulset.WithReadOnly(false)),
}),
container.WithCommand([]string{"sh"}),
container.WithArgs([]string{
"-c",
fmt.Sprintf(`
cp /mongot-community/bin/jdk/lib/security/cacerts /java/trust-store/cacerts
/mongot-community/bin/jdk/bin/keytool -keystore /java/trust-store/cacerts -storepass changeit -noprompt -trustcacerts -importcert -alias mongodcert -file %s/%s
`, tls.CAMountPath, tlsSourceConfig.CAFileName),
}),
)),
podtemplatespec.WithContainer(MongotContainerName, container.Apply(
container.WithVolumeMounts([]corev1.VolumeMount{
statefulset.CreateVolumeMount(trustStoreVolume.Name, "/mongot-community/bin/jdk/lib/security/cacerts", statefulset.WithReadOnly(true), statefulset.WithSubPath("cacerts")),
statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)),
}),
)),
))

return mongotModification, statefulsetModification, nil
return mongotModification, statefulsetModification
}

func hashBytes(bytes []byte) string {
Expand All @@ -325,10 +318,17 @@ func buildSearchHeadlessService(search *searchv1.MongoDBSearch) corev1.Service {
SetOwnerReferences(search.GetOwnerReferences())

serviceBuilder.AddPort(&corev1.ServicePort{
Name: "mongot",
Name: "mongot-wireproto",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add service port only if we force wireproto?

Protocol: corev1.ProtocolTCP,
Port: search.GetMongotWireprotoPort(),
TargetPort: intstr.FromInt32(search.GetMongotWireprotoPort()),
})

serviceBuilder.AddPort(&corev1.ServicePort{
Name: "mongot-grpc",
Protocol: corev1.ProtocolTCP,
Port: search.GetMongotPort(),
TargetPort: intstr.FromInt32(search.GetMongotPort()),
Port: search.GetMongotGrpcPort(),
TargetPort: intstr.FromInt32(search.GetMongotGrpcPort()),
})

serviceBuilder.AddPort(&corev1.ServicePort{
Expand Down Expand Up @@ -366,13 +366,24 @@ func createMongotConfig(search *searchv1.MongoDBSearch, db SearchSourceDBResourc
DataPath: MongotDataPath,
}
config.Server = mongot.ConfigServer{
Wireproto: &mongot.ConfigWireproto{
Address: "0.0.0.0:27027",
Grpc: &mongot.ConfigGrpc{
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()),
TLS: &mongot.ConfigGrpcTLS{
Mode: mongot.ConfigTLSModeDisabled,
},
},
}
if search.IsWireprotoForced() {
config.Server.Wireproto = &mongot.ConfigWireproto{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this is not mutually exclusive with grpc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mongot can run both at the same time. I opted to always enable the gRPC server and only conditionally enable the wireproto server if the annotation is present - the annotation also controls whether the mongot address we generate in mongod's config uses grpc or wireproto. We could only enable one at a time, but to be honest I don't know whether we should: the wireproto endpoint will go away eventually and gRPC will be the only one left.

Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotWireprotoPort()),
Authentication: &mongot.ConfigAuthentication{
Mode: "keyfile",
KeyFile: TempKeyfilePath,
},
},
TLS: &mongot.ConfigWireprotoTLS{
Mode: mongot.ConfigTLSModeDisabled,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why TLS disabled in wireproto?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wireproto and Grpc always start out with TLS disabled in createMongotConfig(). The mongot config modifications in ensureIngressTlsConfig() and ensureEgressTlsConfig() populate the TLS config structures.

},
}
}
config.Metrics = mongot.ConfigMetrics{
Enabled: true,
Expand All @@ -393,19 +404,27 @@ func GetMongodConfigParameters(search *searchv1.MongoDBSearch) map[string]any {
if search.Spec.Security.TLS != nil {
searchTLSMode = automationconfig.TLSModeRequired
}

parameters := map[string]any{
"mongotHost": mongotHostAndPort(search),
"searchIndexManagementHostAndPort": mongotHostAndPort(search),
"skipAuthenticationToSearchIndexManagementServer": false,
"searchTLSMode": string(searchTLSMode),
"useGrpcForSearch": !search.IsWireprotoForced(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the meaning of this, don't we always enable this by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The server doesn't enable useGrpcForSearch by default, no. We enable it only when the force-wireproto annotation is not set, but in essenceuseGrpcForSearch will be turned on by default, unless the annotation requests otherwise.

}

return map[string]any{
"setParameter": map[string]any{
"mongotHost": mongotHostAndPort(search),
"searchIndexManagementHostAndPort": mongotHostAndPort(search),
"skipAuthenticationToSearchIndexManagementServer": false,
"searchTLSMode": string(searchTLSMode),
},
"setParameter": parameters,
}
}

func mongotHostAndPort(search *searchv1.MongoDBSearch) string {
svcName := search.SearchServiceNamespacedName()
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, search.GetMongotPort())
port := search.GetMongotGrpcPort()
if search.IsWireprotoForced() {
port = search.GetMongotWireprotoPort()
}
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, port)
}

func (r *MongoDBSearchReconcileHelper) ValidateSingleMongoDBSearchForSearchSource(ctx context.Context) error {
Expand Down
Loading
Loading