Concurrency
Concurrency control in Chronicle ensures that multiple operations don’t interfere with each other when appending events to the same event source. Chronicle provides a sophisticated concurrency control mechanism through the ConcurrencyScope concept, which allows you to define precisely how concurrency should be handled based on formalized event metadata tags.
Understanding ConcurrencyScope
A ConcurrencyScope defines the boundaries and constraints for concurrent operations when appending events. It uses formalized, well-known event metadata tags that are indexed by Chronicle to scope concurrency control to specific aspects of your events, providing fine-grained control over when concurrency violations should be detected. These tags are separate from any user-controlled event tags you may add for categorization. See Event Metadata Tags for details.
Formalized Metadata Tags for Concurrency
Chronicle uses the following formalized, indexed metadata tags to scope concurrency:
- EventSourceId: Unique identifier for the event source
- EventSourceType: Overarching, binding concept (e.g., Account)
- EventStreamType: A concrete process related to event source type (e.g., Onboarding, Transactions)
- EventStreamId: A marker to separate independent streams for a stream type (e.g., Monthly, Yearly)
- EventTypes: Specific event types to scope concurrency to
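To make the role of these tags concrete, the following is a minimal sketch of how a scoped check could behave. It is a simplified model and not Chronicle's implementation: `StoredEvent`, `Scope`, `ScopeCheck`, and `ScopeCheckExample` are hypothetical types invented for this illustration, and the violation rule (an in-scope event exists with a higher sequence number than the one expected) is an assumption.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins for illustration; NOT Chronicle's types.
public record StoredEvent(
    ulong SequenceNumber,
    string EventSourceId,
    string EventStreamType,
    string EventType);

public record Scope(
    ulong ExpectedSequenceNumber,
    string? EventSourceId = null,
    string? EventStreamType = null,
    IReadOnlyCollection<string>? EventTypes = null);

public static class ScopeCheck
{
    // An event is in scope when it matches every tag the scope specifies.
    // Tags left as null are ignored.
    public static bool IsInScope(StoredEvent e, Scope scope) =>
        (scope.EventSourceId is null || e.EventSourceId == scope.EventSourceId) &&
        (scope.EventStreamType is null || e.EventStreamType == scope.EventStreamType) &&
        (scope.EventTypes is null || scope.EventTypes.Contains(e.EventType));

    // Assumed rule: the scope is violated when an in-scope event was
    // appended after the sequence number the caller expected.
    public static bool IsViolated(IEnumerable<StoredEvent> log, Scope scope) =>
        log.Any(e => IsInScope(e, scope) && e.SequenceNumber > scope.ExpectedSequenceNumber);
}

public static class ScopeCheckExample
{
    public static (bool Narrow, bool Broad) Run()
    {
        var log = new List<StoredEvent>
        {
            new(0, "account-1", "Onboarding", "AccountOpened"),
            new(1, "account-1", "Transactions", "MoneyDeposited"),
            new(2, "account-1", "Onboarding", "AccountSettingsUpdated"),
        };

        // Both writers last observed sequence number 1.
        var narrow = new Scope(1, "account-1", "Transactions"); // one stream type only
        var broad = new Scope(1, "account-1");                  // the whole event source

        return (ScopeCheck.IsViolated(log, narrow), ScopeCheck.IsViolated(log, broad));
    }
}
```

In this model the narrow scope is not violated, because the later event at sequence number 2 belongs to a different stream type, while the broad scope is. This is the practical effect of scoping by tags: unrelated appends stop counting as conflicts.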
Basic Usage
Simple Concurrency Scope
The most basic form of concurrency control scopes to a specific sequence number for an event source:

    using Cratis.Events;
    using Cratis.Chronicle.EventSequences.Concurrency;

    public class BankAccountService(IEventLog eventLog)
    {
        public async Task OpenAccount(AccountId accountId, string accountName)
        {
            var concurrencyScope = new ConcurrencyScope(
                SequenceNumber: 42, // Expected sequence number
                EventSourceId: accountId);

            await eventLog.Append(
                accountId,
                new AccountOpened(accountName),
                concurrencyScope: concurrencyScope);
        }
    }

Using ConcurrencyScopeBuilder
For more complex scenarios, use the ConcurrencyScopeBuilder to fluently construct concurrency scopes:

    public class AccountTransactionService(IEventLog eventLog)
    {
        public async Task ProcessTransaction(AccountId accountId, decimal amount)
        {
            var concurrencyScope = new ConcurrencyScopeBuilder()
                .WithEventSourceId(accountId)
                .WithSequenceNumber(15)
                .WithEventStreamType("Transactions")
                .WithEventType<MoneyDeposited>()
                .WithEventType<MoneyWithdrawn>()
                .Build();

            await eventLog.Append(
                accountId,
                new MoneyDeposited(amount),
                concurrencyScope: concurrencyScope);
        }
    }

Scoping by Event Metadata Tags
EventSourceType and EventStreamType
You can scope concurrency to specific event source types and stream types:

    public class AccountManagementService(IEventLog eventLog)
    {
        public async Task UpdateAccountSettings(AccountId accountId, AccountSettings settings)
        {
            var concurrencyScope = new ConcurrencyScopeBuilder()
                .WithEventSourceId(accountId)
                .WithEventSourceType("BankAccount")
                .WithEventStreamType("AccountManagement")
                .WithSequenceNumber(10)
                .Build();

            await eventLog.Append(
                accountId,
                new AccountSettingsUpdated(settings),
                eventSourceType: "BankAccount",
                eventStreamType: "AccountManagement",
                concurrencyScope: concurrencyScope);
        }
    }

EventStreamId
Use event stream IDs to scope concurrency to specific streams within a stream type:

    public class MonthlyReportService(IEventLog eventLog)
    {
        public async Task GenerateMonthlyReport(AccountId accountId, DateTime month)
        {
            // One stream per calendar month, e.g. "2024-03"
            var monthKey = month.ToString("yyyy-MM");

            var concurrencyScope = new ConcurrencyScopeBuilder()
                .WithEventSourceId(accountId)
                .WithEventStreamType("Reporting")
                .WithEventStreamId(monthKey)
                .WithSequenceNumber(5)
                .Build();

            await eventLog.Append(
                accountId,
                new MonthlyReportGenerated(month),
                eventStreamType: "Reporting",
                eventStreamId: monthKey,
                concurrencyScope: concurrencyScope);
        }
    }

Event Types
Scope concurrency to specific event types to allow concurrent operations on different types of events:

    public class AccountService(IEventLog eventLog)
    {
        public async Task ProcessPayment(AccountId accountId, decimal amount)
        {
            // Only check concurrency for payment-related events
            var concurrencyScope = new ConcurrencyScopeBuilder()
                .WithEventSourceId(accountId)
                .WithSequenceNumber(20)
                .WithEventType<PaymentProcessed>()
                .WithEventType<PaymentFailed>()
                .WithEventType<PaymentRefunded>()
                .Build();

            await eventLog.Append(
                accountId,
                new PaymentProcessed(amount),
                concurrencyScope: concurrencyScope);
        }
    }

AppendMany with Concurrency Scopes
When appending multiple events, you can specify different concurrency scopes for different event sources:

    public class TransferService(IEventLog eventLog)
    {
        public async Task TransferMoney(AccountId fromAccount, AccountId toAccount, decimal amount)
        {
            var events = new[]
            {
                new EventForEventSourceId(fromAccount, new MoneyWithdrawn(amount)),
                new EventForEventSourceId(toAccount, new MoneyDeposited(amount))
            };

            // One scope per event source, keyed by its EventSourceId
            var concurrencyScopes = new Dictionary<EventSourceId, ConcurrencyScope>
            {
                [fromAccount] = new ConcurrencyScopeBuilder()
                    .WithEventSourceId(fromAccount)
                    .WithSequenceNumber(50)
                    .WithEventType<MoneyWithdrawn>()
                    .Build(),

                [toAccount] = new ConcurrencyScopeBuilder()
                    .WithEventSourceId(toAccount)
                    .WithSequenceNumber(25)
                    .WithEventType<MoneyDeposited>()
                    .Build()
            };

            await eventLog.AppendMany(events, concurrencyScopes: concurrencyScopes);
        }
    }

Event Source Operations with Concurrency
You can also use concurrency scopes with event source operations:

    public class BatchAccountProcessor(IEventLog eventLog)
    {
        public async Task ProcessAccountBatch(AccountId accountId)
        {
            await eventLog
                .ForEventSourceId(accountId, source => source
                    .WithConcurrencyScope(scope => scope
                        .WithSequenceNumber(30)
                        .WithEventType<AccountProcessed>()
                        .WithEventType<AccountValidated>())
                    .Append(new AccountValidated())
                    .Append(new AccountProcessed()))
                .Perform();
        }
    }

Handling Concurrency Violations
When a concurrency violation occurs, Chronicle will return a ConcurrencyViolation in the append result:

    public class SafeAccountService(IEventLog eventLog)
    {
        public async Task<bool> TryOpenAccount(AccountId accountId, string accountName)
        {
            var concurrencyScope = new ConcurrencyScope(
                sequenceNumber: 0, // Expect this to be the first event
                eventSourceId: accountId);

            var result = await eventLog.Append(
                accountId,
                new AccountOpened(accountName),
                concurrencyScope: concurrencyScope);

            if (!result.IsSuccess)
            {
                // Handle the violation - maybe retry or return false
                return false;
            }

            return result.IsSuccess;
        }
    }

Concurrency Strategies
Chronicle provides built-in concurrency strategies:
Optimistic Concurrency Strategy
This strategy gets the current tail sequence number and uses it as the expected sequence number:

    // Configured automatically when using dependency injection
    public class OptimisticAccountService(IEventLog eventLog, IConcurrencyScopeStrategies strategies)
    {
        public async Task UpdateAccount(AccountId accountId, string newName)
        {
            var strategy = strategies.GetFor(eventLog);
            var concurrencyScope = await strategy.GetScope(accountId);

            await eventLog.Append(
                accountId,
                new AccountNameChanged(newName),
                concurrencyScope: concurrencyScope);
        }
    }

Note: When using aggregate roots, the EventStreamType is automatically set based on the aggregate root’s type name or the [EventStreamType] attribute if specified. Aggregate roots automatically scope concurrency to their specific event stream type, ensuring that concurrent operations on different stream types don’t interfere with each other.
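The idea behind the optimistic strategy (read the tail, then append only if the tail is unchanged) can be sketched without Chronicle at all. The example below is a toy model under stated assumptions: `InMemoryLog`, `TryAppend`, and `OptimisticExample` are hypothetical names invented here, and the real strategy may differ in how it represents an empty sequence or compares sequence numbers.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical in-memory log for illustration only; not a Chronicle type.
public sealed class InMemoryLog
{
    readonly List<string> _events = new();
    readonly object _gate = new();

    // Sequence number of the last event, or null when the log is empty.
    public ulong? Tail
    {
        get { lock (_gate) { return CurrentTail(); } }
    }

    // Appends only when the tail still equals what the caller observed.
    public bool TryAppend(ulong? expectedTail, string @event)
    {
        lock (_gate)
        {
            if (CurrentTail() != expectedTail)
            {
                return false; // another writer appended first
            }

            _events.Add(@event);
            return true;
        }
    }

    ulong? CurrentTail() =>
        _events.Count == 0 ? (ulong?)null : (ulong)(_events.Count - 1);
}

public static class OptimisticExample
{
    public static (bool First, bool Stale, bool Refreshed) Run()
    {
        var log = new InMemoryLog();

        // Two writers read the same tail before either appends.
        var observedByA = log.Tail;
        var observedByB = log.Tail;

        var first = log.TryAppend(observedByA, "AccountNameChanged: Alice");
        var stale = log.TryAppend(observedByB, "AccountNameChanged: Bob");

        // Writer B re-reads the tail and tries again.
        var refreshed = log.TryAppend(log.Tail, "AccountNameChanged: Bob");

        return (first, stale, refreshed);
    }
}
```

The first append succeeds, the second fails because its observed tail is stale, and the third succeeds after re-reading the tail. No lock is held between reading and appending, which is what makes the approach optimistic: conflicts are detected at append time rather than prevented up front.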
Best Practices
- Use specific scoping: Scope concurrency as narrowly as possible to avoid unnecessary blocking
- Event type scoping: When possible, scope to specific event types to allow concurrent operations on different event types
- Handle violations gracefully: Always check for concurrency violations and implement appropriate retry or fallback logic
- Use builders for complex scopes: The ConcurrencyScopeBuilder provides a clear, fluent API for complex concurrency requirements
- Aggregate root integration: Leverage aggregate roots for automatic stream type management and built-in concurrency strategies
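As one possible shape for the retry logic recommended above, here is a small bounded-retry helper. It is a sketch, not part of Chronicle: `ConcurrencyRetry`, its parameters, and the linear backoff are all assumptions made for illustration. The delegate is expected to rebuild its concurrency scope from fresh state on every call and return whether the append succeeded.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not a Chronicle API.
public static class ConcurrencyRetry
{
    // Runs `attempt` until it reports success or `maxAttempts` is reached.
    // Returns true on success and false when every attempt failed.
    public static async Task<bool> Run(
        Func<Task<bool>> attempt,
        int maxAttempts = 3,
        int baseDelayMilliseconds = 20)
    {
        for (var i = 1; i <= maxAttempts; i++)
        {
            if (await attempt())
            {
                return true;
            }

            if (i < maxAttempts)
            {
                // Linear backoff gives competing writers time to finish.
                await Task.Delay(baseDelayMilliseconds * i);
            }
        }

        return false;
    }
}
```

With the services shown earlier, the delegate would typically obtain a new scope through `strategy.GetScope(accountId)`, call `eventLog.Append(...)` with it, and return `result.IsSuccess`. Keeping the attempt count bounded avoids spinning indefinitely on a heavily contended event source; when all attempts fail, the caller can fall back to reporting the conflict.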