diff --git a/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs b/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs
index 7a92d90f..65ee8abf 100644
--- a/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs
+++ b/src/Managing.Application.Abstractions/Repositories/IJobRepository.cs
@@ -52,6 +52,12 @@ public interface IJobRepository
     /// </summary>
     Task<int> ResetStaleJobsAsync(int timeoutMinutes = 5);

+    /// <summary>
+    /// Resets all jobs (Running or Pending) assigned to a specific worker back to Pending status.
+    /// Used when a worker restarts to clear jobs from the previous instance.
+    /// </summary>
+    Task<int> ResetJobsByWorkerIdAsync(string workerId);
+
     /// <summary>
     /// Gets all running jobs assigned to a specific worker
     /// </summary>
diff --git a/src/Managing.Application/Workers/BacktestComputeWorker.cs b/src/Managing.Application/Workers/BacktestComputeWorker.cs
index 7eaf311a..e1b8eba1 100644
--- a/src/Managing.Application/Workers/BacktestComputeWorker.cs
+++ b/src/Managing.Application/Workers/BacktestComputeWorker.cs
@@ -42,9 +42,37 @@ public class BacktestComputeWorker : BackgroundService
     {
         _logger.LogInformation(
             "BacktestComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrentPerUser: {MaxConcurrentPerUser}, MaxConcurrentPerInstance: {MaxConcurrentPerInstance}, PollInterval: {PollInterval}s, JobTimeout: {JobTimeoutMinutes}min",
-            _options.WorkerId, _options.MaxConcurrentPerUser, _options.MaxConcurrentPerInstance, 
+            _options.WorkerId, _options.MaxConcurrentPerUser, _options.MaxConcurrentPerInstance,
             _options.JobPollIntervalSeconds, _options.JobTimeoutMinutes);

+        // Reset any jobs assigned to this WorkerId from previous worker instances at startup
+        // This is critical when restarting with the same WorkerId (e.g., Environment.MachineName)
+        try
+        {
+            using var scope = _scopeFactory.CreateScope();
+            var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
+
+            // First, reset all jobs assigned to this WorkerId (from previous instance)
+            var workerResetCount = await jobRepository.ResetJobsByWorkerIdAsync(_options.WorkerId);
+            if (workerResetCount > 0)
+            {
+                _logger.LogInformation("Reset {Count} jobs assigned to worker {WorkerId} from previous instance",
+                    workerResetCount, _options.WorkerId);
+            }
+
+            // Then, reset any other stale jobs (from other workers or orphaned jobs)
+            var staleResetCount = await jobRepository.ResetStaleJobsAsync(_options.StaleJobTimeoutMinutes);
+            if (staleResetCount > 0)
+            {
+                _logger.LogInformation("Reset {Count} stale jobs to Pending status at startup", staleResetCount);
+            }
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error resetting jobs at startup");
+            // Don't fail startup if this fails, but log it
+        }
+
         // Link cancellation tokens
         using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, _shutdownCts.Token);
         var cancellationToken = linkedCts.Token;
diff --git a/src/Managing.Application/Workers/GeneticComputeWorker.cs b/src/Managing.Application/Workers/GeneticComputeWorker.cs
index beeed098..8e20fdf2 100644
--- a/src/Managing.Application/Workers/GeneticComputeWorker.cs
+++ b/src/Managing.Application/Workers/GeneticComputeWorker.cs
@@ -39,6 +39,34 @@ public class GeneticComputeWorker : BackgroundService
             "GeneticComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrent: {MaxConcurrent}, PollInterval: {PollInterval}s",
             _options.WorkerId, _options.MaxConcurrentGenetics, _options.JobPollIntervalSeconds);

+        // Reset any jobs assigned to this WorkerId from previous worker instances at startup
+        // This is critical when restarting with the same WorkerId (e.g., Environment.MachineName)
+        try
+        {
+            using var scope = _scopeFactory.CreateScope();
+            var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
+
+            // First, reset all jobs assigned to this WorkerId (from previous instance)
+            var workerResetCount = await jobRepository.ResetJobsByWorkerIdAsync(_options.WorkerId);
+            if (workerResetCount > 0)
+            {
+                _logger.LogInformation("Reset {Count} jobs assigned to worker {WorkerId} from previous instance",
+                    workerResetCount, _options.WorkerId);
+            }
+
+            // Then, reset any other stale jobs (from other workers or orphaned jobs)
+            var staleResetCount = await jobRepository.ResetStaleJobsAsync(_options.StaleJobTimeoutMinutes);
+            if (staleResetCount > 0)
+            {
+                _logger.LogInformation("Reset {Count} stale jobs to Pending status at startup", staleResetCount);
+            }
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error resetting jobs at startup");
+            // Don't fail startup if this fails, but log it
+        }
+
         // Background task for stale job recovery
         var staleJobRecoveryTask = Task.Run(() => StaleJobRecoveryLoop(stoppingToken), stoppingToken);

diff --git a/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs b/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs
index 383c6640..ebfb76c9 100644
--- a/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs
+++ b/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs
@@ -52,11 +52,13 @@ public class PostgreSqlJobRepository : IJobRepository
                    ""GeneticRequestId"", ""RetryCount"",
                    ""MaxRetries"", ""RetryAfter"", ""IsRetryable"", ""FailureCategory""
             FROM ""Jobs""
-            WHERE ""Status"" = @status";
+            WHERE ""Status"" = @status
+              AND (""RetryAfter"" IS NULL OR ""RetryAfter"" <= @now)";

         var parameters = new List<NpgsqlParameter>
         {
-            new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending }
+            new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending },
+            new NpgsqlParameter("now", NpgsqlDbType.TimestampTz) { Value = DateTime.UtcNow }
         };

         if (jobType.HasValue)
@@ -255,6 +257,7 @@ public class PostgreSqlJobRepository : IJobRepository
                 FROM ""Jobs"" j
                 WHERE j.""Status"" = @status
                   AND j.""JobType"" = @jobType
+                  AND (j.""RetryAfter"" IS NULL OR j.""RetryAfter"" <= @now)
                   AND (
                       SELECT COUNT(*)
                       FROM ""Jobs"" running
@@ -271,7 +274,8 @@ public class PostgreSqlJobRepository : IJobRepository
             new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending },
             new NpgsqlParameter("jobType", NpgsqlDbType.Integer) { Value = (int)jobType },
             new NpgsqlParameter("runningStatus", NpgsqlDbType.Integer) { Value = (int)JobStatus.Running },
-            new NpgsqlParameter("maxConcurrentPerUser", NpgsqlDbType.Integer) { Value = maxConcurrentPerUser }
+            new NpgsqlParameter("maxConcurrentPerUser", NpgsqlDbType.Integer) { Value = maxConcurrentPerUser },
+            new NpgsqlParameter("now", NpgsqlDbType.TimestampTz) { Value = DateTime.UtcNow }
         };

         _logger.LogDebug("Claiming random job atomically (maxConcurrentPerUser: {MaxConcurrent})", maxConcurrentPerUser);
@@ -453,8 +457,14 @@ public class PostgreSqlJobRepository : IJobRepository
         // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
         var staleJobs = await _context.Jobs
             .AsTracking()
-            .Where(j => j.Status == (int)JobStatus.Running &&
-                        (j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold))
+            .Where(j =>
+                // Running jobs with stale or missing heartbeats
+                (j.Status == (int)JobStatus.Running &&
+                 (j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold)) ||
+                // Pending jobs that were assigned to a worker but never started (stale assignment)
+                (j.Status == (int)JobStatus.Pending &&
+                 j.AssignedWorkerId != null &&
+                 (j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold)))
             .ToListAsync();

         foreach (var job in staleJobs)
@@ -474,6 +484,42 @@ public class PostgreSqlJobRepository : IJobRepository
         return count;
     }

+    /// <inheritdoc />
+    public async Task<int> ResetJobsByWorkerIdAsync(string workerId)
+    {
+        // A null id would be compared as "AssignedWorkerId IS NULL" and match every unassigned
+        // Running/Pending job instead of one worker's jobs, so refuse null/blank ids up front.
+        if (string.IsNullOrWhiteSpace(workerId))
+        {
+            _logger.LogWarning("ResetJobsByWorkerIdAsync called without a worker id; no jobs were reset");
+            return 0;
+        }
+
+        // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
+        // Reset ALL jobs (Running or Pending) assigned to this worker - they belong to a previous instance
+        var jobsToReset = await _context.Jobs
+            .AsTracking()
+            .Where(j => j.AssignedWorkerId == workerId &&
+                        (j.Status == (int)JobStatus.Running || j.Status == (int)JobStatus.Pending))
+            .ToListAsync();
+
+        foreach (var job in jobsToReset)
+        {
+            job.Status = (int)JobStatus.Pending;
+            job.AssignedWorkerId = null;
+            job.LastHeartbeat = null;
+        }
+
+        var count = jobsToReset.Count;
+        if (count > 0)
+        {
+            await _context.SaveChangesAsync();
+            _logger.LogInformation("Reset {Count} jobs assigned to worker {WorkerId} back to Pending status", count, workerId);
+        }
+
+        return count;
+    }
+
     public async Task GetSummaryAsync()
     {
         // Use ADO.NET directly for aggregation queries to avoid EF Core mapping issues