adrs/042-mqtt-message-delivery-architecture.md
# ADR-042: MQTT Message Delivery Architecture for IoT Devices
## Status
Accepted
## Context
Dynaplex systems deploy to multiple environments (local development, Azure dev, Azure staging, Azure production) that all need to receive IoT device data from shared RFID readers and other sensors. The challenge is designing an MQTT message delivery architecture that satisfies these requirements:
### Requirements

1. **Cross-Environment Isolation**: Each deployment environment must receive ALL messages independently. A message delivered to Azure dev must also be delivered to Azure prod and to any local developer's machine. Environments cannot "steal" messages from each other.
2. **Within-Environment Load Balancing**: In scaled deployments (Kubernetes, Azure Container Apps with multiple replicas), messages should be distributed across instances. Each message should be processed exactly once per environment, not duplicated to every pod.
3. **Message Persistence During Disconnects**: When a consumer disconnects (restart, crash, network issue), the broker must queue messages and deliver them when the consumer reconnects. No message loss is acceptable.
4. **Future-Ready for Scale**: The architecture must support horizontal scaling within environments without code changes.
### Visual Representation

```text
                     ┌─────────────────┐
 RFID Readers ─────► │   EMQX Broker   │
 (Zebra FX9600)      │  (Cloud-hosted) │
                     └────────┬────────┘
                              │
          ┌───────────────────┼───────────────────┐
          │                   │                   │
          ▼                   ▼                   ▼
 ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
 │   Azure Prod    │ │   Azure Dev     │ │   Local Dev     │
 │   Environment   │ │   Environment   │ │  (Dave's Mac)   │
 └────────┬────────┘ └────────┬────────┘ └────────┬────────┘
          │                   │                   │
    ┌─────┴─────┐       ┌─────┴─────┐             │
    │           │       │           │             │
  Pod 1       Pod 2   Pod 1       Pod 2    Single Instance
    │           │       │           │             │
    ▼           ▼       ▼           ▼             ▼
(33% msgs) (33% msgs) (50% msgs) (50% msgs)  (100% msgs)
```
Each environment receives 100% of messages, but within an environment, messages are distributed across instances.
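A minimal simulation illustrates this fan-out contract (a toy model with hypothetical environment names `env-a`/`env-b`, not EMQX's actual dispatch code; EMQX's in-group dispatch strategy is configurable, and round-robin is assumed here for illustration):

```csharp
using System;
using System.Collections.Generic;

// Toy model of shared-subscription fan-out: every group receives every
// message; within a group, exactly one instance handles each message
// (round-robin assumed for illustration).
var groups = new Dictionary<string, string[]>
{
    ["env-a"] = new[] { "pod-1", "pod-2" }, // scaled environment
    ["env-b"] = new[] { "single" },         // single-instance environment
};

var delivered = new Dictionary<string, int>(); // "env/instance" -> message count
var cursor = new Dictionary<string, int>();    // per-group round-robin position

for (var msg = 0; msg < 100; msg++)
{
    foreach (var (env, instances) in groups)
    {
        // Each group sees ALL messages...
        var i = cursor.GetValueOrDefault(env);
        cursor[env] = (i + 1) % instances.Length;

        // ...but only one instance per group handles each one.
        var key = $"{env}/{instances[i]}";
        delivered[key] = delivered.GetValueOrDefault(key) + 1;
    }
}

foreach (var (key, count) in delivered)
    Console.WriteLine($"{key}: {count}");
// env-a's two pods split the 100 messages evenly; env-b/single receives all 100.
```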
### Anti-Pattern: Cross-Environment Message Theft

Without proper isolation, this catastrophic scenario can occur:

```text
# WRONG: All environments share the same subscription group
$share/bbu-processors/zebra/fx/+/reads

Azure Prod Pod 1: receives message A
Local Dev:        receives message B
Azure Prod Pod 2: receives message C
Azure Dev:        receives message D
...

# Result: each environment only sees ~25% of messages!
```
## Decision

Implement **environment-scoped MQTT 5 shared subscriptions with persistent sessions**.

### Key Design Decisions
#### 1. Environment-Scoped Shared Subscription Groups

The `SharedSubscriptionGroup` configuration is automatically suffixed with the `EnvironmentId`:

```csharp
// In MqttProcessorBase.OnConnectedAsync()
var effectiveGroup = Config.SharedSubscriptionGroup;
if (!string.IsNullOrWhiteSpace(effectiveGroup)) {
    effectiveGroup = $"{effectiveGroup}-{_effectiveEnvironmentId}";
}

// Subscription becomes: $share/{group}-{env}/{topic}
var effectiveTopic = $"$share/{effectiveGroup}/{topic}";
```

Result: each environment gets a unique shared subscription group:

```text
$share/bbu-processors-azure-prod/zebra/fx/+/reads
$share/bbu-processors-azure-dev/zebra/fx/+/reads
$share/bbu-processors-local-daves-macbook/zebra/fx/+/reads
```
EMQX delivers ALL messages to each group, but load-balances within each group.
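The rewrite reduces to a pure function; the following sketch mirrors the snippet above (`BuildEffectiveTopic` is a hypothetical helper name, not an actual `MqttProcessorBase` member):

```csharp
using System;

// Sketch: derive the effective subscription filter from the configured
// group, the resolved environment id, and a topic filter.
static string BuildEffectiveTopic(string? group, string environmentId, string topic)
{
    // No shared-subscription group configured: subscribe to the plain filter.
    if (string.IsNullOrWhiteSpace(group))
        return topic;

    // Environment-scoped group: full fan-out across environments,
    // load balancing within each environment.
    return $"$share/{group}-{environmentId}/{topic}";
}

Console.WriteLine(BuildEffectiveTopic("bbu-processors", "azure-prod", "zebra/fx/+/reads"));
// $share/bbu-processors-azure-prod/zebra/fx/+/reads
Console.WriteLine(BuildEffectiveTopic(null, "local-daves-macbook", "zebra/fx/+/events"));
// zebra/fx/+/events
```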
#### 2. Environment ID Detection

The environment ID is determined hierarchically:

```csharp
private static string GetEffectiveEnvironmentId(string? configuredId) {
    // 1. Explicit configuration (highest priority)
    if (!string.IsNullOrWhiteSpace(configuredId))
        return configuredId;

    // 2. Environment variable override
    var envVar = Environment.GetEnvironmentVariable("MQTT_ENVIRONMENT_ID");
    if (!string.IsNullOrWhiteSpace(envVar))
        return envVar;

    // 3. Build-configuration-based auto-detection
#if DEBUG
    // Local development: isolate by machine name
    return $"local-{Environment.MachineName.ToLowerInvariant()}";
#else
    // Production builds: MUST be explicitly configured
    throw new InvalidOperationException(
        "MQTT EnvironmentId must be explicitly configured in Release builds.");
#endif
}
```
**For Azure deployments:** use the project's `EnvironmentShortName` (e.g., `elbbudev`, `elbbuqa`, `elbbuprod`) as the MQTT `EnvironmentId`.
#### 3. Stable Client IDs for Session Persistence

Client IDs must be stable across restarts so the broker can recognize a returning consumer:

```csharp
// Persistent session (default, recommended)
_effectiveClientId = $"{baseClientId}-{environmentId}-{instanceId}";
// Example: "bbu-processor-azure-prod-pod-1"

// Ephemeral session (for testing only)
_effectiveClientId = $"{baseClientId}-{environmentId}-{timestamp}-{uniqueId}";
// Example: "bbu-processor-azure-prod-20241205-a1b2c3"
```

Configuration:

```jsonc
{
  "Mqtt": {
    "AppendUniqueIdToClientId": false, // false = persistent sessions
    "InstanceId": null // Defaults to machine name; set to the pod name in K8s
  }
}
```
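The two ID schemes above can be sketched as a single helper (`BuildClientId` is a hypothetical name for illustration, not the actual `MqttProcessorBase` member):

```csharp
using System;

// Sketch: stable vs. ephemeral client IDs.
static string BuildClientId(string baseId, string environmentId,
    string instanceId, bool appendUniqueId)
{
    if (!appendUniqueId)
        // Stable across restarts: the broker matches the client ID and
        // resumes the persistent session (including queued messages).
        return $"{baseId}-{environmentId}-{instanceId}";

    // Unique per process: a fresh session on every start (testing only).
    var unique = Guid.NewGuid().ToString("N")[..6];
    return $"{baseId}-{environmentId}-{DateTime.UtcNow:yyyyMMdd}-{unique}";
}

Console.WriteLine(BuildClientId("bbu-processor", "azure-prod", "pod-1", appendUniqueId: false));
// bbu-processor-azure-prod-pod-1
```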
#### 4. MQTT 5 Session Persistence

The broker maintains session state when `CleanSession` is `false`:

```csharp
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V500)
.WithCleanStart(Config.CleanSession) // false = persistent
.WithSessionExpiryInterval(Config.SessionExpiryIntervalSeconds) // 3600 = 1 hour
```

Behavior:

1. Client disconnects unexpectedly.
2. Broker continues queuing messages for that client ID (up to 1 hour).
3. Client reconnects with the same client ID.
4. Broker delivers all queued messages.
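The queue-and-redeliver contract above can be modeled in a few lines (an in-memory toy keyed by client ID, not the broker implementation; session expiry is ignored here):

```csharp
using System;
using System.Collections.Generic;

// Toy model of broker-side session state keyed by client ID.
var queued = new Dictionary<string, Queue<string>>();
var connected = new HashSet<string>();
var received = new List<string>(); // what the consumer actually processed

void Publish(string clientId, string message)
{
    if (connected.Contains(clientId))
    {
        received.Add(message); // delivered immediately
        return;
    }
    // Client offline: queue the message for the persistent session.
    if (!queued.TryGetValue(clientId, out var q))
        queued[clientId] = q = new Queue<string>();
    q.Enqueue(message);
}

void Reconnect(string clientId)
{
    connected.Add(clientId);
    // Same client ID => the backlog is flushed in order.
    if (queued.TryGetValue(clientId, out var q))
        while (q.Count > 0)
            received.Add(q.Dequeue());
}

Publish("bbu-processor-azure-prod-pod-1", "read-1"); // queued (client offline)
Publish("bbu-processor-azure-prod-pod-1", "read-2"); // queued
Reconnect("bbu-processor-azure-prod-pod-1");         // backlog delivered
Console.WriteLine(string.Join(",", received));       // read-1,read-2
```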
#### 5. QoS Level 2 (Exactly Once)

All subscriptions use QoS 2 for guaranteed delivery:

```csharp
subscribeOptionsBuilder.WithTopicFilter(f =>
    f.WithTopic(effectiveTopic)
        .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
);
```
## Configuration Reference

Complete `MqttProcessorConfiguration` options:

| Property | Default | Purpose |
|---|---|---|
| `BrokerHost` | (required) | EMQX broker hostname |
| `BrokerPort` | `8883` | TLS port |
| `ClientId` | `"mqtt-processor"` | Base client ID (environment/instance appended) |
| `SubscribeTopics` | `[]` | Array of topic filters to subscribe to |
| `SharedSubscriptionGroup` | `null` | Group name for load balancing (environment auto-appended) |
| `EnvironmentId` | `null` | Environment identifier (auto-detected in DEBUG) |
| `InstanceId` | `null` | Instance identifier (defaults to machine name) |
| `CleanSession` | `false` | `false` = persistent sessions with message queueing |
| `SessionExpiryIntervalSeconds` | `3600` | How long the broker retains the session after disconnect |
| `AppendUniqueIdToClientId` | `false` | `false` = stable IDs for persistence |
| `AutoReconnect` | `true` | Automatically reconnect on disconnect |
| `ReconnectDelaySeconds` | `5` | Delay between reconnection attempts |
### Example Configuration

```json
{
  "Mqtt": {
    "ZebraRfid": {
      "Enabled": true,
      "BrokerHost": "i5122f12.ala.us-east-1.emqxsl.com",
      "BrokerPort": 8883,
      "ClientId": "bbu-processor",
      "SubscribeTopics": [
        "zebra/fx/+/reads",
        "zebra/fx/+/events"
      ],
      "SharedSubscriptionGroup": "bbu-processors",
      "Username": "dplx-bbu-rfid",
      "Password": "...",
      "AutoReconnect": true,
      "ReconnectDelaySeconds": 5,
      "CleanSession": false,
      "AppendUniqueIdToClientId": false,
      "SessionExpiryIntervalSeconds": 3600,
      "EnvironmentId": null,
      "InstanceId": null
    }
  }
}
```
## Azure Deployment Configuration

For Azure Container Apps or Kubernetes, configure via environment variables or `appsettings` overrides:

```jsonc
// appsettings.Production.json
{
  "Mqtt": {
    "ZebraRfid": {
      "EnvironmentId": "elbbuprod", // Matches the project's EnvironmentShortName
      "InstanceId": "${CONTAINER_APP_REPLICA_NAME}" // Azure Container Apps
    }
  }
}
```

Or via Aspire:

```csharp
builder.AddComponent<BbuService>(ComponentIndex.Bbu.Metadata)
    .WithEnvironment("Mqtt__ZebraRfid__EnvironmentId", options.EnvironmentShortName)
    .WithEnvironment("Mqtt__ZebraRfid__InstanceId", "${CONTAINER_APP_REPLICA_NAME}");
```
## Consequences

### Positive
- Complete Environment Isolation: Each environment receives 100% of messages
- No Message Loss: Session persistence queues messages during disconnects
- Horizontal Scalability: Shared subscriptions distribute load within environments
- Automatic Local Dev Isolation: DEBUG builds auto-isolate by machine name
- Fail-Safe Production: Release builds fail if environment not explicitly configured
- Standard MQTT 5: No proprietary broker features required
### Negative
- EMQX Dependency: Requires MQTT 5 broker with shared subscription support (EMQX, HiveMQ, etc.)
- Session Limits: Broker has limits on queued messages per session
- Client ID Coordination: In K8s, must ensure stable instance IDs across restarts
### Neutral
- Broker-Side Configuration: May need to tune EMQX session limits for high-volume deployments
- Monitoring Complexity: Need to track multiple subscription groups in broker metrics
## Verification

### 1. Check Logs on Startup

Look for these log messages:

```text
Connecting to MQTT broker: i5122f12.ala.us-east-1.emqxsl.com:8883
  (Environment: local-daves-macbook, ClientId: bbu-processor-local-daves-macbook-daves-macbook,
   CleanStart: False, SessionExpiry: 3600s)
Subscribing to topic: $share/bbu-processors-local-daves-macbook/zebra/fx/+/reads
  (shared group: bbu-processors-local-daves-macbook)
```
Verify:

- `Environment` matches the expected value
- `CleanStart: False` for persistent sessions
- The topic includes the `$share/{group}-{env}/` prefix
### 2. Query EMQX Broker API

```shell
# List connected clients
curl -u "admin:password" "https://broker:8443/api/v5/clients"

# Check a specific client session
curl -u "admin:password" "https://broker:8443/api/v5/clients/bbu-processor-azure-prod-pod-1"

# List all subscriptions
curl -u "admin:password" "https://broker:8443/api/v5/subscriptions"
```

Look for:

- `clean_start: false`
- `session_expiry_interval: 3600`
- Subscription topics with environment-specific shared groups
### 3. Controlled Disconnect Test

1. Start the processor; observe successful connection and subscription.
2. Stop the processor (Ctrl+C, or stop it in Aspire).
3. Generate RFID reads (wave tags in front of the reader).
4. Wait 1-2 minutes.
5. Restart the processor.
6. Verify that all reads appear in the database with similar `created_at_utc` timestamps (batch arrival).
### 4. Database Batch Analysis

```sql
-- Look for evidence of queued message delivery (batch arrivals)
SELECT
    DATE_TRUNC('second', created_at_utc) AS batch_time,
    COUNT(*) AS batch_size,
    MIN(event_timestamp_utc) AS earliest_event,
    MAX(event_timestamp_utc) AS latest_event,
    EXTRACT(EPOCH FROM (MAX(event_timestamp_utc) - MIN(event_timestamp_utc))) AS event_span_seconds
FROM bbu.tag_read_processings
GROUP BY DATE_TRUNC('second', created_at_utc)
HAVING COUNT(*) > 10
ORDER BY batch_time DESC
LIMIT 20;
```
Batches where `event_span_seconds` is large (events spread over minutes) but `batch_time` falls within the same second are evidence of queued messages being delivered after a reconnect.
## Related ADRs

- ADR-027: Event-Driven Architecture - Internal service events (distinct from IoT device MQTT)
- ADR-041: BBU Processing Hardening - Idempotent processing of MQTT messages
- ADR-033: Base Orchestration Pattern - How `EnvironmentShortName` is configured in projects
## Implementation Files

- `engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/MqttProcessorConfiguration.cs` - Configuration class
- `engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/MqttProcessorBase.cs` - Base processor with environment isolation logic
- `engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Services/ZebraRfidProcessor.cs` - Example implementation
- `projects/bbu-rfid/src/Acsis.Dynaplex.Projects.BbuRfid/BbuRfidOptions.cs` - Project options with `EnvironmentShortName`
---

**Date:** 2025-12-05
**Author:** Architecture Team
**Reviewers:** Development Team