Performance Optimization
Guidelines and patterns for building high-performance Crosser modules.
General Principles
- Use async/await properly - Don't block threads
- Initialize resources once - Avoid recreation per message
- Batch when appropriate - Reduce external call overhead
- Use appropriate collections - Thread-safe when needed
- Dispose resources properly - Prevent memory leaks
Resource Initialization
Initialize Once, Use Many Times
❌ Wrong - Recreates resources per message:
protected override async Task MessageReceived(IFlowMessage message)
{
// Creating new instances for every message is expensive!
var httpClient = new HttpClient();
var result = await httpClient.GetAsync("https://api.example.com/data");
httpClient.Dispose();
await this.Next(message);
}
✅ Correct - Initialize once, reuse:
private HttpClient httpClient;
public override async Task<IError?> Initialize()
{
// Create once during initialization
this.httpClient = new HttpClient
{
BaseAddress = new Uri(this.Settings.ApiBaseUrl),
Timeout = TimeSpan.FromSeconds(this.Settings.TimeoutSeconds)
};
this.httpClient.DefaultRequestHeaders.Add("User-Agent", "CrosserModule/1.0");
return await base.Initialize();
}
protected override async Task MessageReceived(IFlowMessage message)
{
// Reuse the same instance
var result = await this.httpClient.GetAsync("/data");
await this.Next(message);
}
public override async Task Stop()
{
// Cleanup when module stops
this.httpClient?.Dispose();
await base.Stop();
}
Async/Await Patterns
Non-Blocking Operations
❌ Wrong - Blocking calls:
protected override async Task MessageReceived(IFlowMessage message)
{
// .Result blocks the thread!
var data = this.httpClient.GetAsync("/data").Result;
// .Wait() blocks the thread!
this.ProcessDataAsync(message).Wait();
await this.Next(message);
}
✅ Correct - Proper async/await:
protected override async Task MessageReceived(IFlowMessage message)
{
// Non-blocking awaits
var data = await this.httpClient.GetAsync("/data");
await this.ProcessDataAsync(message);
await this.Next(message);
}
CPU-Intensive Operations
For CPU-bound work, use Task.Run to avoid blocking when message processing order is not critical:
// Offloads CPU-bound work (image resize) to the thread pool so the module's
// message handler is not blocked; any failure is logged, surfaced on the
// module status, and attached to the message, which is always forwarded.
protected override async Task MessageReceived(IFlowMessage message)
{
    try
    {
        var data = message.Get<byte[]>("imageData");
        // Offload CPU-intensive work to thread pool
        var processed = await Task.Run(() => this.ProcessImage(data));
        message.Set("processedImage", processed);
    }
    catch (Exception ex)
    {
        Error(ex, "Image processing failed");
        var error = "Image processing failed";
        this.SetStatus(Status.Warning, error);
        message.SetError(error);
    }
    finally
    {
        // Forward the message whether processing succeeded or failed,
        // so the flow keeps moving.
        await this.Next(message);
    }
}

// Synchronous CPU-intensive helper, intended to run via Task.Run.
// NOTE(review): ImageProcessor is an external helper not shown in this
// snippet — assumed to be a pure, synchronous resize.
private byte[] ProcessImage(byte[] imageData)
{
    // CPU-intensive synchronous processing
    // This runs on a thread pool thread, not blocking the module
    return ImageProcessor.Resize(imageData, 800, 600);
}
Batching Patterns
One-Shot Timer Batching
This pattern collects messages and processes them in batches, using a one-shot timer that only fires when needed:
// Collects incoming messages and flushes them in batches, triggered either by
// a size threshold or by a one-shot timeout timer.
//
// Fixes over the naive version:
//  - The Timer callback is an async void lambda; an unhandled exception there
//    (e.g. ObjectDisposedException if the timer fires during Stop()) would
//    crash the process, so the callback body is wrapped in try/catch.
//  - The timer path now acquires the same semaphore as MessageReceived, so
//    batch processing from the timer cannot race with enqueueing or with
//    starting/stopping the timer.
public class BatchProcessorModule : FlowModule<BatchProcessorModuleSettings>
{
    // Cloned incoming messages waiting for the next batch flush (lock-free).
    private readonly ConcurrentQueue<IFlowMessage> messageQueue = new();
    // Serializes queue/timer bookkeeping between MessageReceived and the timer callback.
    private readonly SemaphoreSlim semaphore = new(1, 1);
    private Timer batchTimer;
    // Re-entrancy guard for ProcessBatch (also covers the Stop() drain path).
    private volatile bool isProcessing;

    public override string UserFriendlyName => "Batch Processor";

    public BatchProcessorModule() : base(FlowModuleType.Function) { }

    protected override async Task MessageReceived(IFlowMessage message)
    {
        await this.semaphore.WaitAsync();
        try
        {
            var wasEmpty = this.messageQueue.IsEmpty;
            // Add message to queue (clone to avoid reference issues)
            this.messageQueue.Enqueue(message.Clone());
            if (wasEmpty)
            {
                // First message - start the timer
                this.StartBatchTimer();
            }
            // Size threshold - process immediately if batch is full
            if (this.messageQueue.Count >= this.Settings.BatchSize)
            {
                this.StopBatchTimer();
                await this.ProcessBatch();
            }
        }
        finally
        {
            this.semaphore.Release();
            // Always forward original message
            await this.Next(message);
        }
    }

    private void StartBatchTimer()
    {
        if (this.batchTimer == null)
        {
            // The callback is an async void lambda: any escaping exception
            // would tear down the process, so catch everything here.
            this.batchTimer = new Timer(
                async _ =>
                {
                    try
                    {
                        await this.ProcessBatchFromTimer();
                    }
                    catch (Exception ex)
                    {
                        Error(ex, "Batch timer processing failed");
                    }
                },
                null,
                Timeout.InfiniteTimeSpan, // Start disabled
                Timeout.InfiniteTimeSpan  // One-shot timer
            );
        }
        // Set timer to fire once after timeout period
        this.batchTimer.Change(
            TimeSpan.FromSeconds(this.Settings.BatchTimeoutSeconds),
            Timeout.InfiniteTimeSpan
        );
    }

    private void StopBatchTimer()
    {
        this.batchTimer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    private async Task ProcessBatchFromTimer()
    {
        // Take the same semaphore as MessageReceived so the timer path cannot
        // race with enqueueing, StartBatchTimer or the size-threshold flush.
        await this.semaphore.WaitAsync();
        try
        {
            await this.ProcessBatch();
        }
        finally
        {
            this.semaphore.Release();
        }
    }

    private async Task ProcessBatch()
    {
        if (this.isProcessing || this.messageQueue.IsEmpty)
        {
            return;
        }
        this.isProcessing = true;
        try
        {
            var batch = new List<IFlowMessage>();
            // Dequeue up to BatchSize messages
            while (batch.Count < this.Settings.BatchSize &&
                this.messageQueue.TryDequeue(out var msg))
            {
                batch.Add(msg);
            }
            if (batch.Any())
            {
                Information("Processing batch of {Count} messages", batch.Count);
                // Extract only needed data for bulk operation
                var ids = batch.Select(m => m.Get<string>("id")).ToList();
                // Process batch in single external call
                var result = await this.ProcessBulkAsync(ids);
                // Send batch result
                var batchMsg = new FlowMessage();
                batchMsg.Set("batchSize", batch.Count);
                batchMsg.Set("successCount", result.SuccessCount);
                batchMsg.Set("failedCount", result.FailedCount);
                batchMsg.Set("processedAt", DateTime.UtcNow);
                await this.Next(batchMsg);
                // Restart timer if messages remain
                if (!this.messageQueue.IsEmpty)
                {
                    this.StartBatchTimer();
                }
            }
        }
        catch (Exception ex)
        {
            Error(ex, "Batch processing failed");
            this.SetStatus(Status.Warning, "Batch processing failed");
        }
        finally
        {
            this.isProcessing = false;
        }
    }

    // Placeholder for the real bulk external call.
    private async Task<BatchResult> ProcessBulkAsync(List<string> ids)
    {
        // Simulate bulk API call
        await Task.Delay(100);
        return new BatchResult { SuccessCount = ids.Count, FailedCount = 0 };
    }

    public override async Task Stop()
    {
        this.StopBatchTimer();
        this.batchTimer?.Dispose();
        // Drain remaining messages. This runs without the semaphore; a late
        // timer callback that observes the disposed semaphore is tolerated by
        // the try/catch in the timer lambda.
        if (!this.messageQueue.IsEmpty)
        {
            await this.ProcessBatch();
        }
        this.semaphore?.Dispose();
        await base.Stop();
    }
}
// Settings for BatchProcessorModule: a batch is flushed either when it
// reaches BatchSize messages or when BatchTimeoutSeconds elapses.
public class BatchProcessorModuleSettings : FlowModuleSettings
{
    // Flush immediately once this many messages are queued.
    [Display(Name = "Batch Size")]
    [Description("Maximum number of messages to process in a batch")]
    [Range(1, 1000)]
    [DefaultValue(100)]
    public int BatchSize { get; set; } = 100;

    // Flush a partially-filled batch after this many seconds.
    [Display(Name = "Batch Timeout (seconds)")]
    [Description("Maximum time to wait before processing incomplete batch")]
    [Range(1, 300)]
    [DefaultValue(5)]
    public int BatchTimeoutSeconds { get; set; } = 5;
}
/// <summary>
/// Outcome of a bulk processing call: how many items succeeded and how many
/// failed.
/// </summary>
public class BatchResult
{
    /// <summary>Number of items that were processed successfully.</summary>
    public int SuccessCount { get; set; }

    /// <summary>Number of items that failed to process.</summary>
    public int FailedCount { get; set; }
}
Key Design Points
- One-Shot Timer: Fires exactly once per batch period, no unnecessary checks
- Smart Reset: Timer restarts only when messages remain after processing
- Efficient Triggers: Size threshold stops timer and processes immediately
- ConcurrentQueue: Lock-free thread-safe operations
- Minimal Processing: Extract only required fields for bulk operations
- SemaphoreSlim: Prevents race conditions when starting/stopping timer
Thread-Safe Collections
Use Concurrent Collections for Shared State
❌ Avoid - Manual locking where a concurrent collection would do:
private readonly List<IFlowMessage> messageBuffer = new();
private readonly object lockObject = new();
protected override async Task MessageReceived(IFlowMessage message)
{
lock (this.lockObject)
{
this.messageBuffer.Add(message);
}
await this.Next(message);
}
✅ Correct - Use concurrent collections:
private readonly ConcurrentQueue<IFlowMessage> messageBuffer = new();
protected override async Task MessageReceived(IFlowMessage message)
{
// Lock-free, thread-safe enqueue
this.messageBuffer.Enqueue(message);
await this.Next(message);
}
Available Concurrent Collections
// Queue: First-in, first-out
private readonly ConcurrentQueue<T> queue = new();
// Stack: Last-in, first-out
private readonly ConcurrentStack<T> stack = new();
// Dictionary: Thread-safe key-value pairs
private readonly ConcurrentDictionary<TKey, TValue> dictionary = new();
// Bag: Unordered collection
private readonly ConcurrentBag<T> bag = new();
Memory Management
Dispose Resources Properly
Implement IDisposable for modules with unmanaged resources:
// Example module owning disposable resources (DB connection + heartbeat timer).
// Resources are created once in Initialize(), released in Stop(), and also in
// Dispose() as a safety net. Both cleanup paths may run; the null-conditional
// calls and the 'disposed' flag keep that tolerable — NOTE(review): this
// relies on the resources' Dispose being idempotent (conventional for .NET
// types; verify for the concrete provider in use).
public class DatabaseModule : FlowModule<DatabaseModuleSettings>, IDisposable
{
    private SqlConnection connection;
    private Timer heartbeatTimer;
    // Guards Dispose(bool) against running the managed cleanup twice.
    private bool disposed = false;

    // Open the connection once and start a periodic (every 1 min, after an
    // initial 1 min delay) connection health check.
    public override async Task<IError?> Initialize()
    {
        this.connection = new SqlConnection(this.Settings.ConnectionString);
        await this.connection.OpenAsync();
        // NOTE(review): CheckConnection() is not shown in this snippet —
        // assumed to be a synchronous health probe defined elsewhere.
        this.heartbeatTimer = new Timer(
            _ => this.CheckConnection(),
            null,
            TimeSpan.FromMinutes(1),
            TimeSpan.FromMinutes(1)
        );
        return await base.Initialize();
    }

    protected override async Task MessageReceived(IFlowMessage message)
    {
        // Use connection...
        await this.Next(message);
    }

    // Primary cleanup path when the module is stopped by the host.
    public override async Task Stop()
    {
        // Dispose resources
        this.heartbeatTimer?.Dispose();
        this.connection?.Close();
        this.connection?.Dispose();
        await base.Stop();
    }

    // Standard dispose pattern: release managed resources exactly once.
    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
            {
                // Dispose managed resources
                this.heartbeatTimer?.Dispose();
                this.connection?.Dispose();
            }
            this.disposed = true;
        }
    }

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}
Avoid Memory Leaks
Common memory leak patterns to avoid:
// ❌ WRONG - Unbounded growth
private readonly List<IFlowMessage> allMessages = new();
protected override async Task MessageReceived(IFlowMessage message)
{
this.allMessages.Add(message); // Grows forever!
await this.Next(message);
}
// ✅ CORRECT - Bounded collection
private readonly Queue<IFlowMessage> recentMessages = new();
private const int MaxBufferSize = 1000;
protected override async Task MessageReceived(IFlowMessage message)
{
this.recentMessages.Enqueue(message);
// Maintain size limit
while (this.recentMessages.Count > MaxBufferSize)
{
this.recentMessages.Dequeue();
}
await this.Next(message);
}
Unsubscribe from Events
// ❌ WRONG - Event handler leak
public override async Task<IError?> Start()
{
ExternalService.OnDataReceived += this.HandleData; // Never unsubscribed!
return await base.Start();
}
// ✅ CORRECT - Proper cleanup
public override async Task<IError?> Start()
{
ExternalService.OnDataReceived += this.HandleData;
return await base.Start();
}
public override async Task Stop()
{
ExternalService.OnDataReceived -= this.HandleData;
await base.Stop();
}
Connection Pooling
Database Connection Pooling
Most ADO.NET providers handle connection pooling automatically:
// Example: keep only the connection string and open a pooled connection per
// message, relying on the ADO.NET provider's built-in connection pooling
// instead of caching a long-lived connection on the module.
public class DatabaseModule : FlowModule<DatabaseModuleSettings>
{
    private string connectionString;

    public override async Task<IError?> Initialize()
    {
        // Connection string with pooling enabled (default for most providers)
        this.connectionString = this.Settings.ConnectionString;
        // Test connection eagerly so misconfiguration fails at startup,
        // not on the first message.
        using var connection = new SqlConnection(this.connectionString);
        await connection.OpenAsync();
        return await base.Initialize();
    }

    protected override async Task MessageReceived(IFlowMessage message)
    {
        // Create connection per message - pooling handles efficiency
        using var connection = new SqlConnection(this.connectionString);
        await connection.OpenAsync();
        // Use connection
        using var command = connection.CreateCommand();
        // Parameterized query - never concatenate message data into SQL.
        command.CommandText = "SELECT * FROM table WHERE id = @id";
        command.Parameters.AddWithValue("@id", message.Get<int>("id"));
        using var reader = await command.ExecuteReaderAsync();
        // Process results...
        await this.Next(message);
        // Connection automatically returned to pool when disposed
    }
}
HttpClient Reuse
// ✅ CORRECT - Single HttpClient instance
private HttpClient httpClient;
public override async Task<IError?> Initialize()
{
this.httpClient = new HttpClient
{
BaseAddress = new Uri(this.Settings.ApiBaseUrl),
Timeout = TimeSpan.FromSeconds(30)
};
// HttpClient handles connection pooling internally
return await base.Initialize();
}
protected override async Task MessageReceived(IFlowMessage message)
{
// Reuse same HttpClient - it handles connection pooling
var response = await this.httpClient.GetAsync("/endpoint");
await this.Next(message);
}
Performance Monitoring
Built-in Timing
// Times each message and warns when processing exceeds one second.
// Uses a per-message local Stopwatch: a single Stopwatch field shared across
// messages would produce garbage timings if the host ever delivers messages
// concurrently (interleaved Restart/Stop on the same instance).
protected override async Task MessageReceived(IFlowMessage message)
{
    var processingTimer = Stopwatch.StartNew();
    try
    {
        await this.ProcessMessage(message);
    }
    finally
    {
        processingTimer.Stop();
        // Log slow operations
        if (processingTimer.ElapsedMilliseconds > 1000)
        {
            Warning("Slow processing: {ElapsedMs}ms for message",
                processingTimer.ElapsedMilliseconds);
        }
        await this.Next(message);
    }
}
Message Rate Tracking
// Approximate throughput counter, logged about once per minute.
private int messageCount;
private DateTime lastResetTime = DateTime.UtcNow;

protected override async Task MessageReceived(IFlowMessage message)
{
    // Use the value RETURNED by Interlocked.Increment; re-reading the field
    // afterwards would race with increments from other messages.
    var count = Interlocked.Increment(ref this.messageCount);
    var elapsed = DateTime.UtcNow - this.lastResetTime;
    if (elapsed.TotalSeconds >= 60)
    {
        var rate = count / elapsed.TotalSeconds;
        Information("Processing rate: {Rate:F2} messages/second", rate);
        // Atomic reset: a plain 'messageCount = 0' could silently erase
        // increments made concurrently between the read and the write.
        Interlocked.Exchange(ref this.messageCount, 0);
        // NOTE(review): lastResetTime is still written unsynchronized, so two
        // concurrent messages may both log/reset in the same window — fine for
        // approximate rate logging; tighten if exact rates ever matter.
        this.lastResetTime = DateTime.UtcNow;
    }
    await this.ProcessMessage(message);
    await this.Next(message);
}
Performance Checklist
- [ ] Resources initialized once, not per message
- [ ] HttpClient/database connections reused
- [ ] Async/await used correctly (no blocking)
- [ ] CPU-intensive work offloaded with Task.Run
- [ ] Concurrent collections used for shared state
- [ ] Resources disposed properly in Stop()
- [ ] Event handlers unsubscribed
- [ ] Collections have size limits
- [ ] Batching used for bulk operations
- [ ] Timers disposed correctly
Common Performance Mistakes
1. Creating Resources Per Message
// ❌ Creates 1000 HttpClients for 1000 messages!
protected override async Task MessageReceived(IFlowMessage message)
{
var client = new HttpClient(); // Don't do this!
await client.GetAsync("...");
client.Dispose();
}
2. Blocking on Async Operations
// ❌ Blocks thread pool thread
protected override async Task MessageReceived(IFlowMessage message)
{
var result = SomeAsyncOperation().Result; // Blocking!
}
3. Unbounded Collections
// ❌ Grows without limit
private readonly List<IFlowMessage> buffer = new();
protected override async Task MessageReceived(IFlowMessage message)
{
this.buffer.Add(message); // Memory leak!
}
4. Unnecessary Synchronization
// ❌ Lock on every message when concurrent collection would work
private readonly List<int> values = new();
private readonly object lockObj = new();
protected override async Task MessageReceived(IFlowMessage message)
{
lock (this.lockObj) // Unnecessary lock!
{
this.values.Add(message.Get<int>("value"));
}
}
// ✅ Use concurrent collection instead
private readonly ConcurrentBag<int> values = new();
protected override async Task MessageReceived(IFlowMessage message)
{
this.values.Add(message.Get<int>("value")); // Lock-free!
}
Summary
Key principles for high-performance modules:
- Initialize once - Create resources during Initialize(), not per message
- Don't block - Use async/await properly, never use .Result or .Wait()
- Batch efficiently - Use one-shot timers and concurrent collections
- Manage memory - Dispose resources, limit collection sizes, unsubscribe from events
- Monitor performance - Track processing times and message rates
- Use thread-safe collections - ConcurrentQueue, ConcurrentDictionary when needed
- Pool connections - Reuse HttpClient and rely on connection pooling
Remember: Performance issues in your module can affect the entire flow. Design for efficiency from the start.