From bf9fdfdcf72741e3fda2c6a15e4c73cea919ecbe Mon Sep 17 00:00:00 2001 From: Wojciech Klusek Date: Thu, 19 Oct 2023 11:31:10 +0200 Subject: [PATCH] Add MassTransit section --- docs/domain/domain_event/index.md | 2 +- .../messaging_masstransit/handling_events.md | 44 +++++ .../messaging_masstransit/index.md | 171 ++++++++++++++++++ mkdocs.yml | 1 + 4 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 docs/external_integrations/messaging_masstransit/handling_events.md diff --git a/docs/domain/domain_event/index.md b/docs/domain/domain_event/index.md index 37dadddd9..38fedf610 100644 --- a/docs/domain/domain_event/index.md +++ b/docs/domain/domain_event/index.md @@ -63,4 +63,4 @@ public class EmployeeAssignedToAssignment : IDomainEvent Ensure that `EmployeeAssignedToAssignment` implements the `IDomainEvent` interface, and it has a constructor with the `[JsonConstructor]` attribute for proper deserialization. After being raised, the event can be handled by the matching `IConsumer` to perform wanted action. -> **Tip:** To read how to handle domain events, see [Messaging - MassTransit](../../external_integrations/messaging_masstransit/index.md). +> **Tip:** To read how to handle domain events, visit [here](../../external_integrations/messaging_masstransit/handling_events.md). diff --git a/docs/external_integrations/messaging_masstransit/handling_events.md b/docs/external_integrations/messaging_masstransit/handling_events.md new file mode 100644 index 000000000..d1e261cea --- /dev/null +++ b/docs/external_integrations/messaging_masstransit/handling_events.md @@ -0,0 +1,44 @@ +# Handling events + +Once an event is raised, it can be handled by a corresponding `IConsumer` to perform the desired action. The default consumer configuration can be customized by overriding the `ConfigureConsumer` method from the `ConsumerDefinition` interface. In the following example, an email is sent to the employee who has been assigned to a assignment: + +```csharp +public class SendEmailToEmployeeOnEmployeeAssignedToAssignment + : IConsumer +{ + private readonly IEmailSender emailSender; + private readonly IRepository employees; + private readonly IRepository assignments; + + public SendEmailToEmployeeOnEmployeeAssignedToAssignment( + IEmailSender emailSender, + IRepository employees, + IRepository assignments) + { + this.emailSender = emailSender; + this.employees = employees; + this.assignments = assignments; + } + + public async Task Consume( + ConsumeContext context) + { + var msg = context.Message; + + var employee = await employees.FindAndEnsureExistsAsync( + msg.EmployeeId, + context.CancellationToken); + + var assignment = await assignments.FindAndEnsureExistsAsync( + msg.AssignmentId, + context.CancellationToken); + + await emailSender.SendEmployeeAssignedToAssignmentEmailAsync( + employee, + assignment, + context.CancellationToken); + } +} +``` + +> **Tip:** More about consumers can be found here: [MassTransit Consumers](https://masstransit.io/documentation/concepts/consumers). diff --git a/docs/external_integrations/messaging_masstransit/index.md b/docs/external_integrations/messaging_masstransit/index.md index e69de29bb..7b1e0d360 100644 --- a/docs/external_integrations/messaging_masstransit/index.md +++ b/docs/external_integrations/messaging_masstransit/index.md @@ -0,0 +1,171 @@ +# Messaging - MassTransit + +[MassTransit] is a popular open-source distributed application framework for building scalable and robust messaging systems in .NET applications. It provides a comprehensive set of tools and abstractions to simplify the development of message-based applications, making it easier to implement messaging patterns. + +To integrate [MassTransit] with LeanCode CoreLibrary CQRS, you can utilize the [LeanCode.CQRS.MassTransitRelay] package. This package serves as a bridge that enables the passing of raised events from your application to the [MassTransit] message bus. This integration is vital for facilitating event-driven communication within domain. + +## Configuration + +[LeanCode.CQRS.MassTransitRelay] requires the following elements to be configured in the CQRS pipeline (in the following order): + +1. `CommitDatabaseTransactionMiddleware` (call [CommitTransaction]) +2. `EventsPublisherMiddleware` (call [PublishEvents]) + +> **Tip:** To find you more how you can configure pipeline visit [here](../../cqrs/pipeline/index.md). + +For setting up bus configuration, [AddMassTransitIntegration] must be used. This method registers all the essential services required for [MassTransit] to work seamlessly with the rest of CoreLibrary. It effectively invokes [AddMassTransit], allowing you to consult the [MassTransit documentation](https://masstransit.io/documentation/concepts) for further insights. + +Here's an example configuration that utilizes in-memory transport during development and Azure Service Bus in non-development environments: + +```csharp +public override void ConfigureServices(IServiceCollection services) +{ + . . . + + services.AddCQRSMassTransitIntegration(cfg => + { + // Adds MassTransit's Inbox & Outbox pattern implementation. + // More information in Inbox & Outbox section below. + cfg.AddEntityFrameworkOutbox(outboxCfg => + { + outboxCfg.LockStatementProvider = + // Using CustomPostgresLockStatementProvider vendored from + // https://github.com/MassTransit/MassTransit/blob/9e6c78573ad211a70b624fad31382faa331dc4d8/src/Persistence/MassTransit.EntityFrameworkIntegration/EntityFrameworkIntegration/SqlLockStatementProvider.cs + // as MassTransit uses EF8 incompatible API + new LeanCode.CQRS.MassTransitRelay.LockProviders.CustomPostgresLockStatementProvider(); + outboxCfg.UseBusOutbox(); + }); + + // Adds consumers with default configuration. + // More information in Consumer definition section below. + cfg.AddConsumersWithDefaultConfiguration( + // Array of assemblies where handlers are located + AllHandlers.Assemblies.ToArray(), + typeof(DefaultConsumerDefinition<>) + ); + + if (hostEnv.IsDevelopment()) + { + cfg.AddDelayedMessageScheduler(); + cfg.UsingInMemory( + (ctx, cfg) => + { + cfg.UseDelayedMessageScheduler(); + ConfigureBusCommon( + ctx, + cfg + ); + } + ); + } + else + { + cfg.AddServiceBusMessageScheduler(); + cfg.UsingAzureServiceBus( + (ctx, cfg) => + { + cfg.Host( + // Azure Service Bus endpoint taken from Configuration + new Uri(Config.MassTransit.AzureServiceBus.Endpoint( + Configuration)), + host => + { + host.RetryLimit = 5; + host.RetryMinBackoff = TimeSpan.FromSeconds(3); + // Helper method to create Azure.Core.TokenCredential from Configuration + host.TokenCredential = + DefaultLeanCodeCredential.Create(Configuration); + } + ); + + cfg.UseServiceBusMessageScheduler(); + ConfigureBusCommon( + ctx, + cfg + ); + } + ); + } + + static void ConfigureBusCommon( + IBusRegistrationContext ctx, + TConfigurator cfg + ) + where TConfigurator : IBusFactoryConfigurator + where TReceiveConfigurator : IReceiveEndpointConfigurator + { + cfg.ConfigureEndpoints(ctx); + cfg.ConfigureJsonSerializerOptions(KnownConverters.AddAll); + cfg.ConnectBusObservers(ctx); + } + }); + + . . . +} +``` + +### Inbox & Outbox + +The Inbox & Outbox pattern is a crucial architectural concept when it comes to handling and managing distributed transactions and messaging in software systems. In the context of CoreLibrary and [MassTransit] integration, this pattern is employed to ensure reliable and transactional message processing. + +**Inbox:** The Inbox component serves as the place where incoming messages are initially received and stored before they are processed. It acts as a buffer, ensuring that no messages are lost or duplicated during the message processing pipeline. This is essential for achieving message reliability and consistency, especially in scenarios involving distributed systems. + +**Outbox:** On the other hand, the Outbox is a key component for ensuring the atomicity of operations that involve both message publishing and database changes. It is used to store messages that need to be sent as part of a transaction. These messages are held in the Outbox until the associated database changes are committed successfully. Once the database transaction is confirmed, the Outbox releases the messages for delivery, ensuring that the two operations—database updates and message publishing—occur atomically. This is a critical feature when building systems that demand consistency across multiple components and data stores. + +The integration of CoreLibrary and [MassTransit] relies on the implementation of the [MassTransit] Inbox & Outbox pattern. To learn more about this pattern, you can refer to [the documentation](https://masstransit.io/documentation/patterns/transactional-outbox). In addition to adding this pattern within `AddCQRSMassTransitIntegration` in your configuration, it also needs to be incorporated within `OnModelCreating` when using Entity Framework, as demonstrated below: + +```csharp +protected override void OnModelCreating(ModelBuilder builder) +{ + . . . + + builder.AddTransactionalOutboxEntities(); + + . . . +} +``` + +### Consumer definition + +[MassTransit] uses [ConsumerDefinition] to configure the pipeline of each consumer. To work effectively with domain events, the pipeline for every consumer that might raise them needs to include [EventsPublisherFilter]. For streamlined configuration and error prevention, [AddConsumersWithDefaultConfiguration] registers all consumers with default configuration using conventions. It's important to note that consumers registered this way **must be** public. + +[LeanCode.CQRS.MassTransitRelay] comes with a set of filters, primarily designed to ensure that events raised in consumers are relayed to the bus. These filters include: + +- [CorrelationFilter]: Enriches logs with a message ID and a consumer type, added by `UseLogsCorrelation`. +- [EventsPublisherFilter]: Relays Domain Events raised later in the pipeline to the bus, added by `UseDomainEventsPublishing`. + +Here's a minimal, usable consumer definition that can serve as a default: + +```csharp +public class DefaultConsumerDefinition + : ConsumerDefinition + where TConsumer : class, IConsumer +{ + protected override void ConfigureConsumer( + IReceiveEndpointConfigurator endpointConfigurator, + IConsumerConfigurator consumerConfigurator, + IRegistrationContext context + ) + { + // Correlate logs with the message consumer execution + endpointConfigurator.UseLogsCorrelation(); + + // Use transactional outbox pattern + endpointConfigurator.UseEntityFrameworkOutbox(context); + + // Configure domain events + endpointConfigurator.UseDomainEventsPublishing(context); + } +} +``` + +[MassTransit]: https://masstransit-project.com/ +[LeanCode.CQRS.MassTransitRelay]: https://github.com/leancodepl/corelibrary/tree/v8.0-preview/src/CQRS/LeanCode.CQRS.MassTransitRelay +[CommitTransaction]: https://github.com/leancodepl/corelibrary/blob/v8.0-preview/src/CQRS/LeanCode.CQRS.MassTransitRelay/MassTransitRelayApplicationBuilderExtensions.cs#L9 +[Publishevents]: https://github.com/leancodepl/corelibrary/blob/v8.0-preview/src/CQRS/LeanCode.CQRS.MassTransitRelay/MassTransitRelayApplicationBuilderExtensions.cs#L16 +[AddMassTransitIntegration]: https://github.com/leancodepl/corelibrary/blob/v8.0-preview/src/CQRS/LeanCode.CQRS.MassTransitRelay/MassTransitRelayServiceCollectionExtensions.cs#L10 +[AddConsumersWithDefaultConfiguration]: https://github.com/leancodepl/corelibrary/blob/v8.0-preview/src/CQRS/LeanCode.CQRS.MassTransitRelay/MassTransitRegistrationConfigurationExtensions.cs#L13 +[CorrelationFilter]: https://github.com/leancodepl/corelibrary/blob/v8.0-preview/src/CQRS/LeanCode.CQRS.MassTransitRelay/Middleware/CorrelationFilter.cs +[EventsPublisherFilter]: https://github.com/leancodepl/corelibrary/blob/v8.0-preview/src/CQRS/LeanCode.CQRS.MassTransitRelay/Middleware/EventsPublisherFilter.cs +[ConsumerDefinition]: https://masstransit.io/documentation/configuration/consumers#consumer-definitions +[AddMassTransit]: https://masstransit.io/documentation/configuration diff --git a/mkdocs.yml b/mkdocs.yml index d571a27fe..b72e13ea4 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -50,6 +50,7 @@ nav: - ./external_integrations/index.md - Messaging - MassTransit: - ./external_integrations/messaging_masstransit/index.md + - Handling events: ./external_integrations/messaging_masstransit/handling_events.md - Features: - ./features/index.md - Force update: