Skip to content

Commit 697eca1

Browse files
committed
Notifications: Address Code Review
- Bump IGL to latest changes in Icinga/icinga-go-library#145. - Allow specifying which pipeline keys are relevant, ignore others. - Allow specifying which pipeline key should be parsed in which type. - Create history.DowntimeHistoryMeta as a chimera combining history.DowntimeHistory and history.HistoryDowntime to allow access event_type, distinguishing between downtime_start and downtime_end. - Trace times for submission steps in the worker. Turns out, the single threaded worker blocks roughly two seconds for each Client.ProcessEvent method call. This might sum up to minutes if lots of events are processed at once. My current theory is that the delay results in the expensive bcrypt hash comparison on Notifications.
1 parent 49b7d98 commit 697eca1

File tree

6 files changed

+249
-167
lines changed

6 files changed

+249
-167
lines changed

cmd/icingadb/main.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -170,23 +170,26 @@ func run() int {
170170
sig := make(chan os.Signal, 1)
171171
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)
172172

173-
var notificationsSourceCallback func(database.Entity)
174-
if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
175-
logger.Info("Starting Icinga Notifications source")
176-
177-
notificationsSource := notifications.NewNotificationsSource(
178-
ctx,
179-
db,
180-
rc,
181-
logs.GetChildLogger("notifications-source"),
182-
cfg)
183-
notificationsSourceCallback = notificationsSource.Submit
184-
}
185-
186173
go func() {
174+
var callback func(database.Entity)
175+
var callbackKeyStructPtr map[string]any
176+
177+
if cfg := cmd.Config.NotificationsSource; cfg.ApiBaseUrl != "" {
178+
logger.Info("Starting Icinga Notifications source")
179+
180+
notificationsSource := notifications.NewNotificationsClient(
181+
ctx,
182+
db,
183+
rc,
184+
logs.GetChildLogger("notifications-source"),
185+
cfg)
186+
callback = notificationsSource.Submit
187+
callbackKeyStructPtr = notifications.SyncKeyStructPtrs
188+
}
189+
187190
logger.Info("Starting history sync")
188191

189-
if err := hs.Sync(ctx, notificationsSourceCallback); err != nil && !utils.IsContextCanceled(err) {
192+
if err := hs.Sync(ctx, callbackKeyStructPtr, callback); err != nil && !utils.IsContextCanceled(err) {
190193
logger.Fatalf("%+v", err)
191194
}
192195
}()

internal/config/config.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"github.com/creasty/defaults"
55
"github.com/icinga/icinga-go-library/database"
66
"github.com/icinga/icinga-go-library/logging"
7-
"github.com/icinga/icinga-go-library/notifications"
7+
"github.com/icinga/icinga-go-library/notifications/source"
88
"github.com/icinga/icinga-go-library/redis"
99
"github.com/icinga/icingadb/pkg/icingadb/history"
1010
"github.com/pkg/errors"
@@ -16,11 +16,11 @@ const DefaultConfigPath = "/etc/icingadb/config.yml"
1616

1717
// Config defines Icinga DB config.
1818
type Config struct {
19-
Database database.Config `yaml:"database" envPrefix:"DATABASE_"`
20-
Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"`
21-
Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"`
22-
Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
23-
NotificationsSource notifications.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
19+
Database database.Config `yaml:"database" envPrefix:"DATABASE_"`
20+
Redis redis.Config `yaml:"redis" envPrefix:"REDIS_"`
21+
Logging logging.Config `yaml:"logging" envPrefix:"LOGGING_"`
22+
Retention RetentionConfig `yaml:"retention" envPrefix:"RETENTION_"`
23+
NotificationsSource source.Config `yaml:"notifications-source" envPrefix:"NOTIFICATIONS_SOURCE_"`
2424
}
2525

2626
func (c *Config) SetDefaults() {

pkg/icingadb/history/sync.go

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,15 @@ func NewSync(db *database.DB, redis *redis.Client, logger *logging.Logger) *Sync
3939

4040
// Sync synchronizes Redis history streams from s.redis to s.db and deletes the original data on success.
4141
//
42-
// If not nil, the callback function is appended to each synchronization pipeline and called before the entry is deleted
43-
// from Redis.
44-
func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
42+
// An optional callback and callbackKeyStructPtr might be given. Both most either be nil or not nil.
43+
//
44+
// The callbackKeyStructPtr says which pipeline keys should be mapped to which type, identified by a struct pointer. If
45+
// a key is missing from the map, it will not be used for the callback. The callback function itself shall not block.
46+
func (s Sync) Sync(ctx context.Context, callbackKeyStructPtr map[string]any, callback func(database.Entity)) error {
47+
if (callbackKeyStructPtr == nil) != (callback == nil) {
48+
return fmt.Errorf("either both callbackKeyStructPtr and callback must be nil or none")
49+
}
50+
4551
g, ctx := errgroup.WithContext(ctx)
4652

4753
for key, pipeline := range syncPipelines {
@@ -67,8 +73,13 @@ func (s Sync) Sync(ctx context.Context, callback func(database.Entity)) error {
6773
// forward the entry after it has completed its own sync so that later stages can rely on previous stages being
6874
// executed successfully.
6975

70-
if callback != nil {
71-
pipeline = append(pipeline, makeCallbackStageFunc(callback))
76+
// Shadowed variable to allow appending custom callbacks.
77+
pipeline := pipeline
78+
if callbackKeyStructPtr != nil {
79+
_, ok := callbackKeyStructPtr[key]
80+
if ok {
81+
pipeline = append(pipeline, makeCallbackStageFunc(callbackKeyStructPtr, callback))
82+
}
7283
}
7384

7485
ch := make([]chan redis.XMessage, len(pipeline)+1)
@@ -371,28 +382,17 @@ func userNotificationStage(ctx context.Context, s Sync, key string, in <-chan re
371382

372383
// makeCallbackStageFunc creates a new stageFunc calling the given callback function for each message.
373384
//
385+
// The keyStructPtrs map decides what kind of database.Entity type will be used for the input data based on the key.
386+
//
374387
// The callback call is blocking and the message will be forwarded to the out channel after the function has returned.
375388
// Thus, please ensure this function does not block too long.
376-
func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
389+
func makeCallbackStageFunc(keyStructPtrs map[string]any, callback func(database.Entity)) stageFunc {
377390
return func(ctx context.Context, _ Sync, key string, in <-chan redis.XMessage, out chan<- redis.XMessage) error {
378391
defer close(out)
379392

380-
var structPtr database.Entity
381-
switch key { // keep in sync with syncPipelines below
382-
case "notification":
383-
structPtr = (*v1.NotificationHistory)(nil)
384-
case "state":
385-
structPtr = (*v1.StateHistory)(nil)
386-
case "downtime":
387-
structPtr = (*v1.DowntimeHistory)(nil)
388-
case "comment":
389-
structPtr = (*v1.CommentHistory)(nil)
390-
case "flapping":
391-
structPtr = (*v1.FlappingHistory)(nil)
392-
case "acknowledgement":
393-
structPtr = (*v1.AcknowledgementHistory)(nil)
394-
default:
395-
return fmt.Errorf("unsupported key %q", key)
393+
structPtr, ok := keyStructPtrs[key]
394+
if !ok {
395+
return fmt.Errorf("can't lookup struct pointer for key %q", key)
396396
}
397397

398398
structifier := structify.MakeMapStructifier(
@@ -409,7 +409,7 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
409409

410410
val, err := structifier(msg.Values)
411411
if err != nil {
412-
return errors.Wrapf(err, "can't structify values %#v for %s", msg.Values, key)
412+
return errors.Wrapf(err, "can't structify values %#v for %q", msg.Values, key)
413413
}
414414

415415
entity, ok := val.(database.Entity)
@@ -427,32 +427,41 @@ func makeCallbackStageFunc(callback func(database.Entity)) stageFunc {
427427
}
428428
}
429429

430+
const (
431+
SyncPipelineAcknowledgement = "acknowledgement"
432+
SyncPipelineComment = "comment"
433+
SyncPipelineDowntime = "downtime"
434+
SyncPipelineFlapping = "flapping"
435+
SyncPipelineNotification = "notification"
436+
SyncPipelineState = "state"
437+
)
438+
430439
var syncPipelines = map[string][]stageFunc{
431-
"notification": {
432-
writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
433-
userNotificationStage, // user_notification_history (depends on notification_history)
434-
writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
440+
SyncPipelineAcknowledgement: {
441+
writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
442+
writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history)
435443
},
436-
"state": {
437-
writeOneEntityStage((*v1.StateHistory)(nil)), // state_history
438-
writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history)
439-
writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
444+
SyncPipelineComment: {
445+
writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
446+
writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
440447
},
441-
"downtime": {
448+
SyncPipelineDowntime: {
442449
writeOneEntityStage((*v1.DowntimeHistory)(nil)), // downtime_history
443450
writeOneEntityStage((*v1.HistoryDowntime)(nil)), // history (depends on downtime_history)
444451
writeOneEntityStage((*v1.SlaHistoryDowntime)(nil)), // sla_history_downtime
445452
},
446-
"comment": {
447-
writeOneEntityStage((*v1.CommentHistory)(nil)), // comment_history
448-
writeOneEntityStage((*v1.HistoryComment)(nil)), // history (depends on comment_history)
449-
},
450-
"flapping": {
453+
SyncPipelineFlapping: {
451454
writeOneEntityStage((*v1.FlappingHistory)(nil)), // flapping_history
452455
writeOneEntityStage((*v1.HistoryFlapping)(nil)), // history (depends on flapping_history)
453456
},
454-
"acknowledgement": {
455-
writeOneEntityStage((*v1.AcknowledgementHistory)(nil)), // acknowledgement_history
456-
writeOneEntityStage((*v1.HistoryAck)(nil)), // history (depends on acknowledgement_history)
457+
SyncPipelineNotification: {
458+
writeOneEntityStage((*v1.NotificationHistory)(nil)), // notification_history
459+
userNotificationStage, // user_notification_history (depends on notification_history)
460+
writeOneEntityStage((*v1.HistoryNotification)(nil)), // history (depends on notification_history)
461+
},
462+
SyncPipelineState: {
463+
writeOneEntityStage((*v1.StateHistory)(nil)), // state_history
464+
writeOneEntityStage((*v1.HistoryState)(nil)), // history (depends on state_history)
465+
writeMultiEntityStage(stateHistoryToSlaEntity), // sla_history_state
457466
},
458467
}

pkg/icingadb/v1/history/downtime.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@ func (*HistoryDowntime) TableName() string {
8888
return "history"
8989
}
9090

91+
type DowntimeHistoryMeta struct {
92+
DowntimeHistoryEntity `json:",inline"`
93+
DowntimeHistory `json:",inline"`
94+
HistoryMeta `json:",inline"`
95+
}
96+
9197
type SlaHistoryDowntime struct {
9298
DowntimeHistoryEntity `json:",inline"`
9399
HistoryTableMeta `json:",inline"`

0 commit comments

Comments
 (0)