Concurrency
Concurrency control in Chronicle ensures that multiple operations don't interfere with each other when appending events to the same event source. Chronicle provides a sophisticated concurrency control mechanism through the ConcurrencyScope concept, which allows you to define precisely how concurrency should be handled based on event metadata tags.
Understanding ConcurrencyScope
A ConcurrencyScope defines the boundaries and constraints for concurrent operations when appending events. It uses the event metadata tags to scope concurrency control to specific aspects of your events, providing fine-grained control over when concurrency violations should be detected.
Event Metadata Tags for Concurrency
Chronicle uses the following event metadata tags to scope concurrency:
- EventSourceId: Unique identifier for the event source
- EventSourceType: Overarching, binding concept (e.g., Account)
- EventStreamType: A concrete process related to event source type (e.g., Onboarding, Transactions)
- EventStreamId: A marker to separate independent streams for a stream type (e.g., Monthly, Yearly)
- EventTypes: Specific event types to scope concurrency to
Basic Usage
Simple Concurrency Scope
The most basic form of concurrency control scopes to a specific sequence number for an event source:
using Cratis.Events;
using Cratis.Chronicle.EventSequences.Concurrency;
public class BankAccountService(IEventLog eventLog)
{
public async Task OpenAccount(AccountId accountId, string accountName)
{
var concurrencyScope = new ConcurrencyScope(
sequenceNumber: 42, // Expected sequence number
eventSourceId: accountId
);
await eventLog.Append(
accountId,
new AccountOpened(accountName),
concurrencyScope: concurrencyScope
);
}
}
Using ConcurrencyScopeBuilder
For more complex scenarios, use the ConcurrencyScopeBuilder to fluently construct concurrency scopes:
public class AccountTransactionService(IEventLog eventLog)
{
public async Task ProcessTransaction(AccountId accountId, decimal amount)
{
var concurrencyScope = new ConcurrencyScopeBuilder()
.WithEventSourceId(accountId)
.WithSequenceNumber(15)
.WithEventStreamType("Transactions")
.WithEventType<MoneyDeposited>()
.WithEventType<MoneyWithdrawn>()
.Build();
await eventLog.Append(
accountId,
new MoneyDeposited(amount),
concurrencyScope: concurrencyScope
);
}
}
Scoping by Event Metadata Tags
EventSourceType and EventStreamType
You can scope concurrency to specific event source types and stream types:
public class AccountManagementService(IEventLog eventLog)
{
public async Task UpdateAccountSettings(AccountId accountId, AccountSettings settings)
{
var concurrencyScope = new ConcurrencyScopeBuilder()
.WithEventSourceId(accountId)
.WithEventSourceType("BankAccount")
.WithEventStreamType("AccountManagement")
.WithSequenceNumber(10)
.Build();
await eventLog.Append(
accountId,
new AccountSettingsUpdated(settings),
eventSourceType: "BankAccount",
eventStreamType: "AccountManagement",
concurrencyScope: concurrencyScope
);
}
}
EventStreamId
Use event stream IDs to scope concurrency to specific streams within a stream type:
public class MonthlyReportService(IEventLog eventLog)
{
public async Task GenerateMonthlyReport(AccountId accountId, DateTime month)
{
var monthKey = month.ToString("yyyy-MM");
var concurrencyScope = new ConcurrencyScopeBuilder()
.WithEventSourceId(accountId)
.WithEventStreamType("Reporting")
.WithEventStreamId(monthKey)
.WithSequenceNumber(5)
.Build();
await eventLog.Append(
accountId,
new MonthlyReportGenerated(month),
eventStreamType: "Reporting",
eventStreamId: monthKey,
concurrencyScope: concurrencyScope
);
}
}
Event Types
Scope concurrency to specific event types to allow concurrent operations on different types of events:
public class AccountService(IEventLog eventLog)
{
public async Task ProcessPayment(AccountId accountId, decimal amount)
{
// Only check concurrency for payment-related events
var concurrencyScope = new ConcurrencyScopeBuilder()
.WithEventSourceId(accountId)
.WithSequenceNumber(20)
.WithEventType<PaymentProcessed>()
.WithEventType<PaymentFailed>()
.WithEventType<PaymentRefunded>()
.Build();
await eventLog.Append(
accountId,
new PaymentProcessed(amount),
concurrencyScope: concurrencyScope
);
}
}
AppendMany with Concurrency Scopes
When appending multiple events, you can specify different concurrency scopes for different event sources:
public class TransferService(IEventLog eventLog)
{
public async Task TransferMoney(AccountId fromAccount, AccountId toAccount, decimal amount)
{
var events = new[]
{
new EventForEventSourceId(fromAccount, new MoneyWithdrawn(amount)),
new EventForEventSourceId(toAccount, new MoneyDeposited(amount))
};
var concurrencyScopes = new Dictionary<EventSourceId, ConcurrencyScope>
{
[fromAccount] = new ConcurrencyScopeBuilder()
.WithEventSourceId(fromAccount)
.WithSequenceNumber(50)
.WithEventType<MoneyWithdrawn>()
.Build(),
[toAccount] = new ConcurrencyScopeBuilder()
.WithEventSourceId(toAccount)
.WithSequenceNumber(25)
.WithEventType<MoneyDeposited>()
.Build()
};
await eventLog.AppendMany(events, concurrencyScopes: concurrencyScopes);
}
}
Event Source Operations with Concurrency
You can also use concurrency scopes with event source operations:
public class BatchAccountProcessor(IEventLog eventLog)
{
public async Task ProcessAccountBatch(AccountId accountId)
{
await eventLog
.For(accountId)
.WithConcurrencyScope(scope => scope
.WithSequenceNumber(30)
.WithEventType<AccountProcessed>()
.WithEventType<AccountValidated>())
.Append(new AccountValidated())
.Append(new AccountProcessed())
.Commit();
}
}
Handling Concurrency Violations
When a concurrency violation occurs, Chronicle will return a ConcurrencyViolation in the append result:
public class SafeAccountService(IEventLog eventLog)
{
public async Task<bool> TryOpenAccount(AccountId accountId, string accountName)
{
var concurrencyScope = new ConcurrencyScope(
sequenceNumber: 0, // Expect this to be the first event
eventSourceId: accountId
);
var result = await eventLog.Append(
accountId,
new AccountOpened(accountName),
concurrencyScope: concurrencyScope
);
if (result.HasConcurrencyViolation)
{
// Handle the violation - maybe retry or return false
return false;
}
return result.IsSuccess;
}
}
Concurrency Strategies
Chronicle provides built-in concurrency strategies:
Optimistic Concurrency Strategy
This strategy gets the current tail sequence number and uses it as the expected sequence number:
// Configured automatically when using dependency injection
public class OptimisticAccountService(IEventLog eventLog, IConcurrencyScopeStrategies strategies)
{
public async Task UpdateAccount(AccountId accountId, string newName)
{
var strategy = strategies.GetFor<Account>();
var concurrencyScope = await strategy.GetScope(accountId);
await eventLog.Append(
accountId,
new AccountNameChanged(newName),
concurrencyScope: concurrencyScope
);
}
}
Note: When using aggregate roots, the
EventStreamTypeis automatically set based on the aggregate root's type name or the[EventStreamType]attribute if specified. Aggregate roots automatically scope concurrency to their specific event stream type, ensuring that concurrent operations on different stream types don't interfere with each other.
Best Practices
- Use specific scoping: Scope concurrency as narrowly as possible to avoid unnecessary blocking
- Event type scoping: When possible, scope to specific event types to allow concurrent operations on different event types
- Handle violations gracefully: Always check for concurrency violations and implement appropriate retry or fallback logic
- Use builders for complex scopes: The
ConcurrencyScopeBuilderprovides a clear, fluent API for complex concurrency requirements - Aggregate root integration: Leverage aggregate roots for automatic stream type management and built-in concurrency strategies