diff --git a/helper/manager.go b/helper/manager.go index df7e179e..e77f8823 100644 --- a/helper/manager.go +++ b/helper/manager.go @@ -20,11 +20,13 @@ type Runner interface { } func NewManager(r Runner) *Manager { + lockName := os.Getenv("LOCK_NAME") return &Manager{ runner: r, logger: log.WithField("type", r.Name()), stopCh: make(chan interface{}), voluntarilyReleaseLockCh: make(chan interface{}), + lockName: lockName, } } @@ -37,6 +39,7 @@ type Manager struct { logger *log.Entry // logger for the consul connection struct stopCh chan interface{} // internal channel used to stop all go-routines when gracefully shutting down voluntarilyReleaseLockCh chan interface{} + lockName string } // cleanup will do cleanup tasks when the reconciler is shutting down @@ -50,6 +53,13 @@ func (m *Manager) cleanup() { m.logger.Debugf("Cleanup complete") } +func (m *Manager) getLockName() string { + if m.lockName != "" { + return m.lockName + } + return m.runner.Name() +} + // continuouslyAcquireConsulLeadership waits to acquire the lock to the Consul KV key. // it will run until the stopCh is closed func (m *Manager) continuouslyAcquireConsulLeadership() error { @@ -76,7 +86,7 @@ func (m *Manager) continuouslyAcquireConsulLeadership() error { // Read the Last Change Time from Consul KV, so we don't re-process tasks over and over on restart func (m *Manager) restoreLastChangeTime() interface{} { - kv, _, err := m.client.KV().Get(fmt.Sprintf("nomad-firehose/%s.value", m.runner.Name()), nil) + kv, _, err := m.client.KV().Get(fmt.Sprintf("nomad-firehose/%s.value", m.getLockName()), nil) if err != nil { return 0 } @@ -102,8 +112,8 @@ func (m *Manager) restoreLastChangeTime() interface{} { func (m *Manager) acquireConsulLeadership() error { var err error m.lock, err = m.client.LockOpts(&consulapi.LockOptions{ - Key: fmt.Sprintf("nomad-firehose/%s.lock", m.runner.Name()), - SessionName: fmt.Sprintf("nomad-firehose-%s", m.runner.Name()), + Key: fmt.Sprintf("nomad-firehose/%s.lock", m.getLockName()), + SessionName: fmt.Sprintf("nomad-firehose-%s", m.getLockName()), MonitorRetries: 10, MonitorRetryTime: 5 * time.Second, }) @@ -161,7 +171,7 @@ func (m *Manager) acquireConsulLeadership() error { m.logger.Infof("Writing lastChangedTime to KV: %s", r) kv := &consulapi.KVPair{ - Key: fmt.Sprintf("nomad-firehose/%s.value", m.runner.Name()), + Key: fmt.Sprintf("nomad-firehose/%s.value", m.getLockName()), Value: []byte(r), } _, err := m.client.KV().Put(kv, nil)