From 3ec055e95bc39272e9828e2ec5d6aac913a84303 Mon Sep 17 00:00:00 2001 From: Yavor Georgiev Date: Mon, 13 Oct 2025 14:23:59 +0200 Subject: [PATCH 1/9] [Search] Implement gRPC and mTLS --- api/v1/search/mongodbsearch_types.go | 18 ++- api/v1/search/zz_generated.deepcopy.go | 8 +- controllers/om/deployment.go | 2 +- .../operator/mongodbsearch_controller.go | 6 +- .../operator/mongodbsearch_controller_test.go | 10 +- .../mongodbsearch_reconcile_helper.go | 121 ++++++++++-------- .../searchcontroller/search_construction.go | 59 ++++++--- ...nity-replicaset-sample-mflix-external.yaml | 45 +------ .../community-replicaset-sample-mflix.yaml | 2 +- .../enterprise-replicaset-sample-mflix.yaml | 2 +- .../search_community_external_mongod_basic.py | 1 + .../search_community_external_mongod_tls.py | 1 + .../tests/search/search_community_tls.py | 28 +--- .../tests/search/search_enterprise_tls.py | 33 +---- .../pkg/mongot/mongot_config.go | 20 ++- pkg/util/stringutil/stringutil.go | 2 +- 16 files changed, 170 insertions(+), 188 deletions(-) diff --git a/api/v1/search/mongodbsearch_types.go b/api/v1/search/mongodbsearch_types.go index ce1ef5176..6a429cd29 100644 --- a/api/v1/search/mongodbsearch_types.go +++ b/api/v1/search/mongodbsearch_types.go @@ -17,10 +17,13 @@ import ( ) const ( - MongotDefaultPort = 27027 + MongotDefaultWireprotoPort = 27027 + MongotDefaultGrpcPort = 27028 MongotDefaultMetricsPort = 9946 MongotDefautHealthCheckPort = 8080 MongotDefaultSyncSourceUsername = "search-sync-source" + + ForceWireprotoTransportAnnotation = "mongodb.com/v1.force-wireproto-transport" ) func init() { @@ -207,8 +210,12 @@ func (s *MongoDBSearch) GetMongoDBResourceRef() *userv1.MongoDBResourceRef { return &mdbResourceRef } -func (s *MongoDBSearch) GetMongotPort() int32 { - return MongotDefaultPort +func (s *MongoDBSearch) GetMongotWireprotoPort() int32 { + return MongotDefaultWireprotoPort +} + +func (s *MongoDBSearch) GetMongotGrpcPort() int32 { + return MongotDefaultGrpcPort } func (s *MongoDBSearch) GetMongotMetricsPort() int32 { @@ -241,3 +248,8 @@ func (s *MongoDBSearch) GetLogLevel() mdb.LogLevel { return s.Spec.LogLevel } + +func (s *MongoDBSearch) IsWireprotoForced() bool { + val, ok := s.Annotations[ForceWireprotoTransportAnnotation] + return ok && val == "true" +} diff --git a/api/v1/search/zz_generated.deepcopy.go b/api/v1/search/zz_generated.deepcopy.go index c66322146..d18d025f8 100644 --- a/api/v1/search/zz_generated.deepcopy.go +++ b/api/v1/search/zz_generated.deepcopy.go @@ -159,7 +159,7 @@ func (in *MongoDBSearchSpec) DeepCopyInto(out *MongoDBSearchSpec) { *out = new(v1.ResourceRequirements) (*in).DeepCopyInto(*out) } - out.Security = in.Security + in.Security.DeepCopyInto(&out.Security) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MongoDBSearchSpec. @@ -231,7 +231,11 @@ func (in *MongoDBSource) DeepCopy() *MongoDBSource { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Security) DeepCopyInto(out *Security) { *out = *in - out.TLS = in.TLS + if in.TLS != nil { + in, out := &in.TLS, &out.TLS + *out = new(TLS) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Security. diff --git a/controllers/om/deployment.go b/controllers/om/deployment.go index fd7f7f776..f32946b58 100644 --- a/controllers/om/deployment.go +++ b/controllers/om/deployment.go @@ -874,7 +874,7 @@ func (d Deployment) GetAllProcessNames() (names []string) { for _, p := range d.getProcesses() { names = append(names, p.Name()) } - return + return names } func (d Deployment) getProcesses() []Process { diff --git a/controllers/operator/mongodbsearch_controller.go b/controllers/operator/mongodbsearch_controller.go index 16db88027..a4fc8c63e 100644 --- a/controllers/operator/mongodbsearch_controller.go +++ b/controllers/operator/mongodbsearch_controller.go @@ -57,7 +57,11 @@ func (r *MongoDBSearchReconciler) Reconcile(ctx context.Context, request reconci return reconcile.Result{RequeueAfter: time.Second * util.RetryTimeSec}, err } - r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName()) + if mdbSearch.IsWireprotoForced() { + log.Info("Enabling the mongot wireproto server as required by annotation") + // the keyfile secret is necessary for wireproto authentication + r.watch.AddWatchedResourceIfNotAdded(searchSource.KeyfileSecretName(), mdbSearch.Namespace, watch.Secret, mdbSearch.NamespacedName()) + } // Watch for changes in database source CA certificate secrets or configmaps tlsSourceConfig := searchSource.TLSConfig() diff --git a/controllers/operator/mongodbsearch_controller_test.go b/controllers/operator/mongodbsearch_controller_test.go index 472ea419a..49d99888d 100644 --- a/controllers/operator/mongodbsearch_controller_test.go +++ b/controllers/operator/mongodbsearch_controller_test.go @@ -115,13 +115,9 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong DataPath: searchcontroller.MongotDataPath, }, Server: mongot.ConfigServer{ - Wireproto: &mongot.ConfigWireproto{ - Address: "0.0.0.0:27027", - Authentication: &mongot.ConfigAuthentication{ - Mode: "keyfile", - KeyFile: searchcontroller.TempKeyfilePath, - }, - TLS: mongot.ConfigTLS{Mode: mongot.ConfigTLSModeDisabled}, + Grpc: &mongot.ConfigGrpc{ + Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()), + TLS: &mongot.ConfigGrpcTLS{Mode: mongot.ConfigTLSModeDisabled}, }, }, Metrics: mongot.ConfigMetrics{ diff --git a/controllers/searchcontroller/mongodbsearch_reconcile_helper.go b/controllers/searchcontroller/mongodbsearch_reconcile_helper.go index a763be9a6..a8d50df5b 100644 --- a/controllers/searchcontroller/mongodbsearch_reconcile_helper.go +++ b/controllers/searchcontroller/mongodbsearch_reconcile_helper.go @@ -97,11 +97,15 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S return workflow.Failed(err) } - keyfileStsModification, err := r.ensureSourceKeyfile(ctx, log) - if apierrors.IsNotFound(err) { - return workflow.Pending("Waiting for keyfile secret to be created") - } else if err != nil { - return workflow.Failed(err) + keyfileStsModification := statefulset.NOOP() + if r.mdbSearch.IsWireprotoForced() { + var err error + keyfileStsModification, err = r.ensureSourceKeyfile(ctx, log) + if apierrors.IsNotFound(err) { + return workflow.Pending("Waiting for keyfile secret to be created") + } else if err != nil { + return workflow.Failed(err) + } } if err := r.ensureSearchService(ctx, r.mdbSearch); err != nil { @@ -113,11 +117,9 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S return workflow.Failed(err) } - egressTlsMongotModification, egressTlsStsModification, err := r.ensureEgressTlsConfig(ctx) - if err != nil { - return workflow.Failed(err) - } + egressTlsMongotModification, egressTlsStsModification := r.ensureEgressTlsConfig(ctx) + // the egress TLS modification needs to always be applied after the ingress one, because it toggles mTLS based on the mode set by the ingress modification configHash, err := r.ensureMongotConfig(ctx, log, createMongotConfig(r.mdbSearch, r.db), ingressTlsMongotModification, egressTlsMongotModification) if err != nil { return workflow.Failed(err) @@ -140,6 +142,7 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S return workflow.OK() } +// This is called only if the wireproto server is enabled, to set up they keyfile necessary for authentication. func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context, log *zap.SugaredLogger) (statefulset.Modification, error) { keyfileSecretName := kube.ObjectKey(r.mdbSearch.GetNamespace(), r.db.KeyfileSecretName()) keyfileSecret := &corev1.Secret{} @@ -154,6 +157,7 @@ func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context, "keyfileHash": hashBytes(keyfileSecret.Data[MongotKeyfileFilename]), }, )), + CreateKeyfileModificationFunc(r.db.KeyfileSecretName()), ), nil } @@ -229,10 +233,7 @@ func (r *MongoDBSearchReconcileHelper) ensureMongotConfig(ctx context.Context, l func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) { if r.mdbSearch.Spec.Security.TLS == nil { - mongotModification := func(config *mongot.Config) { - config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeDisabled - } - return mongotModification, statefulset.NOOP(), nil + return mongot.NOOP(), statefulset.NOOP(), nil } // TODO: validate that the certificate in the user-provided Secret in .spec.security.tls.certificateKeySecret is issued by the CA in the operator's CA Secret @@ -244,8 +245,12 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex mongotModification := func(config *mongot.Config) { certPath := tls.OperatorSecretMountPath + certFileName - config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS - config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath) + config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeTLS + config.Server.Grpc.TLS.CertificateKeyFile = ptr.To(certPath) + if config.Server.Wireproto != nil { + config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS + config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath) + } } tlsSecret := r.mdbSearch.TLSOperatorSecretNamespacedName() @@ -261,46 +266,34 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex return mongotModification, statefulsetModification, nil } -func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) { +func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification) { tlsSourceConfig := r.db.TLSConfig() if tlsSourceConfig == nil { - return mongot.NOOP(), statefulset.NOOP(), nil + return mongot.NOOP(), statefulset.NOOP() } mongotModification := func(config *mongot.Config) { config.SyncSource.ReplicaSet.TLS = ptr.To(true) + config.SyncSource.CertificateAuthorityFile = ptr.To(tls.CAMountPath + "/" + tlsSourceConfig.CAFileName) + + // if the gRPC server is configured to accept TLS connections then toggle mTLS as well + if config.Server.Grpc.TLS.Mode == mongot.ConfigTLSModeTLS { + config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeMTLS + config.Server.Grpc.TLS.CertificateAuthorityFile = config.SyncSource.CertificateAuthorityFile + } } - _, containerSecurityContext := podtemplatespec.WithDefaultSecurityContextsModifications() caVolume := tlsSourceConfig.CAVolume - trustStoreVolume := statefulset.CreateVolumeFromEmptyDir("cacerts") statefulsetModification := statefulset.WithPodSpecTemplate(podtemplatespec.Apply( podtemplatespec.WithVolume(caVolume), - podtemplatespec.WithVolume(trustStoreVolume), - podtemplatespec.WithInitContainer("init-cacerts", container.Apply( - container.WithImage(r.buildImageString()), - containerSecurityContext, - container.WithVolumeMounts([]corev1.VolumeMount{ - statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)), - statefulset.CreateVolumeMount(trustStoreVolume.Name, "/java/trust-store", statefulset.WithReadOnly(false)), - }), - container.WithCommand([]string{"sh"}), - container.WithArgs([]string{ - "-c", - fmt.Sprintf(` -cp /mongot-community/bin/jdk/lib/security/cacerts /java/trust-store/cacerts -/mongot-community/bin/jdk/bin/keytool -keystore /java/trust-store/cacerts -storepass changeit -noprompt -trustcacerts -importcert -alias mongodcert -file %s/%s - `, tls.CAMountPath, tlsSourceConfig.CAFileName), - }), - )), podtemplatespec.WithContainer(MongotContainerName, container.Apply( container.WithVolumeMounts([]corev1.VolumeMount{ - statefulset.CreateVolumeMount(trustStoreVolume.Name, "/mongot-community/bin/jdk/lib/security/cacerts", statefulset.WithReadOnly(true), statefulset.WithSubPath("cacerts")), + statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)), }), )), )) - return mongotModification, statefulsetModification, nil + return mongotModification, statefulsetModification } func hashBytes(bytes []byte) string { @@ -325,10 +318,17 @@ func buildSearchHeadlessService(search *searchv1.MongoDBSearch) corev1.Service { SetOwnerReferences(search.GetOwnerReferences()) serviceBuilder.AddPort(&corev1.ServicePort{ - Name: "mongot", + Name: "mongot-wireproto", + Protocol: corev1.ProtocolTCP, + Port: search.GetMongotWireprotoPort(), + TargetPort: intstr.FromInt32(search.GetMongotWireprotoPort()), + }) + + serviceBuilder.AddPort(&corev1.ServicePort{ + Name: "mongot-grpc", Protocol: corev1.ProtocolTCP, - Port: search.GetMongotPort(), - TargetPort: intstr.FromInt32(search.GetMongotPort()), + Port: search.GetMongotGrpcPort(), + TargetPort: intstr.FromInt32(search.GetMongotGrpcPort()), }) serviceBuilder.AddPort(&corev1.ServicePort{ @@ -366,13 +366,24 @@ func createMongotConfig(search *searchv1.MongoDBSearch, db SearchSourceDBResourc DataPath: MongotDataPath, } config.Server = mongot.ConfigServer{ - Wireproto: &mongot.ConfigWireproto{ - Address: "0.0.0.0:27027", + Grpc: &mongot.ConfigGrpc{ + Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()), + TLS: &mongot.ConfigGrpcTLS{ + Mode: mongot.ConfigTLSModeDisabled, + }, + }, + } + if search.IsWireprotoForced() { + config.Server.Wireproto = &mongot.ConfigWireproto{ + Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotWireprotoPort()), Authentication: &mongot.ConfigAuthentication{ Mode: "keyfile", KeyFile: TempKeyfilePath, }, - }, + TLS: &mongot.ConfigWireprotoTLS{ + Mode: mongot.ConfigTLSModeDisabled, + }, + } } config.Metrics = mongot.ConfigMetrics{ Enabled: true, @@ -393,19 +404,27 @@ func GetMongodConfigParameters(search *searchv1.MongoDBSearch) map[string]any { if search.Spec.Security.TLS != nil { searchTLSMode = automationconfig.TLSModeRequired } + + parameters := map[string]any{ + "mongotHost": mongotHostAndPort(search), + "searchIndexManagementHostAndPort": mongotHostAndPort(search), + "skipAuthenticationToSearchIndexManagementServer": false, + "searchTLSMode": string(searchTLSMode), + "useGrpcForSearch": !search.IsWireprotoForced(), + } + return map[string]any{ - "setParameter": map[string]any{ - "mongotHost": mongotHostAndPort(search), - "searchIndexManagementHostAndPort": mongotHostAndPort(search), - "skipAuthenticationToSearchIndexManagementServer": false, - "searchTLSMode": string(searchTLSMode), - }, + "setParameter": parameters, } } func mongotHostAndPort(search *searchv1.MongoDBSearch) string { svcName := search.SearchServiceNamespacedName() - return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, search.GetMongotPort()) + port := search.GetMongotGrpcPort() + if search.IsWireprotoForced() { + port = search.GetMongotWireprotoPort() + } + return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, port) } func (r *MongoDBSearchReconcileHelper) ValidateSingleMongoDBSearchForSearchSource(ctx context.Context) error { diff --git a/controllers/searchcontroller/search_construction.go b/controllers/searchcontroller/search_construction.go index 4c9181e44..c1884c040 100644 --- a/controllers/searchcontroller/search_construction.go +++ b/controllers/searchcontroller/search_construction.go @@ -62,15 +62,11 @@ func CreateSearchStatefulSetFunc(mdbSearch *searchv1.MongoDBSearch, sourceDBReso tmpVolumeMount := statefulset.CreateVolumeMount(tmpVolume.Name, tempVolumePath, statefulset.WithReadOnly(false)) dataVolumeName := "data" - keyfileVolumeName := "keyfile" sourceUserPasswordVolumeName := "password" mongotConfigVolumeName := "config" pvcVolumeMount := statefulset.CreateVolumeMount(dataVolumeName, MongotDataPath, statefulset.WithSubPath("data")) - keyfileVolume := statefulset.CreateVolumeFromSecret(keyfileVolumeName, sourceDBResource.KeyfileSecretName()) - keyfileVolumeMount := statefulset.CreateVolumeMount(keyfileVolumeName, MongotKeyfilePath, statefulset.WithReadOnly(true), statefulset.WithSubPath(MongotKeyfileFilename)) - sourceUserPasswordSecretKey := mdbSearch.SourceUserPasswordSecretRef() sourceUserPasswordVolume := statefulset.CreateVolumeFromSecret(sourceUserPasswordVolumeName, sourceUserPasswordSecretKey.Name) sourceUserPasswordVolumeMount := statefulset.CreateVolumeMount(sourceUserPasswordVolumeName, MongotSourceUserPasswordPath, statefulset.WithReadOnly(true), statefulset.WithSubPath(sourceUserPasswordSecretKey.Key)) @@ -90,7 +86,6 @@ func CreateSearchStatefulSetFunc(mdbSearch *searchv1.MongoDBSearch, sourceDBReso volumeMounts := []corev1.VolumeMount{ pvcVolumeMount, - keyfileVolumeMount, tmpVolumeMount, mongotConfigVolumeMount, sourceUserPasswordVolumeMount, @@ -98,7 +93,6 @@ func CreateSearchStatefulSetFunc(mdbSearch *searchv1.MongoDBSearch, sourceDBReso volumes := []corev1.Volume{ tmpVolume, - keyfileVolume, mongotConfigVolume, sourceUserPasswordVolume, } @@ -135,6 +129,26 @@ func CreateSearchStatefulSetFunc(mdbSearch *searchv1.MongoDBSearch, sourceDBReso return statefulset.Apply(stsModifications...) } +func CreateKeyfileModificationFunc(keyfileSecretName string) statefulset.Modification { + keyfileVolumeName := "keyfile" + keyfileVolume := statefulset.CreateVolumeFromSecret(keyfileVolumeName, keyfileSecretName) + keyfileVolumeMount := statefulset.CreateVolumeMount(keyfileVolumeName, MongotKeyfilePath, statefulset.WithReadOnly(true), statefulset.WithSubPath(MongotKeyfileFilename)) + + return statefulset.Apply( + statefulset.WithPodSpecTemplate( + podtemplatespec.Apply( + podtemplatespec.WithVolumes([]corev1.Volume{keyfileVolume}), + podtemplatespec.WithContainer(MongotContainerName, + container.Apply( + container.WithVolumeMounts([]corev1.VolumeMount{keyfileVolumeMount}), + prependCommand(sensitiveFilePermissionsWorkaround(MongotKeyfilePath, TempKeyfilePath)), + ), + ), + ), + ), + ) +} + func mongodbSearchContainer(mdbSearch *searchv1.MongoDBSearch, volumeMounts []corev1.VolumeMount, searchImage string) container.Modification { _, containerSecurityContext := podtemplatespec.WithDefaultSecurityContextsModifications() return container.Apply( @@ -148,18 +162,9 @@ func mongodbSearchContainer(mdbSearch *searchv1.MongoDBSearch, volumeMounts []co container.WithCommand([]string{"sh"}), container.WithArgs([]string{ "-c", - fmt.Sprintf(` -cp %[1]s %[2]s -chown 2000:2000 %[2]s -chmod 0600 %[2]s - -cp %[3]s %[4]s -chown 2000:2000 %[4]s -chmod 0600 %[4]s - -/mongot-community/mongot --config /mongot/config.yml -`, MongotKeyfilePath, TempKeyfilePath, MongotSourceUserPasswordPath, TempSourceUserPasswordPath), + "/mongot-community/mongot --config /mongot/config.yml", }), + prependCommand(sensitiveFilePermissionsWorkaround(MongotSourceUserPasswordPath, TempSourceUserPasswordPath)), containerSecurityContext, ) } @@ -217,3 +222,23 @@ func newSearchDefaultRequirements() corev1.ResourceRequirements { }, } } + +// The container command is set to "sh" and args is ["-c", "