Detailed Analysis: SubSource Race Condition & Architectural Deep Dive
The Race Condition Visualized
```mermaid
sequenceDiagram
    participant Test as Test Code
    participant Sub1 as Subscriber1
    participant Sub2 as Subscriber2
    participant SS as SubSource
    participant AS as Actor System
    Note over Test: Creates tail source from PrefixAndTail
    Test->>Sub1: tail.To(Sink.FromSubscriber(subscriber1)).Run()
    Sub1->>SS: Materialize (First)
    SS->>AS: PreStart() - SetCallback succeeds
    Note over SS,AS: Callback set successfully
    Test->>Sub2: tail.To(Sink.FromSubscriber(subscriber2)).Run()
    Sub2->>SS: Materialize (Second)
    SS->>AS: PreStart() - SetCallback detects double materialization
    Note over Test: Race window begins here
    Test->>Sub2: ExpectSubscriptionAndError()
    Sub2->>Sub2: sub.Request(1) - Signals demand
    par Concurrent execution
        AS-->>SS: Process IllegalStateException
        SS-->>Sub2: Should send OnError
    and
        AS-->>SS: Process demand from Request(1)
        SS-->>Sub2: Sends OnNext(2) - WINS THE RACE!
    end
    Note over Test: Test fails: Expected OnError, got OnNext(2)
```
Current Implementation Analysis
The problematic code path is in `SubSource<T>.Logic.SetCallback()`:
```csharp
private void SetCallback(Action<IActorSubscriberMessage> callback)
{
    // This CompareExchange is atomic and was fixed in PR #7796
    var previous = _stage._status.CompareExchange(null, callback);
    switch (previous)
    {
        case null:
            return; // Success - first materialization
        case Action<IActorSubscriberMessage>:
            // This exception is thrown asynchronously in PreStart()
            throw new IllegalStateException("Substream Source cannot be materialized more than once");
        // ... other cases
    }
}
```
The Issue: By the time this exception is thrown, the subscription is already established, so `ExpectSubscriptionAndError()` can immediately call `Request(1)`. This creates a race between error propagation and demand processing in the actor's mailbox.
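The compare-and-swap itself is not the weak point. It reliably lets exactly one materialization win, and the trouble lies only in the timing of the loser's failure. The following C# top-level program is a minimal sketch of that guard. It races eight callers through `Interlocked.CompareExchange` and rests on three assumptions: a plain `object` field stands in for `_stage._status`, `InvalidOperationException` stands in for Akka.NET's `IllegalStateException`, and `TrySetCallback` is a hypothetical helper. None of these are Akka.NET APIs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for _stage._status: null until the first materialization installs its callback.
object? status = null;

// Hypothetical helper with the same shape as SubSource<T>.Logic.SetCallback():
// exactly one caller wins the compare-and-swap; every later caller sees the winner's callback.
void TrySetCallback(Action<string> callback)
{
    var previous = Interlocked.CompareExchange(ref status, callback, null);
    switch (previous)
    {
        case null:
            return; // first materialization: callback installed
        case Action<string>:
            // A callback is already installed, so this is a second materialization.
            throw new InvalidOperationException("Substream Source cannot be materialized more than once");
        default:
            throw new InvalidOperationException($"Unexpected status: {previous}");
    }
}

// Race eight concurrent "materializations" against the same status field.
int winners = 0;
int rejected = 0;
Parallel.For(0, 8, i =>
{
    try
    {
        TrySetCallback(msg => { });
        Interlocked.Increment(ref winners);
    }
    catch (InvalidOperationException)
    {
        Interlocked.Increment(ref rejected);
    }
});

Console.WriteLine($"winners={winners}, rejected={rejected}"); // winners=1, rejected=7
```

Because `CompareExchange` is atomic, the count is deterministic. In this model, the nondeterminism comes from the order of mailbox messages afterwards, not from the guard.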
Test Code Race Window
```csharp
// Current failing test pattern
var subscriber2 = this.CreateSubscriberProbe<int>();
tail.To(Sink.FromSubscriber(subscriber2)).Run(Materializer); // Triggers SetCallback()

// This method has a hidden race condition:
subscriber2.ExpectSubscriptionAndError() // Defaults to signalDemand: true
    .Message.Should()
    .Be("Substream Source cannot be materialized more than once");

// Expands to:
internal static async Task<Exception> ExpectSubscriptionAndErrorTask(...)
{
    var sub = await probe.ExpectSubscriptionAsync(); // Subscription established
    if (signalDemand)
        sub.Request(1); // ⚡ RACE: This can execute before error processing!
    return await probe.ExpectErrorAsync(); // May receive OnNext instead
}
```
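The race can be reasoned about without an actor system. The sketch below models the stage's mailbox as a FIFO queue drained by a single consumer and enumerates every arrival order of the competing messages. Like the analysis above, it assumes that demand processed ahead of the error produces `OnNext(2)`. The message names and the `FirstSignal`/`PossibleFirstSignals` helpers are hypothetical and are not Akka.NET or TestKit APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Deliver mailbox messages in FIFO order and report the first signal the subscriber sees.
// Assumption taken from the analysis above: demand processed before the error emits OnNext(2).
string FirstSignal(IEnumerable<string> arrivals)
{
    var mailbox = new Queue<string>(arrivals);
    while (mailbox.Count > 0)
    {
        var message = mailbox.Dequeue();
        if (message == "Error") return "OnError";
        if (message == "Demand") return "OnNext(2)";
    }
    return "(nothing)";
}

// Every arrival order the two concurrent producers can yield.
// The error is always enqueued; demand is enqueued only if the probe calls Request(1).
HashSet<string> PossibleFirstSignals(bool signalDemand)
{
    var orders = signalDemand
        ? new[] { new[] { "Error", "Demand" }, new[] { "Demand", "Error" } }
        : new[] { new[] { "Error" } };
    return orders.Select(order => FirstSignal(order)).ToHashSet();
}

Console.WriteLine(string.Join(", ", PossibleFirstSignals(signalDemand: true).OrderBy(s => s, StringComparer.Ordinal)));
// OnError, OnNext(2)
Console.WriteLine(string.Join(", ", PossibleFirstSignals(signalDemand: false)));
// OnError
```

Passing `signalDemand: false` removes the only competing message. That is why, in this model, the error becomes the only possible first signal.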
Potential Architectural Solutions
Option A: Eager Materialization Check
```csharp
internal sealed class SubSource<T> : GraphStage<SourceShape<T>>
{
    private readonly AtomicBoolean _materialized = new(false);

    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
    {
        // Check BEFORE creating any reactive streams infrastructure
        if (!_materialized.CompareAndSet(false, true))
        {
            throw new IllegalStateException("Substream Source cannot be materialized more than once");
        }
        return new Logic(this);
    }
}
```
Problems:
- Breaks the existing callback-based error-handling pattern
- Exceptions thrown from `CreateLogic()` may not propagate correctly to subscribers
- Would require refactoring the entire `SubSourceOutlet` system
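Whatever its costs, Option A has one attraction: the failure reaches the caller synchronously, before any subscription exists for `Request(1)` to race against. A minimal sketch of such a once-only factory follows. It assumes an `int` flag flipped with `Interlocked.CompareExchange` in place of `AtomicBoolean` and a string in place of the `Logic` instance. `CreateLogic` here is a hypothetical local function, not the real `GraphStage` override.

```csharp
using System;
using System.Threading;

// 0 = not yet materialized, 1 = materialized (an int flag in place of AtomicBoolean).
int materialized = 0;

// Hypothetical once-only factory: the check runs before any logic is created,
// so a second call fails on the caller's own stack rather than via an actor mailbox.
string CreateLogic()
{
    if (Interlocked.CompareExchange(ref materialized, 1, 0) != 0)
        throw new InvalidOperationException("Substream Source cannot be materialized more than once");
    return "Logic"; // stand-in for new Logic(this)
}

var firstLogic = CreateLogic();

string secondOutcome;
try
{
    CreateLogic();
    secondOutcome = "created";
}
catch (InvalidOperationException e)
{
    // Observed synchronously: no subscription exists yet, so no demand can race it.
    secondOutcome = e.Message;
}

Console.WriteLine(secondOutcome); // Substream Source cannot be materialized more than once
```

The problems listed above leave one question open: would an exception thrown from the real `CreateLogic()` actually reach the subscriber? The sketch only shows where the failure would surface.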
Option B: Synchronous Subscription Guard
```csharp
private sealed class Logic : OutGraphStageLogic
{
    private readonly AtomicBoolean _subscriptionAllowed = new(true);

    // Override subscription method to fail fast
    protected override void OnSubscribe(ISubscription subscription)
    {
        if (!_subscriptionAllowed.CompareAndSet(true, false))
        {
            // Immediately fail the subscription before any async processing
            subscription.Cancel();
            // This would require changes to Reactive Streams infrastructure
            throw new IllegalStateException("Substream Source cannot be materialized more than once");
        }
        base.OnSubscribe(subscription);
    }
}
```
Problems:
- Requires changes to core Reactive Streams implementation
- May violate Reactive Streams specification
- Complex coordination with existing error handling
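On the specification point, Reactive Streams rule 2.13 requires `onSubscribe`, `onNext`, `onError` and `onComplete` to return normally unless an argument is null. It also states that the only legal way for a subscriber to signal failure is to cancel its subscription. So the `Cancel()` in Option B is allowed, but the `throw` after it is not. Rule 1.9 describes the publisher-side alternative: call `onSubscribe` first, then reject the subscriber with `onError`. The sketch below models that ordering with plain delegates and a signal log. The delegates stand in for the real subscriber and subscription interfaces, and `RejectSubscriber` is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;

// Signals a subscriber receives, in order.
var log = new List<string>();
Action<long>? subscription = null;

// Minimal stand-in for a subscriber: one delegate per signal, neither of which throws.
Action<Action<long>> onSubscribe = request => { subscription = request; log.Add("OnSubscribe"); };
Action<Exception> onError = error => log.Add($"OnError: {error.Message}");

// Hypothetical spec-style rejection: hand out a subscription whose Request(n) does nothing,
// then signal the failure. Nothing is thrown back into the caller.
void RejectSubscriber(Exception reason)
{
    onSubscribe(n => { }); // the no-op Request(n) of a rejected subscription
    onError(reason);
}

RejectSubscriber(new InvalidOperationException("Substream Source cannot be materialized more than once"));
subscription?.Invoke(1); // a late Request(1) has nothing to race with

Console.WriteLine(string.Join(" | ", log));
// OnSubscribe | OnError: Substream Source cannot be materialized more than once
```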
Option C: Demand-Aware Error Handling
```csharp
private sealed class Logic : OutGraphStageLogic
{
    private volatile bool _errorState = false;

    public override void OnPull()
    {
        if (_errorState)
        {
            // Prioritize error over demand processing
            return; // Don't process demand if in error state
        }
        base.OnPull();
    }

    private void SetCallback(Action<IActorSubscriberMessage> callback)
    {
        var previous = _stage._status.CompareExchange(null, callback);
        if (previous is Action<IActorSubscriberMessage>)
        {
            _errorState = true; // Set before any async processing
            FailStage(new IllegalStateException("..."));
        }
    }
}
```
Problems:
- Still asynchronous: `FailStage()` goes through the actor mailbox
- Adds complexity to normal operation paths
- The race condition still exists, only with a smaller window
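The "reduced window" point can be illustrated with a small ordering model. In the sketch below, the `_errorState` flag is set synchronously and the `FailStage()` failure takes effect later. A pull is suppressed only if it is processed after the flag is set. This is a hypothetical single-threaded model over hand-picked event orders, not a description of Akka.NET's scheduler; the event names and the `Outcome` helper are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Process events in order and return the first signal the subscriber would see.
string Outcome(IEnumerable<string> events)
{
    var errorState = false; // models Option C's _errorState flag
    foreach (var e in events)
    {
        switch (e)
        {
            case "SetFlag":
                errorState = true; // set synchronously inside SetCallback()
                break;
            case "Pull":
                if (!errorState) return "OnNext(2)"; // demand served before the flag was set
                break; // suppressed: the guard in OnPull() returns early
            case "Fail":
                return "OnError"; // FailStage() finally takes effect
        }
    }
    return "(nothing)";
}

// SetFlag always precedes Fail (same thread, program order); the pull may land anywhere.
var orders = new[]
{
    new[] { "Pull", "SetFlag", "Fail" },
    new[] { "SetFlag", "Pull", "Fail" },
    new[] { "SetFlag", "Fail", "Pull" },
};

foreach (var order in orders)
    Console.WriteLine($"{string.Join(" -> ", order)} => {Outcome(order)}");
// Pull -> SetFlag -> Fail => OnNext(2)
// SetFlag -> Pull -> Fail => OnError
// SetFlag -> Fail -> Pull => OnError
```

Only a pull handled before the flag is set still emits. This matches the claim that Option C narrows the window without closing it.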
Architecture Dependency Graph
```mermaid
graph TD
    A[PrefixAndTail Stage] -->|creates| B[SubSourceOutlet]
    B -->|materializes to| C[SubSource GraphStage]
    C -->|creates| D[SubSource.Logic]
    D -->|during PreStart| E[SetCallback Race Detection]
    F[Test: ExpectSubscriptionAndError] -->|calls| G[ExpectSubscription]
    G -->|establishes| H[Reactive Streams Subscription]
    H -->|enables| I[Immediate Request/Demand]
    E -->|async| J[Actor Mailbox Processing]
    I -->|async| J
    J -->|race between| K[Error Propagation]
    J -->|race between| L[Demand Processing]
    K -->|should win| M[OnError to Subscriber]
    L -->|actually wins| N[OnNext to Subscriber - BUG]
    style J fill:#ffcccc
    style N fill:#ff6666
    style E fill:#ffcccc
```
Why Our Solution Works
```csharp
// Fixed test - eliminates the race entirely
subscriber2.ExpectSubscriptionAndError(signalDemand: false) // No Request(1) call
    .Message.Should()
    .Be("Substream Source cannot be materialized more than once");
```
Timing Flow After Fix:
```mermaid
sequenceDiagram
    participant Test as Test Code
    participant Sub2 as Subscriber2
    participant SS as SubSource
    participant AS as Actor System
    Test->>Sub2: tail.To(Sink.FromSubscriber(subscriber2)).Run()
    Sub2->>SS: Materialize (Second)
    SS->>AS: PreStart() - SetCallback detects double materialization
    Test->>Sub2: ExpectSubscriptionAndError(signalDemand: false)
    Note over Sub2: No Request(1) call - eliminates race
    AS->>SS: Process IllegalStateException (no competing demand)
    SS->>Sub2: OnError - "cannot be materialized more than once"
    Sub2->>Test: Returns expected error ✅
```
Implementation Complexity Analysis
| Approach | Code Changes | Risk Level | Compatibility Impact | Test Changes |
|---|---|---|---|---|
| Current Fix | Minimal (1 line) | Very Low | None | Test only |
| Eager Check | Major refactoring | High | Breaking changes | Many tests |
| Subscription Guard | Core RS changes | Very High | Spec compliance | Unknown |
| Demand-Aware | Moderate complexity | Medium | Performance impact | Some tests |
Future Considerations
For a comprehensive architectural fix, we would need:
- Reactive Streams Extensions: Custom subscription lifecycle hooks
- GraphStage Lifecycle Redesign: Synchronous validation phases
- Cross-Thread Coordination: Better synchronization primitives
- Performance Analysis: Ensure no regression in hot paths
- Specification Compliance: Maintain Reactive Streams contract
The current fix is optimal because it:
- ✅ Solves the immediate problem without touching production code (the change is test-only)
- ✅ Demonstrates the correct testing pattern for similar scenarios
- ✅ Preserves all architectural options for future improvements
- ✅ Documents the complexity involved in a proper fix
This approach allows us to ship a stable solution while keeping the door open for more comprehensive improvements in future major versions.
Originally posted by @Aaronontheweb in #7816 (comment)