fsds/bbu-and-iot-component-hardening.md
BBU & IoT Component Hardening Plan
Goal: Production-ready idempotency, race condition elimination, and architectural boundary enforcement
Milestone: Thursday
Scope: Full hardening (Critical + High + Medium issues)
Executive Summary
The BBU and IoT components have several critical issues that can cause duplicate data, lost updates, and data inconsistency. This plan addresses them systematically while respecting the architectural boundary: IoT collects raw data; BBU owns business logic and processing state.
Critical Issues Being Fixed
| Issue | Impact | Priority |
|---|---|---|
| Dual Processing | Same tag read processed by live MQTT AND replay service | CRITICAL |
| Checkpoint Races | Crash between processing and checkpoint update causes reprocessing | CRITICAL |
| Missing Indexes | Full table scans on WHERE bbu_processed_at IS NULL | CRITICAL |
| Movement Duplicates | Same basket movement creates multiple records | HIGH |
| Camera Read Duplicates | CameraReadProcessing inserted without upsert | HIGH |
| Unbounded Retries | Infinite retry loop for missing OBLPN | HIGH |
| Cache Never Expires | Stale movement types, service users | MEDIUM |
| Hard-coded Device Map | Device-to-location in code, not config | MEDIUM |
Implementation Stages
Stage 1: Database Schema Changes (BBU)
Est. Time: 2-3 hours
Priority: CRITICAL - Foundation for all other fixes
Create new entities in the BBU database to own processing state:
1.1 TagReadProcessing Entity
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/TagReadProcessing.cs
Purpose: BBU-owned tracking of which IoT tag reads have been processed
Fields:
- Id (long, PK)
- ZbTagReadId (long) - Reference to IoT record
- ProcessorType (string) - "MQTT" or "Replay"
- ProcessedAtUtc (DateTime)
- BasketId (Guid?) - Result item ID
- MovementId (Guid?) - If movement was created
- TenantId (Guid)
- Unique index on (TenantId, ZbTagReadId)
1.2 MovementDeduplication Entity
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/MovementDeduplication.cs
Purpose: Prevent duplicate movements within time window
Fields:
- Id (long, PK)
- BasketId (Guid)
- FromLocationId (Guid)
- ToLocationId (Guid)
- WindowStartUtc (DateTime) - Start of the 5-minute bucket
- MovementId (Guid)
- CreatedAtUtc (DateTime)
- TenantId (Guid)
- Unique index on (BasketId, FromLocationId, ToLocationId, WindowStartUtc, TenantId)
1.3 ProcessingDeadLetter Entity
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/ProcessingDeadLetter.cs
Purpose: Track failed items with retry bounds
Fields:
- Id (long, PK)
- ProcessorName (string)
- EntityType (string) - "TagRead", "CameraRead", "Blob"
- EntityId (string)
- RetryCount (int)
- MaxRetries (int) - Default 5
- LastError (string?)
- FirstFailureUtc (DateTime)
- LastFailureUtc (DateTime)
- TenantId (Guid)
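The retry bound can be sketched as a small in-memory model of the fields above. This is a minimal sketch, assuming one dead-letter row per failing entity and that an entity stops being retried once RetryCount reaches MaxRetries; `DeadLetterEntry` and `RecordFailure` are hypothetical names, and persistence is left out.

```csharp
#nullable enable
using System;

// Hypothetical in-memory mirror of a ProcessingDeadLetter row (Stage 1.3).
public sealed class DeadLetterEntry
{
    public string ProcessorName { get; init; } = "";
    public string EntityType { get; init; } = "";   // "TagRead", "CameraRead", "Blob"
    public string EntityId { get; init; } = "";
    public int MaxRetries { get; init; } = 5;        // plan default
    public int RetryCount { get; private set; }
    public string? LastError { get; private set; }
    public DateTime FirstFailureUtc { get; private set; }
    public DateTime LastFailureUtc { get; private set; }

    // True once the entity has used up its retries and should no longer be picked up.
    public bool IsExhausted => RetryCount >= MaxRetries;

    // Records one failed attempt; returns true if the entity may still be retried.
    public bool RecordFailure(string error, DateTime nowUtc)
    {
        if (RetryCount == 0)
            FirstFailureUtc = nowUtc;   // first failure starts the history
        RetryCount++;
        LastError = error;
        LastFailureUtc = nowUtc;
        return !IsExhausted;
    }
}
```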
1.4 Update BbuDb.cs
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/BbuDb.cs
Add DbSets and model configuration for new entities
1.5 Create Migration
Use the /generate-migration command to create the migration.
Stage 2: PostgreSQL Advisory Locks Helper
Est. Time: 1-2 hours
Priority: HIGH - Enables safe concurrent processing
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Services/PostgresAdvisoryLock.cs
Lock Types:
- ReturnDoorProcessor: 0x42425530001
- OracleProcessor: 0x42425530002
- TagReadReplay: 0x42425530003
- ShipmentLifecycle: 0x42425530004
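- Per-device locks: 0x42425540000 + hash(serial), where hash must be non-negative and identical across processes (string.GetHashCode() is randomized per process in .NET, so it cannot be used)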
Methods:
- TryAcquireLockAsync(DbContext, lockType)
- TryAcquireDeviceLockAsync(DbContext, deviceSerial)
- ReleaseLockAsync(DbContext, lockType)
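A per-device key needs a hash that is the same in every BBU instance; otherwise two instances lock different keys for the same device and the lock protects nothing. A minimal sketch, assuming the first 4 bytes of a SHA-256 digest as the hash; `AdvisoryLockKeys` and `DeviceLock` are hypothetical names. The resulting `long` would be passed to PostgreSQL's `pg_try_advisory_lock(bigint)` and `pg_advisory_unlock(bigint)`. Because session-level locks belong to the database connection, acquire and release must run on the same open connection (with EF Core, open the DbContext's connection explicitly instead of letting each command borrow one from the pool).

```csharp
using System.Buffers.Binary;
using System.Security.Cryptography;
using System.Text;

public static class AdvisoryLockKeys
{
    // Global keys from the plan (0x424255 spells "BBU" in ASCII).
    public const long ReturnDoorProcessor = 0x42425530001;
    public const long OracleProcessor     = 0x42425530002;
    public const long TagReadReplay       = 0x42425530003;
    public const long ShipmentLifecycle   = 0x42425530004;
    public const long DeviceLockBase      = 0x42425540000;

    // Per-device key = base + stable, non-negative 32-bit hash of the serial.
    // SHA-256 gives the same value in every process, unlike string.GetHashCode().
    public static long DeviceLock(string deviceSerial)
    {
        byte[] digest = SHA256.HashData(Encoding.UTF8.GetBytes(deviceSerial));
        uint hash = BinaryPrimitives.ReadUInt32BigEndian(digest); // first 4 bytes, endian-independent
        return DeviceLockBase + hash;
    }
}
```

Device keys always land at or above `DeviceLockBase`, so they never collide with the four global keys. Two serials can still share a 32-bit hash; that only serializes those two devices and does not break correctness.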
Stage 3: Unified Basket Processing Service
Est. Time: 3-4 hours
Priority: CRITICAL - Fixes dual processing
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BasketProcessingService.cs
Purpose: Single entry point for all basket creation/update logic with idempotency
Methods:
1. ProcessTagReadAsync(tagReadId, epc, deviceId, timestamp, processorType)
- Check TagReadProcessing for existing entry → return early if exists
- Find/create basket (existing logic)
- Create movement if location changed (with deduplication)
- Record in TagReadProcessing
- Return result with basket/movement IDs
2. CheckAndRecordMovementAsync(basketId, fromLoc, toLoc, tenantId)
- Calculate 5-minute window bucket
- Check MovementDeduplication for existing
- Create movement only if not duplicate
- Record in MovementDeduplication
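The bucket calculation can be sketched as flooring a UTC timestamp to a 5-minute boundary. This sketch assumes the input is the tag read's event timestamp rather than the processing time, so that the MQTT path and a much later replay compute the same WindowStartUtc; `MovementWindow` is a hypothetical name.

```csharp
using System;

public static class MovementWindow
{
    public static readonly TimeSpan Size = TimeSpan.FromMinutes(5);

    // Floors a UTC timestamp to the start of its 5-minute bucket (WindowStartUtc).
    // A day is an exact multiple of 5 minutes, so buckets align to :00, :05, :10, ...
    public static DateTime BucketStart(DateTime utc)
    {
        if (utc.Kind != DateTimeKind.Utc)
            throw new ArgumentException("Expected a UTC timestamp", nameof(utc));
        long ticks = utc.Ticks - (utc.Ticks % Size.Ticks);
        return new DateTime(ticks, DateTimeKind.Utc);
    }
}
```

Fixed buckets bound duplicates rather than eliminate them: a move at 12:04:59 and a repeat at 12:05:01 fall into different buckets and both pass the unique index. Checking for a matching movement in the preceding 5 minutes would close that gap at the cost of a range query.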
Key Design:
- Uses BBU's TagReadProcessing table (not IoT's bbu_processed_at)
- Both MQTT and Replay services call this
- IoT database not modified by BBU (respects boundary)
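The check in step 1 alone is a read-then-write race: MQTT and replay can both see no TagReadProcessing row and both create a basket. The safe shape is claim-first: insert the TagReadProcessing row inside the same transaction that creates the basket and movement, and treat a unique violation on (TenantId, ZbTagReadId) as "already processed" (PostgreSQL makes the second inserter wait for the first transaction and fail only if it commits). A minimal in-memory sketch of that shape, where `InMemoryProcessingLedger` stands in for the unique index and all names are hypothetical:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the unique index on (TenantId, ZbTagReadId): TryAdd succeeds once per key.
public sealed class InMemoryProcessingLedger
{
    private readonly ConcurrentDictionary<(Guid TenantId, long TagReadId), string> _claims = new();

    public bool TryClaim(Guid tenantId, long tagReadId, string processorType) =>
        _claims.TryAdd((tenantId, tagReadId), processorType);

    public void Release(Guid tenantId, long tagReadId) =>
        _claims.TryRemove((tenantId, tagReadId), out _);
}

public sealed class BasketProcessingSketch
{
    private readonly InMemoryProcessingLedger _ledger;
    private int _basketsCreated;

    public BasketProcessingSketch(InMemoryProcessingLedger ledger) => _ledger = ledger;

    public int BasketsCreated => Volatile.Read(ref _basketsCreated);

    public async Task<bool> ProcessTagReadAsync(Guid tenantId, long tagReadId, string processorType)
    {
        // Claim first: only one caller (MQTT or Replay) can win a given key.
        if (!_ledger.TryClaim(tenantId, tagReadId, processorType))
            return false;
        try
        {
            await Task.Yield(); // placeholder for find/create basket + deduplicated movement
            Interlocked.Increment(ref _basketsCreated);
            return true;
        }
        catch
        {
            // In the database version, this is the transaction rollback.
            _ledger.Release(tenantId, tagReadId);
            throw;
        }
    }
}
```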
Stage 4: Refactor BbuMqttProcessor
Est. Time: 2 hours
Priority: CRITICAL
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BbuMqttProcessor.cs
Changes:
1. Inject BasketProcessingService
2. OnTagReadsAsync: For each read, look up ZbTagRead.Id from IoT (by DeviceId + EPC + timestamp)
3. Call BasketProcessingService.ProcessTagReadAsync with tagReadId
4. Remove duplicated basket creation logic (now in BasketProcessingService)
5. Remove CreateMovementRecordAsync (moved to BasketProcessingService)
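Challenge: The MQTT path doesn't have ZbTagRead.Id yet (IoT captures reads separately).
Solution: Query IoT by (DeviceId, Epc, EventTimestamp) to find the ID, or use a content-based key, SHA256(DeviceId + Epc + Timestamp), for TagReadProcessing. With the content-based key, the replay path must compute the same key from the IoT row, and TagReadProcessing needs a column that can hold it (ZbTagReadId is a long); otherwise MQTT and replay entries never match and dual processing is not prevented.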
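The content-based key can be sketched as follows. It assumes the fields are joined with a newline (assumed never to appear in a device ID or EPC) so that different field splits cannot produce the same input, and that timestamps are normalized to UTC at microsecond precision, since PostgreSQL stores 6 fractional-second digits while .NET `DateTime` carries 7. `TagReadKey.Compute` is a hypothetical name; the replay path would call it with the IoT row's device_id, epc and event_timestamp so both paths produce the same key.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

public static class TagReadKey
{
    // Content-based idempotency key: SHA256(DeviceId \n Epc \n Timestamp), hex-encoded.
    public static string Compute(string deviceId, string epc, DateTime eventTimestampUtc)
    {
        if (eventTimestampUtc.Kind != DateTimeKind.Utc)
            throw new ArgumentException("Expected a UTC timestamp", nameof(eventTimestampUtc));

        // Truncate to microseconds so a value round-tripped through PostgreSQL hashes the same.
        long ticks = eventTimestampUtc.Ticks - eventTimestampUtc.Ticks % 10;
        string ts = new DateTime(ticks, DateTimeKind.Utc)
            .ToString("yyyy-MM-dd'T'HH:mm:ss.ffffff'Z'", CultureInfo.InvariantCulture);

        // The separator keeps ("ab", "c") distinct from ("a", "bc").
        string canonical = string.Join('\n', deviceId, epc, ts);
        byte[] digest = SHA256.HashData(Encoding.UTF8.GetBytes(canonical));
        return Convert.ToHexString(digest); // 64 hex characters
    }
}
```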
Stage 5: Refactor TagReadReplayService
Est. Time: 1-2 hours
Priority: CRITICAL
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/TagReadReplayService.cs
Changes:
1. Inject BasketProcessingService
2. Query: Change from IoT's bbu_processed_at to BBU's TagReadProcessing
- SELECT from iot.zb_tag_reads WHERE id NOT IN (SELECT zb_tag_read_id FROM bbu.tag_read_processing)
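- Or use NOT EXISTS or the LEFT JOIN ... IS NULL pattern, which PostgreSQL can plan as an anti-join (NOT IN over a subquery is not converted to one, because of its NULL semantics)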
3. Call BasketProcessingService.ProcessTagReadAsync for each
4. Remove duplicated basket creation logic
5. Remove direct IoT database writes (no more setting bbu_processed_at)
Stage 6: Fix ReturnDoorProcessor Race Condition
Est. Time: 2 hours
Priority: HIGH
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ReturnDoorProcessor.cs
Changes at lines 159-207:
1. Acquire per-device advisory lock before processing
2. Move checkpoint update INSIDE the processing loop
3. Save the checkpoint after EACH read, in the same transaction as that read's processing, not after the entire batch
4. Wrap each read in try/catch, continue on error
5. Add dead letter tracking for repeated failures
Pattern:
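```csharp
foreach (var device in doorDevices) {
    // Another instance holds this device's lock: skip it this cycle
    if (!await PostgresAdvisoryLock.TryAcquireDeviceLockAsync(bbuDb, device.Serial, ct))
        continue;
    try {
        foreach (var read in reads) {
            // Processing and checkpoint commit together: a crash leaves both or neither
            await using var tx = await bbuDb.Database.BeginTransactionAsync(ct);
            try {
                await ProcessReturnReadAsync(read);
                checkpoint.LastProcessedReadId = read.Id;
                await bbuDb.SaveChangesAsync(ct);
                await tx.CommitAsync(ct);
            } catch (Exception ex) when (ex is not OperationCanceledException) {
                await tx.RollbackAsync(ct);
                // Discard the failed read's tracked changes before the next SaveChanges,
                // then record it for dead-letter tracking and continue with the next read
                await RecordDeadLetterAsync(read, ex, ct); // hypothetical helper (Stage 1.3)
            }
        }
    } finally {
        // Release on every path so the device is not blocked until the connection closes
        await PostgresAdvisoryLock.ReleaseLockAsync(bbuDb, device.Serial);
    }
}
```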
Stage 7: Fix OracleProcessor Race Condition
Est. Time: 1-2 hours
Priority: HIGH
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs
Changes at lines 109-182:
1. Acquire global OracleProcessor advisory lock
2. Add blob to _processedBlobs BEFORE processing (with rollback on failure)
3. Or: Remove in-memory HashSet entirely, rely on DB + unique constraint
4. Add dead letter tracking for blob processing failures
Recommended approach:
- Keep HashSet as performance optimization
- Add unique constraint on BackgroundServiceRun.BlobName
- Catch unique constraint violations gracefully
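The recommended approach can be sketched with the in-memory set as a fast path and the database insert as the authority. This assumes the BackgroundServiceRun insert runs inside the blob's processing transaction, so a failure rolls the row back, and that the insert reports a unique violation (PostgreSQL SQLSTATE 23505) as `false`; `BlobDeduplicator` and the `tryInsertRun` delegate are hypothetical. A `ConcurrentDictionary` replaces the HashSet so the sketch stays safe if several tasks share it.

```csharp
using System;
using System.Collections.Concurrent;

public sealed class BlobDeduplicator
{
    // Fast path only: remembers blobs this instance has already claimed.
    private readonly ConcurrentDictionary<string, byte> _processedBlobs = new(StringComparer.Ordinal);

    // Hypothetical: inserts the BackgroundServiceRun row inside the blob's processing
    // transaction; returns false when the unique constraint on BlobName is violated.
    private readonly Func<string, bool> _tryInsertRun;

    public BlobDeduplicator(Func<string, bool> tryInsertRun) => _tryInsertRun = tryInsertRun;

    // True if the caller owns the blob and should process it.
    public bool TryBegin(string blobName)
    {
        if (!_processedBlobs.TryAdd(blobName, 0))
            return false;               // already seen by this instance: skip without a DB round-trip
        return _tryInsertRun(blobName); // false: another instance or an earlier run owns it
    }

    // Undo the in-memory claim after a failed (rolled-back) attempt so a later cycle retries.
    public void Abandon(string blobName) => _processedBlobs.TryRemove(blobName, out _);
}
```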
Stage 8: Fix ShipmentLifecycleProcessor
Est. Time: 1-2 hours
Priority: HIGH
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs
Changes at lines 438-465 (CreateCameraReadProcessings):
1. Use upsert pattern instead of blind insert
2. Check for existing CameraReadProcessing by DlCameraReadId
3. Skip if already exists (don't throw)
Changes for retry bounds:
1. Add MaxRetryCount field to CameraReadProcessing (or use dead letter)
2. Track retry count for "NoBaskets" and "MissingStack" statuses
3. After N retries (e.g., 10), move to dead letter or mark as Failed
Stage 9: IoT Database Indexes
Est. Time: 1 hour
Priority: HIGH
File: New migration in IoT component
Add indexes:
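1. Index for BBU query optimization:
```sql
CREATE INDEX ix_zb_tag_reads_id_unprocessed ON iot.zb_tag_reads (id);
```
(Note: This is for BBU's NOT IN query pattern, not bbu_processed_at. It has no WHERE clause, so it is not a partial index, and if id is the table's primary key, the primary-key index already covers this column; confirm before adding a duplicate index.)
2. Composite index for time-windowed device queries:
```sql
CREATE INDEX ix_zb_tag_reads_device_timestamp
    ON iot.zb_tag_reads (device_id, event_timestamp);
```
3. Unique constraint for blob deduplication (creation fails if duplicate (tenant_id, blob_name) rows already exist, so clean those up first):
```sql
CREATE UNIQUE INDEX ix_dl_camera_reads_blob_tenant
    ON iot.dl_camera_reads (tenant_id, blob_name);
```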
Stage 10: Cache Expiration
Est. Time: 1 hour
Priority: MEDIUM
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/CachedReferenceDataService.cs (new)
Wrap cached reference data with expiration:
- Movement type ID (15 minute TTL)
- Movement state ID (15 minute TTL)
- Service user ID (15 minute TTL)
- Item type/status IDs (15 minute TTL)
Update processors to use CachedReferenceDataService instead of instance fields.
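A minimal sketch of the 15-minute expiry, assuming a generic cache keyed by reference-data name with an injectable clock for testing; `ExpiringCache` and `GetOrLoadAsync` are hypothetical names. `IMemoryCache` from Microsoft.Extensions.Caching.Memory, with `AbsoluteExpirationRelativeToNow` set on the entry, is an off-the-shelf alternative.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Minimal TTL cache for reference IDs (movement type, service user, ...).
public sealed class ExpiringCache<TKey, TValue> where TKey : notnull
{
    private readonly ConcurrentDictionary<TKey, (TValue Value, DateTime ExpiresUtc)> _entries = new();
    private readonly TimeSpan _ttl;
    private readonly Func<DateTime> _utcNow;

    public ExpiringCache(TimeSpan ttl, Func<DateTime>? utcNow = null)
    {
        _ttl = ttl;
        _utcNow = utcNow ?? (() => DateTime.UtcNow);
    }

    public async Task<TValue> GetOrLoadAsync(TKey key, Func<TKey, Task<TValue>> load)
    {
        DateTime now = _utcNow();
        if (_entries.TryGetValue(key, out var entry) && entry.ExpiresUtc > now)
            return entry.Value;

        // Missing or expired: reload. Concurrent callers may both load; last write wins,
        // which is acceptable for read-only reference-data lookups.
        TValue value = await load(key);
        _entries[key] = (value, now + _ttl);
        return value;
    }
}
```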
Stage 11: Configurable Device Mapping
Est. Time: 1 hour
Priority: MEDIUM
File: /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/DeviceMappingOptions.cs
Move hard-coded mapping (BbuInitializationService.cs lines 33-41) to configuration:
```json
{
  "DeviceMapping": {
    "Mappings": {
      "fdfb45": "Gantry",
      "5fae32": "Door 173"
    }
  }
}
```
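The options class can be sketched to match the JSON section above. This is a minimal sketch; `DeviceLocationResolver` is a hypothetical helper, and the case-insensitive lookup is an assumption about how serials arrive. Binding would typically use `builder.Services.Configure<DeviceMappingOptions>(builder.Configuration.GetSection("DeviceMapping"))` in Program.cs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical shape of DeviceMappingOptions, matching the "DeviceMapping" JSON section.
public sealed class DeviceMappingOptions
{
    public const string SectionName = "DeviceMapping";
    public Dictionary<string, string> Mappings { get; set; } = new();
}

public sealed class DeviceLocationResolver
{
    private readonly Dictionary<string, string> _map;

    public DeviceLocationResolver(DeviceMappingOptions options)
    {
        // Copy once with a case-insensitive comparer, whatever comparer the binder used.
        // Throws ArgumentException if two serials differ only by case.
        _map = new Dictionary<string, string>(options.Mappings, StringComparer.OrdinalIgnoreCase);
    }

    // Returns false for unmapped devices instead of throwing.
    public bool TryGetLocation(string deviceSerial, out string location) =>
        _map.TryGetValue(deviceSerial, out location);
}
```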
Migration Strategy
- Deploy BBU database migration first - New tables don't affect existing functionality
- Keep IoT's bbu_processed_at field - Don't remove yet for backwards compatibility
- Deploy code changes - New processing uses BBU's TagReadProcessing table
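- Backfill TagReadProcessing - Migrate existing bbu_processed_at data. Treat this as required unless the replay query only covers reads captured after cutover: otherwise every read already marked via bbu_processed_at looks unprocessed to the new replay query and is processed again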
- Future cleanup - Remove bbu_processed_at from IoT in later release
Implementation Order (Thursday MVP)
| Order | Stage | Time | Cumulative (upper bound) |
|---|---|---|---|
| 1 | Stage 1: DB Schema | 2-3h | 3h |
| 2 | Stage 2: Advisory Locks | 1-2h | 5h |
| 3 | Stage 3: BasketProcessingService | 3-4h | 9h |
| 4 | Stage 4: Refactor BbuMqttProcessor | 2h | 11h |
| 5 | Stage 5: Refactor TagReadReplayService | 1-2h | 13h |
| 6 | Stage 6: Fix ReturnDoorProcessor | 2h | 15h |
| 7 | Stage 7: Fix OracleProcessor | 1-2h | 17h |
| 8 | Stage 8: Fix ShipmentLifecycle | 1-2h | 19h |
| 9 | Stage 9: IoT Indexes | 1h | 20h |
| 10 | Stage 10: Cache Expiration | 1h | 21h |
| 11 | Stage 11: Device Mapping | 1h | 22h |
Critical path for Thursday: Stages 1-8 (~13-19 hours)
Nice to have: Stages 9-11 (~3 hours)
Files to Modify/Create
New Files (BBU)
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/TagReadProcessing.cs
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/MovementDeduplication.cs
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/ProcessingDeadLetter.cs
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Services/PostgresAdvisoryLock.cs
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BasketProcessingService.cs
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/CachedReferenceDataService.cs
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/DeviceMappingOptions.cs
Modified Files (BBU)
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/BbuDb.cs - Add new DbSets
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BbuMqttProcessor.cs - Use BasketProcessingService
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/TagReadReplayService.cs - Use BasketProcessingService
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ReturnDoorProcessor.cs - Fix checkpoint race
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs - Fix blob race
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs - Add upsert, retry bounds
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/BbuInitializationService.cs - Use DeviceMappingOptions
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Program.cs - Register new services
- /engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/appsettings.json - Add DeviceMapping section
Modified Files (IoT)
- /engines/iot/src/Acsis.Dynaplex.Engines.Iot.Database/ - New migration for indexes
Testing Checklist
- Process same tag read via MQTT twice → only one basket created
- Start replay service while MQTT is processing → no duplicates
- Kill BBU mid-checkpoint update → no duplicate processing on restart
- Same basket moves A→B twice within one 5-minute window bucket → only one movement
- Kill OracleProcessor mid-blob → blob not reprocessed (or gracefully handled)
- Camera read with missing OBLPN → retried up to limit, then dead-lettered
- Query unprocessed tag reads with 1M records → fast (uses index)
- Device mapping loaded from config, not hard-coded
Open Questions (Resolved)
Q: Should BBU write to IoT's bbu_processed_at?
A: No. BBU maintains its own TagReadProcessing table. IoT is unaware of BBU.
Q: What's the idempotency key for MQTT processing?
A: SHA256(DeviceId + Epc + Timestamp) or query IoT to find ZbTagRead.Id
Q: How to handle concurrent instances?
A: PostgreSQL advisory locks (session-level, auto-released on disconnect)