ADR-041: BBU Processing Hardening Patterns

Status

Accepted

Context

The BBU component processes RFID tag reads from multiple sources:

  • Real-time MQTT via BbuMqttProcessor (Zebra FX readers)
  • Batch replay via TagReadReplayService (backfill from IoT database)
  • Camera reconciliation via ShipmentLifecycleProcessor
  • Return tracking via ReturnDoorProcessor

Several critical issues were identified:

  1. Dual Processing Vulnerability - The same tag read could be processed by both MQTT (real-time) and replay (backfill), creating duplicate baskets and movements
  2. Checkpoint Race Conditions - A crash between processing and the checkpoint update caused reprocessing on restart
  3. No Distributed Locking - Multiple service instances could process the same data concurrently
  4. Unbounded Retries - Failed items (missing OBLPN, etc.) would retry forever
  5. Cross-Component Writes - BBU was writing to IoT's bbu_processed_at column, violating component boundaries
  6. Movement Duplicates - The same basket movement could create multiple records within a short time window

Decision

We implement a comprehensive hardening strategy with these patterns:

1. BBU-Owned Processing State

BBU maintains its own processing state tables instead of writing to IoT:

bbu.tag_read_processings     - Tracks which IoT tag reads have been processed
bbu.movement_deduplications  - Prevents duplicate movements in time windows
bbu.processing_dead_letters  - Failed items with retry counts

This respects component boundaries: IoT collects raw data; BBU owns business logic.

2. Unified Processing Service

All tag read processing flows through BasketProcessingService:

public record BasketProcessingResult(
    bool Success,
    Guid? BasketId,
    Guid? MovementId,
    bool IsNewBasket,
    string? SkipReason,
    string? ErrorMessage
);

public async Task<BasketProcessingResult> ProcessTagReadAsync(
    long? zbTagReadId,      // From replay (has IoT DB ID)
    string epc,
    string? epcUri,
    string deviceId,
    DateTime eventTimestamp,
    string processorType,   // "MQTT" or "Replay"
    Guid tenantId,
    CancellationToken cancellationToken
);

3. Dual Idempotency Keys

  • Replay path: Uses ZbTagReadId (IoT database primary key)
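  • MQTT path: Uses a content hash, SHA256(DeviceId + EPC + Timestamp), since the IoT record may not exist yet

// Requires: using System.Security.Cryptography;
private static string CalculateContentHash(string deviceId, string epc, DateTime timestamp) {
    // Fields are pipe-delimited so adjacent values cannot run together.
    // The round-trip ("O") output differs by DateTimeKind, so callers should pass UTC consistently.
    var input = $"{deviceId}|{epc}|{timestamp:O}";
    var bytes = SHA256.HashData(System.Text.Encoding.UTF8.GetBytes(input));
    return Convert.ToBase64String(bytes);  // 44 characters, fits content_hash VARCHAR(64)
}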
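
The two key types can be sketched together as follows. This is a minimal illustration, not the service's actual code: the IdempotencyKeys class, the KeyFor helper, and the "id:" / "hash:" prefixes are hypothetical, and taking a DateTimeOffset (so the timestamp can be normalized to UTC before hashing) is an assumption made here to keep the hash independent of the caller's time zone.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, for illustration only.
public static class IdempotencyKeys
{
    // Hashes device, EPC, and the UTC instant. Normalizing to UTC first means
    // the same instant always formats to the same "O" string.
    public static string ContentHash(string deviceId, string epc, DateTimeOffset timestamp)
    {
        var input = $"{deviceId}|{epc}|{timestamp.UtcDateTime:O}";
        var bytes = SHA256.HashData(Encoding.UTF8.GetBytes(input));
        return Convert.ToBase64String(bytes);
    }

    // Replay reads carry the IoT primary key; MQTT reads fall back to the content hash.
    public static string KeyFor(long? zbTagReadId, string deviceId, string epc, DateTimeOffset timestamp) =>
        zbTagReadId is long id
            ? $"id:{id}"
            : $"hash:{ContentHash(deviceId, epc, timestamp)}";
}
```

With this shape, a read delivered twice over MQTT yields the same key both times, while a replayed read is keyed by its database ID regardless of its content.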

4. PostgreSQL Advisory Locks

Session-level locks prevent concurrent processing across instances:

public static class PostgresAdvisoryLock {
    public const long RETURN_DOOR_PROCESSOR_LOCK = 0x42425530001;
    public const long ORACLE_PROCESSOR_LOCK      = 0x42425530002;
    public const long TAG_READ_REPLAY_LOCK       = 0x42425530003;
    public const long SHIPMENT_LIFECYCLE_LOCK    = 0x42425530004;

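    // Session-level: the lock belongs to the underlying connection, so the caller must keep
    // that connection open for as long as the lock is needed.
    public static async Task<bool> TryAcquireLockAsync(DbContext context, long lockId, CancellationToken ct) {
        // EF Core expects a scalar result column named "Value" when composing (FirstOrDefaultAsync).
        return await context.Database
            .SqlQueryRaw<bool>("SELECT pg_try_advisory_lock({0}) AS \"Value\"", lockId)
            .FirstOrDefaultAsync(ct);
    }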
}

Per-device locks use hash-based IDs: 0x42425540000 + (hash(serial) % 0x10000)
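
The hash in that formula has to produce the same value in every service instance, otherwise two instances would compute different lock IDs for the same reader. In .NET, string.GetHashCode() is randomized per process, so it is not suitable. The sketch below uses 32-bit FNV-1a as one possible stable hash; the choice of FNV-1a and the DeviceLockIds class name are assumptions for illustration, not something this ADR specifies.

```csharp
using System;
using System.Text;

// Hypothetical helper, for illustration only.
public static class DeviceLockIds
{
    private const long DeviceLockBase = 0x42425540000;

    // 32-bit FNV-1a over the UTF-8 bytes: deterministic across processes and machines.
    public static uint StableHash(string value)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;
        uint hash = offsetBasis;
        foreach (byte b in Encoding.UTF8.GetBytes(value))
        {
            hash ^= b;
            unchecked { hash *= prime; } // intentional wrap-around modulo 2^32
        }
        return hash;
    }

    // Maps a reader serial into one of 0x10000 lock slots above the base ID.
    public static long ForSerial(string serial) =>
        DeviceLockBase + (StableHash(serial) % 0x10000);
}
```

Because only 65,536 slots exist, two serials can occasionally share a lock ID. That costs some unnecessary serialization between those two devices but does not affect correctness.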

5. Movement Time-Window Deduplication

Movements are deduplicated within 5-minute buckets:

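// Assumes timestamp is already UTC; the result is stamped DateTimeKind.Utc without conversion.
public static DateTime CalculateWindowStart(DateTime timestamp) {
    var minutes = timestamp.Minute / 5 * 5;  // integer division floors to the 5-minute bucket
    return new DateTime(timestamp.Year, timestamp.Month, timestamp.Day,
        timestamp.Hour, minutes, 0, DateTimeKind.Utc);
}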

Unique constraint: (BasketId, FromLocationId, ToLocationId, WindowStartUtc, TenantId)
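
To make the bucketing concrete, here is a small sketch of how timestamps map to windows. It floors on ticks rather than on the minute component, which gives the same result for UTC input; the MovementWindows class and the SameWindow helper are hypothetical names introduced for this example.

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class MovementWindows
{
    private static readonly TimeSpan Window = TimeSpan.FromMinutes(5);

    // Floors a UTC timestamp to the start of its 5-minute bucket.
    public static DateTime WindowStart(DateTime utcTimestamp)
    {
        var flooredTicks = utcTimestamp.Ticks - (utcTimestamp.Ticks % Window.Ticks);
        return new DateTime(flooredTicks, DateTimeKind.Utc);
    }

    // Two movements collide on the unique constraint only if they share a bucket.
    public static bool SameWindow(DateTime a, DateTime b) =>
        WindowStart(a) == WindowStart(b);
}
```

For example, 10:02:10 and 10:04:59 both map to 10:00:00 and are deduplicated, while 10:04:59 and 10:05:01 map to 10:00:00 and 10:05:00. Fixed buckets therefore do not catch two movements that straddle a bucket boundary, even when they are only seconds apart.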

6. Dead Letter with Retry Bounds

Failed items are tracked with configurable retry limits:

public class ProcessingDeadLetter {
    public string ProcessorName { get; set; }   // e.g., "ShipmentLifecycleProcessor"
    public string EntityType { get; set; }       // e.g., "CameraRead"
    public string EntityId { get; set; }
    public int RetryCount { get; set; }
    public int MaxRetries { get; set; } = 5;
    public string? LastError { get; set; }
}

Once an item's RetryCount exceeds MaxRetries, it is excluded from further processing.
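
The retry bound can be sketched as a small state object. This is an assumption-laden illustration: the DeadLetterEntry class, the RecordFailure method, and the IsExhausted property are hypothetical, and the sketch assumes RetryCount counts every recorded failure, so an item is excluded once RetryCount is greater than MaxRetries.

```csharp
#nullable enable
using System;

// Hypothetical in-memory model, for illustration only.
public sealed class DeadLetterEntry
{
    public int RetryCount { get; private set; }
    public int MaxRetries { get; init; } = 5;
    public string? LastError { get; private set; }
    public DateTime? FirstFailureUtc { get; private set; }
    public DateTime? LastFailureUtc { get; private set; }

    // Assumption: "exceeded" means strictly greater than MaxRetries.
    public bool IsExhausted => RetryCount > MaxRetries;

    // Records one more failure and keeps the first/last failure timestamps current.
    public void RecordFailure(string error, DateTime nowUtc)
    {
        RetryCount++;
        LastError = error;
        FirstFailureUtc ??= nowUtc;
        LastFailureUtc = nowUtc;
    }
}
```

A processor would check IsExhausted (or the equivalent WHERE clause on retry_count and max_retries) before picking an item up again.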

7. Per-Read Checkpointing

Checkpoints are updated after every read rather than once per batch. A failed read is recorded as a dead letter and the checkpoint still advances:

foreach (var read in reads) {
    try {
        await ProcessReadAsync(read);
        checkpoint.LastProcessedReadId = read.Id;
        await db.SaveChangesAsync();  // Save after EACH read
    } catch (Exception ex) {
        await RecordDeadLetterAsync(read.Id, ex.Message);
        checkpoint.LastProcessedReadId = read.Id;  // Still advance
        await db.SaveChangesAsync();
    }
}
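
The loop above can be exercised without a database by passing the processing, dead-letter, and checkpoint steps in as delegates. The sketch below is a hypothetical harness (CheckpointLoop and its parameters are not part of the codebase). It assumes read IDs arrive in ascending order, and it treats cancellation as a shutdown signal rather than a processing failure, which is a design choice made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical harness, for illustration only.
public static class CheckpointLoop
{
    public static async Task<long> RunAsync(
        IEnumerable<long> readIds,                 // assumed ascending
        long lastProcessedId,                      // checkpoint loaded at startup
        Func<long, CancellationToken, Task> process,
        Func<long, string, Task> deadLetter,
        Func<long, Task> saveCheckpoint,
        CancellationToken ct)
    {
        foreach (var id in readIds)
        {
            if (id <= lastProcessedId) continue;   // already covered by the checkpoint
            ct.ThrowIfCancellationRequested();
            try
            {
                await process(id, ct);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                await deadLetter(id, ex.Message);  // failure is recorded, not retried inline
            }
            lastProcessedId = id;                  // advance on success and on failure
            await saveCheckpoint(id);              // persist after EACH read
        }
        return lastProcessedId;
    }
}
```

After a crash, restarting with the last saved checkpoint skips everything at or below it, so at most one read (the one in flight) is seen again, and the idempotency keys cover that case.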

Consequences

Positive

  • No duplicate processing - Idempotency keys prevent double-processing
  • Crash resilient - Per-read checkpointing minimizes reprocessing
  • Scale-out safe - Advisory locks prevent concurrent processing
  • Component boundaries respected - BBU doesn't write to IoT database
  • Retry bounded - Failed items don't retry forever
  • Movement consistency - Time-window deduplication prevents duplicates

Negative

  • Additional tables - Three new BBU tables for state tracking
  • Query overhead - Idempotency checks add queries per read
  • Lock contention - Advisory locks serialize some operations

Neutral

  • Migration required - The new tables must be created; existing bbu_processed_at data is not migrated (fresh start)
  • Memory usage - Content hashes stored for MQTT deduplication

Database Schema

tag_read_processings

CREATE TABLE bbu.tag_read_processings (
    id BIGSERIAL PRIMARY KEY,
    zb_tag_read_id BIGINT,              -- NULL for MQTT (uses content_hash)
    content_hash VARCHAR(64),            -- NULL for Replay (uses zb_tag_read_id)
    processor_type VARCHAR(20) NOT NULL, -- "MQTT" or "Replay"
    processed_at_utc TIMESTAMP NOT NULL,
    basket_id UUID,
    movement_id UUID,
    epc_hex VARCHAR(64) NOT NULL,
    device_id VARCHAR(64) NOT NULL,
    is_new_basket BOOLEAN NOT NULL,
    error_message TEXT,
    tenant_id UUID NOT NULL
);

CREATE UNIQUE INDEX ix_tag_read_processings_zb_tag_read
    ON bbu.tag_read_processings (tenant_id, zb_tag_read_id)
    WHERE zb_tag_read_id IS NOT NULL;

CREATE UNIQUE INDEX ix_tag_read_processings_content_hash
    ON bbu.tag_read_processings (tenant_id, content_hash)
    WHERE content_hash IS NOT NULL;

movement_deduplications

CREATE TABLE bbu.movement_deduplications (
    id BIGSERIAL PRIMARY KEY,
    basket_id UUID NOT NULL,
    from_location_id UUID NOT NULL,
    to_location_id UUID NOT NULL,
    window_start_utc TIMESTAMP NOT NULL,
    movement_id UUID NOT NULL,
    created_at_utc TIMESTAMP NOT NULL,
    tenant_id UUID NOT NULL
);

CREATE UNIQUE INDEX ix_movement_deduplications_unique
    ON bbu.movement_deduplications
    (basket_id, from_location_id, to_location_id, window_start_utc, tenant_id);

processing_dead_letters

CREATE TABLE bbu.processing_dead_letters (
    id BIGSERIAL PRIMARY KEY,
    processor_name VARCHAR(100) NOT NULL,
    entity_type VARCHAR(50) NOT NULL,
    entity_id VARCHAR(100) NOT NULL,
    retry_count INT NOT NULL,
    max_retries INT NOT NULL DEFAULT 5,
    last_error TEXT,
    first_failure_utc TIMESTAMP NOT NULL,
    last_failure_utc TIMESTAMP NOT NULL,
    tenant_id UUID NOT NULL
);

Implementation

  • BasketProcessingService - Unified idempotent processing
  • PostgresAdvisoryLock - Distributed lock helper
  • TagReadProcessing, MovementDeduplication, ProcessingDeadLetter - Database entities
  • CachedReferenceDataService - Time-expiring cache for reference data