Documentation

adrs/027-event-driven-architecture.md

ADR-027: Event-Driven Architecture with CloudEvents and Azure Event Grid

Status

Proposed

Context

Dynaplex services need asynchronous, decoupled communication for scenarios where synchronous HTTP calls are inappropriate or insufficient. The existing Events component (engines/events/) provides a solid foundation for event publishing using the CNCF CloudEvents specification and Azure Event Grid, but lacks standardized patterns for:

  1. Event consumption: No webhook infrastructure for receiving events from Event Grid
  2. Reliability: No transactional outbox to prevent event loss during failures
  3. Idempotency: No built-in protection against duplicate event processing
  4. Naming conventions: No standardized event type taxonomy
  5. Cross-service coordination: No defined patterns for saga/choreography

Existing Foundation

The Events component already provides:

Capability Implementation Status
Event format CNCF CloudEvents 1.0 ✅ Complete
Transport Azure Event Grid ✅ Complete
Publisher IEventPublisher / EventPublisher ✅ Complete
In-process subscriber IEventSubscriber / EventSubscriber ✅ Complete
Domain event pattern DomainEventRequest with Domain/Action/EntityId ✅ Complete
Correlation/tracing CloudEventExtensions ✅ Complete
REST API /events/domain, /events/system, /events/batch ✅ Complete

Use Cases Requiring Events

Business Events:

  • Asset created, moved, or status changed
  • Workflow state transitions
  • Location hierarchy changes
  • User permission changes

System Events:

  • Component health status changes
  • Configuration updates
  • Batch job completion

Integration Events:

  • External system synchronization
  • Real-time UI updates via SignalR
  • Audit trail generation

Decision

We extend the existing Events component to provide a complete event-driven architecture with:

  1. Webhook handlers for receiving Event Grid events
  2. Transactional outbox for reliable event publishing
  3. Idempotent handler infrastructure for exactly-once semantics
  4. Standardized event naming following CloudEvents conventions
  5. Aspire integration for local development and service discovery

Architecture Overview

┌─────────────────────────────────────────────────────────────────────────────┐
│                              PUBLISHING                                     │
│                                                                             │
│  ┌──────────────┐    ┌─────────────────┐    ┌──────────────────────────┐   │
│  │ Catalog      │    │ Transactional   │    │ Outbox Processor         │   │
│  │ Service      │───▶│ Outbox          │───▶│ (Background Service)     │   │
│  │              │    │ (same DB tx)    │    │                          │   │
│  └──────────────┘    └─────────────────┘    └───────────┬──────────────┘   │
│                                                         │                   │
│                                                         ▼                   │
│                                              ┌──────────────────────────┐   │
│                                              │ IEventPublisher          │   │
│                                              │ (Azure Event Grid)       │   │
│                                              └───────────┬──────────────┘   │
└──────────────────────────────────────────────────────────┼──────────────────┘
                                                           │
                                                           ▼
                                              ┌──────────────────────────┐
                                              │ Azure Event Grid Topic   │
                                              │                          │
                                              │  Subscriptions:          │
                                              │  - spatial-events        │
                                              │  - workflow-events       │
                                              │  - notification-events   │
                                              └───────────┬──────────────┘
                                                          │
┌─────────────────────────────────────────────────────────┼───────────────────┐
│                              CONSUMPTION                │                   │
│                                                         ▼                   │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                    Event Grid Webhook Endpoint                        │  │
│  │                    POST /events/webhook                               │  │
│  └───────────────────────────────────┬──────────────────────────────────┘  │
│                                      │                                      │
│                                      ▼                                      │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                    Idempotency Check                                  │  │
│  │                    (ProcessedEventsStore)                             │  │
│  └───────────────────────────────────┬──────────────────────────────────┘  │
│                                      │                                      │
│                                      ▼                                      │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                    IEventSubscriber                                   │  │
│  │                    (Dispatch to registered handlers)                  │  │
│  └───────────────────────────────────┬──────────────────────────────────┘  │
│                                      │                                      │
│         ┌────────────────────────────┼────────────────────────────┐        │
│         ▼                            ▼                            ▼        │
│  ┌─────────────┐            ┌─────────────────┐           ┌─────────────┐  │
│  │ Asset       │            │ Notification    │           │ Audit       │  │
│  │ Handler     │            │ Handler         │           │ Handler     │  │
│  └─────────────┘            └─────────────────┘           └─────────────┘  │
└─────────────────────────────────────────────────────────────────────────────┘

Consequences

Positive

  • Standards-based: CloudEvents 1.0 ensures interoperability
  • Loose coupling: Services communicate without direct dependencies
  • Reliability: Transactional outbox prevents event loss
  • Scalability: Event Grid handles high throughput automatically
  • Flexibility: Easy to add new event consumers via subscriptions
  • Observability: Full tracing via OpenTelemetry integration

Negative

  • Eventual consistency: Data may be temporarily out of sync across services
  • Complexity: More infrastructure to manage (Event Grid, subscriptions)
  • Debugging: Event flows harder to trace than synchronous calls
  • Cost: Azure Event Grid has per-event pricing

Neutral

  • Ordering: Event Grid provides ordering within partition key only
  • Delivery: At-least-once delivery requires idempotent handlers

Implementation

1. Event Naming Conventions

All events follow the CloudEvents type attribute convention:

com.acsis.{domain}.{action}

Domain Events (Business Operations)

Domain Action Event Type Description
asset created com.acsis.asset.created New asset registered
asset moved com.acsis.asset.moved Asset location changed
asset status-changed com.acsis.asset.status-changed Asset status updated
asset deleted com.acsis.asset.deleted Asset removed
location created com.acsis.location.created New location created
location hierarchy-changed com.acsis.location.hierarchy-changed Parent/child relationship changed
workflow started com.acsis.workflow.started Workflow execution began
workflow step-completed com.acsis.workflow.step-completed Workflow step finished
workflow completed com.acsis.workflow.completed Workflow execution finished
workflow failed com.acsis.workflow.failed Workflow execution failed
user created com.acsis.user.created New user registered
user permissions-changed com.acsis.user.permissions-changed User permissions updated
tag read com.acsis.tag.read RFID tag scanned
tag batch-processed com.acsis.tag.batch-processed Tag batch completed

System Events (Operational)

Component Event Type Description
(any) com.acsis.system.started Component started
(any) com.acsis.system.stopped Component stopped
(any) com.acsis.system.health-changed Health status changed
(any) com.acsis.system.config-updated Configuration reloaded

2. Event Data Contracts

All event data classes inherit from BaseEventData and use proper Dynaplex types:

// In Acsis.Dynaplex.Engines.Events.Abstractions/Models/

/// <summary>
/// Event data for asset creation events.
/// </summary>
public class AssetCreatedEventData : BaseEventData
{
    /// <summary>
    /// The unique identifier of the created asset.
    /// </summary>
    [JsonPropertyName("assetId")]
    public Guid AssetId { get; set; }

    /// <summary>
    /// The asset's passport ID in the Prism registry.
    /// </summary>
    [JsonPropertyName("passportId")]
    public Guid PassportId { get; set; }

    /// <summary>
    /// The asset name.
    /// </summary>
    [JsonPropertyName("name")]
    public string Name { get; set; } = string.Empty;

    /// <summary>
    /// The item type identifier.
    /// </summary>
    [JsonPropertyName("itemTypeId")]
    public Guid ItemTypeId { get; set; }

    /// <summary>
    /// The location where the asset was created.
    /// </summary>
    [JsonPropertyName("locationId")]
    public Guid LocationId { get; set; }

    /// <summary>
    /// The tenant that owns this asset.
    /// </summary>
    [JsonPropertyName("tenantId")]
    public Guid TenantId { get; set; }
}

/// <summary>
/// Event data for asset movement events.
/// </summary>
public class AssetMovedEventData : BaseEventData
{
    [JsonPropertyName("assetId")]
    public Guid AssetId { get; set; }

    [JsonPropertyName("passportId")]
    public Guid PassportId { get; set; }

    [JsonPropertyName("fromLocationId")]
    public Guid FromLocationId { get; set; }

    [JsonPropertyName("toLocationId")]
    public Guid ToLocationId { get; set; }

    [JsonPropertyName("movedBy")]
    public Guid? MovedBy { get; set; }

    [JsonPropertyName("tenantId")]
    public Guid TenantId { get; set; }
}

/// <summary>
/// Event data for workflow state transitions.
/// </summary>
public class WorkflowStepCompletedEventData : BaseEventData
{
    [JsonPropertyName("workflowInstanceId")]
    public Guid WorkflowInstanceId { get; set; }

    [JsonPropertyName("workflowDefinitionId")]
    public Guid WorkflowDefinitionId { get; set; }

    [JsonPropertyName("stepId")]
    public Guid StepId { get; set; }

    [JsonPropertyName("stepName")]
    public string StepName { get; set; } = string.Empty;

    [JsonPropertyName("outcome")]
    public string Outcome { get; set; } = string.Empty;

    [JsonPropertyName("nextStepId")]
    public Guid? NextStepId { get; set; }

    [JsonPropertyName("tenantId")]
    public Guid TenantId { get; set; }
}

3. Transactional Outbox Pattern

Ensures events are never lost, even if Event Grid is temporarily unavailable:

// In Acsis.Dynaplex.Engines.Events.Database/

/// <summary>
/// Stores events to be published, ensuring atomicity with business operations.
/// </summary>
[Table("outbox_messages", Schema = "events")]
public class OutboxMessage
{
    [Key]
    [Column("id")]
    public Guid Id { get; set; }

    /// <summary>
    /// The CloudEvent type (e.g., "com.acsis.asset.created").
    /// </summary>
    [Required]
    [Column("event_type")]
    [MaxLength(256)]
    public string EventType { get; set; } = string.Empty;

    /// <summary>
    /// The CloudEvent source URI.
    /// </summary>
    [Required]
    [Column("source")]
    [MaxLength(512)]
    public string Source { get; set; } = string.Empty;

    /// <summary>
    /// The serialized CloudEvent data payload.
    /// </summary>
    [Required]
    [Column("event_data", TypeName = "jsonb")]
    public string EventData { get; set; } = string.Empty;

    /// <summary>
    /// Optional subject for routing.
    /// </summary>
    [Column("subject")]
    [MaxLength(256)]
    public string? Subject { get; set; }

    /// <summary>
    /// Correlation ID for distributed tracing.
    /// </summary>
    [Column("correlation_id")]
    [MaxLength(64)]
    public string? CorrelationId { get; set; }

    /// <summary>
    /// When the event occurred in the domain.
    /// </summary>
    [Column("occurred_at")]
    public DateTimeOffset OccurredAt { get; set; }

    /// <summary>
    /// When this outbox record was created.
    /// </summary>
    [Column("created_at")]
    public DateTimeOffset CreatedAt { get; set; } = DateTimeOffset.UtcNow;

    /// <summary>
    /// When the event was successfully published (null if pending).
    /// </summary>
    [Column("published_at")]
    public DateTimeOffset? PublishedAt { get; set; }

    /// <summary>
    /// Number of publish attempts.
    /// </summary>
    [Column("attempt_count")]
    public int AttemptCount { get; set; }

    /// <summary>
    /// Last error message if publish failed.
    /// </summary>
    [Column("last_error")]
    public string? LastError { get; set; }

    /// <summary>
    /// Tenant context for the event.
    /// </summary>
    [Column("tenant_id")]
    public Guid TenantId { get; set; }
}

Outbox Publisher Service

// In Acsis.Dynaplex.Engines.Events/Services/

/// <summary>
/// Publishes events through the outbox for transactional reliability.
/// </summary>
public interface IOutboxEventPublisher
{
    /// <summary>
    /// Queues an event in the outbox within the current transaction.
    /// The event will be published asynchronously by the OutboxProcessor.
    /// </summary>
    Task QueueEventAsync<TData>(
        string eventType,
        TData data,
        string? subject = null,
        CancellationToken ct = default) where TData : BaseEventData;
}

public class OutboxEventPublisher(
    EventsDb db,
    ICurrentTenantService tenantService,
    ILogger<OutboxEventPublisher> logger) : IOutboxEventPublisher
{
    public async Task QueueEventAsync<TData>(
        string eventType,
        TData data,
        string? subject = null,
        CancellationToken ct = default) where TData : BaseEventData
    {
        var correlationId = Activity.Current?.TraceId.ToString();

        var outboxMessage = new OutboxMessage
        {
            Id = Guid.NewGuid(),
            EventType = eventType,
            Source = $"urn:acsis:component:{GetComponentName()}",
            EventData = JsonSerializer.Serialize(data),
            Subject = subject,
            CorrelationId = correlationId,
            OccurredAt = data.Timestamp,
            TenantId = tenantService.TenantId
        };

        db.OutboxMessages.Add(outboxMessage);

        // Note: SaveChangesAsync is called by the caller as part of their transaction
        logger.LogDebug("Queued event {EventType} with ID {EventId} to outbox",
            eventType, outboxMessage.Id);
    }

    private static string GetComponentName() =>
        Assembly.GetEntryAssembly()?.GetName().Name ?? "unknown";
}

Background Outbox Processor

/// <summary>
/// Background service that publishes queued outbox events to Event Grid.
/// </summary>
public class OutboxProcessor(
    IServiceScopeFactory scopeFactory,
    IEventPublisher eventPublisher,
    ILogger<OutboxProcessor> logger) : BackgroundService
{
    private static readonly TimeSpan PollingInterval = TimeSpan.FromSeconds(5);
    private static readonly int BatchSize = 100;
    private static readonly int MaxAttempts = 5;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("Outbox processor started");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessPendingMessagesAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error processing outbox messages");
            }

            await Task.Delay(PollingInterval, stoppingToken);
        }
    }

    private async Task ProcessPendingMessagesAsync(CancellationToken ct)
    {
        using var scope = scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<EventsDb>();

        var pendingMessages = await db.OutboxMessages
            .Where(m => m.PublishedAt == null && m.AttemptCount < MaxAttempts)
            .OrderBy(m => m.CreatedAt)
            .Take(BatchSize)
            .ToListAsync(ct);

        if (pendingMessages.Count == 0)
            return;

        logger.LogDebug("Processing {Count} pending outbox messages", pendingMessages.Count);

        foreach (var message in pendingMessages)
        {
            try
            {
                var cloudEvent = CreateCloudEventFromOutbox(message);
                await eventPublisher.PublishAsync(cloudEvent, ct);

                message.PublishedAt = DateTimeOffset.UtcNow;
                message.AttemptCount++;

                logger.LogInformation("Published outbox event {EventId} of type {EventType}",
                    message.Id, message.EventType);
            }
            catch (Exception ex)
            {
                message.AttemptCount++;
                message.LastError = ex.Message;

                logger.LogWarning(ex,
                    "Failed to publish outbox event {EventId} (attempt {Attempt}/{MaxAttempts})",
                    message.Id, message.AttemptCount, MaxAttempts);
            }
        }

        await db.SaveChangesAsync(ct);
    }

    private static CloudEvent CreateCloudEventFromOutbox(OutboxMessage message)
    {
        var cloudEvent = new CloudEvent
        {
            Id = message.Id.ToString(),
            Type = message.EventType,
            Source = new Uri(message.Source),
            Time = message.OccurredAt,
            DataContentType = "application/json",
            Data = JsonSerializer.Deserialize<JsonElement>(message.EventData),
            Subject = message.Subject
        };

        if (!string.IsNullOrEmpty(message.CorrelationId))
        {
            cloudEvent["correlationid"] = message.CorrelationId;
        }

        return cloudEvent;
    }
}

4. Idempotent Event Handling

Prevents duplicate processing when events are delivered multiple times:

// In Acsis.Dynaplex.Engines.Events.Database/

/// <summary>
/// Tracks processed events for idempotency.
/// </summary>
[Table("processed_events", Schema = "events")]
[Index(nameof(EventId), IsUnique = true)]
public class ProcessedEvent
{
    [Key]
    [Column("id")]
    public Guid Id { get; set; }

    /// <summary>
    /// The CloudEvent ID that was processed.
    /// </summary>
    [Required]
    [Column("event_id")]
    [MaxLength(64)]
    public string EventId { get; set; } = string.Empty;

    /// <summary>
    /// The event type that was processed.
    /// </summary>
    [Required]
    [Column("event_type")]
    [MaxLength(256)]
    public string EventType { get; set; } = string.Empty;

    /// <summary>
    /// When the event was processed.
    /// </summary>
    [Column("processed_at")]
    public DateTimeOffset ProcessedAt { get; set; } = DateTimeOffset.UtcNow;

    /// <summary>
    /// The handler that processed this event.
    /// </summary>
    [Column("handler_name")]
    [MaxLength(256)]
    public string? HandlerName { get; set; }
}

Idempotency Service

/// <summary>
/// Service for ensuring idempotent event processing.
/// </summary>
public interface IEventIdempotencyService
{
    /// <summary>
    /// Checks if an event has already been processed and marks it as processing.
    /// Returns true if the event should be processed, false if already handled.
    /// </summary>
    Task<bool> TryBeginProcessingAsync(string eventId, string eventType, string handlerName, CancellationToken ct = default);

    /// <summary>
    /// Marks an event as successfully processed.
    /// </summary>
    Task MarkProcessedAsync(string eventId, CancellationToken ct = default);

    /// <summary>
    /// Removes processing lock if handler fails (allows retry).
    /// </summary>
    Task RollbackProcessingAsync(string eventId, CancellationToken ct = default);
}

public class EventIdempotencyService(
    EventsDb db,
    ILogger<EventIdempotencyService> logger) : IEventIdempotencyService
{
    public async Task<bool> TryBeginProcessingAsync(
        string eventId,
        string eventType,
        string handlerName,
        CancellationToken ct = default)
    {
        try
        {
            // Try to insert - will fail if duplicate due to unique index
            db.ProcessedEvents.Add(new ProcessedEvent
            {
                Id = Guid.NewGuid(),
                EventId = eventId,
                EventType = eventType,
                HandlerName = handlerName,
                ProcessedAt = DateTimeOffset.UtcNow
            });

            await db.SaveChangesAsync(ct);
            return true;
        }
        catch (DbUpdateException ex) when (ex.InnerException is PostgresException { SqlState: "23505" })
        {
            // Duplicate key - event already processed
            logger.LogDebug("Event {EventId} already processed by {Handler}, skipping",
                eventId, handlerName);
            return false;
        }
    }

    public async Task MarkProcessedAsync(string eventId, CancellationToken ct = default)
    {
        // Already marked in TryBeginProcessingAsync - this is a no-op
        // Could be used to update status if we had a state machine
        await Task.CompletedTask;
    }

    public async Task RollbackProcessingAsync(string eventId, CancellationToken ct = default)
    {
        var processed = await db.ProcessedEvents
            .FirstOrDefaultAsync(p => p.EventId == eventId, ct);

        if (processed != null)
        {
            db.ProcessedEvents.Remove(processed);
            await db.SaveChangesAsync(ct);

            logger.LogDebug("Rolled back processing for event {EventId}", eventId);
        }
    }
}

5. Webhook Handler for Event Grid

Receives events from Azure Event Grid subscriptions:

// In Acsis.Dynaplex.Engines.Events/ApiEngines/

/// <summary>
/// Provides the webhook endpoint for receiving Event Grid events.
/// </summary>
public static class EventGridWebhookApi
{
    public static void MapEventGridWebhookEndpoints(this IEndpointRouteBuilder routes)
    {
        var group = routes.MapGroup("/events/webhook")
            .WithTags("Events Webhook")
            .AllowAnonymous(); // Event Grid uses validation, not auth tokens

        group.MapPost("/", HandleEventGridWebhook)
            .WithName("EventGridWebhook")
            .WithSummary("Receive Event Grid events")
            .WithDescription("Webhook endpoint for Azure Event Grid event delivery.");

        group.MapOptions("/", HandleEventGridValidation)
            .WithName("EventGridValidation")
            .WithSummary("Event Grid subscription validation");
    }

    private static async Task<IResult> HandleEventGridWebhook(
        HttpContext context,
        IEventSubscriber eventSubscriber,
        IEventIdempotencyService idempotency,
        ILogger<IEventSubscriber> logger)
    {
        // Handle Event Grid subscription validation
        if (context.Request.Headers.TryGetValue("aeg-event-type", out var eventType))
        {
            if (eventType == "SubscriptionValidation")
            {
                return await HandleSubscriptionValidation(context);
            }
        }

        // Parse CloudEvents from request body
        var cloudEvents = await ParseCloudEventsAsync(context);

        foreach (var cloudEvent in cloudEvents)
        {
            using var activity = StartEventProcessingActivity(cloudEvent);

            try
            {
                // Idempotency check
                var handlerName = "EventGridWebhook";
                if (!await idempotency.TryBeginProcessingAsync(
                    cloudEvent.Id!, cloudEvent.Type!, handlerName))
                {
                    logger.LogDebug("Skipping duplicate event {EventId}", cloudEvent.Id);
                    continue;
                }

                // Dispatch to registered handlers
                await eventSubscriber.ProcessEventAsync(cloudEvent);

                logger.LogInformation("Processed webhook event {EventType} with ID {EventId}",
                    cloudEvent.Type, cloudEvent.Id);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to process webhook event {EventId}", cloudEvent.Id);
                await idempotency.RollbackProcessingAsync(cloudEvent.Id!);

                // Return 500 to trigger Event Grid retry
                return Results.StatusCode(500);
            }
        }

        return Results.Ok();
    }

    private static async Task<IResult> HandleSubscriptionValidation(HttpContext context)
    {
        using var reader = new StreamReader(context.Request.Body);
        var body = await reader.ReadToEndAsync();
        var events = JsonSerializer.Deserialize<JsonElement[]>(body);

        if (events?.Length > 0)
        {
            var validationCode = events[0]
                .GetProperty("data")
                .GetProperty("validationCode")
                .GetString();

            return Results.Ok(new { validationResponse = validationCode });
        }

        return Results.BadRequest();
    }

    private static async Task<List<CloudEvent>> ParseCloudEventsAsync(HttpContext context)
    {
        // Event Grid sends CloudEvents in the request body
        var formatter = new JsonEventFormatter();
        var events = new List<CloudEvent>();

        using var reader = new StreamReader(context.Request.Body);
        var body = await reader.ReadToEndAsync();

        // Event Grid may send single event or array
        if (body.TrimStart().StartsWith('['))
        {
            var jsonArray = JsonSerializer.Deserialize<JsonElement[]>(body);
            foreach (var element in jsonArray ?? [])
            {
                var cloudEvent = formatter.DecodeStructuredModeMessage(
                    Encoding.UTF8.GetBytes(element.GetRawText()),
                    new ContentType("application/cloudevents+json"),
                    null);
                events.Add(cloudEvent);
            }
        }
        else
        {
            var cloudEvent = formatter.DecodeStructuredModeMessage(
                Encoding.UTF8.GetBytes(body),
                new ContentType("application/cloudevents+json"),
                null);
            events.Add(cloudEvent);
        }

        return events;
    }

    private static Activity? StartEventProcessingActivity(CloudEvent cloudEvent)
    {
        var activity = new ActivitySource("Acsis.Events")
            .StartActivity("ProcessEvent", ActivityKind.Consumer);

        activity?.SetTag("cloudevents.type", cloudEvent.Type);
        activity?.SetTag("cloudevents.id", cloudEvent.Id);
        activity?.SetTag("cloudevents.source", cloudEvent.Source?.ToString());

        // Restore trace context if present
        var traceParent = cloudEvent.GetTraceParent();
        if (!string.IsNullOrEmpty(traceParent))
        {
            activity?.SetParentId(traceParent);
        }

        return activity;
    }

    private static Task<IResult> HandleEventGridValidation(HttpContext context)
    {
        // OPTIONS request for CORS/validation
        return Task.FromResult(Results.Ok());
    }
}

6. Example: Publishing Events from a Service

// In Acsis.Dynaplex.Engines.Catalog/Services/

public class ItemsService(
    CatalogDb db,
    IOutboxEventPublisher outboxPublisher,
    ICurrentTenantService tenantService,
    ILogger<ItemsService> logger)
{
    public async Task<Item> CreateItemAsync(CreateItemRequest request, CancellationToken ct)
    {
        logger.LogInformation("Creating item {ItemName} of type {ItemTypeId}",
            request.Name, request.ItemTypeId);

        var item = new Item
        {
            Id = Guid.NewGuid(),
            Name = request.Name,
            ItemTypeId = request.ItemTypeId,
            LocationId = request.LocationId,
            TenantId = tenantService.TenantId
        };

        db.Items.Add(item);

        // Queue event in same transaction - guaranteed to be published
        await outboxPublisher.QueueEventAsync(
            eventType: "com.acsis.asset.created",
            data: new AssetCreatedEventData
            {
                AssetId = item.Id,
                PassportId = item.PassportId,
                Name = item.Name,
                ItemTypeId = item.ItemTypeId,
                LocationId = item.LocationId,
                TenantId = item.TenantId
            },
            subject: item.Id.ToString(),
            ct: ct);

        // Both the item AND the outbox message are saved atomically
        await db.SaveChangesAsync(ct);

        logger.LogInformation("Created item {ItemId} with passport {PassportId}",
            item.Id, item.PassportId);

        return item;
    }
}

7. Example: Consuming Events

// In Acsis.Dynaplex.Engines.Spatial/EventHandlers/

/// <summary>
/// Handles asset-related events to update spatial data.
/// </summary>
public class AssetEventHandler(
    SpatialDb db,
    ILogger<AssetEventHandler> logger)
{
    public async Task HandleAssetCreatedAsync(CloudEvent cloudEvent, AssetCreatedEventData data)
    {
        logger.LogInformation("Handling asset created event for asset {AssetId} at location {LocationId}",
            data.AssetId, data.LocationId);

        // Update location asset count or other spatial metadata
        var location = await db.Locations.FindAsync(data.LocationId);
        if (location != null)
        {
            location.AssetCount++;
            await db.SaveChangesAsync();
        }
    }

    public async Task HandleAssetMovedAsync(CloudEvent cloudEvent, AssetMovedEventData data)
    {
        logger.LogInformation("Handling asset moved event: {AssetId} from {FromLocation} to {ToLocation}",
            data.AssetId, data.FromLocationId, data.ToLocationId);

        // Update asset counts for both locations
        var fromLocation = await db.Locations.FindAsync(data.FromLocationId);
        var toLocation = await db.Locations.FindAsync(data.ToLocationId);

        if (fromLocation != null) fromLocation.AssetCount--;
        if (toLocation != null) toLocation.AssetCount++;

        await db.SaveChangesAsync();
    }
}

// Registration in Program.cs
var eventSubscriber = app.Services.GetRequiredService<IEventSubscriber>();
var assetHandler = app.Services.GetRequiredService<AssetEventHandler>();

eventSubscriber.RegisterHandler<AssetCreatedEventData>(
    "com.acsis.asset.created",
    assetHandler.HandleAssetCreatedAsync);

eventSubscriber.RegisterHandler<AssetMovedEventData>(
    "com.acsis.asset.moved",
    assetHandler.HandleAssetMovedAsync);

8. Aspire Integration

// In AppHost/Program.cs

var eventGrid = builder.AddAzureEventGridTopic("events")
    .WithEndpoint(eventsEndpoint);

var eventsService = builder.AddComponent<EventsService>(ComponentIndex.Events.Metadata)
    .WithReference(eventGrid)
    .WithEnvironment("Events__EventGridUrl", eventGrid.GetOutput("Endpoint"));

var catalogService = builder.AddComponent<CatalogService>(ComponentIndex.Catalog.Metadata)
    .WithReference(eventsService);

var spatialService = builder.AddComponent<SpatialService>(ComponentIndex.Spatial.Metadata)
    .WithReference(eventsService)
    .WithEventGridSubscription(eventGrid, "spatial-events", "/events/webhook");

9. Observability Integration (ADR-026)

// In Acsis.Dynaplex.Engines.Events/EventsLogMessages.cs

public static partial class EventsLogMessages
{
    [LoggerMessage(
        EventId = 7000,
        Level = LogLevel.Information,
        Message = "Publishing event {EventType} with ID {EventId} to Event Grid")]
    public static partial void PublishingEvent(
        this ILogger logger,
        string eventType,
        string eventId);

    [LoggerMessage(
        EventId = 7001,
        Level = LogLevel.Information,
        Message = "Published event {EventType} with ID {EventId} successfully")]
    public static partial void PublishedEvent(
        this ILogger logger,
        string eventType,
        string eventId);

    [LoggerMessage(
        EventId = 7002,
        Level = LogLevel.Warning,
        Message = "Event {EventId} already processed, skipping duplicate")]
    public static partial void SkippingDuplicateEvent(
        this ILogger logger,
        string eventId);

    [LoggerMessage(
        EventId = 7003,
        Level = LogLevel.Information,
        Message = "Processing webhook event {EventType} with ID {EventId}")]
    public static partial void ProcessingWebhookEvent(
        this ILogger logger,
        string eventType,
        string eventId);

    [LoggerMessage(
        EventId = 7004,
        Level = LogLevel.Error,
        Message = "Failed to publish event {EventType} with ID {EventId}")]
    public static partial void FailedToPublishEvent(
        this ILogger logger,
        string eventType,
        string eventId,
        Exception exception);

    [LoggerMessage(
        EventId = 7005,
        Level = LogLevel.Information,
        Message = "Outbox processor published {Count} events")]
    public static partial void OutboxProcessorPublished(
        this ILogger logger,
        int count);
}

10. Configuration

// appsettings.json
{
  "Events": {
    "EventGridUrl": "https://your-topic.eastus-1.eventgrid.azure.net/api/events",
    "SasToken": "",
    "Outbox": {
      "PollingIntervalSeconds": 5,
      "BatchSize": 100,
      "MaxAttempts": 5
    },
    "Idempotency": {
      "RetentionDays": 7
    }
  }
}

Implementation Plan

Phase 1: Foundation (Current Sprint)

  • Add OutboxMessage and ProcessedEvent entities to Events database
  • Create database migration for new tables
  • Implement IOutboxEventPublisher service
  • Implement IEventIdempotencyService

Phase 2: Webhook Infrastructure

  • Add EventGridWebhookApi endpoint
  • Configure Event Grid subscription validation
  • Test webhook delivery end-to-end

Phase 3: Background Processing

  • Implement OutboxProcessor background service
  • Add health checks for outbox queue depth
  • Add metrics for publish latency and failure rates

Phase 4: Integration

  • Update Catalog component to use outbox pattern
  • Add event handlers to Spatial component
  • Configure Event Grid subscriptions in Aspire

Phase 5: Observability

  • Add structured logging (EventsLogMessages)
  • Create dashboard for event metrics
  • Configure alerts for failed events

Phase 6: Cleanup

  • Add background job to clean old ProcessedEvent records
  • Add background job to clean published OutboxMessage records
  • Document event catalog and contracts
  • ADR-019: OpenTelemetry Integration (event tracing)
  • ADR-026: Structured Logging (event logging patterns)
  • ADR-034: GUID Primary Keys (event data types)
  • ADR-042: MQTT Architecture (IoT events, separate from business events)

References


Date: 2025-12-XX
Author: Architecture Team
Reviewers: Development Team