External Integrations
This guide covers how to integrate your modules with external services, manage credentials securely, and access resources.
HTTP/REST API Integration
Basic Setup
/// <summary>
/// Output module that turns each incoming flow message into an HTTP request against a
/// configured REST API and writes the outcome back onto the message before forwarding it.
/// </summary>
public class RestApiModule : FlowModule<RestApiModuleSettings>
{
    private HttpClient httpClient;

    public override string UserFriendlyName => "REST API Client";

    public RestApiModule() : base(FlowModuleType.Output) { }

    /// <summary>
    /// Resolves the API credential, builds the shared <see cref="HttpClient"/> and verifies
    /// connectivity by calling <c>/health</c>.
    /// </summary>
    /// <returns>An error describing why initialization failed; otherwise the result of the base implementation.</returns>
    public override async Task<IError?> Initialize()
    {
        // Get API credentials
        if (!this.Settings.ApiCredential.HasValue)
        {
            return new Error("API credential not configured");
        }

        var apiCredential = await this.GetCredentialContentAsync<CredentialWithApiKey>(
            this.Settings.ApiCredential.Value);
        if (apiCredential.IsError || apiCredential.Value is null)
        {
            return apiCredential.Error ?? new Error("Failed to get API credential");
        }

        // Report a malformed or missing base URL as an error instead of letting
        // `new Uri(...)` throw out of Initialize.
        if (!Uri.TryCreate(this.Settings.ApiBaseUrl, UriKind.Absolute, out var baseAddress))
        {
            return new Error("API Base URL must be a valid URL");
        }

        // Build the client in a local so a failed initialization can dispose it and never
        // leaves a half-configured client in the field.
        var client = new HttpClient
        {
            BaseAddress = baseAddress,
            Timeout = TimeSpan.FromSeconds(this.Settings.TimeoutSeconds)
        };
        var connected = false;
        try
        {
            client.DefaultRequestHeaders.Add("Authorization",
                $"Bearer {apiCredential.Value.ApiKey}");
            client.DefaultRequestHeaders.Add("User-Agent",
                "CrosserModule/1.0");

            // Test connection
            using var response = await client.GetAsync("/health");
            if (!response.IsSuccessStatusCode)
            {
                return new Error($"API health check failed: {response.StatusCode}");
            }

            Information("Successfully connected to API at {BaseUrl}",
                this.Settings.ApiBaseUrl);
            connected = true;
        }
        catch (Exception ex) when (ex is HttpRequestException || ex is TaskCanceledException)
        {
            // HttpClient signals an elapsed Timeout with TaskCanceledException rather than
            // HttpRequestException, so both are mapped to a connection error.
            return new Error($"Failed to connect to API: {ex.Message}");
        }
        finally
        {
            if (!connected)
            {
                client.Dispose();
            }
        }

        // Release a client left over from an earlier Initialize before replacing it.
        this.httpClient?.Dispose();
        this.httpClient = client;

        return await base.Initialize();
    }

    /// <summary>
    /// Sends the request described by the message keys <c>endpoint</c> (default <c>/data</c>),
    /// <c>method</c> (default <c>GET</c>) and <c>body</c>, then stores <c>response</c>,
    /// <c>statusCode</c> and <c>success</c> on the message, or an error on failure.
    /// The message is forwarded in every case.
    /// </summary>
    /// <param name="message">The incoming flow message; it is updated in place.</param>
    protected override async Task MessageReceived(IFlowMessage message)
    {
        try
        {
            var endpoint = message.Get<string>("endpoint") ?? "/data";
            var method = message.Get<string>("method") ?? "GET";
            var body = message.Get<object>("body");

            // `using` tolerates null, which SendRequestAsync returns for an unsupported verb.
            using var response = await this.SendRequestAsync(method, endpoint, body);
            if (response is null)
            {
                var error = $"Unsupported HTTP method: {method}";
                this.SetStatus(Status.Warning, error);
                message.SetError(error);
                return;
            }

            if (response.IsSuccessStatusCode)
            {
                var responseData = await response.Content.ReadAsStringAsync();
                message.Set("response", responseData);
                message.Set("statusCode", (int)response.StatusCode);
                message.Set("success", true);
            }
            else
            {
                var error = $"API request failed: {response.StatusCode}";
                this.SetStatus(Status.Warning, error);
                message.SetError(error);
                message.Set("statusCode", (int)response.StatusCode);
            }
        }
        catch (HttpRequestException ex)
        {
            Error(ex, "HTTP request failed");
            var error = $"HTTP request failed: {ex.Message}";
            this.SetStatus(Status.Warning, error);
            message.SetError(error);
        }
        catch (Exception ex)
        {
            Error(ex, "Unexpected error");
            this.SetStatus(Status.Warning, ex.Message);
            message.SetError(ex.Message);
        }
        finally
        {
            // Always forward the message
            await this.Next(message);
        }
    }

    /// <summary>Disposes the HTTP client and then stops the base module.</summary>
    public override async Task Stop()
    {
        this.httpClient?.Dispose();
        await base.Stop();
    }

    /// <summary>
    /// Dispatches the request for the given verb (compared case-insensitively).
    /// </summary>
    /// <returns>The HTTP response, or <c>null</c> when the verb is not GET, POST, PUT or DELETE.</returns>
    private async Task<HttpResponseMessage?> SendRequestAsync(string method, string endpoint, object? body)
    {
        // Invariant casing keeps verb matching independent of the host culture.
        switch (method.ToUpperInvariant())
        {
            case "GET":
                return await this.httpClient.GetAsync(endpoint);
            case "POST":
            {
                using var content = CreateJsonContent(body);
                return await this.httpClient.PostAsync(endpoint, content);
            }
            case "PUT":
            {
                using var content = CreateJsonContent(body);
                return await this.httpClient.PutAsync(endpoint, content);
            }
            case "DELETE":
                return await this.httpClient.DeleteAsync(endpoint);
            default:
                return null;
        }
    }

    /// <summary>Serializes <paramref name="body"/> to a UTF-8 <c>application/json</c> payload.</summary>
    private static StringContent CreateJsonContent(object? body) =>
        new StringContent(JsonSerializer.Serialize(body), Encoding.UTF8, "application/json");
}
/// <summary>
/// Settings for the REST API module: base URL, API-key credential reference and request timeout.
/// </summary>
public class RestApiModuleSettings : FlowModuleSettings
{
    /// <summary>Base URL of the REST API; must be an absolute http or https URL.</summary>
    [Required]
    [Display(Name = "API Base URL")]
    [Description("Base URL of the REST API")]
    public string ApiBaseUrl { get; set; }

    /// <summary>Identifier of the stored API-key credential, or <c>null</c> when none is selected.</summary>
    [JsonSchemaExtensionData("x-credential", "ApiKey")]
    [Display(Name = "API Key")]
    [Description("API key for authentication")]
    public Guid? ApiCredential { get; set; }

    /// <summary>Request timeout in seconds (1-300, default 30).</summary>
    [Range(1, 300)]
    [Display(Name = "Timeout (seconds)")]
    [DefaultValue(30)]
    public int TimeoutSeconds { get; set; } = 30;

    /// <summary>
    /// Adds a validation error when <see cref="ApiBaseUrl"/> is set but is not an absolute
    /// http/https URL, then runs the base validation.
    /// </summary>
    /// <param name="validator">Collector that receives validation errors.</param>
    public override void Validate(SettingsValidator validator)
    {
        // An empty value is left to the [Required] attribute.
        if (!string.IsNullOrWhiteSpace(this.ApiBaseUrl))
        {
            if (!Uri.TryCreate(this.ApiBaseUrl, UriKind.Absolute, out var uri))
            {
                validator.AddError(nameof(this.ApiBaseUrl),
                    "API Base URL must be a valid URL");
            }
            else if (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps)
            {
                // "Absolute" alone also admits file:, ftp: and, on Unix, a bare "/path";
                // none of those can be used as an HttpClient base address.
                validator.AddError(nameof(this.ApiBaseUrl),
                    "API Base URL must use the http or https scheme");
            }
        }

        base.Validate(validator);
    }
}
Database Integration
SQL Server Example
/// <summary>
/// Output module that inserts the <c>data</c> dictionary of each incoming message as one row
/// into a configured SQL Server table.
/// </summary>
public class DatabaseModule : FlowModule<DatabaseModuleSettings>
{
    private string connectionString;

    public override string UserFriendlyName => "Database Writer";

    public DatabaseModule() : base(FlowModuleType.Output) { }

    /// <summary>
    /// Resolves the connection-string credential and verifies it by opening a connection once.
    /// </summary>
    /// <returns>An error describing why initialization failed; otherwise the result of the base implementation.</returns>
    public override async Task<IError?> Initialize()
    {
        // Get database credentials
        if (!this.Settings.DbCredential.HasValue)
        {
            return new Error("Database credential not configured");
        }

        var credentials = await this.GetCredentialContentAsync<CredentialWithConnectionString>(
            this.Settings.DbCredential.Value);
        if (credentials.IsError || credentials.Value is null)
        {
            return credentials.Error ?? new Error("Failed to get database credential");
        }

        this.connectionString = credentials.Value.ConnectionString;

        // Test connection
        try
        {
            using var connection = new SqlConnection(this.connectionString);
            await connection.OpenAsync();
            Information("Successfully connected to database");
        }
        catch (SqlException ex)
        {
            return new Error($"Database connection failed: {ex.Message}");
        }
        catch (Exception ex) when (ex is ArgumentException || ex is InvalidOperationException)
        {
            // A malformed connection string is rejected before any network I/O. The exception
            // text is deliberately not echoed because it can quote parts of the string.
            return new Error("Database connection failed: the connection string is invalid");
        }

        return await base.Initialize();
    }

    /// <summary>
    /// Inserts the message's <c>data</c> dictionary into the configured table and records
    /// <c>rowsAffected</c>, <c>success</c> and <c>insertedAt</c> on the message. Failures are
    /// recorded as a message error. The message is forwarded in every case.
    /// </summary>
    /// <param name="message">The incoming flow message; it is updated in place.</param>
    protected override async Task MessageReceived(IFlowMessage message)
    {
        try
        {
            // Create new connection per message (uses connection pooling)
            using var connection = new SqlConnection(this.connectionString);
            await connection.OpenAsync();

            // NOTE(review): the table name is inserted into the SQL text as configured
            // (schema-qualified or already-bracketed names keep working). It must only ever
            // come from trusted settings, never from message content.
            var tableName = this.Settings.TableName;
            var data = message.Get<Dictionary<string, object>>("data");
            if (data == null || data.Count == 0)
            {
                var error = "No data to insert";
                this.SetStatus(Status.Warning, error);
                message.SetError(error);
                return;
            }

            using var command = connection.CreateCommand();
            command.CommandTimeout = this.Settings.QueryTimeout;

            // Column names come from message content, so they are bracket-quoted, and
            // parameter names are positional (@p0, @p1, ...) instead of being derived from
            // the keys. No key text can then alter the statement.
            var columns = new List<string>(data.Count);
            var parameters = new List<string>(data.Count);
            foreach (var kvp in data)
            {
                var parameterName = $"@p{parameters.Count}";
                columns.Add(QuoteIdentifier(kvp.Key));
                parameters.Add(parameterName);

                var param = command.CreateParameter();
                param.ParameterName = parameterName;
                param.Value = kvp.Value ?? DBNull.Value;
                command.Parameters.Add(param);
            }

            command.CommandText =
                $"INSERT INTO {tableName} ({string.Join(", ", columns)}) VALUES ({string.Join(", ", parameters)})";

            // Execute query
            var rowsAffected = await command.ExecuteNonQueryAsync();
            message.Set("rowsAffected", rowsAffected);
            message.Set("success", true);
            message.Set("insertedAt", DateTime.UtcNow);
        }
        catch (SqlException ex)
        {
            Error(ex, "Database operation failed");
            var error = $"Database operation failed: {ex.Message}";
            this.SetStatus(Status.Warning, error);
            message.SetError(error);
        }
        catch (Exception ex)
        {
            Error(ex, "Unexpected error");
            this.SetStatus(Status.Warning, ex.Message);
            message.SetError(ex.Message);
        }
        finally
        {
            // Always forward the message
            await this.Next(message);
        }
    }

    /// <summary>
    /// Wraps a column name in square brackets, doubling any closing bracket it contains,
    /// so the name is always parsed as a single delimited identifier.
    /// </summary>
    private static string QuoteIdentifier(string identifier) =>
        "[" + identifier.Replace("]", "]]") + "]";
}
/// <summary>
/// Settings for the database writer: credential reference, target table and query timeout.
/// </summary>
public class DatabaseModuleSettings : FlowModuleSettings
{
    /// <summary>Identifier of the stored connection-string credential, or <c>null</c> when none is selected.</summary>
    [JsonSchemaExtensionData("x-credential", "ConnectionString"),
     Display(Name = "Database Credentials"),
     Description("Database connection string")]
    public Guid? DbCredential { get; set; }

    /// <summary>Name of the table that rows are inserted into.</summary>
    [Required,
     Display(Name = "Table Name"),
     Description("Name of the database table")]
    public string TableName { get; set; }

    /// <summary>Command timeout in seconds (1-300, default 30).</summary>
    [Range(1, 300),
     Display(Name = "Query Timeout (seconds)"),
     DefaultValue(30)]
    public int QueryTimeout { get; set; } = 30;
}
Multiple Database Providers
/// <summary>
/// Module that opens a single connection to SQL Server, PostgreSQL or MySQL, chosen by the
/// <c>DatabaseProvider</c> setting, and keeps it open until the module is stopped.
/// </summary>
public class MultiDbModule : FlowModule<MultiDbModuleSettings>
{
    private IDbConnection connection;

    /// <summary>
    /// Resolves the connection-string credential, creates the provider-specific connection
    /// and opens it.
    /// </summary>
    /// <returns>An error describing why initialization failed; otherwise the result of the base implementation.</returns>
    public override async Task<IError?> Initialize()
    {
        // Without this guard, `.Value` on an unset credential throws InvalidOperationException.
        if (!this.Settings.DbCredential.HasValue)
        {
            return new Error("Database credential not configured");
        }

        var credentials = await this.GetCredentialContentAsync<CredentialWithConnectionString>(
            this.Settings.DbCredential.Value);
        if (credentials.IsError || credentials.Value is null)
        {
            return credentials.Error ?? new Error("Failed to get database credential");
        }

        var connString = credentials.Value.ConnectionString;

        // Invariant casing keeps provider matching independent of the host culture; a null
        // provider falls through to the "unsupported" arm instead of throwing.
        var provider = this.Settings.DatabaseProvider?.ToLowerInvariant();

        // Create connection based on provider
        DbConnection? candidate;
        try
        {
            candidate = provider switch
            {
                "sqlserver" => new SqlConnection(connString),
                "postgresql" => new NpgsqlConnection(connString),
                "mysql" => new MySqlConnection(connString),
                _ => null
            };
        }
        catch (ArgumentException)
        {
            // Connection constructors reject malformed connection strings. The exception
            // text is not echoed because it can quote parts of the string.
            return new Error("Database connection failed: the connection string is invalid");
        }

        if (candidate is null)
        {
            return new Error($"Unsupported database provider: {this.Settings.DatabaseProvider}");
        }

        // Test connection
        var opened = false;
        try
        {
            await candidate.OpenAsync();
            Information("Successfully connected to {Provider} database",
                this.Settings.DatabaseProvider);
            opened = true;
        }
        catch (DbException ex)
        {
            return new Error($"Database connection failed: {ex.Message}");
        }
        finally
        {
            // Do not keep (or leak) a connection that never opened.
            if (!opened)
            {
                candidate.Dispose();
            }
        }

        // Release a connection left over from an earlier Initialize before replacing it.
        this.connection?.Dispose();
        this.connection = candidate;

        return await base.Initialize();
    }

    /// <summary>Closes and disposes the connection, then stops the base module.</summary>
    public override async Task Stop()
    {
        this.connection?.Close();
        this.connection?.Dispose();
        await base.Stop();
    }
}
/// <summary>
/// Settings for the multi-provider database module: provider name and credential reference.
/// </summary>
public class MultiDbModuleSettings : FlowModuleSettings
{
    /// <summary>Name of the database provider to connect with (default "SqlServer").</summary>
    [Required,
     Display(Name = "Database Provider"),
     Description("Type of database"),
     DefaultValue("SqlServer")]
    public string DatabaseProvider { get; set; } = "SqlServer";

    /// <summary>Identifier of the stored connection-string credential, or <c>null</c> when none is selected.</summary>
    [JsonSchemaExtensionData("x-credential", "ConnectionString"),
     Display(Name = "Database Credentials")]
    public Guid? DbCredential { get; set; }
}
Message Queue Integration
RabbitMQ Example
/// <summary>
/// Output module that publishes each incoming flow message, serialized as JSON, to RabbitMQ.
/// </summary>
public class RabbitMqModule : FlowModule<RabbitMqModuleSettings>
{
    private IConnection connection;
    private IModel channel;

    public override string UserFriendlyName => "RabbitMQ Publisher";

    public RabbitMqModule() : base(FlowModuleType.Output) { }

    /// <summary>
    /// Resolves the username/password credential, connects to the broker, opens a channel
    /// and declares the configured queue.
    /// </summary>
    /// <returns>An error describing why initialization failed; otherwise the result of the base implementation.</returns>
    public override async Task<IError?> Initialize()
    {
        // Get credentials
        if (!this.Settings.QueueCredential.HasValue)
        {
            return new Error("Queue credential not configured");
        }

        var credentials = await this.GetCredentialContentAsync<CredentialWithUsernamePassword>(
            this.Settings.QueueCredential.Value);
        if (credentials.IsError || credentials.Value is null)
        {
            return credentials.Error ?? new Error("Failed to get queue credential");
        }

        // Setup connection
        var factory = new ConnectionFactory
        {
            HostName = this.Settings.Host,
            Port = this.Settings.Port,
            UserName = credentials.Value.Username,
            Password = credentials.Value.Password,
            VirtualHost = this.Settings.VirtualHost,
            RequestedHeartbeat = TimeSpan.FromSeconds(60),
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
            AutomaticRecoveryEnabled = true
        };

        // Work on locals so that a failure after the connection is established (for example
        // in CreateModel or QueueDeclare) can release everything created so far.
        IConnection? newConnection = null;
        IModel? newChannel = null;
        try
        {
            newConnection = factory.CreateConnection($"{this.UserFriendlyName}-{this.Id}");
            newChannel = newConnection.CreateModel();

            // Declare queue
            newChannel.QueueDeclare(
                queue: this.Settings.QueueName,
                durable: this.Settings.DurableQueue,
                exclusive: false,
                autoDelete: false,
                arguments: null
            );

            Information("Connected to RabbitMQ queue {Queue} at {Host}:{Port}",
                this.Settings.QueueName, this.Settings.Host, this.Settings.Port);
        }
        catch (Exception ex)
        {
            this.DisposeQuietly(newChannel);
            this.DisposeQuietly(newConnection);
            return new Error($"Failed to connect to RabbitMQ: {ex.Message}");
        }

        this.connection = newConnection;
        this.channel = newChannel;

        return await base.Initialize();
    }

    /// <summary>
    /// Publishes the message as a JSON payload and records <c>published</c>, <c>messageId</c>
    /// and <c>queue</c> on it. Failures are recorded as a message error. The message is
    /// forwarded in every case.
    /// </summary>
    /// <param name="message">The incoming flow message; it is updated in place.</param>
    protected override async Task MessageReceived(IFlowMessage message)
    {
        try
        {
            var messageBody = message.ToJSON();
            var body = Encoding.UTF8.GetBytes(messageBody);

            // NOTE(review): a single channel is shared by every call. Confirm that the flow
            // engine never invokes MessageReceived concurrently for one module instance.
            var properties = this.channel.CreateBasicProperties();
            properties.Persistent = this.Settings.PersistentMessages;
            properties.ContentType = "application/json";
            properties.MessageId = Guid.NewGuid().ToString();
            properties.Timestamp = new AmqpTimestamp(
                DateTimeOffset.UtcNow.ToUnixTimeSeconds());

            // Optional routing key from message
            var routingKey = message.Get<string>("routingKey")
                ?? this.Settings.RoutingKey
                ?? this.Settings.QueueName;

            this.channel.BasicPublish(
                exchange: this.Settings.Exchange ?? "",
                routingKey: routingKey,
                basicProperties: properties,
                body: body
            );

            message.Set("published", true);
            message.Set("messageId", properties.MessageId);
            message.Set("queue", this.Settings.QueueName);
        }
        catch (Exception ex)
        {
            Error(ex, "Failed to publish message to RabbitMQ");
            var error = $"Publish failed: {ex.Message}";
            this.SetStatus(Status.Warning, error);
            message.SetError(error);
        }
        finally
        {
            // Always forward the message
            await this.Next(message);
        }
    }

    /// <summary>
    /// Closes the channel and the connection, then stops the base module. Each resource is
    /// closed in its own try block so a failing channel close cannot skip the connection close.
    /// </summary>
    public override async Task Stop()
    {
        try
        {
            this.channel?.Close();
            this.channel?.Dispose();
        }
        catch (Exception ex)
        {
            Warning(ex, "Error closing RabbitMQ channel");
        }

        try
        {
            this.connection?.Close();
            this.connection?.Dispose();
            Information("Disconnected from RabbitMQ");
        }
        catch (Exception ex)
        {
            Warning(ex, "Error closing RabbitMQ connection");
        }

        await base.Stop();
    }

    /// <summary>
    /// Disposes a partially initialized resource, logging instead of throwing so the original
    /// initialization error is the one that gets reported.
    /// </summary>
    private void DisposeQuietly(IDisposable? resource)
    {
        try
        {
            resource?.Dispose();
        }
        catch (Exception ex)
        {
            Warning(ex, "Error closing RabbitMQ connection");
        }
    }
}
/// <summary>
/// Settings for the RabbitMQ publisher: broker address, queue and exchange routing,
/// durability options and the credential reference.
/// </summary>
public class RabbitMqModuleSettings : FlowModuleSettings
{
    /// <summary>RabbitMQ server hostname.</summary>
    [Required,
     Display(Name = "Host"),
     Description("RabbitMQ server hostname")]
    public string Host { get; set; }

    /// <summary>Broker port (1-65535, default 5672).</summary>
    [Range(1, 65535),
     Display(Name = "Port"),
     DefaultValue(5672)]
    public int Port { get; set; } = 5672;

    /// <summary>Virtual host to connect to (default "/").</summary>
    [Display(Name = "Virtual Host"),
     DefaultValue("/")]
    public string VirtualHost { get; set; } = "/";

    /// <summary>Name of the queue to declare and publish to.</summary>
    [Required,
     Display(Name = "Queue Name"),
     Description("Name of the queue to publish to")]
    public string QueueName { get; set; }

    /// <summary>Optional exchange name; when unset the default exchange is used.</summary>
    [Display(Name = "Exchange"),
     Description("Optional exchange name (leave empty for default)")]
    public string Exchange { get; set; }

    /// <summary>Optional routing key; when unset the queue name is used.</summary>
    [Display(Name = "Routing Key"),
     Description("Optional routing key (defaults to queue name)")]
    public string RoutingKey { get; set; }

    /// <summary>Whether the queue is declared as durable (default <c>true</c>).</summary>
    [Display(Name = "Durable Queue"),
     Description("Queue survives broker restart"),
     DefaultValue(true)]
    public bool DurableQueue { get; set; } = true;

    /// <summary>Whether published messages are marked persistent (default <c>true</c>).</summary>
    [Display(Name = "Persistent Messages"),
     Description("Messages survive broker restart"),
     DefaultValue(true)]
    public bool PersistentMessages { get; set; } = true;

    /// <summary>Identifier of the stored username/password credential, or <c>null</c> when none is selected.</summary>
    [JsonSchemaExtensionData("x-credential", "UsernamePassword"),
     Display(Name = "Queue Credentials")]
    public Guid? QueueCredential { get; set; }
}
Resource File Handling
Text File Resources
/// <summary>
/// Loads the configured text resource and parses it as JSON or XML.
/// </summary>
/// <returns>
/// An error when the resource is missing, empty, of an unsupported format or cannot be
/// parsed; otherwise the result of the base implementation.
/// </returns>
public override async Task<IError?> Initialize()
{
    if (!this.Settings.ConfigFile.HasValue)
    {
        return new Error("Configuration file not specified");
    }

    // Text files (JSON, XML, CSV, ...) are requested as a string resource.
    var loaded = await this.GetResourceContentAsync<string>(this.Settings.ConfigFile.Value);
    if (!loaded.IsSuccess)
    {
        return new Error("Failed to load configuration file");
    }

    var text = loaded.Result;
    if (string.IsNullOrEmpty(text))
    {
        return new Error("Configuration file is empty");
    }

    try
    {
        // Decide the format first; JSON is checked before XML.
        var looksLikeJson = this.IsJson(text);
        if (!looksLikeJson && !this.IsXml(text))
        {
            return new Error("Unsupported file format");
        }

        if (looksLikeJson)
        {
            this.configuration = JsonSerializer.Deserialize<ModuleConfiguration>(text);
        }
        else
        {
            var xml = new XmlDocument();
            xml.LoadXml(text);
            this.ProcessXmlDocument(xml);
        }
    }
    catch (Exception ex)
    {
        return new Error($"Failed to parse configuration file: {ex.Message}");
    }

    return await base.Initialize();
}
/// <summary>
/// Cheap shape check: reports whether the text, ignoring surrounding whitespace, is wrapped
/// in <c>{ }</c> or <c>[ ]</c>. It does not validate that the text is well-formed JSON.
/// </summary>
/// <param name="content">Text to inspect; null or blank yields <c>false</c>.</param>
/// <returns><c>true</c> when the text looks like a JSON object or array.</returns>
private bool IsJson(string content)
{
    if (string.IsNullOrWhiteSpace(content))
    {
        return false;
    }

    var trimmed = content.Trim();

    // Ordinal comparison: these are syntax characters, so the result must not depend on the
    // current culture or on the platform's globalization library.
    return (trimmed.StartsWith("{", StringComparison.Ordinal) && trimmed.EndsWith("}", StringComparison.Ordinal)) ||
           (trimmed.StartsWith("[", StringComparison.Ordinal) && trimmed.EndsWith("]", StringComparison.Ordinal));
}
/// <summary>
/// Cheap shape check: reports whether the text, ignoring leading whitespace, starts with
/// <c>&lt;</c>. It does not validate that the text is well-formed XML.
/// </summary>
/// <param name="content">Text to inspect; null or blank yields <c>false</c>.</param>
/// <returns><c>true</c> when the text looks like XML.</returns>
private bool IsXml(string content)
{
    if (string.IsNullOrWhiteSpace(content))
    {
        return false;
    }

    // A leading "<?xml" declaration also starts with "<", so one ordinal
    // (culture-independent) check covers both cases.
    return content.Trim().StartsWith("<", StringComparison.Ordinal);
}
Binary File Resources
/// <summary>
/// Loads the configured binary resource, writes it to a temporary file, processes that file
/// and removes it again.
/// </summary>
/// <returns>
/// An error when the resource is missing, empty or cannot be processed; otherwise the result
/// of the base implementation.
/// </returns>
public override async Task<IError?> Initialize()
{
    if (!this.Settings.DataFile.HasValue)
    {
        return new Error("Data file not specified");
    }

    // Get resource as byte array (for binary files)
    var resource = await this.GetResourceContentAsync<byte[]>(
        this.Settings.DataFile.Value);
    if (!resource.IsSuccess)
    {
        return new Error("Failed to load data file");
    }

    var resourceBytes = resource.Result;
    if (resourceBytes == null || resourceBytes.Length == 0)
    {
        return new Error("Data file is empty");
    }

    // Process binary data
    IError? failure = null;
    string? tempPath = null;
    try
    {
        // Example: Save to temp file for processing
        tempPath = Path.Combine(Path.GetTempPath(),
            $"module_{Guid.NewGuid()}.dat");
        await File.WriteAllBytesAsync(tempPath, resourceBytes);

        // Process the file...
        this.ProcessBinaryFile(tempPath);
    }
    catch (Exception ex)
    {
        failure = new Error($"Failed to process data file: {ex.Message}");
    }

    // Clean up even when writing or processing failed, so no temp file is left behind.
    if (tempPath is not null)
    {
        try
        {
            File.Delete(tempPath);
        }
        catch (Exception ex)
        {
            // Keep the first failure if there is one; a cleanup failure alone is still
            // reported as a processing error.
            failure ??= new Error($"Failed to process data file: {ex.Message}");
        }
    }

    if (failure is not null)
    {
        return failure;
    }

    return await base.Initialize();
}
Integration Best Practices
1. Connection Health Checks
/// <summary>
/// Starts a background timer that runs the connection health check once a minute, then
/// starts the base module.
/// </summary>
/// <returns>The result of the base implementation.</returns>
public override async Task<IError?> Start()
{
    // Dispose a timer left over from an earlier Start so restarts do not stack timers.
    this.healthCheckTimer?.Dispose();

    // Start background health check.
    // The callback is an async-void lambda, so an exception escaping it could not be
    // observed; CheckConnectionHealth catches everything itself, which keeps this safe.
    this.healthCheckTimer = new Timer(
        async _ => await this.CheckConnectionHealth(),
        null,
        TimeSpan.FromMinutes(1),
        TimeSpan.FromMinutes(1)
    );

    var result = await base.Start();
    if (result is not null)
    {
        // NOTE(review): assumes a non-null IError means the start failed, as the
        // Initialize samples suggest. Health checks should not keep running in that case.
        this.healthCheckTimer.Dispose();
    }

    return result;
}
/// <summary>
/// Calls the API's <c>/health</c> endpoint and sets the module status to a warning when the
/// call fails or returns a non-success status code. Never throws.
/// </summary>
private async Task CheckConnectionHealth()
{
    try
    {
        // Dispose each response: this runs repeatedly for the lifetime of the module.
        using var response = await this.httpClient.GetAsync("/health");
        if (!response.IsSuccessStatusCode)
        {
            Warning("Health check failed: {StatusCode}", response.StatusCode);
            this.SetStatus(Status.Warning, "External service health check failed");
        }
    }
    catch (Exception ex)
    {
        // Catch everything: this is invoked from a timer callback, where an unhandled
        // exception could not be observed by any caller.
        Warning(ex, "Health check failed");
        this.SetStatus(Status.Warning, "Cannot reach external service");
    }
}
2. Retry Logic
See Error Handling for complete retry strategies.
3. Timeout Configuration
Always configure timeouts for external calls:
// HTTP: set the client timeout from the TimeoutSeconds setting (interpreted as seconds).
this.httpClient.Timeout = TimeSpan.FromSeconds(this.Settings.TimeoutSeconds);
// Database: apply the QueryTimeout setting to the command.
command.CommandTimeout = this.Settings.QueryTimeout;
4. Secure Credential Handling
See Security Best Practices for complete guidance.
Summary
Key principles for external integrations:
- Always use credentials - Never hardcode sensitive information
- Test connections - Validate connectivity during Initialize()
- Handle errors gracefully - Always forward messages, even on failure
- Configure timeouts - Prevent hanging operations
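- Reuse connections - Initialize clients once and reuse them for every message; for databases, open a connection per message and rely on the driver's connection pooling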
- Monitor health - Implement health checks for long-running connections
- Log appropriately - Log operations but never log credentials
For more details on specific topics:
- Security Best Practices - Secure credential handling
- Error Handling - Retry strategies and error patterns
- Performance Optimization - Connection pooling and efficiency