Documentation
reference/configuration/service-communication.md
Service Communication Patterns in Dynaplex Architecture
This guide covers inter-service communication patterns, best practices, and implementation strategies for the Acsis Core Dynaplex architecture.
🎯 Communication Overview
The Dynaplex architecture supports multiple communication patterns:
- Synchronous HTTP/HTTPS: Request-response via REST APIs
- Asynchronous Messaging: Event-driven communication (future)
- gRPC: High-performance binary protocol (future)
- GraphQL: Flexible query language (future)
🔄 Synchronous Communication
HTTP Client Configuration
// Program.cs - Configure typed HTTP clients
builder.Services.AddHttpClient<CoreDataApiClient>(client =>
{
    client.BaseAddress = new Uri("https://localhost:40443");
    client.DefaultRequestHeaders.Add("User-Agent", "Catalog-Service");
    // Outer safety net for the whole call, including every retry attempt
    client.Timeout = TimeSpan.FromSeconds(30);
})
.AddStandardResilienceHandler(options =>
{
    // Retry: up to 3 retries with jittered exponential backoff starting at 1s
    options.Retry.MaxRetryAttempts = 3;
    options.Retry.Delay = TimeSpan.FromSeconds(1);
    options.Retry.BackoffType = DelayBackoffType.Exponential;
    options.Retry.UseJitter = true;

    // Per-attempt timeout: each individual try is abandoned after 10s
    options.AttemptTimeout.Timeout = TimeSpan.FromSeconds(10);

    // Circuit breaker: open for 30s once at least 5 calls in the sampling window
    // have a failure ratio of 50% or more.
    // The sampling window must be at least twice the attempt timeout, otherwise
    // options validation fails when the pipeline is built (10s window with a 10s
    // attempt timeout is rejected), so the window is 30s here.
    options.CircuitBreaker.SamplingDuration = TimeSpan.FromSeconds(30);
    options.CircuitBreaker.FailureRatio = 0.5;
    options.CircuitBreaker.MinimumThroughput = 5;
    options.CircuitBreaker.BreakDuration = TimeSpan.FromSeconds(30);
});
// Register a health probe for the downstream Core Data service
var coreDataHealthUri = new Uri("https://localhost:40443/health");
string[] coreDataHealthTags = { "dependency", "critical" };
builder.Services
    .AddHealthChecks()
    .AddUrlGroup(coreDataHealthUri, name: "core-data-service", tags: coreDataHealthTags);
Service Discovery with Aspire
// Register the service discovery services in the container
builder.Services.AddServiceDiscovery();
// Typed client addressed by logical service name; the name is resolved
// to a concrete endpoint by service discovery when a request is sent
var assetClientBuilder = builder.Services.AddHttpClient<AssetApiClient>(
    "asset-service",
    client =>
    {
        client.BaseAddress = new Uri("https+http://asset-service");
    });
assetClientBuilder.AddServiceDiscovery();
// Consuming a discovered service from application code
public class CatalogService
{
    private readonly AssetApiClient _assetClient;

    /// <summary>
    /// Fetches the asset with the given id from the asset service and maps it to a catalog item.
    /// </summary>
    /// <param name="assetId">Identifier of the asset to look up.</param>
    /// <returns>A catalog item built from the asset's id, name and type.</returns>
    public async Task<CatalogItem> GetCatalogItemAsync(Guid assetId)
    {
        // The client's base address is a logical name; endpoint resolution happens underneath
        var assetKey = assetId.ToString();
        var source = await _assetClient.Assets[assetKey].GetAsync();

        var catalogItem = new CatalogItem
        {
            Id = source.Id,
            Name = source.Name,
            Category = DetermineCategory(source.Type)
        };
        return catalogItem;
    }
}
Request/Response Patterns
Standard Request Flow
public class OrderService
{
    // Activities are created through an ActivitySource; System.Diagnostics.Activity has no
    // static StartActivity method. StartActivity returns null unless a listener (for example
    // an OpenTelemetry tracer configured with this source name) is subscribed, which is why
    // every use below is null-conditional.
    private static readonly ActivitySource OrderActivitySource = new(nameof(OrderService));

    private readonly ILogger<OrderService> _logger;
    private readonly AssetApiClient _assetClient;
    private readonly InventoryApiClient _inventoryClient;

    /// <summary>
    /// Creates an order by looking up asset and inventory data for every requested item in parallel.
    /// </summary>
    /// <param name="request">The order request; its items are resolved against the asset and inventory services.</param>
    /// <returns>The newly created order, stamped with the correlation id used for tracing.</returns>
    /// <remarks>Any exception is logged with the correlation id, recorded on the activity, and rethrown.</remarks>
    public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
    {
        // Correlation ID for distributed tracing: the ambient activity id when present, else a new GUID
        var correlationId = Activity.Current?.Id ?? Guid.NewGuid().ToString();
        using var activity = OrderActivitySource.StartActivity("CreateOrder");
        activity?.SetTag("order.items.count", request.Items.Count);
        activity?.SetTag("correlation.id", correlationId);
        try
        {
            // Parallel service calls: per item, asset and inventory are fetched concurrently,
            // and all items are processed concurrently with each other
            var tasks = request.Items.Select(async item =>
            {
                var assetTask = _assetClient.Assets[item.AssetId].GetAsync();
                var inventoryTask = _inventoryClient.Inventory[item.AssetId].GetAsync();
                await Task.WhenAll(assetTask, inventoryTask);
                return new OrderItem
                {
                    Asset = await assetTask,
                    Availability = await inventoryTask
                };
            });
            var orderItems = await Task.WhenAll(tasks);

            // Validate and create order
            ValidateOrderItems(orderItems);
            var order = new Order
            {
                Id = Guid.NewGuid(),
                Items = orderItems,
                CreatedAt = DateTime.UtcNow,
                CorrelationId = correlationId
            };
            activity?.SetTag("order.id", order.Id);
            _logger.LogInformation("Order {OrderId} created with {ItemCount} items",
                order.Id, orderItems.Length);
            return order;
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            _logger.LogError(ex, "Failed to create order with correlation ID {CorrelationId}",
                correlationId);
            throw;
        }
    }
}
Aggregation Pattern
public class DashboardService
{
    private readonly HttpClient _httpClient;
    private readonly IMemoryCache _cache;

    /// <summary>
    /// Builds the dashboard by querying several services in parallel, caching the result for one minute.
    /// </summary>
    /// <returns>The cached dashboard when present, otherwise a freshly aggregated one.</returns>
    public async Task<DashboardData> GetDashboardDataAsync()
    {
        // Check cache first
        if (_cache.TryGetValue("dashboard", out DashboardData cached))
        {
            return cached;
        }

        // Start every call before awaiting any of them so they run concurrently
        var assetCountTask = GetAssetCountAsync();
        var recentOrdersTask = GetRecentOrdersAsync();
        var alertsTask = GetActiveAlertsAsync();
        var metricsTask = GetMetricsAsync();
        await Task.WhenAll(
            assetCountTask,
            recentOrdersTask,
            alertsTask,
            metricsTask);

        // All tasks have completed, so these awaits only unwrap the results
        var dashboard = new DashboardData
        {
            AssetCount = await assetCountTask,
            RecentOrders = await recentOrdersTask,
            ActiveAlerts = await alertsTask,
            Metrics = await metricsTask
        };

        // Cache for 1 minute
        _cache.Set("dashboard", dashboard, TimeSpan.FromMinutes(1));
        return dashboard;
    }

    /// <summary>Reads the asset count from the asset service; throws on a non-success status code.</summary>
    private async Task<int> GetAssetCountAsync()
    {
        var response = await _httpClient.GetAsync("https://asset-service/api/v1/assets/count");
        response.EnsureSuccessStatusCode();
        return await response.Content.ReadFromJsonAsync<int>();
    }
}
🔐 Authentication & Authorization
Service-to-Service Authentication
// Token provider for service authentication
public class ServiceTokenProvider
{
    private const string CacheKey = "service_token";

    // The token is evicted this long before it actually expires so that a token
    // handed out from the cache is not about to lapse mid-request.
    private static readonly TimeSpan ExpirySkew = TimeSpan.FromSeconds(60);

    private readonly IConfiguration _configuration;
    private readonly IMemoryCache _cache;
    private readonly IHttpClientFactory _httpClientFactory;

    public ServiceTokenProvider(
        IConfiguration configuration,
        IMemoryCache cache,
        IHttpClientFactory httpClientFactory)
    {
        _configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
        _cache = cache ?? throw new ArgumentNullException(nameof(cache));
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
    }

    /// <summary>
    /// Returns a client-credentials access token, served from the cache when one is available.
    /// </summary>
    /// <returns>The bearer token to attach to outgoing service calls.</returns>
    /// <exception cref="AuthenticationException">The token endpoint returned an error response.</exception>
    public async Task<string> GetTokenAsync()
    {
        if (_cache.TryGetValue(CacheKey, out string token))
        {
            return token;
        }

        var tokenRequest = new ClientCredentialsTokenRequest
        {
            Address = _configuration["Auth:TokenEndpoint"],
            ClientId = _configuration["Auth:ClientId"],
            ClientSecret = _configuration["Auth:ClientSecret"],
            Scope = "acsis.api"
        };

        // Factory-created clients share pooled handlers; constructing (and disposing) a raw
        // HttpClient per token request leaves sockets in TIME_WAIT under load.
        var client = _httpClientFactory.CreateClient();
        var response = await client.RequestClientCredentialsTokenAsync(tokenRequest);
        if (response.IsError)
        {
            throw new AuthenticationException($"Token request failed: {response.Error}");
        }

        // IMemoryCache rejects a zero or negative relative expiration, so a token that lives
        // 60 seconds or less is returned without being cached.
        var cacheLifetime = TimeSpan.FromSeconds(response.ExpiresIn) - ExpirySkew;
        if (cacheLifetime > TimeSpan.Zero)
        {
            _cache.Set(CacheKey, response.AccessToken, cacheLifetime);
        }

        return response.AccessToken;
    }
}
// Message handler that stamps every outgoing request with a service bearer token
public class AuthenticatedHttpClientHandler : DelegatingHandler
{
    private readonly ServiceTokenProvider _tokenProvider;

    public AuthenticatedHttpClientHandler(ServiceTokenProvider tokenProvider) =>
        _tokenProvider = tokenProvider;

    /// <summary>
    /// Obtains a token from the provider, sets it as the request's Bearer authorization header,
    /// and forwards the request to the next handler.
    /// </summary>
    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        var bearerToken = await _tokenProvider.GetTokenAsync();
        var authorization = new AuthenticationHeaderValue("Bearer", bearerToken);
        request.Headers.Authorization = authorization;

        var response = await base.SendAsync(request, cancellationToken);
        return response;
    }
}
// Registration
// The handler's constructor takes a ServiceTokenProvider, which in turn uses IMemoryCache;
// both must be registered or resolving the handler fails when the client is first created.
builder.Services.AddMemoryCache();
builder.Services.AddSingleton<ServiceTokenProvider>();
builder.Services.AddTransient<AuthenticatedHttpClientHandler>();
builder.Services.AddHttpClient<AssetApiClient>()
    .AddHttpMessageHandler<AuthenticatedHttpClientHandler>();
User Context Propagation
/// <summary>
/// Copies the current user's token, identity headers and the ambient correlation id
/// onto outgoing service requests.
/// </summary>
public class UserContextPropagationHandler : DelegatingHandler
{
    private readonly IHttpContextAccessor _httpContextAccessor;

    public UserContextPropagationHandler(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor
            ?? throw new ArgumentNullException(nameof(httpContextAccessor));
    }

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        var httpContext = _httpContextAccessor.HttpContext;
        if (httpContext?.User?.Identity?.IsAuthenticated == true)
        {
            // Forward user token
            var token = await httpContext.GetTokenAsync("access_token");
            if (!string.IsNullOrEmpty(token))
            {
                request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
            }

            // Forward user context headers
            SetHeader(request, "X-User-Id", httpContext.User.FindFirst("sub")?.Value);
            SetHeader(request, "X-User-Name", httpContext.User.Identity.Name);
            SetHeader(request, "X-Tenant-Id", httpContext.User.FindFirst("tenant_id")?.Value);
        }

        // Forward correlation ID
        SetHeader(request, "X-Correlation-Id", Activity.Current?.Id);

        return await base.SendAsync(request, cancellationToken);
    }

    /// <summary>
    /// Sets a single-valued header. Missing values are skipped rather than sent as an empty
    /// header, and any existing value is replaced so that a request passing through this
    /// handler more than once (for example under an outer retry handler) does not
    /// accumulate duplicate values.
    /// </summary>
    private static void SetHeader(HttpRequestMessage request, string name, string value)
    {
        if (string.IsNullOrEmpty(value))
        {
            return;
        }

        request.Headers.Remove(name);
        request.Headers.Add(name, value);
    }
}
🔁 Resilience Patterns
Retry Policy
// Polly retry configuration
builder.Services.AddHttpClient<AssetApiClient>()
    .AddPolicyHandler(HttpPolicyExtensions
        // Covers HttpRequestException, 5xx and 408 responses
        .HandleTransientHttpError()
        // Additionally retry throttling responses. Other 4xx results (400, 401, 404, ...)
        // are deliberately not retried: repeating the same request cannot succeed.
        .OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
        .WaitAndRetryAsync(
            3,
            // Exponential backoff: 2s, 4s, 8s
            retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
            onRetry: (outcome, timespan, retryCount, context) =>
            {
                // Context is a dictionary; the logger is optional and only present when the
                // caller put one under the "logger" key, so look it up without throwing.
                if (context.TryGetValue("logger", out var value) && value is ILogger logger)
                {
                    logger.LogWarning(
                        "Retry {RetryCount} after {Delay}ms",
                        retryCount,
                        timespan.TotalMilliseconds);
                }
            }));
Circuit Breaker
// Circuit breaker: opens for 30 seconds after 5 handled transient failures
var assetCircuitBreaker = HttpPolicyExtensions
    .HandleTransientHttpError()
    .CircuitBreakerAsync(
        5,
        TimeSpan.FromSeconds(30),
        (result, duration) => _logger.LogWarning("Circuit breaker opened for {Duration}", duration),
        () => _logger.LogInformation("Circuit breaker reset"));
builder.Services.AddHttpClient<AssetApiClient>()
    .AddPolicyHandler(assetCircuitBreaker);
// Combined policies: retry wraps the circuit breaker, so every attempt goes through the breaker
Func<int, TimeSpan> exponentialBackoff = attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt));
var retryPolicy = HttpPolicyExtensions
    .HandleTransientHttpError()
    .WaitAndRetryAsync(3, exponentialBackoff);
var circuitBreakerPolicy = HttpPolicyExtensions
    .HandleTransientHttpError()
    .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));
var combinedPolicy = Policy.WrapAsync(retryPolicy, circuitBreakerPolicy);
builder.Services.AddHttpClient<AssetApiClient>()
    .AddPolicyHandler(combinedPolicy);
Timeout Handling
public class TimeoutService
{
    private readonly HttpClient _httpClient;

    /// <summary>
    /// Runs an operation that cannot observe cancellation, releasing the caller with a
    /// <see cref="TimeoutException"/> once <paramref name="timeout"/> elapses.
    /// </summary>
    /// <remarks>
    /// The operation itself keeps running after a timeout because it has no token to observe.
    /// Prefer the overload that passes a <see cref="CancellationToken"/> to the operation.
    /// </remarks>
    /// <exception cref="TimeoutException">The timeout elapsed before the operation completed.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public Task<T> ExecuteWithTimeoutAsync<T>(
        Func<Task<T>> operation,
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(operation);
        return ExecuteWithTimeoutAsync<T>(_ => operation(), timeout, cancellationToken);
    }

    /// <summary>
    /// Runs an operation with a deadline. The token handed to the operation is cancelled when
    /// either the timeout elapses or <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <exception cref="TimeoutException">The timeout elapsed before the operation completed.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="cancellationToken"/> was cancelled.</exception>
    public async Task<T> ExecuteWithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        TimeSpan timeout,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(operation);

        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        cts.CancelAfter(timeout);
        try
        {
            // The deadline token must actually be observed for the timeout to have any effect.
            // It is passed to the operation, and WaitAsync additionally releases the caller at
            // the deadline if the operation ignores the token.
            return await operation(cts.Token).WaitAsync(cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
        {
            // Cancelled by the deadline rather than by the caller: report it as a timeout
            throw new TimeoutException($"Operation timed out after {timeout.TotalSeconds} seconds");
        }
    }

    /// <summary>Fetches an asset by id, giving up after five seconds.</summary>
    public async Task<Asset> GetAssetWithTimeoutAsync(Guid id, CancellationToken cancellationToken = default)
    {
        return await ExecuteWithTimeoutAsync(
            async token =>
            {
                using var response = await _httpClient.GetAsync($"/api/v1/assets/{id}", token);
                response.EnsureSuccessStatusCode();
                return await response.Content.ReadFromJsonAsync<Asset>(token);
            },
            TimeSpan.FromSeconds(5),
            cancellationToken);
    }
}
Bulkhead Isolation
// Bulkhead pattern to isolate resources:
// at most 10 calls run at once, up to 50 more wait in the queue, the rest are rejected
builder.Services.AddHttpClient<CriticalApiClient>()
    .AddPolicyHandler(Policy.BulkheadAsync<HttpResponseMessage>(
        maxParallelization: 10,
        maxQueuingActions: 50,
        onBulkheadRejectedAsync: context =>
        {
            _logger.LogWarning("Bulkhead rejected request");
            // Nothing to await here; returning a completed task avoids an
            // async lambda without an await (compiler warning CS1998)
            return Task.CompletedTask;
        }));
// Using SemaphoreSlim for custom bulkhead
public class BulkheadService
{
    private readonly SemaphoreSlim _semaphore;

    /// <param name="maxConcurrency">Maximum number of operations allowed to run at the same time.</param>
    public BulkheadService(int maxConcurrency = 10)
    {
        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    }

    /// <summary>
    /// Runs <paramref name="operation"/> once a concurrency slot is free and releases the slot
    /// afterwards, whether the operation succeeds or throws.
    /// </summary>
    /// <param name="operation">The work to run inside the bulkhead.</param>
    /// <param name="cancellationToken">
    /// Cancels the wait for a free slot, so a queued caller can give up instead of waiting indefinitely.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="operation"/> is null.</exception>
    /// <exception cref="OperationCanceledException">The token was cancelled while waiting for a slot.</exception>
    public async Task<T> ExecuteAsync<T>(Func<Task<T>> operation, CancellationToken cancellationToken = default)
    {
        if (operation is null)
        {
            throw new ArgumentNullException(nameof(operation));
        }

        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            return await operation();
        }
        finally
        {
            _semaphore.Release();
        }
    }
}
📨 Asynchronous Communication (Future)
Event Bus Pattern
// Event definition
/// <summary>Integration event describing an asset that has been created.</summary>
/// <param name="AssetId">Identifier of the asset.</param>
/// <param name="Name">Name of the asset.</param>
/// <param name="Type">Type of the asset.</param>
/// <param name="CreatedAt">Creation timestamp of the asset.</param>
public record AssetCreatedEvent(
Guid AssetId,
string Name,
string Type,
DateTime CreatedAt) : IIntegrationEvent;
// Publisher side: persist first, then announce
public class AssetService
{
    private readonly IEventBus _eventBus;

    /// <summary>Saves a new asset and then publishes an <see cref="AssetCreatedEvent"/> for it.</summary>
    /// <returns>The asset that was saved.</returns>
    public async Task<Asset> CreateAssetAsync(CreateAssetRequest request)
    {
        var asset = new Asset { /* ... */ };
        await _repository.SaveAsync(asset);

        // Announce the new asset to any subscribers
        var createdEvent = new AssetCreatedEvent(asset.Id, asset.Name, asset.Type, asset.CreatedAt);
        await _eventBus.PublishAsync(createdEvent);

        return asset;
    }
}
// Subscriber side: react to assets created elsewhere
public class InventoryService : IEventHandler<AssetCreatedEvent>
{
    /// <summary>Saves an empty inventory record (quantity 0, reorder level 10) for the asset in the event.</summary>
    public async Task HandleAsync(AssetCreatedEvent @event)
    {
        var newInventoryItem = new InventoryItem
        {
            AssetId = @event.AssetId,
            Quantity = 0,
            ReorderLevel = 10
        };

        await _repository.SaveAsync(newInventoryItem);
    }
}
Saga Pattern
/// <summary>
/// Coordinates an order across services: reserve assets, then take payment, then create the shipment.
/// </summary>
// NOTE(review): the message-to-saga correlation mapping (ConfigureHowToFindSaga) is not shown
// in this sample; confirm which message property identifies the order before using it.
public class OrderSaga : Saga<OrderSagaData>,
    IAmStartedByMessages<CreateOrderCommand>,
    IHandleMessages<AssetReservedEvent>,
    IHandleMessages<PaymentProcessedEvent>,
    IHandleMessages<ShipmentCreatedEvent>
{
    /// <summary>Starts the saga: records the order and requests a reservation for each item.</summary>
    public async Task Handle(CreateOrderCommand message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.Items = message.Items;

        // Start saga - reserve assets
        foreach (var item in message.Items)
        {
            await context.Send(new ReserveAssetCommand(item.AssetId, item.Quantity));
        }
    }

    /// <summary>Tracks a reservation and requests payment once every asset is reserved.</summary>
    public async Task Handle(AssetReservedEvent message, IMessageHandlerContext context)
    {
        Data.ReservedAssets.Add(message.AssetId);
        if (Data.AllAssetsReserved())
        {
            // Process payment
            await context.Send(new ProcessPaymentCommand(Data.OrderId));
        }
    }

    /// <summary>Marks the payment as processed and requests shipment creation.</summary>
    public async Task Handle(PaymentProcessedEvent message, IMessageHandlerContext context)
    {
        Data.PaymentProcessed = true;

        // Create shipment
        await context.Send(new CreateShipmentCommand(Data.OrderId));
    }

    /// <summary>Completes the saga once the shipment exists.</summary>
    public Task Handle(ShipmentCreatedEvent message, IMessageHandlerContext context)
    {
        // Saga complete. Nothing is awaited here, so the method is not async
        // (an async method without an await raises compiler warning CS1998).
        MarkAsComplete();
        return Task.CompletedTask;
    }
}
🔍 Service Mesh (Future)
Istio Integration
# VirtualService for traffic management
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: asset-service
spec:
  hosts:
    - asset-service
  http:
    # Requests carrying "x-version: v2" go entirely to the v2 subset
    - match:
        - headers:
            x-version:
              exact: v2
      route:
        - destination:
            host: asset-service
            subset: v2
          weight: 100
    # All other traffic is split 90/10 between v1 and v2
    - route:
        - destination:
            host: asset-service
            subset: v1
          weight: 90
        - destination:
            host: asset-service
            subset: v2
          weight: 10
📊 Communication Monitoring
Distributed Tracing
/// <summary>
/// Wraps each request in an activity tagged with the HTTP method, URL, host, user id and
/// response status code.
/// </summary>
public class TracingMiddleware
{
    // Activities are created through an ActivitySource; System.Diagnostics.Activity has no
    // static StartActivity method. StartActivity returns null when no listener is subscribed
    // to this source, which is why every use below is null-conditional.
    private static readonly ActivitySource MiddlewareActivitySource = new(nameof(TracingMiddleware));

    private readonly RequestDelegate _next;

    // Convention-based middleware is constructed with the next delegate in the pipeline
    public TracingMiddleware(RequestDelegate next)
    {
        _next = next ?? throw new ArgumentNullException(nameof(next));
    }

    public async Task InvokeAsync(HttpContext context)
    {
        using var activity = MiddlewareActivitySource.StartActivity(
            $"{context.Request.Method} {context.Request.Path}",
            ActivityKind.Server);
        activity?.SetTag("http.method", context.Request.Method);
        activity?.SetTag("http.url", context.Request.GetDisplayUrl());
        activity?.SetTag("http.host", context.Request.Host.ToString());
        activity?.SetTag("user.id", context.User?.FindFirst("sub")?.Value);
        try
        {
            await _next(context);
            activity?.SetTag("http.status_code", context.Response.StatusCode);

            // On the server side only 5xx is a failure of this service; a 4xx response is a
            // client error and, per the OpenTelemetry HTTP conventions, leaves the status unset.
            if (context.Response.StatusCode >= 500)
            {
                activity?.SetStatus(ActivityStatusCode.Error);
            }
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.RecordException(ex);
            throw;
        }
    }
}
Metrics Collection
/// <summary>Records request and circuit-breaker metrics for calls to other services.</summary>
public class CommunicationMetrics
{
    private readonly IMetrics _metrics;

    /// <summary>
    /// Increments the request counter (tagged by service, method and status) and records the
    /// request duration (tagged by service and method).
    /// </summary>
    public void RecordRequest(string service, string method, int statusCode, double duration)
    {
        var status = statusCode.ToString();
        var requestTags = new MetricTags(
            new[] { "service", "method", "status" },
            new[] { service, method, status });
        _metrics.Measure.Counter.Increment("http_requests_total", requestTags);

        var durationTags = new MetricTags(
            new[] { "service", "method" },
            new[] { service, method });
        _metrics.Measure.Histogram.Update("http_request_duration_seconds", duration, durationTags);
    }

    /// <summary>Sets the circuit-breaker gauge for a service: 1 when the state is "Open", otherwise 0.</summary>
    public void RecordCircuitBreakerState(string service, string state)
    {
        var serviceTag = new MetricTags("service", service);
        _metrics.Measure.Gauge.SetValue(
            "circuit_breaker_state",
            state == "Open" ? 1 : 0,
            serviceTag);
    }
}
🔒 Security Considerations
mTLS Between Services
// Configure mutual TLS
builder.Services.AddHttpClient<AssetApiClient>()
    .ConfigurePrimaryHttpMessageHandler(() =>
    {
        // Client certificate presented to the server
        var pfxPath = Path.Combine(AppContext.BaseDirectory, "certs/client.pfx");
        var pfxPassword = configuration["Certificates:Password"];
        var clientCertificate = new X509Certificate2(pfxPath, pfxPassword);

        var mtlsHandler = new HttpClientHandler
        {
            // Server certificate is checked by the custom validation routine
            ServerCertificateCustomValidationCallback = (sender, certificate, chain, errors) =>
                ValidateServerCertificate(certificate, chain, errors)
        };
        mtlsHandler.ClientCertificates.Add(clientCertificate);
        return mtlsHandler;
    });
API Key Authentication
/// <summary>Adds the configured service API key to every outgoing request.</summary>
public class ApiKeyAuthenticationHandler : DelegatingHandler
{
    private const string ApiKeyHeader = "X-API-Key";

    private readonly string _apiKey;

    /// <exception cref="InvalidOperationException">The "ServiceAuth:ApiKey" setting is missing or blank.</exception>
    public ApiKeyAuthenticationHandler(IConfiguration configuration)
    {
        if (configuration is null)
        {
            throw new ArgumentNullException(nameof(configuration));
        }

        // Fail at construction instead of silently sending an empty key on every request
        var apiKey = configuration["ServiceAuth:ApiKey"];
        if (string.IsNullOrWhiteSpace(apiKey))
        {
            throw new InvalidOperationException(
                "Configuration value 'ServiceAuth:ApiKey' is required for API key authentication.");
        }

        _apiKey = apiKey;
    }

    protected override async Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Replace rather than append: the same request object passes through this handler
        // again when an outer handler retries it, and Add alone would duplicate the value.
        request.Headers.Remove(ApiKeyHeader);
        request.Headers.Add(ApiKeyHeader, _apiKey);
        return await base.SendAsync(request, cancellationToken);
    }
}
🎯 Best Practices
1. Use Typed Clients
// ✅ Good - Typed client, registered through AddHttpClient
services.AddHttpClient<AssetApiClient>();
// ❌ Bad - Direct HttpClient usage (constructed by hand, outside the factory registration)
var client = new HttpClient();
2. Implement Timeouts
// ✅ Good - Explicit timeout
httpClient.Timeout = TimeSpan.FromSeconds(30);
// ❌ Bad - No explicit timeout (the implicit default is 100 seconds)
3. Handle Failures Gracefully
// ✅ Good - Graceful degradation
try
{
    return await _assetClient.GetAssetAsync(id);
}
catch (Exception ex) when (ex is HttpRequestException
    || (ex is TaskCanceledException && ex.InnerException is TimeoutException))
{
    // An HttpClient.Timeout expiry surfaces as TaskCanceledException wrapping a
    // TimeoutException, not as HttpRequestException, so both are treated as
    // "dependency unavailable". Caller-initiated cancellation is not caught here.
    return GetCachedAsset(id) ?? GetDefaultAsset();
}
// ❌ Bad - Let exception bubble up
return await _assetClient.GetAssetAsync(id);
4. Use Correlation IDs
// ✅ Good - Correlation tracking: reuse the current Activity's id when there is one, otherwise generate a new GUID
request.Headers.Add("X-Correlation-Id", Activity.Current?.Id ?? Guid.NewGuid().ToString());
// ❌ Bad - No correlation tracking
5. Implement Health Checks
// ✅ Good - Health check for dependencies: registers a URL check named "asset-service" against the dependency's /health endpoint
services.AddHealthChecks()
.AddUrlGroup(new Uri("https://asset-service/health"), "asset-service");
// ❌ Bad - No health checks
📚 Resources
- .NET Aspire Service Discovery
- Polly Resilience
- OpenTelemetry .NET
- Microsoft.Extensions.Http.Resilience
🔧 Troubleshooting
Connection Refused
# Check service is running
curl https://localhost:40443/health
# Check whether anything is listening on the service port (netstat lists sockets, not firewall rules)
netstat -an | grep 40443
# Check Docker network
docker network inspect acsis-network
Timeout Issues
// Increase timeout
httpClient.Timeout = TimeSpan.FromSeconds(60);
// Add retry with backoff (fragment: chain onto a Polly policy builder; 3 retries waiting 2s, 4s, then 8s)
.WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)))
Certificate Errors
# Trust the HTTPS development certificate on this machine
dotnet dev-certs https --trust
# Check certificate expiry (prints the notBefore / notAfter dates of cert.pem)
openssl x509 -in cert.pem -noout -dates
Circuit Breaker Open
// Check circuit breaker state
// NOTE(review): circuitBreaker is presumably the Polly circuit-breaker policy instance registered for the client — confirm
var state = circuitBreaker.CircuitState;
// Manual reset if needed — verify the dependency has recovered first, since calls flow again once the circuit is closed
circuitBreaker.Reset();