Akka.Streams SubSource race conditions and architectural issues #7817

@Aaronontheweb

Detailed Analysis: SubSource Race Condition & Architectural Deep Dive

The Race Condition Visualized

sequenceDiagram
    participant Test as Test Code
    participant Sub1 as Subscriber1
    participant Sub2 as Subscriber2
    participant SS as SubSource
    participant AS as Actor System
    
    Note over Test: Creates tail source from PrefixAndTail
    Test->>Sub1: tail.To(Sink.FromSubscriber(subscriber1)).Run()
    Sub1->>SS: Materialize (First)
    SS->>AS: PreStart() - SetCallback succeeds
    Note over SS,AS: Callback set successfully
    
    Test->>Sub2: tail.To(Sink.FromSubscriber(subscriber2)).Run()
    Sub2->>SS: Materialize (Second)
    SS->>AS: PreStart() - SetCallback detects double materialization
    
    Note over Test: Race window begins here
    Test->>Sub2: ExpectSubscriptionAndError()
    Sub2->>Sub2: sub.Request(1) - Signals demand
    
    par Concurrent execution
        AS-->>SS: Process IllegalStateException
        SS-->>Sub2: Should send OnError
    and
        AS-->>SS: Process demand from Request(1)  
        SS-->>Sub2: Sends OnNext(2) - WINS THE RACE!
    end
    
    Note over Test: Test fails: Expected OnError, got OnNext(2)

Current Implementation Analysis

The problematic code path in SubSource<T>.Logic.SetCallback():

private void SetCallback(Action<IActorSubscriberMessage> callback)
{
    // This CompareExchange is atomic and was fixed in PR #7796
    var previous = _stage._status.CompareExchange(null, callback);
    
    switch (previous)
    {
        case null:
            return; // Success - first materialization
        case Action<IActorSubscriberMessage>:
            // This exception is thrown asynchronously in PreStart()
            throw new IllegalStateException("Substream Source cannot be materialized more than once");
        // ... other cases
    }
}
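
The first-writer-wins behaviour of that compare-and-exchange can be reproduced outside Akka.NET with the BCL's `Interlocked.CompareExchange`. This is a minimal sketch, not the Akka.Streams implementation: `OneShotSlot` and `TryInstall` are hypothetical names chosen for illustration. Note that the BCL call takes the new value before the comparand.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the stage's status field; not Akka.NET code.
public sealed class OneShotSlot
{
    private object _status; // null until the first callback is installed

    // Atomically installs the callback only if the slot is still empty.
    // Returns true for the first caller and false for every later one.
    public bool TryInstall(Action<string> callback)
    {
        // Interlocked.CompareExchange(ref location, value, comparand)
        // returns the value that was in the slot before the call.
        object previous = Interlocked.CompareExchange(ref _status, callback, null);
        return previous is null;
    }
}
```

The second caller learns atomically that it lost, so detection itself is not the problem. What matters is when the loser acts on that result relative to the subscription being handed out.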

The Issue: The exception is thrown from PreStart(), after the subscription has already been established. ExpectSubscriptionAndError() can therefore call Request(1) immediately, and error propagation then races demand processing in the actor's mailbox.
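
The ordering dependence can be shown with a deliberately simplified, single-threaded model. Everything here is an assumption made for illustration: `MailboxRaceSketch`, `Signal`, and `FirstObserved` are hypothetical names, a plain FIFO queue stands in for the actor mailbox, and an element is assumed to be buffered so that demand immediately yields `OnNext`.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: a FIFO queue stands in for the actor mailbox.
public static class MailboxRaceSketch
{
    public enum Signal { Demand, Failure }

    // Returns the first event the subscriber would observe when the
    // signals are processed strictly in arrival order.
    public static string FirstObserved(params Signal[] arrivalOrder)
    {
        var mailbox = new Queue<Signal>(arrivalOrder);
        if (mailbox.Count == 0)
            return "nothing";

        // Assumption: an element is already buffered, so demand yields OnNext.
        return mailbox.Dequeue() == Signal.Demand ? "OnNext" : "OnError";
    }
}
```

In this model the subscriber's first event is decided purely by which signal arrives first, which is the nondeterminism the failing test runs into.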

Test Code Race Window

// Current failing test pattern
var subscriber2 = this.CreateSubscriberProbe<int>();
tail.To(Sink.FromSubscriber(subscriber2)).Run(Materializer); // Triggers SetCallback()

// This method has a hidden race condition:
subscriber2.ExpectSubscriptionAndError() // Defaults to signalDemand: true
    .Message.Should()
    .Be("Substream Source cannot be materialized more than once");

// Expands to:
internal static async Task<Exception> ExpectSubscriptionAndErrorTask(...)
{
    var sub = await probe.ExpectSubscriptionAsync(); // Subscription established
    if(signalDemand)
        sub.Request(1); // ⚡ RACE: This can execute before error processing!
    
    return await probe.ExpectErrorAsync(); // May receive OnNext instead
}

Potential Architectural Solutions

Option A: Eager Materialization Check

internal sealed class SubSource<T> : GraphStage<SourceShape<T>>
{
    private readonly AtomicBoolean _materialized = new(false);
    
    protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes)
    {
        // Check BEFORE creating any reactive streams infrastructure
        if (!_materialized.CompareAndSet(false, true))
        {
            throw new IllegalStateException("Substream Source cannot be materialized more than once");
        }
        
        return new Logic(this);
    }
}

Problems:

  • Breaks existing callback-based error handling pattern
  • CreateLogic() exceptions may not propagate correctly to subscribers
  • Would require refactoring the entire SubSourceOutlet system

Option B: Synchronous Subscription Guard

private sealed class Logic : OutGraphStageLogic
{
    private readonly AtomicBoolean _subscriptionAllowed = new(true);
    
    // Override subscription method to fail fast
    protected override void OnSubscribe(ISubscription subscription) 
    {
        if (!_subscriptionAllowed.CompareAndSet(true, false))
        {
            // Immediately fail the subscription before any async processing
            subscription.Cancel();
            // This would require changes to Reactive Streams infrastructure
            throw new IllegalStateException("Substream Source cannot be materialized more than once");
        }
        
        base.OnSubscribe(subscription);
    }
}

Problems:

  • Requires changes to core Reactive Streams implementation
  • May violate Reactive Streams specification
  • Complex coordination with existing error handling

Option C: Demand-Aware Error Handling

private sealed class Logic : OutGraphStageLogic
{
    private volatile bool _errorState = false;
    
    public override void OnPull()
    {
        if (_errorState)
        {
            // Prioritize error over demand processing
            return; // Don't process demand if in error state
        }
        
        base.OnPull();
    }
    
    private void SetCallback(Action<IActorSubscriberMessage> callback)
    {
        var previous = _stage._status.CompareExchange(null, callback);
        
        if (previous is Action<IActorSubscriberMessage>)
        {
            _errorState = true; // Set before any async processing
            FailStage(new IllegalStateException("..."));
        }
    }
}

Problems:

  • Still asynchronous - FailStage() goes through actor mailbox
  • Adds complexity to normal operation paths
  • Race condition still exists, just with a smaller window

Architecture Dependency Graph

graph TD
    A[PrefixAndTail Stage] -->|creates| B[SubSourceOutlet]
    B -->|materializes to| C[SubSource GraphStage]
    C -->|creates| D[SubSource.Logic]
    D -->|during PreStart| E[SetCallback Race Detection]
    
    F[Test: ExpectSubscriptionAndError] -->|calls| G[ExpectSubscription]
    G -->|establishes| H[Reactive Streams Subscription]
    H -->|enables| I[Immediate Request/Demand]
    
    E -->|async| J[Actor Mailbox Processing]
    I -->|async| J
    
    J -->|race between| K[Error Propagation]
    J -->|race between| L[Demand Processing]
    
    K -->|should win| M[OnError to Subscriber]
    L -->|actually wins| N[OnNext to Subscriber - BUG]
    
    style J fill:#ffcccc
    style N fill:#ff6666
    style E fill:#ffcccc

Why Our Solution Works

// Fixed test - eliminates the race entirely
subscriber2.ExpectSubscriptionAndError(signalDemand: false) // No Request(1) call
    .Message.Should()
    .Be("Substream Source cannot be materialized more than once");
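
Why dropping the demand signal makes the assertion deterministic can be sketched with a small model. This is an illustration under stated assumptions, not Akka.NET code: `SignalDemandSketch` and `PossibleFirstEvents` are hypothetical names, and the model simply assumes that any pending signal may be handled first.

```csharp
using System;
using System.Collections.Generic;

// Toy model only; none of these types exist in Akka.NET.
public static class SignalDemandSketch
{
    // Lists every event the subscriber could observe first, given
    // which signals are pending for the second materialization.
    public static HashSet<string> PossibleFirstEvents(bool signalDemand)
    {
        var pending = new List<string> { "OnError" }; // the failure is always pending
        if (signalDemand)
            pending.Add("OnNext"); // Request(1) adds a competing signal

        // With no ordering guarantee, any pending signal may be handled first.
        return new HashSet<string>(pending);
    }
}
```

With `signalDemand: false` the only possible first event in this model is `OnError`, so the test outcome no longer depends on scheduling.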

Timing Flow After Fix:

sequenceDiagram
    participant Test as Test Code  
    participant Sub2 as Subscriber2
    participant SS as SubSource
    participant AS as Actor System
    
    Test->>Sub2: tail.To(Sink.FromSubscriber(subscriber2)).Run()
    Sub2->>SS: Materialize (Second)
    SS->>AS: PreStart() - SetCallback detects double materialization
    
    Test->>Sub2: ExpectSubscriptionAndError(signalDemand: false)
    Note over Sub2: No Request(1) call - eliminates race
    
    AS->>SS: Process IllegalStateException (no competing demand)
    SS->>Sub2: OnError - "cannot be materialized more than once"
    Sub2->>Test: Returns expected error ✅

Implementation Complexity Analysis

| Approach | Code Changes | Risk Level | Compatibility Impact | Test Changes |
|---|---|---|---|---|
| Current Fix | Minimal (1 line) | Very Low | None | Test only |
| Eager Check | Major refactoring | High | Breaking changes | Many tests |
| Subscription Guard | Core RS changes | Very High | Spec compliance | Unknown |
| Demand-Aware | Moderate complexity | Medium | Performance impact | Some tests |

Future Considerations

For a comprehensive architectural fix, we would need:

  1. Reactive Streams Extensions: Custom subscription lifecycle hooks
  2. GraphStage Lifecycle Redesign: Synchronous validation phases
  3. Cross-Thread Coordination: Better synchronization primitives
  4. Performance Analysis: Ensure no regression in hot paths
  5. Specification Compliance: Maintain Reactive Streams contract

The current fix is the pragmatic choice because it:

  • ✅ Solves the immediate test failure with very low risk, since only test code changes
  • ✅ Demonstrates the correct testing pattern for similar scenarios
  • ✅ Preserves all architectural options for future improvements
  • ✅ Documents the complexity involved in a proper fix

This approach allows us to ship a stable solution while keeping the door open for more comprehensive improvements in future major versions.

Originally posted by @Aaronontheweb in #7816 (comment)
