Update and fix worker

This commit is contained in:
2025-11-11 03:02:24 +07:00
parent e8e2ec5a43
commit 4a8c22e52a
4 changed files with 105 additions and 6 deletions

View File

@@ -52,11 +52,13 @@ public class PostgreSqlJobRepository : IJobRepository
""GeneticRequestId"", ""RetryCount"", ""MaxRetries"", ""RetryAfter"",
""IsRetryable"", ""FailureCategory""
FROM ""Jobs""
WHERE ""Status"" = @status";
WHERE ""Status"" = @status
AND (""RetryAfter"" IS NULL OR ""RetryAfter"" <= @now)";
var parameters = new List<NpgsqlParameter>
{
new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending }
new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending },
new NpgsqlParameter("now", NpgsqlDbType.TimestampTz) { Value = DateTime.UtcNow }
};
if (jobType.HasValue)
@@ -255,6 +257,7 @@ public class PostgreSqlJobRepository : IJobRepository
FROM ""Jobs"" j
WHERE j.""Status"" = @status
AND j.""JobType"" = @jobType
AND (j.""RetryAfter"" IS NULL OR j.""RetryAfter"" <= @now)
AND (
SELECT COUNT(*)
FROM ""Jobs"" running
@@ -271,7 +274,8 @@ public class PostgreSqlJobRepository : IJobRepository
new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending },
new NpgsqlParameter("jobType", NpgsqlDbType.Integer) { Value = (int)jobType },
new NpgsqlParameter("runningStatus", NpgsqlDbType.Integer) { Value = (int)JobStatus.Running },
new NpgsqlParameter("maxConcurrentPerUser", NpgsqlDbType.Integer) { Value = maxConcurrentPerUser }
new NpgsqlParameter("maxConcurrentPerUser", NpgsqlDbType.Integer) { Value = maxConcurrentPerUser },
new NpgsqlParameter("now", NpgsqlDbType.TimestampTz) { Value = DateTime.UtcNow }
};
_logger.LogDebug("Claiming random job atomically (maxConcurrentPerUser: {MaxConcurrent})", maxConcurrentPerUser);
@@ -453,8 +457,14 @@ public class PostgreSqlJobRepository : IJobRepository
// Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
var staleJobs = await _context.Jobs
.AsTracking()
.Where(j => j.Status == (int)JobStatus.Running &&
(j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold))
.Where(j =>
// Running jobs with stale or missing heartbeats
(j.Status == (int)JobStatus.Running &&
(j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold)) ||
// Pending jobs that were assigned to a worker but never started (stale assignment)
(j.Status == (int)JobStatus.Pending &&
j.AssignedWorkerId != null &&
(j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold)))
.ToListAsync();
foreach (var job in staleJobs)
@@ -474,6 +484,33 @@ public class PostgreSqlJobRepository : IJobRepository
return count;
}
public async Task<int> ResetJobsByWorkerIdAsync(string workerId)
{
// Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
// Reset ALL jobs (Running or Pending) assigned to this worker - they belong to a previous instance
var jobsToReset = await _context.Jobs
.AsTracking()
.Where(j => j.AssignedWorkerId == workerId &&
(j.Status == (int)JobStatus.Running || j.Status == (int)JobStatus.Pending))
.ToListAsync();
foreach (var job in jobsToReset)
{
job.Status = (int)JobStatus.Pending;
job.AssignedWorkerId = null;
job.LastHeartbeat = null;
}
var count = jobsToReset.Count;
if (count > 0)
{
await _context.SaveChangesAsync();
_logger.LogInformation("Reset {Count} jobs assigned to worker {WorkerId} back to Pending status", count, workerId);
}
return count;
}
public async Task<JobSummary> GetSummaryAsync()
{
// Use ADO.NET directly for aggregation queries to avoid EF Core mapping issues