@@ -3,247 +3,200 @@ package workers
33import (
44 "context"
55 "errors"
6- "os"
7- "os/signal"
86 "sync"
9- "syscall"
107 "time"
118)
129
13- var defaultWatchSignals = []os.Signal {syscall .SIGINT , syscall .SIGTERM , syscall .SIGKILL }
14-
15- // Worker Contains the work function. Allows an input and output to a channel or another worker for pipeline work.
16- // Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the
17- // error.
10+ // Worker Contains the work function.
11+ // Work() get input and could put in outChan for followers.
12+ // ⚠️ outChan could be closed if follower is stoped before producer.
13+ // error returned can be process by afterFunc but will be ignored by default.
1814type Worker interface {
19- Work (in interface {}, out chan <- interface {}) error
20- }
21-
22- // Runner Handles the running the Worker logic.
23- type Runner interface {
24- BeforeFunc (func (ctx context.Context ) error ) Runner
25- AfterFunc (func (ctx context.Context , err error ) error ) Runner
26- SetDeadline (t time.Time ) Runner
27- SetTimeout (duration time.Duration ) Runner
28- SetFollower ()
29- Send (in interface {})
30- InFrom (w ... Runner ) Runner
31- SetOut (chan interface {})
32- Start () Runner
33- Stop () chan error
34- Wait () error
15+ Work (ctx context.Context , in interface {}, out chan <- interface {}) error
3516}
3617
37- type runner struct {
38- ctx context.Context
39- cancel context.CancelFunc
40- inChan chan interface {}
41- outChan chan interface {}
42- errChan chan error
43- signalChan chan os. Signal
44- limiter chan struct {}
18+ type Runner struct {
19+ ctx context.Context
20+ cancel context.CancelFunc
21+ inputCtx context. Context
22+ inputCancel context. CancelFunc
23+ inChan chan interface {}
24+ outChan chan interface {}
25+ limiter chan struct {}
4526
4627 afterFunc func (ctx context.Context , err error ) error
47- workFunc func (in interface {}, out chan <- interface {}) error
28+ workFunc func (ctx context. Context , in interface {}, out chan <- interface {}) error
4829 beforeFunc func (ctx context.Context ) error
4930
50- timeout time.Duration
51- deadline time.Duration
52-
53- isLeader bool
54- stopCalled bool
31+ timeout time.Duration
5532
5633 numWorkers int64
57- lock * sync.RWMutex
58- wg * sync.WaitGroup
59- done * sync.Once
60- once * sync.Once
34+ started * sync.Once
35+ done chan struct {}
6136}
6237
6338// NewRunner Factory function for a new Runner. The Runner will handle running the workers logic.
64- func NewRunner (ctx context.Context , w Worker , numWorkers int64 ) Runner {
65- var runnerCtx , runnerCancel = context . WithCancel (ctx )
66- var runner = & runner {
67- ctx : runnerCtx ,
68- cancel : runnerCancel ,
69- inChan : make ( chan interface {}, numWorkers ),
70- outChan : nil ,
71- errChan : make ( chan error , 1 ) ,
72- signalChan : make ( chan os. Signal , 1 ) ,
73- limiter : make ( chan struct {}, numWorkers ) ,
74- afterFunc : func ( ctx context. Context , err error ) error { return err } ,
75- workFunc : w . Work ,
76- beforeFunc : func ( ctx context. Context ) error { return nil } ,
77- numWorkers : numWorkers ,
78- isLeader : true ,
79- lock : new (sync. RWMutex ) ,
80- wg : new (sync. WaitGroup ) ,
81- once : new (sync.Once ),
82- done : new (sync. Once ),
39+ func NewRunner (ctx context.Context , w Worker , numWorkers int64 , buffer int64 ) * Runner {
40+ // runnerCtx, runnerCancel := signal.NotifyContext (ctx, defaultWatchSignals... )
41+ runnerCtx , runnerCancel := context . WithCancel ( ctx )
42+ inputCtx , inputCancel := context . WithCancel ( runnerCtx )
43+
44+ runner := & Runner {
45+ ctx : runnerCtx ,
46+ cancel : runnerCancel ,
47+ inputCtx : inputCtx ,
48+ inputCancel : inputCancel ,
49+ inChan : make ( chan interface {}, buffer ) ,
50+ outChan : nil ,
51+ limiter : make ( chan struct {}, numWorkers ) ,
52+ afterFunc : func ( ctx context. Context , err error ) error { return nil } ,
53+ workFunc : w . Work ,
54+ beforeFunc : func ( ctx context. Context ) error { return nil } ,
55+ numWorkers : numWorkers ,
56+ started : new (sync.Once ),
57+ done : make ( chan struct {} ),
8358 }
84- runner .waitForSignal (defaultWatchSignals ... )
8559 return runner
8660}
8761
88- // Send Send an object to the worker for processing.
89- func (r * runner ) Send (in interface {}) {
62+ var ErrInputClosed = errors .New ("input closed" )
63+
64+ // Send Send an object to the worker for processing if context is not Done.
65+ func (r * Runner ) Send (in interface {}) error {
9066 select {
91- case <- r .ctx .Done ():
92- return
67+ case <- r .inputCtx .Done ():
68+ return ErrInputClosed
9369 case r .inChan <- in :
9470 }
71+ return nil
9572}
9673
9774// InFrom Set a worker to accept output from another worker(s).
98- func (r * runner ) InFrom (w ... Runner ) Runner {
99- r .SetFollower ()
75+ func (r * Runner ) InFrom (w ... * Runner ) * Runner {
10076 for _ , wr := range w {
101- wr .SetOut (r .inChan )
77+ // in := make(chan interface{})
78+ // go func(in chan interface{}) {
79+ // for msg := range in {
80+ // if err := r.Send(msg); err != nil {
81+ // return
82+ // }
83+ // }
84+ // }(in)
85+ wr .SetOut (r .inChan ) // nolint
10286 }
10387 return r
10488}
10589
106- // SetFollower Sets the worker as a follower and does not need to close it's in channel.
107- func (r * runner ) SetFollower () {
108- r .lock .Lock ()
109- r .isLeader = false
110- r .lock .Unlock ()
111- }
112-
113- // Start Starts the worker on processing.
114- func (r * runner ) Start () Runner {
115- r .startWork ()
116- return r
90+ // Start execute beforeFunc and launch worker processing.
91+ func (r * Runner ) Start () error {
92+ r .started .Do (func () {
93+ if err := r .beforeFunc (r .ctx ); err == nil {
94+ go r .work ()
95+ }
96+ })
97+ return nil
11798}
11899
119100// BeforeFunc Function to be run before worker starts processing.
120- func (r * runner ) BeforeFunc (f func (ctx context.Context ) error ) Runner {
101+ func (r * Runner ) BeforeFunc (f func (ctx context.Context ) error ) * Runner {
121102 r .beforeFunc = f
122103 return r
123104}
124105
125106// AfterFunc Function to be run after worker has stopped.
126- func (r * runner ) AfterFunc (f func (ctx context.Context , err error ) error ) Runner {
107+ // It can be used for logging and error management.
108+ // input can be retreive with context value:
109+ // ctx.Value(workers.InputKey{})
110+ // ⚠️ If an error is returned it stop Runner execution.
111+ func (r * Runner ) AfterFunc (f func (ctx context.Context , err error ) error ) * Runner {
127112 r .afterFunc = f
128113 return r
129114}
130115
116+ var ErrOutAlready = errors .New ("out already set" )
117+
131118// SetOut Allows the setting of a workers out channel, if not already set.
132- func (r * runner ) SetOut (c chan interface {}) {
119+ func (r * Runner ) SetOut (c chan interface {}) error {
133120 if r .outChan != nil {
134- return
121+ return ErrOutAlready
135122 }
136123 r .outChan = c
124+ return nil
137125}
138126
139- // SetDeadline allows a time to be set when the workers should stop.
140- // Deadline needs to be handled by the IsDone method.
141- func (r * runner ) SetDeadline (t time.Time ) Runner {
142- r .lock .Lock ()
143- defer r .lock .Unlock ()
127+ // SetDeadline allows a time to be set when the Runner should stop.
128+ // ⚠️ Should only be called before Start
129+ func (r * Runner ) SetDeadline (t time.Time ) * Runner {
144130 r .ctx , r .cancel = context .WithDeadline (r .ctx , t )
145131 return r
146132}
147133
148- // SetTimeout allows a time duration to be set when the workers should stop.
149- // Timeout needs to be handled by the IsDone method.
150- func (r * runner ) SetTimeout (duration time.Duration ) Runner {
151- r .lock .Lock ()
152- defer r .lock .Unlock ()
134+ // SetWorkerTimeout allows a time duration to be set when the workers should stop.
135+ // ⚠️ Should only be called before Start
136+ func (r * Runner ) SetWorkerTimeout (duration time.Duration ) * Runner {
153137 r .timeout = duration
154138 return r
155139}
156140
157- // Wait calls stop on workers and waits for the channel to drain.
158- // !!Should only be called when certain nothing will send to worker.
159- func (r * runner ) Wait () error {
160- r .waitForDrain ()
161- if err := <- r .Stop (); err != nil && ! errors .Is (err , context .Canceled ) {
162- return err
141+ // Wait close the input channel and waits it to drain and process.
142+ func (r * Runner ) Wait () * Runner {
143+ if r .inputCtx .Err () == nil {
144+ r .inputCancel ()
145+ close (r .inChan )
163146 }
164- return nil
165- }
166147
167- // Stop Stops the processing of a worker and closes it's channel in.
168- // Returns a blocking channel with type error.
169- // !!Should only be called when certain nothing will send to worker.
170- func (r * runner ) Stop () chan error {
171- r .done .Do (func () {
172- if r .inChan != nil && r .isLeader {
173- close (r .inChan )
174- }
175- })
176- return r .errChan
148+ <- r .done
149+
150+ return r
177151}
178152
179- // IsDone returns a channel signaling the workers context has been canceled.
180- func (r * runner ) IsDone () <- chan struct {} {
181- return r .ctx .Done ()
153+ // Stop Stops the processing of a worker and waits for workers to finish.
154+ func (r * Runner ) Stop () * Runner {
155+ r .cancel ()
156+ r .Wait ()
157+ return r
182158}
183159
184- // waitForSignal make sure we wait for a term signal and shutdown correctly
185- func (r * runner ) waitForSignal (signals ... os.Signal ) {
186- go func () {
187- signal .Notify (r .signalChan , signals ... )
188- <- r .signalChan
189- if r .cancel != nil {
190- r .cancel ()
191- }
160+ type InputKey struct {}
161+
162+ // work starts processing input and limits worker instance number.
163+ func (r * Runner ) work () {
164+ var wg sync.WaitGroup
165+
166+ defer func () {
167+ wg .Wait ()
168+ r .cancel ()
169+ close (r .done )
192170 }()
193- }
194171
195- // waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty.
196- func (r * runner ) waitForDrain () {
197- for len (r .limiter ) > 0 || len (r .inChan ) > 0 {
198- // Wait for the drain.
199- }
200- }
172+ for {
173+ select {
174+ case <- r .ctx .Done ():
175+ return
176+ case input , open := <- r .inChan :
177+ if ! open {
178+ return
179+ }
180+ wg .Add (1 )
201181
202- // startWork Runs the before function and starts processing until one of three things happen.
203- // 1. A term signal is received or cancellation of context.
204- // 2. Stop function is called.
205- // 3. Worker returns an error.
206- func (r * runner ) startWork () {
207- var err error
208- if err = r .beforeFunc (r .ctx ); err != nil {
209- r .errChan <- err
210- return
211- }
212- if r .timeout > 0 {
213- r .ctx , r .cancel = context .WithTimeout (r .ctx , r .timeout )
214- }
215- r .wg .Add (1 )
216- go func () {
217- var workerWG = new (sync.WaitGroup )
218- var closeOnce = new (sync.Once )
219-
220- // write out error if not nil on exit.
221- defer func () {
222- workerWG .Wait ()
223- r .errChan <- err
224- closeOnce .Do (func () {
225- if r .outChan != nil {
226- close (r .outChan )
227- }
228- })
229- r .wg .Done ()
230- }()
231- for in := range r .inChan {
232- input := in
233182 r .limiter <- struct {}{}
234- workerWG .Add (1 )
183+
184+ inputCtx := context .WithValue (r .ctx , InputKey {}, input )
185+ workCtx , workCancel := context .WithCancel (inputCtx )
186+ if r .timeout > 0 {
187+ workCtx , workCancel = context .WithTimeout (inputCtx , r .timeout )
188+ }
189+
235190 go func () {
236191 defer func () {
237192 <- r .limiter
238- workerWG .Done ()
193+ workCancel ()
194+ wg .Done ()
239195 }()
240- if err := r .afterFunc (r .ctx , r .workFunc (input , r .outChan )); err != nil {
241- r .once .Do (func () {
242- r .errChan <- err
243- r .cancel ()
244- })
196+ if err := r .afterFunc (inputCtx , r .workFunc (workCtx , input , r .outChan )); err != nil {
197+ r .cancel ()
245198 }
246199 }()
247200 }
248- }()
201+ }
249202}
0 commit comments