Table of Contents

External Integrations

This guide covers how to integrate your modules with external services, manage credentials securely, and access resources.

HTTP/REST API Integration

Basic Setup

/// <summary>
/// Output module that turns each incoming flow message into an HTTP request
/// against a configured REST API and writes the outcome back onto the message.
/// </summary>
public class RestApiModule : FlowModule<RestApiModuleSettings>
{
    // Shared client: created once in Initialize(), reused for every message, disposed in Stop().
    private HttpClient httpClient;

    public override string UserFriendlyName => "REST API Client";

    public RestApiModule() : base(FlowModuleType.Output) { }

    /// <summary>
    /// Resolves the API key credential, builds the HTTP client and verifies the
    /// API is reachable by requesting <c>/health</c>.
    /// </summary>
    /// <returns>
    /// An error when the credential is missing or cannot be read, or when the health
    /// check fails, cannot connect or times out; otherwise the result of the base
    /// initialization.
    /// </returns>
    public override async Task<IError?> Initialize()
    {
        // Get API credentials
        if (!this.Settings.ApiCredential.HasValue)
        {
            return new Error("API credential not configured");
        }

        var apiCredential = await this.GetCredentialContentAsync<CredentialWithApiKey>(
            this.Settings.ApiCredential.Value);

        if (apiCredential.IsError || apiCredential.Value is null)
        {
            return apiCredential.Error ?? new Error("Failed to get API credential");
        }

        // Build the client locally and only publish it to the field once the
        // health check has passed, so a failed initialization never leaks it.
        var client = new HttpClient();
        var verified = false;
        try
        {
            client.BaseAddress = new Uri(this.Settings.ApiBaseUrl);
            client.Timeout = TimeSpan.FromSeconds(this.Settings.TimeoutSeconds);
            client.DefaultRequestHeaders.Add("Authorization",
                $"Bearer {apiCredential.Value.ApiKey}");
            client.DefaultRequestHeaders.Add("User-Agent",
                "CrosserModule/1.0");

            // Test connection
            using var response = await client.GetAsync("/health");
            if (!response.IsSuccessStatusCode)
            {
                return new Error($"API health check failed: {response.StatusCode}");
            }

            Information("Successfully connected to API at {BaseUrl}",
                this.Settings.ApiBaseUrl);
            verified = true;
        }
        catch (HttpRequestException ex)
        {
            return new Error($"Failed to connect to API: {ex.Message}");
        }
        catch (OperationCanceledException)
        {
            // HttpClient reports an elapsed Timeout as a cancellation, not as HttpRequestException.
            return new Error(
                $"Failed to connect to API: health check timed out after {this.Settings.TimeoutSeconds} seconds");
        }
        finally
        {
            if (!verified)
            {
                client.Dispose();
            }
        }

        // Replace (and release) any client left over from a previous initialization.
        this.httpClient?.Dispose();
        this.httpClient = client;

        return await base.Initialize();
    }

    /// <summary>
    /// Sends one HTTP request described by the message and records the result on it.
    /// Reads the keys <c>endpoint</c> (default <c>/data</c>), <c>method</c>
    /// (default <c>GET</c>) and <c>body</c>; writes <c>response</c>, <c>statusCode</c>
    /// and <c>success</c> on success, or sets an error on the message on failure.
    /// The message is forwarded to the next module in every case.
    /// </summary>
    /// <param name="message">The flow message to process and forward.</param>
    protected override async Task MessageReceived(IFlowMessage message)
    {
        try
        {
            var endpoint = message.Get<string>("endpoint") ?? "/data";
            var method = message.Get<string>("method") ?? "GET";
            var body = message.Get<object>("body");

            // Disposed when the try block exits, including the early return below.
            using var response = await this.SendAsync(method, endpoint, body);

            if (response is null)
            {
                var unsupported = $"Unsupported HTTP method: {method}";
                this.SetStatus(Status.Warning, unsupported);
                message.SetError(unsupported);
                return;
            }

            if (response.IsSuccessStatusCode)
            {
                var responseData = await response.Content.ReadAsStringAsync();
                message.Set("response", responseData);
                message.Set("statusCode", (int)response.StatusCode);
                message.Set("success", true);
            }
            else
            {
                var failure = $"API request failed: {response.StatusCode}";
                this.SetStatus(Status.Warning, failure);
                message.SetError(failure);
                message.Set("statusCode", (int)response.StatusCode);
            }
        }
        catch (HttpRequestException ex)
        {
            Error(ex, "HTTP request failed");
            var error = $"HTTP request failed: {ex.Message}";
            this.SetStatus(Status.Warning, error);
            message.SetError(error);
        }
        catch (Exception ex)
        {
            Error(ex, "Unexpected error");
            this.SetStatus(Status.Warning, ex.Message);
            message.SetError(ex.Message);
        }
        finally
        {
            // Always forward the message
            await this.Next(message);
        }
    }

    /// <summary>Disposes the HTTP client, then runs the base stop logic.</summary>
    public override async Task Stop()
    {
        this.httpClient?.Dispose();
        await base.Stop();
    }

    /// <summary>
    /// Issues the request for the given HTTP method (matched case-insensitively).
    /// </summary>
    /// <param name="method">GET, POST, PUT or DELETE.</param>
    /// <param name="endpoint">Request URI passed to the client.</param>
    /// <param name="body">Value serialized as the JSON payload for POST and PUT.</param>
    /// <returns>The response, or <c>null</c> when the method is not supported.</returns>
    private async Task<HttpResponseMessage?> SendAsync(string method, string endpoint, object? body)
    {
        // Invariant casing: HTTP verbs are protocol tokens, not culture-dependent text.
        var verb = method.ToUpperInvariant();
        switch (verb)
        {
            case "GET":
                return await this.httpClient.GetAsync(endpoint);

            case "DELETE":
                return await this.httpClient.DeleteAsync(endpoint);

            case "POST":
            case "PUT":
            {
                using var content = new StringContent(
                    JsonSerializer.Serialize(body),
                    Encoding.UTF8,
                    "application/json");

                return verb == "POST"
                    ? await this.httpClient.PostAsync(endpoint, content)
                    : await this.httpClient.PutAsync(endpoint, content);
            }

            default:
                return null;
        }
    }
}

/// <summary>
/// Settings for <see cref="RestApiModule"/>: API base URL, API key credential
/// and request timeout.
/// </summary>
public class RestApiModuleSettings : FlowModuleSettings
{
    /// <summary>Base URL of the REST API; must be an absolute http or https URL.</summary>
    [Required]
    [Display(Name = "API Base URL")]
    [Description("Base URL of the REST API")]
    public string ApiBaseUrl { get; set; }

    /// <summary>Id of the stored API key credential used for authentication.</summary>
    [JsonSchemaExtensionData("x-credential", "ApiKey")]
    [Display(Name = "API Key")]
    [Description("API key for authentication")]
    public Guid? ApiCredential { get; set; }

    /// <summary>Request timeout in seconds (1-300, default 30).</summary>
    [Range(1, 300)]
    [Display(Name = "Timeout (seconds)")]
    [DefaultValue(30)]
    public int TimeoutSeconds { get; set; } = 30;

    /// <summary>
    /// Adds a validation error when <see cref="ApiBaseUrl"/> is set but is not an
    /// absolute http/https URL, then runs the base validation.
    /// </summary>
    /// <param name="validator">Collector for validation errors.</param>
    public override void Validate(SettingsValidator validator)
    {
        if (!string.IsNullOrWhiteSpace(this.ApiBaseUrl))
        {
            // An absolute URI alone is not enough: "file:" or "ftp:" URIs (and rooted
            // paths on Unix) parse as absolute but cannot be used by an HTTP client.
            var isHttpUrl = Uri.TryCreate(this.ApiBaseUrl, UriKind.Absolute, out var uri)
                && (uri.Scheme == Uri.UriSchemeHttp || uri.Scheme == Uri.UriSchemeHttps);

            if (!isHttpUrl)
            {
                validator.AddError(nameof(this.ApiBaseUrl),
                    "API Base URL must be a valid URL");
            }
        }

        base.Validate(validator);
    }
}

Database Integration

SQL Server Example

/// <summary>
/// Output module that inserts the <c>data</c> dictionary of each incoming flow
/// message as one row into a configured SQL Server table.
/// </summary>
public class DatabaseModule : FlowModule<DatabaseModuleSettings>
{
    // Resolved from the configured credential in Initialize().
    private string connectionString;

    public override string UserFriendlyName => "Database Writer";

    public DatabaseModule() : base(FlowModuleType.Output) { }

    /// <summary>
    /// Resolves the connection string credential and verifies that a connection
    /// can be opened.
    /// </summary>
    /// <returns>
    /// An error when the credential is missing or unreadable, or when the test
    /// connection cannot be created or opened; otherwise the result of the base
    /// initialization.
    /// </returns>
    public override async Task<IError?> Initialize()
    {
        // Get database credentials
        if (!this.Settings.DbCredential.HasValue)
        {
            return new Error("Database credential not configured");
        }

        var credentials = await this.GetCredentialContentAsync<CredentialWithConnectionString>(
            this.Settings.DbCredential.Value);

        if (credentials.IsError || credentials.Value is null)
        {
            return credentials.Error ?? new Error("Failed to get database credential");
        }

        this.connectionString = credentials.Value.ConnectionString;

        // Test connection
        try
        {
            using var connection = new SqlConnection(this.connectionString);
            await connection.OpenAsync();

            Information("Successfully connected to database");
        }
        catch (SqlException ex)
        {
            return new Error($"Database connection failed: {ex.Message}");
        }
        catch (ArgumentException ex)
        {
            // Thrown by the SqlConnection constructor for a malformed connection string.
            return new Error($"Database connection failed: {ex.Message}");
        }
        catch (InvalidOperationException ex)
        {
            return new Error($"Database connection failed: {ex.Message}");
        }

        return await base.Initialize();
    }

    /// <summary>
    /// Inserts the message's <c>data</c> dictionary (column name to value) into the
    /// configured table and writes <c>rowsAffected</c>, <c>success</c> and
    /// <c>insertedAt</c> back onto the message. On failure an error is set on the
    /// message instead. The message is forwarded to the next module in every case.
    /// </summary>
    /// <param name="message">The flow message to process and forward.</param>
    protected override async Task MessageReceived(IFlowMessage message)
    {
        try
        {
            var data = message.Get<Dictionary<string, object>>("data");

            // Validate before touching the database so empty messages cost no connection.
            if (data == null || data.Count == 0)
            {
                var error = "No data to insert";
                this.SetStatus(Status.Warning, error);
                message.SetError(error);
                return;
            }

            // Column names come from the message and cannot be sent as parameters,
            // so each one is bracket-quoted. Parameter names are positional (@p0, @p1, ...)
            // so no message-supplied text ends up in them.
            // NOTE(review): the table name is taken from settings as-is (it may be
            // schema-qualified or already quoted) and is treated as trusted configuration.
            var columnNames = data.Keys.ToList();
            var columns = string.Join(", ", columnNames.Select(QuoteIdentifier));
            var parameters = string.Join(", ", columnNames.Select((name, index) => $"@p{index}"));
            var query = $"INSERT INTO {this.Settings.TableName} ({columns}) VALUES ({parameters})";

            // Create new connection per message (uses connection pooling)
            using var connection = new SqlConnection(this.connectionString);
            await connection.OpenAsync();

            using var command = connection.CreateCommand();
            command.CommandText = query;
            command.CommandTimeout = this.Settings.QueryTimeout;

            for (var index = 0; index < columnNames.Count; index++)
            {
                var param = command.CreateParameter();
                param.ParameterName = $"@p{index}";
                param.Value = data[columnNames[index]] ?? DBNull.Value;
                command.Parameters.Add(param);
            }

            // Execute query
            var rowsAffected = await command.ExecuteNonQueryAsync();

            message.Set("rowsAffected", rowsAffected);
            message.Set("success", true);
            message.Set("insertedAt", DateTime.UtcNow);
        }
        catch (SqlException ex)
        {
            Error(ex, "Database operation failed");
            var error = $"Database operation failed: {ex.Message}";
            this.SetStatus(Status.Warning, error);
            message.SetError(error);
        }
        catch (Exception ex)
        {
            Error(ex, "Unexpected error");
            this.SetStatus(Status.Warning, ex.Message);
            message.SetError(ex.Message);
        }
        finally
        {
            // Always forward the message
            await this.Next(message);
        }
    }

    /// <summary>
    /// Wraps a column name in square brackets, doubling any closing bracket, so the
    /// name is always parsed as a single identifier.
    /// </summary>
    private static string QuoteIdentifier(string name)
    {
        return "[" + name.Replace("]", "]]") + "]";
    }
}

/// <summary>
/// Settings for <see cref="DatabaseModule"/>: connection credential, target table
/// and command timeout.
/// </summary>
public class DatabaseModuleSettings : FlowModuleSettings
{
    /// <summary>Id of the stored credential holding the connection string.</summary>
    [JsonSchemaExtensionData("x-credential", "ConnectionString"),
     Display(Name = "Database Credentials"),
     Description("Database connection string")]
    public Guid? DbCredential { get; set; }

    /// <summary>Table that rows are inserted into.</summary>
    [Required,
     Display(Name = "Table Name"),
     Description("Name of the database table")]
    public string TableName { get; set; }

    /// <summary>Command timeout in seconds (1-300, default 30).</summary>
    [Range(1, 300),
     Display(Name = "Query Timeout (seconds)"),
     DefaultValue(30)]
    public int QueryTimeout { get; set; } = 30;
}

Multiple Database Providers

/// <summary>
/// Module that opens a database connection for one of several providers
/// (SQL Server, PostgreSQL or MySQL), selected by a setting.
/// </summary>
public class MultiDbModule : FlowModule<MultiDbModuleSettings>
{
    // Open connection created in Initialize() and released in Stop().
    private IDbConnection connection;

    /// <summary>
    /// Resolves the connection string credential, creates the connection for the
    /// configured provider and opens it.
    /// </summary>
    /// <returns>
    /// An error when the credential is missing or unreadable, the provider is not
    /// supported, or the connection cannot be opened; otherwise the result of the
    /// base initialization.
    /// </returns>
    public override async Task<IError?> Initialize()
    {
        // Reading .Value on an unset Guid? would throw instead of reporting an error.
        if (!this.Settings.DbCredential.HasValue)
        {
            return new Error("Database credential not configured");
        }

        var credentials = await this.GetCredentialContentAsync<CredentialWithConnectionString>(
            this.Settings.DbCredential.Value);

        if (credentials.IsError || credentials.Value is null)
        {
            return credentials.Error ?? new Error("Failed to get database credential");
        }

        var connString = credentials.Value.ConnectionString;

        // Create connection based on provider. The provider name is compared using
        // invariant casing, and a null name falls through to "unsupported".
        var providerKey = (this.Settings.DatabaseProvider ?? string.Empty).ToLowerInvariant();
        DbConnection? created = providerKey switch
        {
            "sqlserver" => new SqlConnection(connString),
            "postgresql" => new NpgsqlConnection(connString),
            "mysql" => new MySqlConnection(connString),
            _ => null
        };

        if (created == null)
        {
            return new Error($"Unsupported database provider: {this.Settings.DatabaseProvider}");
        }

        // Test connection; the connection is only stored once it has opened,
        // and is disposed on any failure so it cannot leak.
        var opened = false;
        try
        {
            await created.OpenAsync();
            Information("Successfully connected to {Provider} database",
                this.Settings.DatabaseProvider);
            opened = true;
        }
        catch (DbException ex)
        {
            return new Error($"Database connection failed: {ex.Message}");
        }
        finally
        {
            if (!opened)
            {
                created.Dispose();
            }
        }

        // Release any connection left over from a previous initialization.
        this.connection?.Dispose();
        this.connection = created;

        return await base.Initialize();
    }

    /// <summary>Closes and disposes the connection, then runs the base stop logic.</summary>
    public override async Task Stop()
    {
        this.connection?.Close();
        this.connection?.Dispose();
        await base.Stop();
    }
}

/// <summary>
/// Settings for <see cref="MultiDbModule"/>: database provider and connection credential.
/// </summary>
public class MultiDbModuleSettings : FlowModuleSettings
{
    /// <summary>
    /// Provider name; the module recognizes "SqlServer", "PostgreSQL" and "MySQL"
    /// regardless of casing. Defaults to "SqlServer".
    /// </summary>
    [Required,
     Display(Name = "Database Provider"),
     Description("Type of database"),
     DefaultValue("SqlServer")]
    public string DatabaseProvider { get; set; } = "SqlServer";

    /// <summary>Id of the stored credential holding the connection string.</summary>
    [JsonSchemaExtensionData("x-credential", "ConnectionString"),
     Display(Name = "Database Credentials")]
    public Guid? DbCredential { get; set; }
}

Message Queue Integration

RabbitMQ Example

/// <summary>
/// Output module that publishes each incoming flow message as a JSON payload
/// to RabbitMQ.
/// </summary>
public class RabbitMqModule : FlowModule<RabbitMqModuleSettings>
{
    // Created in Initialize(); released in Stop() or when initialization fails.
    private IConnection connection;
    private IModel channel;

    public override string UserFriendlyName => "RabbitMQ Publisher";

    public RabbitMqModule() : base(FlowModuleType.Output) { }

    /// <summary>
    /// Resolves the username/password credential, connects to the broker, opens a
    /// channel and declares the configured queue.
    /// </summary>
    /// <returns>
    /// An error when the credential is missing or unreadable, or when connecting or
    /// declaring the queue fails; otherwise the result of the base initialization.
    /// </returns>
    public override async Task<IError?> Initialize()
    {
        // Get credentials
        if (!this.Settings.QueueCredential.HasValue)
        {
            return new Error("Queue credential not configured");
        }

        var credentials = await this.GetCredentialContentAsync<CredentialWithUsernamePassword>(
            this.Settings.QueueCredential.Value);

        if (credentials.IsError || credentials.Value is null)
        {
            return credentials.Error ?? new Error("Failed to get queue credential");
        }

        // Setup connection
        var factory = new ConnectionFactory
        {
            HostName = this.Settings.Host,
            Port = this.Settings.Port,
            UserName = credentials.Value.Username,
            Password = credentials.Value.Password,
            VirtualHost = this.Settings.VirtualHost,
            RequestedHeartbeat = TimeSpan.FromSeconds(60),
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
            AutomaticRecoveryEnabled = true
        };

        try
        {
            this.connection = factory.CreateConnection($"{this.UserFriendlyName}-{this.Id}");
            this.channel = this.connection.CreateModel();

            // Declare queue
            this.channel.QueueDeclare(
                queue: this.Settings.QueueName,
                durable: this.Settings.DurableQueue,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );

            Information("Connected to RabbitMQ queue {Queue} at {Host}:{Port}",
                this.Settings.QueueName, this.Settings.Host, this.Settings.Port);
        }
        catch (Exception ex)
        {
            // The connection may already be open when the channel or queue step fails;
            // release whatever was created so a failed initialization does not leak it.
            this.ReleaseResources();
            return new Error($"Failed to connect to RabbitMQ: {ex.Message}");
        }

        return await base.Initialize();
    }

    /// <summary>
    /// Publishes the message serialized as JSON and writes <c>published</c>,
    /// <c>messageId</c> and <c>queue</c> back onto it. On failure an error is set on
    /// the message instead. The message is forwarded to the next module in every case.
    /// </summary>
    /// <param name="message">The flow message to publish and forward.</param>
    protected override async Task MessageReceived(IFlowMessage message)
    {
        try
        {
            var messageBody = message.ToJSON();
            var body = Encoding.UTF8.GetBytes(messageBody);

            var properties = this.channel.CreateBasicProperties();
            properties.Persistent = this.Settings.PersistentMessages;
            properties.ContentType = "application/json";
            properties.MessageId = Guid.NewGuid().ToString();
            properties.Timestamp = new AmqpTimestamp(
                DateTimeOffset.UtcNow.ToUnixTimeSeconds());

            // Routing key precedence: message value, then the setting, then the queue name.
            // An empty setting counts as "not set" so it defaults to the queue name as
            // the setting's description states.
            var routingKey = message.Get<string>("routingKey");
            if (routingKey is null)
            {
                routingKey = string.IsNullOrEmpty(this.Settings.RoutingKey)
                    ? this.Settings.QueueName
                    : this.Settings.RoutingKey;
            }

            this.channel.BasicPublish(
                exchange: this.Settings.Exchange ?? "",
                routingKey: routingKey,
                basicProperties: properties,
                body: body
            );

            message.Set("published", true);
            message.Set("messageId", properties.MessageId);
            message.Set("queue", this.Settings.QueueName);
        }
        catch (Exception ex)
        {
            Error(ex, "Failed to publish message to RabbitMQ");
            var error = $"Publish failed: {ex.Message}";
            this.SetStatus(Status.Warning, error);
            message.SetError(error);
        }
        finally
        {
            // Always forward the message
            await this.Next(message);
        }
    }

    /// <summary>
    /// Closes the channel and the connection, then runs the base stop logic.
    /// Failures while closing are logged as warnings and do not prevent the stop.
    /// </summary>
    public override async Task Stop()
    {
        if (this.ReleaseResources())
        {
            Information("Disconnected from RabbitMQ");
        }

        await base.Stop();
    }

    /// <summary>
    /// Closes and disposes the channel and the connection independently, so that a
    /// failure closing the channel cannot leave the connection open.
    /// </summary>
    /// <returns><c>true</c> when both were released without an exception.</returns>
    private bool ReleaseResources()
    {
        var clean = true;

        try
        {
            this.channel?.Close();
            this.channel?.Dispose();
        }
        catch (Exception ex)
        {
            clean = false;
            Warning(ex, "Error closing RabbitMQ channel");
        }

        try
        {
            this.connection?.Close();
            this.connection?.Dispose();
        }
        catch (Exception ex)
        {
            clean = false;
            Warning(ex, "Error closing RabbitMQ connection");
        }

        return clean;
    }
}

/// <summary>
/// Settings for <see cref="RabbitMqModule"/>: broker endpoint, queue and exchange
/// options, message durability and the login credential.
/// </summary>
public class RabbitMqModuleSettings : FlowModuleSettings
{
    /// <summary>Hostname of the RabbitMQ server.</summary>
    [Required,
     Display(Name = "Host"),
     Description("RabbitMQ server hostname")]
    public string Host { get; set; }

    /// <summary>Broker port (1-65535, default 5672).</summary>
    [Range(1, 65535),
     Display(Name = "Port"),
     DefaultValue(5672)]
    public int Port { get; set; } = 5672;

    /// <summary>Virtual host to connect to (default "/").</summary>
    [Display(Name = "Virtual Host"),
     DefaultValue("/")]
    public string VirtualHost { get; set; } = "/";

    /// <summary>Queue that is declared on initialization and published to.</summary>
    [Required,
     Display(Name = "Queue Name"),
     Description("Name of the queue to publish to")]
    public string QueueName { get; set; }

    /// <summary>Exchange to publish through; the default exchange is used when unset.</summary>
    [Display(Name = "Exchange"),
     Description("Optional exchange name (leave empty for default)")]
    public string Exchange { get; set; }

    /// <summary>Routing key used when the message does not supply one.</summary>
    [Display(Name = "Routing Key"),
     Description("Optional routing key (defaults to queue name)")]
    public string RoutingKey { get; set; }

    /// <summary>Whether the queue is declared as durable (default true).</summary>
    [Display(Name = "Durable Queue"),
     Description("Queue survives broker restart"),
     DefaultValue(true)]
    public bool DurableQueue { get; set; } = true;

    /// <summary>Whether published messages are marked persistent (default true).</summary>
    [Display(Name = "Persistent Messages"),
     Description("Messages survive broker restart"),
     DefaultValue(true)]
    public bool PersistentMessages { get; set; } = true;

    /// <summary>Id of the stored username/password credential for the broker.</summary>
    [JsonSchemaExtensionData("x-credential", "UsernamePassword"),
     Display(Name = "Queue Credentials")]
    public Guid? QueueCredential { get; set; }
}

Resource File Handling

Text File Resources

/// <summary>
/// Loads the configured text resource and parses it as JSON or XML.
/// </summary>
/// <returns>
/// An error when no file is configured, the resource cannot be loaded, the content
/// is empty, the format is neither JSON nor XML, or parsing throws; otherwise the
/// result of the base initialization.
/// </returns>
public override async Task<IError?> Initialize()
{
    var configFileId = this.Settings.ConfigFile;
    if (!configFileId.HasValue)
    {
        return new Error("Configuration file not specified");
    }

    // Text resources (JSON, XML, CSV, ...) are requested as a string.
    var loaded = await this.GetResourceContentAsync<string>(configFileId.Value);
    if (!loaded.IsSuccess)
    {
        return new Error("Failed to load configuration file");
    }

    var text = loaded.Result;
    if (string.IsNullOrEmpty(text))
    {
        return new Error("Configuration file is empty");
    }

    try
    {
        // JSON is probed first; XML is only probed when the content is not JSON.
        var isJson = this.IsJson(text);
        if (!isJson && !this.IsXml(text))
        {
            return new Error("Unsupported file format");
        }

        if (isJson)
        {
            this.configuration = JsonSerializer.Deserialize<ModuleConfiguration>(text);
        }
        else
        {
            var xml = new XmlDocument();
            xml.LoadXml(text);
            this.ProcessXmlDocument(xml);
        }
    }
    catch (Exception ex)
    {
        return new Error($"Failed to parse configuration file: {ex.Message}");
    }

    return await base.Initialize();
}

/// <summary>
/// Cheap shape check: reports whether the content, ignoring surrounding whitespace,
/// is wrapped in <c>{ }</c> or <c>[ ]</c>. It does not validate the JSON itself.
/// </summary>
/// <param name="content">Text to inspect; <c>null</c> is treated as not JSON.</param>
/// <returns><c>true</c> when the content looks like a JSON object or array.</returns>
private bool IsJson(string content)
{
    if (content is null)
    {
        return false;
    }

    var trimmed = content.Trim();
    if (trimmed.Length < 2)
    {
        // A single character cannot be both an opening and a closing bracket.
        return false;
    }

    // Compare characters directly: culture-sensitive string matching is not
    // appropriate for structural delimiters.
    var first = trimmed[0];
    var last = trimmed[trimmed.Length - 1];

    return (first == '{' && last == '}') ||
           (first == '[' && last == ']');
}

/// <summary>
/// Cheap shape check: reports whether the content, ignoring leading whitespace,
/// starts with <c>&lt;</c>. This also covers an <c>&lt;?xml</c> declaration.
/// It does not validate the XML itself.
/// </summary>
/// <param name="content">Text to inspect; <c>null</c> is treated as not XML.</param>
/// <returns><c>true</c> when the content looks like XML.</returns>
private bool IsXml(string content)
{
    if (content is null)
    {
        return false;
    }

    // Only the first non-whitespace character matters, compared as a plain character.
    var trimmed = content.TrimStart();
    return trimmed.Length > 0 && trimmed[0] == '<';
}

Binary File Resources

/// <summary>
/// Loads the configured binary resource, writes it to a temporary file and hands
/// that file to <c>ProcessBinaryFile</c>.
/// </summary>
/// <returns>
/// An error when no file is configured, the resource cannot be loaded, the content
/// is empty, or writing/processing throws; otherwise the result of the base
/// initialization.
/// </returns>
public override async Task<IError?> Initialize()
{
    if (!this.Settings.DataFile.HasValue)
    {
        return new Error("Data file not specified");
    }

    // Get resource as byte array (for binary files)
    var resource = await this.GetResourceContentAsync<byte[]>(
        this.Settings.DataFile.Value);

    if (!resource.IsSuccess)
    {
        return new Error("Failed to load data file");
    }

    var resourceBytes = resource.Result;

    if (resourceBytes == null || resourceBytes.Length == 0)
    {
        return new Error("Data file is empty");
    }

    // Process binary data
    string? tempPath = null;
    try
    {
        // Example: Save to temp file for processing
        tempPath = Path.Combine(Path.GetTempPath(),
            $"module_{Guid.NewGuid()}.dat");
        await File.WriteAllBytesAsync(tempPath, resourceBytes);

        // Process the file...
        this.ProcessBinaryFile(tempPath);
    }
    catch (Exception ex)
    {
        return new Error($"Failed to process data file: {ex.Message}");
    }
    finally
    {
        // Clean up on success and on failure, so a processing error does not leave
        // the temp file behind. A cleanup failure is logged rather than failing
        // an otherwise successful initialization.
        if (tempPath != null)
        {
            try
            {
                File.Delete(tempPath);
            }
            catch (Exception ex)
            {
                Warning(ex, "Failed to delete temporary data file");
            }
        }
    }

    return await base.Initialize();
}

Integration Best Practices

1. Connection Health Checks

/// <summary>
/// Schedules the connection health check to run every minute, with the first run
/// one minute after start, then runs the base start logic.
/// </summary>
/// <returns>The result of the base start.</returns>
public override async Task<IError?> Start()
{
    // Same value for the initial delay and the repeat period.
    var checkInterval = TimeSpan.FromMinutes(1);

    // Start background health check
    this.healthCheckTimer = new Timer(
        callback: async _ => await this.CheckConnectionHealth(),
        state: null,
        dueTime: checkInterval,
        period: checkInterval);

    var startResult = await base.Start();
    return startResult;
}

/// <summary>
/// Requests <c>/health</c> from the external service and sets a warning status
/// when the response is not successful or the request throws. Exceptions are
/// logged and never propagate to the caller.
/// </summary>
private async Task CheckConnectionHealth()
{
    try
    {
        // Dispose each response: this runs periodically, so undisposed responses
        // would accumulate for the lifetime of the module.
        using var response = await this.httpClient.GetAsync("/health");
        if (!response.IsSuccessStatusCode)
        {
            Warning("Health check failed: {StatusCode}", response.StatusCode);
            this.SetStatus(Status.Warning, "External service health check failed");
        }
    }
    catch (Exception ex)
    {
        Warning(ex, "Health check failed");
        this.SetStatus(Status.Warning, "Cannot reach external service");
    }
}

2. Retry Logic

See Error Handling for complete retry strategies.

3. Timeout Configuration

Always configure timeouts for external calls:

// HTTP: limit how long the client waits for a request, using the configured number of seconds.
this.httpClient.Timeout = TimeSpan.FromSeconds(this.Settings.TimeoutSeconds);

// Database: limit command execution using the configured query timeout (seconds).
command.CommandTimeout = this.Settings.QueryTimeout;

4. Secure Credential Handling

See Security Best Practices for complete guidance.

Summary

Key principles for external integrations:

  1. Always use credentials - Never hardcode sensitive information
  2. Test connections - Validate connectivity during Initialize()
  3. Handle errors gracefully - Always forward messages, even on failure
  4. Configure timeouts - Prevent hanging operations
  5. Reuse connections - Initialize clients once and use them many times, or rely on the driver's connection pooling when opening a connection per message
  6. Monitor health - Implement health checks for long-running connections
  7. Log appropriately - Log operations but never log credentials

For more details on specific topics:

- Logging