Skip to content

Commit d8a0468

Browse files
authored
Feat: Go Substrate Client - Connection Failover Implementation (#1022)
* feat: Maintain a single connection with multiple backup URLs * fix: remove panic in Transfer * test: add test suite covering failover scenarios * chore: remove old workflows * Update CI and test image * fix: guard Substrate connection state in GetClient() and Close() * test: Added a sync.WaitGroup to properly track when goroutines complete in TestFailoverMechanism
1 parent 5e76664 commit d8a0468

File tree

9 files changed

+194
-83
lines changed

9 files changed

+194
-83
lines changed

.github/workflows/010_build_and_test.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ jobs:
88
build-and-test:
99
runs-on: [self-hosted, tfchainrunner01]
1010
container:
11-
image: threefolddev/tfchain:4
11+
image: threefolddev/tfchain:5
1212
env:
1313
DEBIAN_FRONTEND: noninteractive
1414
PATH: /root/.cargo/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/go/bin

.github/workflows/020_lint_and_test_go_client.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,22 @@ jobs:
2121
- name: Set up Go
2222
uses: actions/setup-go@v5
2323
with:
24-
go-version: "1.20"
24+
go-version: "1.21"
2525
cache: false
2626
# cache-dependency-path: clients/tfchain-client-go/go.sum
2727
id: go
2828

2929
- name: golangci-lint
30-
uses: golangci/golangci-lint-action@v3.7.0
30+
uses: golangci/golangci-lint-action@v6
3131
with:
3232
args: --timeout 3m --verbose
3333
working-directory: clients/tfchain-client-go
3434

3535
- name: staticcheck
36-
uses: dominikh/staticcheck-action@v1.3.0
36+
uses: dominikh/staticcheck-action@v1
3737
with:
38-
version: "2022.1.3"
38+
version: "latest"
39+
install-go: false
3940
working-directory: clients/tfchain-client-go
4041
env:
4142
GO111MODULE: on
@@ -44,4 +45,4 @@ jobs:
4445
uses: Jerome1337/gofmt-action@v1.0.5
4546
with:
4647
gofmt-path: './clients/tfchain-client-go'
47-
gofmt-flags: "-l -d"
48+
gofmt-flags: "-l -d"

.github/workflows/build_test.Dockerfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM ubuntu:20.04
1+
FROM ubuntu:22.04
22
ENV DEBIAN_FRONTEND=noninteractive
33
RUN apt update && \
44
apt install -y \
@@ -16,8 +16,8 @@ RUN apt update && \
1616
zstd \
1717
wget \
1818
protobuf-compiler && \
19-
wget https://go.dev/dl/go1.20.2.linux-amd64.tar.gz && \
20-
tar -xvf go1.20.2.linux-amd64.tar.gz && \
19+
wget https://go.dev/dl/go1.21.13.linux-amd64.tar.gz && \
20+
tar -xvf go1.21.13.linux-amd64.tar.gz && \
2121
mv go /usr/local && \
2222
echo "GOPATH=/usr/local/go" >> ~/.bashrc && \
2323
echo "PATH=\$PATH:\$GOPATH/bin" >> ~/.bashrc && \

clients/tfchain-client-go/.github/workflows/lint.yml

Lines changed: 0 additions & 35 deletions
This file was deleted.

clients/tfchain-client-go/.github/workflows/test.yml

Lines changed: 0 additions & 24 deletions
This file was deleted.

clients/tfchain-client-go/impl.go

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ var (
2727
ErrUnknownVersion = fmt.Errorf("unknown version")
2828
//ErrNotFound is returned if an object is not found
2929
ErrNotFound = fmt.Errorf("object not found")
30+
//ErrClosed is returned if the client is closed
31+
ErrClosed = fmt.Errorf("client closed")
3032
)
3133

3234
// Versioned base for all types
@@ -87,7 +89,7 @@ func (p *mgrImpl) Substrate() (*Substrate, error) {
8789
return nil, err
8890
}
8991

90-
return newSubstrate(cl, meta, p.put)
92+
return newSubstrate(cl, meta, p.put, p.connect)
9193
}
9294

9395
// Raw returns an RPC substrate client, plus meta. The returned connection
@@ -105,7 +107,7 @@ func (p *mgrImpl) Raw() (Conn, Meta, error) {
105107

106108
boff := backoff.WithMaxRetries(
107109
backoff.NewConstantBackOff(200*time.Millisecond),
108-
2*uint64(len(p.urls)),
110+
4*uint64(len(p.urls)),
109111
)
110112

111113
var (
@@ -145,36 +147,80 @@ func (p *mgrImpl) Raw() (Conn, Meta, error) {
145147
return cl, meta, err
146148
}
147149

150+
// connect connects to the next endpoint in roundrobin fashion
151+
// and replaces the current connection with the new one.
152+
// need to be called while lock is acquired.
153+
func (p *mgrImpl) connect(s *Substrate) error {
154+
cl, meta, err := p.Raw()
155+
if err != nil {
156+
return err
157+
}
158+
// close the old connection if it exists
159+
if s.cl != nil {
160+
s.cl.Client.Close()
161+
log.Info().Str("url", s.cl.Client.URL()).Msg("unhealthy connection closed")
162+
}
163+
// set the new connection
164+
s.cl = cl
165+
s.meta = meta
166+
log.Info().Str("url", s.cl.Client.URL()).Msg("connection restored")
167+
return nil
168+
}
169+
148170
// TODO: implement reusable connections instead of
149171
// closing the connection.
150-
func (p *mgrImpl) put(cl *Substrate) {
172+
func (p *mgrImpl) put(s *Substrate) {
151173
// naive put implementation for now
152174
// we just immediately kill the connection
153-
if cl.cl != nil {
154-
cl.cl.Client.Close()
175+
if s.cl != nil {
176+
s.cl.Client.Close()
155177
}
156-
cl.cl = nil
157-
cl.meta = nil
178+
s.cl = nil
179+
s.meta = nil
158180
}
159181

160182
// Substrate client
161183
type Substrate struct {
162-
cl Conn
163-
meta Meta
184+
mu sync.Mutex
185+
cl Conn
186+
meta Meta
187+
closed bool
164188

165-
close func(s *Substrate)
189+
close func(s *Substrate)
190+
connect func(s *Substrate) error
166191
}
167192

168193
// NewSubstrate creates a substrate client
169-
func newSubstrate(cl Conn, meta Meta, close func(*Substrate)) (*Substrate, error) {
170-
return &Substrate{cl: cl, meta: meta, close: close}, nil
194+
func newSubstrate(cl Conn, meta Meta, close func(*Substrate), connect func(s *Substrate) error) (*Substrate, error) {
195+
return &Substrate{cl: cl, meta: meta, close: close, connect: connect}, nil
171196
}
172197

173198
func (s *Substrate) Close() {
199+
s.mu.Lock()
200+
defer s.mu.Unlock()
201+
if s.closed {
202+
return
203+
}
174204
s.close(s)
205+
s.closed = true
175206
}
176207

177208
func (s *Substrate) GetClient() (Conn, Meta, error) {
209+
s.mu.Lock()
210+
defer s.mu.Unlock()
211+
212+
if s.closed {
213+
return nil, nil, ErrClosed
214+
}
215+
216+
// check if connection is healthy
217+
if _, err := getTime(s.cl, s.meta); err != nil {
218+
log.Info().Str("url", s.cl.Client.URL()).Msg("connection unhealthy, attempting failover")
219+
err := s.connect(s)
220+
if err != nil {
221+
return nil, nil, err // all attempts failed, no connection available
222+
}
223+
}
178224
return s.cl, s.meta, nil
179225
}
180226

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package substrate
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestFailoverMechanism(t *testing.T) {
12+
t.Run("should failover to next URL when current node is unhealthy", func(t *testing.T) {
13+
// Create manager with multiple URLs
14+
urls := []string{"ws://fail1", getUrlBasedOnEnv()}
15+
mgr := NewManager(urls...)
16+
17+
// Get initial substrate client
18+
sub, err := mgr.Substrate()
19+
require.NoError(t, err)
20+
defer sub.Close()
21+
22+
// Store initial Client
23+
initialClient := sub.cl.Client
24+
25+
// Force connection to become unhealthy by closing it
26+
sub.cl.Client.Close()
27+
28+
// Try to use the connection - should trigger failover
29+
_, err = sub.Time()
30+
require.NoError(t, err)
31+
32+
// Check that we're now using a different URL
33+
newClient := sub.cl.Client
34+
assert.NotEqual(t, initialClient, newClient)
35+
})
36+
37+
t.Run("should try all URLs in rotation", func(t *testing.T) {
38+
urls := []string{
39+
"ws://fail1",
40+
"ws://fail2",
41+
getUrlBasedOnEnv(),
42+
}
43+
44+
mgr := NewManager(urls...)
45+
sub, err := mgr.Substrate()
46+
require.NoError(t, err)
47+
defer sub.Close()
48+
49+
// The final URL should be the working one
50+
assert.Equal(t, getUrlBasedOnEnv(), sub.cl.Client.URL())
51+
})
52+
53+
t.Run("should reuse connection if healthy", func(t *testing.T) {
54+
sub := startLocalConnection(t)
55+
defer sub.Close()
56+
57+
initialClient := sub.cl.Client
58+
59+
// Use the connection multiple times
60+
for i := 0; i < 3; i++ {
61+
_, err := sub.Time()
62+
require.NoError(t, err)
63+
assert.Equal(t, initialClient, sub.cl.Client)
64+
}
65+
})
66+
67+
t.Run("should handle all nodes being down", func(t *testing.T) {
68+
urls := []string{"ws://fail1", "ws://fail2"}
69+
mgr := NewManager(urls...)
70+
_, err := mgr.Substrate()
71+
assert.Error(t, err)
72+
})
73+
74+
t.Run("should handle concurrent failover attempts", func(t *testing.T) {
75+
urls := []string{getUrlBasedOnEnv(), getUrlBasedOnEnv()}
76+
mgr := NewManager(urls...)
77+
sub1, err := mgr.Substrate()
78+
require.NoError(t, err)
79+
defer sub1.Close()
80+
81+
sub2, err := mgr.Substrate()
82+
require.NoError(t, err)
83+
defer sub2.Close()
84+
85+
// Force both connections to fail
86+
sub1.cl.Client.Close()
87+
sub2.cl.Client.Close()
88+
89+
// Create WaitGroup to ensure all goroutines complete before test ends
90+
var wg sync.WaitGroup
91+
wg.Add(2)
92+
93+
// Try to use both connections concurrently
94+
errs := make(chan error, 2)
95+
go func() {
96+
defer wg.Done()
97+
_, err := sub1.Time()
98+
errs <- err
99+
}()
100+
101+
go func() {
102+
defer wg.Done()
103+
_, err := sub2.Time()
104+
errs <- err
105+
}()
106+
107+
// Wait for both operations to complete
108+
go func() {
109+
wg.Wait()
110+
close(errs)
111+
}()
112+
113+
// Check errors from both goroutines
114+
for err := range errs {
115+
require.NoError(t, err)
116+
}
117+
})
118+
}

clients/tfchain-client-go/transfer.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ func (s *Substrate) Transfer(identity Identity, amount uint64, destination Accou
2020
bal := big.NewInt(int64(amount))
2121

2222
c, err := types.NewCall(meta, "Balances.transfer", dest, types.NewUCompact(bal))
23-
if err != nil {
24-
panic(err)
25-
}
2623

2724
if err != nil {
2825
return errors.Wrap(err, "failed to create call")

clients/tfchain-client-go/utils_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ func startLocalConnection(t *testing.T) *Substrate {
6363
return cl
6464
}
6565

66+
// getUrlBasedOnEnv returns the websocket URL of the node the tests should
// talk to: the local node when the CI environment variable is set (to any
// value, including the empty string), and the public devnet endpoint
// otherwise.
func getUrlBasedOnEnv() string {
	if _, inCI := os.LookupEnv("CI"); inCI {
		return "ws://127.0.0.1:9944"
	}
	return "wss://tfchain.dev.grid.tf"
}
73+
6674
func assertCreateTwin(t *testing.T, cl *Substrate, user AccountUser) uint32 {
6775
u := Accounts[user]
6876

0 commit comments

Comments
 (0)