From 2abe65e1cb597fcc7941b245ce18f80a57f84815 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 23 Jun 2025 08:55:36 -0400 Subject: [PATCH 1/2] Conformance tests: improve awaitServiceImport We can improve it by utilizing the form of 'Eventually' that accepts a function that that takes a single Gomega argument which is used to make assertions. 'Eventually' succeeds only if all the assertions in the polled function pass. This is simpler than first polling to find the ServiceImport based on initial checks then making separate assertions to provide better error output that typically overlap with the initial checks. Signed-off-by: Tom Pantelis --- conformance/clusterip_service_dns.go | 9 +-- conformance/conformance_suite.go | 32 +++++--- conformance/service_import.go | 108 +++++++++++---------------- 3 files changed, 68 insertions(+), 81 deletions(-) diff --git a/conformance/clusterip_service_dns.go b/conformance/clusterip_service_dns.go index c239782..82d940d 100644 --- a/conformance/clusterip_service_dns.go +++ b/conformance/clusterip_service_dns.go @@ -44,11 +44,10 @@ var _ = Describe("", Label(OptionalLabel, DNSLabel, ClusterIPLabel), func() { serviceImports := []*v1alpha1.ServiceImport{} for _, client := range clients { - serviceImport := t.awaitServiceImport(&client, t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.IPs) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found on cluster %q", client.name) - Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name) + serviceImport := t.awaitServiceImport(&client, t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name) + }) serviceImports = append(serviceImports, serviceImport) } diff --git a/conformance/conformance_suite.go b/conformance/conformance_suite.go index 5d97f31..ccb9701 100644 --- a/conformance/conformance_suite.go +++ b/conformance/conformance_suite.go @@ -37,7 +37,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" rest "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -227,22 +226,31 @@ func (t *testDriver) getServiceImport(c *clusterClients, name string) *v1alpha1. return si } -func (t *testDriver) awaitServiceImport(c *clusterClients, name string, verify func(*v1alpha1.ServiceImport) bool) *v1alpha1.ServiceImport { +func (t *testDriver) awaitServiceImport(c *clusterClients, name string, reportNonConformanceOnMissing bool, + verify func(Gomega, *v1alpha1.ServiceImport)) *v1alpha1.ServiceImport { var serviceImport *v1alpha1.ServiceImport - _ = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, - 20*time.Second, true, func(ctx context.Context) (bool, error) { - defer GinkgoRecover() + Eventually(func(g Gomega) { + si := t.getServiceImport(c, name) - si := t.getServiceImport(c, name) - if si == nil { - return false, nil - } + missingMsg := fmt.Sprintf("ServiceImport was not found on cluster %q", c.name) - serviceImport = si + var missing any = missingMsg + if reportNonConformanceOnMissing { + missing = reportNonConformant(missingMsg) + } - return verify == nil || verify(serviceImport), nil - }) + g.Expect(si).NotTo(BeNil(), missing) + + serviceImport = si + + if verify != nil { + verify(g, serviceImport) + } + + // The final run succeeded so cancel any prior non-conformance reported. + cancelNonConformanceReport() + }).Within(20 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) return serviceImport } diff --git a/conformance/service_import.go b/conformance/service_import.go index 4284d6d..ac44bd6 100644 --- a/conformance/service_import.go +++ b/conformance/service_import.go @@ -47,14 +47,15 @@ func testGeneralServiceImport() { t.createServiceExport(&clients[0], helloServiceExport) }) - assertHasKeyValues := func(actual, expected map[string]string) { + assertHasKeyValues := func(g Gomega, actual, expected map[string]string) { for k, v := range expected { - Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant("")) + g.Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant("")) } } - assertNotHasKeyValues := func(actual, expected map[string]string) { + + assertNotHasKeyValues := func(g Gomega, actual, expected map[string]string) { for k, v := range expected { - Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant("")) + g.Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant("")) } } @@ -66,10 +67,7 @@ func testGeneralServiceImport() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/blob/master/keps/sig-multicluster/1645-multi-cluster-services-api/README.md#importing-services") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return true - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil) By(fmt.Sprintf("Exporting the service on the second cluster %q", clients[1].name)) @@ -109,16 +107,14 @@ func testGeneralServiceImport() { Label(OptionalLabel), Label(ExportedLabelsLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#labels-and-annotations") - serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Labels) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], helloServiceName, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations) + assertNotHasKeyValues(g, serviceImport.Annotations, t.helloService.Annotations) - assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations) - assertNotHasKeyValues(serviceImport.Annotations, t.helloService.Annotations) - - assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels) - assertNotHasKeyValues(serviceImport.Labels, t.helloService.Labels) + assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels) + assertNotHasKeyValues(g, serviceImport.Labels, t.helloService.Labels) + }) }) }) @@ -141,16 +137,14 @@ func testGeneralServiceImport() { t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Labels) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations) - assertNotHasKeyValues(serviceImport.Annotations, tt.helloServiceExport2.Annotations) + t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations) + assertNotHasKeyValues(g, serviceImport.Annotations, tt.helloServiceExport2.Annotations) - assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels) - assertNotHasKeyValues(serviceImport.Labels, tt.helloServiceExport2.Labels) + assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels) + assertNotHasKeyValues(g, serviceImport.Labels, tt.helloServiceExport2.Labels) + }) }) }) }) @@ -173,9 +167,7 @@ func testClusterIPServiceImport() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#importing-services") for i := range clients { - serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil) - Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q", - clients[i].name))) + serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil) Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant( fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type))) @@ -195,24 +187,22 @@ func testClusterIPServiceImport() { Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#session-affinity") - serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, nil) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant("")) - Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant("")) - - Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant( - "The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig")) + g.Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant( + "The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig")) + }) }) Specify("An IP should be allocated for a ClusterSetIP ServiceImport", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.IPs) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant("")) + }) - Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant("")) Expect(net.ParseIP(serviceImport.Spec.IPs[0])).ToNot(BeNil(), reportNonConformant(fmt.Sprintf("The value %q is not a valid IP", serviceImport.Spec.IPs[0]))) }) @@ -220,12 +210,9 @@ func testClusterIPServiceImport() { Specify("The ports for a ClusterSetIP ServiceImport should match those of the exported service", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port") - serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.Ports) > 0 + t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant("")) }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant("")) }) Context("A ClusterIP service exported on two clusters", func() { @@ -246,13 +233,11 @@ func testClusterIPServiceImport() { Specify("should expose the union of the constituent service ports", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.Ports) == 3 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts( - append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant("")) + t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts( + append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant("")) + }) }) }) @@ -268,13 +253,11 @@ func testClusterIPServiceImport() { t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.Ports) == len(t.helloService.Spec.Ports) - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), - reportNonConformant("The service ports were not resolved correctly")) + t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), + reportNonConformant("The service ports were not resolved correctly")) + }) }) }) }) @@ -296,9 +279,7 @@ func testHeadlessServiceImport() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-types") for i := range clients { - serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil) - Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q", - clients[i].name))) + serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil) Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.Headless), reportNonConformant( fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type))) @@ -317,8 +298,7 @@ func testHeadlessServiceImport() { Specify("No clusterset IP should be allocated for a Headless ServiceImport", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, nil) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil) Consistently(func() []string { return t.getServiceImport(&clients[0], t.helloService.Name).Spec.IPs From 01c38bd45bf6d699864f7d67ec98b571e0df50e5 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Mon, 23 Jun 2025 09:59:46 -0400 Subject: [PATCH 2/2] Add conformance test to verify resolution of conflicting service types Fixes https://github.com/kubernetes-sigs/mcs-api/issues/92 Signed-off-by: Tom Pantelis --- conformance/service_import.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/conformance/service_import.go b/conformance/service_import.go index ac44bd6..0637630 100644 --- a/conformance/service_import.go +++ b/conformance/service_import.go @@ -33,6 +33,7 @@ var ( _ = Describe("", Label(ClusterIPLabel), testClusterIPServiceImport) _ = Describe("", Label(HeadlessLabel), testHeadlessServiceImport) _ = Describe("", Label(ExternalNameLabel), testExternalNameService) + _ = Describe("", testServiceTypeConflict) ) func testGeneralServiceImport() { @@ -326,3 +327,30 @@ func testExternalNameService() { "the ServiceImport should not exist for an ExternalName service") }) } + +func testServiceTypeConflict() { + t := newTwoClusterTestDriver(newTestDriver()) + + BeforeEach(func() { + t.helloService2.Spec.ClusterIP = corev1.ClusterIPNone + }) + + JustBeforeEach(func() { + t.createServiceExport(&clients[0], newHelloServiceExport()) + }) + + Specify("A service exported on two clusters with conflicting headlessness should apply the conflict resolution policy and "+ + "report a Conflict condition on the ServiceExport", Label(RequiredLabel), func() { + AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#headlessness") + + t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) + t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) + + for i := range clients { + serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil) + + Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant( + fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type))) + } + }) +}