using System.Collections.Concurrent;
using System.Text.Json;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Application.Backtests;
using Managing.Domain.Backtests;
using Managing.Domain.Bots;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Sentry;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
/// <summary>
/// Background worker that processes backtest jobs from the queue.
/// Polls for pending jobs, claims them using advisory locks, and processes them.
/// </summary>
public class BacktestComputeWorker : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<BacktestComputeWorker> _logger;
private readonly BacktestComputeWorkerOptions _options;
private readonly SemaphoreSlim _instanceSemaphore;
private readonly ConcurrentDictionary<Guid, Task> _runningJobTasks = new();
private readonly ConcurrentDictionary<Guid, JobProgressTracker> _jobProgressTrackers = new();
private readonly CancellationTokenSource _shutdownCts = new();
public BacktestComputeWorker(
IServiceScopeFactory scopeFactory,
ILogger<BacktestComputeWorker> logger,
IOptions<BacktestComputeWorkerOptions> options)
{
_scopeFactory = scopeFactory;
_logger = logger;
_options = options.Value;
_instanceSemaphore = new SemaphoreSlim(_options.MaxConcurrentPerInstance, _options.MaxConcurrentPerInstance);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
"BacktestComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrentPerUser: {MaxConcurrentPerUser}, MaxConcurrentPerInstance: {MaxConcurrentPerInstance}, PollInterval: {PollInterval}s, JobTimeout: {JobTimeoutMinutes}min",
_options.WorkerId, _options.MaxConcurrentPerUser, _options.MaxConcurrentPerInstance,
_options.JobPollIntervalSeconds, _options.JobTimeoutMinutes);
// Reset any jobs assigned to this WorkerId from previous worker instances at startup
// This is critical when restarting with the same WorkerId (e.g., Environment.MachineName)
try
{
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// First, reset all jobs assigned to this WorkerId (from previous instance)
var workerResetCount = await jobRepository.ResetJobsByWorkerIdAsync(_options.WorkerId);
if (workerResetCount > 0)
{
_logger.LogInformation("Reset {Count} jobs assigned to worker {WorkerId} from previous instance",
workerResetCount, _options.WorkerId);
}
// Then, reset any other stale jobs (from other workers or orphaned jobs)
var staleResetCount = await jobRepository.ResetStaleJobsAsync(_options.StaleJobTimeoutMinutes);
if (staleResetCount > 0)
{
_logger.LogInformation("Reset {Count} stale jobs to Pending status at startup", staleResetCount);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error resetting jobs at startup");
// Don't fail startup if this fails, but log it
}
// Link cancellation tokens
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, _shutdownCts.Token);
var cancellationToken = linkedCts.Token;
// Background task for stale job recovery
var staleJobRecoveryTask = Task.Run(() => StaleJobRecoveryLoop(cancellationToken), cancellationToken);
// Background task for heartbeat updates
var heartbeatTask = Task.Run(() => HeartbeatLoop(cancellationToken), cancellationToken);
// Background task for progress persistence
var progressPersistenceTask = Task.Run(() => ProgressPersistenceLoop(cancellationToken), cancellationToken);
// Main job processing loop
try
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await ProcessJobsAsync(cancellationToken);
}
catch (OperationCanceledException)
{
// Expected during shutdown
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in BacktestComputeWorker main loop");
SentrySdk.CaptureException(ex);
}
try
{
await Task.Delay(TimeSpan.FromSeconds(_options.JobPollIntervalSeconds), cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
}
}
finally
{
_logger.LogInformation("BacktestComputeWorker stopping, waiting for {Count} running jobs to complete or timeout",
_runningJobTasks.Count);
// Signal shutdown
_shutdownCts.Cancel();
// Wait for running jobs with timeout
var waitTasks = _runningJobTasks.Values.ToArray();
if (waitTasks.Length > 0)
{
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(_options.GracefulShutdownTimeoutMinutes), CancellationToken.None);
var completedTask = await Task.WhenAny(Task.WhenAll(waitTasks), timeoutTask);
if (completedTask == timeoutTask)
{
_logger.LogWarning("Graceful shutdown timeout reached, {Count} jobs may still be running",
_runningJobTasks.Count);
}
else
{
_logger.LogInformation("All running jobs completed during graceful shutdown");
}
}
_logger.LogInformation("BacktestComputeWorker stopped");
}
}
private async Task ProcessJobsAsync(CancellationToken cancellationToken)
{
// Check if this instance has capacity
if (!await _instanceSemaphore.WaitAsync(0, cancellationToken))
{
// Instance at capacity, skip this iteration
return;
}
try
{
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// Claim a random backtest job atomically, excluding users at capacity
// The SQL query checks running job counts within the transaction, ensuring thread-safety
var job = await jobRepository.ClaimRandomJobAsync(
_options.WorkerId,
JobType.Backtest,
_options.MaxConcurrentPerUser);
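// Note: the claim is assumed to be made atomic on the database side (e.g. row locking such as
// FOR UPDATE SKIP LOCKED, or the advisory locks mentioned in the class summary), so two worker
// instances can never claim the same pending job.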
if (job == null)
{
// No jobs available for users not at capacity, release semaphore
_instanceSemaphore.Release();
return;
}
_logger.LogInformation(
"Claimed random backtest job {JobId} (UserId: {UserId}) for worker {WorkerId}",
job.Id, job.UserId, _options.WorkerId);
// Process the job asynchronously (don't await, let it run in background)
// Create a new scope for the job processing to ensure proper lifetime management
var jobTask = Task.Run(async () =>
{
try
{
await ProcessJobAsync(job, cancellationToken);
}
catch (OperationCanceledException)
{
// Handle cancellation gracefully
_logger.LogInformation("Job {JobId} was cancelled during processing", job.Id);
await HandleJobCancellation(job);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing job {JobId}", job.Id);
// Error handling is done in ProcessJobAsync
}
finally
{
_runningJobTasks.TryRemove(job.Id, out _);
_instanceSemaphore.Release();
}
}, cancellationToken);
// Track the running job task
_runningJobTasks.TryAdd(job.Id, jobTask);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error claiming or processing job");
_instanceSemaphore.Release();
throw;
}
}
private async Task ProcessJobAsync(
Job job,
CancellationToken cancellationToken)
{
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// Concrete service types below (executor, user/exchange services, agent summary repository) are inferred from how they are used here.
var executor = scope.ServiceProvider.GetRequiredService<IBacktestExecutor>();
var userService = scope.ServiceProvider.GetRequiredService<IUserService>();
var exchangeService = scope.ServiceProvider.GetRequiredService<IExchangeService>();
var agentSummaryRepository = scope.ServiceProvider.GetRequiredService<IAgentSummaryRepository>();
var jobStartTime = DateTime.UtcNow;
try
{
_logger.LogInformation(
"Processing backtest job {JobId} (BundleRequestId: {BundleRequestId}, UserId: {UserId})",
job.Id, job.BundleRequestId, job.UserId);
// Deserialize config
var config = JsonSerializer.Deserialize<TradingBotConfig>(job.ConfigJson);
if (config == null)
{
throw new InvalidOperationException("Failed to deserialize TradingBotConfig from job");
}
// Load user
var user = await userService.GetUserByIdAsync(job.UserId);
if (user == null)
{
throw new InvalidOperationException($"User {job.UserId} not found");
}
// Load candles
var candles = await exchangeService.GetCandlesInflux(
TradingExchanges.Evm,
config.Ticker,
job.StartDate,
config.Timeframe,
job.EndDate);
if (candles == null || candles.Count == 0)
{
throw new InvalidOperationException(
$"No candles found for {config.Ticker} on {config.Timeframe} from {job.StartDate} to {job.EndDate}");
}
// Create progress tracker for this job
var progressTracker = new JobProgressTracker(job.Id, _logger);
_jobProgressTrackers.TryAdd(job.Id, progressTracker);
// Progress callback that only updates in-memory progress (non-blocking)
// Timeout is now enforced via CancellationToken, not by throwing in callback
Func<int, Task> progressCallback = (percentage) =>
{
// Update progress in memory only - persistence happens in background
progressTracker.UpdateProgress(percentage);
return Task.CompletedTask; // Non-blocking
};
// Execute the backtest with timeout
var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(_options.JobTimeoutMinutes));
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);
// Convert HashSet to List - candles are already ordered from repository
var candlesList = candles.ToList();
LightBacktest result;
try
{
result = await executor.ExecuteAsync(
config,
candlesList,
user,
save: true,
withCandles: false,
requestId: job.RequestId,
bundleRequestId: job.BundleRequestId,
metadata: null,
progressCallback: progressCallback,
cancellationToken: linkedCts.Token);
}
catch (OperationCanceledException) when (timeoutCts.Token.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
var elapsed = DateTime.UtcNow - jobStartTime;
throw new TimeoutException($"Job {job.Id} exceeded timeout of {_options.JobTimeoutMinutes} minutes (ran for {elapsed.TotalMinutes:F1} minutes)");
}
finally
{
timeoutCts.Dispose();
linkedCts.Dispose();
}
// Update job with result
job.Status = JobStatus.Completed;
job.ProgressPercentage = 100;
job.ResultJson = JsonSerializer.Serialize(result);
job.CompletedAt = DateTime.UtcNow;
job.LastHeartbeat = DateTime.UtcNow;
await jobRepository.UpdateAsync(job);
// Clean up progress tracker
_jobProgressTrackers.TryRemove(job.Id, out _);
// Increment backtest count for the user's agent summary
try
{
await agentSummaryRepository.IncrementBacktestCountAsync(job.UserId);
_logger.LogDebug("Incremented backtest count for user {UserId}", job.UserId);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to increment backtest count for user {UserId}", job.UserId);
// Don't fail the job if this update fails
}
var elapsedTime = DateTime.UtcNow - jobStartTime;
_logger.LogInformation(
"Completed backtest job {JobId}. Score: {Score}, PnL: {PnL}, Duration: {DurationMinutes:F1} minutes",
job.Id, result.Score, result.FinalPnl, elapsedTime.TotalMinutes);
// Update bundle request progress if this job is part of a bundle
if (job.BundleRequestId.HasValue)
{
try
{
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider, job);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating bundle request progress for job {JobId}", job.Id);
}
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Job {JobId} was cancelled", job.Id);
throw; // Re-throw to be handled by the caller
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing backtest job {JobId}", job.Id);
SentrySdk.CaptureException(ex);
// Clean up progress tracker on failure
_jobProgressTrackers.TryRemove(job.Id, out _);
await HandleJobFailure(job, ex, jobRepository, scope.ServiceProvider);
}
}
private async Task HandleJobCancellation(Job job)
{
try
{
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// Reload job to get latest state
var currentJob = await jobRepository.GetByIdAsync(job.Id);
if (currentJob == null)
{
_logger.LogWarning("Job {JobId} not found when handling cancellation", job.Id);
return;
}
// If job is still running, mark it for retry
if (currentJob.Status == JobStatus.Running)
{
if (currentJob.RetryCount < currentJob.MaxRetries)
{
currentJob.Status = JobStatus.Pending;
currentJob.RetryCount++;
var backoffMinutes = Math.Min(Math.Pow(2, currentJob.RetryCount), _options.MaxRetryDelayMinutes);
currentJob.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
currentJob.ErrorMessage = $"Worker shutdown - retry {currentJob.RetryCount}/{currentJob.MaxRetries}";
currentJob.FailureCategory = FailureCategory.Transient;
currentJob.AssignedWorkerId = null;
currentJob.LastHeartbeat = null;
_logger.LogWarning(
"Job {JobId} cancelled during shutdown, will be retried (attempt {RetryCount}/{MaxRetries})",
currentJob.Id, currentJob.RetryCount, currentJob.MaxRetries);
}
else
{
currentJob.Status = JobStatus.Failed;
currentJob.ErrorMessage = "Worker shutdown - exceeded max retries";
currentJob.FailureCategory = FailureCategory.SystemError;
currentJob.IsRetryable = false;
currentJob.CompletedAt = DateTime.UtcNow;
currentJob.AssignedWorkerId = null;
_logger.LogError("Job {JobId} cancelled during shutdown and exceeded max retries", currentJob.Id);
await NotifyPermanentFailure(currentJob, new OperationCanceledException("Worker shutdown"), scope.ServiceProvider);
}
await jobRepository.UpdateAsync(currentJob);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling job cancellation for job {JobId}", job.Id);
}
}
private async Task UpdateBundleRequestProgress(Guid bundleRequestId, IServiceProvider serviceProvider, Job? updatedJob = null)
{
try
{
// Service types below are inferred from usage, as above.
var backtestRepository = serviceProvider.GetRequiredService<IBacktestRepository>();
var jobRepository = serviceProvider.GetRequiredService<IJobRepository>();
var userService = serviceProvider.GetRequiredService<IUserService>();
var webhookService = serviceProvider.GetRequiredService<IWebhookService>();
// Get all jobs for this bundle (materialized once so the repository result is not re-enumerated below)
var jobs = (await jobRepository.GetByBundleRequestIdAsync(bundleRequestId)).ToList();
var totalJobs = jobs.Count;
// If we have an updated job, use its current status instead of querying again
// This avoids race conditions where the job was just updated but not yet reflected in the query
var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
var runningJobs = jobs.Count(j => j.Status == JobStatus.Running);
// Adjust counts based on the updated job if provided
if (updatedJob != null)
{
var existingJob = jobs.FirstOrDefault(j => j.Id == updatedJob.Id);
if (existingJob != null)
{
// Remove the old status count
switch (existingJob.Status)
{
case JobStatus.Completed:
completedJobs--;
break;
case JobStatus.Failed:
failedJobs--;
break;
case JobStatus.Running:
runningJobs--;
break;
}
// Add the new status count
switch (updatedJob.Status)
{
case JobStatus.Completed:
completedJobs++;
break;
case JobStatus.Failed:
failedJobs++;
break;
case JobStatus.Running:
runningJobs++;
break;
}
}
}
if (totalJobs == 0)
{
return; // No jobs yet
}
// Get user from first job
var firstJob = jobs.First();
var user = await userService.GetUserByIdAsync(firstJob.UserId);
if (user == null)
{
_logger.LogWarning("User {UserId} not found for bundle request {BundleRequestId}", firstJob.UserId, bundleRequestId);
return;
}
// Get bundle request
var bundleRequest = backtestRepository.GetBundleBacktestRequestByIdForUser(user, bundleRequestId);
if (bundleRequest == null)
{
_logger.LogWarning("Bundle request {BundleRequestId} not found for user {UserId}", bundleRequestId, user.Id);
return;
}
var previousStatus = bundleRequest.Status;
// Update bundle request progress (always update counters regardless of status)
bundleRequest.CompletedBacktests = completedJobs;
bundleRequest.FailedBacktests = failedJobs;
bundleRequest.UpdatedAt = DateTime.UtcNow;
// CRITICAL: If bundle is already in a final state (Completed/Failed with CompletedAt set),
// don't overwrite it unless we're detecting a legitimate inconsistency that needs fixing
if (bundleRequest.CompletedAt.HasValue &&
(bundleRequest.Status == BundleBacktestRequestStatus.Completed ||
bundleRequest.Status == BundleBacktestRequestStatus.Failed))
{
// Bundle already finalized, only update if job counts indicate it should be re-opened
// (This shouldn't happen in normal flow, but guards against race conditions)
if (completedJobs + failedJobs == totalJobs)
{
_logger.LogDebug(
"Bundle {BundleRequestId} already completed/failed. Skipping status update.",
bundleRequestId);
// Progress counters already updated above, just return
return;
}
else
{
_logger.LogWarning(
"Bundle {BundleRequestId} was marked as completed/failed but has incomplete jobs ({Completed}+{Failed}/{Total}). Reopening.",
bundleRequestId, completedJobs, failedJobs, totalJobs);
// Allow the update to proceed to fix inconsistent state
}
}
// Update status based on job states
if (completedJobs + failedJobs == totalJobs)
{
// All jobs completed or failed
if (failedJobs == 0)
{
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
}
else if (completedJobs == 0)
{
bundleRequest.Status = BundleBacktestRequestStatus.Failed;
bundleRequest.ErrorMessage = "All backtests failed";
}
else
{
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
bundleRequest.ErrorMessage = $"{failedJobs} backtests failed";
}
bundleRequest.CompletedAt = DateTime.UtcNow;
bundleRequest.CurrentBacktest = null;
}
else if (runningJobs > 0 || completedJobs > 0 || failedJobs > 0)
{
// Some jobs are running, or some have completed/failed (meaning work has started)
// Once a bundle has started processing, it should stay "Running" until all jobs are done
bundleRequest.Status = BundleBacktestRequestStatus.Running;
}
// If all jobs are still pending (completedJobs = 0, failedJobs = 0, runningJobs = 0),
// keep the current status (likely Pending)
// Update results list from completed jobs
var completedJobResults = jobs
.Where(j => j.Status == JobStatus.Completed && !string.IsNullOrEmpty(j.ResultJson))
.Select(j =>
{
try
{
var result = JsonSerializer.Deserialize<LightBacktest>(j.ResultJson!);
return result?.Id;
}
catch
{
return null;
}
})
.Where(id => !string.IsNullOrEmpty(id))
.ToList();
bundleRequest.Results = completedJobResults!;
await backtestRepository.UpdateBundleBacktestRequestAsync(bundleRequest);
// Send webhook notification if bundle request just completed
if (previousStatus != BundleBacktestRequestStatus.Completed &&
bundleRequest.Status == BundleBacktestRequestStatus.Completed &&
!string.IsNullOrEmpty(user.TelegramChannel))
{
var completedAt = bundleRequest.CompletedAt ?? DateTime.UtcNow;
var duration = completedAt - bundleRequest.CreatedAt;
var durationText = duration.TotalHours >= 1
? $"{duration.Hours}h {duration.Minutes}m {duration.Seconds}s"
: duration.TotalMinutes >= 1
? $"{duration.Minutes}m {duration.Seconds}s"
: $"{duration.Seconds}s";
var message = $"✅ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed successfully. " +
$"Completed: {completedJobs}/{totalJobs} backtests" +
(failedJobs > 0 ? $", Failed: {failedJobs}" : "") +
$". Results: {completedJobResults.Count} backtest(s) generated. " +
$"Duration: {durationText}";
await webhookService.SendMessage(message, user.TelegramChannel);
}
else if (previousStatus != BundleBacktestRequestStatus.Failed &&
bundleRequest.Status == BundleBacktestRequestStatus.Failed &&
!string.IsNullOrEmpty(user.TelegramChannel))
{
var completedAt = bundleRequest.CompletedAt ?? DateTime.UtcNow;
var duration = completedAt - bundleRequest.CreatedAt;
var durationText = duration.TotalHours >= 1
? $"{duration.Hours}h {duration.Minutes}m {duration.Seconds}s"
: duration.TotalMinutes >= 1
? $"{duration.Minutes}m {duration.Seconds}s"
: $"{duration.Seconds}s";
var message = $"❌ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) failed. " +
$"All {totalJobs} backtests failed. Error: {bundleRequest.ErrorMessage}. " +
$"Duration: {durationText}";
await webhookService.SendMessage(message, user.TelegramChannel);
}
_logger.LogInformation(
"Updated bundle request {BundleRequestId} progress: {Completed}/{Total} completed, {Failed} failed, {Running} running",
bundleRequestId, completedJobs, totalJobs, failedJobs, runningJobs);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error updating bundle request {BundleRequestId} progress", bundleRequestId);
}
}
private async Task StaleJobRecoveryLoop(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); // Check every minute
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// Get running jobs for this worker
var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
// CRITICAL FIX: Check for jobs stuck at 100% progress
// These jobs completed execution but their status wasn't updated to Completed
// This causes the worker to think it's at max capacity
var stuckCompletedJobs = runningJobs
.Where(j => j.JobType == JobType.Backtest && j.ProgressPercentage >= 100)
.ToList();
if (stuckCompletedJobs.Any())
{
_logger.LogWarning(
"🔧 Found {Count} jobs stuck at 100% progress for worker {WorkerId}. Auto-completing them.",
stuckCompletedJobs.Count, _options.WorkerId);
foreach (var stuckJob in stuckCompletedJobs)
{
await AutoCompleteStuckJobAsync(stuckJob, jobRepository, scope.ServiceProvider);
}
}
// Also check for jobs that have been running for too long but haven't reached 100%
var longRunningJobs = runningJobs
.Where(j => j.JobType == JobType.Backtest &&
j.ProgressPercentage < 100 &&
j.StartedAt.HasValue &&
(DateTime.UtcNow - j.StartedAt.Value) > TimeSpan.FromMinutes(_options.JobTimeoutMinutes + 10)) // Extra 10 min grace
.ToList();
if (longRunningJobs.Any())
{
_logger.LogWarning(
"🔧 Found {Count} jobs running longer than timeout for worker {WorkerId}. Marking as failed.",
longRunningJobs.Count, _options.WorkerId);
foreach (var longJob in longRunningJobs)
{
await HandleLongRunningJobAsync(longJob, jobRepository, scope.ServiceProvider);
}
}
// Get stale jobs for this worker
var now = DateTime.UtcNow;
var staleJobs = runningJobs
.Where(j => j.JobType == JobType.Backtest &&
j.ProgressPercentage < 100 && // Don't mark stuck-at-100% jobs as stale
(
// Stale heartbeat (no heartbeat in timeout period)
j.LastHeartbeat == null ||
j.LastHeartbeat < now.AddMinutes(-_options.StaleJobTimeoutMinutes) ||
// Job running too long (even with recent heartbeat)
(j.StartedAt.HasValue &&
j.StartedAt.Value < now.AddMinutes(-_options.JobTimeoutMinutes))
))
.ToList();
foreach (var job in staleJobs)
{
var elapsed = job.StartedAt.HasValue
? (DateTime.UtcNow - job.StartedAt.Value).TotalMinutes
: (double?)null;
var lastHeartbeatAge = job.LastHeartbeat.HasValue
? (DateTime.UtcNow - job.LastHeartbeat.Value).TotalMinutes
: (double?)null;
_logger.LogWarning(
"Detected stale job {JobId}: Started {StartedAt}, LastHeartbeat: {LastHeartbeat} ({HeartbeatAge} min ago), Elapsed: {Elapsed} min",
job.Id, job.StartedAt, job.LastHeartbeat, lastHeartbeatAge, elapsed);
// Remove from running tasks if still tracked
_runningJobTasks.TryRemove(job.Id, out _);
// If it's stale but retryable, reset to pending with retry count
if (job.RetryCount < job.MaxRetries)
{
job.Status = JobStatus.Pending;
job.RetryCount++;
var backoffMinutes = Math.Min(Math.Pow(2, job.RetryCount), _options.MaxRetryDelayMinutes);
job.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
job.ErrorMessage = $"Job timeout/stale - retry {job.RetryCount}/{job.MaxRetries} (ran for {elapsed:F1} min)";
job.FailureCategory = FailureCategory.SystemError;
_logger.LogWarning(
"Stale job {JobId} will be retried (attempt {RetryCount}/{MaxRetries}) after {RetryAfter}",
job.Id, job.RetryCount, job.MaxRetries, job.RetryAfter);
}
else
{
// Exceeded retries - mark as failed
job.Status = JobStatus.Failed;
job.ErrorMessage = $"Job timeout/stale - exceeded max retries (ran for {elapsed:F1} min)";
job.FailureCategory = FailureCategory.SystemError;
job.IsRetryable = false;
job.CompletedAt = DateTime.UtcNow;
_logger.LogError(
"Stale job {JobId} exceeded max retries after running for {Elapsed} minutes",
job.Id, elapsed);
// Notify permanent failure
await NotifyPermanentFailure(job, new TimeoutException($"Job timeout after {elapsed:F1} minutes"), scope.ServiceProvider);
// Update bundle request if this is part of a bundle
if (job.BundleRequestId.HasValue)
{
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider, job);
}
}
job.AssignedWorkerId = null;
job.LastHeartbeat = null;
await jobRepository.UpdateAsync(job);
}
if (staleJobs.Count > 0)
{
_logger.LogInformation("Processed {Count} stale backtest jobs", staleJobs.Count);
}
}
catch (OperationCanceledException)
{
// Expected during shutdown, don't log as error
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in stale job recovery loop");
}
}
}
private async Task HeartbeatLoop(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(_options.HeartbeatIntervalSeconds), cancellationToken);
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// Update heartbeat for all jobs assigned to this worker
var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
foreach (var job in runningJobs)
{
job.LastHeartbeat = DateTime.UtcNow;
await jobRepository.UpdateAsync(job);
}
}
catch (OperationCanceledException)
{
// Expected during shutdown, don't log as error
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in heartbeat loop");
}
}
}
private async Task ProgressPersistenceLoop(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken); // Check every 2 seconds
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// Process all progress trackers that need persistence
var trackersToPersist = _jobProgressTrackers
.Where(kvp => kvp.Value.ShouldPersist())
.ToList();
if (trackersToPersist.Count > 0)
{
_logger.LogDebug("Persisting progress for {Count} jobs", trackersToPersist.Count);
foreach (var (jobId, tracker) in trackersToPersist)
{
try
{
var (percentage, lastUpdate) = tracker.GetProgressForPersistence();
// Get and update the job
var job = await jobRepository.GetByIdAsync(jobId);
if (job != null && job.Status == JobStatus.Running)
{
job.ProgressPercentage = percentage;
job.LastHeartbeat = lastUpdate;
await jobRepository.UpdateAsync(job);
_logger.LogDebug("Persisted progress {Percentage}% for job {JobId}", percentage, jobId);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error persisting progress for job {JobId}", jobId);
}
}
}
}
catch (OperationCanceledException)
{
// Expected during shutdown, don't log as error
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in progress persistence loop");
}
}
}
private async Task HandleJobFailure(
Job job,
Exception ex,
IJobRepository jobRepository,
IServiceProvider serviceProvider)
{
try
{
// Categorize the failure
var failureCategory = CategorizeFailure(ex);
var isRetryable = IsRetryableFailure(ex, failureCategory);
// Check if we should retry
if (isRetryable && job.RetryCount < job.MaxRetries)
{
// Calculate exponential backoff: 2^retryCount minutes, capped at MaxRetryDelayMinutes
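// With the pre-increment RetryCount used below and the default MaxRetryDelayMinutes of 60, the schedule is:
// 1st retry after 2^0 = 1 min, 2nd after 2 min, 3rd after 4 min, and so on, capped at 60 minutes.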
var backoffMinutes = Math.Min(Math.Pow(2, job.RetryCount), _options.MaxRetryDelayMinutes);
job.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
job.RetryCount++;
job.Status = JobStatus.Pending; // Reset to pending for retry
job.AssignedWorkerId = null; // Allow any worker to pick it up
job.ErrorMessage = $"Retry {job.RetryCount}/{job.MaxRetries}: {ex.Message}";
job.FailureCategory = failureCategory;
job.IsRetryable = true;
_logger.LogWarning(
"Job {JobId} will be retried (attempt {RetryCount}/{MaxRetries}) after {RetryAfter}. Error: {Error}",
job.Id, job.RetryCount, job.MaxRetries, job.RetryAfter, ex.Message);
}
else
{
// Permanent failure - mark as failed
job.Status = JobStatus.Failed;
job.ErrorMessage = ex.Message;
job.FailureCategory = failureCategory;
job.IsRetryable = false;
job.CompletedAt = DateTime.UtcNow;
_logger.LogError(
"Job {JobId} failed permanently after {RetryCount} retries. Error: {Error}",
job.Id, job.RetryCount, ex.Message);
// Send notification for permanent failure
await NotifyPermanentFailure(job, ex, serviceProvider);
// Update bundle request if this is part of a bundle
if (job.BundleRequestId.HasValue)
{
await UpdateBundleRequestProgress(job.BundleRequestId.Value, serviceProvider, job);
}
}
job.LastHeartbeat = DateTime.UtcNow;
await jobRepository.UpdateAsync(job);
}
catch (Exception updateEx)
{
_logger.LogError(updateEx, "Failed to update job {JobId} status after failure", job.Id);
}
}
private FailureCategory CategorizeFailure(Exception ex)
{
return ex switch
{
TimeoutException => FailureCategory.Transient,
TaskCanceledException => FailureCategory.Transient,
HttpRequestException => FailureCategory.Transient,
InvalidOperationException when ex.Message.Contains("candles") || ex.Message.Contains("No candles") => FailureCategory.DataError,
InvalidOperationException when ex.Message.Contains("User") || ex.Message.Contains("not found") => FailureCategory.UserError,
OutOfMemoryException => FailureCategory.SystemError,
_ => FailureCategory.Unknown
};
}
private bool IsRetryableFailure(Exception ex, FailureCategory category)
{
// Don't retry user errors or data errors (missing candles, invalid config)
if (category == FailureCategory.UserError || category == FailureCategory.DataError)
return false;
// Retry transient and system errors
return category == FailureCategory.Transient || category == FailureCategory.SystemError;
}
private async Task AutoCompleteStuckJobAsync(Job stuckJob, IJobRepository jobRepository, IServiceProvider serviceProvider)
{
try
{
_logger.LogWarning(
"🔧 Job {JobId} stuck at 100% progress in Running status since {StartedAt}. Marking as completed.",
stuckJob.Id, stuckJob.StartedAt);
stuckJob.Status = JobStatus.Completed;
stuckJob.CompletedAt = stuckJob.CompletedAt ?? DateTime.UtcNow;
stuckJob.LastHeartbeat = DateTime.UtcNow;
// Add note to error message if not already set
if (string.IsNullOrEmpty(stuckJob.ErrorMessage))
{
stuckJob.ErrorMessage = "Job completed but status was not updated (auto-recovered)";
}
await jobRepository.UpdateAsync(stuckJob);
// Clean up progress tracker if still present
_jobProgressTrackers.TryRemove(stuckJob.Id, out _);
_runningJobTasks.TryRemove(stuckJob.Id, out _);
// Update bundle request if this is part of a bundle
if (stuckJob.BundleRequestId.HasValue)
{
try
{
await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, serviceProvider, stuckJob);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating bundle request progress for stuck job {JobId}", stuckJob.Id);
}
}
_logger.LogInformation(
"✅ Successfully auto-completed stuck job {JobId}. Worker can now claim new jobs.",
stuckJob.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error auto-completing stuck job {JobId}", stuckJob.Id);
}
}
private async Task HandleLongRunningJobAsync(Job longJob, IJobRepository jobRepository, IServiceProvider serviceProvider)
{
try
{
var elapsed = longJob.StartedAt.HasValue
? (DateTime.UtcNow - longJob.StartedAt.Value).TotalMinutes
: 0;
_logger.LogWarning(
"🔧 Job {JobId} has been running for {Elapsed:F1} minutes (timeout: {_options.JobTimeoutMinutes}). Failing job.",
longJob.Id, elapsed, _options.JobTimeoutMinutes);
// Mark as failed
longJob.Status = JobStatus.Failed;
longJob.ErrorMessage = $"Job exceeded maximum runtime of {_options.JobTimeoutMinutes} minutes";
longJob.FailureCategory = FailureCategory.SystemError;
longJob.IsRetryable = false;
longJob.CompletedAt = DateTime.UtcNow;
longJob.AssignedWorkerId = null;
longJob.LastHeartbeat = DateTime.UtcNow;
await jobRepository.UpdateAsync(longJob);
// Clean up
_jobProgressTrackers.TryRemove(longJob.Id, out _);
_runningJobTasks.TryRemove(longJob.Id, out _);
// Update bundle request if this is part of a bundle
if (longJob.BundleRequestId.HasValue)
{
try
{
await UpdateBundleRequestProgress(longJob.BundleRequestId.Value, serviceProvider, longJob);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error updating bundle request progress for long-running job {JobId}", longJob.Id);
}
}
// Notify about permanent failure
await NotifyPermanentFailure(longJob, new TimeoutException($"Job exceeded {elapsed:F1} minutes"), serviceProvider);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling long-running job {JobId}", longJob.Id);
}
}
private async Task NotifyPermanentFailure(
Job job,
Exception ex,
IServiceProvider serviceProvider)
{
try
{
var webhookService = serviceProvider.GetRequiredService<IWebhookService>();
const string alertsChannel = "2676086723";
var jobTypeName = job.JobType == JobType.Genetic ? "Genetic" : "Backtest";
var message = $"🚨 **{jobTypeName} Job Failed Permanently**\n" +
$"Job ID: `{job.Id}`\n" +
$"User ID: {job.UserId}\n" +
$"Retry Attempts: {job.RetryCount}/{job.MaxRetries}\n" +
$"Failure Category: {job.FailureCategory}\n" +
$"Error: {ex.Message}\n" +
$"Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC";
await webhookService.SendMessage(message, alertsChannel);
}
catch (Exception notifyEx)
{
_logger.LogError(notifyEx, "Failed to send permanent failure notification for job {JobId}", job.Id);
}
}
public override void Dispose()
{
_shutdownCts?.Cancel();
_shutdownCts?.Dispose();
_instanceSemaphore?.Dispose();
base.Dispose();
}
}
/// <summary>
/// Tracks job progress with batched database updates for performance optimization
/// </summary>
public class JobProgressTracker
{
private readonly object _lock = new();
private int _lastPersistedPercentage;
private DateTime _lastPersistedTime;
private readonly ILogger _logger;
public Guid JobId { get; }
public int CurrentPercentage { get; private set; }
public DateTime LastUpdateTime { get; private set; }
public JobProgressTracker(Guid jobId, ILogger logger)
{
JobId = jobId;
_logger = logger;
_lastPersistedTime = DateTime.UtcNow;
}
/// <summary>
/// Updates progress in memory only - thread safe
/// </summary>
public void UpdateProgress(int percentage)
{
lock (_lock)
{
CurrentPercentage = percentage;
LastUpdateTime = DateTime.UtcNow;
}
}
/// <summary>
/// Checks if progress should be persisted to database based on time/percentage thresholds
/// </summary>
public bool ShouldPersist(int progressUpdateIntervalMs = 5000, int percentageThreshold = 5)
{
lock (_lock)
{
var timeSinceLastPersist = (DateTime.UtcNow - _lastPersistedTime).TotalMilliseconds;
var percentageSinceLastPersist = CurrentPercentage - _lastPersistedPercentage;
return timeSinceLastPersist >= progressUpdateIntervalMs ||
percentageSinceLastPersist >= percentageThreshold ||
CurrentPercentage >= 100; // Always persist completion
}
}
/// <summary>
/// Gets current progress and marks as persisted
/// </summary>
public (int percentage, DateTime lastUpdate) GetProgressForPersistence()
{
lock (_lock)
{
var percentage = CurrentPercentage;
var lastUpdate = LastUpdateTime;
_lastPersistedPercentage = percentage;
_lastPersistedTime = DateTime.UtcNow;
return (percentage, lastUpdate);
}
}
}
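// Usage sketch (illustrative): the backtest's progress callback calls UpdateProgress(pct) on every
// update, while ProgressPersistenceLoop polls ShouldPersist() roughly every 2 seconds and, when it
// returns true, writes GetProgressForPersistence() to the job row. With the defaults above, progress
// reaches the database in batches (every ~5 seconds or every 5 percentage points, and always at 100%).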
/// <summary>
/// Configuration options for BacktestComputeWorker
/// </summary>
public class BacktestComputeWorkerOptions
{
public const string SectionName = "BacktestComputeWorker";
/// <summary>
/// Unique identifier for this worker instance
/// </summary>
public string WorkerId { get; set; } = Environment.MachineName;
/// <summary>
/// Maximum number of concurrent backtests per user (global limit across all workers)
/// </summary>
public int MaxConcurrentPerUser { get; set; } = 6;
/// <summary>
/// Maximum number of concurrent backtests per worker instance (local limit for this worker)
/// </summary>
public int MaxConcurrentPerInstance { get; set; } = 6;
/// <summary>
/// Interval in seconds between job polling attempts
/// </summary>
public int JobPollIntervalSeconds { get; set; } = 5;
/// <summary>
/// Interval in seconds between heartbeat updates
/// </summary>
public int HeartbeatIntervalSeconds { get; set; } = 30;
/// <summary>
/// Timeout in minutes for considering a job stale
/// </summary>
public int StaleJobTimeoutMinutes { get; set; } = 5;
/// <summary>
/// Default maximum retry attempts for failed jobs
/// </summary>
public int DefaultMaxRetries { get; set; } = 3;
/// <summary>
/// Maximum retry delay in minutes (cap for exponential backoff)
/// </summary>
public int MaxRetryDelayMinutes { get; set; } = 60;
/// <summary>
/// Maximum time in minutes a job can run before being considered timed out
/// </summary>
public int JobTimeoutMinutes { get; set; } = 60;
/// <summary>
/// Timeout in minutes to wait for running jobs during graceful shutdown
/// </summary>
public int GracefulShutdownTimeoutMinutes { get; set; } = 5;
}
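// Example host wiring (a minimal sketch; assumes the standard Microsoft.Extensions.Hosting builder and
// an appsettings section named after BacktestComputeWorkerOptions.SectionName):
//
//   builder.Services.Configure<BacktestComputeWorkerOptions>(
//       builder.Configuration.GetSection(BacktestComputeWorkerOptions.SectionName));
//   builder.Services.AddHostedService<BacktestComputeWorker>();
//
// Example "BacktestComputeWorker" configuration section (values shown are the defaults declared above):
//
//   {
//     "BacktestComputeWorker": {
//       "MaxConcurrentPerUser": 6,
//       "MaxConcurrentPerInstance": 6,
//       "JobPollIntervalSeconds": 5,
//       "HeartbeatIntervalSeconds": 30,
//       "StaleJobTimeoutMinutes": 5,
//       "MaxRetryDelayMinutes": 60,
//       "JobTimeoutMinutes": 60,
//       "GracefulShutdownTimeoutMinutes": 5
//     }
//   }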