fsds/bbu-oracle-processor-optimization.md

OracleProcessor Database Operation Optimization

Problem Statement

The OracleProcessor currently processes about 6-7 files/second with 64 parallel workers. PostgreSQL monitoring shows 43 connections waiting on LWLocks, which points to the database as the bottleneck. Analysis shows that the per-file database work is structured inefficiently: scopes, saves, and queries are repeated for every entity instead of being shared or batched.

Current State (Per File)

Metric                   Current   Target   Reduction
Service Scopes           60+       10-15    ~75%
SaveChangesAsync calls   60+       15-20    ~70%
Database Queries         350+      80-100   ~70%

Root Cause: Each entity upsert creates its own scope, does individual lookups, and calls SaveChangesAsync independently.


Optimization Strategy

Phase 1: Refactor ProcessShippingInfo to Use Shared Scopes

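Goal: Create one scope per file at the start of ProcessShippingInfo and pass its DbContexts to the helper methods. A DbContext instance is not thread-safe, so the shared scope must stay local to the worker that is processing the file.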

Current Pattern:

// Each method creates its own scope
await UpsertLocationAsync(...);     // Creates BbuDb, SpatialDb, PrismDb scopes
await UpsertItemTypeAsync(...);     // Creates CatalogDb, PrismDb scopes
await UpsertShipmentAsync(...);     // Creates TransportDb, PrismDb scopes

New Pattern:

using var scope = _services.CreateScope();
var catalogDb = scope.ServiceProvider.GetRequiredService<CatalogDb>();
var prismDb = scope.ServiceProvider.GetRequiredService<PrismDb>();
var transportDb = scope.ServiceProvider.GetRequiredService<TransportDb>();
var spatialDb = scope.ServiceProvider.GetRequiredService<SpatialDb>();

// Pass contexts to methods
await UpsertLocationAsync(spatialDb, prismDb, ...);
await UpsertItemTypeAsync(catalogDb, prismDb, ...);

Files to Modify:

  • OracleProcessor.cs: Lines 982-1180 (ProcessShippingInfo)

Phase 2: Batch Entity Upserts Before Processing Details

Goal: Pre-process all unique entities once, then reference them during detail processing.

Current Pattern (lines 1046-1112):

foreach(var item in uniqueItems) {
    await UpsertItemTypeAsync(item.ItemAlternateCode, ...);  // Scope + saves each
}
foreach(var detail in details) {
    await UpsertLocationAsync(detail.ShipToFacilityCode, ...);  // Scope + saves each
}

New Pattern:

// Collect all unique values first (materialized once so they are not re-enumerated)
var uniqueItemCodes = details.Select(d => d.ItemAlternateCode).Distinct().ToList();
var uniqueLocationCodes = details.Select(d => d.ShipToFacilityCode).Distinct().ToList();

// Batch upsert all item types (single scope, batched saves)
var itemTypeMap = await BatchUpsertItemTypesAsync(catalogDb, prismDb, uniqueItemCodes);

// Batch upsert all locations (single scope, batched saves)
var locationMap = await BatchUpsertLocationsAsync(spatialDb, prismDb, uniqueLocationCodes);

// Now process details using cached maps (no DB calls in loop)
foreach (var detail in details) {
    var itemTypeId = itemTypeMap[detail.ItemAlternateCode];
    var location = locationMap[detail.ShipToFacilityCode];  // Dictionary<string, Location>
    // Use cached values...
}

New Methods to Create:

  • BatchUpsertItemTypesAsync() - Process all item types, return Dictionary<string, Guid>
  • BatchUpsertLocationsAsync() - Process all locations, return Dictionary<string, Location>
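
The shape shared by both batch methods can be sketched without committing to a schema. This is a minimal sketch, not the actual implementation: BatchUpsert.RunAsync and its three delegates are hypothetical names, and the delegates stand in for the real EF Core calls (one Contains query for existing rows, entity construction, and one SaveChangesAsync).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchUpsert
{
    // Hypothetical helper: resolves every key with one lookup and at most one save.
    // loadExisting, create, and saveNew are assumptions standing in for EF Core calls.
    public static async Task<Dictionary<string, TValue>> RunAsync<TValue>(
        IEnumerable<string> keys,
        Func<IReadOnlyCollection<string>, Task<Dictionary<string, TValue>>> loadExisting,
        Func<string, TValue> create,
        Func<IReadOnlyCollection<TValue>, Task> saveNew)
    {
        // De-duplicate and drop empty codes so each entity is handled once.
        var distinctKeys = keys
            .Where(k => !string.IsNullOrWhiteSpace(k))
            .Distinct()
            .ToList();

        // One round trip for everything that already exists.
        var map = await loadExisting(distinctKeys);

        // Create only what is missing; nothing is saved inside the loop.
        var created = new List<TValue>();
        foreach (var key in distinctKeys)
        {
            if (map.ContainsKey(key)) continue;
            var value = create(key);
            map[key] = value;
            created.Add(value);
        }

        // One save for all new entities, skipped when nothing is new.
        if (created.Count > 0)
        {
            await saveNew(created);
        }

        return map;
    }
}
```

Because empty codes are skipped here, a detail row without a code has no entry in the returned map, so the detail loop would need TryGetValue rather than the indexer for such rows.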

Phase 3: Consolidate SaveChangesAsync Calls

Goal: Reduce from 60+ saves to ~15-20 per file.

Strategy:

  1. Add entities to context without saving
  2. Call SaveChangesAsync at logical boundaries:
    • After all reference data (categories, locations, item types)
    • After shipment + deliveries
    • After stack + allocations + product items

Example Consolidation:

// Instead of saving after each identifier:
await EnsureIdentifierAsync(prismDb, id1, ...);  // Query + possible save
await EnsureIdentifierAsync(prismDb, id2, ...);  // Query + possible save
await EnsureIdentifierAsync(prismDb, id3, ...);  // Query + possible save

// Batch them:
EnsureIdentifier(prismDb, id1, ...);  // Add to context, no save
EnsureIdentifier(prismDb, id2, ...);  // Add to context, no save
EnsureIdentifier(prismDb, id3, ...);  // Add to context, no save
await prismDb.SaveChangesAsync();     // Single save
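
One thing to watch once saves are deferred: a row that has been added to the context but not yet saved is not returned by a database query, so ensuring the same identifier twice within one batch could queue it twice. In EF Core the tracked-but-unsaved entities can be checked through DbSet<T>.Local; the sketch below shows the same idea with plain collections. IdentifierBatch and PendingIdentifier are hypothetical names used for illustration only, not types from OracleProcessor.cs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an identifier row; the real entity lives in PrismDb.
public sealed record PendingIdentifier(string TypeCode, string Value, Guid AssociatedObjectId);

public sealed class IdentifierBatch
{
    private readonly HashSet<(string TypeCode, string Value)> _known;
    private readonly List<PendingIdentifier> _pending = new();

    // existingKeys: keys already stored, assumed to be pre-loaded with one query.
    public IdentifierBatch(IEnumerable<(string TypeCode, string Value)> existingKeys)
    {
        _known = new HashSet<(string TypeCode, string Value)>(existingKeys);
    }

    // Queues the identifier unless it is already stored or already queued.
    // Returns true when a new row was queued.
    public bool Ensure(string typeCode, string value, Guid associatedObjectId)
    {
        if (!_known.Add((typeCode, value))) return false;
        _pending.Add(new PendingIdentifier(typeCode, value, associatedObjectId));
        return true;
    }

    // Hands all queued rows to a single save and clears the queue.
    public IReadOnlyList<PendingIdentifier> Drain()
    {
        var rows = _pending.ToArray();
        _pending.Clear();
        return rows;
    }
}
```

The rows returned by Drain would be added to the context together, followed by the single SaveChangesAsync call at the logical boundary.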

Phase 4: Optimize SyncStackProductItemsAsync N+1 Queries

Goal: Reduce 300+ queries per file to ~20.

Current Pattern (lines 1264-1458):

foreach(var detail in details) {  // 100 iterations
    // 3 queries per iteration = 300 queries
    var matchingIds = await GetEntityIdsByIdentifierAsync(prismDb, ...);
    var productItem = await catalogDb.Items.FirstOrDefaultAsync(...);
    var allocation = await transportDb.ShipmentItemAllocations.FirstOrDefaultAsync(...);
}

New Pattern:

// Pre-load all needed data in 3 queries total
var allComponentKeys = details.Select(d => BuildComponentKey(d)).Distinct().ToList();

var existingIdentifiers = await prismDb.Identifiers
    .Where(i => i.IdentifierType.Code == "STACK_COMPONENT_KEY" && allComponentKeys.Contains(i.Value))
    .ToDictionaryAsync(i => i.Value, i => i.AssociatedObjectId);

var existingItemIds = existingIdentifiers.Values.ToList();
var existingItems = await catalogDb.Items
    .Where(i => existingItemIds.Contains(i.Id))
    .ToDictionaryAsync(i => i.Id);

var existingAllocations = await transportDb.ShipmentItemAllocations
    .Where(a => a.ShipmentId == shipmentId && existingItemIds.Contains(a.ItemId))
    .ToDictionaryAsync(a => a.ItemId);

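// Now loop with no queries
foreach (var detail in details) {
    var key = BuildComponentKey(detail);
    if (existingIdentifiers.TryGetValue(key, out var itemId)) {
        // Existing item: look it up in existingItems / existingAllocations by itemId
    } else {
        // No identifier yet: handle as a new item, adding to the context without saving
    }
}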
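
Two practical details of the pre-load are worth guarding against: ToDictionaryAsync throws if two rows share a key, and a file with many details produces a long Contains list. A possible way to handle both is sketched below under stated assumptions. ChunkedLookup.LoadAsync is a hypothetical helper, queryChunk stands in for the real EF Core query, and the chunk size is a tuning value the source does not specify.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkedLookup
{
    // Hypothetical helper: loads rows for many keys in bounded batches and
    // builds a dictionary that keeps the first row seen for each key.
    // queryChunk is an assumption standing in for a query such as
    // prismDb.Identifiers.Where(i => chunk.Contains(i.Value)).ToListAsync().
    public static async Task<Dictionary<TKey, TRow>> LoadAsync<TKey, TRow>(
        IEnumerable<TKey> keys,
        int chunkSize,
        Func<IReadOnlyList<TKey>, Task<List<TRow>>> queryChunk,
        Func<TRow, TKey> keyOf)
        where TKey : notnull
    {
        var result = new Dictionary<TKey, TRow>();

        // Enumerable.Chunk (.NET 6+) splits the distinct keys into arrays of at most chunkSize.
        foreach (var chunk in keys.Distinct().Chunk(chunkSize))
        {
            var rows = await queryChunk(chunk);
            foreach (var row in rows)
            {
                // TryAdd ignores later duplicates instead of throwing.
                result.TryAdd(keyOf(row), row);
            }
        }

        return result;
    }
}
```

With this shape the query count is the number of chunks per lookup rather than exactly one, which is still independent of the number of details in the loop. Silently keeping the first duplicate is a design choice; logging duplicates may be preferable if they indicate bad data.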

Implementation Order

Phase   Scope               Impact      Risk
1       Shared scopes       Medium      Low
2       Batch upserts       High        Medium
3       Consolidate saves   High        Low
4       Fix N+1 queries     Very High   Medium

Recommended approach: Implement phases 1-3 together (they're interrelated), then phase 4.


Files to Modify

File                 Changes
OracleProcessor.cs   Main refactoring: ProcessShippingInfo and all Upsert methods

Expected Results

  • Current: ~6-7 files/sec with 64 workers (DB bottleneck)
  • Target: 15-25 files/sec with fewer workers
  • Rationale: roughly 70% fewer DB round trips per file should significantly reduce lock contention

Verification

After implementation:

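  1. Check pg_stat_activity for fewer sessions with wait_event_type = 'LWLock' (43 before the change)
  2. Measure files/sec with the same worker count (64) so the result is comparable to the ~6-7 files/sec baseline
  3. Monitor CPU usage (it should rise once the database is no longer the bottleneck)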
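
For step 2, a small counter shared by all workers is enough to get a comparable files/sec figure. The following is a minimal sketch; ThroughputMeter is a hypothetical helper, not an existing class in the processor.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: counts completed files across workers
// and reports files/second since construction.
public sealed class ThroughputMeter
{
    private readonly Stopwatch _clock = Stopwatch.StartNew();
    private long _completed;

    // Safe to call concurrently from all workers.
    public void FileCompleted() => Interlocked.Increment(ref _completed);

    public long Completed => Interlocked.Read(ref _completed);

    public double FilesPerSecond => Rate(Completed, _clock.Elapsed);

    // Pure calculation, separated so it can be checked without waiting on a clock.
    public static double Rate(long files, TimeSpan elapsed) =>
        elapsed.TotalSeconds > 0 ? files / elapsed.TotalSeconds : 0.0;
}
```

Each worker would call FileCompleted after finishing a file, and FilesPerSecond can be logged periodically before and after the change.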