diff --git a/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs
index 2d60d374..7247576a 100644
--- a/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs
+++ b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs
@@ -24,7 +24,7 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
private readonly TimeSpan _inactiveThreshold = TimeSpan.FromMinutes(2); // Check bundles inactive for 2+ minutes
private readonly TimeSpan
- _stuckThreshold = TimeSpan.FromHours(2); // Consider bundle stuck if no progress for 2 hours
+ _stuckThreshold = TimeSpan.FromMinutes(10); // Consider bundle stuck if no progress for 10 minutes
private readonly IMessengerService _messengerService;
@@ -153,11 +153,29 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
return (StuckCount: 0, MissingJobsCount: 1, HealthyCount: 0);
}
- // Check 2: Stuck bundle - Running for too long without progress
+ // Calculate job status counts (used in multiple checks below)
+ var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
+ var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
+ var runningJobs = jobs.Count(j => j.Status == JobStatus.Running);
+
+ // Check 2: Stuck bundle - Running but no jobs are actually running
if (bundle.Status == BundleBacktestRequestStatus.Running)
{
var timeSinceUpdate = DateTime.UtcNow - bundle.UpdatedAt;
+ // Check if bundle is running but no jobs are being processed
+ // This indicates the old BundleBacktestWorker created the bundle but never created jobs
+ if (runningJobs == 0 && completedJobs == 0 && timeSinceUpdate > TimeSpan.FromMinutes(5))
+ {
+ _logger.LogWarning(
+ "⚠️ Bundle {BundleRequestId} is Running but has no jobs running or completed after {Minutes} minutes. " +
+ "This indicates jobs were never created or never picked up.",
+ bundle.RequestId, timeSinceUpdate.TotalMinutes);
+ await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository);
+ return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0);
+ }
+
+ // Check if bundle hasn't made progress in a long time
if (timeSinceUpdate > _stuckThreshold)
{
await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository);
@@ -186,8 +204,6 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
}
// Check 4: Bundle with all jobs completed but bundle status not updated
- var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
- var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
var totalProcessedJobs = completedJobs + failedJobs;
if (totalProcessedJobs == bundle.TotalBacktests &&
diff --git a/src/Managing.Application/Workers/BundleBacktestWorker.cs b/src/Managing.Application/Workers/BundleBacktestWorker.cs
index 63e50413..43629163 100644
--- a/src/Managing.Application/Workers/BundleBacktestWorker.cs
+++ b/src/Managing.Application/Workers/BundleBacktestWorker.cs
@@ -1,10 +1,6 @@
-using System.Text.Json;
+using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Domain.Backtests;
-using Managing.Domain.Bots;
-using Managing.Domain.MoneyManagements;
-using Managing.Domain.Scenarios;
-using Managing.Domain.Strategies;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
@@ -12,17 +8,17 @@ using static Managing.Common.Enums;
namespace Managing.Application.Workers;
///
-/// Worker for processing bundle backtest requests
+/// Worker for creating jobs for bundle backtest requests.
+/// This worker creates all jobs upfront and lets BacktestComputeWorker handle execution.
+/// BundleBacktestHealthCheckWorker monitors bundle health and handles stuck scenarios.
///
public class BundleBacktestWorker : BaseWorker
{
private readonly IServiceProvider _serviceProvider;
- private readonly IMessengerService _messengerService;
private static readonly WorkerType _workerType = WorkerType.BundleBacktest;
public BundleBacktestWorker(
IServiceProvider serviceProvider,
- IMessengerService messengerService,
ILogger logger) : base(
_workerType,
logger,
@@ -30,21 +26,17 @@ public class BundleBacktestWorker : BaseWorker
serviceProvider)
{
_serviceProvider = serviceProvider;
- _messengerService = messengerService;
}
protected override async Task Run(CancellationToken cancellationToken)
{
- var maxDegreeOfParallelism = 3;
- using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);
- var processingTasks = new List();
try
{
- // Create a new service scope to get fresh instances of services with scoped DbContext
using var scope = _serviceProvider.CreateScope();
var backtester = scope.ServiceProvider.GetRequiredService();
+ var jobRepository = scope.ServiceProvider.GetRequiredService();
- // Get pending bundle backtest requests
+ // Get pending bundle backtest requests (these need jobs created)
var pendingRequests =
await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Pending);
@@ -53,24 +45,20 @@ public class BundleBacktestWorker : BaseWorker
if (cancellationToken.IsCancellationRequested)
break;
- await semaphore.WaitAsync(cancellationToken);
- var task = Task.Run(async () =>
+ try
{
- try
- {
- await ProcessBundleRequest(bundleRequest, cancellationToken);
- }
- finally
- {
- semaphore.Release();
- }
- }, cancellationToken);
- processingTasks.Add(task);
+ await ProcessBundleRequest(bundleRequest, backtester, jobRepository);
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError(ex, "Error processing bundle request {RequestId}", bundleRequest.RequestId);
+ // Mark bundle as failed so it doesn't get stuck in Pending status
+ bundleRequest.Status = BundleBacktestRequestStatus.Failed;
+ bundleRequest.ErrorMessage = $"Failed to create jobs: {ex.Message}";
+ bundleRequest.UpdatedAt = DateTime.UtcNow;
+ await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
+ }
}
-
- await Task.WhenAll(processingTasks);
-
- await RetryUnfinishedBacktestsInFailedBundles(backtester, cancellationToken);
}
catch (Exception ex)
{
@@ -79,367 +67,50 @@ public class BundleBacktestWorker : BaseWorker
}
}
- private async Task ProcessBundleRequest(BundleBacktestRequest bundleRequest, CancellationToken cancellationToken)
- {
- // Create a new service scope for this task to avoid DbContext concurrency issues
- using var scope = _serviceProvider.CreateScope();
- var backtester = scope.ServiceProvider.GetRequiredService();
-
- try
- {
- _logger.LogInformation("Starting to process bundle backtest request {RequestId}", bundleRequest.RequestId);
-
- // Update status to running
- bundleRequest.Status = BundleBacktestRequestStatus.Running;
- await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
-
- // Generate backtest requests from the new variant structure
- var backtestRequests = await GenerateBacktestRequestsFromVariants(bundleRequest);
- if (backtestRequests == null || !backtestRequests.Any())
- {
- throw new InvalidOperationException("Failed to generate backtest requests from variants");
- }
-
- // Process each backtest request
- for (int i = 0; i < backtestRequests.Count; i++)
- {
- if (cancellationToken.IsCancellationRequested)
- break;
-
- try
- {
- var runBacktestRequest = backtestRequests[i];
- // Update current backtest being processed
- bundleRequest.CurrentBacktest = $"Backtest {i + 1} of {backtestRequests.Count}";
- await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
-
- // Run the backtest directly with the strongly-typed request
- var backtestId = await RunSingleBacktest(backtester, runBacktestRequest, bundleRequest, i,
- cancellationToken);
- if (!string.IsNullOrEmpty(backtestId))
- {
- bundleRequest.Results.Add(backtestId);
- }
-
- // Update progress
- bundleRequest.CompletedBacktests++;
- await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
-
- _logger.LogInformation("Completed backtest {Index} for bundle request {RequestId}",
- i + 1, bundleRequest.RequestId);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error processing backtest {Index} for bundle request {RequestId}",
- i + 1, bundleRequest.RequestId);
- bundleRequest.FailedBacktests++;
- SentrySdk.CaptureException(ex);
- await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
- }
- }
-
- // Update final status and send notifications
- if (bundleRequest.FailedBacktests == 0)
- {
- bundleRequest.Status = BundleBacktestRequestStatus.Completed;
- // Send Telegram message to the user's channelId
- await NotifyUser(bundleRequest);
- }
- else if (bundleRequest.CompletedBacktests == 0)
- {
- bundleRequest.Status = BundleBacktestRequestStatus.Failed;
- bundleRequest.ErrorMessage = "All backtests failed";
- }
- else
- {
- bundleRequest.Status = BundleBacktestRequestStatus.Completed;
- bundleRequest.ErrorMessage = $"{bundleRequest.FailedBacktests} backtests failed";
- // Send Telegram message to the user's channelId even with partial failures
- await NotifyUser(bundleRequest);
- }
-
- bundleRequest.CompletedAt = DateTime.UtcNow;
- bundleRequest.CurrentBacktest = null;
- await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
-
- _logger.LogInformation("Completed processing bundle backtest request {RequestId} with status {Status}",
- bundleRequest.RequestId, bundleRequest.Status);
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error processing bundle backtest request {RequestId}", bundleRequest.RequestId);
-
- bundleRequest.Status = BundleBacktestRequestStatus.Failed;
- bundleRequest.ErrorMessage = ex.Message;
- bundleRequest.CompletedAt = DateTime.UtcNow;
- await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
- }
- }
-
- private async Task NotifyUser(BundleBacktestRequest bundleRequest)
- {
- if (bundleRequest.User?.TelegramChannel != null)
- {
- var message =
- $"⚠️ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed with {bundleRequest.FailedBacktests} failed backtests.";
- await _messengerService.SendMessage(message, bundleRequest.User.TelegramChannel);
- }
- }
-
- // Change RunSingleBacktest to accept RunBacktestRequest directly
- private async Task RunSingleBacktest(IBacktester backtester, RunBacktestRequest runBacktestRequest,
+ private async Task ProcessBundleRequest(
BundleBacktestRequest bundleRequest,
- int index, CancellationToken cancellationToken)
+ IBacktester backtester,
+ IJobRepository jobRepository)
{
- if (runBacktestRequest == null || runBacktestRequest.Config == null)
+ // Check if jobs already exist for this bundle
+ var existingJobs = (await jobRepository.GetByBundleRequestIdAsync(bundleRequest.RequestId)).ToList();
+
+ if (existingJobs.Any())
{
- _logger.LogError("Invalid RunBacktestRequest in bundle (null config)");
- return string.Empty;
- }
-
- // Map MoneyManagement
- MoneyManagement moneyManagement = null;
- if (!string.IsNullOrEmpty(runBacktestRequest.Config.MoneyManagementName))
- {
- // In worker context, we cannot resolve by name (no user/db), so skip or set null
- // Optionally, log a warning
- _logger.LogWarning("MoneyManagementName provided but cannot resolve in worker context: {Name}",
- (string)runBacktestRequest.Config.MoneyManagementName);
- }
- else if (runBacktestRequest.Config.MoneyManagement != null)
- {
- var mmReq = runBacktestRequest.Config.MoneyManagement;
- moneyManagement = new MoneyManagement
+ _logger.LogInformation(
+ "Bundle request {RequestId} already has {JobCount} jobs. Skipping job creation.",
+ bundleRequest.RequestId, existingJobs.Count);
+
+ // Ensure bundle is in Running status if jobs exist
+ if (bundleRequest.Status == BundleBacktestRequestStatus.Pending)
{
- Name = mmReq.Name,
- Timeframe = mmReq.Timeframe,
- StopLoss = mmReq.StopLoss,
- TakeProfit = mmReq.TakeProfit,
- Leverage = mmReq.Leverage
- };
- moneyManagement.FormatPercentage();
- }
-
- // Map Scenario
- LightScenario scenario = null;
- if (runBacktestRequest.Config.Scenario != null)
- {
- var sReq = runBacktestRequest.Config.Scenario;
- scenario = new LightScenario(sReq.Name, sReq.LookbackPeriod)
- {
- Indicators = sReq.Indicators?.Select(i => new LightIndicator(i.Name, i.Type)
- {
- MinimumHistory = i.MinimumHistory,
- Period = i.Period,
- FastPeriods = i.FastPeriods,
- SlowPeriods = i.SlowPeriods,
- SignalPeriods = i.SignalPeriods,
- Multiplier = i.Multiplier,
- SmoothPeriods = i.SmoothPeriods,
- StochPeriods = i.StochPeriods,
- CyclePeriods = i.CyclePeriods
- }).ToList() ?? new List()
- };
- }
-
- // Map TradingBotConfig
- var backtestConfig = new TradingBotConfig
- {
- AccountName = runBacktestRequest.Config.AccountName,
- MoneyManagement = moneyManagement,
- Ticker = runBacktestRequest.Config.Ticker,
- ScenarioName = runBacktestRequest.Config.ScenarioName,
- Scenario = scenario,
- Timeframe = runBacktestRequest.Config.Timeframe,
- IsForWatchingOnly = runBacktestRequest.Config.IsForWatchingOnly,
- BotTradingBalance = runBacktestRequest.Config.BotTradingBalance,
- TradingType = TradingType.BacktestFutures,
- CooldownPeriod = runBacktestRequest.Config.CooldownPeriod ?? 1,
- MaxLossStreak = runBacktestRequest.Config.MaxLossStreak,
- MaxPositionTimeHours = runBacktestRequest.Config.MaxPositionTimeHours,
- FlipOnlyWhenInProfit = runBacktestRequest.Config.FlipOnlyWhenInProfit,
- FlipPosition = runBacktestRequest.Config.FlipPosition,
- Name = $"{bundleRequest.Name} #{index + 1}",
- CloseEarlyWhenProfitable = runBacktestRequest.Config.CloseEarlyWhenProfitable,
- UseSynthApi = runBacktestRequest.Config.UseSynthApi,
- UseForPositionSizing = runBacktestRequest.Config.UseForPositionSizing,
- UseForSignalFiltering = runBacktestRequest.Config.UseForSignalFiltering,
- UseForDynamicStopLoss = runBacktestRequest.Config.UseForDynamicStopLoss
- };
-
- // Run the backtest (no user context)
- var result = await backtester.RunTradingBotBacktest(
- backtestConfig,
- runBacktestRequest.StartDate,
- runBacktestRequest.EndDate,
- bundleRequest.User, // No user context in worker
- true,
- runBacktestRequest.WithCandles,
- bundleRequest.RequestId.ToString() // Use bundleRequestId as requestId for traceability
- );
-
- _logger.LogInformation("Processed backtest for bundle request {RequestId}", bundleRequest.RequestId);
- // Assume the backtest is created and you have its ID (e.g., backtest.Id)
- // Return the backtest ID
- return result.Id;
- }
-
- private async Task RetryUnfinishedBacktestsInFailedBundles(IBacktester backtester,
- CancellationToken cancellationToken)
- {
- var failedBundles = await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Failed);
- foreach (var failedBundle in failedBundles)
- {
- if (cancellationToken.IsCancellationRequested)
- break;
-
- // Use Results property to determine which backtests need to be retried
- var succeededIds = new HashSet(failedBundle.Results ?? new List());
-
- // Generate backtest requests from the new variant structure
- var originalRequests = await GenerateBacktestRequestsFromVariants(failedBundle);
- if (originalRequests == null || !originalRequests.Any()) continue;
-
- for (int i = failedBundle.CompletedBacktests; i < originalRequests.Count; i++)
- {
- var expectedId = /* logic to compute expected backtest id for this request */ string.Empty;
- // If this backtest was not run or did not succeed, re-run it
- if (!succeededIds.Contains(expectedId))
- {
- var backtestId = await RunSingleBacktest(backtester, originalRequests[i], failedBundle, i,
- cancellationToken);
- if (!string.IsNullOrEmpty(backtestId))
- {
- failedBundle.Results?.Add(backtestId);
- failedBundle.CompletedBacktests++;
- await backtester.UpdateBundleBacktestRequestAsync(failedBundle);
- }
- }
- }
-
- // If all backtests succeeded, update the bundle status
- if (failedBundle.CompletedBacktests == originalRequests.Count)
- {
- failedBundle.Status = BundleBacktestRequestStatus.Completed;
- failedBundle.ErrorMessage = null; // Clear any previous error
- failedBundle.CompletedAt = DateTime.UtcNow;
- await backtester.UpdateBundleBacktestRequestAsync(failedBundle);
-
- // Notify user about successful retry
- await NotifyUser(failedBundle);
- }
- else
- {
- _logger.LogWarning("Bundle {RequestId} still has unfinished backtests after retry",
- failedBundle.RequestId);
- }
- }
- }
-
- ///
- /// Generates individual backtest requests from variant configuration
- ///
- private async Task> GenerateBacktestRequestsFromVariants(
- BundleBacktestRequest bundleRequest)
- {
- try
- {
- // Deserialize the variant configurations
- var universalConfig =
- JsonSerializer.Deserialize(bundleRequest.UniversalConfigJson);
- var dateTimeRanges = JsonSerializer.Deserialize>(bundleRequest.DateTimeRangesJson);
- var moneyManagementVariants =
- JsonSerializer.Deserialize>(bundleRequest.MoneyManagementVariantsJson);
- var tickerVariants = JsonSerializer.Deserialize>(bundleRequest.TickerVariantsJson);
-
- if (universalConfig == null || dateTimeRanges == null || moneyManagementVariants == null ||
- tickerVariants == null)
- {
- _logger.LogError("Failed to deserialize variant configurations for bundle request {RequestId}",
+ bundleRequest.Status = BundleBacktestRequestStatus.Running;
+ bundleRequest.TotalBacktests = existingJobs.Count;
+ bundleRequest.UpdatedAt = DateTime.UtcNow;
+ await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
+ _logger.LogInformation(
+ "Updated bundle request {RequestId} to Running status",
bundleRequest.RequestId);
- return new List();
}
-
- // Get the first account for the user using AccountService
- using var scope = _serviceProvider.CreateScope();
- var accountService = scope.ServiceProvider.GetRequiredService();
- var accounts =
- await accountService.GetAccountsByUserAsync(bundleRequest.User, hideSecrets: true, getBalance: false);
- var firstAccount = accounts.FirstOrDefault();
-
- if (firstAccount == null)
- {
- _logger.LogError("No accounts found for user {UserId} in bundle request {RequestId}",
- bundleRequest.User.Id, bundleRequest.RequestId);
- return new List();
- }
-
- var backtestRequests = new List();
-
- foreach (var dateRange in dateTimeRanges)
- {
- foreach (var mmVariant in moneyManagementVariants)
- {
- foreach (var ticker in tickerVariants)
- {
- var config = new TradingBotConfigRequest
- {
- AccountName = firstAccount.Name,
- Ticker = ticker,
- Timeframe = universalConfig.Timeframe,
- IsForWatchingOnly = universalConfig.IsForWatchingOnly,
- BotTradingBalance = universalConfig.BotTradingBalance,
- Name =
- $"{universalConfig.BotName}_{ticker}_{dateRange.StartDate:yyyyMMdd}_{dateRange.EndDate:yyyyMMdd}",
- FlipPosition = universalConfig.FlipPosition,
- CooldownPeriod = universalConfig.CooldownPeriod,
- MaxLossStreak = universalConfig.MaxLossStreak,
- Scenario = universalConfig.Scenario,
- ScenarioName = universalConfig.ScenarioName,
- MoneyManagement = mmVariant.MoneyManagement,
- MaxPositionTimeHours = universalConfig.MaxPositionTimeHours,
- CloseEarlyWhenProfitable = universalConfig.CloseEarlyWhenProfitable,
- FlipOnlyWhenInProfit = universalConfig.FlipOnlyWhenInProfit,
- UseSynthApi = universalConfig.UseSynthApi,
- UseForPositionSizing = universalConfig.UseForPositionSizing,
- UseForSignalFiltering = universalConfig.UseForSignalFiltering,
- UseForDynamicStopLoss = universalConfig.UseForDynamicStopLoss,
- TradingType = universalConfig.TradingType
- };
-
- var backtestRequest = new RunBacktestRequest
- {
- Config = config,
- StartDate = dateRange.StartDate,
- EndDate = dateRange.EndDate,
- Balance = universalConfig.BotTradingBalance,
- WatchOnly = universalConfig.WatchOnly,
- Save = universalConfig.Save,
- WithCandles = false, // Bundle backtests never return candles
- MoneyManagement = mmVariant.MoneyManagement != null
- ? new MoneyManagement
- {
- Name = mmVariant.MoneyManagement.Name,
- Timeframe = mmVariant.MoneyManagement.Timeframe,
- StopLoss = mmVariant.MoneyManagement.StopLoss,
- TakeProfit = mmVariant.MoneyManagement.TakeProfit,
- Leverage = mmVariant.MoneyManagement.Leverage
- }
- : null
- };
-
- backtestRequests.Add(backtestRequest);
- }
- }
- }
-
- return backtestRequests;
- }
- catch (Exception ex)
- {
- _logger.LogError(ex, "Error generating backtest requests from variants for bundle request {RequestId}",
- bundleRequest.RequestId);
- return new List();
+ return;
}
+
+ _logger.LogInformation("Creating jobs for bundle request {RequestId}", bundleRequest.RequestId);
+
+ // Create all jobs for this bundle using the service method
+ await backtester.CreateJobsForBundleRequestAsync(bundleRequest);
+
+ // Get the created jobs to set TotalBacktests
+ var createdJobs = (await jobRepository.GetByBundleRequestIdAsync(bundleRequest.RequestId)).ToList();
+
+ // Update bundle status to Running and set TotalBacktests
+ bundleRequest.Status = BundleBacktestRequestStatus.Running;
+ bundleRequest.TotalBacktests = createdJobs.Count;
+ bundleRequest.UpdatedAt = DateTime.UtcNow;
+ await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
+
+ _logger.LogInformation(
+ "Successfully created {JobCount} jobs for bundle request {RequestId}. Bundle is now Running.",
+ createdJobs.Count, bundleRequest.RequestId);
}
}
\ No newline at end of file