Update jobs
This commit is contained in:
@@ -2,7 +2,10 @@ using Managing.Application.Abstractions.Repositories;
|
||||
using Managing.Domain.Backtests;
|
||||
using Managing.Infrastructure.Databases.PostgreSql.Entities;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Storage;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Npgsql;
|
||||
using NpgsqlTypes;
|
||||
using static Managing.Common.Enums;
|
||||
|
||||
namespace Managing.Infrastructure.Databases.PostgreSql;
|
||||
@@ -32,43 +35,96 @@ public class PostgreSqlJobRepository : IJobRepository
|
||||
{
|
||||
// Use execution strategy to support retry with transactions
|
||||
var strategy = _context.Database.CreateExecutionStrategy();
|
||||
|
||||
|
||||
return await strategy.ExecuteAsync(async () =>
|
||||
{
|
||||
await using var transaction = await _context.Database.BeginTransactionAsync();
|
||||
|
||||
|
||||
try
|
||||
{
|
||||
// Build SQL query with optional job type filter
|
||||
// Use raw ADO.NET to avoid EF Core wrapping the query (which breaks FOR UPDATE SKIP LOCKED)
|
||||
var sql = @"
|
||||
SELECT * FROM ""Jobs""
|
||||
WHERE ""Status"" = {0}";
|
||||
|
||||
var parameters = new List<object> { (int)JobStatus.Pending };
|
||||
|
||||
SELECT ""Id"", ""BundleRequestId"", ""UserId"", ""Status"", ""JobType"", ""Priority"",
|
||||
""ConfigJson"", ""StartDate"", ""EndDate"", ""ProgressPercentage"",
|
||||
""AssignedWorkerId"", ""LastHeartbeat"", ""CreatedAt"", ""StartedAt"",
|
||||
""CompletedAt"", ""ResultJson"", ""ErrorMessage"", ""RequestId"",
|
||||
""GeneticRequestId"", ""RetryCount"", ""MaxRetries"", ""RetryAfter"",
|
||||
""IsRetryable"", ""FailureCategory""
|
||||
FROM ""Jobs""
|
||||
WHERE ""Status"" = @status";
|
||||
|
||||
var parameters = new List<NpgsqlParameter>
|
||||
{
|
||||
new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending }
|
||||
};
|
||||
|
||||
if (jobType.HasValue)
|
||||
{
|
||||
sql += @" AND ""JobType"" = {1}";
|
||||
parameters.Add((int)jobType.Value);
|
||||
sql += @" AND ""JobType"" = @jobType";
|
||||
parameters.Add(new NpgsqlParameter("jobType", NpgsqlDbType.Integer) { Value = (int)jobType.Value });
|
||||
}
|
||||
|
||||
|
||||
sql += @"
|
||||
ORDER BY ""Priority"" DESC, ""CreatedAt"" ASC
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED";
|
||||
|
||||
_logger.LogDebug("Claiming job with SQL: {Sql}, Parameters: Status={Status}, JobType={JobType}",
|
||||
sql, (int)JobStatus.Pending, jobType.HasValue ? (int)jobType.Value : (int?)null);
|
||||
|
||||
// Execute raw SQL using ADO.NET to get the job with row-level locking
|
||||
var connection = _context.Database.GetDbConnection();
|
||||
await using var command = connection.CreateCommand();
|
||||
command.Transaction = transaction.GetDbTransaction();
|
||||
command.CommandText = sql;
|
||||
command.Parameters.AddRange(parameters.ToArray());
|
||||
|
||||
JobEntity? job = null;
|
||||
await using var reader = await command.ExecuteReaderAsync();
|
||||
|
||||
// Use raw SQL with FromSqlRaw to get the next job with row-level locking
|
||||
var job = await _context.Jobs
|
||||
.FromSqlRaw(sql, parameters.ToArray())
|
||||
.FirstOrDefaultAsync();
|
||||
if (await reader.ReadAsync())
|
||||
{
|
||||
job = new JobEntity
|
||||
{
|
||||
Id = reader.GetGuid(reader.GetOrdinal("Id")),
|
||||
BundleRequestId = reader.IsDBNull(reader.GetOrdinal("BundleRequestId")) ? null : reader.GetGuid(reader.GetOrdinal("BundleRequestId")),
|
||||
UserId = reader.GetInt32(reader.GetOrdinal("UserId")),
|
||||
Status = reader.GetInt32(reader.GetOrdinal("Status")),
|
||||
JobType = reader.GetInt32(reader.GetOrdinal("JobType")),
|
||||
Priority = reader.GetInt32(reader.GetOrdinal("Priority")),
|
||||
ConfigJson = reader.GetString(reader.GetOrdinal("ConfigJson")),
|
||||
StartDate = reader.GetDateTime(reader.GetOrdinal("StartDate")),
|
||||
EndDate = reader.GetDateTime(reader.GetOrdinal("EndDate")),
|
||||
ProgressPercentage = reader.GetInt32(reader.GetOrdinal("ProgressPercentage")),
|
||||
AssignedWorkerId = reader.IsDBNull(reader.GetOrdinal("AssignedWorkerId")) ? null : reader.GetString(reader.GetOrdinal("AssignedWorkerId")),
|
||||
LastHeartbeat = reader.IsDBNull(reader.GetOrdinal("LastHeartbeat")) ? null : reader.GetDateTime(reader.GetOrdinal("LastHeartbeat")),
|
||||
CreatedAt = reader.GetDateTime(reader.GetOrdinal("CreatedAt")),
|
||||
StartedAt = reader.IsDBNull(reader.GetOrdinal("StartedAt")) ? null : reader.GetDateTime(reader.GetOrdinal("StartedAt")),
|
||||
CompletedAt = reader.IsDBNull(reader.GetOrdinal("CompletedAt")) ? null : reader.GetDateTime(reader.GetOrdinal("CompletedAt")),
|
||||
ResultJson = reader.IsDBNull(reader.GetOrdinal("ResultJson")) ? null : reader.GetString(reader.GetOrdinal("ResultJson")),
|
||||
ErrorMessage = reader.IsDBNull(reader.GetOrdinal("ErrorMessage")) ? null : reader.GetString(reader.GetOrdinal("ErrorMessage")),
|
||||
RequestId = reader.IsDBNull(reader.GetOrdinal("RequestId")) ? null : reader.GetString(reader.GetOrdinal("RequestId")),
|
||||
GeneticRequestId = reader.IsDBNull(reader.GetOrdinal("GeneticRequestId")) ? null : reader.GetString(reader.GetOrdinal("GeneticRequestId")),
|
||||
RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")),
|
||||
MaxRetries = reader.GetInt32(reader.GetOrdinal("MaxRetries")),
|
||||
RetryAfter = reader.IsDBNull(reader.GetOrdinal("RetryAfter")) ? null : reader.GetDateTime(reader.GetOrdinal("RetryAfter")),
|
||||
IsRetryable = reader.GetBoolean(reader.GetOrdinal("IsRetryable")),
|
||||
FailureCategory = reader.IsDBNull(reader.GetOrdinal("FailureCategory")) ? null : reader.GetInt32(reader.GetOrdinal("FailureCategory"))
|
||||
};
|
||||
}
|
||||
|
||||
await reader.CloseAsync();
|
||||
|
||||
if (job == null)
|
||||
{
|
||||
await transaction.RollbackAsync();
|
||||
_logger.LogDebug("No job found to claim for worker {WorkerId}", workerId);
|
||||
await transaction.CommitAsync();
|
||||
return null;
|
||||
}
|
||||
|
||||
// Update the job status atomically
|
||||
// Attach and update the job entity
|
||||
_context.Jobs.Attach(job);
|
||||
job.Status = (int)JobStatus.Running;
|
||||
job.AssignedWorkerId = workerId;
|
||||
job.StartedAt = DateTime.UtcNow;
|
||||
@@ -77,6 +133,7 @@ public class PostgreSqlJobRepository : IJobRepository
|
||||
await _context.SaveChangesAsync();
|
||||
await transaction.CommitAsync();
|
||||
|
||||
_logger.LogInformation("Claimed job {JobId} for worker {WorkerId}", job.Id, workerId);
|
||||
return MapToDomain(job);
|
||||
}
|
||||
catch (Exception ex)
|
||||
@@ -250,10 +307,10 @@ public class PostgreSqlJobRepository : IJobRepository
|
||||
public async Task<IEnumerable<Job>> GetStaleJobsAsync(int timeoutMinutes = 5)
|
||||
{
|
||||
var timeoutThreshold = DateTime.UtcNow.AddMinutes(-timeoutMinutes);
|
||||
|
||||
|
||||
var entities = await _context.Jobs
|
||||
.Where(j => j.Status == (int)JobStatus.Running &&
|
||||
(j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold))
|
||||
(j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold))
|
||||
.ToListAsync();
|
||||
|
||||
return entities.Select(MapToDomain);
|
||||
@@ -262,12 +319,12 @@ public class PostgreSqlJobRepository : IJobRepository
|
||||
public async Task<int> ResetStaleJobsAsync(int timeoutMinutes = 5)
|
||||
{
|
||||
var timeoutThreshold = DateTime.UtcNow.AddMinutes(-timeoutMinutes);
|
||||
|
||||
|
||||
// Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
|
||||
var staleJobs = await _context.Jobs
|
||||
.AsTracking()
|
||||
.Where(j => j.Status == (int)JobStatus.Running &&
|
||||
(j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold))
|
||||
(j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold))
|
||||
.ToListAsync();
|
||||
|
||||
foreach (var job in staleJobs)
|
||||
@@ -495,5 +552,4 @@ public class PostgreSqlJobRepository : IJobRepository
|
||||
FailureCategory = entity.FailureCategory.HasValue ? (FailureCategory)entity.FailureCategory.Value : null
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user