fsds/iot-emqx-connection-status-sync.md
Plan: EMQX Connection Status Sync with Historical Tracking
IMMEDIATE FIX REQUIRED: Tenant ID Handling
Root Cause Identified: The EmqxConnectionSyncService fails with "Nullable object must have a value" because it doesn't call SetTenantId() on the IotDb before querying.
The DynaplexDbContext applies global query filters that access _tenantId.Value. When the tenant ID is not set, this throws the exception.
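The failure mode can be reproduced in isolation. The sketch below is a minimal illustration, not the real DynaplexDbContext: `TenantScopedContext` and `MatchesTenant` are hypothetical names standing in for a context whose filter predicate reads `_tenantId.Value`. Reading `.Value` on an unset `Guid?` throws `InvalidOperationException`, which is the "Nullable object must have a value" error seen in the sync service.

```csharp
using System;

// Hypothetical stand-in for a tenant-filtered context; NOT the real DynaplexDbContext.
public sealed class TenantScopedContext {
    private Guid? _tenantId;

    public void SetTenantId(Guid tenantId) => _tenantId = tenantId;

    // Mirrors a global query filter predicate that dereferences _tenantId.Value.
    // Throws InvalidOperationException while _tenantId is still null.
    public bool MatchesTenant(Guid rowTenantId) => rowTenantId == _tenantId.Value;
}
```

Calling `MatchesTenant` before `SetTenantId` throws; calling it afterwards evaluates the comparison normally, which is why the fix only needs to set the tenant ID before the first query.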
Fix Required (following DatalogicBlobProcessor pattern):
- Add dependency on IdentityApiClient via service provider
- Add _cachedTenantId field to cache the tenant ID
- Add GetTenantIdAsync() method using TenantHelper.GetDefaultTenantIdAsync()
- Call iotDb.SetTenantId(tenantId) before every database query
File to Modify: engines/iot/src/Acsis.Dynaplex.Engines.Iot/Integrations/Emqx/EmqxConnectionSyncService.cs
// Add field
private Guid? _cachedTenantId;
// Add method (copy from DatalogicBlobProcessor)
private async Task<Guid> GetTenantIdAsync(CancellationToken cancellationToken) {
if(_cachedTenantId.HasValue) {
return _cachedTenantId.Value;
}
await using var scope = services.CreateAsyncScope();
var identityClient = scope.ServiceProvider.GetRequiredService<IdentityApiClient>();
_cachedTenantId = await TenantHelper.GetDefaultTenantIdAsync(identityClient, cancellationToken);
logger.LogInformation("Using default tenant ID: {TenantId}", _cachedTenantId.Value);
return _cachedTenantId.Value;
}
// Modify SyncConnectedClients to set tenant ID
private async Task SyncConnectedClients(CancellationToken cancellationToken) {
using var scope = services.CreateScope();
var apiClient = scope.ServiceProvider.GetRequiredService<EmqxApiClient>();
var iotDb = scope.ServiceProvider.GetRequiredService<IotDb>();
// ADD THESE TWO LINES - the missing piece!
var tenantId = await GetTenantIdAsync(cancellationToken);
iotDb.SetTenantId(tenantId);
// ... rest of method unchanged
}
Required using statements: using Acsis.Dynaplex.Engines.Identity.Database; and using Acsis.Dynaplex.Engines.Identity.Abstractions;
Problem Statement
We need to track MQTT client (door reader) connection status from the EMQX broker with historical data - not just "last seen". This enables detecting patterns like "disconnecting for 5 minutes every hour" which would be invisible with just a last-seen timestamp.
EMQX Broker API (Confirmed Working):
- URL: https://i5122f12.ala.us-east-1.emqxsl.com:8443/api/v5/clients
- Auth: App ID j1c227ff / App Secret Tb8iWrJqv5-rNjGE
Design Decisions
Historical Tracking Approach: Event-Based
Store raw connection state changes as events, not every poll result:
| Approach | Description | Verdict |
|---|---|---|
| Last-seen only | Just update LastSeenAtUtc on Device | ❌ No history |
| Poll recording | Store every poll result | ❌ Massive data growth |
| Event-based | Record CONNECT/DISCONNECT events | ✅ Clean history |
Key insight: Only record state changes. If a device was connected and is still connected, no new event is recorded. If a device was connected but is now missing from the API response, record a DISCONNECTED event.
Data Model
1. New Entity: DeviceConnectionEvent
iot.device_connection_events
├── id (guid, PK)
├── device_id (guid, FK to devices)
├── mqtt_client_id (string) - the client ID from EMQX
├── event_type (string) - "CONNECTED" or "DISCONNECTED"
├── event_timestamp_utc (timestamptz) - when the state change was detected
├── ip_address (string, nullable) - connection IP address
├── connected_at_utc (timestamptz, nullable) - from EMQX "connected_at" field
├── metadata_json (jsonb, nullable) - keepalive, protocol version, etc.
└── created_at_utc (timestamptz)
2. Add to Device entity for quick lookups:
// Current connection status (updated each poll)
public bool? IsConnected { get; set; }
public DateTimeOffset? LastSeenAtUtc { get; set; }
public DateTimeOffset? ConnectedSinceUtc { get; set; }
public string? MqttClientId { get; set; }
public string? ConnectionIpAddress { get; set; }
Sync Service Logic
1. Poll EMQX /api/v5/clients every N seconds (configurable, default 60s)
2. For each client in response:
a. Parse client ID to identify device (e.g., "door-173-..." → Device where SerialNumber contains "173")
b. Look up device in database
c. If device.IsConnected == false or null, and now connected:
- Record CONNECTED event
- Update device status fields
d. If device.IsConnected == true, and still connected:
- Just update LastSeenAtUtc (no new event)
3. For devices that WERE connected but NOT in response:
- Record DISCONNECTED event
- Update device.IsConnected = false
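The state-change detection in steps 2 and 3 reduces to a set difference between the client IDs previously marked connected and the client IDs in the latest poll. The following is a minimal sketch under that assumption; `ConnectionDiff`, `ConnectionChange`, and `ConnectionEventType` are hypothetical names, and the real service would additionally map client IDs to devices and persist the results.

```csharp
using System.Collections.Generic;

public enum ConnectionEventType { Connected, Disconnected }

// One detected state change for a single MQTT client ID (hypothetical type).
public sealed record ConnectionChange(string MqttClientId, ConnectionEventType EventType);

public static class ConnectionDiff {
    // previouslyConnected: client IDs whose device has IsConnected == true in the database.
    // currentlyConnected:  client IDs returned by the latest EMQX poll.
    public static List<ConnectionChange> Compute(
        ISet<string> previouslyConnected,
        ISet<string> currentlyConnected) {
        var changes = new List<ConnectionChange>();

        // In the poll but not previously connected: CONNECTED event.
        foreach (var id in currentlyConnected) {
            if (!previouslyConnected.Contains(id)) {
                changes.Add(new ConnectionChange(id, ConnectionEventType.Connected));
            }
        }

        // Previously connected but absent from the poll: DISCONNECTED event.
        foreach (var id in previouslyConnected) {
            if (!currentlyConnected.Contains(id)) {
                changes.Add(new ConnectionChange(id, ConnectionEventType.Disconnected));
            }
        }

        // IDs present in both sets produce no event; only LastSeenAtUtc would be updated.
        return changes;
    }
}
```

Keeping the diff as a pure function makes the event logic testable without a database or a live broker.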
Files to Create
1. Configuration
engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/EmqxSyncConfiguration.cs
public class EmqxSyncConfiguration {
public bool Enabled { get; set; } = false;
public string BrokerApiBaseUrl { get; set; } = string.Empty;
public string AppId { get; set; } = string.Empty; // kv:emqx-app-id
public string AppSecret { get; set; } = string.Empty; // kv:emqx-app-secret
public int PollingIntervalSeconds { get; set; } = 60;
public int RetentionDays { get; set; } = 90;
public string ClientIdPattern { get; set; } = "door-{serial}-"; // Pattern to match
}
2. Entity
engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/DeviceConnectionEvent.cs
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
[Table("device_connection_events")]
public class DeviceConnectionEvent {
[Key] public Guid Id { get; set; }
public Guid DeviceId { get; set; }
public string MqttClientId { get; set; } = string.Empty;
public string EventType { get; set; } = string.Empty; // "CONNECTED" or "DISCONNECTED"
public DateTimeOffset EventTimestampUtc { get; set; }
public string? IpAddress { get; set; }
public DateTimeOffset? ConnectedAtUtc { get; set; }
public string? MetadataJson { get; set; }
public DateTimeOffset CreatedAtUtc { get; set; }
// Navigation property; populated by EF Core when the relationship is loaded
public Device Device { get; set; } = null!;
}
3. API Client
engines/iot/src/Acsis.Dynaplex.Engines.Iot/Integrations/Emqx/EmqxApiClient.cs
- HTTP client wrapper for EMQX REST API
- Basic auth with App ID/Secret
- GetConnectedClients() method
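The two pieces the client needs, a Basic auth header and response parsing, could look roughly like the sketch below. It assumes the /clients response wraps items in a top-level `data` array with `clientid`, `connected`, `ip_address`, and `connected_at` fields; that shape should be verified against the actual broker response. `EmqxClientInfo`, `EmqxClientsResponse`, and `EmqxApiHelpers` are hypothetical names, not existing types.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;

// Assumed subset of the fields on one item of the /clients response.
public sealed class EmqxClientInfo {
    [JsonPropertyName("clientid")] public string ClientId { get; set; } = string.Empty;
    [JsonPropertyName("connected")] public bool Connected { get; set; }
    [JsonPropertyName("ip_address")] public string? IpAddress { get; set; }
    [JsonPropertyName("connected_at")] public DateTimeOffset? ConnectedAt { get; set; }
}

// Assumed envelope: items live under a top-level "data" array.
public sealed class EmqxClientsResponse {
    [JsonPropertyName("data")] public List<EmqxClientInfo> Data { get; set; } = new();
}

public static class EmqxApiHelpers {
    // Builds the HTTP Basic Authorization header from the App ID and App Secret.
    public static AuthenticationHeaderValue BuildBasicAuth(string appId, string appSecret) {
        var token = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{appId}:{appSecret}"));
        return new AuthenticationHeaderValue("Basic", token);
    }

    // Deserializes the response body; unknown JSON properties are ignored by default.
    public static List<EmqxClientInfo> ParseClients(string json) {
        var response = JsonSerializer.Deserialize<EmqxClientsResponse>(json);
        return response?.Data ?? new List<EmqxClientInfo>();
    }
}
```

If the endpoint returns results in pages, GetConnectedClients() would need to follow them; otherwise a device on a later page would be misread as disconnected.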
4. Sync Service
engines/iot/src/Acsis.Dynaplex.Engines.Iot/Integrations/Emqx/EmqxConnectionSyncService.cs
- BackgroundService following Tn360SyncBackgroundService pattern
- Polls EMQX API
- Detects state changes
- Records events and updates device status
Files to Modify
1. Device Entity
engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/Device.cs
Add connection status fields:
[Column("is_connected")]
public bool? IsConnected { get; set; }
[Column("last_seen_at_utc")]
public DateTimeOffset? LastSeenAtUtc { get; set; }
[Column("connected_since_utc")]
public DateTimeOffset? ConnectedSinceUtc { get; set; }
[Column("mqtt_client_id")]
public string? MqttClientId { get; set; }
[Column("connection_ip_address")]
public string? ConnectionIpAddress { get; set; }
2. IotDb Context
engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/IotDb.cs
Add DbSet for new entity:
public virtual DbSet<DeviceConnectionEvent> DeviceConnectionEvents { get; set; }
3. IoT Component Registration
engines/iot/src/Acsis.Dynaplex.Engines.Iot/IotDependencies.cs (or equivalent)
Register:
- EmqxApiClient as HttpClient
- EmqxConnectionSyncService as HostedService
4. AppSettings
projects/bbu-rfid/src/Acsis.Dynaplex.Projects.BbuRfid/appsettings.json
Add EMQX configuration:
"Iot": {
"EmqxSync": {
"Enabled": true,
"BrokerApiBaseUrl": "https://i5122f12.ala.us-east-1.emqxsl.com:8443/api/v5",
"AppId": "kv:emqx-app-id",
"AppSecret": "kv:emqx-app-secret",
"PollingIntervalSeconds": 60,
"RetentionDays": 90
}
}
Database Migration
Create migration for:
- New columns on iot.devices table
- New iot.device_connection_events table with:
  - Index on device_id
  - Index on event_timestamp_utc for time-range queries
  - Index on (device_id, event_timestamp_utc) for device-specific history
Query Examples (After Implementation)
-- Find devices that disconnected more than 5 times in last 24 hours
SELECT device_id, COUNT(*) as disconnect_count
FROM iot.device_connection_events
WHERE event_type = 'DISCONNECTED'
AND event_timestamp_utc > NOW() - INTERVAL '24 hours'
GROUP BY device_id
HAVING COUNT(*) > 5;
-- Find longest completed outage per device in last week
-- (an outage is a DISCONNECTED event followed by the next CONNECTED event;
-- devices that are still disconnected are not included)
WITH events AS (
SELECT device_id, event_type, event_timestamp_utc,
LAG(event_type) OVER w AS prev_event_type,
LAG(event_timestamp_utc) OVER w AS prev_event_timestamp_utc
FROM iot.device_connection_events
WHERE event_timestamp_utc > NOW() - INTERVAL '7 days'
WINDOW w AS (PARTITION BY device_id ORDER BY event_timestamp_utc)
)
SELECT device_id, MAX(event_timestamp_utc - prev_event_timestamp_utc) AS longest_outage
FROM events
WHERE event_type = 'CONNECTED'
AND prev_event_type = 'DISCONNECTED'
GROUP BY device_id;
-- Current connection status (quick lookup from device table)
SELECT serial_number, is_connected, last_seen_at_utc, connected_since_utc
FROM iot.devices
WHERE device_type = 'DOOR_READER';
Implementation Order
1. Entity & Migration - Create DeviceConnectionEvent entity and add fields to Device
2. Configuration - Create EmqxSyncConfiguration
3. API Client - Create EmqxApiClient
4. Sync Service - Create EmqxConnectionSyncService
5. Registration - Wire up DI and add to appsettings
6. Testing - Verify events are recorded on connect/disconnect
Notes
- The EMQX client ID pattern for doors is door-{serialNumber}-{environmentId}-{instanceId}
- We'll need to parse client IDs to match to devices by serial number
- Retention cleanup can be handled by a simple background job or PostgreSQL partition/TTL
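Parsing the client ID pattern noted above could be sketched as follows. This assumes the serial number and environment ID never contain a hyphen, while the instance ID may; `DoorClientId` and `TryParseSerial` are hypothetical names used only for illustration.

```csharp
using System.Text.RegularExpressions;

public static class DoorClientId {
    // door-{serialNumber}-{environmentId}-{instanceId}
    // Assumption: serial and environment segments contain no '-'; instance may.
    private static readonly Regex Pattern = new Regex(
        @"^door-(?<serial>[^-]+)-(?<env>[^-]+)-(?<instance>.+)$",
        RegexOptions.Compiled | RegexOptions.CultureInvariant);

    // Returns true and the serial segment when the client ID matches the door pattern.
    public static bool TryParseSerial(string clientId, out string serial) {
        var match = Pattern.Match(clientId);
        serial = match.Success ? match.Groups["serial"].Value : string.Empty;
        return match.Success;
    }
}
```

Extracting the serial as its own segment lets the device lookup use an exact comparison where the stored serial numbers allow it; a substring match on "173" would also match a device with serial "1173".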