fsds/bbu-and-iot-component-hardening.md

BBU & IoT Component Hardening Plan

Goal: Production-ready idempotency, race condition elimination, and architectural boundary enforcement
Milestone: Thursday
Scope: Full hardening (Critical + High + Medium issues)


Executive Summary

The BBU and IoT components have several critical issues that can cause duplicate data, lost updates, and data inconsistency. This plan addresses them systematically while respecting the architectural boundary: IoT collects raw data; BBU owns business logic and processing state.


Critical Issues Being Fixed

| Issue | Impact | Priority |
|---|---|---|
| Dual Processing | Same tag read processed by live MQTT AND replay service | CRITICAL |
| Checkpoint Races | Crash between process and checkpoint update causes reprocessing | CRITICAL |
| Missing Indexes | Full table scans on WHERE bbu_processed_at IS NULL | CRITICAL |
| Movement Duplicates | Same basket movement creates multiple records | HIGH |
| Camera Read Duplicates | CameraReadProcessing inserted without upsert | HIGH |
| Unbounded Retries | Infinite retry loop for missing OBLPN | HIGH |
| Cache Never Expires | Stale movement types, service users | MEDIUM |
| Hard-coded Device Map | Device-to-location in code, not config | MEDIUM |

Implementation Stages

Stage 1: Database Schema Changes (BBU)

Est. Time: 2-3 hours
Priority: CRITICAL - Foundation for all other fixes

Create new entities in BBU database to own processing state:

1.1 TagReadProcessing Entity

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/TagReadProcessing.cs

Purpose: BBU-owned tracking of which IoT tag reads have been processed
Fields:
- Id (long, PK)
- ZbTagReadId (long) - Reference to IoT record
- ProcessorType (string) - "MQTT" or "Replay"
- ProcessedAtUtc (DateTime)
- BasketId (Guid?) - Result item ID
- MovementId (Guid?) - If movement was created
- TenantId (Guid)
- Unique index on (TenantId, ZbTagReadId)

1.2 MovementDeduplication Entity

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/MovementDeduplication.cs

Purpose: Prevent duplicate movements within time window
Fields:
- Id (long, PK)
- BasketId (Guid)
- FromLocationId (Guid)
- ToLocationId (Guid)
- WindowStartUtc (DateTime) - 5-minute buckets
- MovementId (Guid)
- CreatedAtUtc (DateTime)
- TenantId (Guid)
- Unique index on (BasketId, FromLocationId, ToLocationId, WindowStartUtc, TenantId)
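
As a rough illustration of how WindowStartUtc could be derived, the sketch below floors a UTC timestamp to the start of its 5-minute bucket. The class and method names (MovementWindow, GetWindowStartUtc) are hypothetical and not part of the plan; only the 5-minute bucket size comes from the field description above.

```csharp
using System;

public static class MovementWindow
{
    // Bucket size taken from the "5-minute buckets" note on WindowStartUtc.
    public static readonly TimeSpan BucketSize = TimeSpan.FromMinutes(5);

    // Floors a UTC timestamp to the start of the 5-minute bucket that contains it.
    public static DateTime GetWindowStartUtc(DateTime timestampUtc)
    {
        if (timestampUtc.Kind != DateTimeKind.Utc)
            throw new ArgumentException("Timestamp must be UTC.", nameof(timestampUtc));

        long bucketTicks = BucketSize.Ticks;
        long flooredTicks = timestampUtc.Ticks - (timestampUtc.Ticks % bucketTicks);
        return new DateTime(flooredTicks, DateTimeKind.Utc);
    }
}
```

One consequence of fixed buckets is worth keeping in mind: two movements at 12:04:59 and 12:05:01 fall into different buckets, so the unique index only suppresses duplicates that land in the same bucket, not every pair that is less than five minutes apart.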

1.3 ProcessingDeadLetter Entity

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/ProcessingDeadLetter.cs

Purpose: Track failed items with retry bounds
Fields:
- Id (long, PK)
- ProcessorName (string)
- EntityType (string) - "TagRead", "CameraRead", "Blob"
- EntityId (string)
- RetryCount (int)
- MaxRetries (int) - Default 5
- LastError (string?)
- FirstFailureUtc (DateTime)
- LastFailureUtc (DateTime)
- TenantId (Guid)
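
The retry bookkeeping these fields imply could look roughly like the following in-memory sketch. DeadLetterEntry, RecordFailure, and IsExhausted are hypothetical names used for illustration; the real entity would be an EF Core class persisted in the BBU database, and the default of 5 comes from the MaxRetries field above.

```csharp
#nullable enable
using System;

// Hypothetical in-memory mirror of the ProcessingDeadLetter fields.
public sealed class DeadLetterEntry
{
    public string ProcessorName { get; init; } = "";
    public string EntityType { get; init; } = "";   // "TagRead", "CameraRead", "Blob"
    public string EntityId { get; init; } = "";
    public int RetryCount { get; private set; }
    public int MaxRetries { get; init; } = 5;
    public string? LastError { get; private set; }
    public DateTime FirstFailureUtc { get; private set; }
    public DateTime LastFailureUtc { get; private set; }

    // True once the item has failed MaxRetries times and should no longer be retried.
    public bool IsExhausted => RetryCount >= MaxRetries;

    // Records one failed attempt; the first call also stamps FirstFailureUtc.
    public void RecordFailure(string error, DateTime nowUtc)
    {
        if (RetryCount == 0)
            FirstFailureUtc = nowUtc;

        RetryCount++;
        LastError = error;
        LastFailureUtc = nowUtc;
    }
}
```

A processor would check IsExhausted before each attempt and skip exhausted items, which is what bounds the retry loop.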

1.4 Update BbuDb.cs

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/BbuDb.cs

Add DbSets and model configuration for new entities

1.5 Create Migration

Use /generate-migration command to create migration

Stage 2: PostgreSQL Advisory Locks Helper

Est. Time: 1-2 hours
Priority: HIGH - Enables safe concurrent processing

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Services/PostgresAdvisoryLock.cs

Lock Types:
- ReturnDoorProcessor: 0x42425530001
- OracleProcessor: 0x42425530002
- TagReadReplay: 0x42425530003
- ShipmentLifecycle: 0x42425530004
- Per-device locks: 0x42425540000 + hash(serial)

Methods:
- TryAcquireLockAsync(DbContext, lockType)
- TryAcquireDeviceLockAsync(DbContext, deviceSerial)
- ReleaseLockAsync(DbContext, lockType)
- ReleaseDeviceLockAsync(DbContext, deviceSerial)
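
The lock keys above could be derived as in the sketch below. The constants come from the Lock Types list; the AdvisoryLockKeys class, the StableHash helper, and the choice of FNV-1a are assumptions made for illustration. A deterministic hash matters here because string.GetHashCode() is randomized per process on modern .NET, so two BBU instances would compute different keys for the same serial.

```csharp
using System;
using System.Text;

public static class AdvisoryLockKeys
{
    public const long ReturnDoorProcessor = 0x42425530001;
    public const long OracleProcessor = 0x42425530002;
    public const long TagReadReplay = 0x42425530003;
    public const long ShipmentLifecycle = 0x42425530004;
    public const long DeviceLockBase = 0x42425540000;

    // FNV-1a (32-bit): produces the same value in every process for the same input.
    public static uint StableHash(string value)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;

        uint hash = offsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(value))
        {
            hash ^= b;
            hash = unchecked(hash * prime);
        }
        return hash;
    }

    // Per-device key: base + hash(serial). Distinct serials can still collide,
    // in which case two devices would simply share one lock.
    public static long ForDevice(string deviceSerial) =>
        DeviceLockBase + StableHash(deviceSerial);
}
```

The resulting long is what would be passed to PostgreSQL's pg_try_advisory_lock(bigint) and pg_advisory_unlock(bigint). Session-level advisory locks belong to the database connection, so the acquire and release calls need to run on the same open connection.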

Stage 3: Unified Basket Processing Service

Est. Time: 3-4 hours
Priority: CRITICAL - Fixes dual processing

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BasketProcessingService.cs

Purpose: Single entry point for all basket creation/update logic with idempotency

Methods:
1. ProcessTagReadAsync(tagReadId, epc, deviceId, timestamp, processorType)
   - Check TagReadProcessing for existing entry → return early if exists
   - Find/create basket (existing logic)
   - Create movement if location changed (with deduplication)
   - Record in TagReadProcessing
   - Return result with basket/movement IDs

2. CheckAndRecordMovementAsync(basketId, fromLoc, toLoc, tenantId)
   - Calculate 5-minute window bucket
   - Check MovementDeduplication for existing
   - Create movement only if not duplicate
   - Record in MovementDeduplication

Key Design:
- Uses BBU's TagReadProcessing table (not IoT's bbu_processed_at)
- Both MQTT and Replay services call this
- IoT database not modified by BBU (respects boundary)
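
The idempotency check in ProcessTagReadAsync can be sketched in memory as follows. This is not the planned implementation: InMemoryBasketProcessing and TagReadResult are hypothetical names, and a ConcurrentDictionary stands in for the TagReadProcessing table and its unique index on (TenantId, ZbTagReadId).

```csharp
using System;
using System.Collections.Concurrent;

public sealed record TagReadResult(Guid BasketId, string ProcessorType, bool AlreadyProcessed);

public sealed class InMemoryBasketProcessing
{
    // Stands in for TagReadProcessing and its unique index on (TenantId, ZbTagReadId).
    private readonly ConcurrentDictionary<(Guid TenantId, long TagReadId), TagReadResult> _processed = new();

    public TagReadResult ProcessTagRead(Guid tenantId, long tagReadId, string processorType)
    {
        var key = (tenantId, tagReadId);

        // Step 1: return early if this read was already recorded.
        if (_processed.TryGetValue(key, out var existing))
            return existing with { AlreadyProcessed = true };

        // Placeholder for the real find/create basket and movement logic.
        var result = new TagReadResult(Guid.NewGuid(), processorType, AlreadyProcessed: false);

        // Step 2: record the read. TryAdd fails if another caller won the race,
        // which is the in-memory analogue of a unique-constraint violation.
        if (!_processed.TryAdd(key, result))
            return _processed[key] with { AlreadyProcessed = true };

        return result;
    }
}
```

In the database-backed version, the basket work and the TagReadProcessing insert would presumably share one transaction, so that losing the race on the unique index rolls back any basket or movement changes made by the losing caller.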

Stage 4: Refactor BbuMqttProcessor

Est. Time: 2 hours
Priority: CRITICAL

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BbuMqttProcessor.cs

Changes:
1. Inject BasketProcessingService
2. OnTagReadsAsync: For each read, look up ZbTagRead.Id from IoT (by EPC + timestamp)
3. Call BasketProcessingService.ProcessTagReadAsync with tagReadId
4. Remove duplicated basket creation logic (now in BasketProcessingService)
5. Remove CreateMovementRecordAsync (moved to BasketProcessingService)

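Challenge: The MQTT path doesn't have ZbTagRead.Id yet (IoT captures the read separately).
Solution: Query IoT by (DeviceId, Epc, EventTimestamp) to find the ID, or use
         a content-based key, SHA256(DeviceId + Epc + Timestamp), for TagReadProcessing.
         The content-based key would need its own column, since ZbTagReadId is a long
         and cannot hold a SHA-256 hash.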
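
A content-based key of this kind might be computed as in the sketch below. TagReadContentKey and its parameter types are assumptions; the plan does not say how DeviceId, Epc, and Timestamp are represented. The sketch joins the fields with a delimiter instead of using plain concatenation, so that ("ab", "c") and ("a", "bc") do not produce the same input to the hash.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

public static class TagReadContentKey
{
    // Returns a 64-character hex SHA-256 digest of the canonical "device|epc|timestamp" text.
    public static string Compute(string deviceId, string epc, DateTime eventTimestampUtc)
    {
        if (eventTimestampUtc.Kind != DateTimeKind.Utc)
            throw new ArgumentException("Timestamp must be UTC.", nameof(eventTimestampUtc));

        // The round-trip "O" format keeps the timestamp text culture-independent
        // and preserves sub-second precision.
        string canonical = string.Join("|",
            deviceId,
            epc,
            eventTimestampUtc.ToString("O", CultureInfo.InvariantCulture));

        byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(canonical));
        return Convert.ToHexString(hash);
    }
}
```

For the key to deduplicate across the MQTT and replay paths, both would have to derive it from identical field values, including the same timestamp precision that IoT stores.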

Stage 5: Refactor TagReadReplayService

Est. Time: 1-2 hours
Priority: CRITICAL

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/TagReadReplayService.cs

Changes:
1. Inject BasketProcessingService
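2. Query: Change from IoT's bbu_processed_at to BBU's TagReadProcessing
   - SELECT from iot.zb_tag_reads WHERE id NOT IN (SELECT zb_tag_read_id FROM bbu.tag_read_processing)
   - Or use a LEFT JOIN ... IS NULL (or NOT EXISTS) anti-join, which PostgreSQL generally plans better than NOT IN on large tables
   - Scope the match by tenant as well, since the unique index is on (TenantId, ZbTagReadId)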
3. Call BasketProcessingService.ProcessTagReadAsync for each
4. Remove duplicated basket creation logic
5. Remove direct IoT database writes (no more setting bbu_processed_at)

Stage 6: Fix ReturnDoorProcessor Race Condition

Est. Time: 2 hours
Priority: HIGH

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ReturnDoorProcessor.cs

Changes at lines 159-207:
1. Acquire per-device advisory lock before processing
2. Move checkpoint update INSIDE the processing loop
3. Save checkpoint after EACH read, not after entire batch
4. Wrap each read in try/catch, continue on error
5. Add dead letter tracking for repeated failures

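Pattern:
foreach (var device in doorDevices) {
    // Skip devices that another instance is already processing.
    if (!await PostgresAdvisoryLock.TryAcquireDeviceLockAsync(bbuDb, device.Serial, ct))
        continue;
    try {
        // reads: the unprocessed reads for this device, loaded after the checkpoint.
        foreach (var read in reads) {
            try {
                // Process the read and advance the checkpoint in one transaction,
                // so a crash cannot commit one without the other.
                await using var tx = await bbuDb.Database.BeginTransactionAsync(ct);
                await ProcessReturnReadAsync(read);
                checkpoint.LastProcessedReadId = read.Id;
                await bbuDb.SaveChangesAsync(ct);
                await tx.CommitAsync(ct);
            } catch (Exception ex) {
                // RecordFailureAsync is a placeholder for the dead letter tracking
                // in step 5; the loop then continues with the next read.
                await RecordFailureAsync(read, ex, ct);
            }
        }
    } finally {
        // Release with the device-keyed counterpart of TryAcquireDeviceLockAsync.
        await PostgresAdvisoryLock.ReleaseDeviceLockAsync(bbuDb, device.Serial);
    }
}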

Stage 7: Fix OracleProcessor Race Condition

Est. Time: 1-2 hours
Priority: HIGH

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs

Changes at lines 109-182:
1. Acquire global OracleProcessor advisory lock
2. Add blob to _processedBlobs BEFORE processing (with rollback on failure)
3. Or: Remove in-memory HashSet entirely, rely on DB + unique constraint
4. Add dead letter tracking for blob processing failures

Recommended approach:
- Keep HashSet as performance optimization
- Add unique constraint on BackgroundServiceRun.BlobName
- Catch unique constraint violations gracefully

Stage 8: Fix ShipmentLifecycleProcessor

Est. Time: 1-2 hours
Priority: HIGH

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs

Changes at lines 438-465 (CreateCameraReadProcessings):
1. Use upsert pattern instead of blind insert
2. Check for existing CameraReadProcessing by DlCameraReadId
3. Skip if already exists (don't throw)

Changes for retry bounds:
1. Add MaxRetryCount field to CameraReadProcessing (or use dead letter)
2. Track retry count for "NoBaskets" and "MissingStack" statuses
3. After N retries (e.g., 10), move to dead letter or mark as Failed

Stage 9: IoT Database Indexes

Est. Time: 1 hour
Priority: HIGH

File: New migration in IoT component

Add indexes:
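1. Index supporting BBU's anti-join against bbu.tag_read_processing:
   CREATE INDEX ix_zb_tag_reads_id_unprocessed ON iot.zb_tag_reads (id);
   (Note: This is for BBU's NOT IN query pattern, not bbu_processed_at. As written it has
   no WHERE clause, so it is a plain index, not a partial one. If id is already the
   primary key, the primary-key index covers this and the extra index is redundant.)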

2. Composite index for time-windowed device queries:
   CREATE INDEX ix_zb_tag_reads_device_timestamp
   ON iot.zb_tag_reads (device_id, event_timestamp);

3. Unique constraint for blob deduplication:
   CREATE UNIQUE INDEX ix_dl_camera_reads_blob_tenant
   ON iot.dl_camera_reads (tenant_id, blob_name);

Stage 10: Cache Expiration

Est. Time: 1 hour
Priority: MEDIUM

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/CachedReferenceDataService.cs (new)

Wrap cached reference data with expiration:
- Movement type ID (15 minute TTL)
- Movement state ID (15 minute TTL)
- Service user ID (15 minute TTL)
- Item type/status IDs (15 minute TTL)

Update processors to use CachedReferenceDataService instead of instance fields.
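
A minimal sketch of the expiration behaviour, assuming a small hand-rolled cache: ExpiringCache and GetOrLoad are hypothetical names, and the injectable clock exists only to make expiry testable. The real CachedReferenceDataService could equally be built on IMemoryCache from Microsoft.Extensions.Caching.Memory.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

public sealed class ExpiringCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, (TValue Value, DateTime ExpiresUtc)> _entries = new();
    private readonly TimeSpan _ttl;
    private readonly Func<DateTime> _utcNow;

    public ExpiringCache(TimeSpan ttl, Func<DateTime>? utcNow = null)
    {
        _ttl = ttl;
        _utcNow = utcNow ?? (() => DateTime.UtcNow);
    }

    // Returns the cached value while it is fresh; otherwise reloads it and restarts the TTL.
    public TValue GetOrLoad(TKey key, Func<TKey, TValue> load)
    {
        DateTime now = _utcNow();
        if (_entries.TryGetValue(key, out var entry) && entry.ExpiresUtc > now)
            return entry.Value;

        // Concurrent callers may both reload an expired entry; the last write wins.
        TValue value = load(key);
        _entries[key] = (value, now + _ttl);
        return value;
    }
}
```

With a 15-minute TTL, a changed movement type or service user ID would be picked up within at most 15 minutes instead of persisting until restart.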

Stage 11: Configurable Device Mapping

Est. Time: 1 hour
Priority: MEDIUM

File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/DeviceMappingOptions.cs

Move hard-coded mapping (BbuInitializationService.cs lines 33-41) to configuration:
{
  "DeviceMapping": {
    "Mappings": {
      "fdfb45": "Gantry",
      "5fae32": "Door 173"
    }
  }
}
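
The options class for this section might look like the sketch below. The SectionName constant, the case-insensitive comparer, and the DeviceMappingParser helper are assumptions. The helper parses the JSON directly with System.Text.Json only so the example runs on its own; in the service itself the section would more likely be bound through the standard options pattern, for example services.Configure<DeviceMappingOptions>(configuration.GetSection("DeviceMapping")).

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public sealed class DeviceMappingOptions
{
    public const string SectionName = "DeviceMapping";

    // Device serial -> location name. Case-insensitive lookup is an assumption.
    public Dictionary<string, string> Mappings { get; set; } =
        new(StringComparer.OrdinalIgnoreCase);
}

public static class DeviceMappingParser
{
    // Reads the "DeviceMapping" section from raw appsettings-style JSON.
    public static DeviceMappingOptions Parse(string json)
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        JsonElement section = doc.RootElement.GetProperty(DeviceMappingOptions.SectionName);

        var options = new DeviceMappingOptions();
        foreach (JsonProperty mapping in section.GetProperty("Mappings").EnumerateObject())
            options.Mappings[mapping.Name] = mapping.Value.GetString() ?? "";

        return options;
    }
}
```

BbuInitializationService would then look locations up in Mappings instead of the hard-coded table.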

Migration Strategy

  1. Deploy BBU database migration first - New tables don't affect existing functionality
  2. Keep IoT's bbu_processed_at field - Don't remove yet for backwards compatibility
  3. Deploy code changes - New processing uses BBU's TagReadProcessing table
  4. Backfill TagReadProcessing - Optional: migrate existing bbu_processed_at data
  5. Future cleanup - Remove bbu_processed_at from IoT in later release

Implementation Order (Thursday MVP)

| Order | Stage | Time | Cumulative (upper bound) |
|---|---|---|---|
| 1 | Stage 1: DB Schema | 2-3h | 3h |
| 2 | Stage 2: Advisory Locks | 1-2h | 5h |
| 3 | Stage 3: BasketProcessingService | 3-4h | 9h |
| 4 | Stage 4: Refactor BbuMqttProcessor | 2h | 11h |
| 5 | Stage 5: Refactor TagReadReplayService | 1-2h | 13h |
| 6 | Stage 6: Fix ReturnDoorProcessor | 2h | 15h |
| 7 | Stage 7: Fix OracleProcessor | 1-2h | 17h |
| 8 | Stage 8: Fix ShipmentLifecycle | 1-2h | 19h |
| 9 | Stage 9: IoT Indexes | 1h | 20h |
| 10 | Stage 10: Cache Expiration | 1h | 21h |
| 11 | Stage 11: Device Mapping | 1h | 22h |

Critical path for Thursday: Stages 1-8 (~13-19 hours, summing the per-stage estimates)
Nice to have: Stages 9-11 (~3 hours)


Files to Modify/Create

New Files (BBU)

  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/TagReadProcessing.cs
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/MovementDeduplication.cs
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/ProcessingDeadLetter.cs
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Services/PostgresAdvisoryLock.cs
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BasketProcessingService.cs
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/CachedReferenceDataService.cs
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/DeviceMappingOptions.cs

Modified Files (BBU)

  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/BbuDb.cs - Add new DbSets
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BbuMqttProcessor.cs - Use BasketProcessingService
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/TagReadReplayService.cs - Use BasketProcessingService
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ReturnDoorProcessor.cs - Fix checkpoint race
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs - Fix blob race
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs - Add upsert, retry bounds
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BbuInitializationService.cs - Use DeviceMappingOptions
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Program.cs - Register new services
  • /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/appsettings.json - Add DeviceMapping section

Modified Files (IoT)

  • /engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/ - New migration for indexes

Testing Checklist

  • Process same tag read via MQTT twice → only one basket created
  • Start replay service while MQTT is processing → no duplicates
  • Kill BBU mid-checkpoint update → no duplicate processing on restart
  • Same basket moves A→B twice within the same 5-minute window → only one movement
  • Kill OracleProcessor mid-blob → blob not reprocessed (or gracefully handled)
  • Camera read with missing OBLPN → retried up to limit, then dead-lettered
  • Query unprocessed tag reads with 1M records → fast (uses index)
  • Device mapping loaded from config, not hard-coded

Open Questions (Resolved)

  1. Q: Should BBU write to IoT's bbu_processed_at?
    A: No. BBU maintains its own TagReadProcessing table. IoT is unaware of BBU.

  2. Q: What's the idempotency key for MQTT processing?
    A: SHA256(DeviceId + Epc + Timestamp) or query IoT to find ZbTagRead.Id

  3. Q: How to handle concurrent instances?
    A: PostgreSQL advisory locks (session-level, auto-released on disconnect)