From f60277d81d543208ab3979f7ba072f5acfaf0043 Mon Sep 17 00:00:00 2001 From: cryptooda Date: Fri, 26 Dec 2025 04:27:08 +0700 Subject: [PATCH] Update backtest worker to fix stuck backtest --- .../BundleBacktestHealthCheckWorker.cs | 24 +- .../Workers/BundleBacktestWorker.cs | 441 +++--------------- 2 files changed, 76 insertions(+), 389 deletions(-) diff --git a/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs index 2d60d374..7247576a 100644 --- a/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs +++ b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs @@ -24,7 +24,7 @@ public class BundleBacktestHealthCheckWorker : BackgroundService private readonly TimeSpan _inactiveThreshold = TimeSpan.FromMinutes(2); // Check bundles inactive for 2+ minutes private readonly TimeSpan - _stuckThreshold = TimeSpan.FromHours(2); // Consider bundle stuck if no progress for 2 hours + _stuckThreshold = TimeSpan.FromMinutes(10); // Consider bundle stuck if no progress for 10 minutes private readonly IMessengerService _messengerService; @@ -153,11 +153,29 @@ public class BundleBacktestHealthCheckWorker : BackgroundService return (StuckCount: 0, MissingJobsCount: 1, HealthyCount: 0); } - // Check 2: Stuck bundle - Running for too long without progress + // Calculate job status counts (used in multiple checks below) + var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed); + var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed); + var runningJobs = jobs.Count(j => j.Status == JobStatus.Running); + + // Check 2: Stuck bundle - Running but no jobs are actually running if (bundle.Status == BundleBacktestRequestStatus.Running) { var timeSinceUpdate = DateTime.UtcNow - bundle.UpdatedAt; + // Check if bundle is running but no jobs are being processed + // This indicates the old BundleBacktestWorker created the bundle but never created jobs + if (runningJobs == 0 && completedJobs == 0 && timeSinceUpdate > TimeSpan.FromMinutes(5)) + { + _logger.LogWarning( + "⚠️ Bundle {BundleRequestId} is Running but has no jobs running or completed after {Minutes} minutes. " + + "This indicates jobs were never created or never picked up.", + bundle.RequestId, timeSinceUpdate.TotalMinutes); + await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository); + return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0); + } + + // Check if bundle hasn't made progress in a long time if (timeSinceUpdate > _stuckThreshold) { await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository); @@ -186,8 +204,6 @@ public class BundleBacktestHealthCheckWorker : BackgroundService } // Check 4: Bundle with all jobs completed but bundle status not updated - var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed); - var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed); var totalProcessedJobs = completedJobs + failedJobs; if (totalProcessedJobs == bundle.TotalBacktests && diff --git a/src/Managing.Application/Workers/BundleBacktestWorker.cs b/src/Managing.Application/Workers/BundleBacktestWorker.cs index 63e50413..43629163 100644 --- a/src/Managing.Application/Workers/BundleBacktestWorker.cs +++ b/src/Managing.Application/Workers/BundleBacktestWorker.cs @@ -1,10 +1,6 @@ -using System.Text.Json; +using Managing.Application.Abstractions.Repositories; using Managing.Application.Abstractions.Services; using Managing.Domain.Backtests; -using Managing.Domain.Bots; -using Managing.Domain.MoneyManagements; -using Managing.Domain.Scenarios; -using Managing.Domain.Strategies; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using static Managing.Common.Enums; @@ -12,17 +8,17 @@ using static Managing.Common.Enums; namespace Managing.Application.Workers; /// -/// Worker for processing bundle backtest requests +/// Worker for creating jobs for bundle backtest requests. +/// This worker creates all jobs upfront and lets BacktestComputeWorker handle execution. +/// BundleBacktestHealthCheckWorker monitors bundle health and handles stuck scenarios. /// public class BundleBacktestWorker : BaseWorker { private readonly IServiceProvider _serviceProvider; - private readonly IMessengerService _messengerService; private static readonly WorkerType _workerType = WorkerType.BundleBacktest; public BundleBacktestWorker( IServiceProvider serviceProvider, - IMessengerService messengerService, ILogger logger) : base( _workerType, logger, @@ -30,21 +26,17 @@ public class BundleBacktestWorker : BaseWorker serviceProvider) { _serviceProvider = serviceProvider; - _messengerService = messengerService; } protected override async Task Run(CancellationToken cancellationToken) { - var maxDegreeOfParallelism = 3; - using var semaphore = new SemaphoreSlim(maxDegreeOfParallelism); - var processingTasks = new List(); try { - // Create a new service scope to get fresh instances of services with scoped DbContext using var scope = _serviceProvider.CreateScope(); var backtester = scope.ServiceProvider.GetRequiredService(); + var jobRepository = scope.ServiceProvider.GetRequiredService(); - // Get pending bundle backtest requests + // Get pending bundle backtest requests (these need jobs created) var pendingRequests = await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Pending); @@ -53,24 +45,20 @@ public class BundleBacktestWorker : BaseWorker if (cancellationToken.IsCancellationRequested) break; - await semaphore.WaitAsync(cancellationToken); - var task = Task.Run(async () => + try { - try - { - await ProcessBundleRequest(bundleRequest, cancellationToken); - } - finally - { - semaphore.Release(); - } - }, cancellationToken); - processingTasks.Add(task); + await ProcessBundleRequest(bundleRequest, backtester, jobRepository); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing bundle request {RequestId}", bundleRequest.RequestId); + // Mark bundle as failed so it doesn't get stuck in Pending status + bundleRequest.Status = BundleBacktestRequestStatus.Failed; + bundleRequest.ErrorMessage = $"Failed to create jobs: {ex.Message}"; + bundleRequest.UpdatedAt = DateTime.UtcNow; + await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); + } } - - await Task.WhenAll(processingTasks); - - await RetryUnfinishedBacktestsInFailedBundles(backtester, cancellationToken); } catch (Exception ex) { @@ -79,367 +67,50 @@ public class BundleBacktestWorker : BaseWorker } } - private async Task ProcessBundleRequest(BundleBacktestRequest bundleRequest, CancellationToken cancellationToken) - { - // Create a new service scope for this task to avoid DbContext concurrency issues - using var scope = _serviceProvider.CreateScope(); - var backtester = scope.ServiceProvider.GetRequiredService(); - - try - { - _logger.LogInformation("Starting to process bundle backtest request {RequestId}", bundleRequest.RequestId); - - // Update status to running - bundleRequest.Status = BundleBacktestRequestStatus.Running; - await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); - - // Generate backtest requests from the new variant structure - var backtestRequests = await GenerateBacktestRequestsFromVariants(bundleRequest); - if (backtestRequests == null || !backtestRequests.Any()) - { - throw new InvalidOperationException("Failed to generate backtest requests from variants"); - } - - // Process each backtest request - for (int i = 0; i < backtestRequests.Count; i++) - { - if (cancellationToken.IsCancellationRequested) - break; - - try - { - var runBacktestRequest = backtestRequests[i]; - // Update current backtest being processed - bundleRequest.CurrentBacktest = $"Backtest {i + 1} of {backtestRequests.Count}"; - await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); - - // Run the backtest directly with the strongly-typed request - var backtestId = await RunSingleBacktest(backtester, runBacktestRequest, bundleRequest, i, - cancellationToken); - if (!string.IsNullOrEmpty(backtestId)) - { - bundleRequest.Results.Add(backtestId); - } - - // Update progress - bundleRequest.CompletedBacktests++; - await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); - - _logger.LogInformation("Completed backtest {Index} for bundle request {RequestId}", - i + 1, bundleRequest.RequestId); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error processing backtest {Index} for bundle request {RequestId}", - i + 1, bundleRequest.RequestId); - bundleRequest.FailedBacktests++; - SentrySdk.CaptureException(ex); - await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); - } - } - - // Update final status and send notifications - if (bundleRequest.FailedBacktests == 0) - { - bundleRequest.Status = BundleBacktestRequestStatus.Completed; - // Send Telegram message to the user's channelId - await NotifyUser(bundleRequest); - } - else if (bundleRequest.CompletedBacktests == 0) - { - bundleRequest.Status = BundleBacktestRequestStatus.Failed; - bundleRequest.ErrorMessage = "All backtests failed"; - } - else - { - bundleRequest.Status = BundleBacktestRequestStatus.Completed; - bundleRequest.ErrorMessage = $"{bundleRequest.FailedBacktests} backtests failed"; - // Send Telegram message to the user's channelId even with partial failures - await NotifyUser(bundleRequest); - } - - bundleRequest.CompletedAt = DateTime.UtcNow; - bundleRequest.CurrentBacktest = null; - await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); - - _logger.LogInformation("Completed processing bundle backtest request {RequestId} with status {Status}", - bundleRequest.RequestId, bundleRequest.Status); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error processing bundle backtest request {RequestId}", bundleRequest.RequestId); - - bundleRequest.Status = BundleBacktestRequestStatus.Failed; - bundleRequest.ErrorMessage = ex.Message; - bundleRequest.CompletedAt = DateTime.UtcNow; - await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); - } - } - - private async Task NotifyUser(BundleBacktestRequest bundleRequest) - { - if (bundleRequest.User?.TelegramChannel != null) - { - var message = - $"⚠️ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed with {bundleRequest.FailedBacktests} failed backtests."; - await _messengerService.SendMessage(message, bundleRequest.User.TelegramChannel); - } - } - - // Change RunSingleBacktest to accept RunBacktestRequest directly - private async Task RunSingleBacktest(IBacktester backtester, RunBacktestRequest runBacktestRequest, + private async Task ProcessBundleRequest( BundleBacktestRequest bundleRequest, - int index, CancellationToken cancellationToken) + IBacktester backtester, + IJobRepository jobRepository) { - if (runBacktestRequest == null || runBacktestRequest.Config == null) + // Check if jobs already exist for this bundle + var existingJobs = (await jobRepository.GetByBundleRequestIdAsync(bundleRequest.RequestId)).ToList(); + + if (existingJobs.Any()) { - _logger.LogError("Invalid RunBacktestRequest in bundle (null config)"); - return string.Empty; - } - - // Map MoneyManagement - MoneyManagement moneyManagement = null; - if (!string.IsNullOrEmpty(runBacktestRequest.Config.MoneyManagementName)) - { - // In worker context, we cannot resolve by name (no user/db), so skip or set null - // Optionally, log a warning - _logger.LogWarning("MoneyManagementName provided but cannot resolve in worker context: {Name}", - (string)runBacktestRequest.Config.MoneyManagementName); - } - else if (runBacktestRequest.Config.MoneyManagement != null) - { - var mmReq = runBacktestRequest.Config.MoneyManagement; - moneyManagement = new MoneyManagement + _logger.LogInformation( + "Bundle request {RequestId} already has {JobCount} jobs. Skipping job creation.", + bundleRequest.RequestId, existingJobs.Count); + + // Ensure bundle is in Running status if jobs exist + if (bundleRequest.Status == BundleBacktestRequestStatus.Pending) { - Name = mmReq.Name, - Timeframe = mmReq.Timeframe, - StopLoss = mmReq.StopLoss, - TakeProfit = mmReq.TakeProfit, - Leverage = mmReq.Leverage - }; - moneyManagement.FormatPercentage(); - } - - // Map Scenario - LightScenario scenario = null; - if (runBacktestRequest.Config.Scenario != null) - { - var sReq = runBacktestRequest.Config.Scenario; - scenario = new LightScenario(sReq.Name, sReq.LookbackPeriod) - { - Indicators = sReq.Indicators?.Select(i => new LightIndicator(i.Name, i.Type) - { - MinimumHistory = i.MinimumHistory, - Period = i.Period, - FastPeriods = i.FastPeriods, - SlowPeriods = i.SlowPeriods, - SignalPeriods = i.SignalPeriods, - Multiplier = i.Multiplier, - SmoothPeriods = i.SmoothPeriods, - StochPeriods = i.StochPeriods, - CyclePeriods = i.CyclePeriods - }).ToList() ?? new List() - }; - } - - // Map TradingBotConfig - var backtestConfig = new TradingBotConfig - { - AccountName = runBacktestRequest.Config.AccountName, - MoneyManagement = moneyManagement, - Ticker = runBacktestRequest.Config.Ticker, - ScenarioName = runBacktestRequest.Config.ScenarioName, - Scenario = scenario, - Timeframe = runBacktestRequest.Config.Timeframe, - IsForWatchingOnly = runBacktestRequest.Config.IsForWatchingOnly, - BotTradingBalance = runBacktestRequest.Config.BotTradingBalance, - TradingType = TradingType.BacktestFutures, - CooldownPeriod = runBacktestRequest.Config.CooldownPeriod ?? 1, - MaxLossStreak = runBacktestRequest.Config.MaxLossStreak, - MaxPositionTimeHours = runBacktestRequest.Config.MaxPositionTimeHours, - FlipOnlyWhenInProfit = runBacktestRequest.Config.FlipOnlyWhenInProfit, - FlipPosition = runBacktestRequest.Config.FlipPosition, - Name = $"{bundleRequest.Name} #{index + 1}", - CloseEarlyWhenProfitable = runBacktestRequest.Config.CloseEarlyWhenProfitable, - UseSynthApi = runBacktestRequest.Config.UseSynthApi, - UseForPositionSizing = runBacktestRequest.Config.UseForPositionSizing, - UseForSignalFiltering = runBacktestRequest.Config.UseForSignalFiltering, - UseForDynamicStopLoss = runBacktestRequest.Config.UseForDynamicStopLoss - }; - - // Run the backtest (no user context) - var result = await backtester.RunTradingBotBacktest( - backtestConfig, - runBacktestRequest.StartDate, - runBacktestRequest.EndDate, - bundleRequest.User, // No user context in worker - true, - runBacktestRequest.WithCandles, - bundleRequest.RequestId.ToString() // Use bundleRequestId as requestId for traceability - ); - - _logger.LogInformation("Processed backtest for bundle request {RequestId}", bundleRequest.RequestId); - // Assume the backtest is created and you have its ID (e.g., backtest.Id) - // Return the backtest ID - return result.Id; - } - - private async Task RetryUnfinishedBacktestsInFailedBundles(IBacktester backtester, - CancellationToken cancellationToken) - { - var failedBundles = await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Failed); - foreach (var failedBundle in failedBundles) - { - if (cancellationToken.IsCancellationRequested) - break; - - // Use Results property to determine which backtests need to be retried - var succeededIds = new HashSet(failedBundle.Results ?? new List()); - - // Generate backtest requests from the new variant structure - var originalRequests = await GenerateBacktestRequestsFromVariants(failedBundle); - if (originalRequests == null || !originalRequests.Any()) continue; - - for (int i = failedBundle.CompletedBacktests; i < originalRequests.Count; i++) - { - var expectedId = /* logic to compute expected backtest id for this request */ string.Empty; - // If this backtest was not run or did not succeed, re-run it - if (!succeededIds.Contains(expectedId)) - { - var backtestId = await RunSingleBacktest(backtester, originalRequests[i], failedBundle, i, - cancellationToken); - if (!string.IsNullOrEmpty(backtestId)) - { - failedBundle.Results?.Add(backtestId); - failedBundle.CompletedBacktests++; - await backtester.UpdateBundleBacktestRequestAsync(failedBundle); - } - } - } - - // If all backtests succeeded, update the bundle status - if (failedBundle.CompletedBacktests == originalRequests.Count) - { - failedBundle.Status = BundleBacktestRequestStatus.Completed; - failedBundle.ErrorMessage = null; // Clear any previous error - failedBundle.CompletedAt = DateTime.UtcNow; - await backtester.UpdateBundleBacktestRequestAsync(failedBundle); - - // Notify user about successful retry - await NotifyUser(failedBundle); - } - else - { - _logger.LogWarning("Bundle {RequestId} still has unfinished backtests after retry", - failedBundle.RequestId); - } - } - } - - /// - /// Generates individual backtest requests from variant configuration - /// - private async Task> GenerateBacktestRequestsFromVariants( - BundleBacktestRequest bundleRequest) - { - try - { - // Deserialize the variant configurations - var universalConfig = - JsonSerializer.Deserialize(bundleRequest.UniversalConfigJson); - var dateTimeRanges = JsonSerializer.Deserialize>(bundleRequest.DateTimeRangesJson); - var moneyManagementVariants = - JsonSerializer.Deserialize>(bundleRequest.MoneyManagementVariantsJson); - var tickerVariants = JsonSerializer.Deserialize>(bundleRequest.TickerVariantsJson); - - if (universalConfig == null || dateTimeRanges == null || moneyManagementVariants == null || - tickerVariants == null) - { - _logger.LogError("Failed to deserialize variant configurations for bundle request {RequestId}", + bundleRequest.Status = BundleBacktestRequestStatus.Running; + bundleRequest.TotalBacktests = existingJobs.Count; + bundleRequest.UpdatedAt = DateTime.UtcNow; + await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); + _logger.LogInformation( + "Updated bundle request {RequestId} to Running status", bundleRequest.RequestId); - return new List(); } - - // Get the first account for the user using AccountService - using var scope = _serviceProvider.CreateScope(); - var accountService = scope.ServiceProvider.GetRequiredService(); - var accounts = - await accountService.GetAccountsByUserAsync(bundleRequest.User, hideSecrets: true, getBalance: false); - var firstAccount = accounts.FirstOrDefault(); - - if (firstAccount == null) - { - _logger.LogError("No accounts found for user {UserId} in bundle request {RequestId}", - bundleRequest.User.Id, bundleRequest.RequestId); - return new List(); - } - - var backtestRequests = new List(); - - foreach (var dateRange in dateTimeRanges) - { - foreach (var mmVariant in moneyManagementVariants) - { - foreach (var ticker in tickerVariants) - { - var config = new TradingBotConfigRequest - { - AccountName = firstAccount.Name, - Ticker = ticker, - Timeframe = universalConfig.Timeframe, - IsForWatchingOnly = universalConfig.IsForWatchingOnly, - BotTradingBalance = universalConfig.BotTradingBalance, - Name = - $"{universalConfig.BotName}_{ticker}_{dateRange.StartDate:yyyyMMdd}_{dateRange.EndDate:yyyyMMdd}", - FlipPosition = universalConfig.FlipPosition, - CooldownPeriod = universalConfig.CooldownPeriod, - MaxLossStreak = universalConfig.MaxLossStreak, - Scenario = universalConfig.Scenario, - ScenarioName = universalConfig.ScenarioName, - MoneyManagement = mmVariant.MoneyManagement, - MaxPositionTimeHours = universalConfig.MaxPositionTimeHours, - CloseEarlyWhenProfitable = universalConfig.CloseEarlyWhenProfitable, - FlipOnlyWhenInProfit = universalConfig.FlipOnlyWhenInProfit, - UseSynthApi = universalConfig.UseSynthApi, - UseForPositionSizing = universalConfig.UseForPositionSizing, - UseForSignalFiltering = universalConfig.UseForSignalFiltering, - UseForDynamicStopLoss = universalConfig.UseForDynamicStopLoss, - TradingType = universalConfig.TradingType - }; - - var backtestRequest = new RunBacktestRequest - { - Config = config, - StartDate = dateRange.StartDate, - EndDate = dateRange.EndDate, - Balance = universalConfig.BotTradingBalance, - WatchOnly = universalConfig.WatchOnly, - Save = universalConfig.Save, - WithCandles = false, // Bundle backtests never return candles - MoneyManagement = mmVariant.MoneyManagement != null - ? new MoneyManagement - { - Name = mmVariant.MoneyManagement.Name, - Timeframe = mmVariant.MoneyManagement.Timeframe, - StopLoss = mmVariant.MoneyManagement.StopLoss, - TakeProfit = mmVariant.MoneyManagement.TakeProfit, - Leverage = mmVariant.MoneyManagement.Leverage - } - : null - }; - - backtestRequests.Add(backtestRequest); - } - } - } - - return backtestRequests; - } - catch (Exception ex) - { - _logger.LogError(ex, "Error generating backtest requests from variants for bundle request {RequestId}", - bundleRequest.RequestId); - return new List(); + return; } + + _logger.LogInformation("Creating jobs for bundle request {RequestId}", bundleRequest.RequestId); + + // Create all jobs for this bundle using the service method + await backtester.CreateJobsForBundleRequestAsync(bundleRequest); + + // Get the created jobs to set TotalBacktests + var createdJobs = (await jobRepository.GetByBundleRequestIdAsync(bundleRequest.RequestId)).ToList(); + + // Update bundle status to Running and set TotalBacktests + bundleRequest.Status = BundleBacktestRequestStatus.Running; + bundleRequest.TotalBacktests = createdJobs.Count; + bundleRequest.UpdatedAt = DateTime.UtcNow; + await backtester.UpdateBundleBacktestRequestAsync(bundleRequest); + + _logger.LogInformation( + "Successfully created {JobCount} jobs for bundle request {RequestId}. Bundle is now Running.", + createdJobs.Count, bundleRequest.RequestId); } } \ No newline at end of file