@@ -3,6 +3,7 @@ package stackitprovider
33import (
44 "context"
55 "fmt"
6+ "sync"
67
78 stackitdnsclient "github.com/stackitcloud/stackit-sdk-go/services/dns"
89 "go.uber.org/zap"
@@ -12,43 +13,99 @@ import (
1213
1314// ApplyChanges applies a given set of changes in a given zone.
1415func (d * StackitDNSProvider ) ApplyChanges (ctx context.Context , changes * plan.Changes ) error {
16+ var tasks []changeTask
1517 // create rr set. POST /v1/projects/{projectId}/zones/{zoneId}/rrsets
16- err := d .createRRSets (ctx , changes .Create )
17- if err != nil {
18- return err
19- }
20-
18+ tasks = append (tasks , d .buildRRSetTasks (changes .Create , CREATE )... )
2119 // update rr set. PATCH /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
22- err = d .updateRRSets (ctx , changes .UpdateNew )
20+ tasks = append (tasks , d .buildRRSetTasks (changes .UpdateNew , UPDATE )... )
21+ d .logger .Info ("records to delete" , zap .String ("records" , fmt .Sprintf ("%v" , changes .Delete )))
22+ // delete rr set. DELETE /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
23+ tasks = append (tasks , d .buildRRSetTasks (changes .Delete , DELETE )... )
24+
25+ zones , err := d .zoneFetcherClient .zones (ctx )
2326 if err != nil {
2427 return err
2528 }
2629
27- // delete rr set. DELETE /v1/projects/{projectId}/zones/{zoneId}/rrsets/{rrSetId}
28- err = d .deleteRRSets (ctx , changes .Delete )
29- if err != nil {
30- return err
30+ return d .handleRRSetWithWorkers (ctx , tasks , zones )
31+ }
32+
33+ // handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
34+ func (d * StackitDNSProvider ) buildRRSetTasks (
35+ endpoints []* endpoint.Endpoint ,
36+ action string ,
37+ ) []changeTask {
38+ tasks := make ([]changeTask , 0 , len (endpoints ))
39+
40+ for _ , change := range endpoints {
41+ tasks = append (tasks , changeTask {
42+ action : action ,
43+ change : change ,
44+ })
3145 }
3246
33- return nil
47+ return tasks
3448}
3549
36- // createRRSets creates new record sets in the stackitprovider for the given endpoints that are in the
37- // creation field.
38- func (d * StackitDNSProvider ) createRRSets (
50+ // handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
51+ func (d * StackitDNSProvider ) handleRRSetWithWorkers (
3952 ctx context.Context ,
40- endpoints []* endpoint.Endpoint ,
53+ tasks []changeTask ,
54+ zones []stackitdnsclient.Zone ,
4155) error {
42- if len (endpoints ) == 0 {
43- return nil
56+ workerChannel := make (chan changeTask , len (tasks ))
57+ errorChannel := make (chan error , len (tasks ))
58+
59+ var wg sync.WaitGroup
60+ for i := 0 ; i < d .workers ; i ++ {
61+ wg .Add (1 )
62+ go d .changeWorker (ctx , workerChannel , errorChannel , zones , & wg )
4463 }
4564
46- zones , err := d .zoneFetcherClient .zones (ctx )
47- if err != nil {
48- return err
65+ for _ , task := range tasks {
66+ workerChannel <- task
67+ }
68+ close (workerChannel )
69+
70+ // capture first error
71+ var err error
72+ for i := 0 ; i < len (tasks ); i ++ {
73+ err = <- errorChannel
74+ if err != nil {
75+ break
76+ }
77+ }
78+
79+ // wait until all workers have finished
80+ wg .Wait ()
81+
82+ return err
83+ }
84+
85+ // changeWorker is a worker that handles changes passed by a channel.
86+ func (d * StackitDNSProvider ) changeWorker (
87+ ctx context.Context ,
88+ changes chan changeTask ,
89+ errorChannel chan error ,
90+ zones []stackitdnsclient.Zone ,
91+ wg * sync.WaitGroup ,
92+ ) {
93+ defer wg .Done ()
94+
95+ for change := range changes {
96+ var err error
97+ switch change .action {
98+ case CREATE :
99+ err = d .createRRSet (ctx , change .change , zones )
100+ case UPDATE :
101+ err = d .updateRRSet (ctx , change .change , zones )
102+ case DELETE :
103+ err = d .deleteRRSet (ctx , change .change , zones )
104+ }
105+ errorChannel <- err
49106 }
50107
51- return d . handleRRSetWithWorkers ( ctx , endpoints , zones , CREATE )
108+ d . logger . Debug ( "change worker finished" )
52109}
53110
54111// createRRSet creates a new record set in the stackitprovider for the given endpoint.
@@ -88,24 +145,6 @@ func (d *StackitDNSProvider) createRRSet(
88145 return nil
89146}
90147
91- // updateRRSets patches (overrides) contents in the record sets in the stackitprovider for the given
92- // endpoints that are in the update new field.
93- func (d * StackitDNSProvider ) updateRRSets (
94- ctx context.Context ,
95- endpoints []* endpoint.Endpoint ,
96- ) error {
97- if len (endpoints ) == 0 {
98- return nil
99- }
100-
101- zones , err := d .zoneFetcherClient .zones (ctx )
102- if err != nil {
103- return err
104- }
105-
106- return d .handleRRSetWithWorkers (ctx , endpoints , zones , UPDATE )
107- }
108-
109148// updateRRSet patches (overrides) contents in the record set in the stackitprovider.
110149func (d * StackitDNSProvider ) updateRRSet (
111150 ctx context.Context ,
@@ -142,28 +181,6 @@ func (d *StackitDNSProvider) updateRRSet(
142181 return nil
143182}
144183
145- // deleteRRSets deletes record sets in the stackitprovider for the given endpoints that are in the
146- // deletion field.
147- func (d * StackitDNSProvider ) deleteRRSets (
148- ctx context.Context ,
149- endpoints []* endpoint.Endpoint ,
150- ) error {
151- if len (endpoints ) == 0 {
152- d .logger .Debug ("no endpoints to delete" )
153-
154- return nil
155- }
156-
157- d .logger .Info ("records to delete" , zap .String ("records" , fmt .Sprintf ("%v" , endpoints )))
158-
159- zones , err := d .zoneFetcherClient .zones (ctx )
160- if err != nil {
161- return err
162- }
163-
164- return d .handleRRSetWithWorkers (ctx , endpoints , zones , DELETE )
165- }
166-
167184// deleteRRSet deletes a record set in the stackitprovider for the given endpoint.
168185func (d * StackitDNSProvider ) deleteRRSet (
169186 ctx context.Context ,
@@ -197,62 +214,3 @@ func (d *StackitDNSProvider) deleteRRSet(
197214
198215 return nil
199216}
200-
201- // handleRRSetWithWorkers handles the given endpoints with workers to optimize speed.
202- func (d * StackitDNSProvider ) handleRRSetWithWorkers (
203- ctx context.Context ,
204- endpoints []* endpoint.Endpoint ,
205- zones []stackitdnsclient.Zone ,
206- action string ,
207- ) error {
208- workerChannel := make (chan changeTask , len (endpoints ))
209- errorChannel := make (chan error , len (endpoints ))
210-
211- for i := 0 ; i < d .workers ; i ++ {
212- go d .changeWorker (ctx , workerChannel , errorChannel , zones )
213- }
214-
215- for _ , change := range endpoints {
216- workerChannel <- changeTask {
217- action : action ,
218- change : change ,
219- }
220- }
221-
222- for i := 0 ; i < len (endpoints ); i ++ {
223- err := <- errorChannel
224- if err != nil {
225- close (workerChannel )
226-
227- return err
228- }
229- }
230-
231- close (workerChannel )
232-
233- return nil
234- }
235-
236- // changeWorker is a worker that handles changes passed by a channel.
237- func (d * StackitDNSProvider ) changeWorker (
238- ctx context.Context ,
239- changes chan changeTask ,
240- errorChannel chan error ,
241- zones []stackitdnsclient.Zone ,
242- ) {
243- for change := range changes {
244- switch change .action {
245- case CREATE :
246- err := d .createRRSet (ctx , change .change , zones )
247- errorChannel <- err
248- case UPDATE :
249- err := d .updateRRSet (ctx , change .change , zones )
250- errorChannel <- err
251- case DELETE :
252- err := d .deleteRRSet (ctx , change .change , zones )
253- errorChannel <- err
254- }
255- }
256-
257- d .logger .Debug ("change worker finished" )
258- }
0 commit comments