ii. ✨Sagas
Sagas in DDD are a crucial pattern for managing complex business processes that span multiple aggregates or bounded contexts, especially when you need to maintain consistency across distributed transactions.
A saga is a sequence of local transactions that can be coordinated to achieve eventual consistency across multiple aggregates. When a business process requires changes to multiple aggregates that can't be handled in a single transaction, sagas provide a way to break the process into steps while maintaining data consistency through compensation actions.
Sagas typically work with commands in this flow:
- Initial Command: A command triggers the saga
- Command Sequence: The saga breaks down the business process into a series of commands
- Compensation Commands: If any step fails, compensation commands undo previous successful steps
- Events: As the saga handles each command, it raises domain events that drive the next steps in other aggregates
Sagas offer the following benefits:
- Consistency: Provides eventual consistency across aggregate boundaries without distributed transactions
- Resilience: Built-in error handling and compensation mechanisms
- Scalability: Avoids long-running locks and distributed transaction overhead
- Business Alignment: Models complex business processes explicitly
- Auditability & Monitoring: Provides visibility into long-running business processes and an audit trail of the changes they make
Keep the following considerations in mind when implementing a saga:
- Idempotency: All commands and compensation actions should be idempotent since they might be retried
- Timeout Handling: Sagas should handle scenarios where steps don't complete within expected timeframes
- State Persistence: Saga state needs to be persisted to survive system failures
- Ordering: Consider whether steps can be executed in parallel or must be sequential
- Compensation Logic: Design compensation actions that can reliably undo the effects of successful steps
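The command sequence and compensation flow described above can be sketched in a few lines. This is a minimal, framework-independent illustration, not this library's implementation: `SagaStep`, `SagaRunner`, and `TransferDemo` are hypothetical names introduced here, and the steps run strictly in sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical step type: a forward action paired with the action that undoes it.
public sealed record SagaStep(string Name, Func<Task> Execute, Func<Task> Compensate);

public static class SagaRunner
{
    // Runs the steps in order. If a step throws, the steps that already
    // completed are compensated in reverse order and false is returned.
    public static async Task<bool> RunAsync(IEnumerable<SagaStep> steps)
    {
        var completed = new Stack<SagaStep>();
        foreach (var step in steps)
        {
            try
            {
                await step.Execute();
                completed.Push(step);
            }
            catch (Exception)
            {
                while (completed.Count > 0)
                    await completed.Pop().Compensate();
                return false;
            }
        }
        return true;
    }
}

public static class TransferDemo
{
    // Moves `amount` between two in-memory balances; the deposit step can be
    // forced to fail to show the withdrawal being undone.
    public static async Task<(bool Succeeded, decimal Source, decimal Target)> RunAsync(
        decimal amount, bool failDeposit)
    {
        decimal source = 100m, target = 0m;
        var steps = new[]
        {
            new SagaStep("Withdraw",
                () => { source -= amount; return Task.CompletedTask; },
                () => { source += amount; return Task.CompletedTask; }),
            new SagaStep("Deposit",
                () =>
                {
                    if (failDeposit)
                        throw new InvalidOperationException("Target account is closed");
                    target += amount;
                    return Task.CompletedTask;
                },
                () => { target -= amount; return Task.CompletedTask; }),
        };

        var succeeded = await SagaRunner.RunAsync(steps);
        return (succeeded, source, target);
    }
}
```

Calling `TransferDemo.RunAsync(30m, failDeposit: true)` leaves both balances at their starting values, because the failed deposit triggers compensation of the withdrawal. A production saga would also persist its progress between steps and make each compensation idempotent, as the considerations above note.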
To define a saga, derive from the Saga<TEntity> base class. TEntity is the aggregate entity class to which the saga applies changes as a result of handling commands. To subscribe to individual commands raised by the aggregate, the saga implements the IHandles<TCommand> interface, where TCommand is the type of command raised by the associated aggregate.
Example: AccountSaga below.

    public class AccountSaga : Saga<BankAccount>,
        IHandles<CreateAccount>,
        IHandles<ActivateAccount>,
        IHandles<DepositMoney>,
        IHandles<WithdrawMoney>,
        IHandles<CloseAccount>
    {
        // Creates a new account after validating the holder name and the initial amount.
        public async Task Handle(CreateAccount command)
        {
            logger.LogInformation("Action=Account_Created, Account={AccountId}, Holder={AccountName}, Initial_Balance={InitialBalance}",
                command.Payload.Id, command.Payload.AccountName, command.Payload.InitialAmount);

            if (string.IsNullOrEmpty(command.Payload.AccountName))
                throw new ArgumentException("Account creation requires an account holder name.", nameof(command.Payload.AccountName));
            if (command.Payload.InitialAmount <= 0)
                throw new ArgumentException("Account creation requires a positive initial amount.", nameof(command.Payload.InitialAmount));

            var account = new BankAccount
            {
                Id = command.Payload.Id,
                AccountName = command.Payload.AccountName,
                Balance = command.Payload.InitialAmount
            };

            await repository.Persist(account);
            await Raise(new AccountCreated(account));
        }

        // Records the activation date on an open account.
        public async Task Handle(ActivateAccount command)
        {
            logger.LogInformation("Action=Account_Activate, ActivatedOn={ActiveOn}, Account={AccountId}", command.Payload.ActiveOn, command.Payload.Id);

            var account = await repository.Get<BankAccount>(command.Payload.Id);
            if (account.IsClosed)
                throw new InvalidOperationException("Cannot activate a closed account");
            if (command.Payload.ActiveOn == DateTime.MinValue)
                throw new ArgumentException("Activation date must be specified", nameof(command.Payload.ActiveOn));

            account.ActiveOn = command.Payload.ActiveOn;

            await repository.Persist(account);
            await Raise(new AccountUpdated(account));
        }

        // Adds the deposited amount to the balance of an open account.
        public async Task Handle(DepositMoney command)
        {
            logger.LogInformation("Action=Money_Deposited, Amount={Amount}, Account={AccountId}", command.Payload.Amount, command.Payload.Id);

            var account = await repository.Get<BankAccount>(command.Payload.Id);
            if (account.IsClosed)
                throw new InvalidOperationException("Cannot deposit to a closed account");
            if (command.Payload.Amount <= 0)
                throw new ArgumentException("Deposit amount must be positive", nameof(command.Payload.Amount));

            command.Payload.CurrentBalance = account.Balance + command.Payload.Amount;
            account.Balance = command.Payload.CurrentBalance;

            await repository.Persist(account);
            await Raise(new AccountUpdated(account));
        }

        // Subtracts the withdrawn amount from the balance of an open account.
        public async Task Handle(WithdrawMoney command)
        {
            logger.LogInformation("Action=Money_Withdrawn, Amount={Amount}, Account={AccountId}", command.Payload.Amount, command.Payload.Id);

            var account = await repository.Get<BankAccount>(command.Payload.Id);
            if (account.IsClosed)
                throw new InvalidOperationException("Cannot withdraw from a closed account");
            if (command.Payload.Amount <= 0)
                throw new ArgumentException("Withdrawal amount must be positive", nameof(command.Payload.Amount));

            command.Payload.CurrentBalance = account.Balance - command.Payload.Amount;
            account.Balance = command.Payload.CurrentBalance;

            await repository.Persist(account);
            await Raise(new AccountUpdated(account));
        }

        // Marks an open account as closed and records the reason.
        public async Task Handle(CloseAccount command)
        {
            logger.LogInformation("Action=Account_Closed, Account={AccountId}, Reason={Reason}", command.Payload.Id, command.Payload.ClosureReason);

            if (string.IsNullOrWhiteSpace(command.Payload.ClosureReason))
                throw new ArgumentException("Reason for closing cannot be empty", nameof(command.Payload.ClosureReason));

            var account = await repository.Get<BankAccount>(command.Payload.Id);
            if (account.IsClosed)
                throw new InvalidOperationException("Cannot close an account that is already closed");

            account.ClosureReason = command.Payload.ClosureReason;
            account.IsClosed = command.Payload.IsClosed = true;

            await repository.Persist(account);
            await Raise(new AccountUpdated(account));
        }
    }

In the example above, each command handler updates the state of the bank account entity. The saga has access to the entity repository for retrieving and persisting the aggregate entity.
Note: You need to provide an implementation of the IRepository interface to manage entities.
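As a starting point, an in-memory repository might look like the sketch below. The interface shape is an assumption inferred from how the saga above calls `repository.Get<BankAccount>(id)` and `repository.Persist(account)`; the library's actual `IRepository` and `IEntity` definitions, the `Guid` identifier type, and the `DemoAccount` entity are stand-ins introduced here for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed stand-ins for the library's contracts; the real ones may differ.
public interface IEntity
{
    Guid Id { get; }
}

public interface IRepository
{
    Task<T> Get<T>(Guid id) where T : class, IEntity;
    Task Persist<T>(T entity) where T : class, IEntity;
}

// Keeps entities in process memory, keyed by entity type and identifier.
// Suitable for tests and demos only: nothing survives a restart.
public sealed class InMemoryRepository : IRepository
{
    private readonly ConcurrentDictionary<(Type Type, Guid Id), IEntity> store = new();

    public Task<T> Get<T>(Guid id) where T : class, IEntity =>
        store.TryGetValue((typeof(T), id), out var entity)
            ? Task.FromResult((T)entity)
            : throw new KeyNotFoundException($"{typeof(T).Name} {id} was not found.");

    public Task Persist<T>(T entity) where T : class, IEntity
    {
        // Insert or overwrite, so persisting the same entity twice is harmless.
        store[(typeof(T), entity.Id)] = entity;
        return Task.CompletedTask;
    }
}

// Hypothetical entity used only to exercise the repository.
public sealed class DemoAccount : IEntity
{
    public Guid Id { get; init; }
    public decimal Balance { get; set; }
}
```

Because `Persist` overwrites by key, repeated calls with the same entity are idempotent, which fits the retry and replay behavior expected of saga handlers. A real implementation would write to durable storage instead.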
In addition to managing state, saga handlers can raise domain events in response to changes made by a business operation. A saga should always raise a domain event when an entity is created, updated, or deleted.
An event is an implementation of the Event<TPayload> base class. The payload is the encapsulated entity (an instance of IEntity) affected by the event. Other aggregates can subscribe to events from associated or other sagas.

    public class AccountCreated : Event<BankAccount>
    {
        public AccountCreated(BankAccount payload) : base(payload)
        {
        }
    }

Example: AccountSaga below uses the Raise() method to publish a domain event.

    public class AccountSaga : Saga<BankAccount>,
        IHandles<CreateAccount>
    {
        public async Task Handle(CreateAccount command)
        {
            // .......
            await Raise(new AccountCreated(account));
        }
    }

The aggregate can replay the stream of commands. The saga must handle commands idempotently so that a replay recreates the same state each time. When commands are replayed, the Command.Metadata.IsReplay property on the command is set to true so that handlers can distinguish replayed commands from live ones.
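One common use of such a flag is to rebuild state on every pass while skipping external side effects during replay. The sketch below illustrates that idea with stand-in types: `CommandMetadata`, `DepositCommand`, and `DepositHandler` are hypothetical and only mirror the `Metadata.IsReplay` property named above, not the library's actual classes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins that mirror the Metadata.IsReplay property.
public sealed class CommandMetadata
{
    public bool IsReplay { get; init; }
}

public sealed class DepositCommand
{
    public CommandMetadata Metadata { get; init; } = new();
    public decimal Amount { get; init; }
}

public sealed class DepositHandler
{
    public decimal Balance { get; private set; }

    // Stands in for an external side effect such as sending an email.
    public List<string> SentNotifications { get; } = new();

    public void Handle(DepositCommand command)
    {
        // The state change is applied for live and replayed commands alike,
        // so replaying the full stream from scratch rebuilds the same balance.
        Balance += command.Amount;

        // The side effect runs only for live commands; repeating it during
        // replay would notify the customer a second time.
        if (!command.Metadata.IsReplay)
            SentNotifications.Add("Deposit received");
    }
}
```

Replaying a stream through a fresh `DepositHandler` with `IsReplay` set to true yields the same `Balance` as the live run while leaving `SentNotifications` empty.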