diff --git a/src/Managing.Application/Backtests/BacktestExecutor.cs b/src/Managing.Application/Backtests/BacktestExecutor.cs
index 60b6d602..4a9ff42b 100644
--- a/src/Managing.Application/Backtests/BacktestExecutor.cs
+++ b/src/Managing.Application/Backtests/BacktestExecutor.cs
@@ -141,7 +141,6 @@ public class BacktestExecutor
         var candlesProcessed = 0;
         var lastProgressUpdate = DateTime.UtcNow;
-        const int progressUpdateIntervalMs = 5000; // Update progress every 5 seconds to reduce database load
         const int walletCheckInterval = 10; // Check wallet balance every N candles instead of every candle
         var lastWalletCheck = 0;
         var lastWalletBalance = config.BotTradingBalance;
@@ -164,13 +163,58 @@ public class BacktestExecutor
         var loopOverheadTotalTime = TimeSpan.Zero;

         // Process all candles with optimized rolling window approach
+        var lastHeartbeatTime = DateTime.UtcNow;
+        var candlesSinceLastProgress = 0;
+
         foreach (var candle in candles)
         {
-            var loopStart = Stopwatch.GetTimestamp();
-
+            var loopStopwatchStart = Stopwatch.GetTimestamp(); // Used for timing measurements
+            var loopStartTime = DateTime.UtcNow; // Used for per-candle duration checks
+
             // Check for cancellation (timeout or shutdown)
             cancellationToken.ThrowIfCancellationRequested();

+            // HEARTBEAT: Detect a potential hang by checking how long the previous candle took to process
+            var timeSinceLastHeartbeat = DateTime.UtcNow - lastHeartbeatTime;
+            if (timeSinceLastHeartbeat > TimeSpan.FromMinutes(5)) // 5 minutes per candle is too long
+            {
+                throw new TimeoutException($"Backtest hung before candle {currentCandle + 1}/{totalCandles} at {candle.Date} - previous candle took {timeSinceLastHeartbeat.TotalMinutes:F1} minutes");
+            }
+            lastHeartbeatTime = DateTime.UtcNow;
+
+            // MEMORY CHECK: Monitor memory usage to detect leaks
+            var currentMemory = GC.GetTotalMemory(false);
+            if (currentMemory > initialMemory * 10) // 10x initial memory usage
+            {
+                _logger.LogWarning("High memory usage detected: {CurrentMB:F2}MB (initial: {InitialMB:F2}MB) at candle {Current}/{Total}",
+                    currentMemory / 1024.0 / 1024.0, initialMemory / 1024.0 / 1024.0, currentCandle + 1, totalCandles);
+            }
+
+            // PROGRESS UPDATE: Ensure progress is reported regularly (every 30 seconds or 1000 candles)
+            var timeSinceLastProgress = DateTime.UtcNow - lastProgressUpdate;
+            candlesSinceLastProgress++;
+
+            if (timeSinceLastProgress > TimeSpan.FromSeconds(30) || candlesSinceLastProgress >= 1000)
+            {
+                var currentProgress = (int)((double)currentCandle / totalCandles * 100);
+                if (progressCallback != null)
+                {
+                    try
+                    {
+                        await progressCallback(Math.Min(currentProgress, 99)); // Never report 100% until actually done
+                    }
+                    catch (Exception ex)
+                    {
+                        _logger.LogWarning(ex, "Progress callback failed at candle {Current}/{Total}", currentCandle + 1, totalCandles);
+                    }
+                }
+                lastProgressUpdate = DateTime.UtcNow;
+                candlesSinceLastProgress = 0;
+
+                var progressMemoryUsage = GC.GetTotalMemory(false);
+                _logger.LogDebug("Backtest progress: {Current}/{Total} candles ({Progress}%) - Memory: {MemoryMB:F2}MB",
+                    currentCandle + 1, totalCandles, currentProgress, progressMemoryUsage / 1024.0 / 1024.0);
+            }
+
             // TIMING: Measure rolling window operations
             var rollingWindowStart = Stopwatch.GetTimestamp();

@@ -187,14 +231,40 @@ public class BacktestExecutor

             tradingBot.LastCandle = candle;

-            // Run with optimized backtest path (minimize async calls)
+            // Run with optimized backtest path (minimize async calls), with timeout protection
             var signalUpdateStart = Stopwatch.GetTimestamp();
-            // Pass List directly - no conversion needed, order is preserved
-            await 
tradingBot.UpdateSignals(rollingWindowCandles);
+            try
+            {
+                // Timeout wrapper for signal updates (max 2 minutes per candle).
+                // WaitAsync(TimeSpan, CancellationToken) throws TimeoutException on timeout
+                // and OperationCanceledException if the backtest itself is cancelled.
+                // Pass List directly - no conversion needed, order is preserved
+                await tradingBot.UpdateSignals(rollingWindowCandles).WaitAsync(TimeSpan.FromMinutes(2), cancellationToken);
+            }
+            catch (TimeoutException)
+            {
+                throw new TimeoutException($"UpdateSignals timed out on candle {currentCandle + 1}/{totalCandles} at {candle.Date}");
+            }
             signalUpdateTotalTime += Stopwatch.GetElapsedTime(signalUpdateStart);

             var backtestStepStart = Stopwatch.GetTimestamp();
-            await tradingBot.Run();
+            try
+            {
+                // Timeout wrapper for the backtest step (max 3 minutes per candle)
+                await tradingBot.Run().WaitAsync(TimeSpan.FromMinutes(3), cancellationToken);
+            }
+            catch (TimeoutException)
+            {
+                throw new TimeoutException($"Run() timed out on candle {currentCandle + 1}/{totalCandles} at {candle.Date}");
+            }
             backtestStepTotalTime += Stopwatch.GetElapsedTime(backtestStepStart);
             telemetry.TotalBacktestSteps++;

@@ -218,40 +288,20 @@ public class BacktestExecutor
             }
         }

-            // Update progress callback if provided (optimized frequency)
-            var currentPercentage = (currentCandle * 100) / totalCandles;
-            var timeSinceLastUpdate = (DateTime.UtcNow - lastProgressUpdate).TotalMilliseconds;
-            if (progressCallback != null && (timeSinceLastUpdate >= progressUpdateIntervalMs ||
-                currentPercentage >= lastLoggedPercentage + 10))
-            {
-                var progressCallbackStart = Stopwatch.GetTimestamp();
-                try
-                {
-                    await progressCallback(currentPercentage);
-                    telemetry.ProgressCallbacksCount++;
-                }
-                catch (Exception ex)
-                {
-                    _logger.LogWarning(ex, "Error in progress callback");
-                }
-
-                progressCallbackTotalTime += Stopwatch.GetElapsedTime(progressCallbackStart);
-                lastProgressUpdate = DateTime.UtcNow;
-            }
-
             // Track peak memory usage (reduced frequency to minimize GC overhead)
             if (currentCandle - lastMemoryCheck >= memoryCheckInterval)
             {
-                var currentMemory = GC.GetTotalMemory(false);
-                if (currentMemory > peakMemory)
+                var memoryCheckValue = GC.GetTotalMemory(false);
+                if (memoryCheckValue > peakMemory)
                 {
-                    peakMemory = currentMemory;
+                    peakMemory = memoryCheckValue;
                 }
                 lastMemoryCheck = currentCandle;
             }

             // Log progress every 10% (reduced frequency)
+            var currentPercentage = (currentCandle * 100) / totalCandles;
             if (currentPercentage >= lastLoggedPercentage + 10)
             {
                 lastLoggedPercentage = currentPercentage;
@@ -260,12 +310,34 @@ public class BacktestExecutor
                     currentPercentage, currentCandle, totalCandles);
             }

+            // LOOP COMPLETION CHECK: Warn if this single candle took unusually long to process
+            var loopDuration = DateTime.UtcNow - loopStartTime;
+            if (loopDuration > TimeSpan.FromMinutes(10)) // A single loop iteration should not take more than 10 minutes
+            {
+                _logger.LogWarning("Candle processing took unusually long: {Duration:F1} minutes for candle {Current}/{Total} at {Date}",
+                    loopDuration.TotalMinutes, currentCandle, totalCandles, candle.Date);
+            }
+
             // TIMING: Calculate loop overhead (everything except signal updates and backtest steps)
-            var loopTotal = Stopwatch.GetElapsedTime(loopStart);
+            var 
loopTotal = Stopwatch.GetElapsedTime(loopStopwatchStart); var signalAndBacktestTime = signalUpdateTotalTime + backtestStepTotalTime; // Note: loopOverheadTotalTime is cumulative, we track it at the end } + // FINAL PROGRESS UPDATE: Ensure 100% is reported when complete + if (progressCallback != null) + { + try + { + await progressCallback(100); + _logger.LogInformation("Backtest completed: 100% ({TotalCandles} candles processed)", totalCandles); + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Final progress callback failed"); + } + } + // Complete candle processing telemetry telemetry.CandleProcessingTime = Stopwatch.GetElapsedTime(candleProcessingStart); telemetry.SignalUpdateTime = signalUpdateTotalTime; diff --git a/src/Managing.Application/Workers/BacktestComputeWorker.cs b/src/Managing.Application/Workers/BacktestComputeWorker.cs index 21c78bbf..3036c9f3 100644 --- a/src/Managing.Application/Workers/BacktestComputeWorker.cs +++ b/src/Managing.Application/Workers/BacktestComputeWorker.cs @@ -637,7 +637,7 @@ public class BacktestComputeWorker : BackgroundService var stuckCompletedJobs = runningJobs .Where(j => j.JobType == JobType.Backtest && j.ProgressPercentage >= 100) .ToList(); - + if (stuckCompletedJobs.Any()) { _logger.LogWarning( @@ -646,42 +646,27 @@ public class BacktestComputeWorker : BackgroundService foreach (var stuckJob in stuckCompletedJobs) { - _logger.LogWarning( - "🔧 Job {JobId} stuck at 100% progress in Running status since {StartedAt}. Marking as completed.", - stuckJob.Id, stuckJob.StartedAt); + await AutoCompleteStuckJobAsync(stuckJob, jobRepository, scope.ServiceProvider); + } + } - stuckJob.Status = JobStatus.Completed; - stuckJob.CompletedAt = stuckJob.CompletedAt ?? DateTime.UtcNow; - stuckJob.LastHeartbeat = DateTime.UtcNow; - - // Add note to error message if not already set - if (string.IsNullOrEmpty(stuckJob.ErrorMessage)) - { - stuckJob.ErrorMessage = "Job completed but status was not updated (auto-recovered)"; - } - - await jobRepository.UpdateAsync(stuckJob); - - // Clean up progress tracker if still present - _jobProgressTrackers.TryRemove(stuckJob.Id, out _); - _runningJobTasks.TryRemove(stuckJob.Id, out _); + // Also check for jobs that have been running for too long but haven't reached 100% + var longRunningJobs = runningJobs + .Where(j => j.JobType == JobType.Backtest && + j.ProgressPercentage < 100 && + j.StartedAt.HasValue && + (DateTime.UtcNow - j.StartedAt.Value) > TimeSpan.FromMinutes(_options.JobTimeoutMinutes + 10)) // Extra 10 min grace + .ToList(); - // Update bundle request if this is part of a bundle - if (stuckJob.BundleRequestId.HasValue) - { - try - { - await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, scope.ServiceProvider, stuckJob); - } - catch (Exception ex) - { - _logger.LogError(ex, "Error updating bundle request progress for stuck job {JobId}", stuckJob.Id); - } - } - - _logger.LogInformation( - "✅ Successfully auto-completed stuck job {JobId}. Worker can now claim new jobs.", - stuckJob.Id); + if (longRunningJobs.Any()) + { + _logger.LogWarning( + "🔧 Found {Count} jobs running longer than timeout for worker {WorkerId}. 
Marking as failed.",
+                    longRunningJobs.Count, _options.WorkerId);
+
+                foreach (var longJob in longRunningJobs)
+                {
+                    await HandleLongRunningJobAsync(longJob, jobRepository, scope.ServiceProvider);
                 }
             }

@@ -947,6 +932,102 @@ public class BacktestComputeWorker : BackgroundService
         return category == FailureCategory.Transient || category == FailureCategory.SystemError;
     }

+    private async Task AutoCompleteStuckJobAsync(Job stuckJob, IJobRepository jobRepository, IServiceProvider serviceProvider)
+    {
+        try
+        {
+            _logger.LogWarning(
+                "🔧 Job {JobId} stuck at 100% progress in Running status since {StartedAt}. Marking as completed.",
+                stuckJob.Id, stuckJob.StartedAt);
+
+            stuckJob.Status = JobStatus.Completed;
+            stuckJob.CompletedAt = stuckJob.CompletedAt ?? DateTime.UtcNow;
+            stuckJob.LastHeartbeat = DateTime.UtcNow;
+
+            // Add note to error message if not already set
+            if (string.IsNullOrEmpty(stuckJob.ErrorMessage))
+            {
+                stuckJob.ErrorMessage = "Job completed but status was not updated (auto-recovered)";
+            }
+
+            await jobRepository.UpdateAsync(stuckJob);
+
+            // Clean up progress tracker if still present
+            _jobProgressTrackers.TryRemove(stuckJob.Id, out _);
+            _runningJobTasks.TryRemove(stuckJob.Id, out _);
+
+            // Update bundle request if this is part of a bundle
+            if (stuckJob.BundleRequestId.HasValue)
+            {
+                try
+                {
+                    await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, serviceProvider, stuckJob);
+                }
+                catch (Exception ex)
+                {
+                    _logger.LogError(ex, "Error updating bundle request progress for stuck job {JobId}", stuckJob.Id);
+                }
+            }
+
+            _logger.LogInformation(
+                "✅ Successfully auto-completed stuck job {JobId}. Worker can now claim new jobs.",
+                stuckJob.Id);
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error auto-completing stuck job {JobId}", stuckJob.Id);
+        }
+    }
+
+    private async Task HandleLongRunningJobAsync(Job longJob, IJobRepository jobRepository, IServiceProvider serviceProvider)
+    {
+        try
+        {
+            var elapsed = longJob.StartedAt.HasValue
+                ? (DateTime.UtcNow - longJob.StartedAt.Value).TotalMinutes
+                : 0;
+
+            _logger.LogWarning(
+                "🔧 Job {JobId} has been running for {Elapsed:F1} minutes (timeout: {TimeoutMinutes} minutes). 
Failing job.", + longJob.Id, elapsed, _options.JobTimeoutMinutes); + + // Mark as failed + longJob.Status = JobStatus.Failed; + longJob.ErrorMessage = $"Job exceeded maximum runtime of {_options.JobTimeoutMinutes} minutes"; + longJob.FailureCategory = FailureCategory.SystemError; + longJob.IsRetryable = false; + longJob.CompletedAt = DateTime.UtcNow; + longJob.AssignedWorkerId = null; + longJob.LastHeartbeat = DateTime.UtcNow; + + await jobRepository.UpdateAsync(longJob); + + // Clean up + _jobProgressTrackers.TryRemove(longJob.Id, out _); + _runningJobTasks.TryRemove(longJob.Id, out _); + + // Update bundle request if this is part of a bundle + if (longJob.BundleRequestId.HasValue) + { + try + { + await UpdateBundleRequestProgress(longJob.BundleRequestId.Value, serviceProvider, longJob); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error updating bundle request progress for long-running job {JobId}", longJob.Id); + } + } + + // Notify about permanent failure + await NotifyPermanentFailure(longJob, new TimeoutException($"Job exceeded {elapsed:F1} minutes"), serviceProvider); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error handling long-running job {JobId}", longJob.Id); + } + } + private async Task NotifyPermanentFailure( Job job, Exception ex, @@ -956,7 +1037,7 @@ public class BacktestComputeWorker : BackgroundService { var webhookService = serviceProvider.GetRequiredService(); const string alertsChannel = "2676086723"; - + var jobTypeName = job.JobType == JobType.Genetic ? "Genetic" : "Backtest"; var message = $"🚨 **{jobTypeName} Job Failed Permanently**\n" + $"Job ID: `{job.Id}`\n" + @@ -965,7 +1046,7 @@ public class BacktestComputeWorker : BackgroundService $"Failure Category: {job.FailureCategory}\n" + $"Error: {ex.Message}\n" + $"Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC"; - + await webhookService.SendMessage(message, alertsChannel); } catch (Exception notifyEx) diff --git a/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs index 7247576a..155ddd7b 100644 --- a/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs +++ b/src/Managing.Application/Workers/BundleBacktestHealthCheckWorker.cs @@ -181,6 +181,17 @@ public class BundleBacktestHealthCheckWorker : BackgroundService await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository); return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0); } + + // NEW: Check if bundle has been running for too long overall (regardless of recent updates) + var timeSinceCreation = DateTime.UtcNow - bundle.CreatedAt; + if (timeSinceCreation > TimeSpan.FromHours(2)) // 2 hours is too long for any bundle + { + _logger.LogWarning( + "⚠️ Bundle {BundleRequestId} has been running for {Hours:F1} hours total. Forcing stuck bundle recovery.", + bundle.RequestId, timeSinceCreation.TotalHours); + await HandleStuckBundleAsync(bundle, timeSinceCreation, jobs, backtestRepository, jobRepository); + return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0); + } } // Check 3: Pending bundle that's been pending too long (jobs created but never started) @@ -436,9 +447,9 @@ public class BundleBacktestHealthCheckWorker : BackgroundService IJobRepository jobRepository) { _logger.LogWarning( - "⚠️ Bundle {BundleRequestId} is stuck in Running status. No progress for {TimeSinceUpdate} hours. " + + "⚠️ Bundle {BundleRequestId} is stuck in Running status. No progress for {TimeSinceUpdate}. 
" + "Completed: {Completed}/{Total}", - bundle.RequestId, timeSinceUpdate.TotalHours, bundle.CompletedBacktests, bundle.TotalBacktests); + bundle.RequestId, timeSinceUpdate, bundle.CompletedBacktests, bundle.TotalBacktests); // Check job statuses to understand why bundle is stuck var jobStatusSummary = jobs @@ -478,32 +489,53 @@ public class BundleBacktestHealthCheckWorker : BackgroundService else { // Some jobs are still pending or running - bundle is genuinely stuck - // Reset any stale running jobs back to pending var runningJobs = jobs.Where(j => j.Status == JobStatus.Running).ToList(); + var pendingJobs = jobs.Where(j => j.Status == JobStatus.Pending).ToList(); + var resetCount = 0; + // Reset ALL running jobs for stuck bundles - they're likely not making progress foreach (var job in runningJobs) { - var timeSinceJobHeartbeat = job.LastHeartbeat.HasValue - ? DateTime.UtcNow - job.LastHeartbeat.Value - : DateTime.UtcNow - job.CreatedAt; + _logger.LogInformation( + "Resetting running job {JobId} for stuck bundle {BundleRequestId} back to Pending", + job.Id, bundle.RequestId); - if (timeSinceJobHeartbeat > TimeSpan.FromMinutes(30)) - { - _logger.LogInformation( - "Resetting stale job {JobId} for bundle {BundleRequestId} back to Pending", - job.Id, bundle.RequestId); + job.Status = JobStatus.Pending; + job.AssignedWorkerId = null; + job.LastHeartbeat = null; + await jobRepository.UpdateAsync(job); + resetCount++; + } - job.Status = JobStatus.Pending; - job.AssignedWorkerId = null; - job.LastHeartbeat = null; - await jobRepository.UpdateAsync(job); - } + // Also reset old pending jobs that might be stuck in retry loops + var oldPendingJobs = pendingJobs.Where(j => + j.RetryAfter.HasValue && j.RetryAfter.Value < DateTime.UtcNow).ToList(); + + foreach (var job in oldPendingJobs) + { + _logger.LogInformation( + "Resetting old pending job {JobId} for stuck bundle {BundleRequestId}", + job.Id, bundle.RequestId); + + job.RetryAfter = null; // Clear retry delay + await jobRepository.UpdateAsync(job); + resetCount++; + } + + // If no jobs were reset and there are pending jobs, the issue might be elsewhere + if (resetCount == 0 && pendingJobs.Any()) + { + _logger.LogWarning( + "Bundle {BundleRequestId} has {PendingCount} pending jobs but no running jobs. " + + "This might indicate workers are not picking up jobs or jobs are failing immediately.", + bundle.RequestId, pendingJobs.Count); } // Update bundle timestamp to give it another chance bundle.UpdatedAt = DateTime.UtcNow; - bundle.ErrorMessage = - $"Bundle was stuck. Reset {runningJobs.Count(j => j.Status == JobStatus.Pending)} stale jobs to pending."; + bundle.ErrorMessage = resetCount > 0 + ? $"Bundle was stuck. Reset {resetCount} jobs to allow retry." + : $"Bundle was stuck but no jobs needed resetting. May indicate worker issues."; } await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);