-
Notifications
You must be signed in to change notification settings - Fork 21
Not returning on connection close for chainsync, block-fetch and tx-submission protocol #1141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 13 commits
6b39f59
c42b242
829e00e
b255d66
3afdaa0
4de8be1
a43fdd1
762277d
b01fcbe
3e78cb5
29a9bd1
728d9b1
6163d32
f6b9134
7c2d72a
aca6320
9e80115
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| package ouroboros | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "io" | ||
|
|
@@ -76,6 +77,8 @@ type Connection struct { | |
| delayProtocolStart bool | ||
| fullDuplex bool | ||
| peerSharingEnabled bool | ||
| ctx context.Context | ||
| cancelCtx context.CancelFunc | ||
| // Mini-protocols | ||
| blockFetch *blockfetch.BlockFetch | ||
| blockFetchConfig *blockfetch.Config | ||
|
|
@@ -250,6 +253,52 @@ func (c *Connection) shutdown() { | |
| close(c.errorChan) | ||
| } | ||
|
|
||
| // handleConnectionError handles connection-level errors centrally | ||
| func (c *Connection) handleConnectionError(err error) error { | ||
| if err == nil { | ||
| return nil | ||
| } | ||
|
|
||
| // Only propagate EOF errors when acting as a client with active server-side protocols | ||
| if errors.Is(err, io.EOF) { | ||
| // Check if we have any active server-side protocols | ||
| if c.server { | ||
| return err | ||
| } | ||
|
|
||
| // For clients, only propagate EOF if we have active server protocols | ||
| hasActiveServerProtocols := false | ||
| if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() { | ||
| hasActiveServerProtocols = true | ||
| } | ||
| if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() { | ||
| hasActiveServerProtocols = true | ||
| } | ||
| if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() { | ||
| hasActiveServerProtocols = true | ||
| } | ||
| if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() { | ||
| hasActiveServerProtocols = true | ||
| } | ||
| if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() { | ||
| hasActiveServerProtocols = true | ||
| } | ||
| if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() { | ||
| hasActiveServerProtocols = true | ||
| } | ||
|
|
||
| if hasActiveServerProtocols { | ||
| return err | ||
| } | ||
|
|
||
| // EOF with no active server protocols is normal connection closure | ||
| return nil | ||
| } | ||
|
|
||
| // For non-EOF errors, always propagate | ||
| return err | ||
| } | ||
|
|
||
| // setupConnection establishes the muxer, configures and starts the handshake process, and initializes | ||
| // the appropriate mini-protocols | ||
| func (c *Connection) setupConnection() error { | ||
|
|
@@ -260,10 +309,13 @@ func (c *Connection) setupConnection() error { | |
| c.networkMagic, | ||
| ) | ||
| } | ||
| // Create context for connection | ||
| c.ctx, c.cancelCtx = context.WithCancel(context.Background()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We already have an async shutdown method, so we don't need to add another one. |
||
| // Start Goroutine to shutdown when doneChan is closed | ||
| c.doneChan = make(chan any) | ||
| go func() { | ||
| <-c.doneChan | ||
| c.cancelCtx() | ||
| c.shutdown() | ||
| }() | ||
| // Populate connection ID | ||
|
|
@@ -285,23 +337,28 @@ func (c *Connection) setupConnection() error { | |
| if !ok { | ||
| return | ||
| } | ||
| var connErr *muxer.ConnectionClosedError | ||
| if errors.As(err, &connErr) { | ||
| // Pass through ConnectionClosedError from muxer | ||
| c.errorChan <- err | ||
| } else { | ||
| // Wrap error message to denote it comes from the muxer | ||
| c.errorChan <- fmt.Errorf("muxer error: %w", err) | ||
|
|
||
| // Use centralized connection error handling | ||
| if handledErr := c.handleConnectionError(err); handledErr != nil { | ||
| var connErr *muxer.ConnectionClosedError | ||
| if errors.As(handledErr, &connErr) { | ||
| // Pass through ConnectionClosedError from muxer | ||
| c.errorChan <- handledErr | ||
| } else { | ||
| // Wrap error message to denote it comes from the muxer | ||
| c.errorChan <- fmt.Errorf("muxer error: %w", handledErr) | ||
| } | ||
| // Close connection on muxer errors | ||
| c.Close() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Always call `c.Close()`. (Prompt for AI agents) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Graceful remote disconnects no longer trigger `Close()`, leaving `doneChan` and connection resources hanging because `handleConnectionError` suppresses `io.EOF`. (Prompt for AI agents) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A normal remote EOF (`handledErr == nil`) no longer triggers `Close()`, so the connection never shuts down and goroutines leak. Close the connection even when the EOF is being suppressed. (Prompt for AI agents) |
||
| } | ||
| // Close connection on muxer errors | ||
| c.Close() | ||
| } | ||
| }() | ||
| protoOptions := protocol.ProtocolOptions{ | ||
| ConnectionId: c.id, | ||
| Muxer: c.muxer, | ||
| Logger: c.logger, | ||
| ErrorChan: c.protoErrorChan, | ||
| Context: c.ctx, | ||
| } | ||
| if c.useNodeToNodeProto { | ||
| protoOptions.Mode = protocol.ProtocolModeNodeToNode | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
EOF handling logic inverted for server/client; also consider ErrUnexpectedEOF
Proposed fix:
```diff
 func (c *Connection) handleConnectionError(err error) error {
     if err == nil {
         return nil
     }
-    // Only propagate EOF errors when acting as a client with active server-side protocols
-    if errors.Is(err, io.EOF) {
-        // Check if we have any active server-side protocols
-        if c.server {
-            return err
-        }
-
-        // For clients, only propagate EOF if we have active server protocols
-        hasActiveServerProtocols := false
-        if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
-            hasActiveServerProtocols = true
-        }
-        if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
-            hasActiveServerProtocols = true
-        }
-        if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
-            hasActiveServerProtocols = true
-        }
-        if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
-            hasActiveServerProtocols = true
-        }
-        if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
-            hasActiveServerProtocols = true
-        }
-        if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
-            hasActiveServerProtocols = true
-        }
-
-        if hasActiveServerProtocols {
-            return err
-        }
-
-        // EOF with no active server protocols is normal connection closure
-        return nil
-    }
+    // Treat EOF/UnexpectedEOF as connection closed, decide based on active protocols for our role
+    if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
+        hasActive := false
+        if c.server {
+            // Server: check server-side protocols
+            if c.chainSync != nil && c.chainSync.Server != nil && !c.chainSync.Server.IsDone() {
+                hasActive = true
+            }
+            if c.blockFetch != nil && c.blockFetch.Server != nil && !c.blockFetch.Server.IsDone() {
+                hasActive = true
+            }
+            if c.txSubmission != nil && c.txSubmission.Server != nil && !c.txSubmission.Server.IsDone() {
+                hasActive = true
+            }
+            if c.localStateQuery != nil && c.localStateQuery.Server != nil && !c.localStateQuery.Server.IsDone() {
+                hasActive = true
+            }
+            if c.localTxMonitor != nil && c.localTxMonitor.Server != nil && !c.localTxMonitor.Server.IsDone() {
+                hasActive = true
+            }
+            if c.localTxSubmission != nil && c.localTxSubmission.Server != nil && !c.localTxSubmission.Server.IsDone() {
+                hasActive = true
+            }
+        } else {
+            // Client: check client-side protocols
+            if c.chainSync != nil && c.chainSync.Client != nil && !c.chainSync.Client.IsDone() {
+                hasActive = true
+            }
+            if c.blockFetch != nil && c.blockFetch.Client != nil && !c.blockFetch.Client.IsDone() {
+                hasActive = true
+            }
+            if c.txSubmission != nil && c.txSubmission.Client != nil && !c.txSubmission.Client.IsDone() {
+                hasActive = true
+            }
+            if c.localStateQuery != nil && c.localStateQuery.Client != nil && !c.localStateQuery.Client.IsDone() {
+                hasActive = true
+            }
+            if c.localTxMonitor != nil && c.localTxMonitor.Client != nil && !c.localTxMonitor.Client.IsDone() {
+                hasActive = true
+            }
+            if c.localTxSubmission != nil && c.localTxSubmission.Client != nil && !c.localTxSubmission.Client.IsDone() {
+                hasActive = true
+            }
+        }
+        // Propagate only if a protocol is still active; otherwise swallow as normal closure
+        if hasActive {
+            return err
+        }
+        return nil
+    }
     // For non-EOF errors, always propagate
     return err
 }
```

To confirm muxer behavior for UnexpectedEOF remapping, run:
Also applies to: 335-347
🏁 Script executed:
Length of output: 0
🏁 Script executed:
Length of output: 5839
🌐 Web query:
💡 Result:
io.ErrUnexpectedEOF
What it is: a package-level error variable defined as
var ErrUnexpectedEOF = errors.New("unexpected EOF")
meaning EOF was encountered in the middle of reading a fixed-size block or structured data. [1]
Where it’s declared: package io in the Go standard library. [1]
When it’s returned: commonly by helpers that require reading an exact number of bytes (e.g. io.ReadFull, io.ReadAtLeast) and by packages that read fixed-size binary structures (e.g. encoding/binary) when EOF occurs after some but not all bytes were read. If no bytes were read, functions typically return io.EOF instead. [2][3]
How to check it: since Go 1.13 errors may be wrapped, use errors.Is(err, io.ErrUnexpectedEOF) to detect it robustly. (Older code sometimes used err == io.ErrUnexpectedEOF.) [4]
Sources:
[1] io package docs (ErrUnexpectedEOF definition).
[2] io.ReadFull / ReadAtLeast behavior (returns ErrUnexpectedEOF when partial read then EOF).
[3] encoding/binary.Read behavior (returns io.ErrUnexpectedEOF if EOF after some bytes).
[4] Go error-handling FAQ (use errors.Is for comparisons).
🏁 Script executed:
Length of output: 100
🏁 Script executed:
Length of output: 3089
🏁 Script executed:
Length of output: 802
Fix EOF handling logic: server/client role checks are inverted
The current code has the logic backwards:
This means servers always propagate EOF (even when protocols are active), and clients check the wrong role's protocols. The proposed fix correctly swaps the logic: server checks `Server` fields, client checks `Client` fields. Additionally, handle `io.ErrUnexpectedEOF` alongside `io.EOF`, as the protocol layer already uses it for partial reads.

Apply the proposed diff and ensure the muxer behavior aligns with the corrected logic.
🤖 Prompt for AI Agents