
fsds/iot-mqtt-message-persistence-and-env-isolation.md

MQTT Message Persistence and Environment Isolation Plan

Summary

Configure the MQTT client so that the broker retains each message until it has been written to the IoT raw tables (iot.zb_tag_reads), with proper environment isolation between local development and Azure deployments.

Requirements (From User)

  1. Persistence: Store messages in the broker until they are confirmed written to iot.zb_tag_reads
  2. Shared Consumption: One consumer processes each message (load balancing within environment)
  3. Environment Isolation:
    • Debug builds = local dev = isolated by machine name
    • Release builds = Azure environment = shared within that environment
    • Each developer's testing is isolated from other devs and from Azure

Implementation Stages

Stage 1: Configuration Model Changes

File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/MqttProcessorConfiguration.cs

Add new properties:

/// <summary>
/// Environment identifier for session and subscription isolation.
/// When null/empty, falls back to the MQTT_ENVIRONMENT_ID environment variable,
/// then to auto-detection:
/// - Debug builds: Uses machine name (e.g., "local-daniels-macbook")
/// - Release builds: Must be explicitly configured (e.g., "azure-prod")
/// </summary>
public string? EnvironmentId { get; set; }

/// <summary>
/// MQTT 5 Session Expiry Interval in seconds.
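/// How long the broker keeps the session (subscriptions and queued or unacknowledged
/// QoS 1/2 messages) after a disconnect. Default: 3600 (1 hour).
/// Set to 0 to end the session as soon as the connection closes (comparable to MQTT 3.1.1
/// CleanSession=true); uint.MaxValue (0xFFFFFFFF) means the session never expires.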
/// </summary>
public uint SessionExpiryIntervalSeconds { get; set; } = 3600;

/// <summary>
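/// Instance identifier used in the client ID of persistent sessions within an environment.
/// Must be unique per replica and stable across restarts so the broker can resume the session.
/// If not set, defaults to the lower-cased machine name; set it explicitly when running
/// multiple replicas whose machine names are not stable across restarts.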
/// </summary>
public string? InstanceId { get; set; }

Stage 2: Environment Auto-Detection

File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs

Add helper method and update constructor:

private static string GetEffectiveEnvironmentId(string? configuredId) {
    // If explicitly configured, use it
    if (!string.IsNullOrWhiteSpace(configuredId)) {
        return configuredId;
    }

    // Check for explicit env var override
    var envVar = Environment.GetEnvironmentVariable("MQTT_ENVIRONMENT_ID");
    if (!string.IsNullOrWhiteSpace(envVar)) {
        return envVar;
    }

    // Auto-detect based on build configuration
    #if DEBUG
    // Debug builds = local dev, isolate by machine name
    return $"local-{Environment.MachineName.ToLowerInvariant()}";
    #else
    // Release builds = production, require explicit config
    throw new InvalidOperationException(
        "MQTT EnvironmentId must be explicitly configured in Release builds. " +
        "Set Mqtt:EnvironmentId in appsettings or MQTT_ENVIRONMENT_ID environment variable.");
    #endif
}
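
The precedence order above (explicit config, then MQTT_ENVIRONMENT_ID, then the Debug default, otherwise fail) is easiest to verify when the build flavor and the environment lookup are parameters instead of `#if DEBUG` and a static call. The sketch below assumes a hypothetical pure function `ResolveEnvironmentId` that could back unit tests; it is not one of the plan's listed changes.

```csharp
using System;

#nullable enable

// Hypothetical testable variant of GetEffectiveEnvironmentId.
// getEnv and isDebug stand in for Environment.GetEnvironmentVariable and #if DEBUG.
string ResolveEnvironmentId(string? configuredId, Func<string, string?> getEnv,
                            bool isDebug, string machineName)
{
    // 1. Explicit configuration (Mqtt:EnvironmentId) wins.
    if (!string.IsNullOrWhiteSpace(configuredId)) return configuredId;

    // 2. Next, the MQTT_ENVIRONMENT_ID environment variable.
    var envVar = getEnv("MQTT_ENVIRONMENT_ID");
    if (!string.IsNullOrWhiteSpace(envVar)) return envVar;

    // 3. Debug builds isolate by machine name; Release builds refuse to guess.
    if (isDebug) return $"local-{machineName.ToLowerInvariant()}";
    throw new InvalidOperationException(
        "MQTT EnvironmentId must be explicitly configured in Release builds.");
}

// Simulates a process with no environment variables set.
string? NoEnvVars(string name) => null;

Console.WriteLine(ResolveEnvironmentId(null, NoEnvVars, true, "Daniels-MacBook"));
// prints: local-daniels-macbook
```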

Update the constructor to include the environment in the client ID (the shared subscription group is handled in Stage 4):

protected MqttProcessorBase(ILogger logger, IOptions<TConfig> config) {
    Logger = logger;
    Config = config.Value;

    _effectiveEnvironmentId = GetEffectiveEnvironmentId(Config.EnvironmentId);

    // Build client ID: {base}-{environment}-{instance}
    var baseClientId = Config.ClientId;
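    // Treat empty/whitespace like null, consistent with EnvironmentId handling
    var instanceId = string.IsNullOrWhiteSpace(Config.InstanceId)
        ? Environment.MachineName.ToLowerInvariant()
        : Config.InstanceId;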

    if (Config.AppendUniqueIdToClientId) {
        // Ephemeral session - add timestamp for uniqueness
        var timestamp = DateTime.UtcNow.ToString("yyyyMMdd-HHmmss");
        var uniqueId = Guid.NewGuid().ToString("N")[..6];
        _effectiveClientId = $"{baseClientId}-{_effectiveEnvironmentId}-{timestamp}-{uniqueId}";
    } else {
        // Persistent session - stable ID per instance
        _effectiveClientId = $"{baseClientId}-{_effectiveEnvironmentId}-{instanceId}";
    }
}
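
To check the naming rules in isolation, the client-ID logic can be pulled out into a pure function. This is a sketch under assumptions: `BuildClientId` is a hypothetical helper, and the timestamp and random suffix are passed in (rather than read from `DateTime.UtcNow` and `Guid.NewGuid()`) so the ephemeral branch is deterministic.

```csharp
using System;
using System.Globalization;

#nullable enable

// Hypothetical pure extraction of the constructor's client-ID rules.
string BuildClientId(string baseClientId, string environmentId, string? instanceId,
                     string machineName, bool appendUniqueId,
                     DateTime utcNow, string uniqueSuffix)
{
    if (appendUniqueId)
    {
        // Ephemeral: a new ID on every start, so no broker session is ever resumed.
        var timestamp = utcNow.ToString("yyyyMMdd-HHmmss", CultureInfo.InvariantCulture);
        return $"{baseClientId}-{environmentId}-{timestamp}-{uniqueSuffix}";
    }

    // Persistent: the same ID on every restart of this instance, so the broker
    // can match the reconnect to the stored session and its queued messages.
    var instance = string.IsNullOrWhiteSpace(instanceId)
        ? machineName.ToLowerInvariant()
        : instanceId;
    return $"{baseClientId}-{environmentId}-{instance}";
}

Console.WriteLine(BuildClientId("bbu-processor", "azure-prod", "replica-1",
    "ignored-host", false, DateTime.UtcNow, "a1b2c3"));
// prints: bbu-processor-azure-prod-replica-1
```

In local Debug runs the machine name appears twice (for example `bbu-processor-local-daniels-macbook-daniels-macbook`), which is redundant but harmless because the ID stays stable across restarts.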

Stage 3: MQTT 5 Protocol with Session Expiry

File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs

Update StartAsync to use MQTT 5:

var clientOptionsBuilder = new MqttClientOptionsBuilder()
    .WithTcpServer(Config.BrokerHost, Config.BrokerPort)
    .WithClientId(_effectiveClientId)
    .WithCredentials(Config.Username, Config.Password)
    // MQTT 5 with session expiry
    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
    .WithCleanStart(Config.CleanSession)
    .WithSessionExpiryInterval(Config.SessionExpiryIntervalSeconds)
    .WithTlsOptions(o => { /* existing TLS config */ });

Logger.LogInformation(
    "Connecting with MQTT 5. ClientId={ClientId}, Environment={Env}, CleanStart={Clean}, SessionExpiry={Expiry}s",
    _effectiveClientId, _effectiveEnvironmentId, Config.CleanSession, Config.SessionExpiryIntervalSeconds);
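
Whether a restart actually picks up messages queued while the service was down depends on two MQTT 5 CONNECT settings: Clean Start and the Session Expiry Interval. The sketch below models those rules as a hypothetical helper `SessionResumed`. It is a simplification of broker behavior (it ignores, for example, a client changing the interval in its DISCONNECT packet) and is not MQTTnet API.

```csharp
using System;

// Hypothetical model of the MQTT 5 session-resumption rules.
// cleanStart corresponds to Config.CleanSession; expirySeconds to SessionExpiryIntervalSeconds.
bool SessionResumed(bool cleanStart, uint expirySeconds, TimeSpan offlineFor)
{
    // Clean Start = true: the broker discards any existing session on connect.
    if (cleanStart) return false;

    // 0xFFFFFFFF: the session never expires.
    if (expirySeconds == uint.MaxValue) return true;

    // 0: the session ended when the connection closed.
    // Otherwise it survives only while the client is offline for less than the interval.
    return offlineFor < TimeSpan.FromSeconds(expirySeconds);
}

// With the plan's defaults (CleanSession=false, 3600 s), a 5-minute restart resumes the session.
Console.WriteLine(SessionResumed(false, 3600, TimeSpan.FromMinutes(5))); // prints: True
```

With the default of 3600, an outage longer than about an hour discards the session together with any messages queued in it.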

Stage 4: Environment-Isolated Shared Subscriptions

File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs

Update OnConnectedAsync to apply environment to shared subscription group:

protected virtual async Task OnConnectedAsync(MqttClientConnectedEventArgs e) {
    var subscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder();

    // Append environment suffix to shared subscription group (e.g. bbu-processors-local-{machine})
    var effectiveGroup = Config.SharedSubscriptionGroup;
    if (!string.IsNullOrWhiteSpace(effectiveGroup)) {
        effectiveGroup = $"{effectiveGroup}-{_effectiveEnvironmentId}";
    }

    foreach (var topic in Config.SubscribeTopics) {
        var effectiveTopic = string.IsNullOrWhiteSpace(effectiveGroup)
            ? topic
            : $"$share/{effectiveGroup}/{topic}";

        subscribeOptionsBuilder.WithTopicFilter(f =>
            f.WithTopic(effectiveTopic)
             .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce));

        Logger.LogInformation("Subscribing to: {Topic}", effectiveTopic);
    }
    // ... rest of method
}
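
Because `_effectiveEnvironmentId` can come from configuration or a machine name, it may be worth guarding the resulting share name: MQTT 5 requires a ShareName of at least one character that contains no `/`, `+` or `#`. The sketch below combines the Stage 4 topic composition with that check. `BuildTopicFilter` is a hypothetical helper, and the validation is an addition the plan does not currently specify.

```csharp
using System;

#nullable enable

// Hypothetical helper mirroring Stage 4: builds "$share/{group}-{env}/{topic}",
// or the plain topic when no shared subscription group is configured.
string BuildTopicFilter(string? sharedGroup, string environmentId, string topic)
{
    if (string.IsNullOrWhiteSpace(sharedGroup)) return topic;

    var shareName = $"{sharedGroup}-{environmentId}";

    // MQTT 5 forbids these characters in a ShareName; fail fast with a clear message
    // instead of relying on the broker to reject the SUBSCRIBE.
    if (shareName.IndexOfAny(new[] { '/', '+', '#' }) >= 0)
        throw new ArgumentException($"Invalid MQTT share name '{shareName}'.");

    return $"$share/{shareName}/{topic}";
}

Console.WriteLine(BuildTopicFilter("bbu-processors", "local-daniels-macbook", "zebra/fx/+/reads"));
// prints: $share/bbu-processors-local-daniels-macbook/zebra/fx/+/reads
```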

Stage 5: Manual ACK After Raw Capture

File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/ZebraRfidProcessor.cs

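Modify message handling to ACK only after raw capture succeeds. Withholding the ACK does not trigger an immediate resend: under MQTT 5 the broker resends unacknowledged QoS 1/2 messages only when the client reconnects and resumes its session (CleanSession=false and the session has not expired):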

protected override async Task OnMessageReceivedAsync(
    string topic,
    string payload,
    MqttApplicationMessageReceivedEventArgs eventArgs)
{
    // Disable auto-ACK - we'll ACK manually after raw capture
    eventArgs.AutoAcknowledge = false;

    var deviceId = ExtractDeviceId(topic);
    bool shouldAck = true;

    try {
        if (topic.EndsWith("/reads")) {
            shouldAck = await HandleTagReadsWithManualAckAsync(deviceId, payload);
        }
        // ... other topic handlers
    } catch (Exception ex) {
        Logger.LogError(ex, "Error processing message from {Topic}", topic);
        shouldAck = false; // Don't ACK on exception - allow redelivery
    }

    if (shouldAck) {
        await eventArgs.AcknowledgeAsync(CancellationToken.None);
        Logger.LogDebug("ACKed message from {Topic}", topic);
    } else {
        Logger.LogWarning("NOT ACKing message from {Topic} - will be redelivered", topic);
    }
}

private async Task<bool> HandleTagReadsWithManualAckAsync(string deviceId, string payload) {
    var reads = JsonSerializer.Deserialize<List<BasicReadEvent>>(payload, _jsonOptions);
    if (reads == null || reads.Count == 0) return true;

    // Step 1: Raw capture - this MUST succeed for ACK
    bool captureSucceeded = true;
    if (Config.RawDataCapture.Enabled && Config.RawDataCapture.CaptureTagReads) {
        try {
            await CaptureTagReadsAsync(deviceId, reads, payload);
        } catch (Exception ex) {
            Logger.LogError(ex, "Raw capture failed for {DeviceId} - NOT ACKing", deviceId);
            captureSucceeded = false;
        }
    }

    // Step 2: Business logic - failures don't affect ACK
    // (TagReadReplayService will catch unprocessed records)
    if (captureSucceeded) {
        try {
            await OnTagReadsAsync(deviceId, reads);
        } catch (Exception ex) {
            Logger.LogError(ex, "Business logic failed for {DeviceId} - replay will handle", deviceId);
            // Don't fail ACK - raw capture succeeded, replay will catch it
        }
    }

    return captureSucceeded;
}
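
The ACK decision only helps if the broker still holds the un-ACKed message. The toy model below (hypothetical, not MQTTnet or broker code, QoS 1 flow for brevity) shows the broker-side view of one persistent session: a message that never receives a PUBACK stays in flight and is resent only on reconnect with the session resumed. Un-ACKed messages also count against the client's Receive Maximum, so once enough of them accumulate to reach that limit, the broker stops sending new messages on that connection.

```csharp
using System;
using System.Collections.Generic;

// Toy broker-side view of one persistent session.
var inflight = new SortedDictionary<int, string>(); // packet id -> payload awaiting PUBACK
var received = new List<string>();                  // everything the client was sent

void Publish(int packetId, string payload)
{
    inflight[packetId] = payload; // kept until the client acknowledges it
    received.Add(payload);
}

void PubAck(int packetId) => inflight.Remove(packetId);

void ReconnectAndResumeSession()
{
    // The only point at which MQTT 5 lets the broker resend un-ACKed PUBLISH packets.
    foreach (var payload in inflight.Values) received.Add(payload);
}

Publish(1, "reads-A");
PubAck(1);                     // raw capture succeeded, so the handler ACKed
Publish(2, "reads-B");         // raw capture failed, so no ACK was sent
ReconnectAndResumeSession();

Console.WriteLine(string.Join(",", received)); // prints: reads-A,reads-B,reads-B
```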

Stage 6: Update Configuration Files

File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/appsettings.json

"Mqtt": {
    "Enabled": true,
    "BrokerHost": "i5122f12.ala.us-east-1.emqxsl.com",
    "BrokerPort": 8883,
    "ClientId": "bbu-processor",
    "SubscribeTopics": [
        "zebra/fx/+/reads",
        "zebra/fx/+/events",
        "zebra/fx/+/management/responses",
        "zebra/fx/+/control/responses"
    ],
    "SharedSubscriptionGroup": "bbu-processors",
    "Username": "dplx-bbu-rfid",
    "Password": "<supplied via user secrets or environment variable; do not commit>",
    "AutoReconnect": true,
    "ReconnectDelaySeconds": 5,
    "CleanSession": false,
    "AppendUniqueIdToClientId": false,
    "SessionExpiryIntervalSeconds": 3600,
    "EnvironmentId": null,
    "InstanceId": null,
    "RawDataCapture": {
        "Enabled": true,
        "RetentionDays": 30,
        "CaptureTagReads": true,
        "CaptureManagementEvents": true,
        "CaptureControlResponses": true,
        "CaptureManagementResponses": true
    }
}

Files to Modify

  1. engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/MqttProcessorConfiguration.cs

    • Add: EnvironmentId, SessionExpiryIntervalSeconds, InstanceId properties
  2. engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs

    • Add: GetEffectiveEnvironmentId() helper
    • Modify: Constructor for environment-aware ClientId
    • Modify: StartAsync() for MQTT 5 protocol
    • Modify: OnConnectedAsync() for environment-isolated shared subscriptions
  3. engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/ZebraRfidProcessor.cs

    • Modify: OnMessageReceivedAsync() for manual ACK
    • Add: HandleTagReadsWithManualAckAsync() method
  4. engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/appsettings.json

    • Update: MQTT config with new defaults
  5. engines/bbu/resources/docs/guides/configuration-reference.md

    • Document: New configuration options

Behavior Summary

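| Build | EnvironmentId | SharedSubscriptionGroup | Effect |
|---|---|---|---|
| Debug (local) | local-{machine} (default) | bbu-processors-local-{machine} | Each developer isolated |
| Release (Azure) | Must be set (Mqtt:EnvironmentId or MQTT_ENVIRONMENT_ID) | bbu-processors-{envId} | Shared within the environment |

| Scenario | ACK sent? | Redelivery? |
|---|---|---|
| Raw capture succeeds, business logic succeeds | Yes | No |
| Raw capture succeeds, business logic fails | Yes | No (TagReadReplayService handles it) |
| Raw capture fails | No | Yes, broker resends after reconnect with session resumed |
| Exception during processing | No | Yes, broker resends after reconnect with session resumed |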
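
The ACK table can be read as a small decision function, which may be handy as a reference when writing the Stage 5 tests. The sketch below is a hypothetical encoding; the name `ShouldAck` and its parameters are illustrative, not part of the plan.

```csharp
using System;

// Hypothetical encoding of the ACK table.
bool ShouldAck(bool exceptionDuringProcessing, bool rawCaptureSucceeded)
{
    // An exception escaping the handler withholds the ACK so the broker keeps the message.
    if (exceptionDuringProcessing) return false;

    // Raw capture is the durability boundary: until iot.zb_tag_reads has the data,
    // the broker's copy is the only copy. Business-logic outcome is deliberately not
    // an input, because TagReadReplayService recovers those failures from the raw table.
    return rawCaptureSucceeded;
}

var scenarios = new (string Name, bool Threw, bool Captured)[]
{
    ("Raw capture succeeds, business logic succeeds", false, true),
    ("Raw capture succeeds, business logic fails", false, true),
    ("Raw capture fails", false, false),
    ("Exception during processing", true, false),
};

foreach (var s in scenarios)
    Console.WriteLine($"{s.Name}: ACK={(ShouldAck(s.Threw, s.Captured) ? "Yes" : "No")}");
```

When raw capture is disabled, HandleTagReadsWithManualAckAsync leaves captureSucceeded at true, so the message is ACKed. A payload that fails JSON deserialization falls into the exception row: it is never ACKed and comes back on every reconnect.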

Testing Checklist

  • Local dev instance gets unique subscription group based on machine name
  • Two local devs don't share messages (different machine names)
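  • Release build fails startup if neither Mqtt:EnvironmentId nor MQTT_ENVIRONMENT_ID is set
  • Messages published while the service restarts are delivered after it reconnects (CleanSession=false, session not expired)
  • Raw capture failure prevents ACK, and the broker resends the message after reconnect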
  • Business logic failure doesn't prevent ACK
  • TagReadReplayService catches business logic failures