-
Notifications
You must be signed in to change notification settings - Fork 22
[Search] Implement gRPC and mTLS #527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
3ec055e
6593b18
b791c36
53e9334
cdc55d4
83e9b33
14bedcf
8dc3d81
f4a1a34
75c83de
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -115,13 +115,9 @@ func buildExpectedMongotConfig(search *searchv1.MongoDBSearch, mdbc *mdbcv1.Mong | |
DataPath: searchcontroller.MongotDataPath, | ||
}, | ||
Server: mongot.ConfigServer{ | ||
Wireproto: &mongot.ConfigWireproto{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we still keep a wireproto test as long as it is possible to enabled it? |
||
Address: "0.0.0.0:27027", | ||
Authentication: &mongot.ConfigAuthentication{ | ||
Mode: "keyfile", | ||
KeyFile: searchcontroller.TempKeyfilePath, | ||
}, | ||
TLS: mongot.ConfigTLS{Mode: mongot.ConfigTLSModeDisabled}, | ||
Grpc: &mongot.ConfigGrpc{ | ||
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()), | ||
TLS: &mongot.ConfigGrpcTLS{Mode: mongot.ConfigTLSModeDisabled}, | ||
}, | ||
}, | ||
Metrics: mongot.ConfigMetrics{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,11 +97,15 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S | |
return workflow.Failed(err) | ||
} | ||
|
||
keyfileStsModification, err := r.ensureSourceKeyfile(ctx, log) | ||
if apierrors.IsNotFound(err) { | ||
return workflow.Pending("Waiting for keyfile secret to be created") | ||
} else if err != nil { | ||
return workflow.Failed(err) | ||
keyfileStsModification := statefulset.NOOP() | ||
if r.mdbSearch.IsWireprotoForced() { | ||
var err error | ||
keyfileStsModification, err = r.ensureSourceKeyfile(ctx, log) | ||
if apierrors.IsNotFound(err) { | ||
return workflow.Pending("Waiting for keyfile secret to be created") | ||
} else if err != nil { | ||
return workflow.Failed(err) | ||
} | ||
} | ||
|
||
if err := r.ensureSearchService(ctx, r.mdbSearch); err != nil { | ||
|
@@ -113,11 +117,9 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S | |
return workflow.Failed(err) | ||
} | ||
|
||
egressTlsMongotModification, egressTlsStsModification, err := r.ensureEgressTlsConfig(ctx) | ||
if err != nil { | ||
return workflow.Failed(err) | ||
} | ||
egressTlsMongotModification, egressTlsStsModification := r.ensureEgressTlsConfig(ctx) | ||
|
||
// the egress TLS modification needs to always be applied after the ingress one, because it toggles mTLS based on the mode set by the ingress modification | ||
configHash, err := r.ensureMongotConfig(ctx, log, createMongotConfig(r.mdbSearch, r.db), ingressTlsMongotModification, egressTlsMongotModification) | ||
if err != nil { | ||
return workflow.Failed(err) | ||
|
@@ -140,6 +142,7 @@ func (r *MongoDBSearchReconcileHelper) reconcile(ctx context.Context, log *zap.S | |
return workflow.OK() | ||
} | ||
|
||
// This is called only if the wireproto server is enabled, to set up they keyfile necessary for authentication. | ||
func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context, log *zap.SugaredLogger) (statefulset.Modification, error) { | ||
keyfileSecretName := kube.ObjectKey(r.mdbSearch.GetNamespace(), r.db.KeyfileSecretName()) | ||
keyfileSecret := &corev1.Secret{} | ||
|
@@ -154,6 +157,7 @@ func (r *MongoDBSearchReconcileHelper) ensureSourceKeyfile(ctx context.Context, | |
"keyfileHash": hashBytes(keyfileSecret.Data[MongotKeyfileFilename]), | ||
}, | ||
)), | ||
CreateKeyfileModificationFunc(r.db.KeyfileSecretName()), | ||
), nil | ||
} | ||
|
||
|
@@ -229,10 +233,7 @@ func (r *MongoDBSearchReconcileHelper) ensureMongotConfig(ctx context.Context, l | |
|
||
func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) { | ||
if r.mdbSearch.Spec.Security.TLS == nil { | ||
mongotModification := func(config *mongot.Config) { | ||
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeDisabled | ||
} | ||
return mongotModification, statefulset.NOOP(), nil | ||
return mongot.NOOP(), statefulset.NOOP(), nil | ||
} | ||
|
||
// TODO: validate that the certificate in the user-provided Secret in .spec.security.tls.certificateKeySecret is issued by the CA in the operator's CA Secret | ||
|
@@ -244,8 +245,12 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex | |
|
||
mongotModification := func(config *mongot.Config) { | ||
certPath := tls.OperatorSecretMountPath + certFileName | ||
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS | ||
config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath) | ||
config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeTLS | ||
config.Server.Grpc.TLS.CertificateKeyFile = ptr.To(certPath) | ||
if config.Server.Wireproto != nil { | ||
config.Server.Wireproto.TLS.Mode = mongot.ConfigTLSModeTLS | ||
config.Server.Wireproto.TLS.CertificateKeyFile = ptr.To(certPath) | ||
} | ||
} | ||
|
||
tlsSecret := r.mdbSearch.TLSOperatorSecretNamespacedName() | ||
|
@@ -261,46 +266,34 @@ func (r *MongoDBSearchReconcileHelper) ensureIngressTlsConfig(ctx context.Contex | |
return mongotModification, statefulsetModification, nil | ||
} | ||
|
||
func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification, error) { | ||
func (r *MongoDBSearchReconcileHelper) ensureEgressTlsConfig(ctx context.Context) (mongot.Modification, statefulset.Modification) { | ||
tlsSourceConfig := r.db.TLSConfig() | ||
if tlsSourceConfig == nil { | ||
return mongot.NOOP(), statefulset.NOOP(), nil | ||
return mongot.NOOP(), statefulset.NOOP() | ||
} | ||
|
||
mongotModification := func(config *mongot.Config) { | ||
config.SyncSource.ReplicaSet.TLS = ptr.To(true) | ||
config.SyncSource.CertificateAuthorityFile = ptr.To(tls.CAMountPath + "/" + tlsSourceConfig.CAFileName) | ||
|
||
// if the gRPC server is configured to accept TLS connections then toggle mTLS as well | ||
if config.Server.Grpc.TLS.Mode == mongot.ConfigTLSModeTLS { | ||
config.Server.Grpc.TLS.Mode = mongot.ConfigTLSModeMTLS | ||
config.Server.Grpc.TLS.CertificateAuthorityFile = config.SyncSource.CertificateAuthorityFile | ||
} | ||
} | ||
|
||
_, containerSecurityContext := podtemplatespec.WithDefaultSecurityContextsModifications() | ||
caVolume := tlsSourceConfig.CAVolume | ||
trustStoreVolume := statefulset.CreateVolumeFromEmptyDir("cacerts") | ||
statefulsetModification := statefulset.WithPodSpecTemplate(podtemplatespec.Apply( | ||
podtemplatespec.WithVolume(caVolume), | ||
podtemplatespec.WithVolume(trustStoreVolume), | ||
podtemplatespec.WithInitContainer("init-cacerts", container.Apply( | ||
container.WithImage(r.buildImageString()), | ||
containerSecurityContext, | ||
container.WithVolumeMounts([]corev1.VolumeMount{ | ||
statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)), | ||
statefulset.CreateVolumeMount(trustStoreVolume.Name, "/java/trust-store", statefulset.WithReadOnly(false)), | ||
}), | ||
container.WithCommand([]string{"sh"}), | ||
container.WithArgs([]string{ | ||
"-c", | ||
fmt.Sprintf(` | ||
cp /mongot-community/bin/jdk/lib/security/cacerts /java/trust-store/cacerts | ||
/mongot-community/bin/jdk/bin/keytool -keystore /java/trust-store/cacerts -storepass changeit -noprompt -trustcacerts -importcert -alias mongodcert -file %s/%s | ||
`, tls.CAMountPath, tlsSourceConfig.CAFileName), | ||
}), | ||
)), | ||
podtemplatespec.WithContainer(MongotContainerName, container.Apply( | ||
container.WithVolumeMounts([]corev1.VolumeMount{ | ||
statefulset.CreateVolumeMount(trustStoreVolume.Name, "/mongot-community/bin/jdk/lib/security/cacerts", statefulset.WithReadOnly(true), statefulset.WithSubPath("cacerts")), | ||
statefulset.CreateVolumeMount(caVolume.Name, tls.CAMountPath, statefulset.WithReadOnly(true)), | ||
}), | ||
)), | ||
)) | ||
|
||
return mongotModification, statefulsetModification, nil | ||
return mongotModification, statefulsetModification | ||
} | ||
|
||
func hashBytes(bytes []byte) string { | ||
|
@@ -325,10 +318,17 @@ func buildSearchHeadlessService(search *searchv1.MongoDBSearch) corev1.Service { | |
SetOwnerReferences(search.GetOwnerReferences()) | ||
|
||
serviceBuilder.AddPort(&corev1.ServicePort{ | ||
Name: "mongot", | ||
Name: "mongot-wireproto", | ||
|
||
Protocol: corev1.ProtocolTCP, | ||
Port: search.GetMongotWireprotoPort(), | ||
TargetPort: intstr.FromInt32(search.GetMongotWireprotoPort()), | ||
}) | ||
|
||
serviceBuilder.AddPort(&corev1.ServicePort{ | ||
Name: "mongot-grpc", | ||
Protocol: corev1.ProtocolTCP, | ||
Port: search.GetMongotPort(), | ||
TargetPort: intstr.FromInt32(search.GetMongotPort()), | ||
Port: search.GetMongotGrpcPort(), | ||
TargetPort: intstr.FromInt32(search.GetMongotGrpcPort()), | ||
}) | ||
|
||
serviceBuilder.AddPort(&corev1.ServicePort{ | ||
|
@@ -366,13 +366,24 @@ func createMongotConfig(search *searchv1.MongoDBSearch, db SearchSourceDBResourc | |
DataPath: MongotDataPath, | ||
} | ||
config.Server = mongot.ConfigServer{ | ||
Wireproto: &mongot.ConfigWireproto{ | ||
Address: "0.0.0.0:27027", | ||
Grpc: &mongot.ConfigGrpc{ | ||
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotGrpcPort()), | ||
TLS: &mongot.ConfigGrpcTLS{ | ||
Mode: mongot.ConfigTLSModeDisabled, | ||
}, | ||
}, | ||
} | ||
if search.IsWireprotoForced() { | ||
config.Server.Wireproto = &mongot.ConfigWireproto{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so this is not mutually exclusive with grpc? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mongot can run both at the same time. I opted to always enable the gRPC server and only conditionally enable the wireproto server if the annotation is present - the annotation also controls whether the mongot address we generate in mongod's config uses grpc or wireproto. We could only enable one at a time, but to be honest I don't know whether we should: the wireproto endpoint will go away eventually and gRPC will be the only one left. |
||
Address: fmt.Sprintf("0.0.0.0:%d", search.GetMongotWireprotoPort()), | ||
Authentication: &mongot.ConfigAuthentication{ | ||
Mode: "keyfile", | ||
KeyFile: TempKeyfilePath, | ||
}, | ||
}, | ||
TLS: &mongot.ConfigWireprotoTLS{ | ||
Mode: mongot.ConfigTLSModeDisabled, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why TLS disabled in wireproto? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wireproto and Grpc always start out with TLS disabled in |
||
}, | ||
} | ||
} | ||
config.Metrics = mongot.ConfigMetrics{ | ||
Enabled: true, | ||
|
@@ -393,19 +404,27 @@ func GetMongodConfigParameters(search *searchv1.MongoDBSearch) map[string]any { | |
if search.Spec.Security.TLS != nil { | ||
searchTLSMode = automationconfig.TLSModeRequired | ||
} | ||
|
||
parameters := map[string]any{ | ||
"mongotHost": mongotHostAndPort(search), | ||
"searchIndexManagementHostAndPort": mongotHostAndPort(search), | ||
"skipAuthenticationToSearchIndexManagementServer": false, | ||
"searchTLSMode": string(searchTLSMode), | ||
"useGrpcForSearch": !search.IsWireprotoForced(), | ||
|
||
} | ||
|
||
return map[string]any{ | ||
"setParameter": map[string]any{ | ||
"mongotHost": mongotHostAndPort(search), | ||
"searchIndexManagementHostAndPort": mongotHostAndPort(search), | ||
"skipAuthenticationToSearchIndexManagementServer": false, | ||
"searchTLSMode": string(searchTLSMode), | ||
}, | ||
"setParameter": parameters, | ||
} | ||
} | ||
|
||
func mongotHostAndPort(search *searchv1.MongoDBSearch) string { | ||
svcName := search.SearchServiceNamespacedName() | ||
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, search.GetMongotPort()) | ||
port := search.GetMongotGrpcPort() | ||
if search.IsWireprotoForced() { | ||
port = search.GetMongotWireprotoPort() | ||
} | ||
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", svcName.Name, svcName.Namespace, port) | ||
} | ||
|
||
func (r *MongoDBSearchReconcileHelper) ValidateSingleMongoDBSearchForSearchSource(ctx context.Context) error { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of all these if statements, would it make sense to abstract the wireproto stuff away in a struct and then it's also easier to just remove it.