`adrs/041-bbu-processing-hardening.md`
# ADR-041: BBU Processing Hardening Patterns
## Status

Accepted
## Context
The BBU component processes RFID tag reads from multiple sources:

- Real-time MQTT via `BbuMqttProcessor` (Zebra FX readers)
- Batch replay via `TagReadReplayService` (backfill from the IoT database)
- Camera reconciliation via `ShipmentLifecycleProcessor`
- Return tracking via `ReturnDoorProcessor`
Several critical issues were identified:

- **Dual Processing Vulnerability** - The same tag read could be processed by both MQTT (real-time) and replay (backfill), causing duplicate baskets and movements
- **Checkpoint Race Conditions** - A crash between processing and the checkpoint update causes reprocessing on restart
- **No Distributed Locking** - Multiple service instances could process the same data concurrently
- **Unbounded Retries** - Failed items (missing OBLPN, etc.) would retry forever
- **Cross-Component Writes** - BBU was writing to IoT's `bbu_processed_at` column, violating component boundaries
- **Movement Duplicates** - The same basket movement could create multiple records within short time windows
## Decision
We implement a comprehensive hardening strategy with these patterns:
### 1. BBU-Owned Processing State
BBU maintains its own processing state tables instead of writing to IoT:

- `bbu.tag_read_processings` - Tracks which IoT tag reads have been processed
- `bbu.movement_deduplications` - Prevents duplicate movements within time windows
- `bbu.processing_dead_letters` - Tracks failed items with retry counts
This respects component boundaries: IoT collects raw data; BBU owns business logic.
### 2. Unified Processing Service

All tag read processing flows through `BasketProcessingService`:
```csharp
public record BasketProcessingResult(
    bool Success,
    Guid? BasketId,
    Guid? MovementId,
    bool IsNewBasket,
    string? SkipReason,
    string? ErrorMessage
);

public async Task<BasketProcessingResult> ProcessTagReadAsync(
    long? zbTagReadId,        // From replay (has IoT DB ID); null on the MQTT path
    string epc,
    string? epcUri,
    string deviceId,
    DateTime eventTimestamp,
    string processorType,     // "MQTT" or "Replay"
    Guid tenantId,
    CancellationToken cancellationToken
);
```
### 3. Dual Idempotency Keys

- **Replay path**: uses `ZbTagReadId` (the IoT database primary key)
- **MQTT path**: uses a content hash, `SHA256(DeviceId + EPC + Timestamp)`, since the IoT record may not exist yet
```csharp
private static string CalculateContentHash(string deviceId, string epc, DateTime timestamp)
{
    var input = $"{deviceId}|{epc}|{timestamp:O}";
    var bytes = SHA256.HashData(System.Text.Encoding.UTF8.GetBytes(input));
    return Convert.ToBase64String(bytes);
}
```
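One caveat worth noting with this hash (a sketch of the pitfall, not code from the implementation): the `"O"` round-trip format renders `DateTimeKind.Utc` and `DateTimeKind.Unspecified` timestamps differently, so the same instant can yield two different hashes unless both paths normalize to UTC before hashing.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class ContentHashDemo
{
    // Same hashing scheme as above: SHA256 over "deviceId|epc|timestamp:O".
    public static string Hash(string deviceId, string epc, DateTime ts)
    {
        var input = $"{deviceId}|{epc}|{ts:O}";
        return Convert.ToBase64String(SHA256.HashData(Encoding.UTF8.GetBytes(input)));
    }

    public static void Main()
    {
        var utc = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
        var unspecified = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Unspecified);
        // Same wall-clock values, but "O" appends "Z" only for Utc,
        // so the two inputs hash differently.
        Console.WriteLine(Hash("fx1", "EPC1", utc) == Hash("fx1", "EPC1", unspecified)); // False
    }
}
```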
### 4. PostgreSQL Advisory Locks

Session-level locks prevent concurrent processing across instances:
```csharp
public static class PostgresAdvisoryLock
{
    public const long RETURN_DOOR_PROCESSOR_LOCK = 0x42425530001;
    public const long ORACLE_PROCESSOR_LOCK      = 0x42425530002;
    public const long TAG_READ_REPLAY_LOCK       = 0x42425530003;
    public const long SHIPMENT_LIFECYCLE_LOCK    = 0x42425530004;

    public static async Task<bool> TryAcquireLockAsync(DbContext context, long lockId, CancellationToken ct)
    {
        // EF Core maps scalar SqlQuery results to a column named "Value",
        // and the interpolated SqlQuery overload parameterizes lockId.
        return await context.Database
            .SqlQuery<bool>($"SELECT pg_try_advisory_lock({lockId}) AS \"Value\"")
            .FirstOrDefaultAsync(ct);
    }
}
```

Because these are session-level locks, they are tied to the acquiring connection: the connection must be held for the lock's lifetime, and `pg_advisory_unlock` must be called on that same connection (or the session closed) to release it.
Per-device locks use hash-based IDs: `0x42425540000 + (hash(serial) % 0x10000)`
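The per-device scheme above can be sketched as follows. The ADR does not name the hash function; the sketch below assumes a stable one (the first bytes of SHA-256), because `string.GetHashCode` is randomized per process on modern .NET and would produce different lock IDs on different service instances:

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class PerDeviceLockId
{
    private const long Base = 0x42425540000;

    // Derive a deterministic advisory-lock ID from a device serial.
    public static long For(string deviceSerial)
    {
        var digest = SHA256.HashData(Encoding.UTF8.GetBytes(deviceSerial));
        // Take 8 bytes as an unsigned value, then fold into the 0x10000 range.
        var hash = BitConverter.ToUInt64(digest, 0);
        return Base + (long)(hash % 0x10000);
    }

    public static void Main()
    {
        // Deterministic across processes and machines, and bounded to the
        // reserved per-device range.
        Console.WriteLine(For("FX9600-A1") == For("FX9600-A1")); // True
        Console.WriteLine(For("FX9600-A1") >= Base);             // True
    }
}
```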
### 5. Movement Time-Window Deduplication

Movements are deduplicated within 5-minute buckets:
```csharp
public static DateTime CalculateWindowStart(DateTime timestamp)
{
    var minutes = timestamp.Minute / 5 * 5;  // integer division floors to the 5-minute boundary
    return new DateTime(timestamp.Year, timestamp.Month, timestamp.Day,
        timestamp.Hour, minutes, 0, DateTimeKind.Utc);
}
```
Unique constraint: `(BasketId, FromLocationId, ToLocationId, WindowStartUtc, TenantId)`
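A small demonstration of the bucketing, including its edge case: reads seconds apart can still land in different buckets when they straddle a boundary, so boundary-straddling duplicates are not caught by this mechanism. (Assumes input timestamps are already UTC.)

```csharp
using System;

public static class WindowDemo
{
    // Same bucketing logic as CalculateWindowStart above.
    public static DateTime CalculateWindowStart(DateTime timestamp)
    {
        var minutes = timestamp.Minute / 5 * 5;
        return new DateTime(timestamp.Year, timestamp.Month, timestamp.Day,
            timestamp.Hour, minutes, 0, DateTimeKind.Utc);
    }

    public static void Main()
    {
        var a = new DateTime(2024, 1, 1, 12, 7, 10, DateTimeKind.Utc);
        var b = new DateTime(2024, 1, 1, 12, 9, 59, DateTimeKind.Utc);
        var c = new DateTime(2024, 1, 1, 12, 10, 1, DateTimeKind.Utc);
        Console.WriteLine(CalculateWindowStart(a) == CalculateWindowStart(b)); // True: both 12:05
        Console.WriteLine(CalculateWindowStart(b) == CalculateWindowStart(c)); // False: 12:05 vs 12:10
    }
}
```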
### 6. Dead Letter with Retry Bounds

Failed items are tracked with configurable retry limits:
```csharp
public class ProcessingDeadLetter
{
    public string ProcessorName { get; set; }  // e.g., "ShipmentLifecycleProcessor"
    public string EntityType { get; set; }     // e.g., "CameraRead"
    public string EntityId { get; set; }
    public int RetryCount { get; set; }
    public int MaxRetries { get; set; } = 5;
    public string? LastError { get; set; }
}
```
After `MaxRetries` is exceeded, items are excluded from processing.
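The exclusion filter itself is not shown in the ADR; a minimal sketch of the assumed shape, using an in-memory list in place of the `bbu.processing_dead_letters` table:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class ProcessingDeadLetter
{
    public string EntityId { get; set; } = "";
    public int RetryCount { get; set; }
    public int MaxRetries { get; set; } = 5;
}

public static class DeadLetterFilter
{
    // Only items that still have retry budget are picked up on the next pass;
    // exhausted items stay in the table for inspection but are never retried.
    public static IEnumerable<ProcessingDeadLetter> Retryable(IEnumerable<ProcessingDeadLetter> items) =>
        items.Where(d => d.RetryCount < d.MaxRetries);

    public static void Main()
    {
        var items = new List<ProcessingDeadLetter>
        {
            new() { EntityId = "A", RetryCount = 2 },
            new() { EntityId = "B", RetryCount = 5 },  // retry budget exhausted
        };
        Console.WriteLine(string.Join(",", Retryable(items).Select(d => d.EntityId))); // A
    }
}
```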
### 7. Per-Read Checkpointing

Checkpoints are updated after each successful read (not once per batch):
```csharp
foreach (var read in reads)
{
    try
    {
        await ProcessReadAsync(read);
        checkpoint.LastProcessedReadId = read.Id;
        await db.SaveChangesAsync();  // Save after EACH read
    }
    catch (Exception ex)
    {
        await RecordDeadLetterAsync(read.Id, ex.Message);
        checkpoint.LastProcessedReadId = read.Id;  // Still advance past the failed read
        await db.SaveChangesAsync();
    }
}
```
## Consequences

### Positive

- **No duplicate processing** - Idempotency keys prevent double-processing
- **Crash resilient** - Per-read checkpointing minimizes reprocessing
- **Scale-out safe** - Advisory locks prevent concurrent processing
- **Component boundaries respected** - BBU doesn't write to the IoT database
- **Retry bounded** - Failed items don't retry forever
- **Movement consistency** - Time-window deduplication prevents duplicates
### Negative

- **Additional tables** - Three new BBU tables for state tracking
- **Query overhead** - Idempotency checks add queries per read
- **Lock contention** - Advisory locks serialize some operations

### Neutral

- **Migration required** - Existing `bbu_processed_at` data is not migrated (fresh start)
- **Memory usage** - Content hashes are stored for MQTT deduplication
## Database Schema

### `tag_read_processings`
```sql
CREATE TABLE bbu.tag_read_processings (
    id                BIGSERIAL PRIMARY KEY,
    zb_tag_read_id    BIGINT,                -- NULL for MQTT (uses content_hash)
    content_hash      VARCHAR(64),           -- NULL for Replay (uses zb_tag_read_id)
    processor_type    VARCHAR(20) NOT NULL,  -- 'MQTT' or 'Replay'
    processed_at_utc  TIMESTAMP NOT NULL,
    basket_id         UUID,
    movement_id       UUID,
    epc_hex           VARCHAR(64) NOT NULL,
    device_id         VARCHAR(64) NOT NULL,
    is_new_basket     BOOLEAN NOT NULL,
    error_message     TEXT,
    tenant_id         UUID NOT NULL
);

CREATE UNIQUE INDEX ix_tag_read_processings_zb_tag_read
    ON bbu.tag_read_processings (tenant_id, zb_tag_read_id)
    WHERE zb_tag_read_id IS NOT NULL;

CREATE UNIQUE INDEX ix_tag_read_processings_content_hash
    ON bbu.tag_read_processings (tenant_id, content_hash)
    WHERE content_hash IS NOT NULL;
```
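The partial unique indexes let a processor claim a read atomically. A hedged sketch of what such a claim could look like on the replay path (the actual statement is not part of this ADR; the values below are illustrative):

```sql
-- Hypothetical claim: the insert succeeds for exactly one processor; a
-- conflict means the read was already processed and can be skipped.
INSERT INTO bbu.tag_read_processings
    (zb_tag_read_id, processor_type, processed_at_utc,
     epc_hex, device_id, is_new_basket, tenant_id)
VALUES
    (12345, 'Replay', now() AT TIME ZONE 'utc',
     '303400000000000000000001', 'FX9600-A1', false,
     '00000000-0000-0000-0000-000000000001')
ON CONFLICT (tenant_id, zb_tag_read_id) WHERE zb_tag_read_id IS NOT NULL
DO NOTHING
RETURNING id;  -- no row returned => duplicate, skip processing
```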
### `movement_deduplications`
```sql
CREATE TABLE bbu.movement_deduplications (
    id                BIGSERIAL PRIMARY KEY,
    basket_id         UUID NOT NULL,
    from_location_id  UUID NOT NULL,
    to_location_id    UUID NOT NULL,
    window_start_utc  TIMESTAMP NOT NULL,
    movement_id       UUID NOT NULL,
    created_at_utc    TIMESTAMP NOT NULL,
    tenant_id         UUID NOT NULL
);

CREATE UNIQUE INDEX ix_movement_deduplications_unique
    ON bbu.movement_deduplications
    (basket_id, from_location_id, to_location_id, window_start_utc, tenant_id);
```
### `processing_dead_letters`
```sql
CREATE TABLE bbu.processing_dead_letters (
    id                 BIGSERIAL PRIMARY KEY,
    processor_name     VARCHAR(100) NOT NULL,
    entity_type        VARCHAR(50) NOT NULL,
    entity_id          VARCHAR(100) NOT NULL,
    retry_count        INT NOT NULL,
    max_retries        INT NOT NULL DEFAULT 5,
    last_error         TEXT,
    first_failure_utc  TIMESTAMP NOT NULL,
    last_failure_utc   TIMESTAMP NOT NULL,
    tenant_id          UUID NOT NULL
);
```
## Related ADRs
- ADR-009: Database Schema Per Service - Component boundary enforcement
- ADR-027: Event-Driven Architecture - Future: Replace polling with events
- ADR-037: Explicit Tenant Scope Operations - Tenant ID handling
## Implementation

- `BasketProcessingService` - Unified idempotent processing
- `PostgresAdvisoryLock` - Distributed lock helper
- `TagReadProcessing`, `MovementDeduplication`, `ProcessingDeadLetter` - Database entities
- `CachedReferenceDataService` - Time-expiring cache for reference data