@@ -4,246 +4,203 @@ import (
44 "context"
55 "errors"
66 "os"
7- "os/signal"
87 "sync"
98 "syscall"
109 "time"
1110)
1211
13- var defaultWatchSignals = []os.Signal {syscall .SIGINT , syscall .SIGTERM , syscall . SIGKILL }
12+ var defaultWatchSignals = []os.Signal {syscall .SIGINT , syscall .SIGTERM }
1413
15- // Worker Contains the work function. Allows an input and output to a channel or another worker for pipeline work.
16- // Return nil if you want the Runner to continue otherwise any error will cause the Runner to shutdown and return the
17- // error.
14+ // Worker Contains the work function.
15+ // Work() get input and could put in outChan for followers.
16+ // ⚠️ outChan could be closed if follower is stoped before producer.
17+ // error returned can be process by afterFunc but will be ignored by default.
1818type Worker interface {
19- Work (in interface {}, out chan <- interface {}) error
19+ Work (ctx context. Context , in interface {}, out chan <- interface {}) error
2020}
2121
22- // Runner Handles the running the Worker logic.
23- type Runner interface {
24- BeforeFunc (func (ctx context.Context ) error ) Runner
25- AfterFunc (func (ctx context.Context , err error ) error ) Runner
26- SetDeadline (t time.Time ) Runner
27- SetTimeout (duration time.Duration ) Runner
28- SetFollower ()
29- Send (in interface {})
30- InFrom (w ... Runner ) Runner
31- SetOut (chan interface {})
32- Start () Runner
33- Stop () chan error
34- Wait () error
35- }
36-
37- type runner struct {
38- ctx context.Context
39- cancel context.CancelFunc
40- inChan chan interface {}
41- outChan chan interface {}
42- errChan chan error
43- signalChan chan os.Signal
44- limiter chan struct {}
22+ type Runner struct {
23+ ctx context.Context
24+ cancel context.CancelFunc
25+ inputCtx context.Context
26+ inputCancel context.CancelFunc
27+ inChan chan interface {}
28+ outChan chan interface {}
29+ limiter chan struct {}
4530
4631 afterFunc func (ctx context.Context , err error ) error
47- workFunc func (in interface {}, out chan <- interface {}) error
32+ workFunc func (ctx context. Context , in interface {}, out chan <- interface {}) error
4833 beforeFunc func (ctx context.Context ) error
4934
50- timeout time.Duration
51- deadline time.Duration
52-
53- isLeader bool
54- stopCalled bool
35+ timeout time.Duration
5536
5637 numWorkers int64
57- lock * sync.RWMutex
58- wg * sync.WaitGroup
59- done * sync.Once
60- once * sync.Once
38+ started * sync.Once
39+ done chan struct {}
6140}
6241
6342// NewRunner Factory function for a new Runner. The Runner will handle running the workers logic.
64- func NewRunner (ctx context.Context , w Worker , numWorkers int64 ) Runner {
65- var runnerCtx , runnerCancel = context . WithCancel (ctx )
66- var runner = & runner {
67- ctx : runnerCtx ,
68- cancel : runnerCancel ,
69- inChan : make ( chan interface {}, numWorkers ),
70- outChan : nil ,
71- errChan : make ( chan error , 1 ) ,
72- signalChan : make ( chan os. Signal , 1 ) ,
73- limiter : make ( chan struct {}, numWorkers ) ,
74- afterFunc : func ( ctx context. Context , err error ) error { return err } ,
75- workFunc : w . Work ,
76- beforeFunc : func ( ctx context. Context ) error { return nil } ,
77- numWorkers : numWorkers ,
78- isLeader : true ,
79- lock : new (sync. RWMutex ) ,
80- wg : new (sync. WaitGroup ) ,
81- once : new (sync.Once ),
82- done : new (sync. Once ),
43+ func NewRunner (ctx context.Context , w Worker , numWorkers int64 , buffer int64 ) * Runner {
44+ // runnerCtx, runnerCancel := signal.NotifyContext (ctx, defaultWatchSignals... )
45+ runnerCtx , runnerCancel := context . WithCancel ( ctx )
46+ inputCtx , inputCancel := context . WithCancel ( runnerCtx )
47+
48+ runner := & Runner {
49+ ctx : runnerCtx ,
50+ cancel : runnerCancel ,
51+ inputCtx : inputCtx ,
52+ inputCancel : inputCancel ,
53+ inChan : make ( chan interface {}, buffer ) ,
54+ outChan : nil ,
55+ limiter : make ( chan struct {}, numWorkers ) ,
56+ afterFunc : func ( ctx context. Context , err error ) error { return nil } ,
57+ workFunc : w . Work ,
58+ beforeFunc : func ( ctx context. Context ) error { return nil } ,
59+ numWorkers : numWorkers ,
60+ started : new (sync.Once ),
61+ done : make ( chan struct {} ),
8362 }
84- runner .waitForSignal (defaultWatchSignals ... )
8563 return runner
8664}
8765
88- // Send Send an object to the worker for processing.
89- func (r * runner ) Send (in interface {}) {
66+ var ErrInputClosed = errors .New ("input closed" )
67+
68+ // Send Send an object to the worker for processing if context is not Done.
69+ func (r * Runner ) Send (in interface {}) error {
9070 select {
91- case <- r .ctx .Done ():
92- return
71+ case <- r .inputCtx .Done ():
72+ return ErrInputClosed
9373 case r .inChan <- in :
9474 }
75+ return nil
9576}
9677
9778// InFrom Set a worker to accept output from another worker(s).
98- func (r * runner ) InFrom (w ... Runner ) Runner {
99- r .SetFollower ()
79+ func (r * Runner ) InFrom (w ... * Runner ) * Runner {
10080 for _ , wr := range w {
101- wr .SetOut (r .inChan )
81+ // in := make(chan interface{})
82+ // go func(in chan interface{}) {
83+ // for msg := range in {
84+ // if err := r.Send(msg); err != nil {
85+ // return
86+ // }
87+ // }
88+ // }(in)
89+ wr .SetOut (r .inChan ) // nolint
10290 }
10391 return r
10492}
10593
106- // SetFollower Sets the worker as a follower and does not need to close it's in channel.
107- func (r * runner ) SetFollower () {
108- r .lock .Lock ()
109- r .isLeader = false
110- r .lock .Unlock ()
111- }
112-
113- // Start Starts the worker on processing.
114- func (r * runner ) Start () Runner {
115- r .startWork ()
116- return r
94+ // Start execute beforeFunc and launch worker processing.
95+ func (r * Runner ) Start () error {
96+ r .started .Do (func () {
97+ if err := r .beforeFunc (r .ctx ); err == nil {
98+ go r .work ()
99+ }
100+ })
101+ return nil
117102}
118103
119104// BeforeFunc Function to be run before worker starts processing.
120- func (r * runner ) BeforeFunc (f func (ctx context.Context ) error ) Runner {
105+ func (r * Runner ) BeforeFunc (f func (ctx context.Context ) error ) * Runner {
121106 r .beforeFunc = f
122107 return r
123108}
124109
125110// AfterFunc Function to be run after worker has stopped.
126- func (r * runner ) AfterFunc (f func (ctx context.Context , err error ) error ) Runner {
111+ // It can be used for logging and error management.
112+ // input can be retreive with context value:
113+ // ctx.Value(workers.InputKey{})
114+ // ⚠️ If an error is returned it stop Runner execution.
115+ func (r * Runner ) AfterFunc (f func (ctx context.Context , err error ) error ) * Runner {
127116 r .afterFunc = f
128117 return r
129118}
130119
120+ var ErrOutAlready = errors .New ("out already set" )
121+
131122// SetOut Allows the setting of a workers out channel, if not already set.
132- func (r * runner ) SetOut (c chan interface {}) {
123+ func (r * Runner ) SetOut (c chan interface {}) error {
133124 if r .outChan != nil {
134- return
125+ return ErrOutAlready
135126 }
136127 r .outChan = c
128+ return nil
137129}
138130
139- // SetDeadline allows a time to be set when the workers should stop.
140- // Deadline needs to be handled by the IsDone method.
141- func (r * runner ) SetDeadline (t time.Time ) Runner {
142- r .lock .Lock ()
143- defer r .lock .Unlock ()
131+ // SetDeadline allows a time to be set when the Runner should stop.
132+ // ⚠️ Should only be called before Start
133+ func (r * Runner ) SetDeadline (t time.Time ) * Runner {
144134 r .ctx , r .cancel = context .WithDeadline (r .ctx , t )
145135 return r
146136}
147137
148- // SetTimeout allows a time duration to be set when the workers should stop.
149- // Timeout needs to be handled by the IsDone method.
150- func (r * runner ) SetTimeout (duration time.Duration ) Runner {
151- r .lock .Lock ()
152- defer r .lock .Unlock ()
138+ // SetWorkerTimeout allows a time duration to be set when the workers should stop.
139+ // ⚠️ Should only be called before Start
140+ func (r * Runner ) SetWorkerTimeout (duration time.Duration ) * Runner {
153141 r .timeout = duration
154142 return r
155143}
156144
157- // Wait calls stop on workers and waits for the channel to drain.
158- // !!Should only be called when certain nothing will send to worker.
159- func (r * runner ) Wait () error {
160- r .waitForDrain ()
161- if err := <- r .Stop (); err != nil && ! errors .Is (err , context .Canceled ) {
162- return err
145+ // Wait close the input channel and waits it to drain and process.
146+ func (r * Runner ) Wait () * Runner {
147+ if r .inputCtx .Err () == nil {
148+ r .inputCancel ()
149+ close (r .inChan )
163150 }
164- return nil
165- }
166151
167- // Stop Stops the processing of a worker and closes it's channel in.
168- // Returns a blocking channel with type error.
169- // !!Should only be called when certain nothing will send to worker.
170- func (r * runner ) Stop () chan error {
171- r .done .Do (func () {
172- if r .inChan != nil && r .isLeader {
173- close (r .inChan )
174- }
175- })
176- return r .errChan
152+ <- r .done
153+
154+ return r
177155}
178156
179- // IsDone returns a channel signaling the workers context has been canceled.
180- func (r * runner ) IsDone () <- chan struct {} {
181- return r .ctx .Done ()
157+ // Stop Stops the processing of a worker and waits for workers to finish.
158+ func (r * Runner ) Stop () * Runner {
159+ r .cancel ()
160+ r .Wait ()
161+ return r
182162}
183163
184- // waitForSignal make sure we wait for a term signal and shutdown correctly
185- func (r * runner ) waitForSignal (signals ... os.Signal ) {
186- go func () {
187- signal .Notify (r .signalChan , signals ... )
188- <- r .signalChan
189- if r .cancel != nil {
190- r .cancel ()
191- }
164+ type InputKey struct {}
165+
166+ // work starts processing input and limits worker instance number.
167+ func (r * Runner ) work () {
168+ var wg sync.WaitGroup
169+
170+ defer func () {
171+ wg .Wait ()
172+ r .cancel ()
173+ close (r .done )
192174 }()
193- }
194175
195- // waitForDrain Waits for the limiter to be zeroed out and the in channel to be empty.
196- func (r * runner ) waitForDrain () {
197- for len (r .limiter ) > 0 || len (r .inChan ) > 0 {
198- // Wait for the drain.
199- }
200- }
176+ for {
177+ select {
178+ case <- r .ctx .Done ():
179+ return
180+ case input , open := <- r .inChan :
181+ if ! open {
182+ return
183+ }
184+ wg .Add (1 )
201185
202- // startWork Runs the before function and starts processing until one of three things happen.
203- // 1. A term signal is received or cancellation of context.
204- // 2. Stop function is called.
205- // 3. Worker returns an error.
206- func (r * runner ) startWork () {
207- var err error
208- if err = r .beforeFunc (r .ctx ); err != nil {
209- r .errChan <- err
210- return
211- }
212- if r .timeout > 0 {
213- r .ctx , r .cancel = context .WithTimeout (r .ctx , r .timeout )
214- }
215- r .wg .Add (1 )
216- go func () {
217- var workerWG = new (sync.WaitGroup )
218- var closeOnce = new (sync.Once )
219-
220- // write out error if not nil on exit.
221- defer func () {
222- workerWG .Wait ()
223- r .errChan <- err
224- closeOnce .Do (func () {
225- if r .outChan != nil {
226- close (r .outChan )
227- }
228- })
229- r .wg .Done ()
230- }()
231- for in := range r .inChan {
232- input := in
233186 r .limiter <- struct {}{}
234- workerWG .Add (1 )
187+
188+ inputCtx := context .WithValue (r .ctx , InputKey {}, input )
189+ workCtx , workCancel := context .WithCancel (inputCtx )
190+ if r .timeout > 0 {
191+ workCtx , workCancel = context .WithTimeout (inputCtx , r .timeout )
192+ }
193+
235194 go func () {
236195 defer func () {
237196 <- r .limiter
238- workerWG .Done ()
197+ workCancel ()
198+ wg .Done ()
239199 }()
240- if err := r .afterFunc (r .ctx , r .workFunc (input , r .outChan )); err != nil {
241- r .once .Do (func () {
242- r .errChan <- err
243- r .cancel ()
244- })
200+ if err := r .afterFunc (inputCtx , r .workFunc (workCtx , input , r .outChan )); err != nil {
201+ r .cancel ()
245202 }
246203 }()
247204 }
248- }()
205+ }
249206}
0 commit comments