fsds/iot-mqtt-message-persistence-and-env-isolation.md
MQTT Message Persistence and Environment Isolation Plan
Summary
Configure the MQTT processors so that the broker retains each message until it has been written to the IoT raw tables, and isolate local development environments from each other and from Azure deployments.
Requirements (From User)
- Persistence: Store messages in the broker until they are confirmed written to iot.zb_tag_reads.
- Shared Consumption: Exactly one consumer processes each message (load balancing within an environment).
- Environment Isolation:
  - Debug builds = local dev, isolated by machine name
  - Release builds = Azure environment, shared within that environment
  - Each developer's testing is isolated from other developers and from Azure
Implementation Stages
Stage 1: Configuration Model Changes
File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/MqttProcessorConfiguration.cs
Add new properties:
```csharp
/// <summary>
/// Environment identifier for session and subscription isolation.
/// When null/empty, the MQTT_ENVIRONMENT_ID environment variable is used; if that is also unset:
/// - Debug builds: Uses machine name (e.g., "local-daniels-macbook")
/// - Release builds: Must be explicitly configured (e.g., "azure-prod")
/// </summary>
public string? EnvironmentId { get; set; }

/// <summary>
/// MQTT 5 Session Expiry Interval in seconds.
/// How long the broker keeps the session (and any queued messages) after disconnect.
/// Default: 3600 (1 hour).
/// Set to 0 to end the session when the connection closes (the MQTT 3.1.1 clean-session behavior).
/// </summary>
public uint SessionExpiryIntervalSeconds { get; set; } = 3600;

/// <summary>
/// Instance identifier for persistent sessions within an environment.
/// When running multiple replicas, it must be unique per replica and stable across
/// restarts so the broker can resume the session. If not set, defaults to the machine name.
/// </summary>
public string? InstanceId { get; set; }
```
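These properties interact with the existing CleanSession flag. The sketch below uses a hypothetical stand-in class (MqttSessionSettingsSketch is not part of the codebase) to make the rule explicit, assuming standard MQTT 5 semantics: messages queued while the processor is offline are only delivered after it reconnects when the broker keeps the session after disconnect (expiry > 0) and the client does not discard it on connect (CleanSession/CleanStart = false).

```csharp
using System;

// Hypothetical stand-in for the session-related MqttProcessorConfiguration properties.
public sealed class MqttSessionSettingsSketch
{
    public bool CleanSession { get; set; }
    public uint SessionExpiryIntervalSeconds { get; set; } = 3600;

    // True when messages queued while the client is offline are delivered once it
    // reconnects (within the expiry interval): the broker must keep the session
    // after disconnect (expiry > 0) and the client must not discard it on connect.
    public bool DeliversQueuedMessagesAfterReconnect =>
        !CleanSession && SessionExpiryIntervalSeconds > 0;
}
```

With the Stage 6 values (CleanSession=false, SessionExpiryIntervalSeconds=3600) the property is true; flipping either setting disables persistence.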
Stage 2: Environment Auto-Detection
File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs
Add a helper method that resolves the effective environment ID:
```csharp
private static string GetEffectiveEnvironmentId(string? configuredId) {
    // If explicitly configured, use it
    if (!string.IsNullOrWhiteSpace(configuredId)) {
        return configuredId;
    }
    // Otherwise fall back to the MQTT_ENVIRONMENT_ID environment variable
    var envVar = Environment.GetEnvironmentVariable("MQTT_ENVIRONMENT_ID");
    if (!string.IsNullOrWhiteSpace(envVar)) {
        return envVar;
    }
    // Auto-detect based on build configuration
#if DEBUG
    // Debug builds = local dev, isolate by machine name
    return $"local-{Environment.MachineName.ToLowerInvariant()}";
#else
    // Release builds = Azure, require explicit config
    throw new InvalidOperationException(
        "MQTT EnvironmentId must be explicitly configured in Release builds. " +
        "Set Mqtt:EnvironmentId in appsettings or the MQTT_ENVIRONMENT_ID environment variable.");
#endif
}
```
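To make the resolution order unit-testable, the helper could be split into a pure function. The sketch below is a hypothetical refactoring (EnvironmentIdResolver and its parameters are illustrative names, not existing code): the build flavour, the MQTT_ENVIRONMENT_ID value and the machine name are passed in rather than read from #if DEBUG and Environment, so every branch can be exercised in a single test run.

```csharp
#nullable enable
using System;

// Hypothetical pure version of GetEffectiveEnvironmentId.
public static class EnvironmentIdResolver
{
    public static string Resolve(string? configuredId, string? envVarValue, bool isDebugBuild, string machineName)
    {
        // 1. Explicit configuration (Mqtt:EnvironmentId) wins.
        if (!string.IsNullOrWhiteSpace(configuredId))
            return configuredId;

        // 2. Fall back to the MQTT_ENVIRONMENT_ID environment variable.
        if (!string.IsNullOrWhiteSpace(envVarValue))
            return envVarValue;

        // 3. Debug builds isolate by machine name; Release builds refuse to guess.
        if (isDebugBuild)
            return $"local-{machineName.ToLowerInvariant()}";

        throw new InvalidOperationException(
            "MQTT EnvironmentId must be explicitly configured in Release builds.");
    }
}
```

GetEffectiveEnvironmentId could then delegate to it, passing Environment.GetEnvironmentVariable("MQTT_ENVIRONMENT_ID"), a build-dependent constant, and Environment.MachineName.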
Update the constructor to apply the environment to the ClientId (the shared subscription group is handled in Stage 4):
```csharp
private readonly string _effectiveEnvironmentId;
private readonly string _effectiveClientId;

protected MqttProcessorBase(ILogger logger, IOptions<TConfig> config) {
    Logger = logger;
    Config = config.Value;
    _effectiveEnvironmentId = GetEffectiveEnvironmentId(Config.EnvironmentId);

    // Build client ID: {base}-{environment}-{instance}
    var baseClientId = Config.ClientId;
    var instanceId = string.IsNullOrWhiteSpace(Config.InstanceId)
        ? Environment.MachineName.ToLowerInvariant()
        : Config.InstanceId;

    if (Config.AppendUniqueIdToClientId) {
        // Ephemeral session - timestamp plus random suffix gives a new ID on every start
        var timestamp = DateTime.UtcNow.ToString("yyyyMMdd-HHmmss", System.Globalization.CultureInfo.InvariantCulture);
        var uniqueId = Guid.NewGuid().ToString("N")[..6];
        _effectiveClientId = $"{baseClientId}-{_effectiveEnvironmentId}-{timestamp}-{uniqueId}";
    } else {
        // Persistent session - stable ID per instance so the broker can resume the session
        _effectiveClientId = $"{baseClientId}-{_effectiveEnvironmentId}-{instanceId}";
    }
}
```
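The client ID rules can likewise be expressed as pure functions. This is a sketch with hypothetical names (ClientIdBuilder is not in the codebase); the timestamp is passed in so the output format can be checked deterministically, and the invariant culture keeps the digits stable regardless of the host's locale.

```csharp
using System;
using System.Globalization;

// Hypothetical pure-function version of the constructor's client ID logic.
public static class ClientIdBuilder
{
    // Persistent session: the broker finds the stored session by client ID,
    // so this must produce the same value on every restart of the instance.
    public static string Persistent(string baseClientId, string environmentId, string instanceId) =>
        $"{baseClientId}-{environmentId}-{instanceId}";

    // Ephemeral session: timestamp plus 6 random hex chars, so two processes
    // started in the same second almost certainly get distinct IDs.
    public static string Ephemeral(string baseClientId, string environmentId, DateTime utcNow)
    {
        var timestamp = utcNow.ToString("yyyyMMdd-HHmmss", CultureInfo.InvariantCulture);
        var uniqueId = Guid.NewGuid().ToString("N")[..6];
        return $"{baseClientId}-{environmentId}-{timestamp}-{uniqueId}";
    }
}
```

The design point is that only the persistent form is deterministic; an ephemeral ID can never resume a previous session, which is the intended trade-off when AppendUniqueIdToClientId is true.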
Stage 3: MQTT 5 Protocol with Session Expiry
File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs
Update StartAsync to use MQTT 5:
```csharp
var clientOptionsBuilder = new MqttClientOptionsBuilder()
    .WithTcpServer(Config.BrokerHost, Config.BrokerPort)
    .WithClientId(_effectiveClientId)
    .WithCredentials(Config.Username, Config.Password)
    // MQTT 5 with session expiry:
    // CleanStart=false + SessionExpiryInterval > 0 => broker keeps the session across restarts
    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
    .WithCleanStart(Config.CleanSession)
    .WithSessionExpiryInterval(Config.SessionExpiryIntervalSeconds)
    .WithTlsOptions(o => { /* existing TLS config */ });

Logger.LogInformation(
    "Connecting with MQTT 5. ClientId={ClientId}, Environment={Env}, CleanStart={Clean}, SessionExpiry={Expiry}s",
    _effectiveClientId, _effectiveEnvironmentId, Config.CleanSession, Config.SessionExpiryIntervalSeconds);
```
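Under MQTT 5, the Session Expiry Interval decides whether a persistent session (and the QoS 1/2 messages queued in it) still exists when the processor comes back. A minimal sketch of that rule, assuming the spec semantics (0 ends the session with the network connection, 0xFFFFFFFF never expires); Mqtt5SessionRules is a hypothetical helper. The broker may also return a different value in CONNACK, in which case the broker's value is the one that applies.

```csharp
using System;

// Hypothetical helper encoding the MQTT 5 session expiry rule.
public static class Mqtt5SessionRules
{
    // MQTT 5 reserves 0xFFFFFFFF to mean "the session never expires".
    public const uint NeverExpires = uint.MaxValue;

    // Whether the broker still holds the session (and its queued QoS 1/2 messages)
    // after the client has been disconnected for offlineFor.
    public static bool SessionStillExists(uint expiryIntervalSeconds, TimeSpan offlineFor)
    {
        if (expiryIntervalSeconds == NeverExpires) return true;
        if (expiryIntervalSeconds == 0) return false; // session ended with the network connection
        return offlineFor.TotalSeconds < expiryIntervalSeconds;
    }
}
```

With the default of 3600, a deployment or restart that completes within an hour resumes the session; a longer outage loses whatever the broker had queued.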
Stage 4: Environment-Isolated Shared Subscriptions
File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs
Update OnConnectedAsync to append the environment to the shared subscription group:
```csharp
protected virtual async Task OnConnectedAsync(MqttClientConnectedEventArgs e) {
    var subscribeOptionsBuilder = new MqttClientSubscribeOptionsBuilder();

    // Append environment suffix to shared subscription group
    var effectiveGroup = Config.SharedSubscriptionGroup;
    if (!string.IsNullOrWhiteSpace(effectiveGroup)) {
        effectiveGroup = $"{effectiveGroup}-{_effectiveEnvironmentId}";
    }

    foreach (var topic in Config.SubscribeTopics) {
        var effectiveTopic = string.IsNullOrWhiteSpace(effectiveGroup)
            ? topic
            : $"$share/{effectiveGroup}/{topic}";
        subscribeOptionsBuilder.WithTopicFilter(f =>
            f.WithTopic(effectiveTopic)
             .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce));
        Logger.LogInformation("Subscribing to: {Topic}", effectiveTopic);
    }
    // ... rest of method
}
```
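The topic-filter construction can be isolated and checked against the MQTT 5 rule that a ShareName must not contain '/', '+' or '#'. That rule matters here because the group now embeds a machine name or a configured EnvironmentId. A sketch with hypothetical names (SharedSubscriptionTopics is not existing code):

```csharp
#nullable enable
using System;

// Hypothetical helper mirroring the group/topic logic in OnConnectedAsync.
public static class SharedSubscriptionTopics
{
    // Appends the environment to the configured group; null means "no shared subscription".
    public static string? EffectiveGroup(string? configuredGroup, string environmentId) =>
        string.IsNullOrWhiteSpace(configuredGroup) ? null : $"{configuredGroup}-{environmentId}";

    // Builds "$share/{group}/{topic}", or returns the plain topic when no group is set.
    public static string TopicFilter(string? group, string topic)
    {
        if (string.IsNullOrWhiteSpace(group)) return topic;

        // MQTT 5 forbids '/', '+' and '#' inside a ShareName.
        if (group.IndexOfAny(new[] { '/', '+', '#' }) >= 0)
            throw new ArgumentException($"Invalid shared subscription group '{group}'.", nameof(group));

        return $"$share/{group}/{topic}";
    }
}
```

Because two developers' machines produce different group names, each group receives its own copy of every message, which is the per-developer isolation described in the requirements; replicas sharing one group split the stream between them.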
Stage 5: Manual ACK After Raw Capture
File: engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/ZebraRfidProcessor.cs
Modify message handling to ACK only after raw capture succeeds. Withholding an ACK does not trigger immediate redelivery: an MQTT 5 broker resends an unacknowledged QoS 1/2 message only when the client reconnects with CleanStart=false while its session still exists, and until then the message keeps occupying a slot in the broker's in-flight window for this client.
```csharp
protected override async Task OnMessageReceivedAsync(
    string topic,
    string payload,
    MqttApplicationMessageReceivedEventArgs eventArgs)
{
    // Disable auto-ACK - we'll ACK manually after raw capture
    eventArgs.AutoAcknowledge = false;
    var deviceId = ExtractDeviceId(topic);
    bool shouldAck = true;
    try {
        if (topic.EndsWith("/reads", StringComparison.Ordinal)) {
            shouldAck = await HandleTagReadsWithManualAckAsync(deviceId, payload);
        }
        // ... other topic handlers
    } catch (Exception ex) {
        Logger.LogError(ex, "Error processing message from {Topic}", topic);
        shouldAck = false; // Don't ACK on exception - broker redelivers after reconnect
    }
    if (shouldAck) {
        await eventArgs.AcknowledgeAsync(CancellationToken.None);
        Logger.LogDebug("ACKed message from {Topic}", topic);
    } else {
        Logger.LogWarning("NOT ACKing message from {Topic} - will be redelivered when the session resumes", topic);
    }
}

private async Task<bool> HandleTagReadsWithManualAckAsync(string deviceId, string payload) {
    var reads = JsonSerializer.Deserialize<List<BasicReadEvent>>(payload, _jsonOptions);
    if (reads == null || reads.Count == 0) return true; // nothing to capture - safe to ACK

    // Step 1: Raw capture - this MUST succeed for ACK
    bool captureSucceeded = true;
    if (Config.RawDataCapture.Enabled && Config.RawDataCapture.CaptureTagReads) {
        try {
            await CaptureTagReadsAsync(deviceId, reads, payload);
        } catch (Exception ex) {
            Logger.LogError(ex, "Raw capture failed for {DeviceId} - NOT ACKing", deviceId);
            captureSucceeded = false;
        }
    }

    // Step 2: Business logic - failures don't affect ACK
    // (TagReadReplayService will catch unprocessed records)
    if (captureSucceeded) {
        try {
            await OnTagReadsAsync(deviceId, reads);
        } catch (Exception ex) {
            Logger.LogError(ex, "Business logic failed for {DeviceId} - replay will handle", deviceId);
            // Don't fail ACK - raw capture succeeded, replay will catch it
        }
    }
    return captureSucceeded;
}
```
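The ACK rule in HandleTagReadsWithManualAckAsync reduces to: withhold the ACK only when raw capture fails. A minimal sketch of that decision, assuming the capture and business-logic steps are passed in as delegates (AckPolicy and its signature are hypothetical, and the capture-disabled branch is omitted), which makes each row of the scenario table in the Behavior Summary directly testable:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical distillation of HandleTagReadsWithManualAckAsync.
public static class AckPolicy
{
    // Returns true when the message should be acknowledged.
    public static async Task<bool> ProcessAsync(Func<Task> captureRaw, Func<Task> runBusinessLogic, Action<string> log)
    {
        try
        {
            await captureRaw();
        }
        catch (Exception ex)
        {
            log($"Raw capture failed, not ACKing: {ex.Message}");
            return false; // message stays unacknowledged in the broker session
        }

        try
        {
            await runBusinessLogic();
        }
        catch (Exception ex)
        {
            // The raw row exists, so a replay job can retry the business logic later.
            log($"Business logic failed, ACKing anyway: {ex.Message}");
        }
        return true;
    }
}
```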
Stage 6: Update Configuration Files
File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/appsettings.json
```json
"Mqtt": {
  "Enabled": true,
  "BrokerHost": "i5122f12.ala.us-east-1.emqxsl.com",
  "BrokerPort": 8883,
  "ClientId": "bbu-processor",
  "SubscribeTopics": [
    "zebra/fx/+/reads",
    "zebra/fx/+/events",
    "zebra/fx/+/management/responses",
    "zebra/fx/+/control/responses"
  ],
  "SharedSubscriptionGroup": "bbu-processors",
  "Username": "dplx-bbu-rfid",
  "Password": "honesty-avengers-PETE-dotty",
  "AutoReconnect": true,
  "ReconnectDelaySeconds": 5,
  "CleanSession": false,
  "AppendUniqueIdToClientId": false,
  "SessionExpiryIntervalSeconds": 3600,
  "EnvironmentId": null,
  "InstanceId": null,
  "RawDataCapture": {
    "Enabled": true,
    "RetentionDays": 30,
    "CaptureTagReads": true,
    "CaptureManagementEvents": true,
    "CaptureControlResponses": true,
    "CaptureManagementResponses": true
  }
}
```
Files to Modify
- engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/MqttProcessorConfiguration.cs
  - Add: EnvironmentId, SessionExpiryIntervalSeconds, InstanceId properties
- engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs
  - Add: GetEffectiveEnvironmentId() helper
  - Modify: Constructor for environment-aware ClientId
  - Modify: StartAsync() for MQTT 5 protocol
  - Modify: OnConnectedAsync() for environment-isolated shared subscriptions
- engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/ZebraRfidProcessor.cs
  - Modify: OnMessageReceivedAsync() for manual ACK
  - Add: HandleTagReadsWithManualAckAsync() method
- engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/appsettings.json
  - Update: MQTT config with new defaults
- engines/bbu/resources/docs/guides/configuration-reference.md
  - Document: New configuration options
Behavior Summary
| Build | EnvironmentId | SharedSubscriptionGroup | Effect |
|---|---|---|---|
| Debug (local) | local-{machine} | bbu-processors-local-{machine} | Each dev isolated |
| Release (Azure) | Must be set | bbu-processors-{envId} | Shared within env |

| Scenario | ACK Sent? | Redelivery? |
|---|---|---|
| Raw capture succeeds, business logic succeeds | Yes | No |
| Raw capture succeeds, business logic fails | Yes | No (replay handles) |
| Raw capture fails | No | Yes, after the client reconnects and resumes its session |
| Exception during processing | No | Yes, after the client reconnects and resumes its session |
Testing Checklist
- Local dev instance gets unique subscription group based on machine name
- Two local devs don't share messages (different machine names)
- Release build fails startup if EnvironmentId not configured
- Messages are persisted during service restart (CleanSession=false with a non-zero SessionExpiryIntervalSeconds works)
- Raw capture failure prevents ACK and causes redelivery after the client reconnects
- Business logic failure doesn't prevent ACK
- TagReadReplayService catches business logic failures