Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions hashmail_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ const (
// reads for it to be considered for pruning. Otherwise, memory will grow
// unbounded.
streamTTL = 24 * time.Hour

// streamAcquireTimeout determines how long we wait for a read/write
// stream to become available before reporting it as occupied. Context
// cancellation is still honoured immediately, so callers can shorten
// the wait.
streamAcquireTimeout = 250 * time.Millisecond
)

// streamID is the identifier of a stream.
Expand Down Expand Up @@ -317,7 +323,14 @@ func (s *stream) RequestReadStream(ctx context.Context) (*readStream, error) {
case r := <-s.readStreamChan:
s.status.streamTaken(true)
return r, nil
default:

case <-s.quit:
return nil, fmt.Errorf("stream shutting down")

case <-ctx.Done():
return nil, ctx.Err()

case <-time.After(streamAcquireTimeout):
return nil, fmt.Errorf("read stream occupied")
}
}
Expand All @@ -332,7 +345,14 @@ func (s *stream) RequestWriteStream(ctx context.Context) (*writeStream, error) {
case w := <-s.writeStreamChan:
s.status.streamTaken(false)
return w, nil
default:

case <-s.quit:
return nil, fmt.Errorf("stream shutting down")

case <-ctx.Done():
return nil, ctx.Err()

case <-time.After(streamAcquireTimeout):
return nil, fmt.Errorf("write stream occupied")
}
}
Expand Down
9 changes: 8 additions & 1 deletion hashmail_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ func setupAperture(t *testing.T) {
errChan := make(chan error)
shutdown := make(chan struct{})
require.NoError(t, aperture.Start(errChan, shutdown))
t.Cleanup(func() {
close(shutdown)
require.NoError(t, aperture.Stop())
})

// Any error while starting?
select {
Expand Down Expand Up @@ -508,7 +512,10 @@ func recvFromStream(client hashmailrpc.HashMailClient) error {
}()

select {
case <-time.After(time.Second):
// Wait a little longer than the server's stream-acquire timeout so we
// only trip this path when the server truly failed to hand over the
// stream (instead of beating it to the punch).
case <-time.After(2 * streamAcquireTimeout):
return fmt.Errorf("timed out waiting to receive from receive " +
"stream")

Expand Down
Loading