Skip to content

Commit a4ba432

Browse files
authored
Merge pull request #14 from ecordell/ctxcancel
queue.Operations: add context cancelation and record error
2 parents 71e2069 + 45eab07 commit a4ba432

File tree

6 files changed

+107
-12
lines changed

6 files changed

+107
-12
lines changed

component/ensure_component.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type Annotator[T any] interface {
2323
// will create a component object and ensure it has the computed spec.
2424
type EnsureComponentByHash[K KubeObject, A Annotator[A]] struct {
2525
*HashableComponent[K]
26-
ctrls *typedctx.Key[queue.Interface]
26+
ctrls queue.OperationsContext
2727
nn typedctx.MustValueContext[types.NamespacedName]
2828
applyObject func(ctx context.Context, apply A) (K, error)
2929
deleteObject func(ctx context.Context, nn types.NamespacedName) error
@@ -36,7 +36,7 @@ var _ handler.ContextHandler = &EnsureComponentByHash[*corev1.Service, *applycor
3636
func NewEnsureComponentByHash[K KubeObject, A Annotator[A]](
3737
component *HashableComponent[K],
3838
owner typedctx.MustValueContext[types.NamespacedName],
39-
ctrls *typedctx.Key[queue.Interface],
39+
ctrls queue.OperationsContext,
4040
applyObj func(ctx context.Context, apply A) (K, error),
4141
deleteObject func(ctx context.Context, nn types.NamespacedName) error,
4242
newObj func(ctx context.Context) A,
@@ -57,7 +57,7 @@ func (e *EnsureComponentByHash[K, A]) Handle(ctx context.Context) {
5757
newObj := e.newObj(ctx)
5858
hash, err := e.Hash(newObj)
5959
if err != nil {
60-
e.ctrls.MustValue(ctx).RequeueErr(err)
60+
e.ctrls.RequeueErr(ctx, err)
6161
return
6262
}
6363
newObj = newObj.WithAnnotations(map[string]string{e.HashAnnotationKey: hash})
@@ -80,7 +80,7 @@ func (e *EnsureComponentByHash[K, A]) Handle(ctx context.Context) {
8080
// apply if no matching KubeObject in cluster
8181
_, err = e.applyObject(ctx, newObj)
8282
if err != nil {
83-
e.ctrls.MustValue(ctx).RequeueErr(err)
83+
e.ctrls.RequeueErr(ctx, err)
8484
return
8585
}
8686
}
@@ -92,7 +92,7 @@ func (e *EnsureComponentByHash[K, A]) Handle(ctx context.Context) {
9292
Namespace: o.GetNamespace(),
9393
Name: o.GetName(),
9494
}); err != nil {
95-
e.ctrls.MustValue(ctx).RequeueErr(err)
95+
e.ctrls.RequeueErr(ctx, err)
9696
return
9797
}
9898
}

component/ensure_component_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func TestEnsureServiceHandler(t *testing.T) {
144144
}),
145145
hash.NewObjectHash(), hashKey),
146146
ctxOwner,
147-
queueOps.Key,
147+
queueOps,
148148
func(ctx context.Context, apply *applycorev1.ServiceApplyConfiguration) (*corev1.Service, error) {
149149
applyCalled = true
150150
return nil, nil

manager/controller.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ import (
3232
"k8s.io/controller-manager/controller"
3333
controllerhealthz "k8s.io/controller-manager/pkg/healthz"
3434

35+
"github.com/go-logr/logr"
36+
3537
"github.com/authzed/controller-idioms/cachekeys"
3638
"github.com/authzed/controller-idioms/queue"
3739
"github.com/authzed/controller-idioms/typed"
38-
"github.com/go-logr/logr"
3940
)
4041

4142
// SyncFunc is a function called when an event needs processing
@@ -162,7 +163,7 @@ func (c *OwnedResourceController) processNext(ctx context.Context) bool {
162163
c.Queue.AddAfter(key, after)
163164
}
164165

165-
ctx = c.OperationsContext.WithValue(ctx, queue.NewOperations(done, requeue))
166+
ctx = c.OperationsContext.WithValue(ctx, queue.NewOperations(done, requeue, cancel))
166167

167168
c.sync(ctx, *gvr, namespace, name)
168169
done()

queue/controls.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import (
2020
"context"
2121
"time"
2222

23-
"github.com/authzed/controller-idioms/typedctx"
2423
"github.com/go-logr/logr"
24+
25+
"github.com/authzed/controller-idioms/typedctx"
2526
)
2627

2728
//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6 -generate
@@ -58,10 +59,15 @@ func (h OperationsContext) RequeueAPIErr(ctx context.Context, err error) {
5859
h.MustValue(ctx).RequeueAPIErr(err)
5960
}
6061

61-
func NewOperations(done func(), requeueAfter func(time.Duration)) *Operations {
62+
func (h OperationsContext) Error(ctx context.Context) error {
63+
return h.MustValue(ctx).Error()
64+
}
65+
66+
func NewOperations(done func(), requeueAfter func(time.Duration), cancel context.CancelFunc) *Operations {
6267
return &Operations{
6368
done: done,
6469
requeueAfter: requeueAfter,
70+
cancel: cancel,
6571
}
6672
}
6773

@@ -74,32 +80,50 @@ type Interface interface {
7480
Requeue()
7581
RequeueErr(err error)
7682
RequeueAPIErr(err error)
83+
Error() error
7784
}
7885

7986
// Operations deals with the current queue key and provides controls for
8087
// requeueing or stopping reconciliation.
8188
type Operations struct {
8289
done func()
8390
requeueAfter func(duration time.Duration)
91+
cancel context.CancelFunc
92+
err error
8493
}
8594

95+
// Done marks the current key as finished. Note that processing should stop
96+
// as soon as possible after calling `Done`, since marking it as done frees the
97+
// queue to potentially process the same key again.
8698
func (c *Operations) Done() {
99+
defer c.cancel()
87100
c.done()
88101
}
89102

103+
// RequeueAfter requeues the current key after duration.
90104
func (c *Operations) RequeueAfter(duration time.Duration) {
105+
defer c.cancel()
91106
c.requeueAfter(duration)
92107
}
93108

109+
// Requeue requeues the current key immediately.
94110
func (c *Operations) Requeue() {
111+
defer c.cancel()
95112
c.requeueAfter(0)
96113
}
97114

115+
// RequeueErr sets err on the object and requeues the current key.
98116
func (c *Operations) RequeueErr(err error) {
117+
defer c.cancel()
118+
c.err = err
99119
c.requeueAfter(0)
100120
}
101121

122+
// RequeueAPIErr checks to see if `err` is a kube api error with retry data.
123+
// If so, it requeues after the wait period, otherwise, it requeues immediately.
102124
func (c *Operations) RequeueAPIErr(err error) {
125+
defer c.cancel()
126+
c.err = err
103127
retry, after := ShouldRetry(err)
104128
if retry && after > 0 {
105129
c.RequeueAfter(after)
@@ -109,3 +133,8 @@ func (c *Operations) RequeueAPIErr(err error) {
109133
}
110134
c.Done()
111135
}
136+
137+
// Error returns the last recorded error, if any
138+
func (c *Operations) Error() error {
139+
return c.err
140+
}

queue/controls_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func ExampleNewOperations() {
2525
queue.Done(key)
2626
}, func(duration time.Duration) {
2727
queue.AddAfter(key, duration)
28-
})
28+
}, cancel)
2929

3030
// typically called from a handler
3131
handler.NewHandlerFromFunc(func(ctx context.Context) {
@@ -57,7 +57,7 @@ func ExampleNewQueueOperationsCtx() {
5757
queue.Done(key)
5858
}, func(duration time.Duration) {
5959
queue.AddAfter(key, duration)
60-
}))
60+
}, cancel))
6161

6262
// queue controls are passed via context
6363
handler.NewHandlerFromFunc(func(ctx context.Context) {

queue/fake/zz_generated.go

Lines changed: 65 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)