Make backtests more parallel and run the bundle health check only on instance 1

This commit is contained in:
2025-11-13 12:22:23 +07:00
parent 27e2cf0a09
commit 155fb2b569
3 changed files with 56 additions and 36 deletions

View File

@@ -12,8 +12,8 @@ using OpenTelemetry.Resources;
using OpenTelemetry.Trace; using OpenTelemetry.Trace;
// Explicitly set the environment before creating the host builder // Explicitly set the environment before creating the host builder
var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT") var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")
?? Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT") ?? Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT")
?? "Development"; ?? "Development";
var hostBuilder = new HostBuilder() var hostBuilder = new HostBuilder()
@@ -24,28 +24,28 @@ var host = hostBuilder
.ConfigureAppConfiguration((hostingContext, config) => .ConfigureAppConfiguration((hostingContext, config) =>
{ {
var detectedEnv = hostingContext.HostingEnvironment.EnvironmentName; var detectedEnv = hostingContext.HostingEnvironment.EnvironmentName;
if (detectedEnv != environment) if (detectedEnv != environment)
{ {
Console.WriteLine($"⚠️ WARNING: Environment mismatch! Expected: {environment}, Got: {detectedEnv}"); Console.WriteLine($"⚠️ WARNING: Environment mismatch! Expected: {environment}, Got: {detectedEnv}");
} }
config.SetBasePath(AppContext.BaseDirectory); config.SetBasePath(AppContext.BaseDirectory);
// Load configuration files in order (later files override earlier ones) // Load configuration files in order (later files override earlier ones)
// 1. Base appsettings.json (always loaded) // 1. Base appsettings.json (always loaded)
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
// 2. Load ONLY the environment-specific file (not other environments) // 2. Load ONLY the environment-specific file (not other environments)
if (!string.IsNullOrEmpty(detectedEnv)) if (!string.IsNullOrEmpty(detectedEnv))
{ {
var envFile = $"appsettings.{detectedEnv}.json"; var envFile = $"appsettings.{detectedEnv}.json";
config.AddJsonFile(envFile, optional: true, reloadOnChange: true); config.AddJsonFile(envFile, optional: true, reloadOnChange: true);
} }
// 3. Environment variables (highest priority) // 3. Environment variables (highest priority)
config.AddEnvironmentVariables(); config.AddEnvironmentVariables();
// User secrets only in development (requires ASP.NET Core, so we skip in production) // User secrets only in development (requires ASP.NET Core, so we skip in production)
if (detectedEnv == "Development") if (detectedEnv == "Development")
{ {
@@ -82,7 +82,7 @@ var host = hostBuilder
var serviceName = "Managing.Workers"; var serviceName = "Managing.Workers";
var serviceVersion = typeof(Program).Assembly.GetName().Version?.ToString() ?? "1.0.0"; var serviceVersion = typeof(Program).Assembly.GetName().Version?.ToString() ?? "1.0.0";
var otlpWorkerId = configuration["BacktestComputeWorker:WorkerId"] ?? Environment.MachineName; var otlpWorkerId = configuration["BacktestComputeWorker:WorkerId"] ?? Environment.MachineName;
services.AddOpenTelemetry() services.AddOpenTelemetry()
.ConfigureResource(resource => resource .ConfigureResource(resource => resource
.AddService(serviceName, serviceVersion: serviceVersion) .AddService(serviceName, serviceVersion: serviceVersion)
@@ -108,7 +108,7 @@ var host = hostBuilder
.AddSource("Managing.Application.Workers") // Worker activities .AddSource("Managing.Application.Workers") // Worker activities
.AddOtlpExporter(); // OTLP exporter for Rider integration .AddOtlpExporter(); // OTLP exporter for Rider integration
}); });
// Note: OTLP exporter will use OTEL_EXPORTER_OTLP_ENDPOINT from Rider or environment // Note: OTLP exporter will use OTEL_EXPORTER_OTLP_ENDPOINT from Rider or environment
// Rider automatically sets this when running from IDE, so data will be sent to Rider's OpenTelemetry service // Rider automatically sets this when running from IDE, so data will be sent to Rider's OpenTelemetry service
} }
@@ -125,11 +125,11 @@ var host = hostBuilder
// Configure connection timeout (default is 15 seconds, increase for network latency) // Configure connection timeout (default is 15 seconds, increase for network latency)
Timeout = 30, // 30 seconds for connection establishment Timeout = 30, // 30 seconds for connection establishment
CommandTimeout = 60, // 60 seconds for command execution CommandTimeout = 60, // 60 seconds for command execution
// Configure connection pooling for better performance and reliability // Configure connection pooling for better performance and reliability
MaxPoolSize = 100, // Maximum pool size MaxPoolSize = 100, // Maximum pool size
MinPoolSize = 5, // Minimum pool size MinPoolSize = 5, // Minimum pool size
// Configure KeepAlive to maintain connections and detect network issues // Configure KeepAlive to maintain connections and detect network issues
KeepAlive = 300 // 5 minutes keepalive interval KeepAlive = 300 // 5 minutes keepalive interval
}; };
@@ -142,8 +142,8 @@ var host = hostBuilder
{ {
// Enable retry on failure for transient errors // Enable retry on failure for transient errors
npgsqlOptions.EnableRetryOnFailure( npgsqlOptions.EnableRetryOnFailure(
maxRetryCount: 5, maxRetryCount: 5,
maxRetryDelay: TimeSpan.FromSeconds(10), maxRetryDelay: TimeSpan.FromSeconds(10),
errorCodesToAdd: null); errorCodesToAdd: null);
}); });
@@ -168,31 +168,25 @@ var host = hostBuilder
// Get task slot from CapRover ({{.Task.Slot}}) or environment variable // Get task slot from CapRover ({{.Task.Slot}}) or environment variable
// This identifies which instance of the worker is running // This identifies which instance of the worker is running
var taskSlot = Environment.GetEnvironmentVariable("TASK_SLOT") ?? var taskSlot = Environment.GetEnvironmentVariable("TASK_SLOT") ??
Environment.GetEnvironmentVariable("CAPROVER_TASK_SLOT") ?? Environment.GetEnvironmentVariable("CAPROVER_TASK_SLOT") ??
"0"; "0";
// Override WorkerId from environment variable if provided, otherwise use task slot // Override WorkerId from environment variable if provided, otherwise use task slot
var workerId = Environment.GetEnvironmentVariable("WORKER_ID") ?? var workerId = Environment.GetEnvironmentVariable("WORKER_ID") ??
configuration["BacktestComputeWorker:WorkerId"] ?? configuration["BacktestComputeWorker:WorkerId"] ??
$"{Environment.MachineName}-{taskSlot}"; $"{Environment.MachineName}-{taskSlot}";
services.Configure<BacktestComputeWorkerOptions>(options => services.Configure<BacktestComputeWorkerOptions>(options => { options.WorkerId = workerId; });
{
options.WorkerId = workerId;
});
// Configure GeneticComputeWorker options // Configure GeneticComputeWorker options
services.Configure<GeneticComputeWorkerOptions>( services.Configure<GeneticComputeWorkerOptions>(
configuration.GetSection(GeneticComputeWorkerOptions.SectionName)); configuration.GetSection(GeneticComputeWorkerOptions.SectionName));
// Override Genetic WorkerId from environment variable if provided, otherwise use task slot // Override Genetic WorkerId from environment variable if provided, otherwise use task slot
var geneticWorkerId = Environment.GetEnvironmentVariable("GENETIC_WORKER_ID") ?? var geneticWorkerId = Environment.GetEnvironmentVariable("GENETIC_WORKER_ID") ??
configuration["GeneticComputeWorker:WorkerId"] ?? configuration["GeneticComputeWorker:WorkerId"] ??
$"{Environment.MachineName}-genetic-{taskSlot}"; $"{Environment.MachineName}-genetic-{taskSlot}";
services.Configure<GeneticComputeWorkerOptions>(options => services.Configure<GeneticComputeWorkerOptions>(options => { options.WorkerId = geneticWorkerId; });
{
options.WorkerId = geneticWorkerId;
});
// Register the backtest compute worker if enabled // Register the backtest compute worker if enabled
var isBacktestWorkerEnabled = configuration.GetValue<bool>("WorkerBacktestCompute", false); var isBacktestWorkerEnabled = configuration.GetValue<bool>("WorkerBacktestCompute", false);
@@ -210,7 +204,7 @@ var host = hostBuilder
// Register the bundle backtest health check worker if enabled // Register the bundle backtest health check worker if enabled
var isBundleHealthCheckEnabled = configuration.GetValue<bool>("WorkerBundleBacktestHealthCheck", false); var isBundleHealthCheckEnabled = configuration.GetValue<bool>("WorkerBundleBacktestHealthCheck", false);
if (isBundleHealthCheckEnabled) if (isBundleHealthCheckEnabled && taskSlot == "1")
{ {
services.AddHostedService<BundleBacktestHealthCheckWorker>(); services.AddHostedService<BundleBacktestHealthCheckWorker>();
} }
@@ -220,7 +214,7 @@ var host = hostBuilder
logging.ClearProviders(); logging.ClearProviders();
logging.AddConsole(); logging.AddConsole();
logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging")); logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"));
// Add OpenTelemetry logging for Rider integration (Development only) // Add OpenTelemetry logging for Rider integration (Development only)
if (hostingContext.HostingEnvironment.IsDevelopment()) if (hostingContext.HostingEnvironment.IsDevelopment())
{ {
@@ -232,7 +226,7 @@ var host = hostBuilder
options.AddOtlpExporter(); // Uses OTEL_EXPORTER_OTLP_ENDPOINT from Rider or environment options.AddOtlpExporter(); // Uses OTEL_EXPORTER_OTLP_ENDPOINT from Rider or environment
}); });
} }
// Filter out EF Core database command logs (SQL queries) // Filter out EF Core database command logs (SQL queries)
logging.AddFilter("Microsoft.EntityFrameworkCore.Database.Command", LogLevel.Warning); logging.AddFilter("Microsoft.EntityFrameworkCore.Database.Command", LogLevel.Warning);
}) })
@@ -259,7 +253,7 @@ else
p => p.Split('=')[0].Trim(), p => p.Split('=')[0].Trim(),
p => p.Contains('=') ? p.Substring(p.IndexOf('=') + 1).Trim() : string.Empty, p => p.Contains('=') ? p.Substring(p.IndexOf('=') + 1).Trim() : string.Empty,
StringComparer.OrdinalIgnoreCase); StringComparer.OrdinalIgnoreCase);
var dbHost = connectionParts.GetValueOrDefault("Host", "unknown"); var dbHost = connectionParts.GetValueOrDefault("Host", "unknown");
logger.LogWarning("📊 Database Host: {Host}", dbHost); logger.LogWarning("📊 Database Host: {Host}", dbHost);
} }
@@ -267,7 +261,7 @@ else
{ {
// Failed to parse connection string, continue anyway // Failed to parse connection string, continue anyway
} }
try try
{ {
using var scope = host.Services.CreateScope(); using var scope = host.Services.CreateScope();
@@ -310,4 +304,4 @@ catch (Exception ex)
finally finally
{ {
SentrySdk.FlushAsync(TimeSpan.FromSeconds(2)).Wait(); SentrySdk.FlushAsync(TimeSpan.FromSeconds(2)).Wait();
} }

View File

@@ -0,0 +1,26 @@
{
"WorkerBacktestCompute": true,
"BacktestComputeWorker": {
"MaxConcurrentPerUser": 8,
"MaxConcurrentPerInstance": 30,
"JobPollIntervalSeconds": 5,
"HeartbeatIntervalSeconds": 30,
"StaleJobTimeoutMinutes": 10
},
"WorkerGeneticCompute": true,
"GeneticComputeWorker": {
"MaxConcurrentGenetics": 1,
"JobPollIntervalSeconds": 5,
"HeartbeatIntervalSeconds": 30,
"StaleJobTimeoutMinutes": 10
},
"PostgreSql": {
"ConnectionString": "Host=kaigen-db.kaigen.managing.live;Port=5432;Database=managing;Username=postgres;Password=2ab5423dcca4aa2d"
},
"InfluxDb": {
"Url": "https://influx-db.kaigen.managing.live",
"Organization": "managing-org",
"Token": "ROvQoZ1Dg5jiKDFxB0saEGqHC3rsLkUNlPL6_AFbOcpNjMieIv8v58yA4v5tFU9sX9LLvXEToPvUrxqQEMaWDw=="
}
}

View File

@@ -1,8 +1,8 @@
{ {
"WorkerBacktestCompute": true, "WorkerBacktestCompute": true,
"BacktestComputeWorker": { "BacktestComputeWorker": {
"MaxConcurrentPerUser": 8, "MaxConcurrentPerUser": 15,
"MaxConcurrentPerInstance": 30, "MaxConcurrentPerInstance": 60,
"JobPollIntervalSeconds": 5, "JobPollIntervalSeconds": 5,
"HeartbeatIntervalSeconds": 30, "HeartbeatIntervalSeconds": 30,
"StaleJobTimeoutMinutes": 10 "StaleJobTimeoutMinutes": 10