BBU Data Processing Overhaul - Implementation Plan
Executive Summary
Modify BBU data processing to only create shipments/stacks/deliveries for OBLPN-prefixed records, add "unknown destination" shipment handling for orphaned RFID reads, improve failure tracking, and clean up existing invalid data.
Core Principle
Every basket seen at the gantry MUST be attached to SOME shipment - no exceptions.
This provides:
- System resilience - all baskets are tracked
- Simple diagnostic heuristic - any RFID read > 1 hour old should have a complete `RFID Read → Basket → Shipment` chain
- Complete visibility into basket lifecycle
Current State Analysis
Database Findings
| Issue | Data |
|---|---|
| FAR* shipments | 2,323 with avg 422 items each |
| OBLPN* shipments | 429 with avg 437 items each |
| Shipments with 2000+ allocations | 30+ (includes products, not just baskets) |
| Camera NoOblpnValue status | 8,783 reads |
| Camera NoStackForOblpn status | 3,963 reads |
| Camera reads as FARG* (product barcodes) | 49,986 |
| Camera reads starting with OBLPN* | Only 14 |
Root Cause
The OracleProcessor creates stacks/shipments for ALL Oracle XML records regardless of whether the identifier starts with OBLPN. The ShipmentLifecycleProcessor tries to match camera reads to stacks but doesn't filter for OBLPN prefix early enough.
Implementation Stages
Stage 1: OracleProcessor OBLPN Filtering
Goal: Only create stacks/shipments for records whose OBLPN value starts with the "OBLPN" prefix
File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs
Changes:
- Add an early filter in `ProcessShippingInfo()` (line ~1073), gated by `OracleProcessorOptions.OblpnPrefixRequired` (assumed to be injected as `_options`):
```csharp
// Skip entity creation for non-OBLPN records (FARV*, etc.)
// Raw data is still logged, but we don't create shipments/stacks
var oblpnValue = shippingInfo.Header.Oblpn;
var isOblpn = !string.IsNullOrWhiteSpace(oblpnValue)
    && oblpnValue.StartsWith("OBLPN", StringComparison.OrdinalIgnoreCase);
if (_options.OblpnPrefixRequired && !isOblpn) {
    _logger.LogDebug("Skipping entity import for non-OBLPN record: {Oblpn}", oblpnValue);
    _skippedNonOblpnRecords.Add(1); // New metric counter
    return;
}
```
- Add a new metrics counter for skipped records:
```csharp
// _meter must be declared above this field: static initializers run in textual order
private static readonly Counter<long> _skippedNonOblpnRecords = _meter.CreateCounter<long>(
    "bbu.oracle.skipped_non_oblpn",
    description: "Number of Oracle records skipped due to non-OBLPN prefix"
);
```
- Add configuration option `OracleProcessorOptions.OblpnPrefixRequired` (default: `true`). Setting it to `false` restores the previous import-everything behavior for backward compatibility during rollout
Success Criteria:
- New Oracle files only create entities for OBLPN* records
- Metrics show skipped record counts
- Raw data still logged for all records
Stage 2: ShipmentLifecycleProcessor OBLPN Filtering & RSSI-Based Basket Selection
Goal: Only process camera reads that decode to OBLPN* values AND use peak_rssi to select the top 20 strongest basket reads
File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs
Changes:
- In `ProcessCameraReadAsync()` (line ~563), add a filter after getting the decoded value:
```csharp
var oblpn = dlRead.DecodedValue?.Trim();
if (string.IsNullOrWhiteSpace(oblpn)) {
    processing.ProcessingStatus = "NoOblpnValue";
    processing.StatusMessage = "Camera decode yielded empty OBLPN - check raw data";
    // ... existing handling
    return;
}
// NEW: Filter for OBLPN prefix - skip product barcode reads (FARG*, etc.)
if (!oblpn.StartsWith("OBLPN", StringComparison.OrdinalIgnoreCase)) {
    processing.ProcessingStatus = "NotOblpnPrefix";
    processing.StatusMessage = $"Decoded value '{oblpn}' is not an OBLPN - likely product barcode";
    processing.UpdatedAtUtc = now;
    processing.ProcessedAtUtc = now;
    return;
}
```
- CRITICAL: When correlating RFID reads in the match window, use peak_rssi to select the top 20 distinct EPCs:
```csharp
// peak_rssi is in dBm (negative values) - higher = stronger signal = closer to reader
// Select top N strongest reads by DISTINCT EPC (configurable, default 20)
// If PeakRssi is nullable, filter out nulls first: PostgreSQL sorts NULLs first under DESC
var maxBaskets = _options.MaxBasketsPerStack; // From configuration
var matchedTags = await iotDb.ZbTagReads
    .Where(tr => tr.DeviceId != null && gantryDevices.Contains(tr.DeviceId))
    .Where(tr => tr.EventTimestamp >= windowStart && tr.EventTimestamp <= windowEnd)
    .GroupBy(tr => tr.Epc) // Group by EPC to get distinct baskets
    .Select(g => g.OrderByDescending(tr => tr.PeakRssi).First()) // Strongest read per EPC
    .OrderByDescending(tr => tr.PeakRssi) // Then order all by signal strength
    .Take(maxBaskets)
    .ToListAsync(cancellationToken);
```
- Add "NotOblpnPrefix" to the `TerminalStatuses` set (line ~38)
- Update the `ProcessingStats` class to track this new status
- Add configuration option `ShipmentLifecycleOptions.MaxBasketsPerStack` (default: 20)
File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/CameraReadProcessing.cs
- No schema change needed - `ProcessingStatus` is already a string field
Note on RSSI: peak_rssi is reported in dBm (decibels relative to 1 mW), so values are negative.
- -40 dBm = stronger signal (basket closer to reader)
- -70 dBm = weaker signal (basket farther from reader)
- `OrderByDescending` therefore puts the strongest signals first
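To make the selection semantics concrete, here is a minimal in-memory sketch of the same GroupBy / OrderByDescending / Take pipeline, run with LINQ to Objects as C# top-level statements (.NET 6 or later assumed). The tuple shape, the EPC values and the cap of 3 are illustrative stand-ins for `ZbTagReads` rows and `MaxBasketsPerStack`:

```csharp
using System;
using System.Linq;

// Illustrative reads: (EPC, peak RSSI in dBm). Less negative = stronger signal.
var reads = new (string Epc, double PeakRssi)[]
{
    ("EPC-A", -62.0),
    ("EPC-A", -41.5), // same basket seen twice; only its strongest read counts
    ("EPC-B", -70.0),
    ("EPC-C", -38.0),
    ("EPC-D", -55.0),
};

const int maxBaskets = 3; // stand-in for MaxBasketsPerStack (default 20)

// Strongest read per EPC, then the top N EPCs by that peak value.
var selected = reads
    .GroupBy(r => r.Epc)
    .Select(g => g.OrderByDescending(r => r.PeakRssi).First())
    .OrderByDescending(r => r.PeakRssi)
    .Take(maxBaskets)
    .ToList();

foreach (var pick in selected)
{
    Console.WriteLine($"{pick.Epc}: {pick.PeakRssi} dBm");
}
```

Because the values are negative, descending order ranks -38 dBm ahead of -70 dBm. The per-EPC step ensures that a basket read several times occupies only one of the N slots.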
Success Criteria:
- Camera reads with FARG*, FAAG*, etc. are immediately terminal
- Only OBLPN* decoded values attempt stack/shipment matching
- Maximum 20 baskets allocated per stack (using strongest RSSI readings)
- Dashboard shows new status in breakdown
Stage 3: Unknown Destination Shipment Handling
Goal: Create per-time-window shipments for RFID reads without corresponding OBLPN camera reads
Core Principle: Every basket seen at the gantry MUST be attached to SOME shipment - no exceptions.
This provides system resilience and a simple diagnostic: any RFID read > 1 hour old should correlate to a basket → shipment.
New File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/UnknownDestinationProcessor.cs
Design:
```csharp
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

public class UnknownDestinationProcessor : BackgroundService {
    private const string UnknownLocationCode = "UNKNOWN_DEST";
    // Configuration: GracePeriodMinutes (60), ActivityGapMinutes (30), PollingIntervalSeconds (300)
    private readonly UnknownDestinationOptions _options;

    public UnknownDestinationProcessor(IOptions<UnknownDestinationOptions> options) {
        _options = options.Value;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
        if (!_options.Enabled) return;
        using var timer = new PeriodicTimer(TimeSpan.FromSeconds(_options.PollingIntervalSeconds));
        do {
            // 1. Find RFID tag reads that are:
            //    - OLDER than the grace period (default 1 hour) - gives time for normal processing
            //    - Already processed (basket exists)
            //    - Without a matching OBLPN camera read within the match window
            //    - For baskets not already allocated to any shipment
            // 2. Group by activity window (gaps > ActivityGapMinutes define window boundaries)
            // 3. Create one shipment per window at the UnknownLocationCode location
            // 4. Allocate orphaned baskets to that shipment
            // 5. Follow normal lifecycle (IN_TRANSIT -> DELIVERED -> returns)
        } while (await timer.WaitForNextTickAsync(stoppingToken));
    }
}
```
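Steps 1-2 of the process flow (grace-period cut-off, then gap-based windowing) can be sketched in isolation. This is a hypothetical helper, not part of the existing codebase. It works on bare UTC `DateTime` values rather than tag read entities, and it treats a gap strictly greater than `ActivityGapMinutes` as a window boundary:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: keep reads older than the grace period, then start a new
// activity window whenever the gap to the previous read exceeds activityGap.
static List<List<DateTime>> GroupIntoActivityWindows(
    IEnumerable<DateTime> readTimesUtc, DateTime nowUtc, TimeSpan gracePeriod, TimeSpan activityGap)
{
    var cutoff = nowUtc - gracePeriod;
    var windows = new List<List<DateTime>>();
    List<DateTime>? current = null;

    foreach (var t in readTimesUtc.Where(r => r < cutoff).OrderBy(r => r))
    {
        // First eligible read, or gap too large: open a new window.
        if (current is null || t - current[^1] > activityGap)
        {
            current = new List<DateTime>();
            windows.Add(current);
        }
        current.Add(t);
    }
    return windows;
}

var now = new DateTime(2025, 1, 1, 12, 0, 0, DateTimeKind.Utc);
var readTimes = new[]
{
    now.AddMinutes(-240), now.AddMinutes(-230), now.AddMinutes(-215), // first burst
    now.AddMinutes(-150), now.AddMinutes(-140),                       // 65-minute gap: second window
    now.AddMinutes(-20),                                              // inside grace period: not yet eligible
};

var result = GroupIntoActivityWindows(readTimes, now, TimeSpan.FromMinutes(60), TimeSpan.FromMinutes(30));
foreach (var window in result)
{
    // Each window would become one UNKNOWN_DEST shipment.
    Console.WriteLine($"{window[0]:HH:mm}-{window[^1]:HH:mm} UTC: {window.Count} reads");
}
```

Sorting before windowing matters. The gap test only compares each read with the previous one, so unsorted input would split windows incorrectly.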
Configuration: `UnknownDestinationOptions` in abstractions:
```csharp
public class UnknownDestinationOptions {
    public bool Enabled { get; set; } = true;
    public int GracePeriodMinutes { get; set; } = 60;      // Wait 1 hour before processing orphans
    public int ActivityGapMinutes { get; set; } = 30;      // Activity gaps define window boundaries
    public int PollingIntervalSeconds { get; set; } = 300; // Check every 5 minutes
}
```
Database Changes:
- Seed an "Unknown Destination" location with code `UNKNOWN_DEST` in `spatial.locations`
- NO new shipment_type - just use the unknown destination location to identify these shipments
Success Criteria:
- Every basket seen at gantry > 1 hour ago is attached to a shipment
- Orphaned baskets get allocated to trackable shipments
- Shipments follow normal lifecycle rules (including returns)
- Simple diagnostic: an `RFID read → Basket → Shipment` chain exists for all reads > 1 hour old
Stage 4: Enhanced Failure Tracking + BBU Database Audit
Goal: Better visibility into camera/processing failures AND clean up redundant tables/views
Part A: BBU Database Audit & Cleanup
Current State: The BBU schema has grown organically with 13 tables and 41 views, many added by different agents without coordination:
Tables to Review (13 total):
- `IdentifierType` - SUSPICIOUS: should this be in the `core` schema instead?
- `route_statistics` - Keyless entity; is it used?
- `dwell_times` - Legacy structure with mixed naming conventions
Views to Consolidate (41 total - too many overlapping views):
| Category | Views | Issue |
|---|---|---|
| Return tracking | return_rate_analysis, return_rate_by_run, return_rate_by_run_sequence, return_rate_by_status, daily_return_tracking, location_return_performance, top_return_performers (7) | Overlapping return metrics |
| Shipment tracking | shipment_detail, shipment_lifecycle_overview, shipment_status_distribution, stuck_shipments, allocation_lifecycle_breakdown (5) | Redundant shipment views |
| Device monitoring | device_connection_history, device_current_status, device_daily_uptime, device_disconnect_patterns, problematic_devices (5) | May duplicate IoT component |
| RFID reader | rfid_reader_daily_activity, rfid_reader_summary (2) | May belong in IoT |
| Catalog views | catalog_* (5 views) | Should be in Catalog component |
Cleanup Approach:
- Audit each view for actual usage in Superset dashboards
- Identify views that can be consolidated
- Remove views that duplicate IoT/Catalog component functionality
- Standardize naming conventions
- Create migration to drop unused views
Output: Document which views to keep/consolidate/remove
Part B: Enhanced Failure Tracking
Add Failure Stats Entity (engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/):
```csharp
public class CameraReadFailureStats {
    public long Id { get; set; }
    public DateTime WindowStart { get; set; }
    public DateTime WindowEnd { get; set; }
    public int TotalReads { get; set; }
    public int NoDecodeCount { get; set; }
    public int ProductBarcodeCount { get; set; } // FARG*, etc.
    public int OblpnMatchedCount { get; set; }
    public int OblpnNoStackCount { get; set; }
    public int OblpnNoShipmentCount { get; set; }
    public Guid TenantId { get; set; }
}
```
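The sketch below shows one way the entity's counters might be filled from `camera_read_processing.processing_status` values. Mapping `NoOblpnValue`, `NotOblpnPrefix` and `NoStackForOblpn` onto these fields is an assumption based on the Stage 2 status messages, and `Summarize` is a hypothetical helper. `OblpnMatchedCount` and `OblpnNoShipmentCount` are left out because their status names are not defined in this plan:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: count the statuses this plan defines for one window.
// Status-to-field mapping is assumed from the Stage 2 status messages.
static (int TotalReads, int NoDecodeCount, int ProductBarcodeCount, int OblpnNoStackCount) Summarize(
    IReadOnlyCollection<string> statuses)
{
    return (
        TotalReads: statuses.Count,
        NoDecodeCount: statuses.Count(s => s == "NoOblpnValue"),         // empty decode
        ProductBarcodeCount: statuses.Count(s => s == "NotOblpnPrefix"), // FARG*, etc.
        OblpnNoStackCount: statuses.Count(s => s == "NoStackForOblpn"));
}

// "OtherStatus" is a placeholder for any status not mapped above.
var windowStatuses = new[] { "NoOblpnValue", "NotOblpnPrefix", "NotOblpnPrefix", "NoStackForOblpn", "OtherStatus" };
var stats = Summarize(windowStatuses);
Console.WriteLine(stats);
```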
Add Consolidated Views:
- `camera_processing_stats` - Single view for all camera processing metrics
- Consolidate return rate views into 1-2 focused views
Success Criteria:
- BBU schema is cleaned up with clear purpose for each table/view
- No duplicate functionality across views
- Dashboard shows camera decode success rate
- Failure reasons are visible for troubleshooting
Stage 5: Superset View Updates
Goal: Fix visualizations and add OBLPN filtering to analytics
File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/Analytics/SupersetAnalyticsViewsSql.cs
Changes:
- Update `bbu_oblpn_shipments` to filter:
```sql
WHERE s.tracking_number LIKE 'OBLPN%'
```
- Add new view `bbu_camera_processing_stats`:
```sql
SELECT
    date_trunc('hour', processed_at_utc) as hour,
    processing_status,
    COUNT(*) as count
FROM bbu.camera_read_processing
GROUP BY 1, 2
```
- Update shipment detail views to show:
- Basket count (item_type = 'Bakery Basket')
- Stack count (item_type = 'Stack')
- Product allocation count (everything else)
- Flag suspicious counts (baskets > 30)
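Here is a minimal sketch of the intended per-shipment breakdown, assuming allocations are reduced to their `item_type` names. `Breakdown` and the "Bread Loaf" product type are hypothetical. The threshold of 30 comes from the bullet above:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: split a shipment's allocations by item type name and flag
// shipments whose basket count exceeds the suspicious threshold.
static (int Baskets, int Stacks, int Products, bool Suspicious) Breakdown(
    IEnumerable<string> itemTypeNames, int suspiciousBasketThreshold = 30)
{
    int baskets = 0, stacks = 0, products = 0;
    foreach (var name in itemTypeNames)
    {
        switch (name)
        {
            case "Bakery Basket": baskets++; break;
            case "Stack": stacks++; break;
            default: products++; break; // everything else counts as a product allocation
        }
    }
    return (baskets, stacks, products, baskets > suspiciousBasketThreshold);
}

// "Bread Loaf" is a made-up product type for illustration.
var allocations = Enumerable.Repeat("Bakery Basket", 32)
    .Concat(Enumerable.Repeat("Stack", 2))
    .Concat(Enumerable.Repeat("Bread Loaf", 400));

var summary = Breakdown(allocations);
Console.WriteLine(summary);
```

In the view itself, the same split would typically be a set of PostgreSQL `COUNT(*) FILTER (WHERE ...)` aggregates grouped by shipment.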
Success Criteria:
- Dashboard shows only OBLPN shipments by default
- Basket counts are accurate (not inflated by products)
- Processing status breakdown is visible
Stage 6: Historical Data Cleanup
Goal: Clean up existing FAR* shipments and invalid allocations
Approach: SQL scripts (not EF migrations for data changes)
Scripts (to be run manually with review):
```sql
-- Script 1: Identify scope of cleanup
SELECT
    CASE WHEN tracking_number LIKE 'FAR%' THEN 'FAR*'
         WHEN tracking_number LIKE 'OBLPN%' THEN 'OBLPN*'
         ELSE 'OTHER' END as pattern,
    status,
    COUNT(*) as shipment_count,
    SUM(item_quantity) as total_items
FROM transport.shipments
GROUP BY 1, 2
ORDER BY 1, 2;
```
```sql
-- Script 2: Mark FAR* shipments as ARCHIVED
UPDATE transport.shipments
SET status = 'ARCHIVED',
    comments = COALESCE(comments, '') || ' [Archived: Non-OBLPN shipment]'
WHERE tracking_number LIKE 'FAR%'
AND status NOT IN ('COMPLETED', 'ARCHIVED');
```
```sql
-- Script 3: Remove product item allocations from shipments (keep baskets/stacks only)
DELETE FROM transport.shipment_item_allocations sia
USING catalog.items i, catalog.item_types it
WHERE sia.item_id = i.id
AND i.item_type_id = it.id
AND it.name NOT IN ('Bakery Basket', 'Stack');
```
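```sql
-- Script 4: Archive stacks without OBLPN prefix
-- Restricted to Stack items and their OBLPN-type identifiers; without these joins,
-- any item with any non-OBLPN identifier (e.g. baskets, if their EPCs are stored
-- as identifiers) would also be archived
UPDATE catalog.items i
SET status_id = (SELECT id FROM catalog.item_statuses WHERE code = 'ARCHIVED')
FROM catalog.item_types it, prism.identifiers pi, core.identifier_types cit
WHERE i.item_type_id = it.id
AND it.name = 'Stack'
AND pi.associated_object_id = i.id
AND pi.identifier_type_id = cit.id
AND cit.code = 'OBLPN'
AND pi.value NOT LIKE 'OBLPN%';
```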
Safety:
- Run in transaction with ROLLBACK first
- Create backup of affected tables
- Execute during maintenance window
Critical Files Summary
| File | Changes |
|---|---|
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs | Add OBLPN prefix filter, metrics |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs | Add OBLPN prefix filter, new status |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/UnknownDestinationProcessor.cs | NEW - orphaned basket handling |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/OracleProcessorOptions.cs | Add prefix requirement option |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/ShipmentLifecycleOptions.cs | Add MaxBasketsPerStack option |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/UnknownDestinationOptions.cs | NEW |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/Analytics/SupersetAnalyticsViewsSql.cs | Update views |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/Migrations/ | Add failure stats table |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/BbuDb.cs | Register new entities |
Execution Order
Phase A: Local Development & Testing
- Stage 1 & 2 (parallel) - Filtering changes in code
- Stage 5 - Superset view updates
- Stage 3 - Unknown destination handling
- Stage 4 - Enhanced failure tracking
- Local Validation:
- Run local Aspire AppHost
- Process test Oracle XML files with OBLPN and FARV records
- Verify only OBLPN records create entities
- Test camera read processing with various decoded values
- Verify unknown destination shipments are created correctly
- Check Superset dashboards show correct data
Phase B: Local Data Cleanup & Validation
- Stage 6 - Run cleanup scripts against local database
- Verify cleanup scripts work correctly
- Check shipment counts before/after
- Validate no data loss for OBLPN records
Phase C: Azure Deployment
- Deploy code changes to Azure Container Apps
- Monitor metrics and logs for processing behavior
- Validate dashboards in Azure Superset instance
- Execute data cleanup scripts on Azure database (with backup)
Local Testing Validation Checklist
Before Code Changes (Baseline)
Run these queries locally to establish baseline:
```sql
-- Count shipments by prefix
SELECT
    CASE WHEN tracking_number LIKE 'OBLPN%' THEN 'OBLPN*' ELSE 'OTHER' END as pattern,
    COUNT(*) as count
FROM transport.shipments GROUP BY 1;

-- Count stacks with OBLPN identifiers
SELECT COUNT(DISTINCT i.id)
FROM catalog.items i
JOIN catalog.item_types it ON i.item_type_id = it.id
JOIN prism.identifiers pi ON pi.associated_object_id = i.id
JOIN core.identifier_types cit ON pi.identifier_type_id = cit.id
WHERE it.name = 'Stack' AND cit.code = 'OBLPN' AND pi.value LIKE 'OBLPN%';
```
After Code Changes
Oracle Processor Test:
- Upload/process a test XML file with both FARV and OBLPN records
- Verify: Only OBLPN records create new stacks/shipments
- Check logs for "Skipping entity import for non-OBLPN record" messages
- Verify metrics: `bbu.oracle.skipped_non_oblpn` counter incremented
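One way to check the counter without an exporter is an in-process `MeterListener` from `System.Diagnostics.Metrics` (.NET 6 or later). In this sketch, the meter name `Acsis.Bbu.Sketch` and the inline filter loop are hypothetical stand-ins for `OracleProcessor`'s `_meter` and `ProcessShippingInfo()`, not the real code:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical meter name; the real one is whatever OracleProcessor's _meter uses.
using var meter = new Meter("Acsis.Bbu.Sketch");
var skippedCounter = meter.CreateCounter<long>(
    "bbu.oracle.skipped_non_oblpn",
    description: "Number of Oracle records skipped due to non-OBLPN prefix");

// Collect measurements in-process, as a unit test could, instead of scraping an exporter.
long skippedTotal = 0;
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "Acsis.Bbu.Sketch" && instrument.Name == "bbu.oracle.skipped_non_oblpn")
    {
        l.EnableMeasurementEvents(instrument);
    }
};
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) => skippedTotal += measurement);
listener.Start(); // also publishes instruments created before Start()

// Stand-in for the ProcessShippingInfo() filter: count and skip non-OBLPN records.
var created = new List<string>();
foreach (var oblpn in new[] { "OBLPN0001", "FARV0002", "OBLPN0003", "FARV0004", "" })
{
    if (string.IsNullOrWhiteSpace(oblpn) || !oblpn.StartsWith("OBLPN", StringComparison.OrdinalIgnoreCase))
    {
        skippedCounter.Add(1);
        continue;
    }
    created.Add(oblpn);
}

Console.WriteLine($"created={created.Count}, skipped={skippedTotal}");
```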
ShipmentLifecycle Processor Test:
- Insert test camera reads with various decoded_value patterns (OBLPN*, FARG*, empty)
- Run processor and verify:
- OBLPN* values attempt stack matching
- FARG*/other values get status "NotOblpnPrefix"
- Empty values get status "NoOblpnValue"
- RSSI Selection Test:
- Create RFID reads for 30+ distinct EPCs in a match window with varying peak_rssi values
- Verify only top 20 by RSSI (highest/least negative) are allocated
- Check that weaker signals are excluded from shipment allocation
Unknown Destination Test (after Stage 3):
- Create RFID reads without matching camera reads
- Run processor and verify unknown destination shipment created
- Check baskets allocated correctly
Validation Queries After All Changes
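```sql
-- Verify no new FAR* shipments created
-- (filters on FAR% rather than NOT LIKE 'OBLPN%': Stage 3 unknown destination
-- shipments are not tied to an OBLPN and would likely be counted otherwise)
SELECT COUNT(*) FROM transport.shipments
WHERE tracking_number LIKE 'FAR%'
AND created_at > '[deploy_timestamp]';
```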
```sql
-- Verify camera processing statuses
SELECT processing_status, COUNT(*)
FROM bbu.camera_read_processing
WHERE processed_at_utc > '[deploy_timestamp]'
GROUP BY processing_status;

-- Check unknown destination shipments (identified by location, not type)
SELECT s.* FROM transport.shipments s
JOIN transport.deliveries d ON d.shipment_id = s.id
JOIN spatial.locations l ON d.location_id = l.id
WHERE l.code = 'UNKNOWN_DEST';

-- CORE PRINCIPLE DIAGNOSTIC: Every RFID read > 1 hour old should have basket → shipment
-- This query should return 0 rows after implementation
SELECT tr.id as tag_read_id, tr.epc, tr.event_timestamp
FROM iot.zb_tag_reads tr
LEFT JOIN bbu.tag_read_processing trp ON trp.zb_tag_read_id = tr.id
LEFT JOIN catalog.items basket ON trp.basket_id = basket.id
LEFT JOIN transport.shipment_item_allocations sia ON sia.item_id = basket.id
WHERE tr.event_timestamp < NOW() - INTERVAL '1 hour'
AND sia.id IS NULL            -- No shipment allocation
AND trp.basket_id IS NOT NULL -- But basket was created
ORDER BY tr.event_timestamp DESC
LIMIT 100;
```
Rollback Plan
Each stage is independently deployable/rollbackable:
- Stages 1-2: Configuration flags to disable filtering
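- Stage 3: Disable via `UnknownDestinationOptions.Enabled = false`
- Stage 4: The failure stats table and consolidated views are additive. Dropping views in the Part A cleanup is not, so that migration should be reversible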
- Stage 5: Views are additive
- Stage 6: Database backup before execution
Open Questions (Resolved)
- Q: What prefix distinguishes valid OBLPNs? A: "OBLPN" prefix only - FARV* and others should be ignored
- Q: Should we backfill data? A: Yes, full cleanup of FAR* shipments
- Q: Unknown destination shipment grouping? A: Per time window (activity gaps)
- Q: Camera reading product GTINs expected? A: Yes, but we only act on OBLPN values
Metrics & Monitoring
After implementation, track:
- `bbu.oracle.skipped_non_oblpn` - Records skipped in Oracle processor
- `bbu.camera.not_oblpn_prefix` - Camera reads filtered out
- `bbu.unknown_dest.shipments_created` - Unknown destination shipments
- `bbu.unknown_dest.baskets_allocated` - Baskets allocated to unknown dest
Dashboard alerts:
- Camera decode rate < 80% for sustained period
- High ratio of RFID reads without camera correlation
- Unknown destination shipment count > threshold
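The sketch below shows how the decode-rate alert could be evaluated. It makes three assumptions: the definition of decode rate as (total - no-decode) / total (that is, reads not ending in `NoOblpnValue`), "sustained" meaning the last 3 hourly windows, and the hypothetical `ShouldAlert` helper name:

```csharp
using System;
using System.Linq;

// Hypothetical alert rule. Decode rate per hourly window = (total - noDecode) / total.
// Alerts only when each of the most recent `sustainedWindows` windows is below threshold;
// windows with no reads are treated as "no data" rather than as failures.
static bool ShouldAlert((int Total, int NoDecode)[] hourlyWindows, double threshold = 0.80, int sustainedWindows = 3)
{
    if (hourlyWindows.Length < sustainedWindows)
    {
        return false;
    }
    return hourlyWindows
        .Skip(hourlyWindows.Length - sustainedWindows)
        .All(w => w.Total > 0 && (double)(w.Total - w.NoDecode) / w.Total < threshold);
}

var healthy = new[] { (100, 5), (120, 10), (90, 8) };
var degraded = new[] { (100, 5), (100, 30), (80, 25), (90, 40) };

Console.WriteLine($"healthy: {ShouldAlert(healthy)}, degraded: {ShouldAlert(degraded)}");
```

Requiring every recent window to be below threshold keeps a single noisy hour from paging anyone. The trade-off is that detection lags by up to `sustainedWindows` hours.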