
fsds/iot-emqx-connection-status-sync.md

Plan: EMQX Connection Status Sync with Historical Tracking

IMMEDIATE FIX REQUIRED: Tenant ID Handling

Root Cause Identified: The EmqxConnectionSyncService fails with InvalidOperationException ("Nullable object must have a value.") because it doesn't call SetTenantId() on the IotDb before querying.

The DynaplexDbContext applies global query filters that read _tenantId.Value. When the tenant ID has not been set, _tenantId is null, and reading .Value throws that exception.
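A minimal, self-contained reproduction of the failure mode, assuming the filter reads a private `Guid?` field as the paragraph above describes. `TenantScopedContext` and `FilterTenantId` are hypothetical stand-ins, not the real `DynaplexDbContext` API:

```csharp
using System;

// Hypothetical stand-in for the tenant handling in DynaplexDbContext.
public sealed class TenantScopedContext
{
    // Stays null until SetTenantId is called.
    private Guid? _tenantId;

    public void SetTenantId(Guid tenantId) => _tenantId = tenantId;

    // Plays the role of a global query filter such as e => e.TenantId == _tenantId.Value.
    // Reading .Value on an empty Guid? throws InvalidOperationException.
    public Guid FilterTenantId => _tenantId.Value;
}
```

Setting the tenant before the first query, as the fix below does, is what keeps `.Value` from ever seeing `null`.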

Fix Required (following DatalogicBlobProcessor pattern):

  1. Resolve IdentityApiClient from the service provider (in its own scope)
  2. Add a _cachedTenantId field to cache the tenant ID
  3. Add a GetTenantIdAsync() method using TenantHelper.GetDefaultTenantIdAsync()
  4. Call iotDb.SetTenantId(tenantId) on every IotDb instance before querying it (once per scope, right after resolving IotDb)

File to Modify: engines/iot/src/Acsis.Dynaplex.Engines.Iot/Integrations/Emqx/EmqxConnectionSyncService.cs

// Add field
private Guid? _cachedTenantId;

// Add method (copy from DatalogicBlobProcessor)
private async Task<Guid> GetTenantIdAsync(CancellationToken cancellationToken) {
    if(_cachedTenantId.HasValue) {
        return _cachedTenantId.Value;
    }

    await using var scope = services.CreateAsyncScope();
    var identityClient = scope.ServiceProvider.GetRequiredService<IdentityApiClient>();

    _cachedTenantId = await TenantHelper.GetDefaultTenantIdAsync(identityClient, cancellationToken);
    logger.LogInformation("Using default tenant ID: {TenantId}", _cachedTenantId.Value);

    return _cachedTenantId.Value;
}

// Modify SyncConnectedClients to set tenant ID
private async Task SyncConnectedClients(CancellationToken cancellationToken) {
    await using var scope = services.CreateAsyncScope();
    var apiClient = scope.ServiceProvider.GetRequiredService<EmqxApiClient>();
    var iotDb = scope.ServiceProvider.GetRequiredService<IotDb>();

    // ADD THESE LINES - the missing piece! Must run before the first query on iotDb.
    var tenantId = await GetTenantIdAsync(cancellationToken);
    iotDb.SetTenantId(tenantId);

    // ... rest of method unchanged
}

Required using statements: using Acsis.Dynaplex.Engines.Identity.Database; and using Acsis.Dynaplex.Engines.Identity.Abstractions;


Problem Statement

We need to track MQTT client (door reader) connection status from the EMQX broker with historical data, not just "last seen". History makes patterns like "disconnecting for 5 minutes every hour" detectable; a single last-seen timestamp would hide them.

EMQX Broker API (Confirmed Working):

  • URL: https://i5122f12.ala.us-east-1.emqxsl.com:8443/api/v5/clients
  • Auth: HTTP Basic with App ID j1c227ff and the App Secret held in Key Vault as emqx-app-secret

Design Decisions

Historical Tracking Approach: Event-Based

Store raw connection state changes as events, not every poll result:

| Approach | Description | Verdict |
|---|---|---|
| Last-seen only | Just update LastSeenAtUtc on Device | ❌ No history |
| Poll recording | Store every poll result | ❌ Massive data growth |
| Event-based | Record CONNECTED/DISCONNECTED events | ✅ Clean history |

Key insight: Only record state changes. If a device was connected and is still connected, no new event is recorded. If a device was connected but is now missing from the API response, a DISCONNECTED event is recorded.

Data Model

1. New Entity: DeviceConnectionEvent

iot.device_connection_events
├── id (guid, PK)
├── device_id (guid, FK to devices)
├── mqtt_client_id (string) - the client ID from EMQX
├── event_type (string) - "CONNECTED" or "DISCONNECTED"
├── event_timestamp_utc (timestamptz) - when the state change was detected
├── ip_address (string, nullable) - connection IP address
├── connected_at_utc (timestamptz, nullable) - from EMQX "connected_at" field
├── metadata_json (jsonb, nullable) - keepalive, protocol version, etc.
└── created_at_utc (timestamptz)

2. Add to Device entity for quick lookups:

// Current connection status (updated each poll)
public bool? IsConnected { get; set; }
public DateTimeOffset? LastSeenAtUtc { get; set; }
public DateTimeOffset? ConnectedSinceUtc { get; set; }
public string? MqttClientId { get; set; }
public string? ConnectionIpAddress { get; set; }

Sync Service Logic

1. Poll EMQX /api/v5/clients every N seconds (configurable, default 60s)
2. For each client in response:
   a. Parse the client ID to identify the device (e.g., "door-173-..." → Device where SerialNumber equals "173"; a "contains" match would also hit serials such as "1730")
   b. Look up device in database
   c. If device.IsConnected == false or null, and now connected:
      - Record CONNECTED event
      - Update device status fields
   d. If device.IsConnected == true, and still connected:
      - Just update LastSeenAtUtc (no new event)
3. For devices that WERE connected but NOT in response:
   - Record DISCONNECTED event
   - Update device.IsConnected = false
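The per-poll decision in steps 2c, 2d and 3 can be written as a pure function, which keeps it unit-testable apart from EF Core and the HTTP client. A minimal sketch; `DeviceState`, `ConnectionEvent` and `ConnectionStateDiff` are hypothetical names, and it assumes client IDs have already been mapped to device IDs (steps 2a/2b):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical projections; the real code would work with Device and DeviceConnectionEvent.
public sealed record DeviceState(Guid DeviceId, bool? IsConnected);
public sealed record ConnectionEvent(Guid DeviceId, string EventType, DateTimeOffset EventTimestampUtc);

public static class ConnectionStateDiff
{
    public const string Connected = "CONNECTED";
    public const string Disconnected = "DISCONNECTED";

    // Compares the stored state of each known device with the set of device IDs
    // that EMQX currently reports as connected, and returns only the transitions.
    public static List<ConnectionEvent> Diff(
        IEnumerable<DeviceState> knownDevices,
        ISet<Guid> connectedNow,
        DateTimeOffset pollTimeUtc)
    {
        var events = new List<ConnectionEvent>();
        foreach (var device in knownDevices)
        {
            var isUp = connectedNow.Contains(device.DeviceId);
            if (isUp && device.IsConnected != true)
            {
                // Step 2c: was offline or unknown, now connected.
                events.Add(new ConnectionEvent(device.DeviceId, Connected, pollTimeUtc));
            }
            else if (!isUp && device.IsConnected == true)
            {
                // Step 3: was connected, no longer reported by EMQX.
                events.Add(new ConnectionEvent(device.DeviceId, Disconnected, pollTimeUtc));
            }
            // Otherwise the state is unchanged (step 2d, or still offline): no event.
        }
        return events;
    }
}
```

One consequence of polling: a disconnect and reconnect that both fall between two polls produce no events. Comparing EMQX's `connected_at` with the stored `ConnectedSinceUtc` is one possible way to catch that case.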

Files to Create

1. Configuration

engines/iot/src/Acsis.Dynaplex.Engines.Iot.Abstractions/Configuration/EmqxSyncConfiguration.cs

public class EmqxSyncConfiguration {
    public bool Enabled { get; set; } = false;
    public string BrokerApiBaseUrl { get; set; } = string.Empty;
    public string AppId { get; set; } = string.Empty;     // kv:emqx-app-id
    public string AppSecret { get; set; } = string.Empty; // kv:emqx-app-secret
    public int PollingIntervalSeconds { get; set; } = 60;
    public int RetentionDays { get; set; } = 90;
    public string ClientIdPattern { get; set; } = "door-{serial}-"; // Pattern to match
}

2. Entity

engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/DeviceConnectionEvent.cs

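using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

[Table("device_connection_events")]
public class DeviceConnectionEvent {
    [Key, Column("id")] public Guid Id { get; set; }
    [Column("device_id")] public Guid DeviceId { get; set; }
    [Column("mqtt_client_id")] public string MqttClientId { get; set; } = string.Empty;
    [Column("event_type")] public string EventType { get; set; } = string.Empty; // "CONNECTED" or "DISCONNECTED"
    [Column("event_timestamp_utc")] public DateTimeOffset EventTimestampUtc { get; set; }
    [Column("ip_address")] public string? IpAddress { get; set; }
    [Column("connected_at_utc")] public DateTimeOffset? ConnectedAtUtc { get; set; }
    [Column("metadata_json", TypeName = "jsonb")] public string? MetadataJson { get; set; }
    [Column("created_at_utc")] public DateTimeOffset CreatedAtUtc { get; set; }

    public Device Device { get; set; } = null!;
}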

3. API Client

engines/iot/src/Acsis.Dynaplex.Engines.Iot/Integrations/Emqx/EmqxApiClient.cs

  • HTTP client wrapper for EMQX REST API
  • Basic auth with App ID/Secret
  • GetConnectedClients() method
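A minimal sketch of `EmqxApiClient`, assuming the response shape `{"data": [...], "meta": {"hasnext": ...}}` and the field names `clientid`, `connected`, `ip_address`, `connected_at`, `keepalive` and `proto_ver`; these should be checked against the EMQX v5 API reference for the deployed broker. The DTO records are hypothetical, and the constructor takes plain strings only to stay self-contained (the real service would more likely read EmqxSyncConfiguration through options). It filters on `connected` on the assumption that the list can also contain disconnected clients whose sessions are still retained:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical DTOs for the assumed subset of GET /api/v5/clients.
public sealed record EmqxClient(
    [property: JsonPropertyName("clientid")] string ClientId,
    [property: JsonPropertyName("connected")] bool Connected,
    [property: JsonPropertyName("ip_address")] string? IpAddress,
    [property: JsonPropertyName("connected_at")] DateTimeOffset? ConnectedAt,
    [property: JsonPropertyName("keepalive")] int? Keepalive,
    [property: JsonPropertyName("proto_ver")] int? ProtoVer);

public sealed record EmqxMeta([property: JsonPropertyName("hasnext")] bool HasNext);

public sealed record EmqxClientPage(
    [property: JsonPropertyName("data")] List<EmqxClient>? Data,
    [property: JsonPropertyName("meta")] EmqxMeta? Meta);

public sealed class EmqxApiClient
{
    private readonly HttpClient _http;
    private readonly string _baseUrl;

    public EmqxApiClient(HttpClient http, string baseUrl, string appId, string appSecret)
    {
        _http = http;
        _baseUrl = baseUrl.TrimEnd('/'); // e.g. https://<host>:8443/api/v5
        // Basic auth: base64("AppId:AppSecret").
        var token = Convert.ToBase64String(Encoding.UTF8.GetBytes($"{appId}:{appSecret}"));
        _http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", token);
    }

    public static EmqxClientPage ParsePage(string json) =>
        JsonSerializer.Deserialize<EmqxClientPage>(json)
        ?? throw new JsonException("EMQX returned an empty body.");

    // Walks every page and keeps only clients that are currently connected.
    public async Task<List<EmqxClient>> GetConnectedClientsAsync(
        int pageSize = 100, CancellationToken cancellationToken = default)
    {
        var connected = new List<EmqxClient>();
        for (var page = 1; ; page++)
        {
            var url = $"{_baseUrl}/clients?page={page}&limit={pageSize}";
            var json = await _http.GetStringAsync(url, cancellationToken);
            var parsed = ParsePage(json);
            connected.AddRange((parsed.Data ?? new List<EmqxClient>()).Where(c => c.Connected));
            if (parsed.Meta?.HasNext != true)
            {
                break;
            }
        }
        return connected;
    }
}
```

The URL is built by concatenation deliberately: with `HttpClient.BaseAddress` set to `https://<host>:8443/api/v5` (no trailing slash), a relative `clients` resolves to `https://<host>:8443/api/clients`, silently dropping the `v5` segment.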

4. Sync Service

engines/iot/src/Acsis.Dynaplex.Engines.Iot/Integrations/Emqx/EmqxConnectionSyncService.cs

  • BackgroundService following Tn360SyncBackgroundService pattern
  • Polls EMQX API
  • Detects state changes
  • Records events and updates device status

Files to Modify

1. Device Entity

engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/Device.cs
Add connection status fields:

[Column("is_connected")]
public bool? IsConnected { get; set; }

[Column("last_seen_at_utc")]
public DateTimeOffset? LastSeenAtUtc { get; set; }

[Column("connected_since_utc")]
public DateTimeOffset? ConnectedSinceUtc { get; set; }

[Column("mqtt_client_id")]
public string? MqttClientId { get; set; }

[Column("connection_ip_address")]
public string? ConnectionIpAddress { get; set; }

2. IotDb Context

engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/IotDb.cs
Add DbSet for new entity:

public virtual DbSet<DeviceConnectionEvent> DeviceConnectionEvents { get; set; }

3. IoT Component Registration

engines/iot/src/Acsis.Dynaplex.Engines.Iot/IotDependencies.cs (or equivalent)
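Register:

  • EmqxSyncConfiguration bound to the Iot:EmqxSync configuration section
  • EmqxApiClient as a typed HttpClient (services.AddHttpClient<EmqxApiClient>())
  • EmqxConnectionSyncService as a hosted service (services.AddHostedService<EmqxConnectionSyncService>())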

4. AppSettings

projects/bbu-rfid/src/Acsis.Dynaplex.Projects.BbuRfid/appsettings.json
Add EMQX configuration:

"Iot": {
    "EmqxSync": {
        "Enabled": true,
        "BrokerApiBaseUrl": "https://i5122f12.ala.us-east-1.emqxsl.com:8443/api/v5",
        "AppId": "kv:emqx-app-id",
        "AppSecret": "kv:emqx-app-secret",
        "PollingIntervalSeconds": 60,
        "RetentionDays": 90
    }
}

Database Migration

Create migration for:

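  1. New columns on iot.devices table
  2. New iot.device_connection_events table with:
    • Index on (device_id, event_timestamp_utc) for device-specific history; because device_id is the leading column, this index also serves device_id-only lookups, so a separate index on device_id alone is likely redundant
    • Index on event_timestamp_utc for time-range queries across all devices (and for retention cleanup)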

Query Examples (After Implementation)

-- Find devices that disconnected more than 5 times in last 24 hours
SELECT device_id, COUNT(*) as disconnect_count
FROM iot.device_connection_events
WHERE event_type = 'DISCONNECTED'
  AND event_timestamp_utc > NOW() - INTERVAL '24 hours'
GROUP BY device_id
HAVING COUNT(*) > 5;

-- Find longest completed outage per device in last week
-- (time from each DISCONNECTED event to the CONNECTED event that follows it)
WITH ordered AS (
    SELECT device_id, event_type, event_timestamp_utc,
           LAG(event_type) OVER w as prev_event_type,
           LAG(event_timestamp_utc) OVER w as prev_event_timestamp_utc
    FROM iot.device_connection_events
    WHERE event_timestamp_utc > NOW() - INTERVAL '7 days'
    WINDOW w AS (PARTITION BY device_id ORDER BY event_timestamp_utc)
)
SELECT device_id, MAX(event_timestamp_utc - prev_event_timestamp_utc) as longest_outage
FROM ordered
WHERE event_type = 'CONNECTED'
  AND prev_event_type = 'DISCONNECTED'
GROUP BY device_id;

-- Current connection status (quick lookup from device table)
SELECT serial_number, is_connected, last_seen_at_utc, connected_since_utc
FROM iot.devices
WHERE device_type = 'DOOR_READER';

Implementation Order

  1. Entity & Migration - Create DeviceConnectionEvent entity and add fields to Device
  2. Configuration - Create EmqxSyncConfiguration
  3. API Client - Create EmqxApiClient
  4. Sync Service - Create EmqxConnectionSyncService
  5. Registration - Wire up DI and add to appsettings
  6. Testing - Verify events are recorded on connect/disconnect

Notes

  • The EMQX client ID pattern for doors is door-{serialNumber}-{environmentId}-{instanceId}
  • We'll need to parse client IDs to match to devices by serial number
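  • Retention cleanup (RetentionDays, default 90) can be handled by a simple background job that deletes events older than the cutoff, or by time-based partitioning of device_connection_events with expired partitions dropped; PostgreSQL has no built-in row TTL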
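A minimal client ID parsing sketch, assuming the door-{serialNumber}-{environmentId}-{instanceId} format above and that serial numbers never contain "-". It returns the exact serial segment, so the device lookup can be an equality match on SerialNumber. `EmqxClientIdParser` is a hypothetical name:

```csharp
using System;

public static class EmqxClientIdParser
{
    private const string DoorPrefix = "door-";

    // Extracts the serial number from an ID shaped like door-{serial}-{env}-{instance}.
    // Returns false for non-door clients or IDs without a dash after the serial.
    public static bool TryParseDoorSerial(string? clientId, out string serial)
    {
        serial = string.Empty;
        if (clientId is null || !clientId.StartsWith(DoorPrefix, StringComparison.Ordinal))
        {
            return false;
        }

        var end = clientId.IndexOf('-', DoorPrefix.Length);
        if (end <= DoorPrefix.Length)
        {
            return false; // empty serial ("door--...") or no dash after it ("door-173")
        }

        serial = clientId.Substring(DoorPrefix.Length, end - DoorPrefix.Length);
        return true;
    }
}
```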