Skip to content
This repository was archived by the owner on Jan 28, 2021. It is now read-only.

Commit 4e1dc14

Browse files
authored
Merge pull request #829 from erizocosmico/fix/race-exchange
plan: fix race conditions in Exchange node
2 parents 1836395 + a381efe commit 4e1dc14

File tree

1 file changed

+36
-20
lines changed

1 file changed

+36
-20
lines changed

sql/plan/exchange.go

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,14 @@ type exchangeRowIter struct {
7575
parallelism int
7676
partitions sql.PartitionIter
7777
tree sql.Node
78-
mut sync.Mutex
79-
tokens chan struct{}
78+
mut sync.RWMutex
79+
tokensChan chan struct{}
8080
started bool
8181
rows chan sql.Row
8282
err chan error
83-
quit chan struct{}
83+
84+
quitMut sync.RWMutex
85+
quitChan chan struct{}
8486
}
8587

8688
func newExchangeRowIter(
@@ -97,34 +99,40 @@ func newExchangeRowIter(
9799
started: false,
98100
tree: tree,
99101
partitions: iter,
100-
quit: make(chan struct{}),
102+
quitChan: make(chan struct{}),
101103
}
102104
}
103105

104106
func (it *exchangeRowIter) releaseToken() {
105107
it.mut.Lock()
106108
defer it.mut.Unlock()
107109

108-
if it.tokens != nil {
109-
it.tokens <- struct{}{}
110+
if it.tokensChan != nil {
111+
it.tokensChan <- struct{}{}
110112
}
111113
}
112114

113115
func (it *exchangeRowIter) closeTokens() {
114116
it.mut.Lock()
115117
defer it.mut.Unlock()
116118

117-
close(it.tokens)
118-
it.tokens = nil
119+
close(it.tokensChan)
120+
it.tokensChan = nil
121+
}
122+
123+
func (it *exchangeRowIter) tokens() chan struct{} {
124+
it.mut.RLock()
125+
defer it.mut.RUnlock()
126+
return it.tokensChan
119127
}
120128

121129
func (it *exchangeRowIter) fillTokens() {
122130
it.mut.Lock()
123131
defer it.mut.Unlock()
124132

125-
it.tokens = make(chan struct{}, it.parallelism)
133+
it.tokensChan = make(chan struct{}, it.parallelism)
126134
for i := 0; i < it.parallelism; i++ {
127-
it.tokens <- struct{}{}
135+
it.tokensChan <- struct{}{}
128136
}
129137
}
130138

@@ -142,7 +150,7 @@ func (it *exchangeRowIter) start() {
142150
it.err <- context.Canceled
143151
it.closeTokens()
144152
return
145-
case <-it.quit:
153+
case <-it.quit():
146154
it.closeTokens()
147155
return
148156
case p, ok := <-partitions:
@@ -179,9 +187,9 @@ func (it *exchangeRowIter) iterPartitions(ch chan<- sql.Partition) {
179187
case <-it.ctx.Done():
180188
it.err <- context.Canceled
181189
return
182-
case <-it.quit:
190+
case <-it.quit():
183191
return
184-
case <-it.tokens:
192+
case <-it.tokens():
185193
}
186194

187195
p, err := it.partitions.Next()
@@ -226,7 +234,7 @@ func (it *exchangeRowIter) iterPartition(p sql.Partition) {
226234
case <-it.ctx.Done():
227235
it.err <- context.Canceled
228236
return
229-
case <-it.quit:
237+
case <-it.quit():
230238
return
231239
default:
232240
}
@@ -263,17 +271,25 @@ func (it *exchangeRowIter) Next() (sql.Row, error) {
263271
}
264272
}
265273

266-
func (it *exchangeRowIter) Close() (err error) {
267-
if it.quit != nil {
268-
close(it.quit)
269-
it.quit = nil
274+
func (it *exchangeRowIter) quit() chan struct{} {
275+
it.quitMut.RLock()
276+
defer it.quitMut.RUnlock()
277+
return it.quitChan
278+
}
279+
280+
func (it *exchangeRowIter) Close() error {
281+
it.quitMut.Lock()
282+
if it.quitChan != nil {
283+
close(it.quitChan)
284+
it.quitChan = nil
270285
}
286+
it.quitMut.Unlock()
271287

272288
if it.partitions != nil {
273-
err = it.partitions.Close()
289+
return it.partitions.Close()
274290
}
275291

276-
return err
292+
return nil
277293
}
278294

279295
type exchangePartition struct {

0 commit comments

Comments
 (0)