Skip to content

Commit 20de0eb

Browse files
authored
Merge pull request #111 from tpantelis/svc_type_conflict
Add conformance test to verify resolution of conflicting service types
2 parents a23ff80 + 01c38bd commit 20de0eb

File tree

3 files changed

+96
-81
lines changed

3 files changed

+96
-81
lines changed

conformance/clusterip_service_dns.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,10 @@ var _ = Describe("", Label(OptionalLabel, DNSLabel, ClusterIPLabel), func() {
4444

4545
serviceImports := []*v1alpha1.ServiceImport{}
4646
for _, client := range clients {
47-
serviceImport := t.awaitServiceImport(&client, t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
48-
return len(serviceImport.Spec.IPs) > 0
49-
})
50-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found on cluster %q", client.name)
51-
Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name)
47+
serviceImport := t.awaitServiceImport(&client, t.helloService.Name, false,
48+
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
49+
g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), "ServiceImport on cluster %q does not contain an IP", client.name)
50+
})
5251
serviceImports = append(serviceImports, serviceImport)
5352
}
5453

conformance/conformance_suite.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
apierrors "k8s.io/apimachinery/pkg/api/errors"
3838
"k8s.io/apimachinery/pkg/api/meta"
3939
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
40-
"k8s.io/apimachinery/pkg/util/wait"
4140
"k8s.io/client-go/kubernetes"
4241
rest "k8s.io/client-go/rest"
4342
"k8s.io/client-go/tools/clientcmd"
@@ -227,22 +226,31 @@ func (t *testDriver) getServiceImport(c *clusterClients, name string) *v1alpha1.
227226
return si
228227
}
229228

230-
func (t *testDriver) awaitServiceImport(c *clusterClients, name string, verify func(*v1alpha1.ServiceImport) bool) *v1alpha1.ServiceImport {
229+
func (t *testDriver) awaitServiceImport(c *clusterClients, name string, reportNonConformanceOnMissing bool,
230+
verify func(Gomega, *v1alpha1.ServiceImport)) *v1alpha1.ServiceImport {
231231
var serviceImport *v1alpha1.ServiceImport
232232

233-
_ = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond,
234-
20*time.Second, true, func(ctx context.Context) (bool, error) {
235-
defer GinkgoRecover()
233+
Eventually(func(g Gomega) {
234+
si := t.getServiceImport(c, name)
236235

237-
si := t.getServiceImport(c, name)
238-
if si == nil {
239-
return false, nil
240-
}
236+
missingMsg := fmt.Sprintf("ServiceImport was not found on cluster %q", c.name)
241237

242-
serviceImport = si
238+
var missing any = missingMsg
239+
if reportNonConformanceOnMissing {
240+
missing = reportNonConformant(missingMsg)
241+
}
243242

244-
return verify == nil || verify(serviceImport), nil
245-
})
243+
g.Expect(si).NotTo(BeNil(), missing)
244+
245+
serviceImport = si
246+
247+
if verify != nil {
248+
verify(g, serviceImport)
249+
}
250+
251+
// The final run succeeded so cancel any prior non-conformance reported.
252+
cancelNonConformanceReport()
253+
}).Within(20 * time.Second).WithPolling(100 * time.Millisecond).Should(Succeed())
246254

247255
return serviceImport
248256
}

conformance/service_import.go

Lines changed: 72 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ var (
3333
_ = Describe("", Label(ClusterIPLabel), testClusterIPServiceImport)
3434
_ = Describe("", Label(HeadlessLabel), testHeadlessServiceImport)
3535
_ = Describe("", Label(ExternalNameLabel), testExternalNameService)
36+
_ = Describe("", testServiceTypeConflict)
3637
)
3738

3839
func testGeneralServiceImport() {
@@ -47,14 +48,15 @@ func testGeneralServiceImport() {
4748
t.createServiceExport(&clients[0], helloServiceExport)
4849
})
4950

50-
assertHasKeyValues := func(actual, expected map[string]string) {
51+
assertHasKeyValues := func(g Gomega, actual, expected map[string]string) {
5152
for k, v := range expected {
52-
Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant(""))
53+
g.Expect(actual).To(HaveKeyWithValue(k, v), reportNonConformant(""))
5354
}
5455
}
55-
assertNotHasKeyValues := func(actual, expected map[string]string) {
56+
57+
assertNotHasKeyValues := func(g Gomega, actual, expected map[string]string) {
5658
for k, v := range expected {
57-
Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant(""))
59+
g.Expect(actual).ToNot(HaveKeyWithValue(k, v), reportNonConformant(""))
5860
}
5961
}
6062

@@ -66,10 +68,7 @@ func testGeneralServiceImport() {
6668

6769
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/blob/master/keps/sig-multicluster/1645-multi-cluster-services-api/README.md#importing-services")
6870

69-
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
70-
return true
71-
})
72-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
71+
t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil)
7372

7473
By(fmt.Sprintf("Exporting the service on the second cluster %q", clients[1].name))
7574

@@ -109,16 +108,14 @@ func testGeneralServiceImport() {
109108
Label(OptionalLabel), Label(ExportedLabelsLabel), func() {
110109
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#labels-and-annotations")
111110

112-
serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool {
113-
return len(serviceImport.Labels) > 0
114-
})
115-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
111+
t.awaitServiceImport(&clients[0], helloServiceName, false,
112+
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
113+
assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations)
114+
assertNotHasKeyValues(g, serviceImport.Annotations, t.helloService.Annotations)
116115

117-
assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations)
118-
assertNotHasKeyValues(serviceImport.Annotations, t.helloService.Annotations)
119-
120-
assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels)
121-
assertNotHasKeyValues(serviceImport.Labels, t.helloService.Labels)
116+
assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels)
117+
assertNotHasKeyValues(g, serviceImport.Labels, t.helloService.Labels)
118+
})
122119
})
123120
})
124121

@@ -141,16 +138,14 @@ func testGeneralServiceImport() {
141138
t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
142139
t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
143140

144-
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
145-
return len(serviceImport.Labels) > 0
146-
})
147-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
148-
149-
assertHasKeyValues(serviceImport.Annotations, helloServiceExport.Annotations)
150-
assertNotHasKeyValues(serviceImport.Annotations, tt.helloServiceExport2.Annotations)
141+
t.awaitServiceImport(&clients[0], t.helloService.Name, false,
142+
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
143+
assertHasKeyValues(g, serviceImport.Annotations, helloServiceExport.Annotations)
144+
assertNotHasKeyValues(g, serviceImport.Annotations, tt.helloServiceExport2.Annotations)
151145

152-
assertHasKeyValues(serviceImport.Labels, helloServiceExport.Labels)
153-
assertNotHasKeyValues(serviceImport.Labels, tt.helloServiceExport2.Labels)
146+
assertHasKeyValues(g, serviceImport.Labels, helloServiceExport.Labels)
147+
assertNotHasKeyValues(g, serviceImport.Labels, tt.helloServiceExport2.Labels)
148+
})
154149
})
155150
})
156151
})
@@ -173,9 +168,7 @@ func testClusterIPServiceImport() {
173168
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#importing-services")
174169

175170
for i := range clients {
176-
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil)
177-
Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q",
178-
clients[i].name)))
171+
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil)
179172

180173
Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant(
181174
fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type)))
@@ -195,37 +188,32 @@ func testClusterIPServiceImport() {
195188
Label(RequiredLabel), func() {
196189
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#session-affinity")
197190

198-
serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, nil)
199-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
191+
t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
192+
g.Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant(""))
200193

201-
Expect(serviceImport.Spec.SessionAffinity).To(Equal(t.helloService.Spec.SessionAffinity), reportNonConformant(""))
202-
203-
Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant(
204-
"The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig"))
194+
g.Expect(serviceImport.Spec.SessionAffinityConfig).To(Equal(t.helloService.Spec.SessionAffinityConfig), reportNonConformant(
195+
"The SessionAffinityConfig of the ServiceImport does not match the exported Service's SessionAffinityConfig"))
196+
})
205197
})
206198

207199
Specify("An IP should be allocated for a ClusterSetIP ServiceImport", Label(RequiredLabel), func() {
208200
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip")
209201

210-
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
211-
return len(serviceImport.Spec.IPs) > 0
212-
})
213-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
202+
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, false,
203+
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
204+
g.Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant(""))
205+
})
214206

215-
Expect(serviceImport.Spec.IPs).ToNot(BeEmpty(), reportNonConformant(""))
216207
Expect(net.ParseIP(serviceImport.Spec.IPs[0])).ToNot(BeNil(),
217208
reportNonConformant(fmt.Sprintf("The value %q is not a valid IP", serviceImport.Spec.IPs[0])))
218209
})
219210

220211
Specify("The ports for a ClusterSetIP ServiceImport should match those of the exported service", Label(RequiredLabel), func() {
221212
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port")
222213

223-
serviceImport := t.awaitServiceImport(&clients[0], helloServiceName, func(serviceImport *v1alpha1.ServiceImport) bool {
224-
return len(serviceImport.Spec.Ports) > 0
214+
t.awaitServiceImport(&clients[0], helloServiceName, false, func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
215+
g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant(""))
225216
})
226-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
227-
228-
Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)), reportNonConformant(""))
229217
})
230218

231219
Context("A ClusterIP service exported on two clusters", func() {
@@ -246,13 +234,11 @@ func testClusterIPServiceImport() {
246234
Specify("should expose the union of the constituent service ports", Label(RequiredLabel), func() {
247235
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-port")
248236

249-
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
250-
return len(serviceImport.Spec.Ports) == 3
251-
})
252-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
253-
254-
Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(
255-
append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant(""))
237+
t.awaitServiceImport(&clients[0], t.helloService.Name, false,
238+
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
239+
g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(
240+
append(t.helloService.Spec.Ports, tt.helloService2.Spec.Ports[1]))), reportNonConformant(""))
241+
})
256242
})
257243
})
258244

@@ -268,13 +254,11 @@ func testClusterIPServiceImport() {
268254
t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
269255
t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
270256

271-
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, func(serviceImport *v1alpha1.ServiceImport) bool {
272-
return len(serviceImport.Spec.Ports) == len(t.helloService.Spec.Ports)
273-
})
274-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
275-
276-
Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)),
277-
reportNonConformant("The service ports were not resolved correctly"))
257+
t.awaitServiceImport(&clients[0], t.helloService.Name, false,
258+
func(g Gomega, serviceImport *v1alpha1.ServiceImport) {
259+
g.Expect(sortMCSPorts(serviceImport.Spec.Ports)).To(Equal(toMCSPorts(t.helloService.Spec.Ports)),
260+
reportNonConformant("The service ports were not resolved correctly"))
261+
})
278262
})
279263
})
280264
})
@@ -296,9 +280,7 @@ func testHeadlessServiceImport() {
296280
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#service-types")
297281

298282
for i := range clients {
299-
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, nil)
300-
Expect(serviceImport).NotTo(BeNil(), reportNonConformant(fmt.Sprintf("ServiceImport was not found on cluster %q",
301-
clients[i].name)))
283+
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil)
302284

303285
Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.Headless), reportNonConformant(
304286
fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type)))
@@ -317,8 +299,7 @@ func testHeadlessServiceImport() {
317299
Specify("No clusterset IP should be allocated for a Headless ServiceImport", Label(RequiredLabel), func() {
318300
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#clustersetip")
319301

320-
serviceImport := t.awaitServiceImport(&clients[0], t.helloService.Name, nil)
321-
Expect(serviceImport).NotTo(BeNil(), "ServiceImport was not found")
302+
t.awaitServiceImport(&clients[0], t.helloService.Name, false, nil)
322303

323304
Consistently(func() []string {
324305
return t.getServiceImport(&clients[0], t.helloService.Name).Spec.IPs
@@ -346,3 +327,30 @@ func testExternalNameService() {
346327
"the ServiceImport should not exist for an ExternalName service")
347328
})
348329
}
330+
331+
func testServiceTypeConflict() {
332+
t := newTwoClusterTestDriver(newTestDriver())
333+
334+
BeforeEach(func() {
335+
t.helloService2.Spec.ClusterIP = corev1.ClusterIPNone
336+
})
337+
338+
JustBeforeEach(func() {
339+
t.createServiceExport(&clients[0], newHelloServiceExport())
340+
})
341+
342+
Specify("A service exported on two clusters with conflicting headlessness should apply the conflict resolution policy and "+
343+
"report a Conflict condition on the ServiceExport", Label(RequiredLabel), func() {
344+
AddReportEntry(SpecRefReportEntry, "https://github.com/kubernetes/enhancements/tree/master/keps/sig-multicluster/1645-multi-cluster-services-api#headlessness")
345+
346+
t.awaitServiceExportCondition(&clients[0], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
347+
t.awaitServiceExportCondition(&clients[1], v1alpha1.ServiceExportConflict, metav1.ConditionTrue)
348+
349+
for i := range clients {
350+
serviceImport := t.awaitServiceImport(&clients[i], helloServiceName, true, nil)
351+
352+
Expect(serviceImport.Spec.Type).To(Equal(v1alpha1.ClusterSetIP), reportNonConformant(
353+
fmt.Sprintf("ServiceImport on cluster %q has type %q", clients[i].name, serviceImport.Spec.Type)))
354+
}
355+
})
356+
}

0 commit comments

Comments
 (0)