Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions helper/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ type Runner interface {
}

func NewManager(r Runner) *Manager {
lockName := os.Getenv("LOCK_NAME")
return &Manager{
runner: r,
logger: log.WithField("type", r.Name()),
stopCh: make(chan interface{}),
voluntarilyReleaseLockCh: make(chan interface{}),
lockName: lockName,
}
}

Expand All @@ -37,6 +39,7 @@ type Manager struct {
logger *log.Entry // logger for the consul connection struct
stopCh chan interface{} // internal channel used to stop all go-routines when gracefully shutting down
voluntarilyReleaseLockCh chan interface{}
lockName string
}

// cleanup will do cleanup tasks when the reconciler is shutting down
Expand All @@ -50,6 +53,13 @@ func (m *Manager) cleanup() {
m.logger.Debugf("Cleanup complete")
}

func (m *Manager) getLockName() string {
if m.lockName != "" {
return m.lockName
}
return m.runner.Name()
}

// continuouslyAcquireConsulLeadership waits to acquire the lock to the Consul KV key.
// it will run until the stopCh is closed
func (m *Manager) continuouslyAcquireConsulLeadership() error {
Expand All @@ -76,7 +86,7 @@ func (m *Manager) continuouslyAcquireConsulLeadership() error {

// Read the Last Change Time from Consul KV, so we don't re-process tasks over and over on restart
func (m *Manager) restoreLastChangeTime() interface{} {
kv, _, err := m.client.KV().Get(fmt.Sprintf("nomad-firehose/%s.value", m.runner.Name()), nil)
kv, _, err := m.client.KV().Get(fmt.Sprintf("nomad-firehose/%s.value", m.getLockName()), nil)
if err != nil {
return 0
}
Expand All @@ -102,8 +112,8 @@ func (m *Manager) restoreLastChangeTime() interface{} {
func (m *Manager) acquireConsulLeadership() error {
var err error
m.lock, err = m.client.LockOpts(&consulapi.LockOptions{
Key: fmt.Sprintf("nomad-firehose/%s.lock", m.runner.Name()),
SessionName: fmt.Sprintf("nomad-firehose-%s", m.runner.Name()),
Key: fmt.Sprintf("nomad-firehose/%s.lock", m.getLockName()),
SessionName: fmt.Sprintf("nomad-firehose-%s", m.getLockName()),
MonitorRetries: 10,
MonitorRetryTime: 5 * time.Second,
})
Expand Down Expand Up @@ -161,7 +171,7 @@ func (m *Manager) acquireConsulLeadership() error {

m.logger.Infof("Writing lastChangedTime to KV: %s", r)
kv := &consulapi.KVPair{
Key: fmt.Sprintf("nomad-firehose/%s.value", m.runner.Name()),
Key: fmt.Sprintf("nomad-firehose/%s.value", m.getLockName()),
Value: []byte(r),
}
_, err := m.client.KV().Put(kv, nil)
Expand Down