
fsds/bbu-data-processing-overhaul.md

BBU Data Processing Overhaul - Implementation Plan

Executive Summary

Modify BBU data processing to create shipments, stacks, and deliveries only for OBLPN-prefixed records, add "unknown destination" shipment handling for orphaned RFID reads, improve failure tracking, and clean up existing invalid data.

Core Principle

Every basket seen at the gantry MUST be attached to SOME shipment - no exceptions.

This provides:

  • System resilience - all baskets are tracked
  • Simple diagnostic heuristic - any RFID read more than 1 hour old should have a complete RFID Read → Basket → Shipment chain
  • Complete visibility into basket lifecycle

Current State Analysis

Database Findings

| Issue | Data |
|---|---|
| FAR* shipments | 2,323 with avg 422 items each |
| OBLPN* shipments | 429 with avg 437 items each |
| Shipments with 2000+ allocations | 30+ (includes products, not just baskets) |
| Camera NoOblpnValue status | 8,783 reads |
| Camera NoStackForOblpn status | 3,963 reads |
| Camera reads as FARG* (product barcodes) | 49,986 |
| Camera reads starting with OBLPN* | Only 14 |

Root Cause

The OracleProcessor creates stacks/shipments for ALL Oracle XML records regardless of whether the identifier starts with OBLPN. The ShipmentLifecycleProcessor tries to match camera reads to stacks but doesn't filter for OBLPN prefix early enough.


Implementation Stages

Stage 1: OracleProcessor OBLPN Filtering

Goal: Only create stacks/shipments for records whose OBLPN value starts with the "OBLPN" prefix

File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs

Changes:

  1. Add early filter in ProcessShippingInfo() (line ~1073):
// Skip entity creation for non-OBLPN records (FARV*, etc.)
// Raw data is still logged, but we don't create shipments/stacks
var oblpnValue = shippingInfo.Header.Oblpn;
if (string.IsNullOrWhiteSpace(oblpnValue) || !oblpnValue.StartsWith("OBLPN", StringComparison.OrdinalIgnoreCase)) {
    _logger.LogDebug("Skipping entity import for non-OBLPN record: {Oblpn}", oblpnValue);
    _skippedNonOblpnRecords.Add(1); // New metric counter
    return;
}
  2. Add new metrics counter for skipped records:
private static readonly Counter<long> _skippedNonOblpnRecords = _meter.CreateCounter<long>(
    "bbu.oracle.skipped_non_oblpn",
    description: "Number of Oracle records skipped due to non-OBLPN prefix"
);
  3. Add configuration option OracleProcessorOptions.OblpnPrefixRequired (default: true) for backward compatibility during rollout
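
The snippet in step 1 does not yet consult the new option, so here is a minimal sketch of how the flag could gate the prefix check. Only OblpnPrefixRequired comes from this plan; the OblpnFilter helper and its method names are hypothetical and exist purely for illustration.

```csharp
using System;

// Hypothetical shape of the options class; only OblpnPrefixRequired is named in the plan.
public sealed class OracleProcessorOptions
{
    // When false, the legacy behavior (entities for every Oracle record) is kept.
    public bool OblpnPrefixRequired { get; set; } = true;
}

// Hypothetical helper, not an existing class in the codebase.
public static class OblpnFilter
{
    private const string Prefix = "OBLPN";

    // True when the value is non-empty and starts with "OBLPN" (case-insensitive),
    // matching the comparison used in the ProcessShippingInfo() snippet.
    public static bool HasOblpnPrefix(string? value) =>
        !string.IsNullOrWhiteSpace(value) &&
        value.StartsWith(Prefix, StringComparison.OrdinalIgnoreCase);

    // Decides whether entity creation should proceed for a given record.
    public static bool ShouldCreateEntities(string? oblpn, OracleProcessorOptions options) =>
        !options.OblpnPrefixRequired || HasOblpnPrefix(oblpn);
}
```

With this shape, ProcessShippingInfo() would return early whenever ShouldCreateEntities(...) is false, and setting OblpnPrefixRequired = false restores the current behavior without a redeploy of the filtering code.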

Success Criteria:

  • New Oracle files only create entities for OBLPN* records
  • Metrics show skipped record counts
  • Raw data still logged for all records

Stage 2: ShipmentLifecycleProcessor OBLPN Filtering & RSSI-Based Basket Selection

Goal: Only process camera reads that decode to OBLPN* values AND use peak_rssi to select the top 20 strongest basket reads

File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs

Changes:

  1. In ProcessCameraReadAsync() (line ~563), add filter after getting decoded value:
var oblpn = dlRead.DecodedValue?.Trim();

if (string.IsNullOrWhiteSpace(oblpn)) {
    processing.ProcessingStatus = "NoOblpnValue";
    processing.StatusMessage = "Camera decode yielded empty OBLPN - check raw data";
    // ... existing handling
    return;
}

// NEW: Filter for OBLPN prefix - skip product barcode reads (FARG*, etc.)
if (!oblpn.StartsWith("OBLPN", StringComparison.OrdinalIgnoreCase)) {
    processing.ProcessingStatus = "NotOblpnPrefix";
    processing.StatusMessage = $"Decoded value '{oblpn}' is not an OBLPN - likely product barcode";
    processing.UpdatedAtUtc = now;
    processing.ProcessedAtUtc = now;
    return;
}
  2. CRITICAL: When correlating RFID reads in the match window, use peak_rssi to select top 20 distinct EPCs:
// peak_rssi is a received power level in dBm (negative values) - higher = stronger signal = closer to reader
// Select top N strongest reads by DISTINCT EPC (configurable, default 20)
var maxBaskets = _options.MaxBasketsPerStack; // From configuration

var matchedTags = await iotDb.ZbTagReads
    .Where(tr => tr.DeviceId != null && gantryDevices.Contains(tr.DeviceId))
    .Where(tr => tr.EventTimestamp >= windowStart && tr.EventTimestamp <= windowEnd)
    .GroupBy(tr => tr.Epc)  // Group by EPC to get distinct baskets
    .Select(g => g.OrderByDescending(tr => tr.PeakRssi).First()) // Strongest read per EPC
    .OrderByDescending(tr => tr.PeakRssi) // Then order all by signal strength
    .Take(maxBaskets)
    .ToListAsync(cancellationToken);
  3. Add "NotOblpnPrefix" to TerminalStatuses set (line ~38)

  4. Update ProcessingStats class to track this new status

  5. Add configuration option: ShipmentLifecycleOptions.MaxBasketsPerStack (default: 20)

File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/CameraReadProcessing.cs

  • No schema change needed - ProcessingStatus is already a string field

Note on RSSI: peak_rssi is a received power level, normally reported in dBm, so values are negative. Less negative means stronger.

  • -40 dBm = stronger signal (basket closer to reader)
  • -70 dBm = weaker signal (basket farther from reader)
  • OrderByDescending puts strongest signals first
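
To make the selection rule concrete, the following in-memory sketch keeps the strongest read per EPC and then takes the top N EPCs. TagRead and BasketSelector are hypothetical names; the double type for PeakRssi and the tie-break on EPC are assumptions, not something this plan specifies.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory shape of a tag read; only Epc and PeakRssi mirror the plan.
public sealed record TagRead(string Epc, double PeakRssi);

public static class BasketSelector
{
    // Keeps the strongest read per EPC, then returns the maxBaskets strongest EPCs.
    public static List<TagRead> SelectStrongest(IEnumerable<TagRead> reads, int maxBaskets = 20) =>
        reads
            .GroupBy(r => r.Epc)                        // one group per distinct basket
            .Select(g => g.MaxBy(r => r.PeakRssi)!)     // least negative RSSI within the group
            .OrderByDescending(r => r.PeakRssi)         // strongest baskets first
            .ThenBy(r => r.Epc, StringComparer.Ordinal) // assumed tie-break for stable output
            .Take(maxBaskets)
            .ToList();
}
```

Whether EF Core translates the equivalent GroupBy/First query server-side depends on the provider and version, so it may be worth confirming the generated SQL; if it does not translate, materializing the windowed reads first and applying a selection like this in memory is one fallback.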

Success Criteria:

  • Camera reads with FARG*, FAAG*, etc. are immediately terminal
  • Only OBLPN* decoded values attempt stack/shipment matching
  • Maximum 20 baskets allocated per stack (using strongest RSSI readings)
  • Dashboard shows new status in breakdown

Stage 3: Unknown Destination Shipment Handling

Goal: Create per-time-window shipments for RFID reads without corresponding OBLPN camera reads

Core Principle: Every basket seen at the gantry MUST be attached to SOME shipment - no exceptions.
This provides system resilience and a simple diagnostic: any RFID read > 1 hour old should correlate to a basket → shipment.

New File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/UnknownDestinationProcessor.cs

Design:

public class UnknownDestinationProcessor : BackgroundService {
    // Configuration (from UnknownDestinationOptions)
    private readonly int _gracePeriodMinutes = 60;     // Don't process reads from last hour (default)
    private readonly int _activityGapMinutes = 30;     // Define activity window boundary
    private readonly string _unknownLocationCode = "UNKNOWN_DEST";

    // Process flow:
    // 1. Find RFID tag reads that are:
    //    - OLDER than grace period (default 1 hour) - gives time for normal processing
    //    - Been processed (basket exists)
    //    - Have no matching camera read with OBLPN within match window
    //    - Basket not already allocated to any shipment
    // 2. Group by activity window (gaps > 30min define window boundaries)
    // 3. Create shipment per window with "Unknown Destination" location
    // 4. Allocate orphaned baskets to shipment
    // 5. Follow normal lifecycle (IN_TRANSIT -> DELIVERED -> returns)
}

Configuration: UnknownDestinationOptions in abstractions:

public class UnknownDestinationOptions {
    public bool Enabled { get; set; } = true;
    public int GracePeriodMinutes { get; set; } = 60;      // Wait 1 hour before processing orphans
    public int ActivityGapMinutes { get; set; } = 30;       // Activity gaps define window boundaries
    public int PollingIntervalSeconds { get; set; } = 300;  // Check every 5 minutes
}
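
Step 2 of the process flow (grouping by activity window) can be sketched as below. This is an illustration under the assumption that a window boundary is any gap between consecutive reads that is strictly longer than ActivityGapMinutes; ActivityWindows.Split is a hypothetical helper, not part of the existing codebase.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ActivityWindows
{
    // Splits read timestamps into activity windows.
    // A gap strictly greater than `gap` between consecutive reads starts a new window.
    public static List<List<DateTime>> Split(IEnumerable<DateTime> timestamps, TimeSpan gap)
    {
        var windows = new List<List<DateTime>>();
        foreach (var ts in timestamps.OrderBy(t => t))
        {
            var current = windows.Count > 0 ? windows[^1] : null;
            if (current is null || ts - current[^1] > gap)
            {
                current = new List<DateTime>();
                windows.Add(current);
            }
            current.Add(ts);
        }
        return windows;
    }
}
```

Each resulting window would map to one unknown destination shipment, with the window's first and last timestamps available as shipment metadata if that proves useful.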

Database Changes:

  • Create "Unknown Destination" location in spatial.locations (seeding)
  • NO new shipment_type - just use the unknown destination location to identify these shipments

Success Criteria:

  • Every basket seen at gantry > 1 hour ago is attached to a shipment
  • Orphaned baskets get allocated to trackable shipments
  • Shipments follow normal lifecycle rules (including returns)
  • Simple diagnostic: RFID read → Basket → Shipment chain exists for all reads > 1 hour old

Stage 4: Enhanced Failure Tracking + BBU Database Audit

Goal: Better visibility into camera/processing failures AND clean up redundant tables/views

Part A: BBU Database Audit & Cleanup

Current State: The BBU schema has grown organically to 13 tables and 41 views, many added by different agents without coordination.

Tables to Review (13 total):

  • IdentifierType - SUSPICIOUS: Should this be in core schema instead?
  • route_statistics - Keyless entity; confirm whether it is still used
  • dwell_times - Legacy structure with mixed naming conventions

Views to Consolidate (41 total - too many overlapping views):

| Category | Views | Issue |
|---|---|---|
| Return tracking | return_rate_analysis, return_rate_by_run, return_rate_by_run_sequence, return_rate_by_status, daily_return_tracking, location_return_performance, top_return_performers (7) | Overlapping return metrics |
| Shipment tracking | shipment_detail, shipment_lifecycle_overview, shipment_status_distribution, stuck_shipments, allocation_lifecycle_breakdown (5) | Redundant shipment views |
| Device monitoring | device_connection_history, device_current_status, device_daily_uptime, device_disconnect_patterns, problematic_devices (5) | May duplicate IoT component |
| RFID reader | rfid_reader_daily_activity, rfid_reader_summary (2) | May belong in IoT |
| Catalog views | catalog_* (5 views) | Should be in Catalog component |

Cleanup Approach:

  1. Audit each view for actual usage in Superset dashboards
  2. Identify views that can be consolidated
  3. Remove views that duplicate IoT/Catalog component functionality
  4. Standardize naming conventions
  5. Create migration to drop unused views

Output: Document which views to keep/consolidate/remove

Part B: Enhanced Failure Tracking

Add Failure Stats Entity (engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/):

public class CameraReadFailureStats {
    public long Id { get; set; }
    public DateTime WindowStart { get; set; }
    public DateTime WindowEnd { get; set; }
    public int TotalReads { get; set; }
    public int NoDecodeCount { get; set; }
    public int ProductBarcodeCount { get; set; } // FARG*, etc.
    public int OblpnMatchedCount { get; set; }
    public int OblpnNoStackCount { get; set; }
    public int OblpnNoShipmentCount { get; set; }
    public Guid TenantId { get; set; }
}
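
One way these counters could feed the decode success rate on the dashboard is sketched below. The definition used here (reads that produced any decoded value, divided by total reads) is an assumption; the plan does not define the rate, and FailureStatsMath is a hypothetical helper.

```csharp
public static class FailureStatsMath
{
    // Assumed definition: fraction of reads in the window that produced a decoded value.
    // Returns null for an empty window so callers do not mistake "no data" for 0%.
    public static double? DecodeSuccessRate(int totalReads, int noDecodeCount)
    {
        if (totalReads <= 0) return null;
        return (totalReads - noDecodeCount) / (double)totalReads;
    }
}
```

Under this definition, a window with TotalReads = 100 and NoDecodeCount = 25 yields 0.75, which would fall below an 80% alert threshold.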

Add Consolidated Views:

  • camera_processing_stats - Single view for all camera processing metrics
  • Consolidate return rate views into 1-2 focused views

Success Criteria:

  • BBU schema is cleaned up with clear purpose for each table/view
  • No duplicate functionality across views
  • Dashboard shows camera decode success rate
  • Failure reasons are visible for troubleshooting

Stage 5: Superset View Updates

Goal: Fix visualizations and add OBLPN filtering to analytics

File: engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/Analytics/SupersetAnalyticsViewsSql.cs

Changes:

  1. Update bbu_oblpn_shipments to filter:
WHERE s.tracking_number LIKE 'OBLPN%'
  2. Add new view bbu_camera_processing_stats:
SELECT
    date_trunc('hour', processed_at_utc) as hour,
    processing_status,
    COUNT(*) as count
FROM bbu.camera_read_processing
GROUP BY 1, 2
  3. Update shipment detail views to show:
    • Basket count (item_type = 'Bakery Basket')
    • Stack count (item_type = 'Stack')
    • Product allocation count (everything else)
    • Flag suspicious counts (baskets > 30)
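
The counting rules above can be expressed as a small classification routine, shown here as a sketch for checking view output in tests. The item type names and the 30-basket threshold come from this plan; AllocationBreakdown and AllocationCounter are hypothetical names.

```csharp
using System.Collections.Generic;

public sealed record AllocationBreakdown(int Baskets, int Stacks, int Products)
{
    // Threshold from the plan: more than 30 baskets on one shipment is flagged.
    public bool IsSuspicious => Baskets > 30;
}

public static class AllocationCounter
{
    // Classifies each allocation by its item type name:
    // 'Bakery Basket' and 'Stack' are counted separately, everything else is a product.
    public static AllocationBreakdown Count(IEnumerable<string> itemTypeNames)
    {
        int baskets = 0, stacks = 0, products = 0;
        foreach (var name in itemTypeNames)
        {
            switch (name)
            {
                case "Bakery Basket": baskets++; break;
                case "Stack": stacks++; break;
                default: products++; break;
            }
        }
        return new AllocationBreakdown(baskets, stacks, products);
    }
}
```

The same three-way split would be a CASE expression over item type in the SQL view; keeping products out of the basket count is what prevents the inflated totals seen in the current data.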

Success Criteria:

  • Dashboard shows only OBLPN shipments by default
  • Basket counts are accurate (not inflated by products)
  • Processing status breakdown is visible

Stage 6: Historical Data Cleanup

Goal: Clean up existing FAR* shipments and invalid allocations

Approach: SQL scripts (not EF migrations for data changes)

Scripts (to be run manually with review):

-- Script 1: Identify scope of cleanup
SELECT
    CASE WHEN tracking_number LIKE 'FAR%' THEN 'FAR*'
         WHEN tracking_number LIKE 'OBLPN%' THEN 'OBLPN*'
         ELSE 'OTHER' END as pattern,
    status,
    COUNT(*) as shipment_count,
    SUM(item_quantity) as total_items
FROM transport.shipments
GROUP BY 1, 2
ORDER BY 1, 2;

-- Script 2: Mark FAR* shipments as ARCHIVED
UPDATE transport.shipments
SET status = 'ARCHIVED',
    comments = COALESCE(comments, '') || ' [Archived: Non-OBLPN shipment]'
WHERE tracking_number LIKE 'FAR%'
  AND status NOT IN ('COMPLETED', 'ARCHIVED');

-- Script 3: Remove product item allocations from shipments (keep baskets/stacks only)
DELETE FROM transport.shipment_item_allocations sia
USING catalog.items i, catalog.item_types it
WHERE sia.item_id = i.id
  AND i.item_type_id = it.id
  AND it.name NOT IN ('Bakery Basket', 'Stack');

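-- Script 4: Archive stacks whose OBLPN identifier lacks the OBLPN prefix
-- (restricted to Stack items and OBLPN-type identifiers so baskets and other identifiers are untouched)
UPDATE catalog.items i
SET status_id = (SELECT id FROM catalog.item_statuses WHERE code = 'ARCHIVED')
FROM prism.identifiers pi, catalog.item_types it, core.identifier_types cit
WHERE pi.associated_object_id = i.id
  AND it.id = i.item_type_id
  AND cit.id = pi.identifier_type_id
  AND it.name = 'Stack'
  AND cit.code = 'OBLPN'
  AND pi.value NOT LIKE 'OBLPN%';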

Safety:

  • Run each script in a transaction and ROLLBACK first to review the affected row counts, then rerun with COMMIT
  • Create backup of affected tables
  • Execute during maintenance window
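
For the rollback-first step, a dry run can also be scripted rather than typed by hand. The sketch below uses only the provider-neutral System.Data.Common types; CleanupDryRun is a hypothetical helper, and it assumes the caller supplies an already open connection from whichever ADO.NET provider is used for PostgreSQL.

```csharp
using System.Data.Common;

public static class CleanupDryRun
{
    // Executes one statement inside a transaction, returns the affected row count,
    // and always rolls back so nothing is persisted.
    public static int AffectedRows(DbConnection openConnection, string sql)
    {
        using DbTransaction tx = openConnection.BeginTransaction();
        using DbCommand cmd = openConnection.CreateCommand();
        cmd.Transaction = tx;
        cmd.CommandText = sql;
        int affected = cmd.ExecuteNonQuery();
        tx.Rollback();
        return affected;
    }
}
```

Comparing the returned counts against the Script 1 scope query before the real run gives a quick sanity check that each UPDATE or DELETE touches only the expected rows.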

Critical Files Summary

| File | Changes |
|---|---|
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/OracleProcessor.cs | Add OBLPN prefix filter, metrics |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/ShipmentLifecycleProcessor.cs | Add OBLPN prefix filter, new status |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu/Services/UnknownDestinationProcessor.cs | NEW - orphaned basket handling |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/OracleProcessorOptions.cs | Add prefix requirement option |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/ShipmentLifecycleOptions.cs | Add MaxBasketsPerStack option |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Abstractions/Configuration/UnknownDestinationOptions.cs | NEW |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/Analytics/SupersetAnalyticsViewsSql.cs | Update views |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/Migrations/ | Add failure stats table |
| engines/bbu/src/Acsis.Dynaplex.Engines.Bbu.Database/BbuDb.cs | Register new entities |

Execution Order

Phase A: Local Development & Testing

  1. Stage 1 & 2 (parallel) - Filtering changes in code
  2. Stage 5 - Superset view updates
  3. Stage 3 - Unknown destination handling
  4. Stage 4 - Enhanced failure tracking
  5. Local Validation:
    • Run local Aspire AppHost
    • Process test Oracle XML files with OBLPN and FARV records
    • Verify only OBLPN records create entities
    • Test camera read processing with various decoded values
    • Verify unknown destination shipments are created correctly
    • Check Superset dashboards show correct data

Phase B: Local Data Cleanup & Validation

  1. Stage 6 - Run cleanup scripts against local database
    • Verify cleanup scripts work correctly
    • Check shipment counts before/after
    • Validate no data loss for OBLPN records

Phase C: Azure Deployment

  1. Deploy code changes to Azure Container Apps
  2. Monitor metrics and logs for processing behavior
  3. Validate dashboards in Azure Superset instance
  4. Execute data cleanup scripts on Azure database (with backup)

Local Testing Validation Checklist

Before Code Changes (Baseline)

Run these queries locally to establish baseline:

-- Count shipments by prefix
SELECT
    CASE WHEN tracking_number LIKE 'OBLPN%' THEN 'OBLPN*' ELSE 'OTHER' END as pattern,
    COUNT(*) as count
FROM transport.shipments GROUP BY 1;

-- Count stacks with OBLPN identifiers
SELECT COUNT(DISTINCT i.id)
FROM catalog.items i
JOIN catalog.item_types it ON i.item_type_id = it.id
JOIN prism.identifiers pi ON pi.associated_object_id = i.id
JOIN core.identifier_types cit ON pi.identifier_type_id = cit.id
WHERE it.name = 'Stack' AND cit.code = 'OBLPN' AND pi.value LIKE 'OBLPN%';

After Code Changes

  1. Oracle Processor Test:

    • Upload/process a test XML file with both FARV and OBLPN records
    • Verify: Only OBLPN records create new stacks/shipments
    • Check logs for "Skipping entity import for non-OBLPN record" messages
    • Verify metrics: bbu.oracle.skipped_non_oblpn counter incremented
  2. ShipmentLifecycle Processor Test:

    • Insert test camera reads with various decoded_value patterns (OBLPN*, FARG*, empty)
    • Run processor and verify:
      • OBLPN* values attempt stack matching
      • FARG*/other values get status "NotOblpnPrefix"
      • Empty values get status "NoOblpnValue"
    • RSSI Selection Test:
      • Create 30+ RFID reads in a match window with varying peak_rssi values
      • Verify only top 20 by RSSI (highest/least negative) are allocated
      • Check that weaker signals are excluded from shipment allocation
  3. Unknown Destination Test (after Stage 3):

    • Create RFID reads without matching camera reads
    • Run processor and verify unknown destination shipment created
    • Check baskets allocated correctly

Validation Queries After All Changes

-- Verify no new FAR* shipments created
SELECT COUNT(*) FROM transport.shipments
WHERE tracking_number NOT LIKE 'OBLPN%'
  AND created_at > '[deploy_timestamp]';

-- Verify camera processing statuses
SELECT processing_status, COUNT(*)
FROM bbu.camera_read_processing
WHERE processed_at_utc > '[deploy_timestamp]'
GROUP BY processing_status;

-- Check unknown destination shipments (identified by location, not type)
SELECT s.* FROM transport.shipments s
JOIN transport.deliveries d ON d.shipment_id = s.id
JOIN spatial.locations l ON d.location_id = l.id
WHERE l.code = 'UNKNOWN_DEST';

-- CORE PRINCIPLE DIAGNOSTIC: Every RFID read > 1 hour old should have basket → shipment
-- This query should return 0 rows after implementation
SELECT tr.id as tag_read_id, tr.epc, tr.event_timestamp
FROM iot.zb_tag_reads tr
LEFT JOIN bbu.tag_read_processing trp ON trp.zb_tag_read_id = tr.id
LEFT JOIN catalog.items basket ON trp.basket_id = basket.id
LEFT JOIN transport.shipment_item_allocations sia ON sia.item_id = basket.id
WHERE tr.event_timestamp < NOW() - INTERVAL '1 hour'
  AND sia.id IS NULL  -- No shipment allocation
  AND trp.basket_id IS NOT NULL  -- But basket was created
ORDER BY tr.event_timestamp DESC
LIMIT 100;

Rollback Plan

Each stage can be deployed and rolled back independently:

  • Stages 1-2: Configuration flags to disable filtering
  • Stage 3: Disable via UnknownDestinationOptions.Enabled = false
  • Stage 4: The failure stats table and new views are additive; the migration that drops unused views must be reversible
  • Stage 5: Views are additive
  • Stage 6: Database backup before execution

Open Questions (Resolved)

  1. Q: What prefix distinguishes valid OBLPNs? A: "OBLPN" prefix only - FARV* and others should be ignored
  2. Q: Should we backfill data? A: Yes, full cleanup of FAR* shipments
  3. Q: Unknown destination shipment grouping? A: Per time window (activity gaps)
  4. Q: Is the camera expected to read product GTINs? A: Yes, but we only act on OBLPN values

Metrics & Monitoring

After implementation, track:

  • bbu.oracle.skipped_non_oblpn - Records skipped in Oracle processor
  • bbu.camera.not_oblpn_prefix - Camera reads filtered out
  • bbu.unknown_dest.shipments_created - Unknown destination shipments
  • bbu.unknown_dest.baskets_allocated - Baskets allocated to unknown dest

Dashboard alerts:

  • Camera decode rate < 80% for sustained period
  • High ratio of RFID reads without camera correlation
  • Unknown destination shipment count > threshold