Table of Contents

Performance Optimization

Guidelines and patterns for building high-performance Crosser modules.

General Principles

  1. Async/Await properly - Don't block threads
  2. Initialize resources once - Avoid recreation per message
  3. Batch when appropriate - Reduce external call overhead
  4. Use appropriate collections - Thread-safe when needed
  5. Dispose resources properly - Prevent memory leaks

Resource Initialization

Initialize Once, Use Many Times

Wrong - Recreates resources per message:

protected override async Task MessageReceived(IFlowMessage message)
{
    // Creating new instances for every message is expensive!
    var httpClient = new HttpClient();
    var result = await httpClient.GetAsync("https://api.example.com/data");
    httpClient.Dispose();
    
    await this.Next(message);
}

Correct - Initialize once, reuse:

private HttpClient httpClient;

public override async Task<IError?> Initialize()
{
    // Create once during initialization
    this.httpClient = new HttpClient
    {
        BaseAddress = new Uri(this.Settings.ApiBaseUrl),
        Timeout = TimeSpan.FromSeconds(this.Settings.TimeoutSeconds)
    };
    
    this.httpClient.DefaultRequestHeaders.Add("User-Agent", "CrosserModule/1.0");
    
    return await base.Initialize();
}

protected override async Task MessageReceived(IFlowMessage message)
{
    // Reuse the same instance
    var result = await this.httpClient.GetAsync("/data");
    await this.Next(message);
}

public override async Task Stop()
{
    // Cleanup when module stops
    this.httpClient?.Dispose();
    await base.Stop();
}

Async/Await Patterns

Non-Blocking Operations

Wrong - Blocking calls:

protected override async Task MessageReceived(IFlowMessage message)
{
    // .Result blocks the thread!
    var data = this.httpClient.GetAsync("/data").Result;
    
    // .Wait() blocks the thread!
    this.ProcessDataAsync(message).Wait();
    
    await this.Next(message);
}

Correct - Proper async/await:

protected override async Task MessageReceived(IFlowMessage message)
{
    // Non-blocking awaits
    var data = await this.httpClient.GetAsync("/data");
    await this.ProcessDataAsync(message);
    await this.Next(message);
}

CPU-Intensive Operations

For CPU-bound work, use Task.Run to avoid blocking when message processing order is not critical:

protected override async Task MessageReceived(IFlowMessage message)
{
    try
    {
        var data = message.Get<byte[]>("imageData");
        
        // Offload CPU-intensive work to thread pool
        var processed = await Task.Run(() => this.ProcessImage(data));
        
        message.Set("processedImage", processed);
    }
    catch (Exception ex)
    {
        Error(ex, "Image processing failed");
        var error = "Image processing failed";
        this.SetStatus(Status.Warning, error);
        message.SetError(error);
    }
    finally
    {
        await this.Next(message);
    }
}

private byte[] ProcessImage(byte[] imageData)
{
    // CPU-intensive synchronous processing
    // This runs on a thread pool thread, not blocking the module
    return ImageProcessor.Resize(imageData, 800, 600);
}

Batching Patterns

One-Shot Timer Batching

This pattern collects messages and processes them in batches, using a one-shot timer that only fires when needed:

public class BatchProcessorModule : FlowModule<BatchProcessorModuleSettings>
{
    // Pending (cloned) messages waiting to be flushed as a batch.
    private readonly ConcurrentQueue<IFlowMessage> messageQueue = new();

    // Serializes every queue/timer state transition: MessageReceived, the
    // timer-triggered flush, and the final flush in Stop().
    private readonly SemaphoreSlim semaphore = new(1, 1);
    private Timer batchTimer;
    private volatile bool isProcessing;

    public override string UserFriendlyName => "Batch Processor";

    public BatchProcessorModule() : base(FlowModuleType.Function) { }

    /// <summary>
    /// Queues a clone of each incoming message. Flushes immediately when the queue
    /// reaches Settings.BatchSize; otherwise a one-shot timer flushes the partial
    /// batch after Settings.BatchTimeoutSeconds. The original message is always
    /// forwarded unchanged, even if batch processing fails.
    /// </summary>
    protected override async Task MessageReceived(IFlowMessage message)
    {
        await this.semaphore.WaitAsync();
        try
        {
            var wasEmpty = this.messageQueue.IsEmpty;

            // Clone so the queued copy is independent of the message forwarded below.
            this.messageQueue.Enqueue(message.Clone());

            if (wasEmpty)
            {
                // First message of a new batch window - arm the one-shot timer.
                this.StartBatchTimer();
            }

            // Size threshold - flush immediately, no need to wait for the timeout.
            if (this.messageQueue.Count >= this.Settings.BatchSize)
            {
                this.StopBatchTimer();
                await this.ProcessBatch();
            }
        }
        finally
        {
            this.semaphore.Release();
            // Always forward original message
            await this.Next(message);
        }
    }

    private void StartBatchTimer()
    {
        if (this.batchTimer == null)
        {
            this.batchTimer = new Timer(
                async _ => await this.ProcessBatchFromTimer(),
                null,
                Timeout.InfiniteTimeSpan,  // Start disabled
                Timeout.InfiniteTimeSpan   // One-shot timer - no periodic signal
            );
        }

        // Set timer to fire once after the timeout period
        this.batchTimer.Change(
            TimeSpan.FromSeconds(this.Settings.BatchTimeoutSeconds),
            Timeout.InfiniteTimeSpan
        );
    }

    private void StopBatchTimer()
    {
        this.batchTimer?.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    }

    /// <summary>
    /// Timer entry point. BUG FIX vs. the original: the timeout path now takes the
    /// same semaphore MessageReceived holds, so a timeout flush can no longer race
    /// with a size-triggered flush. The catch-all is mandatory because the Timer
    /// callback is an async-void lambda: any exception that escapes it is
    /// unobservable and can bring the process down.
    /// </summary>
    private async Task ProcessBatchFromTimer()
    {
        try
        {
            await this.semaphore.WaitAsync();
            try
            {
                await this.ProcessBatch();
            }
            finally
            {
                this.semaphore.Release();
            }
        }
        catch (ObjectDisposedException)
        {
            // Stop() disposed the semaphore while the timer was firing - shutting down.
        }
        catch (Exception ex)
        {
            Error(ex, "Timer-triggered batch processing failed");
        }
    }

    /// <summary>
    /// Drains up to BatchSize messages, performs one bulk call, and emits a single
    /// summary message. Callers must hold the semaphore; isProcessing is only a
    /// re-entrancy guard, not a substitute for that lock.
    /// </summary>
    private async Task ProcessBatch()
    {
        if (this.isProcessing || this.messageQueue.IsEmpty)
        {
            return;
        }

        this.isProcessing = true;
        try
        {
            var batch = new List<IFlowMessage>();

            // Dequeue up to BatchSize messages
            while (batch.Count < this.Settings.BatchSize &&
                   this.messageQueue.TryDequeue(out var msg))
            {
                batch.Add(msg);
            }

            if (batch.Count > 0)
            {
                Information("Processing batch of {Count} messages", batch.Count);

                // Extract only the fields the bulk operation needs; drop the rest.
                var ids = batch.Select(m => m.Get<string>("id")).ToList();

                // Process batch in single external call
                var result = await this.ProcessBulkAsync(ids);

                // Send batch result
                var batchMsg = new FlowMessage();
                batchMsg.Set("batchSize", batch.Count);
                batchMsg.Set("successCount", result.SuccessCount);
                batchMsg.Set("failedCount", result.FailedCount);
                batchMsg.Set("processedAt", DateTime.UtcNow);

                await this.Next(batchMsg);

                // Messages arrived during the flush - open a new batch window.
                if (!this.messageQueue.IsEmpty)
                {
                    this.StartBatchTimer();
                }
            }
        }
        catch (Exception ex)
        {
            Error(ex, "Batch processing failed");
            this.SetStatus(Status.Warning, "Batch processing failed");
        }
        finally
        {
            this.isProcessing = false;
        }
    }

    private async Task<BatchResult> ProcessBulkAsync(List<string> ids)
    {
        // Simulate bulk API call
        await Task.Delay(100);
        return new BatchResult { SuccessCount = ids.Count, FailedCount = 0 };
    }

    /// <summary>Flushes remaining messages, then releases the timer and semaphore.</summary>
    public override async Task Stop()
    {
        this.StopBatchTimer();

        // Flush under the semaphore so a concurrently firing timer cannot interleave.
        // BUG FIX vs. the original: the timer is disposed AFTER the final flush, and
        // the flush itself is serialized.
        await this.semaphore.WaitAsync();
        try
        {
            if (!this.messageQueue.IsEmpty)
            {
                await this.ProcessBatch();
            }
        }
        finally
        {
            this.semaphore.Release();
        }

        this.batchTimer?.Dispose();
        this.semaphore.Dispose();
        await base.Stop();
    }
}

/// <summary>User-configurable settings for the batch processor example.</summary>
public class BatchProcessorModuleSettings : FlowModuleSettings
{
    // The batch is flushed as soon as this many messages are queued.
    [Display(Name = "Batch Size")]
    [Description("Maximum number of messages to process in a batch")]
    [Range(1, 1000)]
    [DefaultValue(100)]
    public int BatchSize { get; set; } = 100;

    // A partial batch is flushed after this long even if BatchSize was never reached.
    [Display(Name = "Batch Timeout (seconds)")]
    [Description("Maximum time to wait before processing incomplete batch")]
    [Range(1, 300)]
    [DefaultValue(5)]
    public int BatchTimeoutSeconds { get; set; } = 5;
}

/// <summary>Outcome of one bulk call: per-batch success and failure counts.</summary>
public class BatchResult
{
    public int SuccessCount { get; set; }
    public int FailedCount { get; set; }
}

Key Design Points

  • One-Shot Timer: Fires exactly once per batch period, no unnecessary checks
  • Smart Reset: Timer restarts only when messages remain after processing
  • Efficient Triggers: Size threshold stops timer and processes immediately
  • ConcurrentQueue: Lock-free thread-safe operations
  • Minimal Processing: Extract only required fields for bulk operations
  • SemaphoreSlim: Prevents race conditions when starting/stopping timer

Thread-Safe Collections

Use Concurrent Collections for Shared State

Wrong - Thread-safe but heavyweight: manual locking adds contention and boilerplate:

private readonly List<IFlowMessage> messageBuffer = new();
private readonly object lockObject = new();

protected override async Task MessageReceived(IFlowMessage message)
{
    lock (this.lockObject)
    {
        this.messageBuffer.Add(message);
    }
    
    await this.Next(message);
}

Correct - Use concurrent collections:

private readonly ConcurrentQueue<IFlowMessage> messageBuffer = new();

protected override async Task MessageReceived(IFlowMessage message)
{
    // Lock-free, thread-safe enqueue
    this.messageBuffer.Enqueue(message);
    
    await this.Next(message);
}

Available Concurrent Collections

// Queue: First-in, first-out
private readonly ConcurrentQueue<T> queue = new();

// Stack: Last-in, first-out
private readonly ConcurrentStack<T> stack = new();

// Dictionary: Thread-safe key-value pairs
private readonly ConcurrentDictionary<TKey, TValue> dictionary = new();

// Bag: Unordered collection
private readonly ConcurrentBag<T> bag = new();

Memory Management

Dispose Resources Properly

Implement IDisposable for modules that own unmanaged or disposable resources:

/// <summary>
/// Example module holding a long-lived SQL connection and a heartbeat timer,
/// showing the standard Dispose(bool) pattern alongside the module's Stop() cleanup.
/// </summary>
public class DatabaseModule : FlowModule<DatabaseModuleSettings>, IDisposable
{
    private SqlConnection connection;
    private Timer heartbeatTimer;
    // Guards against running Dispose(bool) twice.
    private bool disposed = false;

    public override async Task<IError?> Initialize()
    {
        this.connection = new SqlConnection(this.Settings.ConnectionString);
        await this.connection.OpenAsync();
        
        // Periodic timer (not one-shot): fires every minute after a one-minute delay.
        // NOTE(review): CheckConnection is not shown in this file - presumably a
        // synchronous health probe. Confirm it cannot throw: an exception escaping
        // a Timer callback is unobservable.
        this.heartbeatTimer = new Timer(
            _ => this.CheckConnection(),
            null,
            TimeSpan.FromMinutes(1),
            TimeSpan.FromMinutes(1)
        );
        
        return await base.Initialize();
    }

    protected override async Task MessageReceived(IFlowMessage message)
    {
        // Use connection...
        await this.Next(message);
    }

    public override async Task Stop()
    {
        // Dispose resources. Running before Dispose() is safe: both Timer.Dispose
        // and SqlConnection.Dispose tolerate repeated calls.
        this.heartbeatTimer?.Dispose();
        this.connection?.Close();
        this.connection?.Dispose();
        
        await base.Stop();
    }

    // Standard dispose pattern: 'disposing' is true when called from Dispose(),
    // false when called from a finalizer (this class declares none).
    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
            {
                // Dispose managed resources
                this.heartbeatTimer?.Dispose();
                this.connection?.Dispose();
            }
            
            this.disposed = true;
        }
    }

    public void Dispose()
    {
        this.Dispose(true);
        // No finalizer here, but SuppressFinalize keeps derived types correct.
        GC.SuppressFinalize(this);
    }
}

Avoid Memory Leaks

Common memory leak patterns to avoid:

// ❌ WRONG - Unbounded growth
private readonly List<IFlowMessage> allMessages = new();

protected override async Task MessageReceived(IFlowMessage message)
{
    this.allMessages.Add(message); // Grows forever!
    await this.Next(message);
}

// ✅ CORRECT - Bounded collection
private readonly Queue<IFlowMessage> recentMessages = new();
private const int MaxBufferSize = 1000;

protected override async Task MessageReceived(IFlowMessage message)
{
    // NOTE(review): Queue<T> is not thread-safe. This bounded-buffer pattern is only
    // valid if the runtime never invokes MessageReceived concurrently on this module -
    // confirm the threading model, otherwise a concurrent collection is required.
    this.recentMessages.Enqueue(message);
    
    // Maintain size limit: evict the oldest entries once the cap is exceeded,
    // so the buffer cannot grow without bound.
    while (this.recentMessages.Count > MaxBufferSize)
    {
        this.recentMessages.Dequeue();
    }
    
    await this.Next(message);
}

Unsubscribe from Events

// ❌ WRONG - Event handler leak
public override async Task<IError?> Start()
{
    ExternalService.OnDataReceived += this.HandleData; // Never unsubscribed!
    return await base.Start();
}

// ✅ CORRECT - Proper cleanup
public override async Task<IError?> Start()
{
    ExternalService.OnDataReceived += this.HandleData;
    return await base.Start();
}

public override async Task Stop()
{
    ExternalService.OnDataReceived -= this.HandleData;
    await base.Stop();
}

Connection Pooling

Database Connection Pooling

Most ADO.NET providers handle connection pooling automatically:

/// <summary>
/// Connection-per-message variant: relies on ADO.NET connection pooling, so opening
/// a new SqlConnection for each message reuses a pooled physical connection.
/// </summary>
public class DatabaseModule : FlowModule<DatabaseModuleSettings>
{
    private string connectionString;

    public override async Task<IError?> Initialize()
    {
        // Connection string with pooling enabled (default for most providers)
        this.connectionString = this.Settings.ConnectionString;
        
        // Test connection once at startup - fail fast instead of on the first message.
        using var connection = new SqlConnection(this.connectionString);
        await connection.OpenAsync();
        
        return await base.Initialize();
    }

    protected override async Task MessageReceived(IFlowMessage message)
    {
        // Create connection per message - pooling handles efficiency
        using var connection = new SqlConnection(this.connectionString);
        await connection.OpenAsync();
        
        // Parameterized query: never concatenate message data into SQL text.
        using var command = connection.CreateCommand();
        command.CommandText = "SELECT * FROM table WHERE id = @id";
        command.Parameters.AddWithValue("@id", message.Get<int>("id"));
        
        using var reader = await command.ExecuteReaderAsync();
        // Process results...
        
        await this.Next(message);
        
        // Connection automatically returned to pool when disposed
    }
}

HttpClient Reuse

// ✅ CORRECT - Single HttpClient instance
private HttpClient httpClient;

public override async Task<IError?> Initialize()
{
    this.httpClient = new HttpClient
    {
        BaseAddress = new Uri(this.Settings.ApiBaseUrl),
        Timeout = TimeSpan.FromSeconds(30)
    };
    
    // HttpClient handles connection pooling internally
    return await base.Initialize();
}

protected override async Task MessageReceived(IFlowMessage message)
{
    // Reuse same HttpClient - it handles connection pooling
    var response = await this.httpClient.GetAsync("/endpoint");
    await this.Next(message);
}

Performance Monitoring

Built-in Timing

private readonly Stopwatch processingTimer = new();

protected override async Task MessageReceived(IFlowMessage message)
{
    // NOTE(review): processingTimer is a single shared Stopwatch field. This is only
    // correct if the runtime never invokes MessageReceived concurrently on this
    // module - confirm the threading model; overlapping messages would corrupt each
    // other's timings (a local Stopwatch.StartNew() would be concurrency-safe).
    this.processingTimer.Restart();
    
    try
    {
        await this.ProcessMessage(message);
    }
    finally
    {
        this.processingTimer.Stop();
        
        // Log slow operations (threshold: 1 second) so hot spots surface in logs.
        if (this.processingTimer.ElapsedMilliseconds > 1000)
        {
            Warning("Slow processing: {ElapsedMs}ms for message", 
                this.processingTimer.ElapsedMilliseconds);
        }

        await this.Next(message);
    }
}

Message Rate Tracking

private int messageCount;
private DateTime lastResetTime = DateTime.UtcNow;

/// <summary>
/// Counts messages and logs the processing rate roughly once per minute.
/// Forwards every message unchanged after ProcessMessage.
/// </summary>
protected override async Task MessageReceived(IFlowMessage message)
{
    Interlocked.Increment(ref this.messageCount);
    
    var elapsed = DateTime.UtcNow - this.lastResetTime;
    if (elapsed.TotalSeconds >= 60)
    {
        // BUG FIX: take the window's count atomically. The original read
        // this.messageCount and then assigned 0 in two separate steps, silently
        // dropping any increments that landed in between (and letting two
        // concurrent handlers log the same window twice).
        var windowCount = Interlocked.Exchange(ref this.messageCount, 0);
        this.lastResetTime = DateTime.UtcNow;
        
        var rate = windowCount / elapsed.TotalSeconds;
        Information("Processing rate: {Rate:F2} messages/second", rate);
    }
    
    await this.ProcessMessage(message);
    await this.Next(message);
}

Performance Checklist

  • [ ] Resources initialized once, not per message
  • [ ] HttpClient/database connections reused
  • [ ] Async/await used correctly (no blocking)
  • [ ] CPU-intensive work offloaded with Task.Run
  • [ ] Concurrent collections used for shared state
  • [ ] Resources disposed properly in Stop()
  • [ ] Event handlers unsubscribed
  • [ ] Collections have size limits
  • [ ] Batching used for bulk operations
  • [ ] Timers disposed correctly

Common Performance Mistakes

1. Creating Resources Per Message

// ❌ Creates 1000 HttpClients for 1000 messages!
protected override async Task MessageReceived(IFlowMessage message)
{
    var client = new HttpClient(); // Don't do this!
    await client.GetAsync("...");
    client.Dispose();
}

2. Blocking on Async Operations

// ❌ Blocks thread pool thread
protected override async Task MessageReceived(IFlowMessage message)
{
    var result = SomeAsyncOperation().Result; // Blocking!
}

3. Unbounded Collections

// ❌ Grows without limit
private readonly List<IFlowMessage> buffer = new();

protected override async Task MessageReceived(IFlowMessage message)
{
    this.buffer.Add(message); // Memory leak!
}

4. Unnecessary Synchronization

// ❌ Lock on every message when concurrent collection would work
private readonly List<int> values = new();
private readonly object lockObj = new();

protected override async Task MessageReceived(IFlowMessage message)
{
    lock (this.lockObj) // Unnecessary lock!
    {
        this.values.Add(message.Get<int>("value"));
    }
}

// ✅ Use concurrent collection instead
private readonly ConcurrentBag<int> values = new();

protected override async Task MessageReceived(IFlowMessage message)
{
    this.values.Add(message.Get<int>("value")); // Lock-free!
}

Summary

Key principles for high-performance modules:

  1. Initialize once - Create resources during Initialize(), not per message
  2. Don't block - Use async/await properly, never use .Result or .Wait()
  3. Batch efficiently - Use one-shot timers and concurrent collections
  4. Manage memory - Dispose resources, limit collection sizes, unsubscribe from events
  5. Monitor performance - Track processing times and message rates
  6. Use thread-safe collections - ConcurrentQueue, ConcurrentDictionary when needed
  7. Pool connections - Reuse HttpClient and rely on connection pooling

Remember: Performance issues in your module can affect the entire flow. Design for efficiency from the start.

>> External Integrations