@@ -19,13 +19,11 @@ package lib
1919
2020import (
2121 "errors"
22+ "github.com/paypal/hera/utility/logger"
2223 "os"
2324 "os/signal"
2425 "sync"
2526 "syscall"
26-
27- "github.com/paypal/hera/utility"
28- "github.com/paypal/hera/utility/logger"
2927)
3028
3129// HeraWorkerType defines the possible worker type
@@ -64,7 +62,7 @@ type WorkerBroker struct {
6462 // and restart the stopped workers.
6563 //
6664 pidworkermap map [int32 ]* WorkerClient
67- lock sync.Mutex
65+ lock sync.Mutex
6866
6967 //
7068 // loaded from cfg once and used later.
@@ -204,7 +202,9 @@ func (broker *WorkerBroker) GetWorkerPoolCfgs() (pCfgs []map[HeraWorkerType]*Wor
204202
205203// GetWorkerPool get the worker pool object for the type and id
206204// ids holds optional paramenters.
207- // ids[0] == instance id; ids[1] == shard id.
205+ //
206+ // ids[0] == instance id; ids[1] == shard id.
207+ //
208208// if a particular id is not set, it defaults to 0.
209209// TODO: interchange sid <--> instId since instId is not yet used
210210func (broker * WorkerBroker ) GetWorkerPool (wType HeraWorkerType , ids ... int ) (workerbroker * WorkerPool , err error ) {
@@ -273,59 +273,69 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
273273 // we can get all the pids in this call. double the size in case we
274274 // get none-hera defunct processes. +1 in case racing casue mapsize=0.
275275 //
276- var arraySize = 2 * len (broker .pidworkermap ) + 1
277- var defunctPids = make ([]int32 , arraySize )
278- if logger .GetLogger ().V (logger .Verbose ) {
279- logger .GetLogger ().Log (logger .Verbose , "Wait SIGCHLD len=" , arraySize - 1 , ", pwmap:" , broker .pidworkermap )
280- }
281- if arraySize > 0 {
282- utility .ReapDefunctPids (defunctPids )
283- }
284- if logger .GetLogger ().V (logger .Info ) {
285- logger .GetLogger ().Log (logger .Info , "exited worker" , defunctPids )
286- }
287- broker .lock .Lock ()
288- for i := 0 ; i < arraySize ; i ++ {
289- //
290- // last valid entry in stoppedpids is followed by one or more zeros.
291- //
292- if defunctPids [i ] == 0 {
276+ defunctPids := make ([]int32 , 0 )
277+ for {
278+ var status syscall.WaitStatus
279+
280+ //Reap exited children in non-blocking mode
281+ pid , err := syscall .Wait4 (- 1 , & status , syscall .WNOHANG , nil )
282+ if pid > 0 {
283+ if logger .GetLogger ().V (logger .Verbose ) {
284+ logger .GetLogger ().Log (logger .Verbose , "received worker exit signal for pid:" , pid , " status: " , status )
285+ }
286+ defunctPids = append (defunctPids , int32 (pid ))
287+ } else if pid == 0 {
293288 break
289+ } else {
290+ if errors .Is (err , syscall .ECHILD ) {
291+ break
292+ } else {
293+ logger .GetLogger ().Log (logger .Warning , "error in wait signal: " , err )
294+ }
294295 }
295- var workerclient = broker.pidworkermap [defunctPids [i ]]
296- if workerclient != nil {
297- delete (broker .pidworkermap , defunctPids [i ])
298- pool , err := GetWorkerBrokerInstance ().GetWorkerPool (workerclient .Type , workerclient .instID , workerclient .shardID )
299- if err != nil {
300- if logger .GetLogger ().V (logger .Alert ) {
301- logger .GetLogger ().Log (logger .Alert , "Can't get pool for" , workerclient , ":" , err )
296+ }
297+
298+ if len (defunctPids ) > 0 {
299+ if logger .GetLogger ().V (logger .Debug ) {
300+ logger .GetLogger ().Log (logger .Debug , "worker exit signal received from pids :" , defunctPids )
301+ }
302+ broker .lock .Lock ()
303+ for _ , pid := range defunctPids {
304+ var workerclient = broker .pidworkermap [pid ]
305+ if workerclient != nil {
306+ delete (broker .pidworkermap , pid )
307+ pool , err := GetWorkerBrokerInstance ().GetWorkerPool (workerclient .Type , workerclient .instID , workerclient .shardID )
308+ if err != nil {
309+ if logger .GetLogger ().V (logger .Alert ) {
310+ logger .GetLogger ().Log (logger .Alert , "Can't get pool for" , workerclient , ":" , err )
311+ }
312+ } else {
313+ //
314+ // a worker could be terminated while serving a request.
315+ // in these cases, doRead() in workerclient will get an
316+ // EOF and exit. doSession() in coordinator will get the
317+ // worker outCh closed event and exit, at which point
318+ // coordinator itself calls returnworker to set connstate
319+ // from assign to idle.
320+ // no need to publish the following event again.
321+ //
322+ //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
323+ // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
324+ //}
325+ if logger .GetLogger ().V (logger .Debug ) {
326+ logger .GetLogger ().Log (logger .Debug , "worker (id=" , workerclient .ID , "pid=" , workerclient .pid , ") received signal. transits from state " , workerclient .Status , " to terminated." )
327+ }
328+ workerclient .setState (wsUnset ) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
329+ pool .RestartWorker (workerclient )
302330 }
303331 } else {
304- //
305- // a worker could be terminated while serving a request.
306- // in these cases, doRead() in workerclient will get an
307- // EOF and exit. doSession() in coordinator will get the
308- // worker outCh closed event and exit, at which point
309- // coordinator itself calls returnworker to set connstate
310- // from assign to idle.
311- // no need to publish the following event again.
312- //
313- //if (workerclient.Status == WAIT) || (workerclient.Status == BUSY) {
314- // GetStateLog().PublishStateEvent(StateEvent{eType:ConnStateEvt, shardId:workerclient.shardId, wType:workerclient.Type, instId:workerclient.instId, oldCState:Assign, newCState:Idle})
315- //}
316- if logger .GetLogger ().V (logger .Debug ) {
317- logger .GetLogger ().Log (logger .Debug , "worker (pid=" , workerclient .pid , ") received signal. transits from state " , workerclient .Status , " to terminated." )
332+ if logger .GetLogger ().V (logger .Alert ) {
333+ logger .GetLogger ().Log (logger .Alert , "Exited worker pid =" , pid , " not found" )
318334 }
319- workerclient .setState (wsUnset ) // Set the state to UNSET to make sure worker does not stay in FNSH state so long
320- pool .RestartWorker (workerclient )
321- }
322- } else {
323- if logger .GetLogger ().V (logger .Alert ) {
324- logger .GetLogger ().Log (logger .Alert , "Exited worker pid =" , defunctPids [i ], " not found" )
325335 }
326336 }
337+ broker .lock .Unlock ()
327338 }
328- broker .lock .Unlock ()
329339 case syscall .SIGTERM :
330340 if logger .GetLogger ().V (logger .Debug ) {
331341 logger .GetLogger ().Log (logger .Debug , "Got SIGTERM" )
@@ -365,8 +375,8 @@ func (broker *WorkerBroker) startWorkerMonitor() (err error) {
365375}
366376
367377/*
368- resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
369- the number of workers changed
378+ resizePool calls workerpool.Resize to resize a worker pool when the dynamic configuration of
379+ the number of workers changed
370380*/
371381func (broker * WorkerBroker ) resizePool (wType HeraWorkerType , maxWorkers int , shardID int ) {
372382 broker .poolCfgs [0 ][wType ].maxWorkerCnt = maxWorkers
@@ -381,7 +391,7 @@ func (broker *WorkerBroker) resizePool(wType HeraWorkerType, maxWorkers int, sha
381391}
382392
383393/*
384- changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
394+ changeMaxWorkers is called when the dynamic config changed, it calls resizePool() for all the pools
385395*/
386396func (broker * WorkerBroker ) changeMaxWorkers () {
387397 wW := GetNumWWorkers (0 )
0 commit comments