Skip to content

Commit daed40d

Browse files
authored
Merge pull request #2126 from kube-logging/feat/es-version-compat
feat: use forked elasticsearch gem
2 parents 6177d98 + 43bf277 commit daed40d

File tree

7 files changed

+1005
-189
lines changed

7 files changed

+1005
-189
lines changed

e2e/common/cond/conditions.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cisco-open/operator-tools/pkg/utils"
2222
"github.com/kube-logging/logging-operator/e2e/common"
2323
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
24+
appsv1 "k8s.io/api/apps/v1"
2425
corev1 "k8s.io/api/core/v1"
2526
apierrors "k8s.io/apimachinery/pkg/api/errors"
2627
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -191,3 +192,50 @@ func CheckExcessSyslogNGStatus(t *testing.T, c *common.Cluster, ctx *context.Con
191192

192193
return true
193194
}
195+
196+
// DeploymentAvailable returns a condition function that checks if a deployment
197+
// is available with all replicas ready.
198+
func DeploymentAvailable(t *testing.T, c client.Client, ctx *context.Context, namespace, name string) func() bool {
199+
return func() bool {
200+
deployment := &appsv1.Deployment{}
201+
if err := c.Get(*ctx, client.ObjectKey{
202+
Name: name,
203+
Namespace: namespace,
204+
}, deployment); err != nil {
205+
t.Logf("Failed to get deployment %s/%s: %v", namespace, name, err)
206+
return false
207+
}
208+
209+
if deployment.Spec.Replicas == nil {
210+
return false
211+
}
212+
desiredReplicas := *deployment.Spec.Replicas
213+
214+
if deployment.Status.ReadyReplicas != desiredReplicas {
215+
t.Logf("Deployment %s/%s: %d/%d replicas ready",
216+
namespace, name, deployment.Status.ReadyReplicas, desiredReplicas)
217+
return false
218+
}
219+
220+
if deployment.Status.AvailableReplicas != desiredReplicas {
221+
t.Logf("Deployment %s/%s: %d/%d replicas available",
222+
namespace, name, deployment.Status.AvailableReplicas, desiredReplicas)
223+
return false
224+
}
225+
226+
for _, condition := range deployment.Status.Conditions {
227+
if condition.Type == appsv1.DeploymentAvailable {
228+
if condition.Status == corev1.ConditionTrue {
229+
t.Logf("Deployment %s/%s is available", namespace, name)
230+
return true
231+
}
232+
t.Logf("Deployment %s/%s Available condition is %s: %s",
233+
namespace, name, condition.Status, condition.Message)
234+
return false
235+
}
236+
}
237+
238+
t.Logf("Deployment %s/%s has no Available condition", namespace, name)
239+
return false
240+
}
241+
}

e2e/common/helpers.go

Lines changed: 71 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@ import (
2020
"os"
2121
"sync/atomic"
2222
"testing"
23+
"time"
2324

2425
"emperror.dev/errors"
2526
"github.com/spf13/cast"
2627
"github.com/stretchr/testify/assert"
27-
v12 "k8s.io/api/core/v1"
28+
corev1 "k8s.io/api/core/v1"
2829
"k8s.io/apimachinery/pkg/api/resource"
29-
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31+
"k8s.io/apimachinery/pkg/util/wait"
3032
"sigs.k8s.io/controller-runtime/pkg/client"
3133

3234
"github.com/kube-logging/logging-operator/pkg/sdk/logging/api/v1beta1"
@@ -78,7 +80,7 @@ func LoggingInfra(
7880
producerLabels map[string]string) {
7981

8082
output := v1beta1.ClusterOutput{
81-
ObjectMeta: v1.ObjectMeta{
83+
ObjectMeta: metav1.ObjectMeta{
8284
Name: "http",
8385
Namespace: nsInfra,
8486
},
@@ -96,7 +98,7 @@ func LoggingInfra(
9698

9799
RequireNoError(t, c.Create(ctx, &output))
98100
flow := v1beta1.ClusterFlow{
99-
ObjectMeta: v1.ObjectMeta{
101+
ObjectMeta: metav1.ObjectMeta{
100102
Name: "flow",
101103
Namespace: nsInfra,
102104
},
@@ -115,7 +117,7 @@ func LoggingInfra(
115117
RequireNoError(t, c.Create(ctx, &flow))
116118

117119
agent := v1beta1.FluentbitAgent{
118-
ObjectMeta: v1.ObjectMeta{
120+
ObjectMeta: metav1.ObjectMeta{
119121
Name: "infra",
120122
},
121123
Spec: v1beta1.FluentbitSpec{
@@ -135,7 +137,7 @@ func LoggingInfra(
135137
RequireNoError(t, c.Create(ctx, &agent))
136138

137139
logging := v1beta1.Logging{
138-
ObjectMeta: v1.ObjectMeta{
140+
ObjectMeta: metav1.ObjectMeta{
139141
Name: "infra",
140142
Labels: map[string]string{
141143
"tenant": "infra",
@@ -158,10 +160,10 @@ func LoggingInfra(
158160
Tag: NodeExporterTag,
159161
},
160162
DisablePvc: true,
161-
Resources: v12.ResourceRequirements{
162-
Requests: v12.ResourceList{
163-
v12.ResourceCPU: resource.MustParse("50m"),
164-
v12.ResourceMemory: resource.MustParse("50M"),
163+
Resources: corev1.ResourceRequirements{
164+
Requests: corev1.ResourceList{
165+
corev1.ResourceCPU: resource.MustParse("50m"),
166+
corev1.ResourceMemory: resource.MustParse("50M"),
165167
},
166168
},
167169
},
@@ -181,7 +183,7 @@ func LoggingTenant(
181183
buffer *output.Buffer,
182184
producerLabels map[string]string) {
183185
output := v1beta1.Output{
184-
ObjectMeta: v1.ObjectMeta{
186+
ObjectMeta: metav1.ObjectMeta{
185187
Name: "http",
186188
Namespace: nsTenant,
187189
},
@@ -197,7 +199,7 @@ func LoggingTenant(
197199

198200
RequireNoError(t, c.Create(ctx, &output))
199201
flow := v1beta1.Flow{
200-
ObjectMeta: v1.ObjectMeta{
202+
ObjectMeta: metav1.ObjectMeta{
201203
Name: "flow",
202204
Namespace: nsTenant,
203205
},
@@ -216,7 +218,7 @@ func LoggingTenant(
216218
RequireNoError(t, c.Create(ctx, &flow))
217219

218220
logging := v1beta1.Logging{
219-
ObjectMeta: v1.ObjectMeta{
221+
ObjectMeta: metav1.ObjectMeta{
220222
Name: "tenant",
221223
Labels: map[string]string{
222224
"tenant": "tenant",
@@ -240,10 +242,10 @@ func LoggingTenant(
240242
Tag: NodeExporterTag,
241243
},
242244
DisablePvc: true,
243-
Resources: v12.ResourceRequirements{
244-
Requests: v12.ResourceList{
245-
v12.ResourceCPU: resource.MustParse("50m"),
246-
v12.ResourceMemory: resource.MustParse("50M"),
245+
Resources: corev1.ResourceRequirements{
246+
Requests: corev1.ResourceList{
247+
corev1.ResourceCPU: resource.MustParse("50m"),
248+
corev1.ResourceMemory: resource.MustParse("50M"),
247249
},
248250
},
249251
},
@@ -254,20 +256,68 @@ func LoggingTenant(
254256

255257
func LoggingRoute(ctx context.Context, t *testing.T, c client.Client) {
256258
ap := v1beta1.LoggingRoute{
257-
ObjectMeta: v1.ObjectMeta{
259+
ObjectMeta: metav1.ObjectMeta{
258260
Name: "tenants",
259261
},
260262
Spec: v1beta1.LoggingRouteSpec{
261263
Source: "infra",
262-
Targets: v1.LabelSelector{
263-
MatchExpressions: []v1.LabelSelectorRequirement{
264+
Targets: metav1.LabelSelector{
265+
MatchExpressions: []metav1.LabelSelectorRequirement{
264266
{
265267
Key: "tenant",
266-
Operator: v1.LabelSelectorOpExists,
268+
Operator: metav1.LabelSelectorOpExists,
267269
},
268270
},
269271
},
270272
},
271273
}
272274
RequireNoError(t, c.Create(ctx, &ap))
273275
}
276+
277+
// WaitForPodReady waits for a pod to be in Running phase and Ready condition
278+
func WaitForPodReady(ctx context.Context, c client.Client, pod *corev1.Pod, pollInterval, pollTimeout time.Duration) error {
279+
return wait.PollUntilContextTimeout(ctx, pollInterval, pollTimeout, true, wait.ConditionWithContextFunc(func(ctx context.Context) (bool, error) {
280+
var updatedPod corev1.Pod
281+
err := c.Get(ctx, client.ObjectKeyFromObject(pod), &updatedPod)
282+
if client.IgnoreNotFound(err) != nil {
283+
return false, fmt.Errorf("failed to get pod status: %w", err)
284+
}
285+
286+
isReady := updatedPod.Status.Phase == corev1.PodRunning
287+
for _, cond := range updatedPod.Status.Conditions {
288+
if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
289+
return true, nil
290+
}
291+
}
292+
return isReady, nil
293+
}))
294+
}
295+
296+
// SetupCurlPod creates a curl pod for testing HTTP endpoints and waits for it to be ready
297+
func SetupCurlPod(ctx context.Context, c client.Client, namespace, name string, pollInterval, pollTimeout time.Duration) (*corev1.Pod, error) {
298+
pod := &corev1.Pod{
299+
ObjectMeta: metav1.ObjectMeta{
300+
Name: name,
301+
Namespace: namespace,
302+
},
303+
Spec: corev1.PodSpec{
304+
Containers: []corev1.Container{
305+
{
306+
Name: "curl",
307+
Image: "curlimages/curl:latest",
308+
Command: []string{"sleep", "3600"},
309+
},
310+
},
311+
},
312+
}
313+
314+
if err := c.Create(ctx, pod); err != nil {
315+
return nil, fmt.Errorf("failed to create curl pod: %w", err)
316+
}
317+
318+
if err := WaitForPodReady(ctx, c, pod, pollInterval, pollTimeout); err != nil {
319+
return nil, fmt.Errorf("failed to wait for curl pod to be ready: %w", err)
320+
}
321+
322+
return pod, nil
323+
}

0 commit comments

Comments
 (0)