Documentation

reference/configuration/service-communication.md

Service Communication Patterns in Dynaplex Architecture

This guide covers inter-service communication patterns, best practices, and implementation strategies for the Acsis Core Dynaplex architecture.

🎯 Communication Overview

The Dynaplex architecture supports multiple communication patterns:

  • Synchronous HTTP/HTTPS: Request-response via REST APIs
  • Asynchronous Messaging: Event-driven communication (future)
  • gRPC: High-performance binary protocol (future)
  • GraphQL: Flexible query language (future)

🔄 Synchronous Communication

HTTP Client Configuration

// Program.cs - Configure typed HTTP clients
builder.Services.AddHttpClient<CoreDataApiClient>(client =>
{
    client.BaseAddress = new Uri("https://localhost:40443");
    client.DefaultRequestHeaders.Add("User-Agent", "Catalog-Service");
    client.Timeout = TimeSpan.FromSeconds(30);
})
.AddStandardResilienceHandler(options =>
{
    // Retry up to three times with jittered exponential backoff starting at 1 second.
    options.Retry = new HttpRetryStrategyOptions
    {
        MaxRetryAttempts = 3,
        Delay = TimeSpan.FromSeconds(1),
        BackoffType = DelayBackoffType.Exponential,
        UseJitter = true
    };

    // Open the circuit for 30 seconds once at least 5 requests were sampled and half of them failed.
    options.CircuitBreaker = new HttpCircuitBreakerStrategyOptions
    {
        // The standard handler validates that the sampling window is at least twice the
        // per-attempt timeout configured below (10s); a 10s window is rejected at startup.
        SamplingDuration = TimeSpan.FromSeconds(20),
        FailureRatio = 0.5,
        MinimumThroughput = 5,
        BreakDuration = TimeSpan.FromSeconds(30)
    };

    // Upper bound for each individual attempt (retries get a fresh budget).
    options.AttemptTimeout = new HttpTimeoutStrategyOptions
    {
        Timeout = TimeSpan.FromSeconds(10)
    };
});

// Add health checks for dependencies
var coreDataHealthEndpoint = new Uri("https://localhost:40443/health");
string[] coreDataHealthTags = { "dependency", "critical" };

builder.Services
    .AddHealthChecks()
    .AddUrlGroup(coreDataHealthEndpoint, name: "core-data-service", tags: coreDataHealthTags);

Service Discovery with Aspire

// Aspire automatically provides service discovery
builder.Services.AddServiceDiscovery();

// Configure HTTP client with service discovery: the logical host name
// "asset-service" is resolved to a concrete endpoint at request time.
builder.Services
    .AddHttpClient<AssetApiClient>("asset-service", client =>
    {
        client.BaseAddress = new Uri("https+http://asset-service");
    })
    .AddServiceDiscovery();

// Usage in service
public class CatalogService
{
    private readonly AssetApiClient _assetClient;

    /// <summary>
    /// Fetches the asset with the given id through the asset client and
    /// projects it onto a <c>CatalogItem</c>.
    /// </summary>
    /// <param name="assetId">Identifier of the asset to look up.</param>
    /// <returns>A catalog item carrying the asset's id, name and derived category.</returns>
    public async Task<CatalogItem> GetCatalogItemAsync(Guid assetId)
    {
        // Service discovery handles endpoint resolution
        var assetKey = assetId.ToString();
        var fetchedAsset = await _assetClient.Assets[assetKey].GetAsync();

        var catalogItem = new CatalogItem
        {
            Id = fetchedAsset.Id,
            Name = fetchedAsset.Name,
            Category = DetermineCategory(fetchedAsset.Type)
        };

        return catalogItem;
    }
}

Request/Response Patterns

Standard Request Flow

public class OrderService
{
    // System.Diagnostics.Activity has no static StartActivity; spans are started from an ActivitySource.
    // NOTE(review): the source name is a placeholder - it must match whatever name the
    // tracing pipeline/listener subscribes to, otherwise StartActivity returns null.
    private static readonly ActivitySource s_activitySource = new("Acsis.OrderService");

    private readonly ILogger<OrderService> _logger;
    private readonly AssetApiClient _assetClient;
    private readonly InventoryApiClient _inventoryClient;

    /// <summary>
    /// Creates an order by resolving, in parallel, the asset and inventory
    /// record of every requested item, validating the result and assembling
    /// the order. The operation is wrapped in a "CreateOrder" activity.
    /// </summary>
    /// <param name="request">Order request whose <c>Items</c> are resolved.</param>
    /// <returns>The newly created order, stamped with the correlation id.</returns>
    /// <remarks>
    /// Any exception is recorded on the activity and in the log, then rethrown unchanged.
    /// </remarks>
    public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
    {
        // Correlation ID for distributed tracing: reuse the ambient activity id
        // (captured before the child activity starts) or fall back to a new GUID.
        var correlationId = Activity.Current?.Id ?? Guid.NewGuid().ToString();

        // StartActivity returns null when nobody is listening, hence the null-conditional calls.
        using var activity = s_activitySource.StartActivity("CreateOrder");
        activity?.SetTag("order.items.count", request.Items.Count);
        activity?.SetTag("correlation.id", correlationId);

        try
        {
            // Parallel service calls: per item, the asset and inventory lookups run concurrently,
            // and all items are processed concurrently as well.
            var tasks = request.Items.Select(async item =>
            {
                var assetTask = _assetClient.Assets[item.AssetId].GetAsync();
                var inventoryTask = _inventoryClient.Inventory[item.AssetId].GetAsync();

                await Task.WhenAll(assetTask, inventoryTask);

                return new OrderItem
                {
                    // Both tasks are already complete here; awaiting just unwraps the results.
                    Asset = await assetTask,
                    Availability = await inventoryTask
                };
            });

            var orderItems = await Task.WhenAll(tasks);

            // Validate and create order
            ValidateOrderItems(orderItems);

            var order = new Order
            {
                Id = Guid.NewGuid(),
                Items = orderItems,
                CreatedAt = DateTime.UtcNow,
                CorrelationId = correlationId
            };

            activity?.SetTag("order.id", order.Id);
            _logger.LogInformation("Order {OrderId} created with {ItemCount} items",
                order.Id, orderItems.Length);

            return order;
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            _logger.LogError(ex, "Failed to create order with correlation ID {CorrelationId}",
                correlationId);
            throw;
        }
    }
}

Aggregation Pattern

public class DashboardService
{
    // Single cache slot shared by the read and the write below.
    private const string DashboardCacheKey = "dashboard";

    private readonly HttpClient _httpClient;
    private readonly IMemoryCache _cache;

    /// <summary>
    /// Returns the aggregated dashboard, served from the in-memory cache when
    /// present; otherwise the four backing queries run in parallel and the
    /// combined result is cached for one minute.
    /// </summary>
    public async Task<DashboardData> GetDashboardDataAsync()
    {
        // Check cache first
        if (_cache.TryGetValue(DashboardCacheKey, out DashboardData cached))
        {
            return cached;
        }

        // Parallel calls to multiple services: all four requests are started
        // before any of them is awaited.
        var assetCountTask = GetAssetCountAsync();
        var recentOrdersTask = GetRecentOrdersAsync();
        var alertsTask = GetActiveAlertsAsync();
        var metricsTask = GetMetricsAsync();

        await Task.WhenAll(
            assetCountTask,
            recentOrdersTask,
            alertsTask,
            metricsTask);

        var dashboard = new DashboardData
        {
            // Every task has completed; awaiting only unwraps the value.
            AssetCount = await assetCountTask,
            RecentOrders = await recentOrdersTask,
            ActiveAlerts = await alertsTask,
            Metrics = await metricsTask
        };

        // Cache for 1 minute
        _cache.Set(DashboardCacheKey, dashboard, TimeSpan.FromMinutes(1));

        return dashboard;
    }

    /// <summary>
    /// Reads the asset count from the asset service; a non-success status
    /// code surfaces as <see cref="HttpRequestException"/>.
    /// </summary>
    private async Task<int> GetAssetCountAsync()
    {
        var response = await _httpClient.GetAsync("https://asset-service/api/v1/assets/count");
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadFromJsonAsync<int>();
    }

    // GetRecentOrdersAsync, GetActiveAlertsAsync and GetMetricsAsync are not shown in this sample.
}

🔐 Authentication & Authorization

Service-to-Service Authentication

// Token provider for service authentication
public class ServiceTokenProvider
{
    private const string TokenCacheKey = "service_token";

    // The cached token is dropped this long before it actually expires.
    private static readonly TimeSpan s_expirySafetyMargin = TimeSpan.FromSeconds(60);

    // One shared client for token requests: creating (and disposing) an HttpClient per
    // call can exhaust sockets under load. Prefer IHttpClientFactory if it is available here.
    private static readonly HttpClient s_tokenClient = new();

    private readonly IConfiguration _configuration;
    private readonly IMemoryCache _cache;

    /// <summary>
    /// Returns a client-credentials access token, served from the memory cache
    /// when one is present and requested from the configured token endpoint otherwise.
    /// </summary>
    /// <param name="cancellationToken">Cancels the outgoing token request.</param>
    /// <returns>The bearer token to attach to outgoing service calls.</returns>
    /// <exception cref="AuthenticationException">The token endpoint reported an error.</exception>
    public async Task<string> GetTokenAsync(CancellationToken cancellationToken = default)
    {
        if (_cache.TryGetValue(TokenCacheKey, out string token))
        {
            return token;
        }

        var tokenRequest = new ClientCredentialsTokenRequest
        {
            Address = _configuration["Auth:TokenEndpoint"],
            ClientId = _configuration["Auth:ClientId"],
            ClientSecret = _configuration["Auth:ClientSecret"],
            Scope = "acsis.api"
        };

        var response = await s_tokenClient.RequestClientCredentialsTokenAsync(tokenRequest, cancellationToken);

        if (response.IsError)
        {
            throw new AuthenticationException($"Token request failed: {response.Error}");
        }

        // Tokens living 60 seconds or less would yield a zero/negative lifetime, which the
        // cache rejects with an exception; such tokens are returned without being cached.
        var cacheLifetime = TimeSpan.FromSeconds(response.ExpiresIn) - s_expirySafetyMargin;
        if (cacheLifetime > TimeSpan.Zero)
        {
            _cache.Set(TokenCacheKey, response.AccessToken, cacheLifetime);
        }

        return response.AccessToken;
    }
}

// HTTP client with authentication
public class AuthenticatedHttpClientHandler : DelegatingHandler
{
    private readonly ServiceTokenProvider _tokenProvider;

    public AuthenticatedHttpClientHandler(ServiceTokenProvider tokenProvider)
    {
        _tokenProvider = tokenProvider;
    }

    /// <summary>
    /// Attaches the service token as a Bearer Authorization header and forwards the request.
    /// </summary>
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Flow the caller's token so an abandoned request does not keep waiting on the token endpoint.
        var token = await _tokenProvider.GetTokenAsync(cancellationToken);
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);

        return await base.SendAsync(request, cancellationToken);
    }
}

// Registration
builder.Services.AddTransient<AuthenticatedHttpClientHandler>();
builder.Services.AddHttpClient<AssetApiClient>()
    .AddHttpMessageHandler<AuthenticatedHttpClientHandler>();

User Context Propagation

public class UserContextPropagationHandler : DelegatingHandler
{
    private readonly IHttpContextAccessor _httpContextAccessor;

    /// <summary>
    /// Copies the caller's identity (access token plus user/tenant headers) and
    /// the current correlation id onto the outgoing request before forwarding it.
    /// </summary>
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        var httpContext = _httpContextAccessor.HttpContext;

        if (httpContext?.User?.Identity?.IsAuthenticated == true)
        {
            // Forward user token
            var token = await httpContext.GetTokenAsync("access_token");
            if (!string.IsNullOrEmpty(token))
            {
                request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
            }

            // Forward user context headers (only those that actually have a value)
            SetHeaderIfPresent(request, "X-User-Id", httpContext.User.FindFirst("sub")?.Value);
            SetHeaderIfPresent(request, "X-User-Name", httpContext.User.Identity.Name);
            SetHeaderIfPresent(request, "X-Tenant-Id", httpContext.User.FindFirst("tenant_id")?.Value);
        }

        // Forward correlation ID
        SetHeaderIfPresent(request, "X-Correlation-Id", Activity.Current?.Id);

        return await base.SendAsync(request, cancellationToken);
    }

    /// <summary>
    /// Sets a single-valued header, skipping null/empty values so no blank
    /// identity header is emitted, and replacing any previous value so a
    /// request that passes through this handler again is not given duplicates.
    /// </summary>
    private static void SetHeaderIfPresent(HttpRequestMessage request, string name, string value)
    {
        if (string.IsNullOrEmpty(value))
        {
            return;
        }

        request.Headers.Remove(name);
        request.Headers.Add(name, value);
    }
}

🔁 Resilience Patterns

Retry Policy

// Polly retry configuration
builder.Services.AddHttpClient<AssetApiClient>()
    .AddPolicyHandler(HttpPolicyExtensions
        // Covers HttpRequestException, HTTP 5xx and HTTP 408. Other non-success codes
        // (e.g. 400/401/404) are not transient, so retrying them only adds load and latency.
        .HandleTransientHttpError()
        .WaitAndRetryAsync(
            3,
            // Exponential backoff: 2s, 4s, 8s.
            retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
            onRetry: (outcome, timespan, retryCount, context) =>
            {
                // The Polly Context is itself the string-keyed dictionary; the logger is
                // optional and only present if the caller stored it under "logger".
                if (context.TryGetValue("logger", out var candidate) && candidate is ILogger logger)
                {
                    logger.LogWarning(
                        "Retry {RetryCount} after {Delay}ms",
                        retryCount,
                        timespan.TotalMilliseconds);
                }
            }));

Circuit Breaker

// Circuit breaker configuration: break for 30 seconds after 5 handled
// transient failures, logging when the circuit opens and when it resets.
var assetCircuitBreaker = HttpPolicyExtensions
    .HandleTransientHttpError()
    .CircuitBreakerAsync(
        handledEventsAllowedBeforeBreaking: 5,
        durationOfBreak: TimeSpan.FromSeconds(30),
        onBreak: (result, duration) =>
            _logger.LogWarning("Circuit breaker opened for {Duration}", duration),
        onReset: () =>
            _logger.LogInformation("Circuit breaker reset"));

builder.Services.AddHttpClient<AssetApiClient>()
    .AddPolicyHandler(assetCircuitBreaker);

// Combined policies
// Inner policy: break for 30 seconds after 5 handled transient failures.
var circuitBreakerPolicy = HttpPolicyExtensions
    .HandleTransientHttpError()
    .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));

// Outer policy: three retries with exponential backoff.
var retryPolicy = HttpPolicyExtensions
    .HandleTransientHttpError()
    .WaitAndRetryAsync(
        3,
        retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

// The first argument wraps the second: retry is outermost, the breaker sits inside it.
var combinedPolicy = Policy.WrapAsync(retryPolicy, circuitBreakerPolicy);

builder.Services.AddHttpClient<AssetApiClient>()
    .AddPolicyHandler(combinedPolicy);

Timeout Handling

public class TimeoutService
{
    private readonly HttpClient _httpClient;

    /// <summary>
    /// Runs <paramref name="operation"/> and fails with <see cref="TimeoutException"/>
    /// if it has not completed within <paramref name="timeout"/>.
    /// </summary>
    /// <remarks>
    /// The delegate receives no cancellation token, so on timeout the caller stops
    /// waiting but the underlying work keeps running. Use the overload that passes a
    /// <see cref="CancellationToken"/> when the operation can be cancelled cooperatively.
    /// </remarks>
    /// <param name="operation">The work to run.</param>
    /// <param name="timeout">Maximum time to wait for completion.</param>
    /// <param name="cancellationToken">Caller-initiated cancellation; surfaces as <see cref="OperationCanceledException"/>.</param>
    /// <exception cref="TimeoutException">The deadline elapsed before the operation finished.</exception>
    public Task<T> ExecuteWithTimeoutAsync<T>(
        Func<Task<T>> operation,
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(operation);
        return ExecuteWithTimeoutAsync(_ => operation(), timeout, cancellationToken);
    }

    /// <summary>
    /// Runs <paramref name="operation"/> with a token that is cancelled when either the
    /// timeout elapses or the caller cancels, and translates a timeout into
    /// <see cref="TimeoutException"/>.
    /// </summary>
    /// <param name="operation">The work to run; it should pass the supplied token to whatever it awaits.</param>
    /// <param name="timeout">Maximum time to wait for completion.</param>
    /// <param name="cancellationToken">Caller-initiated cancellation; surfaces as <see cref="OperationCanceledException"/>.</param>
    /// <exception cref="TimeoutException">The deadline elapsed before the operation finished.</exception>
    public async Task<T> ExecuteWithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(operation);

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);

        try
        {
            // WaitAsync enforces the deadline even when the operation ignores its token;
            // without it the linked source would be cancelled but nobody would observe it.
            return await operation(cts.Token).WaitAsync(cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
        {
            // Only the timeout fired (not the caller's token): report it as a timeout.
            throw new TimeoutException($"Operation timed out after {timeout.TotalSeconds} seconds");
        }
    }

    /// <summary>
    /// Fetches an asset by id, giving up after five seconds.
    /// </summary>
    public async Task<Asset> GetAssetWithTimeoutAsync(Guid id)
    {
        return await ExecuteWithTimeoutAsync(
            async timeoutToken =>
            {
                // Passing the token lets the HTTP call itself be aborted at the deadline.
                var response = await _httpClient.GetAsync($"/api/v1/assets/{id}", timeoutToken);
                response.EnsureSuccessStatusCode();
                return await response.Content.ReadFromJsonAsync<Asset>(timeoutToken);
            },
            TimeSpan.FromSeconds(5));
    }
}

Bulkhead Isolation

// Bulkhead pattern to isolate resources: at most 10 calls execute concurrently,
// up to 50 more wait in the queue, and anything beyond that is rejected.
builder.Services.AddHttpClient<CriticalApiClient>()
    .AddPolicyHandler(Policy.BulkheadAsync<HttpResponseMessage>(
        maxParallelization: 10,
        maxQueuingActions: 50,
        onBulkheadRejectedAsync: context =>
        {
            _logger.LogWarning("Bulkhead rejected request");

            // Nothing is awaited here, so return a completed task instead of
            // declaring the lambda async (which only produces a CS1998 warning).
            return Task.CompletedTask;
        }));

// Using SemaphoreSlim for custom bulkhead
public class BulkheadService
{
    private readonly SemaphoreSlim _semaphore;

    /// <summary>
    /// Creates a bulkhead that lets at most <paramref name="maxConcurrency"/>
    /// operations run at the same time.
    /// </summary>
    public BulkheadService(int maxConcurrency = 10)
    {
        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    /// <summary>
    /// Waits for a free slot, runs <paramref name="operation"/> and releases the
    /// slot again, whether the operation succeeds or throws.
    /// </summary>
    /// <param name="operation">The work to run inside the bulkhead.</param>
    /// <param name="cancellationToken">
    /// Cancels the wait for a free slot; it is not passed to the operation itself.
    /// </param>
    /// <returns>The operation's result.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    /// <exception cref="OperationCanceledException">The wait for a slot was cancelled.</exception>
    public async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        CancellationToken cancellationToken = default)
    {
        // Validate before taking a slot so a bad argument never consumes capacity.
        ArgumentNullException.ThrowIfNull(operation);

        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            return await operation();
        }
        finally
        {
            _semaphore.Release();
        }
    }
}

📨 Asynchronous Communication (Future)

Event Bus Pattern

// Event definition
/// <summary>
/// Integration event describing a newly created asset.
/// </summary>
/// <param name="AssetId">Identifier of the created asset.</param>
/// <param name="Name">Name of the asset.</param>
/// <param name="Type">Type of the asset.</param>
/// <param name="CreatedAt">Creation timestamp of the asset.</param>
public record AssetCreatedEvent(
    Guid AssetId,
    string Name,
    string Type,
    DateTime CreatedAt) : IIntegrationEvent;

// Publisher
public class AssetService
{
    private readonly IEventBus _eventBus;

    /// <summary>
    /// Persists a new asset and then announces it on the event bus.
    /// </summary>
    /// <param name="request">Data for the asset to create.</param>
    /// <returns>The asset that was saved.</returns>
    public async Task<Asset> CreateAssetAsync(CreateAssetRequest request)
    {
        var asset = new Asset { /* ... */ };

        await _repository.SaveAsync(asset);

        // Publish event - built only after the save, from the saved asset's values
        var createdEvent = new AssetCreatedEvent(
            asset.Id,
            asset.Name,
            asset.Type,
            asset.CreatedAt);
        await _eventBus.PublishAsync(createdEvent);

        return asset;
    }
}

// Subscriber
public class InventoryService : IEventHandler<AssetCreatedEvent>
{
    /// <summary>
    /// Reacts to a created asset by saving an inventory record for it with
    /// quantity 0 and reorder level 10.
    /// </summary>
    public async Task HandleAsync(AssetCreatedEvent @event)
    {
        // Create inventory record for new asset
        await _repository.SaveAsync(new InventoryItem
        {
            AssetId = @event.AssetId,
            Quantity = 0,
            ReorderLevel = 10
        });
    }
}

Saga Pattern

/// <summary>
/// Coordinates order fulfilment as a sequence of messages:
/// reserve every asset, then process payment, then create the shipment.
/// </summary>
// NOTE(review): the saga base class may require additional members (for example a
// message-to-saga correlation mapping) that this sample does not show - confirm.
public class OrderSaga : Saga<OrderSagaData>,
    IAmStartedByMessages<CreateOrderCommand>,
    IHandleMessages<AssetReservedEvent>,
    IHandleMessages<PaymentProcessedEvent>,
    IHandleMessages<ShipmentCreatedEvent>
{
    /// <summary>Starts the saga: stores the order and requests a reservation per item.</summary>
    public async Task Handle(CreateOrderCommand message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.Items = message.Items;

        // Start saga - reserve assets
        foreach (var item in message.Items)
        {
            await context.Send(new ReserveAssetCommand(item.AssetId, item.Quantity));
        }
    }

    /// <summary>Records a reservation and moves on to payment once all assets are reserved.</summary>
    public async Task Handle(AssetReservedEvent message, IMessageHandlerContext context)
    {
        Data.ReservedAssets.Add(message.AssetId);

        if (Data.AllAssetsReserved())
        {
            // Process payment
            await context.Send(new ProcessPaymentCommand(Data.OrderId));
        }
    }

    /// <summary>Marks the payment as done and requests the shipment.</summary>
    public async Task Handle(PaymentProcessedEvent message, IMessageHandlerContext context)
    {
        Data.PaymentProcessed = true;

        // Create shipment
        await context.Send(new CreateShipmentCommand(Data.OrderId));
    }

    /// <summary>Final step: the shipment exists, so the saga is finished.</summary>
    public Task Handle(ShipmentCreatedEvent message, IMessageHandlerContext context)
    {
        // Saga complete
        MarkAsComplete();

        // Nothing to await here, so return a completed task rather than an async method without awaits.
        return Task.CompletedTask;
    }
}

🔍 Service Mesh (Future)

Istio Integration

# VirtualService for traffic management
# Two routing rules for the asset-service host:
#   1. requests carrying the header "x-version: v2" go entirely to subset v2;
#   2. all other requests are split 90/10 between subsets v1 and v2.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: asset-service
spec:
  hosts:
  - asset-service
  http:
  # Rule 1: explicit opt-in to v2 via request header
  - match:
    - headers:
        x-version:
          exact: v2
    route:
    - destination:
        host: asset-service
        subset: v2
      weight: 100
  # Rule 2: default route with a weighted split (weights sum to 100)
  - route:
    - destination:
        host: asset-service
        subset: v1
      weight: 90
    - destination:
        host: asset-service
        subset: v2
      weight: 10

📊 Communication Monitoring

Distributed Tracing

public class TracingMiddleware
{
    // System.Diagnostics.Activity has no static StartActivity; spans are started from an ActivitySource.
    // NOTE(review): the source name is a placeholder - it must match whatever name the
    // tracing pipeline/listener subscribes to, otherwise StartActivity returns null.
    private static readonly ActivitySource s_activitySource = new("Acsis.TracingMiddleware");

    private readonly RequestDelegate _next;

    /// <summary>
    /// Wraps the rest of the pipeline in an activity named "{method} {path}",
    /// tagging it with request details, the response status code and any exception.
    /// </summary>
    /// <param name="context">The current HTTP request context.</param>
    public async Task InvokeAsync(HttpContext context)
    {
        // Null when no listener is attached, hence the null-conditional calls below.
        using var activity = s_activitySource.StartActivity(
            $"{context.Request.Method} {context.Request.Path}");

        activity?.SetTag("http.method", context.Request.Method);
        activity?.SetTag("http.url", context.Request.GetDisplayUrl());
        activity?.SetTag("http.host", context.Request.Host.ToString());
        activity?.SetTag("user.id", context.User?.FindFirst("sub")?.Value);

        try
        {
            await _next(context);

            activity?.SetTag("http.status_code", context.Response.StatusCode);

            // Any 4xx/5xx response marks the span as failed.
            if (context.Response.StatusCode >= 400)
            {
                activity?.SetStatus(ActivityStatusCode.Error);
            }
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.RecordException(ex);
            throw;
        }
    }
}

Metrics Collection

public class CommunicationMetrics
{
    private readonly IMetrics _metrics;

    /// <summary>
    /// Counts one request (tagged by service, method and status) and records
    /// its duration in the request-duration histogram (tagged by service and method).
    /// </summary>
    // NOTE(review): the histogram name ends in "_seconds"; confirm callers pass
    // <paramref name="duration"/> in seconds.
    public void RecordRequest(string service, string method, int statusCode, double duration)
    {
        var requestTags = new MetricTags(
            new[] { "service", "method", "status" },
            new[] { service, method, statusCode.ToString() });
        _metrics.Measure.Counter.Increment("http_requests_total", requestTags);

        var durationTags = new MetricTags(
            new[] { "service", "method" },
            new[] { service, method });
        _metrics.Measure.Histogram.Update("http_request_duration_seconds", duration, durationTags);
    }

    /// <summary>
    /// Publishes the breaker state as a gauge: 1 when the state is "Open", 0 for anything else.
    /// </summary>
    public void RecordCircuitBreakerState(string service, string state)
    {
        var serviceTag = new MetricTags("service", service);

        _metrics.Measure.Gauge.SetValue(
            "circuit_breaker_state",
            state == "Open" ? 1 : 0,
            serviceTag);
    }
}

🔒 Security Considerations

mTLS Between Services

// Configure mutual TLS
builder.Services.AddHttpClient<AssetApiClient>()
    .ConfigurePrimaryHttpMessageHandler(() =>
    {
        var tlsHandler = new HttpClientHandler();

        // Load client certificate from the application directory; the password comes from configuration
        var pfxPath = Path.Combine(AppContext.BaseDirectory, "certs/client.pfx");
        var pfxPassword = configuration["Certificates:Password"];
        tlsHandler.ClientCertificates.Add(new X509Certificate2(pfxPath, pfxPassword));

        // Validate server certificate - the custom validation logic lives in the helper
        tlsHandler.ServerCertificateCustomValidationCallback =
            (sender, certificate, chain, errors) =>
                ValidateServerCertificate(certificate, chain, errors);

        return tlsHandler;
    });

API Key Authentication

/// <summary>
/// Delegating handler that stamps every outgoing request with the service API key.
/// </summary>
public class ApiKeyAuthenticationHandler : DelegatingHandler
{
    private const string ApiKeyHeaderName = "X-API-Key";

    private readonly string _apiKey;

    /// <summary>
    /// Reads the key from the "ServiceAuth:ApiKey" configuration entry.
    /// </summary>
    /// <exception cref="InvalidOperationException">The entry is missing or blank.</exception>
    public ApiKeyAuthenticationHandler(IConfiguration configuration)
    {
        ArgumentNullException.ThrowIfNull(configuration);

        // Fail at construction rather than silently sending requests with an empty key.
        var apiKey = configuration["ServiceAuth:ApiKey"];
        if (string.IsNullOrWhiteSpace(apiKey))
        {
            throw new InvalidOperationException(
                "Configuration value 'ServiceAuth:ApiKey' is missing or empty.");
        }

        _apiKey = apiKey;
    }

    /// <summary>
    /// Sets the API key header and forwards the request to the inner handler.
    /// </summary>
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Replace rather than append: a request that passes through this handler
        // again would otherwise carry the key twice.
        request.Headers.Remove(ApiKeyHeaderName);
        request.Headers.Add(ApiKeyHeaderName, _apiKey);

        return await base.SendAsync(request, cancellationToken);
    }
}

🎯 Best Practices

1. Use Typed Clients

// ✅ Good - Typed client
services.AddHttpClient<AssetApiClient>();

// ❌ Bad - Direct HttpClient usage
var client = new HttpClient();

2. Implement Timeouts

// ✅ Good - Explicit timeout
httpClient.Timeout = TimeSpan.FromSeconds(30);

// ❌ Bad - No explicit timeout (silently relies on the 100-second default)

3. Handle Failures Gracefully

// ✅ Good - Graceful degradation
try
{
    return await _assetClient.GetAssetAsync(id);
}
catch (HttpRequestException)
{
    return GetCachedAsset(id) ?? GetDefaultAsset();
}

// ❌ Bad - Let exception bubble up
return await _assetClient.GetAssetAsync(id);

4. Use Correlation IDs

// ✅ Good - Correlation tracking
request.Headers.Add("X-Correlation-Id", Activity.Current?.Id ?? Guid.NewGuid().ToString());

// ❌ Bad - No correlation tracking

5. Implement Health Checks

// ✅ Good - Health check for dependencies
services.AddHealthChecks()
    .AddUrlGroup(new Uri("https://asset-service/health"), "asset-service");

// ❌ Bad - No health checks

📚 Resources

🔧 Troubleshooting

Connection Refused

# Check service is running
curl https://localhost:40443/health

# Check that something is listening on the port
netstat -an | grep 40443

# Check Docker network
docker network inspect acsis-network

Timeout Issues

// Increase timeout
httpClient.Timeout = TimeSpan.FromSeconds(60);

// Add retry with backoff
.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))

Certificate Errors

# Trust development certificate
dotnet dev-certs https --trust

# Check certificate expiry
openssl x509 -in cert.pem -noout -dates

Circuit Breaker Open

// Check circuit breaker state
var state = circuitBreaker.CircuitState;

// Manual reset if needed
circuitBreaker.Reset();