Add per-candle timeout protection and memory monitoring to BacktestExecutor. In BacktestComputeWorker, auto-complete jobs stuck at 100% progress and fail backtest jobs that exceed the configured runtime. Add an overall-runtime check for bundles in BundleBacktestHealthCheckWorker so long-running bundles are recovered regardless of recent progress updates.

2025-12-28 18:56:33 +07:00
parent f84524f93a
commit d1924d9030
3 changed files with 272 additions and 87 deletions
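
For context on the first set of changes below: BacktestExecutor now wraps each per-candle async step (UpdateSignals and Run) in a hard timeout via Task.WaitAsync, so a hung step surfaces as a TimeoutException carrying the candle index and date instead of stalling the job indefinitely. A minimal, self-contained sketch of that pattern, assuming .NET 6+ (the helper and step names RunStepWithTimeoutAsync and SlowStepAsync are illustrative, not taken from the repository):

using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutWrapperSketch
{
    // Wraps an arbitrary async step with a hard per-step timeout while still honouring
    // the caller's cancellation token. WaitAsync(TimeSpan, CancellationToken) faults with
    // TimeoutException when the timeout elapses and with OperationCanceledException when
    // the token is cancelled, so the two conditions stay distinguishable.
    public static async Task RunStepWithTimeoutAsync(
        Func<Task> step,
        TimeSpan timeout,
        string stepName,
        CancellationToken cancellationToken)
    {
        try
        {
            await step().WaitAsync(timeout, cancellationToken);
        }
        catch (TimeoutException ex)
        {
            // Re-throw with context so the caller can see which step hung.
            throw new TimeoutException($"{stepName} exceeded {timeout.TotalSeconds:F0}s", ex);
        }
    }

    // Simulated slow step; stands in for calls like UpdateSignals or Run.
    private static Task SlowStepAsync() => Task.Delay(TimeSpan.FromSeconds(5));

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        try
        {
            // Times out: the step needs 5 seconds but is only allowed 1 second.
            await RunStepWithTimeoutAsync(SlowStepAsync, TimeSpan.FromSeconds(1), "SlowStep", cts.Token);
        }
        catch (TimeoutException ex)
        {
            Console.WriteLine(ex.Message); // prints "SlowStep exceeded 1s"
        }
    }
}

Note that WaitAsync only bounds how long the caller waits: the abandoned step keeps running in the background unless it observes a cancellation token itself, so the wrapper limits how long the loop blocks on a step, not how long the step executes.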


@@ -141,7 +141,6 @@ public class BacktestExecutor
var candlesProcessed = 0;
var lastProgressUpdate = DateTime.UtcNow;
const int progressUpdateIntervalMs = 5000; // Update progress every 5 seconds to reduce database load
const int walletCheckInterval = 10; // Check wallet balance every N candles instead of every candle
var lastWalletCheck = 0;
var lastWalletBalance = config.BotTradingBalance;
@@ -164,13 +163,58 @@ public class BacktestExecutor
var loopOverheadTotalTime = TimeSpan.Zero;
// Process all candles with optimized rolling window approach
var lastHeartbeatTime = DateTime.UtcNow;
var candlesSinceLastProgress = 0;
foreach (var candle in candles)
{
var loopStart = Stopwatch.GetTimestamp();
var loopStopwatchStart = Stopwatch.GetTimestamp(); // Use for timing measurements
var loopStartTime = DateTime.UtcNow; // Use for timeout checks
var loopStopwatch = Stopwatch.StartNew();
// Check for cancellation (timeout or shutdown)
cancellationToken.ThrowIfCancellationRequested();
// HEARTBEAT: Check if we've been processing too long on a single candle (potential hang)
if (loopStopwatch.Elapsed > TimeSpan.FromMinutes(5)) // 5 minutes per candle is too long
{
throw new TimeoutException($"Backtest hung on candle {currentCandle + 1}/{totalCandles} at {candle.Date} - processing took {loopStopwatch.Elapsed.TotalMinutes:F1} minutes");
}
// MEMORY CHECK: Monitor memory usage to detect leaks
var currentMemory = GC.GetTotalMemory(false);
if (currentMemory > initialMemory * 10) // 10x initial memory usage
{
_logger.LogWarning("High memory usage detected: {CurrentMB:F2}MB (initial: {InitialMB:F2}MB) at candle {Current}/{Total}",
currentMemory / 1024.0 / 1024.0, initialMemory / 1024.0 / 1024.0, currentCandle + 1, totalCandles);
}
// PROGRESS UPDATE: Ensure progress is reported regularly (every 30 seconds or 1000 candles)
var timeSinceLastProgress = DateTime.UtcNow - lastProgressUpdate;
candlesSinceLastProgress++;
if (timeSinceLastProgress > TimeSpan.FromSeconds(30) || candlesSinceLastProgress >= 1000)
{
var currentProgress = (int)((double)currentCandle / totalCandles * 100);
if (progressCallback != null)
{
try
{
await progressCallback(Math.Min(currentProgress, 99)); // Never report 100% until actually done
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Progress callback failed at candle {Current}/{Total}", currentCandle + 1, totalCandles);
}
}
lastProgressUpdate = DateTime.UtcNow;
candlesSinceLastProgress = 0;
var progressMemoryUsage = GC.GetTotalMemory(false);
_logger.LogDebug("Backtest progress: {Current}/{Total} candles ({Progress}%) - Memory: {MemoryMB:F2}MB",
currentCandle + 1, totalCandles, currentProgress, progressMemoryUsage / 1024.0 / 1024.0);
}
// TIMING: Measure rolling window operations
var rollingWindowStart = Stopwatch.GetTimestamp();
@@ -187,14 +231,40 @@ public class BacktestExecutor
tradingBot.LastCandle = candle;
// Run with optimized backtest path (minimize async calls)
// Run with optimized backtest path (minimize async calls) with timeout protection
var signalUpdateStart = Stopwatch.GetTimestamp();
// Pass List<Candle> directly - no conversion needed, order is preserved
await tradingBot.UpdateSignals(rollingWindowCandles);
try
{
// Timeout wrapper for signal updates (max 2 minutes per candle)
// Pass List<Candle> directly - no conversion needed, order is preserved
await tradingBot.UpdateSignals(rollingWindowCandles).WaitAsync(TimeSpan.FromMinutes(2), cancellationToken);
}
catch (TimeoutException)
{
throw new TimeoutException($"UpdateSignals timed out on candle {currentCandle + 1}/{totalCandles} at {candle.Date}");
}
signalUpdateTotalTime += Stopwatch.GetElapsedTime(signalUpdateStart);
var backtestStepStart = Stopwatch.GetTimestamp();
await tradingBot.Run();
try
{
// Timeout wrapper for backtest run (max 3 minutes per candle)
await tradingBot.Run().WaitAsync(TimeSpan.FromMinutes(3), cancellationToken);
}
catch (TimeoutException)
{
throw new TimeoutException($"Run() timed out on candle {currentCandle + 1}/{totalCandles} at {candle.Date}");
}
backtestStepTotalTime += Stopwatch.GetElapsedTime(backtestStepStart);
telemetry.TotalBacktestSteps++;
@@ -218,40 +288,20 @@ public class BacktestExecutor
}
}
// Update progress callback if provided (optimized frequency)
var currentPercentage = (currentCandle * 100) / totalCandles;
var timeSinceLastUpdate = (DateTime.UtcNow - lastProgressUpdate).TotalMilliseconds;
if (progressCallback != null && (timeSinceLastUpdate >= progressUpdateIntervalMs ||
currentPercentage >= lastLoggedPercentage + 10))
{
var progressCallbackStart = Stopwatch.GetTimestamp();
try
{
await progressCallback(currentPercentage);
telemetry.ProgressCallbacksCount++;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error in progress callback");
}
progressCallbackTotalTime += Stopwatch.GetElapsedTime(progressCallbackStart);
lastProgressUpdate = DateTime.UtcNow;
}
// Track peak memory usage (reduced frequency to minimize GC overhead)
if (currentCandle - lastMemoryCheck >= memoryCheckInterval)
{
var currentMemory = GC.GetTotalMemory(false);
if (currentMemory > peakMemory)
var memoryCheckValue = GC.GetTotalMemory(false);
if (memoryCheckValue > peakMemory)
{
peakMemory = currentMemory;
peakMemory = memoryCheckValue;
}
lastMemoryCheck = currentCandle;
}
// Log progress every 10% (reduced frequency)
var currentPercentage = (currentCandle * 100) / totalCandles;
if (currentPercentage >= lastLoggedPercentage + 10)
{
lastLoggedPercentage = currentPercentage;
@@ -260,12 +310,34 @@ public class BacktestExecutor
currentPercentage, currentCandle, totalCandles);
}
// LOOP COMPLETION CHECK: Ensure we haven't spent too long on this single candle
var loopDuration = DateTime.UtcNow - loopStartTime;
if (loopDuration > TimeSpan.FromMinutes(10)) // Entire loop should not take more than 10 minutes
{
_logger.LogWarning("Candle processing took unusually long: {Duration:F1} minutes for candle {Current}/{Total} at {Date}",
loopDuration.TotalMinutes, currentCandle, totalCandles, candle.Date);
}
// TIMING: Calculate loop overhead (everything except signal updates and backtest steps)
var loopTotal = Stopwatch.GetElapsedTime(loopStart);
var loopTotal = Stopwatch.GetElapsedTime(loopStopwatchStart);
var signalAndBacktestTime = signalUpdateTotalTime + backtestStepTotalTime;
// Note: loopOverheadTotalTime is cumulative, we track it at the end
}
// FINAL PROGRESS UPDATE: Ensure 100% is reported when complete
if (progressCallback != null)
{
try
{
await progressCallback(100);
_logger.LogInformation("Backtest completed: 100% ({TotalCandles} candles processed)", totalCandles);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Final progress callback failed");
}
}
// Complete candle processing telemetry
telemetry.CandleProcessingTime = Stopwatch.GetElapsedTime(candleProcessingStart);
telemetry.SignalUpdateTime = signalUpdateTotalTime;


@@ -637,7 +637,7 @@ public class BacktestComputeWorker : BackgroundService
var stuckCompletedJobs = runningJobs
.Where(j => j.JobType == JobType.Backtest && j.ProgressPercentage >= 100)
.ToList();
if (stuckCompletedJobs.Any())
{
_logger.LogWarning(
@@ -646,42 +646,27 @@ public class BacktestComputeWorker : BackgroundService
foreach (var stuckJob in stuckCompletedJobs)
{
_logger.LogWarning(
"🔧 Job {JobId} stuck at 100% progress in Running status since {StartedAt}. Marking as completed.",
stuckJob.Id, stuckJob.StartedAt);
await AutoCompleteStuckJobAsync(stuckJob, jobRepository, scope.ServiceProvider);
}
}
stuckJob.Status = JobStatus.Completed;
stuckJob.CompletedAt = stuckJob.CompletedAt ?? DateTime.UtcNow;
stuckJob.LastHeartbeat = DateTime.UtcNow;
// Add note to error message if not already set
if (string.IsNullOrEmpty(stuckJob.ErrorMessage))
{
stuckJob.ErrorMessage = "Job completed but status was not updated (auto-recovered)";
}
await jobRepository.UpdateAsync(stuckJob);
// Clean up progress tracker if still present
_jobProgressTrackers.TryRemove(stuckJob.Id, out _);
_runningJobTasks.TryRemove(stuckJob.Id, out _);
// Also check for jobs that have been running for too long but haven't reached 100%
var longRunningJobs = runningJobs
.Where(j => j.JobType == JobType.Backtest &&
j.ProgressPercentage < 100 &&
j.StartedAt.HasValue &&
(DateTime.UtcNow - j.StartedAt.Value) > TimeSpan.FromMinutes(_options.JobTimeoutMinutes + 10)) // Extra 10 min grace
.ToList();
// Update bundle request if this is part of a bundle
if (stuckJob.BundleRequestId.HasValue)
{
try
{
await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, scope.ServiceProvider, stuckJob);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating bundle request progress for stuck job {JobId}", stuckJob.Id);
}
}
_logger.LogInformation(
"✅ Successfully auto-completed stuck job {JobId}. Worker can now claim new jobs.",
stuckJob.Id);
if (longRunningJobs.Any())
{
_logger.LogWarning(
"🔧 Found {Count} jobs running longer than timeout for worker {WorkerId}. Marking as failed.",
longRunningJobs.Count, _options.WorkerId);
foreach (var longJob in longRunningJobs)
{
await HandleLongRunningJobAsync(longJob, jobRepository, scope.ServiceProvider);
}
}
@@ -947,6 +932,102 @@ public class BacktestComputeWorker : BackgroundService
return category == FailureCategory.Transient || category == FailureCategory.SystemError;
}
private async Task AutoCompleteStuckJobAsync(Job stuckJob, IJobRepository jobRepository, IServiceProvider serviceProvider)
{
try
{
_logger.LogWarning(
"🔧 Job {JobId} stuck at 100% progress in Running status since {StartedAt}. Marking as completed.",
stuckJob.Id, stuckJob.StartedAt);
stuckJob.Status = JobStatus.Completed;
stuckJob.CompletedAt = stuckJob.CompletedAt ?? DateTime.UtcNow;
stuckJob.LastHeartbeat = DateTime.UtcNow;
// Add note to error message if not already set
if (string.IsNullOrEmpty(stuckJob.ErrorMessage))
{
stuckJob.ErrorMessage = "Job completed but status was not updated (auto-recovered)";
}
await jobRepository.UpdateAsync(stuckJob);
// Clean up progress tracker if still present
_jobProgressTrackers.TryRemove(stuckJob.Id, out _);
_runningJobTasks.TryRemove(stuckJob.Id, out _);
// Update bundle request if this is part of a bundle
if (stuckJob.BundleRequestId.HasValue)
{
try
{
await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, serviceProvider, stuckJob);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating bundle request progress for stuck job {JobId}", stuckJob.Id);
}
}
_logger.LogInformation(
"✅ Successfully auto-completed stuck job {JobId}. Worker can now claim new jobs.",
stuckJob.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error auto-completing stuck job {JobId}", stuckJob.Id);
}
}
private async Task HandleLongRunningJobAsync(Job longJob, IJobRepository jobRepository, IServiceProvider serviceProvider)
{
try
{
var elapsed = longJob.StartedAt.HasValue
? (DateTime.UtcNow - longJob.StartedAt.Value).TotalMinutes
: 0;
_logger.LogWarning(
"🔧 Job {JobId} has been running for {Elapsed:F1} minutes (timeout: {_options.JobTimeoutMinutes}). Failing job.",
longJob.Id, elapsed, _options.JobTimeoutMinutes);
// Mark as failed
longJob.Status = JobStatus.Failed;
longJob.ErrorMessage = $"Job exceeded maximum runtime of {_options.JobTimeoutMinutes} minutes";
longJob.FailureCategory = FailureCategory.SystemError;
longJob.IsRetryable = false;
longJob.CompletedAt = DateTime.UtcNow;
longJob.AssignedWorkerId = null;
longJob.LastHeartbeat = DateTime.UtcNow;
await jobRepository.UpdateAsync(longJob);
// Clean up
_jobProgressTrackers.TryRemove(longJob.Id, out _);
_runningJobTasks.TryRemove(longJob.Id, out _);
// Update bundle request if this is part of a bundle
if (longJob.BundleRequestId.HasValue)
{
try
{
await UpdateBundleRequestProgress(longJob.BundleRequestId.Value, serviceProvider, longJob);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating bundle request progress for long-running job {JobId}", longJob.Id);
}
}
// Notify about permanent failure
await NotifyPermanentFailure(longJob, new TimeoutException($"Job exceeded {elapsed:F1} minutes"), serviceProvider);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling long-running job {JobId}", longJob.Id);
}
}
private async Task NotifyPermanentFailure(
Job job,
Exception ex,
@@ -956,7 +1037,7 @@ public class BacktestComputeWorker : BackgroundService
{
var webhookService = serviceProvider.GetRequiredService<IWebhookService>();
const string alertsChannel = "2676086723";
var jobTypeName = job.JobType == JobType.Genetic ? "Genetic" : "Backtest";
var message = $"🚨 **{jobTypeName} Job Failed Permanently**\n" +
$"Job ID: `{job.Id}`\n" +
@@ -965,7 +1046,7 @@ public class BacktestComputeWorker : BackgroundService
$"Failure Category: {job.FailureCategory}\n" +
$"Error: {ex.Message}\n" +
$"Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC";
await webhookService.SendMessage(message, alertsChannel);
}
catch (Exception notifyEx)

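The worker-side recovery above splits unhealthy Running jobs into two cases: jobs reporting 100% progress that never transitioned out of Running are auto-completed, while jobs still below 100% that have outrun the configured timeout plus a grace period are failed as non-retryable. A compact sketch of that classification as a pure, testable policy function, using hypothetical trimmed-down types rather than the repository's actual Job model:

using System;

// Hypothetical, trimmed-down job shape; the repository's Job entity has more fields.
public enum SketchJobStatus { Pending, Running, Completed, Failed }
public enum RecoveryAction { None, AutoComplete, FailAsTimedOut }

public record JobSnapshot(Guid Id, SketchJobStatus Status, int ProgressPercentage, DateTime? StartedAt);

public static class StuckJobPolicy
{
    // Mirrors the two recovery paths: complete jobs stuck at 100%, and fail jobs that
    // have exceeded the timeout (plus a grace period) without finishing.
    public static RecoveryAction Classify(JobSnapshot job, DateTime utcNow, TimeSpan timeout, TimeSpan grace)
    {
        if (job.Status != SketchJobStatus.Running)
            return RecoveryAction.None;

        if (job.ProgressPercentage >= 100)
            return RecoveryAction.AutoComplete;

        if (job.StartedAt.HasValue && utcNow - job.StartedAt.Value > timeout + grace)
            return RecoveryAction.FailAsTimedOut;

        return RecoveryAction.None;
    }
}

Keeping the decision separate from the repository updates makes the thresholds easy to unit-test without spinning up the worker.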

@@ -181,6 +181,17 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository);
return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0);
}
// NEW: Check if bundle has been running for too long overall (regardless of recent updates)
var timeSinceCreation = DateTime.UtcNow - bundle.CreatedAt;
if (timeSinceCreation > TimeSpan.FromHours(2)) // 2 hours is too long for any bundle
{
_logger.LogWarning(
"⚠️ Bundle {BundleRequestId} has been running for {Hours:F1} hours total. Forcing stuck bundle recovery.",
bundle.RequestId, timeSinceCreation.TotalHours);
await HandleStuckBundleAsync(bundle, timeSinceCreation, jobs, backtestRepository, jobRepository);
return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0);
}
}
// Check 3: Pending bundle that's been pending too long (jobs created but never started)
@@ -436,9 +447,9 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
IJobRepository jobRepository)
{
_logger.LogWarning(
"⚠️ Bundle {BundleRequestId} is stuck in Running status. No progress for {TimeSinceUpdate} hours. " +
"⚠️ Bundle {BundleRequestId} is stuck in Running status. No progress for {TimeSinceUpdate}. " +
"Completed: {Completed}/{Total}",
bundle.RequestId, timeSinceUpdate.TotalHours, bundle.CompletedBacktests, bundle.TotalBacktests);
bundle.RequestId, timeSinceUpdate, bundle.CompletedBacktests, bundle.TotalBacktests);
// Check job statuses to understand why bundle is stuck
var jobStatusSummary = jobs
@@ -478,32 +489,53 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
else
{
// Some jobs are still pending or running - bundle is genuinely stuck
// Reset any stale running jobs back to pending
var runningJobs = jobs.Where(j => j.Status == JobStatus.Running).ToList();
var pendingJobs = jobs.Where(j => j.Status == JobStatus.Pending).ToList();
var resetCount = 0;
// Reset ALL running jobs for stuck bundles - they're likely not making progress
foreach (var job in runningJobs)
{
var timeSinceJobHeartbeat = job.LastHeartbeat.HasValue
? DateTime.UtcNow - job.LastHeartbeat.Value
: DateTime.UtcNow - job.CreatedAt;
_logger.LogInformation(
"Resetting running job {JobId} for stuck bundle {BundleRequestId} back to Pending",
job.Id, bundle.RequestId);
if (timeSinceJobHeartbeat > TimeSpan.FromMinutes(30))
{
_logger.LogInformation(
"Resetting stale job {JobId} for bundle {BundleRequestId} back to Pending",
job.Id, bundle.RequestId);
job.Status = JobStatus.Pending;
job.AssignedWorkerId = null;
job.LastHeartbeat = null;
await jobRepository.UpdateAsync(job);
resetCount++;
}
job.Status = JobStatus.Pending;
job.AssignedWorkerId = null;
job.LastHeartbeat = null;
await jobRepository.UpdateAsync(job);
}
// Also reset old pending jobs that might be stuck in retry loops
var oldPendingJobs = pendingJobs.Where(j =>
j.RetryAfter.HasValue && j.RetryAfter.Value < DateTime.UtcNow).ToList();
foreach (var job in oldPendingJobs)
{
_logger.LogInformation(
"Resetting old pending job {JobId} for stuck bundle {BundleRequestId}",
job.Id, bundle.RequestId);
job.RetryAfter = null; // Clear retry delay
await jobRepository.UpdateAsync(job);
resetCount++;
}
// If no jobs were reset and there are pending jobs, the issue might be elsewhere
if (resetCount == 0 && pendingJobs.Any())
{
_logger.LogWarning(
"Bundle {BundleRequestId} has {PendingCount} pending jobs but no running jobs. " +
"This might indicate workers are not picking up jobs or jobs are failing immediately.",
bundle.RequestId, pendingJobs.Count);
}
// Update bundle timestamp to give it another chance
bundle.UpdatedAt = DateTime.UtcNow;
bundle.ErrorMessage =
$"Bundle was stuck. Reset {runningJobs.Count(j => j.Status == JobStatus.Pending)} stale jobs to pending.";
bundle.ErrorMessage = resetCount > 0
? $"Bundle was stuck. Reset {resetCount} jobs to allow retry."
: $"Bundle was stuck but no jobs needed resetting. May indicate worker issues.";
}
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);