diff --git a/conformance/clusterip_service_dns.go b/conformance/clusterip_service_dns.go index c239782..82d940d 100644 --- a/conformance/clusterip_service_dns.go +++ b/conformance/clusterip_service_dns.go @@ -44,11 +44,10 @@ var _ = Describe("", Label(OptionalLabel, DNSLabel, ClusterIPLabel), func() { serviceImports := []*v1alpha1.ServiceImport{} for _, client := range clients { - serviceImport := t.awaitServiceImport(&client, t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.IPs) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found on cluster %q", client.name) - Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name) + serviceImport := t.awaitServiceImport(&client, t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name) + }) serviceImports = append(serviceImports, serviceImport) } diff --git a/conformance/conformance_suite.go b/conformance/conformance_suite.go index 5d97f31..ccb9701 100644 --- a/conformance/conformance_suite.go +++ b/conformance/conformance_suite.go @@ -37,7 +37,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" rest "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -227,22 +226,31 @@ func (t *testDriver) getServiceImport(c *clusterClients, name string) *v1alpha1. return si } -func (t *testDriver) awaitServiceImport(c *clusterClients, name string, verify func(*v1alpha1.ServiceImport) bool) *v1alpha1.ServiceImport { +func (t *testDriver) awaitServiceImport(c *clusterClients, name string, reportNonConformanceOnMissing bool, + verify func(Gomega, *v1alpha1.ServiceImport)) *v1alpha1.ServiceImport { var serviceImport *v1alpha1.ServiceImport - _ = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, - 20*time.Second, true, func(ctx context.Context) (bool, error) { - defer GinkgoRecover() + Eventually(func(g Gomega) { + si := t.getServiceImport(c, name) - si := t.getServiceImport(c, name) - if si == nil { - return false, nil - } + missingMsg := fmt.Sprintf("ServiceImport was not found on cluster %q", c.name) - serviceImport = si + var missing any = missingMsg + if reportNonConformanceOnMissing { + missing = reportNonConformant(missingMsg) + } - return verify == nil || verify(serviceImport), nil - }) + g.Expect(si).NotTo(BeNil(), missing) + + serviceImport = si + + if verify != nil { + verify(g, serviceImport) + } + + // The final run succeeded so cancel any prior non-conformance reported. + cancelNonConformanceReport() + }).Within(20 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed()) return serviceImport } diff --git a/conformance/service_import.go b/conformance/service_import.go index 4284d6d..0637630 100644 --- a/conformance/service_import.go +++ b/conformance/service_import.go @@ -33,6 +33,7 @@ var ( _ = Describe("", Label(ClusterIPLabel), testClusterIPServiceImport) _ = Describe("", Label(HeadlessLabel), testHeadlessServiceImport) _ = Describe("", Label(ExternalNameLabel), testExternalNameService) + _ = Describe("", testServiceTypeConflict) ) func testGeneralServiceImport() { @@ -47,14 +48,15 @@ func testGeneralServiceImport() { t.createServiceExport(&clients[0], helloServiceExport) }) - assertHasKeyValues := func(actual, expected map[string]string) { + assertHasKeyValues := func(g Gomega, actual, expected map[string]string) { for k, v := range expected { - Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant("")) + g.Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant("")) } } - assertNotHasKeyValues := func(actual, expected map[string]string) { + + assertNotHasKeyValues := func(g Gomega, actual, expected map[string]string) { for k, v := range expected { - Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant("")) + g.Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant("")) } } @@ -66,10 +68,7 @@ func testGeneralServiceImport() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/blob/master/keps/sig-multicluster/1645-multi-cluster-services-api/README.md#importing-services") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return true - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil) By(fmt.Sprintf("Exporting the service on the second cluster %q", clients[1].name)) @@ -109,16 +108,14 @@ func testGeneralServiceImport() { Label(OptionalLabel), Label(ExportedLabelsLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#labels-and-annotations") - serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Labels) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], helloServiceName, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations) + assertNotHasKeyValues(g, serviceImport.Annotations, t.helloService.Annotations) - assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations) - assertNotHasKeyValues(serviceImport.Annotations, t.helloService.Annotations) - - assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels) - assertNotHasKeyValues(serviceImport.Labels, t.helloService.Labels) + assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels) + assertNotHasKeyValues(g, serviceImport.Labels, t.helloService.Labels) + }) }) }) @@ -141,16 +138,14 @@ func testGeneralServiceImport() { t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Labels) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations) - assertNotHasKeyValues(serviceImport.Annotations, tt.helloServiceExport2.Annotations) + t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations) + assertNotHasKeyValues(g, serviceImport.Annotations, tt.helloServiceExport2.Annotations) - assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels) - assertNotHasKeyValues(serviceImport.Labels, tt.helloServiceExport2.Labels) + assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels) + assertNotHasKeyValues(g, serviceImport.Labels, tt.helloServiceExport2.Labels) + }) }) }) }) @@ -173,9 +168,7 @@ func testClusterIPServiceImport() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#importing-services") for i := range clients { - serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil) - Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q", - clients[i].name))) + serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil) Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant( fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type))) @@ -195,24 +188,22 @@ func testClusterIPServiceImport() { Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#session-affinity") - serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, nil) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant("")) - Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant("")) - - Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant( - "The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig")) + g.Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant( + "The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig")) + }) }) Specify("An IP should be allocated for a ClusterSetIP ServiceImport", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.IPs) > 0 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant("")) + }) - Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant("")) Expect(net.ParseIP(serviceImport.Spec.IPs[0])).ToNot(BeNil(), reportNonConformant(fmt.Sprintf("The value %q is not a valid IP", serviceImport.Spec.IPs[0]))) }) @@ -220,12 +211,9 @@ func testClusterIPServiceImport() { Specify("The ports for a ClusterSetIP ServiceImport should match those of the exported service", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port") - serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.Ports) > 0 + t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant("")) }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant("")) }) Context("A ClusterIP service exported on two clusters", func() { @@ -246,13 +234,11 @@ func testClusterIPServiceImport() { Specify("should expose the union of the constituent service ports", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.Ports) == 3 - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts( - append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant("")) + t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts( + append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant("")) + }) }) }) @@ -268,13 +254,11 @@ func testClusterIPServiceImport() { t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool { - return len(serviceImport.Spec.Ports) == len(t.helloService.Spec.Ports) - }) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") - - Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), - reportNonConformant("The service ports were not resolved correctly")) + t.awaitServiceImport(&clients[0], t.helloService.Name, false, + func(g Gomega, serviceImport *v1alpha1.ServiceImport) { + g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), + reportNonConformant("The service ports were not resolved correctly")) + }) }) }) }) @@ -296,9 +280,7 @@ func testHeadlessServiceImport() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-types") for i := range clients { - serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil) - Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q", - clients[i].name))) + serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil) Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.Headless), reportNonConformant( fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type))) @@ -317,8 +299,7 @@ func testHeadlessServiceImport() { Specify("No clusterset IP should be allocated for a Headless ServiceImport", Label(RequiredLabel), func() { AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip") - serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, nil) - Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found") + t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil) Consistently(func() []string { return t.getServiceImport(&clients[0], t.helloService.Name).Spec.IPs @@ -346,3 +327,30 @@ func testExternalNameService() { "the ServiceImport should not exist for an ExternalName service") }) } + +func testServiceTypeConflict() { + t := newTwoClusterTestDriver(newTestDriver()) + + BeforeEach(func() { + t.helloService2.Spec.ClusterIP = corev1.ClusterIPNone + }) + + JustBeforeEach(func() { + t.createServiceExport(&clients[0], newHelloServiceExport()) + }) + + Specify("A service exported on two clusters with conflicting headlessness should apply the conflict resolution policy and "+ + "report a Conflict condition on the ServiceExport", Label(RequiredLabel), func() { + AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#headlessness") + + t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) + t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue) + + for i := range clients { + serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil) + + Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant( + fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type))) + } + }) +}