@@ -13,6 +13,7 @@ import (
1313 "slices"
1414 "strings"
1515 "sync"
16+ "sync/atomic"
1617 "syscall"
1718 "time"
1819
@@ -47,6 +48,7 @@ import (
4748 "github.com/buildbuddy-io/buildbuddy/server/util/status"
4849 "github.com/buildbuddy-io/buildbuddy/server/util/tracing"
4950 "github.com/buildbuddy-io/buildbuddy/server/util/uuid"
51+ "github.com/docker/go-units"
5052 "github.com/prometheus/client_golang/prometheus"
5153 "golang.org/x/sync/errgroup"
5254 "google.golang.org/protobuf/types/known/durationpb"
@@ -140,7 +142,7 @@ type WarmupConfig struct {
 }
 
 // state indicates the current state of a taskRunner.
-type state int
+type state int32
 
 func (s state) String() string {
 	switch s {
@@ -157,6 +159,21 @@ func (s state) String() string {
 	}
 }
 
+func (s state) ShortString() string {
+	switch s {
+	case initial:
+		return "I"
+	case paused:
+		return "P"
+	case ready:
+		return "R"
+	case removed:
+		return "X"
+	default:
+		return "?"
+	}
+}
+
 type runnerSlice []*taskRunner
 
 func (rs runnerSlice) String() string {
@@ -194,8 +211,10 @@ type taskRunner struct {
 
 	// task is the current task assigned to the runner.
 	task *repb.ExecutionTask
-	// State is the current state of the runner as it pertains to reuse.
-	state state
+	// State is the current state of the runner as it pertains to reuse. It is
+	// atomic because in some cases we want to print runner metadata for debug
+	// purposes but without having to hold the pool lock.
+	state atomic.Int32
 
 	worker *persistentworker.Worker
 
@@ -218,9 +237,14 @@ func (r *taskRunner) Metadata() *espb.RunnerMetadata {
 }
 
 func (r *taskRunner) String() string {
-	// Note: we don't log r.state here as this can make log statements calling
-	// this function racy. Beware of this if re-adding r.state below.
-	return fmt.Sprintf("%s:%d:%s", r.debugID, r.metadata.GetTaskNumber(), keyString(r.key))
+	return fmt.Sprintf("%s:%s:%d:%s", r.debugID, r.getState().ShortString(), r.metadata.GetTaskNumber(), keyString(r.key))
+}
+
+func (r *taskRunner) setState(s state) {
+	r.state.Store(int32(s))
+}
+func (r *taskRunner) getState() state {
+	return state(r.state.Load())
 }
 
 func (r *taskRunner) pullCredentials() (oci.Credentials, error) {
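
The hunks above move the runner's state from a mutex-guarded enum to a sync/atomic value behind typed accessors, which is what lets String() read the state without taking the pool lock. A minimal standalone sketch of that pattern, with illustrative names rather than the actual BuildBuddy types:

package main

import (
	"fmt"
	"sync/atomic"
)

// state mirrors a small lifecycle enum; the values are illustrative.
type state int32

const (
	initial state = iota
	paused
	ready
	removed
)

// runner stands in for taskRunner; only the state field matters here.
type runner struct {
	state atomic.Int32 // holds a state value; readable without any lock
}

func (r *runner) setState(s state) { r.state.Store(int32(s)) }
func (r *runner) getState() state  { return state(r.state.Load()) }

func main() {
	r := &runner{}
	r.setState(ready)
	// Safe to call concurrently with setState, e.g. from a debug log line.
	fmt.Println(r.getState() == ready) // true
}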
@@ -390,13 +414,8 @@ func (r *taskRunner) Run(ctx context.Context, ioStats *repb.IOStats) (res *inter
 	}
 
 	// Get the container to "ready" state so that we can exec commands in it.
-	//
-	// TODO(bduffany): Make this access to r.state thread-safe. The pool can be
-	// shutdown while this func is executing, which concurrently sets the runner
-	// state to "removed". This doesn't cause any known issues right now, but is
-	// error prone.
 	r.p.mu.RLock()
-	s := r.state
+	s := r.getState()
 	r.p.mu.RUnlock()
 	switch s {
 	case initial:
@@ -415,7 +434,7 @@ func (r *taskRunner) Run(ctx context.Context, ioStats *repb.IOStats) (res *inter
 			return commandutil.ErrorResult(err)
 		}
 		r.p.mu.Lock()
-		r.state = ready
+		r.setState(ready)
 		r.p.mu.Unlock()
 	case ready:
 	case removed:
@@ -498,8 +517,8 @@ func (r *taskRunner) shutdown(ctx context.Context) error {
 
 func (r *taskRunner) Remove(ctx context.Context) error {
 	r.p.mu.Lock()
-	s := r.state
-	r.state = removed
+	s := r.getState()
+	r.setState(removed)
 	r.p.mu.Unlock()
 	if s == removed {
 		return nil
@@ -696,10 +715,10 @@ func (p *pool) checkAddPreconditions(r *taskRunner) *labeledError {
 	}
 	// Note: shutdown can change the state to removed, so we need the lock to be
 	// held for this check.
-	if r.state != ready {
-		alert.UnexpectedEvent("unexpected_runner_state", "Unexpected runner state %d during add()", r.state)
+	if r.getState() != ready {
+		alert.UnexpectedEvent("unexpected_runner_state", "Unexpected runner state %d during add()", r.getState())
 		return &labeledError{
-			status.InternalErrorf("unexpected runner state %d; this should never happen", r.state),
+			status.InternalErrorf("unexpected runner state %d; this should never happen", r.getState()),
 			"unexpected_runner_state",
 		}
 	}
@@ -774,25 +793,29 @@ func (p *pool) add(ctx context.Context, r *taskRunner) *labeledError {
 		}
 	}
 
-	shouldEvict := func() bool {
+	shouldEvict := func() (bool, string) {
 		// If pooling this runner would put us over the max number of pooled
 		// runners, we need to evict a runner.
 		if p.maxRunnerCount > 0 && p.pausedRunnerCount()+1 > p.maxRunnerCount {
-			return true
+			return true, fmt.Sprintf("max runner count exceeded (max=%d)", p.maxRunnerCount)
 		}
 		// If pooling this runner would put us over the total memory limit,
 		// we need to evict a runner.
 		if p.maxTotalRunnerMemoryUsageBytes > 0 && p.pausedRunnerMemoryUsageBytes()+stats.MemoryBytes > p.maxTotalRunnerMemoryUsageBytes {
-			return true
+			return true, fmt.Sprintf("max runner memory usage exceeded (max=%s)", units.BytesSize(float64(p.maxTotalRunnerMemoryUsageBytes)))
 		}
 		// Otherwise, we don't need to evict.
-		return false
+		return false, ""
 	}
-	for shouldEvict() {
+	for {
+		evict, reason := shouldEvict()
+		if !evict {
+			break
+		}
 		// Evict the oldest (first) paused runner to make room for the new one.
 		evictIndex := -1
 		for i, r := range p.runners {
-			if r.state == paused {
+			if r.getState() == paused {
 				evictIndex = i
 				break
 			}
@@ -805,7 +828,7 @@ func (p *pool) add(ctx context.Context, r *taskRunner) *labeledError {
 		}
 
 		r := p.runners[evictIndex]
-		log.Infof("Evicting runner %s (pool max count %d exceeded).", r, p.maxRunnerCount)
+		log.Infof("Evicting runner %s (%s)", r, reason)
 		p.runners = append(p.runners[:evictIndex], p.runners[evictIndex+1:]...)
 
 		metrics.RunnerPoolEvictions.Inc()
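
The eviction hunks above thread a human-readable reason into the log line and pull in github.com/docker/go-units to format the memory limit. A small hedged sketch of that formatting call, with a made-up limit value for illustration:

package main

import (
	"fmt"

	"github.com/docker/go-units"
)

func main() {
	// Hypothetical limit for illustration only (2 GiB).
	maxTotalRunnerMemoryUsageBytes := int64(2 * 1024 * 1024 * 1024)
	reason := fmt.Sprintf("max runner memory usage exceeded (max=%s)",
		units.BytesSize(float64(maxTotalRunnerMemoryUsageBytes)))
	fmt.Println(reason) // max runner memory usage exceeded (max=2GiB)
}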
@@ -831,7 +854,7 @@ func (p *pool) add(ctx context.Context, r *taskRunner) *labeledError {
 	metrics.RunnerPoolCount.Inc()
 
 	// Officially mark this runner paused and ready for reuse.
-	r.state = paused
+	r.setState(paused)
 
 	return nil
 }
@@ -1157,7 +1180,8 @@ func (p *pool) newRunner(ctx context.Context, key *rnpb.RunnerKey, props *platfo
 	r.removeCallback = func() {
 		p.pendingRemovals.Done()
 	}
-	log.CtxInfof(ctx, "Created new %s runner %s for task", props.WorkloadIsolationType, r)
+
+	log.CtxInfof(ctx, "Created new runner for task (runner=%q, type=%s, recyclable=%v)", r, props.WorkloadIsolationType, props.RecycleRunner)
 	return r, nil
 }
 
@@ -1285,7 +1309,7 @@ func (p *pool) take(ctx context.Context, key *rnpb.RunnerKey) *taskRunner {
 
 	for i := len(p.runners) - 1; i >= 0; i-- {
 		r := p.runners[i]
-		if key.GroupId != r.key.GroupId || r.state != paused {
+		if key.GroupId != r.key.GroupId || r.getState() != paused {
 			continue
 		}
 		// Check for an exact match on the runner pool keys.
@@ -1298,7 +1322,7 @@ func (p *pool) take(ctx context.Context, key *rnpb.RunnerKey) *taskRunner {
 			continue
 		}
 
-		r.state = ready
+		r.setState(ready)
 
 		metrics.RunnerPoolCount.Dec()
 		metrics.RunnerPoolDiskUsageBytes.Sub(float64(r.diskUsageBytes))
@@ -1334,7 +1358,7 @@ func (p *pool) ActiveRunnerCount() int {
 func (p *pool) pausedRunnerCount() int {
 	n := 0
 	for _, r := range p.runners {
-		if r.state == paused {
+		if r.getState() == paused {
 			n++
 		}
 	}
@@ -1344,7 +1368,7 @@ func (p *pool) pausedRunnerCount() int {
 func (p *pool) pausedRunnerMemoryUsageBytes() int64 {
 	b := int64(0)
 	for _, r := range p.runners {
-		if r.state == paused {
+		if r.getState() == paused {
 			b += r.memoryUsageBytes
 		}
 	}
@@ -1362,7 +1386,7 @@ func (p *pool) Shutdown(ctx context.Context) error {
 	// grace period expiring.
 	var pausedRunners, activeRunners []*taskRunner
 	for _, r := range p.runners {
-		if r.state == paused {
+		if r.getState() == paused {
 			pausedRunners = append(pausedRunners, r)
 		} else {
 			activeRunners = append(activeRunners, r)
@@ -1381,7 +1405,6 @@ func (p *pool) Shutdown(ctx context.Context) error {
 		// to finish (if applicable). A single runner that takes a long time to
 		// upload its outputs should not block other runners from working on
 		// workspace removal in the meantime.
-		r := r
 		go func() {
 			removeResults <- r.RemoveWithTimeout(ctx)
 		}()
@@ -1447,15 +1470,21 @@ func (p *pool) TryRecycle(ctx context.Context, r interfaces.Runner, finishedClea
 		return
 	}
 	p.mu.Lock()
-	state := cr.state
+	state := cr.getState()
 	p.mu.Unlock()
 	if !finishedCleanly || cr.doNotReuse || state != ready {
 		log.CtxWarningf(ctx, "Failed to recycle runner %s due to previous execution error", cr)
+		metrics.RunnerPoolFailedRecycleAttempts.With(prometheus.Labels{
+			metrics.RunnerPoolFailedRecycleReason: "execution_error",
+		}).Inc()
 		return
 	}
 	// Clean the workspace before recycling the runner (to save on disk space).
 	if err := cr.Workspace.Clean(); err != nil {
 		log.CtxErrorf(ctx, "Failed to recycle runner %s: failed to clean workspace: %s", cr, err)
+		metrics.RunnerPoolFailedRecycleAttempts.With(prometheus.Labels{
+			metrics.RunnerPoolFailedRecycleReason: "clean_workspace_failed",
+		}).Inc()
 		return
 	}
 
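
The TryRecycle hunk above increments a reason-labeled counter on each failed recycle attempt. As a rough sketch of how such a labeled Prometheus counter is typically declared and bumped; the metric and label names below are placeholders, not the actual definitions in the metrics package:

package main

import "github.com/prometheus/client_golang/prometheus"

// failedRecycleAttempts counts failed recycle attempts, partitioned by reason.
var failedRecycleAttempts = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "runner_pool_failed_recycle_attempts",
		Help: "Failed attempts to recycle (pause and reuse) a runner, by reason.",
	},
	[]string{"reason"},
)

func init() {
	prometheus.MustRegister(failedRecycleAttempts)
}

// recordFailedRecycle bumps the counter for one failure reason, e.g.
// "execution_error" or "clean_workspace_failed".
func recordFailedRecycle(reason string) {
	failedRecycleAttempts.With(prometheus.Labels{"reason": reason}).Inc()
}

func main() {
	recordFailedRecycle("execution_error")
}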