Skip to content

Commit 8fa29bf

Browse files
author
Edward Muller
committed
Handle waiting for a redis instance to be available
1 parent 6acb986 commit 8fa29bf

File tree

6 files changed

+167
-48
lines changed

6 files changed

+167
-48
lines changed

Procfile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
web: go-websocket-chat-demo
2-
release: bin/addon-wait
1+
web: go-websocket-chat-demo

app.json

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,7 @@
1010
"mount_dir": "src/github.com/heroku-examples/go-websocket-chat-demo",
1111
"website": "http://github.com/heroku-examples/go-websocket-chat-demo",
1212
"repository": "http://github.com/heroku-examples/go-websocket-chat-demol",
13-
"scripts": {},
14-
"buildpacks":[
15-
{ "url" : "https://github.com/heroku/heroku-buildpack-addon-wait.git" },
16-
{ "url" : "heroku/go" }
17-
],
1813
"addons": [
1914
"heroku-redis"
2015
]
21-
}
16+
}

main.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,33 +3,62 @@ package main
33
import (
44
"net/http"
55
"os"
6+
"time"
67

78
"github.com/Sirupsen/logrus"
89
"github.com/heroku/gobits/redis"
910
)
1011

1112
var (
12-
log = logrus.WithField("cmd", "go-websocket-chat-demo")
13-
rr redisReceiver
14-
rw redisWriter
13+
waitTimeout = time.Minute * 10
14+
log = logrus.WithField("cmd", "go-websocket-chat-demo")
15+
rr redisReceiver
16+
rw redisWriter
1517
)
1618

1719
func main() {
20+
port := os.Getenv("PORT")
21+
if port == "" {
22+
log.WithField("PORT", port).Fatal("$PORT must be set")
23+
}
24+
1825
redisURL := os.Getenv("REDIS_URL")
1926
redisPool, err := redis.NewRedisPoolFromURL(redisURL)
2027
if err != nil {
2128
log.WithField("url", redisURL).Fatal("Unable to create Redis pool")
2229
}
2330

2431
rr = newRedisReceiver(redisPool)
25-
go rr.run()
2632
rw = newRedisWriter(redisPool)
27-
go rw.run()
2833

29-
port := os.Getenv("PORT")
30-
if port == "" {
31-
log.WithField("PORT", port).Fatal("$PORT must be set")
32-
}
34+
go func() {
35+
for {
36+
waited, err := redis.WaitForAvailability(redisURL, waitTimeout, rr.wait)
37+
if !waited || err != nil {
38+
log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!")
39+
}
40+
rr.broadcast(availableMessage)
41+
err = rr.run()
42+
if err == nil {
43+
break
44+
}
45+
log.Error(err)
46+
}
47+
}()
48+
49+
go func() {
50+
for {
51+
waited, err := redis.WaitForAvailability(redisURL, waitTimeout, nil)
52+
if !waited || err != nil {
53+
log.WithFields(logrus.Fields{"waitTimeout": waitTimeout, "err": err}).Fatal("Redis not available by timeout!")
54+
}
55+
err = rw.run()
56+
if err == nil {
57+
break
58+
}
59+
log.Error(err)
60+
}
61+
}()
3362

3463
http.Handle("/", http.FileServer(http.Dir("./public")))
3564
http.HandleFunc("/ws", handleWebsocket)

redis.go

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"sync"
6+
"time"
57

68
"github.com/Sirupsen/logrus"
79
"github.com/garyburd/redigo/redis"
810
"github.com/gorilla/websocket"
11+
"github.com/pkg/errors"
912
"github.com/satori/go.uuid"
1013
)
1114

@@ -14,12 +17,36 @@ const (
1417
Channel = "chat"
1518
)
1619

20+
var (
21+
waitingMessage, availableMessage []byte
22+
waitSleep = time.Second * 10
23+
)
24+
25+
func init() {
26+
var err error
27+
waitingMessage, err = json.Marshal(message{
28+
Handle: "system",
29+
Text: "Waiting for redis to be available. Messaging won't work until redis is available",
30+
})
31+
if err != nil {
32+
panic(err)
33+
}
34+
availableMessage, err = json.Marshal(message{
35+
Handle: "system",
36+
Text: "Redis is now available & messaging is now possible",
37+
})
38+
if err != nil {
39+
panic(err)
40+
}
41+
}
42+
1743
// redisReceiver receives messages from Redis and broadcasts them to all
1844
// registered websocket connections that are Registered.
1945
type redisReceiver struct {
20-
pool *redis.Pool
21-
sync.Mutex // Protects the conns map
22-
conns map[string]*websocket.Conn
46+
pool *redis.Pool
47+
48+
mu sync.Mutex
49+
conns map[string]*websocket.Conn
2350
}
2451

2552
// newRedisReceiver creates a redisReceiver that will use the provided
@@ -31,9 +58,15 @@ func newRedisReceiver(pool *redis.Pool) redisReceiver {
3158
}
3259
}
3360

61+
func (rr *redisReceiver) wait(_ time.Time) error {
62+
rr.broadcast(waitingMessage)
63+
time.Sleep(waitSleep)
64+
return nil
65+
}
66+
3467
// run receives pubsub messages from Redis after establishing a connection.
3568
// When a valid message is received it is broadcast to all connected websockets
36-
func (rr *redisReceiver) run() {
69+
func (rr *redisReceiver) run() error {
3770
l := log.WithField("channel", Channel)
3871
conn := rr.pool.Get()
3972
defer conn.Close()
@@ -54,7 +87,7 @@ func (rr *redisReceiver) run() {
5487
"count": v.Count,
5588
}).Println("Redis Subscription Received")
5689
case error:
57-
l.WithField("err", v).Error("Error while subscribed to Redis channel")
90+
return errors.Wrap(v, "Error while subscribed to Redis channel")
5891
default:
5992
l.WithField("v", v).Info("Unknown Redis receive during subscription")
6093
}
@@ -65,8 +98,8 @@ func (rr *redisReceiver) run() {
6598
// If an error occurs while writting a message to a websocket connection it is
6699
// closed and deregistered.
67100
func (rr *redisReceiver) broadcast(data []byte) {
68-
rr.Mutex.Lock()
69-
defer rr.Mutex.Unlock()
101+
rr.mu.Lock()
102+
defer rr.mu.Unlock()
70103
for id, conn := range rr.conns {
71104
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
72105
log.WithFields(logrus.Fields{
@@ -84,17 +117,17 @@ func (rr *redisReceiver) broadcast(data []byte) {
84117
// identifier for the connection. This identifier can be used to deregister the
85118
// connection later
86119
func (rr *redisReceiver) register(conn *websocket.Conn) string {
87-
rr.Mutex.Lock()
88-
defer rr.Mutex.Unlock()
120+
rr.mu.Lock()
121+
defer rr.mu.Unlock()
89122
id := uuid.NewV4().String()
90123
rr.conns[id] = conn
91124
return id
92125
}
93126

94127
// deRegister the connection by closing it and removing it from our list.
95128
func (rr *redisReceiver) deRegister(id string) {
96-
rr.Mutex.Lock()
97-
defer rr.Mutex.Unlock()
129+
rr.mu.Lock()
130+
defer rr.mu.Unlock()
98131
conn, ok := rr.conns[id]
99132
if ok {
100133
conn.Close()
@@ -111,24 +144,32 @@ type redisWriter struct {
111144
func newRedisWriter(pool *redis.Pool) redisWriter {
112145
return redisWriter{
113146
pool: pool,
114-
messages: make(chan []byte),
147+
messages: make(chan []byte, 10000),
115148
}
116149
}
117150

118151
// run the main redisWriter loop that publishes incoming messages to Redis.
119-
func (rw *redisWriter) run() {
152+
func (rw *redisWriter) run() error {
120153
conn := rw.pool.Get()
121154
defer conn.Close()
122155

123156
for data := range rw.messages {
124-
l := log.WithField("data", data)
125-
if err := conn.Send("PUBLISH", Channel, data); err != nil {
126-
l.WithField("err", err).Fatalf("Unable to publish message to Redis")
127-
}
128-
if err := conn.Flush(); err != nil {
129-
l.WithField("err", err).Fatalf("Unable to flush published message to Redis")
157+
if err := writeToRedis(conn, data); err != nil {
158+
rw.publish(data) // attempt to redeliver later
159+
return err
130160
}
131161
}
162+
return nil
163+
}
164+
165+
func writeToRedis(conn redis.Conn, data []byte) error {
166+
if err := conn.Send("PUBLISH", Channel, data); err != nil {
167+
return errors.Wrap(err, "Unable to publish message to Redis")
168+
}
169+
if err := conn.Flush(); err != nil {
170+
return errors.Wrap(err, "Unable to flush published message to Redis")
171+
}
172+
return nil
132173
}
133174

134175
// publish to Redis via channel.

vendor/github.com/heroku/gobits/redis/redis.go

Lines changed: 64 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/vendor.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,10 @@
3232
"revisionTime": "2016-08-17T22:48:30Z"
3333
},
3434
{
35-
"checksumSHA1": "KwTwEV5aWQbYFnbdsPWjV++9Y5c=",
35+
"checksumSHA1": "G2M6++gheNWcKWclENJPEF7mtsI=",
3636
"path": "github.com/heroku/gobits/redis",
37-
"revision": "c55d8aab15a05920fbc6b9ef474122e2c0acee90",
38-
"revisionTime": "2016-08-17T22:48:30Z"
37+
"revision": "7ec1b023d6ef07ea214fa234ca9952ae01b9135a",
38+
"revisionTime": "2016-08-18T00:10:40Z"
3939
},
4040
{
4141
"checksumSHA1": "QoVjlQFru1ixgV8vh63T4/JAtLI=",

0 commit comments

Comments
 (0)