Skip to content

Commit e11bfbd

Browse files
committed
Fix #68: Track websocket activity for container idle check
1 parent bbddae6 commit e11bfbd

File tree

7 files changed

+632
-6
lines changed

7 files changed

+632
-6
lines changed

internal/app/container_manager.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ type ContainerManager struct {
6868
stripAppPath bool
6969
mountArgs []string
7070
cargs map[string]string
71+
proxyTracker *Tracker // Track bytes sent and received by the proxy
7172
}
7273

7374
func NewContainerManager(logger *types.Logger, app *App, containerFile string,
@@ -173,7 +174,8 @@ func NewContainerManager(logger *types.Logger, app *App, containerFile string,
173174
cargs: cargs_map,
174175
}
175176

176-
if containerConfig.IdleShutdownSecs > 0 && (!app.IsDev || containerConfig.IdleShutdownDevApps) {
177+
if containerConfig.IdleShutdownSecs > 0 &&
178+
(!app.IsDev || containerConfig.IdleShutdownDevApps) {
177179
// Start the idle shutdown check
178180
m.idleShutdownTicker = time.NewTicker(time.Duration(containerConfig.IdleShutdownSecs) * time.Second)
179181
go m.idleAppShutdown()
@@ -240,6 +242,18 @@ func (m *ContainerManager) idleAppShutdown() {
240242
continue
241243
}
242244

245+
if m.proxyTracker != nil {
246+
sent, recv := m.proxyTracker.GetRollingTotals()
247+
totalBytes := sent + recv
248+
if totalBytes >= uint64(m.containerConfig.IdleBytesHighWatermark) {
249+
m.Trace().Msgf("App %s not idle, bytes sent %d, bytes received %d, total bytes %d at high watermark %d",
250+
m.app.Id, sent, recv, totalBytes, m.containerConfig.IdleBytesHighWatermark)
251+
continue
252+
}
253+
m.Info().Msgf("App %s idle, bytes sent %d, bytes received %d, total bytes %d below high watermark %d",
254+
m.app.Id, sent, recv, totalBytes, m.containerConfig.IdleBytesHighWatermark)
255+
}
256+
243257
m.Debug().Msgf("Shutting down idle app %s after %d seconds", m.app.Id, idleTimeSecs)
244258

245259
fullHash, err := m.getAppHash()

internal/app/proxy_tracker.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package app
2+
3+
import (
4+
"bufio"
5+
"io"
6+
"net"
7+
"net/http"
8+
"net/http/httputil"
9+
"sync"
10+
"time"
11+
)
12+
13+
type bucket struct {
14+
sec int64
15+
sent uint64
16+
recv uint64
17+
}
18+
19+
type ByteWindow struct {
20+
mu sync.Mutex
21+
buckets []bucket // len = windowSeconds
22+
window int // seconds
23+
}
24+
25+
func NewByteWindow(windowSeconds int) *ByteWindow {
26+
return &ByteWindow{
27+
buckets: make([]bucket, windowSeconds),
28+
window: windowSeconds,
29+
}
30+
}
31+
32+
func (bw *ByteWindow) add(now time.Time, sent, recv uint64) {
33+
sec := now.Unix()
34+
idx := int(sec % int64(bw.window))
35+
36+
bw.mu.Lock()
37+
defer bw.mu.Unlock()
38+
39+
b := &bw.buckets[idx]
40+
if b.sec != sec {
41+
// this slot is stale; reset
42+
b.sec = sec
43+
b.sent = 0
44+
b.recv = 0
45+
}
46+
b.sent += sent
47+
b.recv += recv
48+
}
49+
50+
func (bw *ByteWindow) Totals() (sent, recv uint64) {
51+
now := time.Now().Unix()
52+
bw.mu.Lock()
53+
defer bw.mu.Unlock()
54+
for i := range bw.buckets {
55+
if now-bw.buckets[i].sec < int64(bw.window) {
56+
sent += bw.buckets[i].sent
57+
recv += bw.buckets[i].recv
58+
}
59+
}
60+
return
61+
}
62+
63+
type countingReadCloser struct {
64+
rc io.ReadCloser
65+
bw *ByteWindow
66+
}
67+
68+
func (c *countingReadCloser) Read(p []byte) (int, error) {
69+
n, err := c.rc.Read(p)
70+
if n > 0 {
71+
c.bw.add(time.Now(), 0, uint64(n))
72+
}
73+
return n, err
74+
}
75+
func (c *countingReadCloser) Close() error { return c.rc.Close() }
76+
77+
type countingResponseWriter struct {
78+
http.ResponseWriter
79+
bw *ByteWindow
80+
}
81+
82+
func (w *countingResponseWriter) Write(p []byte) (int, error) {
83+
n, err := w.ResponseWriter.Write(p)
84+
if n > 0 {
85+
w.bw.add(time.Now(), uint64(n), 0)
86+
}
87+
return n, err
88+
}
89+
90+
// Support Flush for streaming/SSE.
91+
func (w *countingResponseWriter) Flush() {
92+
if f, ok := w.ResponseWriter.(http.Flusher); ok {
93+
f.Flush()
94+
}
95+
}
96+
97+
// Implement Hijacker so reverse proxy can upgrade to WS,
98+
// and we can wrap the net.Conn to count both directions.
99+
func (w *countingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
100+
hj, ok := w.ResponseWriter.(http.Hijacker)
101+
if !ok {
102+
return nil, nil, http.ErrHijacked
103+
}
104+
c, rw, err := hj.Hijack()
105+
if err != nil {
106+
return nil, nil, err
107+
}
108+
return &countingConn{Conn: c, bw: w.bw}, rw, nil
109+
}
110+
111+
type countingConn struct {
112+
net.Conn
113+
bw *ByteWindow
114+
}
115+
116+
func (c *countingConn) Read(b []byte) (int, error) {
117+
n, err := c.Conn.Read(b)
118+
if n > 0 {
119+
c.bw.add(time.Now(), 0, uint64(n)) // client -> proxy
120+
}
121+
return n, err
122+
}
123+
func (c *countingConn) Write(b []byte) (int, error) {
124+
n, err := c.Conn.Write(b)
125+
if n > 0 {
126+
c.bw.add(time.Now(), uint64(n), 0) // proxy -> client
127+
}
128+
return n, err
129+
}
130+
131+
// Tracker is a reverse proxy with byte count tracking
132+
type Tracker struct {
133+
bw *ByteWindow
134+
proxy *httputil.ReverseProxy
135+
}
136+
137+
func NewTracker(proxy *httputil.ReverseProxy, windowSeconds int) *Tracker {
138+
return &Tracker{
139+
bw: NewByteWindow(windowSeconds),
140+
proxy: proxy,
141+
}
142+
}
143+
144+
func (t *Tracker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
145+
// Count request bytes (body) for non-upgraded requests.
146+
if r.Body != nil {
147+
r.Body = &countingReadCloser{rc: r.Body, bw: t.bw}
148+
}
149+
crw := &countingResponseWriter{ResponseWriter: w, bw: t.bw}
150+
t.proxy.ServeHTTP(crw, r)
151+
}
152+
153+
// Accessor to read the rolling totals.
154+
func (t *Tracker) GetRollingTotals() (sent, recv uint64) {
155+
return t.bw.Totals()
156+
}

0 commit comments

Comments
 (0)