adrs/042-mqtt-message-delivery-architecture.md

ADR-042: MQTT Message Delivery Architecture for IoT Devices

Status

Accepted

Context

Dynaplex systems deploy to multiple environments (local development, Azure dev, Azure staging, Azure production) that all need to receive IoT device data from shared RFID readers and other sensors. The challenge is designing an MQTT message delivery architecture that satisfies these requirements:

Requirements

  1. Cross-Environment Isolation: Each deployment environment must receive ALL messages independently. A message delivered to Azure dev must also be delivered to Azure prod and to any local developer's machine. Environments cannot "steal" messages from each other.

  2. Within-Environment Load Balancing: In scaled deployments (Kubernetes, Azure Container Apps with multiple replicas), messages should be distributed across instances. Each message should be processed exactly once per environment, not duplicated to every pod.

  3. Message Persistence During Disconnects: When a consumer disconnects (restart, crash, network issue), the broker must queue messages and deliver them when the consumer reconnects. No message loss is acceptable.

  4. Future-Ready for Scale: The architecture must support horizontal scaling within environments without code changes.

Visual Representation

                      ┌─────────────────┐
   RFID Readers ─────►│  EMQX Broker    │
   (Zebra FX9600)     │  (Cloud-hosted) │
                      └────────┬────────┘
                               │
           ┌───────────────────┼───────────────────┐
           │                   │                   │
           ▼                   ▼                   ▼
  ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
  │  Azure Prod     │ │  Azure Dev      │ │  Local Dev      │
  │  Environment    │ │  Environment    │ │  (Dave's Mac)   │
  └────────┬────────┘ └────────┬────────┘ └────────┬────────┘
           │                   │                   │
     ┌─────┴─────┐       ┌─────┴─────┐            │
     │           │       │           │            │
   Pod 1      Pod 2    Pod 1      Pod 2      Single Instance
     │           │       │           │            │
     ▼           ▼       ▼           ▼            ▼
  (50% msgs) (50% msgs) (50% msgs) (50% msgs) (100% msgs)

Each environment receives 100% of messages, but within an environment, messages are distributed across instances.
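The delivery semantics in the diagram can be modelled in a few lines. This is a toy simulation, not EMQX code: it assumes round-robin dispatch inside each group (EMQX supports several dispatch strategies), and the group and instance names are taken from the examples in this ADR.

```csharp
using System;
using System.Collections.Generic;

// Toy model of MQTT 5 shared-subscription dispatch (not EMQX code).
// Assumption: round-robin dispatch inside each group.
// Every group receives every message; within a group, exactly one member gets it.
Dictionary<string, int> Dispatch(Dictionary<string, string[]> groupMembers, int messageCount)
{
    var received = new Dictionary<string, int>();
    foreach (var (groupName, members) in groupMembers)
    {
        foreach (var member in members)
            received[$"{groupName}/{member}"] = 0;

        for (var i = 0; i < messageCount; i++)
        {
            // Round-robin: message i goes to member (i mod N) of this group.
            var target = members[i % members.Length];
            received[$"{groupName}/{target}"]++;
        }
    }
    return received;
}

var groups = new Dictionary<string, string[]>
{
    ["bbu-processors-azure-prod"] = new[] { "pod-1", "pod-2" },
    ["bbu-processors-azure-dev"] = new[] { "pod-1", "pod-2" },
    ["bbu-processors-local-daves-macbook"] = new[] { "daves-macbook" },
};

var counts = Dispatch(groups, 1000);
foreach (var (consumer, count) in counts)
    Console.WriteLine($"{consumer}: {count}");
```

The members of each group together account for all 1000 messages, while the two pods of a scaled environment split them evenly. With random dispatch the split would only be approximately even.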

Anti-Pattern: Cross-Environment Message Theft

Without proper isolation, this catastrophic scenario can occur:

# WRONG: All environments share the same subscription group
$share/bbu-processors/zebra/fx/+/reads

Azure Prod Pod 1: receives message A
Local Dev:        receives message B
Azure Prod Pod 2: receives message C
Azure Dev:        receives message D
...

# Result: with 4 consumers in one group, Azure Prod (2 pods) sees only ~50% of messages,
# and Azure Dev and Local Dev each see only ~25%!
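A toy model with one group shared by consumers from every environment shows where the percentages come from. It is not EMQX code, and round-robin dispatch is again an assumption; with random dispatch the split is only approximately equal.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of the anti-pattern: ONE shared group ($share/bbu-processors/...)
// whose members belong to different environments. Assumes round-robin dispatch.
Dictionary<string, int> MessagesPerEnvironment((string Env, string Instance)[] members, int messageCount)
{
    var result = members.Select(m => m.Env).Distinct().ToDictionary(e => e, _ => 0);
    for (var i = 0; i < messageCount; i++)
    {
        // Only one member gets message i, whatever environment it belongs to.
        var owner = members[i % members.Length];
        result[owner.Env]++;
    }
    return result;
}

var sharedGroup = new (string Env, string Instance)[]
{
    ("azure-prod", "pod-1"),
    ("local-daves-macbook", "daves-macbook"),
    ("azure-prod", "pod-2"),
    ("azure-dev", "pod-1"),
};

var perEnvironment = MessagesPerEnvironment(sharedGroup, 1000);
foreach (var (env, count) in perEnvironment)
    Console.WriteLine($"{env}: {count / 10.0}% of messages");
```

Every environment loses messages: the one with more members simply loses fewer.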

Decision

Implement environment-scoped MQTT 5 shared subscriptions with persistent sessions.

Key Design Decisions

1. Environment-Scoped Shared Subscription Groups

The SharedSubscriptionGroup configuration is automatically suffixed with EnvironmentId:

// In MqttProcessorBase.OnConnectedAsync()
var effectiveGroup = Config.SharedSubscriptionGroup;
var effectiveTopic = topic; // no group: plain subscription, every instance gets every message
if (!string.IsNullOrWhiteSpace(effectiveGroup)) {
    // Scope the group to this environment: {group}-{env}
    effectiveGroup = $"{effectiveGroup}-{_effectiveEnvironmentId}";

    // Subscription becomes: $share/{group}-{env}/{topic}
    effectiveTopic = $"$share/{effectiveGroup}/{topic}";
}

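Result: Each environment has a unique shared subscription group (the environment IDs here are illustrative; Azure deployments use the EnvironmentShortName values described below):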

  • $share/bbu-processors-azure-prod/zebra/fx/+/reads
  • $share/bbu-processors-azure-dev/zebra/fx/+/reads
  • $share/bbu-processors-local-daves-macbook/zebra/fx/+/reads

EMQX delivers ALL messages to each group, but load-balances within each group.
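Because the environment ID becomes part of the share name, it must satisfy the MQTT 5 rules for a ShareName (the segment after $share/): it must be at least one character long and must not contain '/', '+' or '#'. A check like the one below could reject a bad EnvironmentId at startup rather than at subscribe time. BuildSharedFilter is a hypothetical helper, not part of MqttProcessorBase.

```csharp
using System;

// Hypothetical helper (not the MqttProcessorBase API): builds
// $share/{group}-{env}/{topicFilter} and enforces the MQTT 5 ShareName rules
// (non-empty, no '/', '+' or '#').
string BuildSharedFilter(string group, string environmentId, string topicFilter)
{
    if (string.IsNullOrWhiteSpace(group) || string.IsNullOrWhiteSpace(environmentId))
        throw new ArgumentException("Group and environment ID are both required.");
    if (string.IsNullOrEmpty(topicFilter))
        throw new ArgumentException("Topic filter must not be empty.");

    var shareName = $"{group}-{environmentId}";
    if (shareName.IndexOfAny(new[] { '/', '+', '#' }) >= 0)
        throw new ArgumentException($"'{shareName}' is not a valid MQTT 5 ShareName.");

    return $"$share/{shareName}/{topicFilter}";
}

Console.WriteLine(BuildSharedFilter("bbu-processors", "azure-prod", "zebra/fx/+/reads"));
```

Wildcards remain legal in the topic filter itself; only the share name is restricted.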

2. Environment ID Detection

Environment ID is determined hierarchically:

private static string GetEffectiveEnvironmentId(string? configuredId) {
    // 1. Explicit configuration (highest priority)
    if (!string.IsNullOrWhiteSpace(configuredId))
        return configuredId;

    // 2. Environment variable override
    var envVar = Environment.GetEnvironmentVariable("MQTT_ENVIRONMENT_ID");
    if (!string.IsNullOrWhiteSpace(envVar))
        return envVar;

    // 3. Build-configuration-based auto-detection
    #if DEBUG
    // Local development: isolate by machine name
    return $"local-{Environment.MachineName.ToLowerInvariant()}";
    #else
    // Production builds: MUST be explicitly configured
    throw new InvalidOperationException(
        "MQTT EnvironmentId must be explicitly configured in Release builds.");
    #endif
}

For Azure deployments: Use the project's EnvironmentShortName (e.g., elbbudev, elbbuqa, elbbuprod) as the MQTT EnvironmentId.
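To unit-test this precedence without switching build configurations, the build mode and the environment-variable lookup can be passed in. The sketch below is a hypothetical refactoring (ResolveEnvironmentId is not an existing method) that keeps the same three-step order.

```csharp
using System;

// Hypothetical, testable variant of GetEffectiveEnvironmentId: the build mode
// and the environment-variable lookup are parameters instead of #if DEBUG and
// a direct Environment call, so both code paths can be exercised in one build.
string ResolveEnvironmentId(string? configuredId, Func<string, string?> getEnv,
                            bool isDebugBuild, string machineName)
{
    // 1. Explicit configuration wins.
    if (!string.IsNullOrWhiteSpace(configuredId)) return configuredId;

    // 2. MQTT_ENVIRONMENT_ID environment variable.
    var envVar = getEnv("MQTT_ENVIRONMENT_ID");
    if (!string.IsNullOrWhiteSpace(envVar)) return envVar;

    // 3. DEBUG: isolate by machine name; Release: refuse to guess.
    if (isDebugBuild) return $"local-{machineName.ToLowerInvariant()}";
    throw new InvalidOperationException(
        "MQTT EnvironmentId must be explicitly configured in Release builds.");
}

Console.WriteLine(ResolveEnvironmentId(null, Environment.GetEnvironmentVariable,
    isDebugBuild: true, machineName: Environment.MachineName));
```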

3. Stable Client IDs for Session Persistence

Client IDs must be stable across restarts for the broker to recognize the same consumer:

// Persistent session (default, recommended)
_effectiveClientId = $"{baseClientId}-{environmentId}-{instanceId}";
// Example: "bbu-processor-azure-prod-pod-1"

// Ephemeral session (for testing only)
_effectiveClientId = $"{baseClientId}-{environmentId}-{timestamp}-{uniqueId}";
// Example: "bbu-processor-azure-prod-20241205-a1b2c3"

Configuration:

{
  "Mqtt": {
    "AppendUniqueIdToClientId": false,  // false = persistent sessions
    "InstanceId": null  // Defaults to machine name; set in K8s to pod name
  }
}
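The two ID shapes can be sketched as one function. BuildClientId is hypothetical, and two details are assumptions: the yyyyMMdd timestamp (inferred from the example above) and lower-casing of the machine-name fallback (consistent with the startup log under Verification). Whatever the exact format, each instance in an environment needs its own ID: in MQTT 5, when a client connects with a client ID that is already connected, the broker disconnects the existing client.

```csharp
using System;

// Hypothetical helper mirroring the two client ID shapes described above.
// Assumptions: yyyyMMdd timestamp, 6 hex chars of uniqueness, and a lower-cased
// machine name when InstanceId is not configured.
string BuildClientId(string baseClientId, string environmentId, string? instanceId,
                     bool appendUniqueId, string machineName, DateTime nowUtc)
{
    if (appendUniqueId)
    {
        // Ephemeral: a new ID (and therefore a new session) on every start.
        var uniqueId = Guid.NewGuid().ToString("N")[..6];
        return $"{baseClientId}-{environmentId}-{nowUtc:yyyyMMdd}-{uniqueId}";
    }

    // Persistent: the same inputs always produce the same ID.
    var instance = string.IsNullOrWhiteSpace(instanceId)
        ? machineName.ToLowerInvariant()
        : instanceId;
    return $"{baseClientId}-{environmentId}-{instance}";
}

Console.WriteLine(BuildClientId("bbu-processor", "azure-prod", "pod-1",
    appendUniqueId: false, machineName: Environment.MachineName, nowUtc: DateTime.UtcNow));
```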

4. MQTT 5 Session Persistence

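The broker keeps a client's session (its subscriptions and any queued messages) across a disconnect only when Clean Start is false (the CleanSession setting) and the Session Expiry Interval is greater than zero: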

.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithCleanStart(Config.CleanSession)           // false = persistent
.WithSessionExpiryInterval(Config.SessionExpiryIntervalSeconds)  // 3600 = 1 hour

Behavior:

  • Client disconnects unexpectedly
  • Broker continues queuing messages for that client ID for up to SessionExpiryIntervalSeconds after the disconnect (1 hour here), within its per-session queue limit
  • Client reconnects with same client ID
  • Broker delivers all queued messages
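The session-lifetime rules behind this behavior can be written as a predicate. This is a simplified sketch of the MQTT 5 rules, not broker code, and it deliberately ignores broker-side queue limits.

```csharp
using System;

// Simplified model of the MQTT 5 session lifetime rules (not broker code):
// - Clean Start = true discards any existing session when the client connects.
// - Otherwise the session survives until Session Expiry Interval seconds after the
//   network connection closes: 0 ends it at disconnect, 0xFFFFFFFF never expires.
// Broker-side queue-length limits are deliberately ignored here.
bool SessionResumes(bool cleanStart, uint sessionExpiryIntervalSeconds, TimeSpan offline)
{
    if (cleanStart) return false;
    if (sessionExpiryIntervalSeconds == uint.MaxValue) return true;
    return offline.TotalSeconds < sessionExpiryIntervalSeconds;
}

// The controlled disconnect test: offline for 2 minutes with the 3600 s default.
Console.WriteLine(SessionResumes(cleanStart: false, sessionExpiryIntervalSeconds: 3600,
    offline: TimeSpan.FromMinutes(2)));
```

After reconnecting, the client can confirm the broker's view from the Session Present flag in CONNACK.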

5. QoS Level 2 (Exactly Once)

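All subscriptions request QoS 2 for guaranteed delivery. QoS applies per hop, so a message reaches the subscriber at the lower of the QoS it was published with and the QoS the broker granted for the subscription; the end-to-end guarantee therefore also depends on the QoS the RFID readers publish with: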

subscribeOptionsBuilder.WithTopicFilter(f =>
    f.WithTopic(effectiveTopic)
     .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
);
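The per-hop rule is a simple minimum. EffectiveDeliveryQos is an illustrative helper, not an MQTTnet API; the publish QoS in the usage line is an example value, not the readers' actual configuration.

```csharp
using System;

// Illustrative helper (not an MQTTnet API): QoS is negotiated per hop, so a
// subscriber receives a message at min(publish QoS, granted subscription QoS).
int EffectiveDeliveryQos(int publishQos, int grantedSubscriptionQos)
{
    if (publishQos is < 0 or > 2 || grantedSubscriptionQos is < 0 or > 2)
        throw new ArgumentException("QoS levels are 0, 1 or 2.");
    return Math.Min(publishQos, grantedSubscriptionQos);
}

// A QoS 2 subscription cannot upgrade a message that was published at QoS 0.
Console.WriteLine(EffectiveDeliveryQos(publishQos: 0, grantedSubscriptionQos: 2));
```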

Configuration Reference

Principal MqttProcessorConfiguration options (connection settings used in the example below, such as Enabled, Username, and Password, are not listed):

Property                      Default           Purpose
BrokerHost                    (required)        EMQX broker hostname
BrokerPort                    8883              TLS port
ClientId                      "mqtt-processor"  Base client ID (environment/instance appended)
SubscribeTopics               []                Array of topic filters to subscribe to
SharedSubscriptionGroup       null              Group name for load balancing (environment auto-appended)
EnvironmentId                 null              Environment identifier (auto-detected in DEBUG)
InstanceId                    null              Instance identifier (defaults to machine name)
CleanSession                  false             false = persistent sessions with message queueing (MQTT 5 Clean Start)
SessionExpiryIntervalSeconds  3600              Seconds the broker retains the session after a disconnect
AppendUniqueIdToClientId      false             false = stable IDs for persistence
AutoReconnect                 true              Automatically reconnect on disconnect
ReconnectDelaySeconds         5                 Seconds to wait between reconnection attempts
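Persistence only works when CleanSession, SessionExpiryIntervalSeconds, and AppendUniqueIdToClientId agree. A startup check along these lines could catch a configuration that silently opts out of queueing; PersistenceWarnings is a hypothetical helper, and the uint type for the expiry interval is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical startup check: message queueing across restarts needs all three
// settings below to agree; each returned string explains one way it is defeated.
List<string> PersistenceWarnings(bool cleanSession, uint sessionExpiryIntervalSeconds,
                                 bool appendUniqueIdToClientId)
{
    var warnings = new List<string>();
    if (cleanSession)
        warnings.Add("CleanSession=true: the broker discards the session on every connect.");
    if (sessionExpiryIntervalSeconds == 0)
        warnings.Add("SessionExpiryIntervalSeconds=0: the session ends as soon as the client disconnects.");
    if (appendUniqueIdToClientId)
        warnings.Add("AppendUniqueIdToClientId=true: every restart uses a new client ID, so the old session is never resumed.");
    return warnings;
}

// The defaults from the table produce no warnings.
foreach (var warning in PersistenceWarnings(cleanSession: false,
             sessionExpiryIntervalSeconds: 3600, appendUniqueIdToClientId: false))
    Console.WriteLine(warning);
```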

Example Configuration

{
  "Mqtt": {
    "ZebraRfid": {
      "Enabled": true,
      "BrokerHost": "i5122f12.ala.us-east-1.emqxsl.com",
      "BrokerPort": 8883,
      "ClientId": "bbu-processor",
      "SubscribeTopics": [
        "zebra/fx/+/reads",
        "zebra/fx/+/events"
      ],
      "SharedSubscriptionGroup": "bbu-processors",
      "Username": "dplx-bbu-rfid",
      "Password": "...",
      "AutoReconnect": true,
      "ReconnectDelaySeconds": 5,
      "CleanSession": false,
      "AppendUniqueIdToClientId": false,
      "SessionExpiryIntervalSeconds": 3600,
      "EnvironmentId": null,
      "InstanceId": null
    }
  }
}

Azure Deployment Configuration

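For Azure Container Apps or Kubernetes, configure via environment variables or appsettings overrides. Neither .NET's JSON configuration nor the container runtime expands ${...} placeholders (Kubernetes only expands $(VAR) references), so unless the processor resolves ${CONTAINER_APP_REPLICA_NAME} itself, every replica gets the same literal InstanceId and therefore the same client ID. An MQTT 5 broker disconnects an existing client when another connects with the same client ID (reason code 0x8E, Session taken over), so such replicas would keep evicting each other: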

// appsettings.Production.json
{
  "Mqtt": {
    "ZebraRfid": {
      "EnvironmentId": "elbbuprod",  // Matches project's EnvironmentShortName
      "InstanceId": "${CONTAINER_APP_REPLICA_NAME}"  // Azure Container Apps
    }
  }
}

Or via Aspire:

builder.AddComponent<BbuService>(ComponentIndex.Bbu.Metadata)
    .WithEnvironment("Mqtt__ZebraRfid__EnvironmentId", options.EnvironmentShortName)
    .WithEnvironment("Mqtt__ZebraRfid__InstanceId", "${CONTAINER_APP_REPLICA_NAME}");
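Microsoft.Extensions.Configuration reads "${CONTAINER_APP_REPLICA_NAME}" as a literal string. If the processor does not already resolve such placeholders, a small expansion step could run before InstanceId is used to build the client ID. ExpandPlaceholders is a hypothetical helper, the environment lookup is injected so the sketch runs outside Azure, and the replica name in the usage line is made up.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper: resolve ${VAR} placeholders in a configuration value from
// environment variables. Neither appsettings.json nor container environment
// variables perform this substitution on their own.
string ExpandPlaceholders(string value, Func<string, string?> getEnv) =>
    Regex.Replace(value, @"\$\{([A-Za-z_][A-Za-z0-9_]*)\}", match =>
    {
        var name = match.Groups[1].Value;
        // Fail loudly rather than silently sharing one literal InstanceId across replicas.
        return getEnv(name) ?? throw new InvalidOperationException(
            $"Environment variable '{name}' referenced in configuration is not set.");
    });

// Fake environment standing in for Azure Container Apps (made-up replica name).
var fakeEnv = new Dictionary<string, string> { ["CONTAINER_APP_REPLICA_NAME"] = "example-replica-1" };
Console.WriteLine(ExpandPlaceholders("${CONTAINER_APP_REPLICA_NAME}", n => fakeEnv.GetValueOrDefault(n)));
```

In production the lookup would be Environment.GetEnvironmentVariable. Note that replica and pod names change when a replica is recreated, so this yields unique IDs but not IDs that survive replacement.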

Consequences

Positive

  • Complete Environment Isolation: Each environment receives 100% of messages
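  • No Message Loss Within Session Limits: Session persistence queues messages during disconnects shorter than the session expiry interval, up to the broker's per-session queue limit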
  • Horizontal Scalability: Shared subscriptions distribute load within environments
  • Automatic Local Dev Isolation: DEBUG builds auto-isolate by machine name
  • Fail-Safe Production: Release builds fail if environment not explicitly configured
  • Standard MQTT 5: No proprietary broker features required

Negative

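  • Broker Dependency: Requires an MQTT 5 broker that supports shared subscriptions, an optional MQTT 5 feature (EMQX, HiveMQ, etc.)
  • Session Limits: Broker has limits on queued messages per session; messages beyond the limit can be dropped
  • Client ID Coordination: Instance IDs must be unique within an environment and stable across restarts. Kubernetes Deployment pod names and Azure Container Apps replica names change when a pod or replica is recreated, so a recreated instance starts a new session while the old one waits out its expiry interval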

Neutral

  • Broker-Side Configuration: May need to tune EMQX session limits for high-volume deployments
  • Monitoring Complexity: Need to track multiple subscription groups in broker metrics

Verification

1. Check Logs on Startup

Look for these log messages:

Connecting to MQTT broker: i5122f12.ala.us-east-1.emqxsl.com:8883
  (Environment: local-daves-macbook, ClientId: bbu-processor-local-daves-macbook-daves-macbook,
   CleanStart: False, SessionExpiry: 3600s)

Subscribing to topic: $share/bbu-processors-local-daves-macbook/zebra/fx/+/reads
  (shared group: bbu-processors-local-daves-macbook)

Verify:

  • Environment matches expected value
  • CleanStart: False for persistent sessions
  • Topic includes $share/{group}-{env}/ prefix

2. Query EMQX Broker API

# List connected clients (EMQX 5 REST API, authenticated with an API key and secret)
curl -u "<api_key>:<secret_key>" "https://broker:8443/api/v5/clients"

# Check specific client session
curl -u "<api_key>:<secret_key>" "https://broker:8443/api/v5/clients/bbu-processor-azure-prod-pod-1"

# List all subscriptions
curl -u "<api_key>:<secret_key>" "https://broker:8443/api/v5/subscriptions"

Look for:

  • clean_start: false
  • session_expiry_interval: 3600
  • Subscription topics with environment-specific shared groups

3. Controlled Disconnect Test

  1. Start processor, observe successful connection and subscription
  2. Stop processor (Ctrl+C or stop in Aspire)
  3. Generate RFID reads (wave tags in front of reader)
  4. Wait 1-2 minutes
  5. Restart processor
  6. Verify all reads appear in database with similar created_at_utc timestamps (batch arrival)

4. Database Batch Analysis

-- Look for evidence of queued message delivery (batch arrivals)
SELECT
    DATE_TRUNC('second', created_at_utc) as batch_time,
    COUNT(*) as batch_size,
    MIN(event_timestamp_utc) as earliest_event,
    MAX(event_timestamp_utc) as latest_event,
    EXTRACT(EPOCH FROM (MAX(event_timestamp_utc) - MIN(event_timestamp_utc))) as event_span_seconds
FROM bbu.tag_read_processings
GROUP BY DATE_TRUNC('second', created_at_utc)
HAVING COUNT(*) > 10
ORDER BY batch_time DESC
LIMIT 20;

If you see batches where event_span_seconds is large (events spread over minutes) but batch_time is the same second, that's evidence of queued messages being delivered after reconnect.

Implementation Files

  • engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/MqttProcessorConfiguration.cs - Configuration class
  • engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs - Base processor with environment isolation logic
  • engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/ZebraRfidProcessor.cs - Example implementation
  • projects/bbu-rfid/src/Acsis.Dynaplex.Projects.BbuRfid/BbuRfidOptions.cs - Project options with EnvironmentShortName

Date: 2025-12-05
Author: Architecture Team
Reviewers: Development Team