Skip to content

Commit c124d7e

Browse files
yhabteabjulianbrost
authored andcommitted
Track event rules version in RuntimeConfig
Instead of having to determine the rules version with each incoming request by the listener, we can just make use the already existing incremental update mechanism to keep the version up-to-date. This way, the version will only be updated when a rule is added, changed, or deleted, so there will be no performance impact on the listener side. Apart from that, the listener responds and verifies the rules version on a per-source basis now. As required by the internal document, event rules will be tied to their source because only this source knows how to evaluate them. Therefore, the database `rule` table has been extended to include a `source_id` column, and consequently, each source will only receive the rules that are relevant to it. Thus, initially, when a source submits an event, it will likely be rejected but at the same time receive the current rules, so it can retry the event submission with the correct event rules. This way, no extra HTTP request is needed to fetch the rules, as we will always respond with the newest ones whenever we detect that they're using an outdated event rules config.
1 parent 8c4d689 commit c124d7e

File tree

8 files changed

+146
-46
lines changed

8 files changed

+146
-46
lines changed

internal/config/rule.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,36 @@ import (
66
"slices"
77
)
88

9+
// SourceRulesInfo holds information about the rules associated with a specific source.
10+
type SourceRulesInfo struct {
11+
// Version is the version of the rules for the source.
12+
//
13+
// It is a monotonically increasing number that is updated whenever a rule is added, modified, or deleted.
14+
// With each state change of the rules referenced by RuleIDs, the Version will always be incremented
15+
// by 1, starting from 0. When there are no configured rules for the source, Version will be reset to 0.
16+
//
17+
// The Version is not unique across different sources, but it is unique for a specific source at a specific time.
18+
Version uint64
19+
20+
// RuleIDs is a list of rule IDs associated with a specific source.
21+
//
22+
// It is used to quickly access the rules for a specific source without iterating over all rules.
23+
RuleIDs []int64
24+
}
25+
26+
// RuleSet represents the set of event rules currently loaded in the runtime configuration.
27+
// It contains the rules and their associated information, such as the source they belong to and their version.
28+
type RuleSet struct {
29+
Rules map[int64]*rule.Rule // rules is a map of rule.Rule by their ID.
30+
31+
RulesBySource map[int64]*SourceRulesInfo // RulesBySource maps source IDs to their rules and version information.
32+
}
33+
934
// applyPendingRules synchronizes changed rules.
1035
func (r *RuntimeConfig) applyPendingRules() {
36+
// Keep track of sources the rules were updated for, so we can update their version later.
37+
updatedSources := make(map[int64]struct{})
38+
1139
incrementalApplyPending(
1240
r,
1341
&r.Rules, &r.configChange.Rules,
@@ -21,6 +49,17 @@ func (r *RuntimeConfig) applyPendingRules() {
2149
}
2250

2351
newElement.Escalations = make(map[int64]*rule.Escalation)
52+
updatedSources[newElement.SourceID] = struct{}{}
53+
if r.RulesBySource == nil {
54+
r.RulesBySource = make(map[int64]*SourceRulesInfo)
55+
}
56+
57+
// Add the new rule to the per-source rules cache.
58+
if sourceInfo := r.RulesBySource[newElement.SourceID]; sourceInfo == nil {
59+
r.RulesBySource[newElement.SourceID] = &SourceRulesInfo{RuleIDs: []int64{newElement.ID}}
60+
} else {
61+
sourceInfo.RuleIDs = append(sourceInfo.RuleIDs, newElement.ID)
62+
}
2463
return nil
2564
},
2665
func(curElement, update *rule.Rule) error {
@@ -40,10 +79,35 @@ func (r *RuntimeConfig) applyPendingRules() {
4079

4180
// ObjectFilter{,Expr} are being initialized by config.IncrementalConfigurableInitAndValidatable.
4281
curElement.ObjectFilterExpr = update.ObjectFilterExpr
82+
updatedSources[curElement.SourceID] = struct{}{}
4383

4484
return nil
4585
},
46-
nil)
86+
func(delElement *rule.Rule) error {
87+
if sourceInfo, ok := r.RulesBySource[delElement.SourceID]; ok {
88+
sourceInfo.RuleIDs = slices.DeleteFunc(sourceInfo.RuleIDs, func(id int64) bool {
89+
return id == delElement.ID
90+
})
91+
if len(sourceInfo.RuleIDs) == 0 {
92+
delete(r.RulesBySource, delElement.SourceID) // Remove the source if no rules are left.
93+
}
94+
}
95+
return nil
96+
},
97+
)
98+
99+
// After applying the rules, we need to update the version of the sources that were modified.
100+
// This is done to ensure that the version is incremented whenever a rule is added, modified,
101+
// or deleted only once per applyPendingRules call, even if multiple rules from the same source
102+
// were changed.
103+
for sourceID := range updatedSources {
104+
if r.RulesBySource != nil {
105+
if sourceInfo, ok := r.RulesBySource[sourceID]; ok {
106+
// Invariant: len(sourceInfo.RuleIDs) > 0 if the source exists in RulesBySource (see delete above).
107+
sourceInfo.Version++
108+
}
109+
}
110+
}
47111

48112
incrementalApplyPending(
49113
r,

internal/config/runtime.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"context"
55
"database/sql"
66
"errors"
7+
"fmt"
78
"github.com/icinga/icinga-go-library/database"
89
"github.com/icinga/icinga-go-library/logging"
10+
"github.com/icinga/icinga-go-library/notifications"
911
"github.com/icinga/icinga-go-library/types"
1012
"github.com/icinga/icinga-notifications/internal/channel"
1113
"github.com/icinga/icinga-notifications/internal/recipient"
@@ -61,9 +63,10 @@ type ConfigSet struct {
6163
Groups map[int64]*recipient.Group
6264
TimePeriods map[int64]*timeperiod.TimePeriod
6365
Schedules map[int64]*recipient.Schedule
64-
Rules map[int64]*rule.Rule
6566
Sources map[int64]*Source
6667

68+
RuleSet // RuleSet contains the currently loaded rules and their version.
69+
6770
// The following fields contain intermediate values, necessary for the incremental config synchronization.
6871
// Furthermore, they allow accessing intermediate tables as everything is referred by pointers.
6972
groupMembers map[recipient.GroupMemberKey]*recipient.GroupMember
@@ -162,6 +165,25 @@ func (r *RuntimeConfig) GetRuleEscalation(escalationID int64) *rule.Escalation {
162165
return nil
163166
}
164167

168+
// GetRulesVersionFor retrieves the version of the rules for a specific source.
169+
//
170+
// It returns the version as a hexadecimal string, which is a representation of the version number.
171+
// If the source does not have any rules associated with it, the version will be set to notifications.EmptyRulesVersion.
172+
//
173+
// May not be called while holding the write lock on the RuntimeConfig.
174+
func (r *RuntimeConfig) GetRulesVersionFor(source int64) string {
175+
r.RLock()
176+
defer r.RUnlock()
177+
178+
if r.RulesBySource != nil {
179+
if sourceInfo, ok := r.RulesBySource[source]; ok && sourceInfo.Version > 0 {
180+
return fmt.Sprintf("%x", sourceInfo.Version)
181+
}
182+
}
183+
184+
return notifications.EmptyRulesVersion
185+
}
186+
165187
// GetContact returns *recipient.Contact by the given username (case-insensitive).
166188
// Returns nil when the given username doesn't exist.
167189
func (r *RuntimeConfig) GetContact(username string) *recipient.Contact {

internal/listener/listener.go

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ func NewListener(db *database.DB, runtimeConfig *config.RuntimeConfig, logs *log
4545

4646
l.mux.Handle("/debug/", http.StripPrefix("/debug", l.requireDebugAuth(debugMux)))
4747
l.mux.HandleFunc("/process-event", l.ProcessEvent)
48-
l.mux.HandleFunc("/event-rules", l.RulesForFilters)
4948
return l
5049
}
5150

@@ -86,28 +85,6 @@ func (l *Listener) Run(ctx context.Context) error {
8685
}
8786
}
8887

89-
// getRuleVersion returns the latest rule version.
90-
//
91-
// Technically, the rule version is an encoded string representation of the latest changed_at value from each rule.
92-
// Being an implementation detail, it might change over time. For the moment, a simple equality check is enough.
93-
func (l *Listener) getRuleVersion() string {
94-
l.runtimeConfig.RLock()
95-
defer l.runtimeConfig.RUnlock()
96-
97-
var latest time.Time
98-
for _, r := range l.runtimeConfig.Rules {
99-
if t := r.ChangedAt.Time(); t.After(latest) {
100-
latest = t
101-
}
102-
}
103-
104-
if latest.IsZero() {
105-
return "NA"
106-
}
107-
108-
return fmt.Sprintf("%x", latest.UnixNano())
109-
}
110-
11188
// sourceFromAuthOrAbort extracts a *config.Source from the HTTP Basic Auth. If the credentials are wrong, (nil, false) is
11289
// returned and 401 was written back to the response writer.
11390
func (l *Listener) sourceFromAuthOrAbort(w http.ResponseWriter, r *http.Request) (*config.Source, bool) {
@@ -154,12 +131,16 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) {
154131
ruleIdsStr := r.Header.Get("X-Rule-Ids")
155132
ruleVersion := r.Header.Get("X-Rule-Version")
156133

157-
if latestRuleVersion := l.getRuleVersion(); ruleVersion != latestRuleVersion {
158-
abort(http.StatusFailedDependency,
159-
nil,
160-
"X-Rule-Version %q does not match %q, refetch rules",
161-
ruleVersion,
162-
latestRuleVersion)
134+
// If the client uses an outdated rules version, reject the request but send also the current rules version
135+
// and rules for this source back to the client, so it can retry the request with the updated rules.
136+
if latestRuleVersion := l.runtimeConfig.GetRulesVersionFor(source.ID); ruleVersion != latestRuleVersion {
137+
w.WriteHeader(http.StatusFailedDependency)
138+
l.writeSourceRulesInfo(w, source)
139+
140+
l.logger.Debugw("Abort event processing due to outdated rules version",
141+
zap.String("current_version", latestRuleVersion),
142+
zap.String("provided_version", ruleVersion),
143+
zap.String("source", source.Name))
163144
return
164145
}
165146

@@ -207,15 +188,6 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, r *http.Request) {
207188
_, _ = fmt.Fprintln(w)
208189
}
209190

210-
func (l *Listener) RulesForFilters(w http.ResponseWriter, r *http.Request) {
211-
_, validAuth := l.sourceFromAuthOrAbort(w, r)
212-
if !validAuth {
213-
return
214-
}
215-
216-
l.DumpRules(w, r)
217-
}
218-
219191
// requireDebugAuth is a middleware that checks if the valid debug password was provided. If there is no password
220192
// configured or the supplied password is incorrect, it sends an error code and does not redirect the request.
221193
func (l *Listener) requireDebugAuth(next http.Handler) http.Handler {
@@ -327,17 +299,39 @@ func (l *Listener) DumpRules(w http.ResponseWriter, r *http.Request) {
327299
return
328300
}
329301

302+
l.runtimeConfig.RLock()
303+
rules := l.runtimeConfig.Rules
304+
l.runtimeConfig.RUnlock()
305+
306+
enc := json.NewEncoder(w)
307+
enc.SetIndent("", " ")
308+
_ = enc.Encode(rules)
309+
}
310+
311+
// writeSourceRulesInfo writes the rules information for a specific source to the response writer.
312+
//
313+
// It includes the rules version and a map of rule IDs to their corresponding rule objects.
314+
func (l *Listener) writeSourceRulesInfo(w http.ResponseWriter, source *config.Source) {
330315
type Response struct {
331316
Version string
332317
Rules map[int64]*rule.Rule
333318
}
334319

335320
var resp Response
336-
resp.Version = l.getRuleVersion()
321+
resp.Version = l.runtimeConfig.GetRulesVersionFor(source.ID)
337322

338-
l.runtimeConfig.RLock()
339-
resp.Rules = l.runtimeConfig.Rules
340-
l.runtimeConfig.RUnlock()
323+
func() { // Use a function to ensure that the RLock and RUnlock are called before writing the response.
324+
l.runtimeConfig.RLock()
325+
defer l.runtimeConfig.RUnlock()
326+
327+
sourceInfo := l.runtimeConfig.RulesBySource[source.ID]
328+
if sourceInfo != nil {
329+
resp.Rules = make(map[int64]*rule.Rule, len(sourceInfo.RuleIDs))
330+
for _, rID := range sourceInfo.RuleIDs {
331+
resp.Rules[rID] = l.runtimeConfig.Rules[rID]
332+
}
333+
}
334+
}()
341335

342336
enc := json.NewEncoder(w)
343337
enc.SetIndent("", " ")

internal/rule/rule.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Rule struct {
1515
Name string `db:"name"`
1616
TimePeriod *timeperiod.TimePeriod `db:"-"`
1717
TimePeriodID types.Int `db:"timeperiod_id"`
18+
SourceID int64 `db:"source_id"`
1819
ObjectFilterExpr types.String `db:"object_filter"`
1920
Escalations map[int64]*Escalation `db:"-"`
2021
}
@@ -37,6 +38,7 @@ func (r *Rule) IncrementalInitAndValidate() error {
3738
func (r *Rule) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
3839
encoder.AddInt64("id", r.ID)
3940
encoder.AddString("name", r.Name)
41+
encoder.AddInt64("source_id", r.SourceID)
4042

4143
if r.TimePeriodID.Valid && r.TimePeriodID.Int64 != 0 {
4244
encoder.AddInt64("timeperiod_id", r.TimePeriodID.Int64)

schema/mysql/schema.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,15 @@ CREATE TABLE rule (
273273
id bigint NOT NULL AUTO_INCREMENT,
274274
name text NOT NULL COLLATE utf8mb4_unicode_ci,
275275
timeperiod_id bigint,
276+
source_id bigint NOT NULL, -- the source this rule belongs to
276277
object_filter text,
277278

278279
changed_at bigint NOT NULL,
279280
deleted enum('n', 'y') NOT NULL DEFAULT 'n',
280281

281282
CONSTRAINT pk_rule PRIMARY KEY (id),
282-
CONSTRAINT fk_rule_timeperiod FOREIGN KEY (timeperiod_id) REFERENCES timeperiod(id)
283+
CONSTRAINT fk_rule_timeperiod FOREIGN KEY (timeperiod_id) REFERENCES timeperiod(id),
284+
CONSTRAINT fk_rule_source FOREIGN KEY (source_id) REFERENCES source(id)
283285
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
284286

285287
CREATE INDEX idx_rule_changed_at ON rule(changed_at);

schema/mysql/upgrades/001.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,10 @@ ALTER TABLE source
1111
Drop COLUMN icinga2_common_name,
1212
Drop COLUMN icinga2_insecure_tls,
1313
MODIFY COLUMN listener_password_hash text NOT NULL;
14+
15+
ALTER TABLE rule
16+
ADD COLUMN source_id bigint DEFAULT NULL AFTER timeperiod_id,
17+
ADD CONSTRAINT fk_rule_source FOREIGN KEY (source_id) REFERENCES source(id);
18+
19+
UPDATE rule SET source_id = (SELECT id FROM source WHERE type = 'icinga2');
20+
ALTER TABLE rule MODIFY COLUMN source_id bigint NOT NULL;

schema/pgsql/schema.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,13 +320,15 @@ CREATE TABLE rule (
320320
id bigserial,
321321
name citext NOT NULL,
322322
timeperiod_id bigint,
323+
source_id bigint NOT NULL, -- the source this rule belongs to
323324
object_filter text,
324325

325326
changed_at bigint NOT NULL,
326327
deleted boolenum NOT NULL DEFAULT 'n',
327328

328329
CONSTRAINT pk_rule PRIMARY KEY (id),
329-
CONSTRAINT fk_rule_timeperiod FOREIGN KEY (timeperiod_id) REFERENCES timeperiod(id)
330+
CONSTRAINT fk_rule_timeperiod FOREIGN KEY (timeperiod_id) REFERENCES timeperiod(id),
331+
CONSTRAINT fk_rule_source FOREIGN KEY (source_id) REFERENCES source(id)
330332
);
331333

332334
CREATE INDEX idx_rule_changed_at ON rule(changed_at);

schema/pgsql/upgrades/001.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,10 @@ ALTER TABLE source
1111
Drop COLUMN icinga2_common_name,
1212
Drop COLUMN icinga2_insecure_tls,
1313
ALTER COLUMN listener_password_hash SET NOT NULL;
14+
15+
ALTER TABLE rule
16+
ADD COLUMN source_id bigint DEFAULT NULL,
17+
ADD CONSTRAINT fk_rule_source FOREIGN KEY (source_id) REFERENCES source(id);
18+
19+
UPDATE rule SET source_id = (SELECT id FROM source WHERE type = 'icinga2');
20+
ALTER TABLE rule ALTER COLUMN source_id SET NOT NULL;

0 commit comments

Comments
 (0)