From 0a676d1fb799093bd3421f6d9fbbe57da57a5b96 Mon Sep 17 00:00:00 2001
From: cryptooda
Date: Tue, 11 Nov 2025 05:31:06 +0700
Subject: [PATCH] Add the bundle healthcheck worker

---
 .../BundleBacktestHealthCheckWorker.cs        | 543 ++++++++++++++++++
 1 file changed, 543 insertions(+)
 create mode 100644 src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs

diff --git a/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs
new file mode 100644
index 00000000..7fdfdac0
--- /dev/null
+++ b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs
@@ -0,0 +1,543 @@
+using System.Text.Json;
+using Managing.Application.Abstractions.Repositories;
+using Managing.Application.Abstractions.Services;
+using Managing.Application.Backtests;
+using Managing.Domain.Backtests;
+using Managing.Domain.Bots;
+using Managing.Domain.MoneyManagements;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using static Managing.Common.Enums;
+
+namespace Managing.Application.Workers;
+
+/// <summary>
+/// Background worker that monitors BundleBacktestRequest health.
+/// Runs every 30 minutes to detect and handle stuck bundles that are missing jobs or have stalled progress.
+/// </summary>
+public class BundleBacktestHealthCheckWorker : BackgroundService
+{
+    private readonly IServiceScopeFactory _scopeFactory;
+    private readonly ILogger<BundleBacktestHealthCheckWorker> _logger;
+    private readonly TimeSpan _checkInterval = TimeSpan.FromMinutes(30);
+    private readonly TimeSpan _inactiveThreshold = TimeSpan.FromMinutes(30); // Check bundles inactive for 30+ minutes
+    private readonly TimeSpan _stuckThreshold = TimeSpan.FromHours(2); // Consider a bundle stuck if no progress for 2 hours
+    private readonly IMessengerService _messengerService;
+
+    public BundleBacktestHealthCheckWorker(
+        IServiceScopeFactory scopeFactory,
+        ILogger<BundleBacktestHealthCheckWorker> logger,
+        IMessengerService messengerService)
+    {
+        _scopeFactory = scopeFactory;
+        _logger = logger;
+        _messengerService = messengerService;
+    }
+
+    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+    {
+        _logger.LogInformation(
+            "BundleBacktestHealthCheckWorker starting. " +
+            "Check interval: {CheckInterval} minutes, Inactive threshold: {InactiveThreshold} minutes, Stuck threshold: {StuckThreshold} hours",
+            _checkInterval.TotalMinutes, _inactiveThreshold.TotalMinutes, _stuckThreshold.TotalHours);
+
+        while (!stoppingToken.IsCancellationRequested)
+        {
+            try
+            {
+                await CheckBundleHealthAsync(stoppingToken);
+            }
+            catch (OperationCanceledException)
+            {
+                _logger.LogInformation("BundleBacktestHealthCheckWorker stopping due to cancellation");
+                break;
+            }
+            catch (Exception ex)
+            {
+                _logger.LogError(ex, "Error in BundleBacktestHealthCheckWorker health check cycle");
+            }
+
+            try
+            {
+                await Task.Delay(_checkInterval, stoppingToken);
+            }
+            catch (OperationCanceledException)
+            {
+                _logger.LogInformation("BundleBacktestHealthCheckWorker stopping during delay");
+                break;
+            }
+        }
+
+        _logger.LogInformation("BundleBacktestHealthCheckWorker stopped");
+    }
+
+    private async Task CheckBundleHealthAsync(CancellationToken stoppingToken)
+    {
+        using var scope = _scopeFactory.CreateScope();
+        var backtestRepository = scope.ServiceProvider.GetRequiredService<IBacktestRepository>();
+        var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
+
+        _logger.LogInformation("Starting bundle health check...");
+
+        // Check bundles in Pending and Running status
+        var pendingBundles = await backtestRepository.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Pending);
+        var runningBundles = await backtestRepository.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Running);
+
+        // Only check bundles that haven't been updated in more than the inactive threshold
+        var inactiveThresholdTime = DateTime.UtcNow.Subtract(_inactiveThreshold);
+        var allBundlesToCheck = pendingBundles.Concat(runningBundles)
+            .Where(b => b.UpdatedAt < inactiveThresholdTime)
+            .ToList();
+
+        _logger.LogInformation(
+            "Found {TotalCount} bundles (from {PendingTotal} pending and {RunningTotal} running) that haven't been updated in over {InactiveThresholdMinutes} minutes",
+            allBundlesToCheck.Count, pendingBundles.Count(), runningBundles.Count(), _inactiveThreshold.TotalMinutes);
+
+        var stuckBundlesCount = 0;
+        var missingJobsCount = 0;
+        var healthyBundlesCount = 0;
+
+        foreach (var bundle in allBundlesToCheck)
+        {
+            if (stoppingToken.IsCancellationRequested)
+                break;
+
+            try
+            {
+                var (stuckCount, missingJobs, healthyCount) = await CheckSingleBundleHealthAsync(
+                    bundle, backtestRepository, jobRepository);
+
+                stuckBundlesCount += stuckCount;
+                missingJobsCount += missingJobs;
+                healthyBundlesCount += healthyCount;
+            }
+            catch (Exception ex)
+            {
+                _logger.LogError(ex,
+                    "Error checking health for bundle {BundleRequestId}",
+                    bundle.RequestId);
+            }
+        }
+
+        _logger.LogInformation(
+            "Bundle health check completed. 
Healthy: {HealthyCount}, Stuck: {StuckCount}, Missing Jobs: {MissingJobsCount}", + healthyBundlesCount, stuckBundlesCount, missingJobsCount); + } + + private async Task<(int StuckCount, int MissingJobsCount, int HealthyCount)> CheckSingleBundleHealthAsync( + BundleBacktestRequest bundle, + IBacktestRepository backtestRepository, + IJobRepository jobRepository) + { + // Get all jobs associated with this bundle + var jobs = (await jobRepository.GetByBundleRequestIdAsync(bundle.RequestId)).ToList(); + + // Calculate expected number of jobs based on TotalBacktests + var expectedJobCount = bundle.TotalBacktests; + var actualJobCount = jobs.Count; + + _logger.LogDebug( + "Bundle {BundleRequestId} ({Status}): Expected {Expected} jobs, Found {Actual} jobs, Completed {Completed}/{Total}", + bundle.RequestId, bundle.Status, expectedJobCount, actualJobCount, bundle.CompletedBacktests, bundle.TotalBacktests); + + // Check 1: Missing jobs - bundle has no jobs or fewer jobs than expected + if (actualJobCount == 0 || actualJobCount < expectedJobCount) + { + await HandleBundleWithMissingJobsAsync(bundle, actualJobCount, expectedJobCount, backtestRepository); + return (StuckCount: 0, MissingJobsCount: 1, HealthyCount: 0); + } + + // Check 2: Stuck bundle - Running for too long without progress + if (bundle.Status == BundleBacktestRequestStatus.Running) + { + var timeSinceUpdate = DateTime.UtcNow - bundle.UpdatedAt; + + if (timeSinceUpdate > _stuckThreshold) + { + await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository); + return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0); + } + } + + // Check 3: Pending bundle that's been pending too long (jobs created but never started) + if (bundle.Status == BundleBacktestRequestStatus.Pending) + { + var timeSinceCreation = DateTime.UtcNow - bundle.CreatedAt; + + // If bundle has been pending for more than the stuck threshold, check job statuses + if (timeSinceCreation > _stuckThreshold) + { + var allJobsPending = jobs.All(j => j.Status == JobStatus.Pending); + var hasFailedJobs = jobs.Any(j => j.Status == JobStatus.Failed); + + if (allJobsPending || hasFailedJobs) + { + await HandleStalePendingBundleAsync(bundle, timeSinceCreation, jobs, backtestRepository, jobRepository); + return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0); + } + } + } + + return (StuckCount: 0, MissingJobsCount: 0, HealthyCount: 1); + } + + private async Task HandleBundleWithMissingJobsAsync( + BundleBacktestRequest bundle, + int actualJobCount, + int expectedJobCount, + IBacktestRepository backtestRepository) + { + _logger.LogWarning( + "⚠️ Bundle {BundleRequestId} ({Status}) is missing jobs. Expected: {Expected}, Actual: {Actual}. 
" + + "Attempting to recreate missing jobs.", + bundle.RequestId, bundle.Status, expectedJobCount, actualJobCount); + + try + { + using var scope = _scopeFactory.CreateScope(); + var jobService = scope.ServiceProvider.GetRequiredService(); + var jobRepository = scope.ServiceProvider.GetRequiredService(); + + // Generate all backtest requests from bundle variants + var allBacktestRequests = await GenerateBacktestRequestsFromVariants(bundle); + + if (allBacktestRequests == null || !allBacktestRequests.Any()) + { + _logger.LogError( + "Failed to generate backtest requests from variants for bundle {BundleRequestId}", + bundle.RequestId); + + bundle.ErrorMessage = $"Failed to regenerate jobs: Could not generate backtest requests from variants."; + bundle.UpdatedAt = DateTime.UtcNow; + await backtestRepository.UpdateBundleBacktestRequestAsync(bundle); + return; + } + + // Get existing jobs for this bundle + var existingJobs = (await jobRepository.GetByBundleRequestIdAsync(bundle.RequestId)).ToList(); + + // Create a set of existing job config signatures for quick lookup + var existingJobSignatures = existingJobs + .Select(j => + { + var config = JsonSerializer.Deserialize(j.ConfigJson); + return config != null + ? $"{config.Ticker}_{config.Timeframe}_{config.Name}_{j.StartDate:yyyyMMdd}_{j.EndDate:yyyyMMdd}" + : null; + }) + .Where(s => s != null) + .ToHashSet(); + + // Filter out backtest requests that already have jobs + var missingRequests = allBacktestRequests + .Where(req => + { + var signature = $"{req.Config.Ticker}_{req.Config.Timeframe}_{req.Config.Name}_{req.StartDate:yyyyMMdd}_{req.EndDate:yyyyMMdd}"; + return !existingJobSignatures.Contains(signature); + }) + .ToList(); + + if (missingRequests.Any()) + { + _logger.LogInformation( + "Creating {MissingCount} missing jobs for bundle {BundleRequestId}", + missingRequests.Count, bundle.RequestId); + + // Create jobs for missing requests + var createdJobs = await jobService.CreateBundleJobsAsync(bundle, missingRequests); + + _logger.LogInformation( + "Successfully created {CreatedCount} missing jobs for bundle {BundleRequestId}", + createdJobs.Count, bundle.RequestId); + + // Update bundle status back to Pending if it was in a failed state, or keep Running if it was running + if (bundle.Status == BundleBacktestRequestStatus.Failed) + { + bundle.Status = BundleBacktestRequestStatus.Pending; + bundle.ErrorMessage = null; + bundle.CompletedAt = null; + } + + bundle.UpdatedAt = DateTime.UtcNow; + await backtestRepository.UpdateBundleBacktestRequestAsync(bundle); + + _logger.LogInformation( + "Recreated {CreatedCount} missing jobs for bundle {BundleRequestId}. Bundle status: {Status}", + createdJobs.Count, bundle.RequestId, bundle.Status); + } + else + { + _logger.LogWarning( + "No missing jobs found to recreate for bundle {BundleRequestId}. 
All {ExpectedCount} jobs already exist.", + bundle.RequestId, expectedJobCount); + + bundle.UpdatedAt = DateTime.UtcNow; + await backtestRepository.UpdateBundleBacktestRequestAsync(bundle); + } + } + catch (Exception ex) + { + _logger.LogError(ex, + "Error recreating missing jobs for bundle {BundleRequestId}", + bundle.RequestId); + + bundle.ErrorMessage = $"Error recreating jobs: {ex.Message}"; + bundle.UpdatedAt = DateTime.UtcNow; + await backtestRepository.UpdateBundleBacktestRequestAsync(bundle); + } + } + + private async Task> GenerateBacktestRequestsFromVariants( + BundleBacktestRequest bundleRequest) + { + // Generate backtest requests from bundle variants (duplicated logic from Backtester since it's private) + try + { + // Deserialize the variant configurations + var universalConfig = + JsonSerializer.Deserialize(bundleRequest.UniversalConfigJson); + var dateTimeRanges = JsonSerializer.Deserialize>(bundleRequest.DateTimeRangesJson); + var moneyManagementVariants = + JsonSerializer.Deserialize>(bundleRequest.MoneyManagementVariantsJson); + var tickerVariants = JsonSerializer.Deserialize>(bundleRequest.TickerVariantsJson); + + if (universalConfig == null || dateTimeRanges == null || moneyManagementVariants == null || + tickerVariants == null) + { + _logger.LogError("Failed to deserialize variant configurations for bundle request {RequestId}", + bundleRequest.RequestId); + return new List(); + } + + // Get the first account for the user + using var scope = _scopeFactory.CreateScope(); + var accountService = scope.ServiceProvider.GetRequiredService(); + var accounts = await accountService.GetAccountsByUserAsync(bundleRequest.User, hideSecrets: true, getBalance: false); + var firstAccount = accounts.FirstOrDefault(); + + if (firstAccount == null) + { + _logger.LogError("No accounts found for user {UserId} in bundle request {RequestId}", + bundleRequest.User.Id, bundleRequest.RequestId); + return new List(); + } + + var backtestRequests = new List(); + + foreach (var dateRange in dateTimeRanges) + { + foreach (var mmVariant in moneyManagementVariants) + { + foreach (var ticker in tickerVariants) + { + var config = new TradingBotConfigRequest + { + AccountName = firstAccount.Name, + Ticker = ticker, + Timeframe = universalConfig.Timeframe, + IsForWatchingOnly = universalConfig.IsForWatchingOnly, + BotTradingBalance = universalConfig.BotTradingBalance, + Name = $"{bundleRequest.Name} #{backtestRequests.Count + 1}", + FlipPosition = universalConfig.FlipPosition, + CooldownPeriod = universalConfig.CooldownPeriod, + MaxLossStreak = universalConfig.MaxLossStreak, + Scenario = universalConfig.Scenario, + ScenarioName = universalConfig.ScenarioName, + MoneyManagement = mmVariant.MoneyManagement, + MaxPositionTimeHours = universalConfig.MaxPositionTimeHours, + CloseEarlyWhenProfitable = universalConfig.CloseEarlyWhenProfitable, + FlipOnlyWhenInProfit = universalConfig.FlipOnlyWhenInProfit, + UseSynthApi = universalConfig.UseSynthApi, + UseForPositionSizing = universalConfig.UseForPositionSizing, + UseForSignalFiltering = universalConfig.UseForSignalFiltering, + UseForDynamicStopLoss = universalConfig.UseForDynamicStopLoss + }; + + var backtestRequest = new RunBacktestRequest + { + Config = config, + StartDate = dateRange.StartDate, + EndDate = dateRange.EndDate, + Balance = universalConfig.BotTradingBalance, + WatchOnly = universalConfig.WatchOnly, + Save = universalConfig.Save, + WithCandles = false, + MoneyManagement = mmVariant.MoneyManagement != null + ? 
new MoneyManagement
+                                {
+                                    Name = mmVariant.MoneyManagement.Name,
+                                    Timeframe = mmVariant.MoneyManagement.Timeframe,
+                                    StopLoss = mmVariant.MoneyManagement.StopLoss,
+                                    TakeProfit = mmVariant.MoneyManagement.TakeProfit,
+                                    Leverage = mmVariant.MoneyManagement.Leverage
+                                }
+                                : null
+                        };
+
+                        backtestRequests.Add(backtestRequest);
+                    }
+                }
+            }
+
+            return backtestRequests;
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error generating backtest requests from variants for bundle request {RequestId}",
+                bundleRequest.RequestId);
+            return new List<RunBacktestRequest>();
+        }
+    }
+
+    private async Task HandleStuckBundleAsync(
+        BundleBacktestRequest bundle,
+        TimeSpan timeSinceUpdate,
+        List jobs,
+        IBacktestRepository backtestRepository,
+        IJobRepository jobRepository)
+    {
+        _logger.LogWarning(
+            "⚠️ Bundle {BundleRequestId} is stuck in Running status. No progress for {TimeSinceUpdate} hours. " +
+            "Completed: {Completed}/{Total}",
+            bundle.RequestId, timeSinceUpdate.TotalHours, bundle.CompletedBacktests, bundle.TotalBacktests);
+
+        // Check job statuses to understand why the bundle is stuck
+        var jobStatusSummary = jobs
+            .GroupBy(j => j.Status)
+            .Select(g => $"{g.Key}: {g.Count()}")
+            .ToList();
+
+        _logger.LogInformation(
+            "Job status summary for stuck bundle {BundleRequestId}: {JobStatusSummary}",
+            bundle.RequestId, string.Join(", ", jobStatusSummary));
+
+        // Check if all jobs are actually completed but the bundle wasn't updated
+        var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
+        var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
+        var totalProcessedJobs = completedJobs + failedJobs;
+
+        if (totalProcessedJobs == bundle.TotalBacktests)
+        {
+            // All jobs are done, update bundle status accordingly
+            _logger.LogInformation(
+                "Bundle {BundleRequestId} has all jobs completed ({Completed} completed, {Failed} failed). Updating bundle status.",
+                bundle.RequestId, completedJobs, failedJobs);
+
+            // Mark the bundle Completed; any failed backtests are surfaced via FailedBacktests and ErrorMessage below
+            bundle.Status = BundleBacktestRequestStatus.Completed;
+            bundle.CompletedBacktests = completedJobs;
+            bundle.FailedBacktests = failedJobs;
+            bundle.CompletedAt = DateTime.UtcNow;
+            bundle.UpdatedAt = DateTime.UtcNow;
+
+            if (failedJobs > 0)
+            {
+                bundle.ErrorMessage = $"{failedJobs} backtests failed";
+            }
+        }
+        else
+        {
+            // Some jobs are still pending or running - the bundle is genuinely stuck
+            // Reset any stale running jobs back to pending
+            var runningJobs = jobs.Where(j => j.Status == JobStatus.Running).ToList();
+            var resetJobCount = 0;
+
+            foreach (var job in runningJobs)
+            {
+                var timeSinceJobHeartbeat = job.LastHeartbeat.HasValue
+                    ? DateTime.UtcNow - job.LastHeartbeat.Value
+                    : DateTime.UtcNow - job.CreatedAt;
+
+                if (timeSinceJobHeartbeat > TimeSpan.FromMinutes(30))
+                {
+                    _logger.LogInformation(
+                        "Resetting stale job {JobId} for bundle {BundleRequestId} back to Pending",
+                        job.Id, bundle.RequestId);
+
+                    job.Status = JobStatus.Pending;
+                    job.AssignedWorkerId = null;
+                    job.LastHeartbeat = null;
+                    await jobRepository.UpdateAsync(job);
+                    resetJobCount++;
+                }
+            }
+
+            // Update bundle timestamp to give it another chance
+            bundle.UpdatedAt = DateTime.UtcNow;
+            bundle.ErrorMessage = $"Bundle was stuck. Reset {resetJobCount} stale jobs to pending.";
+        }
+
+        await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
+
+        _logger.LogInformation(
+            "Handled stuck bundle {BundleRequestId}. 
New status: {Status}", + bundle.RequestId, bundle.Status); + } + + private async Task HandleStalePendingBundleAsync( + BundleBacktestRequest bundle, + TimeSpan timeSinceCreation, + List jobs, + IBacktestRepository backtestRepository, + IJobRepository jobRepository) + { + _logger.LogWarning( + "⚠️ Bundle {BundleRequestId} has been in Pending status for {TimeSinceCreation} hours. " + + "Jobs: {JobCount}, All Pending: {AllPending}", + bundle.RequestId, timeSinceCreation.TotalHours, jobs.Count, jobs.All(j => j.Status == JobStatus.Pending)); + + var hasFailedJobs = jobs.Any(j => j.Status == JobStatus.Failed); + + if (hasFailedJobs) + { + // If all jobs failed, mark bundle as failed + var failedJobCount = jobs.Count(j => j.Status == JobStatus.Failed); + + if (failedJobCount == bundle.TotalBacktests) + { + _logger.LogInformation( + "All {FailedCount} jobs for bundle {BundleRequestId} have failed. Marking bundle as Failed.", + failedJobCount, bundle.RequestId); + + bundle.Status = BundleBacktestRequestStatus.Failed; + bundle.FailedBacktests = failedJobCount; + bundle.ErrorMessage = $"All {failedJobCount} backtest jobs failed"; + bundle.CompletedAt = DateTime.UtcNow; + bundle.UpdatedAt = DateTime.UtcNow; + + await backtestRepository.UpdateBundleBacktestRequestAsync(bundle); + await NotifyUserOfFailedBundle(bundle, "All jobs failed"); + } + } + else + { + // All jobs are pending - just update the timestamp to avoid repeated warnings + bundle.UpdatedAt = DateTime.UtcNow; + await backtestRepository.UpdateBundleBacktestRequestAsync(bundle); + + _logger.LogInformation( + "Bundle {BundleRequestId} has {JobCount} pending jobs waiting to be processed. Updated timestamp.", + bundle.RequestId, jobs.Count); + } + } + + private async Task NotifyUserOfFailedBundle(BundleBacktestRequest bundle, string reason) + { + try + { + if (bundle.User?.TelegramChannel != null) + { + var message = $"❌ Bundle backtest '{bundle.Name}' (ID: {bundle.RequestId}) failed: {reason}"; + await _messengerService.SendMessage(message, bundle.User.TelegramChannel); + + _logger.LogInformation( + "Sent failure notification to user {UserId} for bundle {BundleRequestId}", + bundle.User.Id, bundle.RequestId); + } + } + catch (Exception ex) + { + _logger.LogError(ex, + "Failed to send notification to user for bundle {BundleRequestId}", + bundle.RequestId); + } + } +} +
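
Reviewer note, outside the patch itself: the worker only runs if the host registers it as a hosted service, and that wiring is not part of this diff. Below is a minimal sketch of the registration, assuming the standard .NET generic host; the surrounding lines and the place where IBacktestRepository, IJobRepository and IMessengerService get registered are placeholders, not actual project code.

    using Managing.Application.Workers;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;

    var builder = Host.CreateApplicationBuilder(args);

    // Existing registrations for IBacktestRepository, IJobRepository,
    // IMessengerService, etc. are assumed to live here (placeholder).

    // Register the health-check worker so the host starts it on startup
    // and stops it via the stoppingToken on shutdown.
    builder.Services.AddHostedService<BundleBacktestHealthCheckWorker>();

    await builder.Build().RunAsync();

AddHostedService keeps a single long-lived instance, which is why the worker takes IServiceScopeFactory and opens a scope per check to resolve the scoped repositories, rather than injecting them through its constructor.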