Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions utils/ordererconn/conn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ type (
// This will increase the config version, allowing a connection instance to identify
// a connection update and fetch the new connections.
ConnectionManager struct {
configVersion atomic.Uint64
connections map[string]*grpc.ClientConn
config *ConnectionConfig
lock sync.Mutex
configVersion atomic.Uint64
connections map[string]*grpc.ClientConn
uniqueConnections []*grpc.ClientConn
config *ConnectionConfig
lock sync.Mutex
}

// ConnFilter is used to filter connections.
Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *ConnectionManager) Update(config *ConnectionConfig) error {
var err error
conn, err = openConnection(config, endpoints)
if err != nil {
closeConnection(connections)
connection.CloseConnectionsLog(slices.Collect(maps.Values(connCache))...)
return err
}
connCache[endpointsKey] = conn
Expand All @@ -143,8 +144,9 @@ func (c *ConnectionManager) Update(config *ConnectionConfig) error {
// We increase the version early (before closing any connections, but after locking)
// to ensure the recovery stage knows about an update.
c.configVersion.Add(1)
closeConnection(c.connections)
connection.CloseConnectionsLog(c.uniqueConnections...)
c.connections = connections
c.uniqueConnections = slices.Collect(maps.Values(connCache))
c.config = config
return nil
}
Expand Down Expand Up @@ -218,8 +220,9 @@ func makeEndpointsKey(endpoint []*connection.Endpoint) string {
func (c *ConnectionManager) CloseConnections() {
c.lock.Lock()
defer c.lock.Unlock()
closeConnection(c.connections)
connection.CloseConnectionsLog(c.uniqueConnections...)
c.connections = nil
c.uniqueConnections = nil
}

// IsStale checks if the given OrdererConnectionResiliencyManager is stale.
Expand All @@ -228,12 +231,6 @@ func (c *ConnectionManager) IsStale(configVersion uint64) bool {
return c.configVersion.Load() != configVersion
}

func closeConnection(connections map[string]*grpc.ClientConn) {
if connections != nil {
connection.CloseConnectionsLog(slices.Collect(maps.Values(connections))...)
}
}

func shuffle[T any](nodes []T) {
rand.Shuffle(len(nodes), func(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] })
}
Loading