File tree Expand file tree Collapse file tree 1 file changed +8
-4
lines changed Expand file tree Collapse file tree 1 file changed +8
-4
lines changed Original file line number Diff line number Diff line change 77	"math/rand" 
88	"sort" 
99	"sync" 
10- 	"sync/atomic" 
1110)
1211
1312// The Balancer interface provides an abstraction of the message distribution 
@@ -42,8 +41,10 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int {
4241type  RoundRobin  struct  {
4342	ChunkSize  int 
4443	// Use a 32 bits integer so RoundRobin values don't need to be aligned to 
45- 	// apply atomic  increments. 
44+ 	// apply increments. 
4645	counter  uint32 
46+ 
47+ 	mutex  sync.Mutex 
4748}
4849
4950// Balance satisfies the Balancer interface. 
@@ -52,14 +53,17 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int {
5253}
5354
5455func  (rr  * RoundRobin ) balance (partitions  []int ) int  {
56+ 	rr .mutex .Lock ()
57+ 	defer  rr .mutex .Unlock ()
58+ 
5559	if  rr .ChunkSize  <  1  {
5660		rr .ChunkSize  =  1 
5761	}
5862
5963	length  :=  len (partitions )
60- 	counterNow  :=  atomic . LoadUint32 ( & rr .counter ) 
64+ 	counterNow  :=  rr .counter 
6165	offset  :=  int (counterNow  /  uint32 (rr .ChunkSize ))
62- 	atomic . AddUint32 ( & rr .counter ,  1 ) 
66+ 	rr .counter ++ 
6367	return  partitions [offset % length ]
6468}
6569
 
 
   
 
     
   
   
          
    
    
     
    
      
     
     
    You can’t perform that action at this time.
  
 
    
  
    
      
        
     
       
      
     
   
 
    
    
  
 
  
 
     
    
0 commit comments