// File: managing-apps/src/Managing.Infrastructure.Database/PostgreSql/PostgreSqlJobRepository.cs
// Date: 2025-11-11 03:02:24 +07:00
// Size: 741 lines, 31 KiB (C#)

using Managing.Application.Abstractions.Repositories;
using Managing.Domain.Backtests;
using Managing.Infrastructure.Databases.PostgreSql.Entities;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using static Managing.Common.Enums;
namespace Managing.Infrastructure.Databases.PostgreSql;
public class PostgreSqlJobRepository : IJobRepository
{
private readonly ManagingDbContext _context;
private readonly ILogger<PostgreSqlJobRepository> _logger;
/// <summary>
/// Builds a job repository on top of the supplied database context.
/// </summary>
/// <param name="context">Database context whose <c>Jobs</c> set is queried and modified.</param>
/// <param name="logger">Logger that receives the repository's diagnostic messages.</param>
public PostgreSqlJobRepository(
    ManagingDbContext context,
    ILogger<PostgreSqlJobRepository> logger)
    => (_context, _logger) = (context, logger);
/// <summary>
/// Inserts a new job row and returns the domain object rebuilt from the saved entity.
/// </summary>
/// <param name="job">Domain job to persist.</param>
/// <returns>The job as mapped back from the entity after saving.</returns>
public async Task<Job> CreateAsync(Job job)
{
    // Add returns the entry wrapping the very instance produced by MapToEntity.
    var entry = _context.Jobs.Add(MapToEntity(job));
    await _context.SaveChangesAsync();

    return MapToDomain(entry.Entity);
}
/// <summary>
/// Claims the next pending job (highest priority first, then oldest) for a worker.
/// </summary>
/// <remarks>
/// The candidate row is selected with <c>FOR UPDATE SKIP LOCKED</c> inside a transaction and then
/// marked as running for <paramref name="workerId"/>. The whole operation runs under the context's
/// execution strategy, so the delegate may be executed more than once.
/// </remarks>
/// <param name="workerId">Identifier stored in <c>AssignedWorkerId</c> of the claimed job.</param>
/// <param name="jobType">Optional job type filter; when null any job type may be claimed.</param>
/// <returns>The claimed job, or <c>null</c> when no pending job is currently eligible.</returns>
public async Task<Job?> ClaimNextJobAsync(string workerId, JobType? jobType = null)
{
    // Use execution strategy to support retry with transactions
    var strategy = _context.Database.CreateExecutionStrategy();
    return await strategy.ExecuteAsync(async () =>
    {
        await using var transaction = await _context.Database.BeginTransactionAsync();
        JobEntity? job = null;
        try
        {
            // Build SQL query with optional job type filter
            // Use raw ADO.NET to avoid EF Core wrapping the query (which breaks FOR UPDATE SKIP LOCKED)
            var sql = @"
                SELECT ""Id"", ""BundleRequestId"", ""UserId"", ""Status"", ""JobType"", ""Priority"",
                       ""ConfigJson"", ""StartDate"", ""EndDate"", ""ProgressPercentage"",
                       ""AssignedWorkerId"", ""LastHeartbeat"", ""CreatedAt"", ""StartedAt"",
                       ""CompletedAt"", ""ResultJson"", ""ErrorMessage"", ""RequestId"",
                       ""GeneticRequestId"", ""RetryCount"", ""MaxRetries"", ""RetryAfter"",
                       ""IsRetryable"", ""FailureCategory""
                FROM ""Jobs""
                WHERE ""Status"" = @status
                  AND (""RetryAfter"" IS NULL OR ""RetryAfter"" <= @now)";

            var parameters = new List<NpgsqlParameter>
            {
                new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending },
                new NpgsqlParameter("now", NpgsqlDbType.TimestampTz) { Value = DateTime.UtcNow }
            };

            if (jobType.HasValue)
            {
                sql += @" AND ""JobType"" = @jobType";
                parameters.Add(new NpgsqlParameter("jobType", NpgsqlDbType.Integer) { Value = (int)jobType.Value });
            }

            sql += @"
                ORDER BY ""Priority"" DESC, ""CreatedAt"" ASC
                LIMIT 1
                FOR UPDATE SKIP LOCKED";

            _logger.LogDebug("Claiming job with SQL: {Sql}, Parameters: Status={Status}, JobType={JobType}",
                sql, (int)JobStatus.Pending, jobType.HasValue ? (int)jobType.Value : (int?)null);

            // Execute raw SQL using ADO.NET to get the job with row-level locking
            var connection = _context.Database.GetDbConnection();
            await using var command = connection.CreateCommand();
            command.Transaction = transaction.GetDbTransaction();
            command.CommandText = sql;
            command.Parameters.AddRange(parameters.ToArray());

            // The reader is scoped so it is closed before any further command runs on this connection.
            await using (var reader = await command.ExecuteReaderAsync())
            {
                if (await reader.ReadAsync())
                {
                    job = ReadJobEntity(reader);
                }
            }

            if (job == null)
            {
                _logger.LogDebug("No job found to claim for worker {WorkerId}", workerId);
                await transaction.CommitAsync();
                return null;
            }

            // Attach throws if another instance with the same key is already tracked (for example one
            // left behind by an earlier operation on this context), so drop any such instance first.
            var claimedId = job.Id;
            var alreadyTracked = _context.ChangeTracker.Entries<JobEntity>()
                .FirstOrDefault(e => e.Entity.Id == claimedId);
            if (alreadyTracked != null)
            {
                alreadyTracked.State = EntityState.Detached;
            }

            // Attach and update the job entity; one timestamp keeps StartedAt and LastHeartbeat identical.
            _context.Jobs.Attach(job);
            var claimedAt = DateTime.UtcNow;
            job.Status = (int)JobStatus.Running;
            job.AssignedWorkerId = workerId;
            job.StartedAt = claimedAt;
            job.LastHeartbeat = claimedAt;

            await _context.SaveChangesAsync();
            await transaction.CommitAsync();

            _logger.LogInformation("Claimed job {JobId} for worker {WorkerId}", job.Id, workerId);
            return MapToDomain(job);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error claiming job for worker {WorkerId}", workerId);
            try
            {
                await transaction.RollbackAsync();
            }
            catch (Exception rollbackEx)
            {
                // A failed rollback must not replace the original exception seen by the caller.
                _logger.LogWarning(rollbackEx, "Rollback failed while claiming job for worker {WorkerId}", workerId);
            }

            throw;
        }
        finally
        {
            // Stop tracking the claimed entity so a retry of this delegate, or a later claim of the
            // same job through this context, does not collide with a stale tracked instance.
            if (job != null)
            {
                _context.Entry(job).State = EntityState.Detached;
            }
        }
    });

    // Materializes the current reader row as a JobEntity; nullable columns map DBNull to null.
    static JobEntity ReadJobEntity(System.Data.Common.DbDataReader reader)
    {
        Guid? ReadGuid(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? (Guid?)null : reader.GetGuid(ordinal);
        }

        string? ReadString(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
        }

        DateTime? ReadDateTime(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? (DateTime?)null : reader.GetDateTime(ordinal);
        }

        int? ReadInt32(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? (int?)null : reader.GetInt32(ordinal);
        }

        return new JobEntity
        {
            Id = reader.GetGuid(reader.GetOrdinal("Id")),
            BundleRequestId = ReadGuid("BundleRequestId"),
            UserId = reader.GetInt32(reader.GetOrdinal("UserId")),
            Status = reader.GetInt32(reader.GetOrdinal("Status")),
            JobType = reader.GetInt32(reader.GetOrdinal("JobType")),
            Priority = reader.GetInt32(reader.GetOrdinal("Priority")),
            ConfigJson = reader.GetString(reader.GetOrdinal("ConfigJson")),
            StartDate = reader.GetDateTime(reader.GetOrdinal("StartDate")),
            EndDate = reader.GetDateTime(reader.GetOrdinal("EndDate")),
            ProgressPercentage = reader.GetInt32(reader.GetOrdinal("ProgressPercentage")),
            AssignedWorkerId = ReadString("AssignedWorkerId"),
            LastHeartbeat = ReadDateTime("LastHeartbeat"),
            CreatedAt = reader.GetDateTime(reader.GetOrdinal("CreatedAt")),
            StartedAt = ReadDateTime("StartedAt"),
            CompletedAt = ReadDateTime("CompletedAt"),
            ResultJson = ReadString("ResultJson"),
            ErrorMessage = ReadString("ErrorMessage"),
            RequestId = ReadString("RequestId"),
            GeneticRequestId = ReadString("GeneticRequestId"),
            RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")),
            MaxRetries = reader.GetInt32(reader.GetOrdinal("MaxRetries")),
            RetryAfter = ReadDateTime("RetryAfter"),
            IsRetryable = reader.GetBoolean(reader.GetOrdinal("IsRetryable")),
            FailureCategory = ReadInt32("FailureCategory")
        };
    }
}
/// <summary>
/// Copies the mutable fields of <paramref name="job"/> onto the stored row with the same id.
/// When no such row exists a warning is logged and nothing is saved.
/// </summary>
/// <param name="job">Domain job carrying the values to persist.</param>
public async Task UpdateAsync(Job job)
{
    // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
    var stored = await _context.Jobs
        .AsTracking()
        .FirstOrDefaultAsync(e => e.Id == job.Id);

    if (stored is null)
    {
        _logger.LogWarning("Job {JobId} not found for update", job.Id);
        return;
    }

    CopyMutableFields(job, stored);
    await _context.SaveChangesAsync();
}

// Transfers every field that UpdateAsync is allowed to change from the domain job to the entity.
private static void CopyMutableFields(Job source, JobEntity target)
{
    // Classification and scheduling
    target.Status = (int)source.Status;
    target.JobType = (int)source.JobType;
    target.Priority = source.Priority;

    // Execution state
    target.ProgressPercentage = source.ProgressPercentage;
    target.AssignedWorkerId = source.AssignedWorkerId;
    target.LastHeartbeat = source.LastHeartbeat;
    target.StartedAt = source.StartedAt;
    target.CompletedAt = source.CompletedAt;

    // Outcome
    target.ResultJson = source.ResultJson;
    target.ErrorMessage = source.ErrorMessage;

    // Correlation identifiers
    target.RequestId = source.RequestId;
    target.GeneticRequestId = source.GeneticRequestId;

    // Retry bookkeeping
    target.RetryCount = source.RetryCount;
    target.MaxRetries = source.MaxRetries;
    target.RetryAfter = source.RetryAfter;
    target.IsRetryable = source.IsRetryable;
    target.FailureCategory = (int?)source.FailureCategory;
}
/// <summary>
/// Returns every job whose <c>BundleRequestId</c> equals <paramref name="bundleRequestId"/>.
/// </summary>
/// <param name="bundleRequestId">Bundle request identifier to filter on.</param>
/// <returns>The matching jobs as a materialized list (empty when none match).</returns>
public async Task<IEnumerable<Job>> GetByBundleRequestIdAsync(Guid bundleRequestId)
{
    var entities = await _context.Jobs
        .Where(j => j.BundleRequestId == bundleRequestId)
        .ToListAsync();

    // Materialize the mapping: a deferred Select would build new Job instances on every enumeration,
    // silently discarding changes a caller made to jobs obtained from an earlier pass.
    return entities.Select(MapToDomain).ToList();
}
/// <summary>
/// Returns every job whose <c>UserId</c> equals <paramref name="userId"/>.
/// </summary>
/// <param name="userId">User identifier to filter on.</param>
/// <returns>The matching jobs as a materialized list (empty when none match).</returns>
public async Task<IEnumerable<Job>> GetByUserIdAsync(int userId)
{
    var entities = await _context.Jobs
        .Where(j => j.UserId == userId)
        .ToListAsync();

    // Materialize the mapping: a deferred Select would build new Job instances on every enumeration,
    // silently discarding changes a caller made to jobs obtained from an earlier pass.
    return entities.Select(MapToDomain).ToList();
}
/// <summary>
/// Gets all running jobs assigned to a specific worker
/// </summary>
/// <param name="workerId">Worker identifier compared against <c>AssignedWorkerId</c>.</param>
/// <returns>The matching jobs as a materialized list (empty when none match).</returns>
public async Task<IEnumerable<Job>> GetRunningJobsByWorkerIdAsync(string workerId)
{
    var entities = await _context.Jobs
        .Where(j => j.AssignedWorkerId == workerId && j.Status == (int)JobStatus.Running)
        .ToListAsync();

    // Materialize the mapping: a deferred Select would build new Job instances on every enumeration,
    // silently discarding changes a caller made to jobs obtained from an earlier pass.
    return entities.Select(MapToDomain).ToList();
}
/// <summary>
/// Returns every job whose <c>GeneticRequestId</c> equals <paramref name="geneticRequestId"/>.
/// </summary>
/// <param name="geneticRequestId">Genetic request identifier to filter on.</param>
/// <returns>The matching jobs as a materialized list (empty when none match).</returns>
public async Task<IEnumerable<Job>> GetByGeneticRequestIdAsync(string geneticRequestId)
{
    var entities = await _context.Jobs
        .Where(j => j.GeneticRequestId == geneticRequestId)
        .ToListAsync();

    // Materialize the mapping: a deferred Select would build new Job instances on every enumeration,
    // silently discarding changes a caller made to jobs obtained from an earlier pass.
    return entities.Select(MapToDomain).ToList();
}
/// <summary>
/// Counts running jobs of the given type per user.
/// </summary>
/// <param name="workerId">Not used by the query: the count spans all workers.</param>
/// <param name="jobType">Job type whose running jobs are counted.</param>
/// <returns>Map from user id to that user's number of running jobs of <paramref name="jobType"/>.</returns>
public async Task<Dictionary<int, int>> GetRunningJobCountsByUserIdAsync(string workerId, JobType jobType)
{
    // Get running job counts per user across all workers (global limit per user)
    var runningPerUser =
        from j in _context.Jobs
        where j.Status == (int)JobStatus.Running &&
              j.JobType == (int)jobType
        group j by j.UserId into g
        select new { UserId = g.Key, Count = g.Count() };

    var rows = await runningPerUser.ToListAsync();

    var result = new Dictionary<int, int>();
    foreach (var row in rows)
    {
        result.Add(row.UserId, row.Count);
    }

    return result;
}
/// <summary>
/// Claims a randomly chosen pending job of the given type for a worker, skipping users that
/// already have <paramref name="maxConcurrentPerUser"/> or more running jobs of that type.
/// </summary>
/// <remarks>
/// The candidate row is selected with <c>FOR UPDATE SKIP LOCKED</c> inside a transaction and then
/// marked as running for <paramref name="workerId"/>. The whole operation runs under the context's
/// execution strategy, so the delegate may be executed more than once.
/// NOTE(review): the per-user count is evaluated by a subquery in the same statement, but only the
/// selected row is locked; two workers claiming different pending rows of the same user at the same
/// time may both pass the check, so the limit looks best-effort rather than strict — confirm.
/// </remarks>
/// <param name="workerId">Identifier stored in <c>AssignedWorkerId</c> of the claimed job.</param>
/// <param name="jobType">Job type to claim.</param>
/// <param name="maxConcurrentPerUser">Running-job count at which a user's pending jobs are skipped.</param>
/// <returns>The claimed job, or <c>null</c> when no pending job is currently eligible.</returns>
public async Task<Job?> ClaimRandomJobAsync(string workerId, JobType jobType, int maxConcurrentPerUser)
{
    // Use execution strategy to support retry with transactions
    var strategy = _context.Database.CreateExecutionStrategy();
    return await strategy.ExecuteAsync(async () =>
    {
        await using var transaction = await _context.Database.BeginTransactionAsync();
        JobEntity? job = null;
        try
        {
            // The subquery excludes users that are already at capacity within the same statement
            // that picks the candidate row.
            var sql = @"
                SELECT j.""Id"", j.""BundleRequestId"", j.""UserId"", j.""Status"", j.""JobType"", j.""Priority"",
                       j.""ConfigJson"", j.""StartDate"", j.""EndDate"", j.""ProgressPercentage"",
                       j.""AssignedWorkerId"", j.""LastHeartbeat"", j.""CreatedAt"", j.""StartedAt"",
                       j.""CompletedAt"", j.""ResultJson"", j.""ErrorMessage"", j.""RequestId"",
                       j.""GeneticRequestId"", j.""RetryCount"", j.""MaxRetries"", j.""RetryAfter"",
                       j.""IsRetryable"", j.""FailureCategory""
                FROM ""Jobs"" j
                WHERE j.""Status"" = @status
                  AND j.""JobType"" = @jobType
                  AND (j.""RetryAfter"" IS NULL OR j.""RetryAfter"" <= @now)
                  AND (
                      SELECT COUNT(*)
                      FROM ""Jobs"" running
                      WHERE running.""UserId"" = j.""UserId""
                        AND running.""Status"" = @runningStatus
                        AND running.""JobType"" = @jobType
                  ) < @maxConcurrentPerUser
                ORDER BY RANDOM()
                LIMIT 1
                FOR UPDATE SKIP LOCKED";

            var parameters = new List<NpgsqlParameter>
            {
                new NpgsqlParameter("status", NpgsqlDbType.Integer) { Value = (int)JobStatus.Pending },
                new NpgsqlParameter("jobType", NpgsqlDbType.Integer) { Value = (int)jobType },
                new NpgsqlParameter("runningStatus", NpgsqlDbType.Integer) { Value = (int)JobStatus.Running },
                new NpgsqlParameter("maxConcurrentPerUser", NpgsqlDbType.Integer) { Value = maxConcurrentPerUser },
                new NpgsqlParameter("now", NpgsqlDbType.TimestampTz) { Value = DateTime.UtcNow }
            };

            _logger.LogDebug("Claiming random job atomically (maxConcurrentPerUser: {MaxConcurrent})", maxConcurrentPerUser);

            // Execute raw SQL using ADO.NET to get the job with row-level locking
            var connection = _context.Database.GetDbConnection();
            await using var command = connection.CreateCommand();
            command.Transaction = transaction.GetDbTransaction();
            command.CommandText = sql;
            command.Parameters.AddRange(parameters.ToArray());

            // The reader is scoped so it is closed before any further command runs on this connection.
            await using (var reader = await command.ExecuteReaderAsync())
            {
                if (await reader.ReadAsync())
                {
                    job = ReadJobEntity(reader);
                }
            }

            if (job == null)
            {
                _logger.LogDebug("No random job found to claim for worker {WorkerId}", workerId);
                await transaction.CommitAsync();
                return null;
            }

            // Attach throws if another instance with the same key is already tracked (for example one
            // left behind by an earlier operation on this context), so drop any such instance first.
            var claimedId = job.Id;
            var alreadyTracked = _context.ChangeTracker.Entries<JobEntity>()
                .FirstOrDefault(e => e.Entity.Id == claimedId);
            if (alreadyTracked != null)
            {
                alreadyTracked.State = EntityState.Detached;
            }

            // Attach and update the job entity; one timestamp keeps StartedAt and LastHeartbeat identical.
            _context.Jobs.Attach(job);
            var claimedAt = DateTime.UtcNow;
            job.Status = (int)JobStatus.Running;
            job.AssignedWorkerId = workerId;
            job.StartedAt = claimedAt;
            job.LastHeartbeat = claimedAt;

            await _context.SaveChangesAsync();
            await transaction.CommitAsync();

            _logger.LogInformation("Claimed random job {JobId} (UserId: {UserId}) for worker {WorkerId}",
                job.Id, job.UserId, workerId);
            return MapToDomain(job);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error claiming random job for worker {WorkerId}", workerId);
            try
            {
                await transaction.RollbackAsync();
            }
            catch (Exception rollbackEx)
            {
                // A failed rollback must not replace the original exception seen by the caller.
                _logger.LogWarning(rollbackEx, "Rollback failed while claiming random job for worker {WorkerId}", workerId);
            }

            throw;
        }
        finally
        {
            // Stop tracking the claimed entity so a retry of this delegate, or a later claim of the
            // same job through this context, does not collide with a stale tracked instance.
            if (job != null)
            {
                _context.Entry(job).State = EntityState.Detached;
            }
        }
    });

    // Materializes the current reader row as a JobEntity; nullable columns map DBNull to null.
    static JobEntity ReadJobEntity(System.Data.Common.DbDataReader reader)
    {
        Guid? ReadGuid(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? (Guid?)null : reader.GetGuid(ordinal);
        }

        string? ReadString(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
        }

        DateTime? ReadDateTime(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? (DateTime?)null : reader.GetDateTime(ordinal);
        }

        int? ReadInt32(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? (int?)null : reader.GetInt32(ordinal);
        }

        return new JobEntity
        {
            Id = reader.GetGuid(reader.GetOrdinal("Id")),
            BundleRequestId = ReadGuid("BundleRequestId"),
            UserId = reader.GetInt32(reader.GetOrdinal("UserId")),
            Status = reader.GetInt32(reader.GetOrdinal("Status")),
            JobType = reader.GetInt32(reader.GetOrdinal("JobType")),
            Priority = reader.GetInt32(reader.GetOrdinal("Priority")),
            ConfigJson = reader.GetString(reader.GetOrdinal("ConfigJson")),
            StartDate = reader.GetDateTime(reader.GetOrdinal("StartDate")),
            EndDate = reader.GetDateTime(reader.GetOrdinal("EndDate")),
            ProgressPercentage = reader.GetInt32(reader.GetOrdinal("ProgressPercentage")),
            AssignedWorkerId = ReadString("AssignedWorkerId"),
            LastHeartbeat = ReadDateTime("LastHeartbeat"),
            CreatedAt = reader.GetDateTime(reader.GetOrdinal("CreatedAt")),
            StartedAt = ReadDateTime("StartedAt"),
            CompletedAt = ReadDateTime("CompletedAt"),
            ResultJson = ReadString("ResultJson"),
            ErrorMessage = ReadString("ErrorMessage"),
            RequestId = ReadString("RequestId"),
            GeneticRequestId = ReadString("GeneticRequestId"),
            RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")),
            MaxRetries = reader.GetInt32(reader.GetOrdinal("MaxRetries")),
            RetryAfter = ReadDateTime("RetryAfter"),
            IsRetryable = reader.GetBoolean(reader.GetOrdinal("IsRetryable")),
            FailureCategory = ReadInt32("FailureCategory")
        };
    }
}
/// <summary>
/// Returns one page of jobs matching the optional filters, together with the total number of
/// matching jobs before paging.
/// </summary>
/// <param name="page">1-based page number; values below 1 are treated as 1.</param>
/// <param name="pageSize">Maximum number of jobs to return; negative values are treated as 0.</param>
/// <param name="sortBy">
/// Sort column, case-insensitive: CreatedAt, StartedAt, CompletedAt, Priority, Status or JobType.
/// Any other value (including null) sorts by CreatedAt descending regardless of <paramref name="sortOrder"/>.
/// </param>
/// <param name="sortOrder">"asc" (case-insensitive) for ascending; anything else sorts descending.</param>
/// <param name="status">Optional status filter.</param>
/// <param name="jobType">Optional job type filter.</param>
/// <param name="userId">Optional user filter.</param>
/// <param name="workerId">Optional assigned-worker filter; ignored when null or empty.</param>
/// <param name="bundleRequestId">Optional bundle request filter.</param>
/// <returns>The jobs of the requested page (materialized) and the total filtered count.</returns>
public async Task<(IEnumerable<Job> Jobs, int TotalCount)> GetPaginatedAsync(
    int page,
    int pageSize,
    string sortBy = "CreatedAt",
    string sortOrder = "desc",
    JobStatus? status = null,
    JobType? jobType = null,
    int? userId = null,
    string? workerId = null,
    Guid? bundleRequestId = null)
{
    var query = _context.Jobs.AsQueryable();

    // Apply filters
    if (status.HasValue)
    {
        query = query.Where(j => j.Status == (int)status.Value);
    }

    if (jobType.HasValue)
    {
        query = query.Where(j => j.JobType == (int)jobType.Value);
    }

    if (userId.HasValue)
    {
        query = query.Where(j => j.UserId == userId.Value);
    }

    if (!string.IsNullOrEmpty(workerId))
    {
        query = query.Where(j => j.AssignedWorkerId == workerId);
    }

    if (bundleRequestId.HasValue)
    {
        query = query.Where(j => j.BundleRequestId == bundleRequestId.Value);
    }

    // Get total count before pagination
    var totalCount = await query.CountAsync();

    // Ordinal/invariant comparisons keep the column lookup independent of the current culture
    // (a culture-sensitive ToLower can turn "PRIORITY" into a string that matches no case) and
    // tolerate null arguments.
    var ascending = string.Equals(sortOrder, "asc", StringComparison.OrdinalIgnoreCase);
    IOrderedQueryable<JobEntity> ordered = (sortBy?.ToLowerInvariant()) switch
    {
        "createdat" => ascending
            ? query.OrderBy(j => j.CreatedAt)
            : query.OrderByDescending(j => j.CreatedAt),
        "startedat" => ascending
            ? query.OrderBy(j => j.StartedAt)
            : query.OrderByDescending(j => j.StartedAt),
        "completedat" => ascending
            ? query.OrderBy(j => j.CompletedAt)
            : query.OrderByDescending(j => j.CompletedAt),
        "priority" => ascending
            ? query.OrderBy(j => j.Priority)
            : query.OrderByDescending(j => j.Priority),
        "status" => ascending
            ? query.OrderBy(j => j.Status)
            : query.OrderByDescending(j => j.Status),
        "jobtype" => ascending
            ? query.OrderBy(j => j.JobType)
            : query.OrderByDescending(j => j.JobType),
        _ => query.OrderByDescending(j => j.CreatedAt) // Default sort
    };

    // Clamp paging arguments so an out-of-range request cannot produce a negative OFFSET/LIMIT.
    var pageIndex = Math.Max(page, 1) - 1;
    var take = Math.Max(pageSize, 0);

    // The Id tie-breaker gives rows with equal sort keys a stable order, so consecutive pages
    // neither repeat nor skip rows.
    var entities = await ordered
        .ThenBy(j => j.Id)
        .Skip(pageIndex * take)
        .Take(take)
        .ToListAsync();

    // Materialize the mapping so repeated enumeration yields the same Job instances.
    var jobs = entities.Select(MapToDomain).ToList();
    return (jobs, totalCount);
}
/// <summary>
/// Looks up a single job by its identifier.
/// </summary>
/// <param name="jobId">Identifier of the job to load.</param>
/// <returns>The mapped job, or <c>null</c> when no row has that id.</returns>
public async Task<Job?> GetByIdAsync(Guid jobId)
{
    var match = await _context.Jobs.FirstOrDefaultAsync(j => j.Id == jobId);
    if (match is null)
    {
        return null;
    }

    return MapToDomain(match);
}
/// <summary>
/// Returns running jobs whose heartbeat is missing or older than the timeout.
/// </summary>
/// <param name="timeoutMinutes">Age in minutes after which a heartbeat counts as stale.</param>
/// <returns>The stale jobs as a materialized list (empty when none match).</returns>
public async Task<IEnumerable<Job>> GetStaleJobsAsync(int timeoutMinutes = 5)
{
    var timeoutThreshold = DateTime.UtcNow.AddMinutes(-timeoutMinutes);

    var entities = await _context.Jobs
        .Where(j => j.Status == (int)JobStatus.Running &&
                    (j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold))
        .ToListAsync();

    // Materialize the mapping: a deferred Select would build new Job instances on every enumeration,
    // silently discarding changes a caller made to jobs obtained from an earlier pass.
    return entities.Select(MapToDomain).ToList();
}
/// <summary>
/// Puts stale jobs back into the pending state and clears their worker assignment and heartbeat.
/// A job is stale when its heartbeat is missing or older than the timeout and it is either running,
/// or pending while still assigned to a worker.
/// </summary>
/// <param name="timeoutMinutes">Age in minutes after which a heartbeat counts as stale.</param>
/// <returns>The number of jobs that were reset.</returns>
public async Task<int> ResetStaleJobsAsync(int timeoutMinutes = 5)
{
    var cutoff = DateTime.UtcNow.AddMinutes(-timeoutMinutes);

    // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
    var candidates = await _context.Jobs
        .AsTracking()
        .Where(j =>
            // Running jobs with stale or missing heartbeats
            (j.Status == (int)JobStatus.Running &&
             (j.LastHeartbeat == null || j.LastHeartbeat < cutoff)) ||
            // Pending jobs that were assigned to a worker but never started (stale assignment)
            (j.Status == (int)JobStatus.Pending &&
             j.AssignedWorkerId != null &&
             (j.LastHeartbeat == null || j.LastHeartbeat < cutoff)))
        .ToListAsync();

    // Nothing matched: skip the save and the log entry.
    if (candidates.Count == 0)
    {
        return 0;
    }

    candidates.ForEach(stale =>
    {
        stale.Status = (int)JobStatus.Pending;
        stale.AssignedWorkerId = null;
        stale.LastHeartbeat = null;
    });

    await _context.SaveChangesAsync();
    _logger.LogInformation("Reset {Count} stale jobs back to Pending status", candidates.Count);

    return candidates.Count;
}
/// <summary>
/// Puts every running or pending job assigned to the given worker back into the pending state and
/// clears its worker assignment and heartbeat.
/// </summary>
/// <param name="workerId">Worker identifier compared against <c>AssignedWorkerId</c>.</param>
/// <returns>The number of jobs that were reset.</returns>
public async Task<int> ResetJobsByWorkerIdAsync(string workerId)
{
    // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
    // Reset ALL jobs (Running or Pending) assigned to this worker - they belong to a previous instance
    var assigned = await _context.Jobs
        .AsTracking()
        .Where(j => j.AssignedWorkerId == workerId &&
                    (j.Status == (int)JobStatus.Running || j.Status == (int)JobStatus.Pending))
        .ToListAsync();

    // Nothing matched: skip the save and the log entry.
    if (assigned.Count == 0)
    {
        return 0;
    }

    assigned.ForEach(owned =>
    {
        owned.Status = (int)JobStatus.Pending;
        owned.AssignedWorkerId = null;
        owned.LastHeartbeat = null;
    });

    await _context.SaveChangesAsync();
    _logger.LogInformation("Reset {Count} jobs assigned to worker {WorkerId} back to Pending status", assigned.Count, workerId);

    return assigned.Count;
}
/// <summary>
/// Builds job statistics: counts per status, per job type, per status/job-type pair, and the total.
/// </summary>
/// <remarks>
/// The four aggregates are read with separate statements outside a transaction, so they are not
/// guaranteed to describe exactly the same set of rows when jobs change concurrently.
/// </remarks>
/// <returns>A summary populated from the <c>Jobs</c> table.</returns>
public async Task<JobSummary> GetSummaryAsync()
{
    // Use ADO.NET directly for aggregation queries to avoid EF Core mapping issues.
    // The connection is opened and closed through the context rather than on the DbConnection
    // itself: opening it directly fails when the context already has it open, and closing it
    // directly would close a connection the context may still be using.
    await _context.Database.OpenConnectionAsync();
    try
    {
        var connection = _context.Database.GetDbConnection();

        var statusCounts = new List<StatusCountResult>();
        var jobTypeCounts = new List<JobTypeCountResult>();
        var statusTypeCounts = new List<StatusTypeCountResult>();
        var totalJobs = 0;

        // Query 1: Status summary
        await using (var command = connection.CreateCommand())
        {
            command.CommandText = @"
                SELECT ""Status"", COUNT(*) as Count
                FROM ""Jobs""
                GROUP BY ""Status""
                ORDER BY ""Status""";

            await using var reader = await command.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                statusCounts.Add(new StatusCountResult
                {
                    Status = reader.GetInt32(0),
                    Count = reader.GetInt32(1)
                });
            }
        }

        // Query 2: Job type summary
        await using (var command = connection.CreateCommand())
        {
            command.CommandText = @"
                SELECT ""JobType"", COUNT(*) as Count
                FROM ""Jobs""
                GROUP BY ""JobType""
                ORDER BY ""JobType""";

            await using var reader = await command.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                jobTypeCounts.Add(new JobTypeCountResult
                {
                    JobType = reader.GetInt32(0),
                    Count = reader.GetInt32(1)
                });
            }
        }

        // Query 3: Status + Job type summary
        await using (var command = connection.CreateCommand())
        {
            command.CommandText = @"
                SELECT ""Status"", ""JobType"", COUNT(*) as Count
                FROM ""Jobs""
                GROUP BY ""Status"", ""JobType""
                ORDER BY ""Status"", ""JobType""";

            await using var reader = await command.ExecuteReaderAsync();
            while (await reader.ReadAsync())
            {
                statusTypeCounts.Add(new StatusTypeCountResult
                {
                    Status = reader.GetInt32(0),
                    JobType = reader.GetInt32(1),
                    Count = reader.GetInt32(2)
                });
            }
        }

        // Query 4: Total count
        await using (var command = connection.CreateCommand())
        {
            command.CommandText = @"
                SELECT COUNT(*) as Count
                FROM ""Jobs""";

            var result = await command.ExecuteScalarAsync();
            totalJobs = result != null ? Convert.ToInt32(result) : 0;
        }

        return new JobSummary
        {
            StatusCounts = statusCounts.Select(s => new JobStatusCount
            {
                Status = (JobStatus)s.Status,
                Count = s.Count
            }).ToList(),
            JobTypeCounts = jobTypeCounts.Select(j => new JobTypeCount
            {
                JobType = (JobType)j.JobType,
                Count = j.Count
            }).ToList(),
            StatusTypeCounts = statusTypeCounts.Select(st => new JobStatusTypeCount
            {
                Status = (JobStatus)st.Status,
                JobType = (JobType)st.JobType,
                Count = st.Count
            }).ToList(),
            TotalJobs = totalJobs
        };
    }
    finally
    {
        await _context.Database.CloseConnectionAsync();
    }
}
/// <summary>
/// Deletes the job with the given identifier.
/// </summary>
/// <param name="jobId">Identifier of the job to delete.</param>
/// <exception cref="InvalidOperationException">No job with <paramref name="jobId"/> exists.</exception>
public async Task DeleteAsync(Guid jobId)
{
    // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
    var doomed = await _context.Jobs
        .AsTracking()
        .FirstOrDefaultAsync(e => e.Id == jobId);

    if (doomed is not null)
    {
        _context.Jobs.Remove(doomed);
        await _context.SaveChangesAsync();

        _logger.LogInformation("Deleted job {JobId}", jobId);
        return;
    }

    _logger.LogWarning("Job {JobId} not found for deletion", jobId);
    throw new InvalidOperationException($"Job with ID {jobId} not found.");
}
// Helper classes for raw SQL query results
/// <summary>
/// One row of a "count grouped by status" query: the raw status value and its row count.
/// </summary>
private sealed class StatusCountResult
{
    /// <summary>Status as stored in the database (integer form of the status enum).</summary>
    public int Status { get; set; }

    /// <summary>Number of jobs having that status.</summary>
    public int Count { get; set; }
}
/// <summary>
/// One row of a "count grouped by job type" query: the raw job type value and its row count.
/// </summary>
private sealed class JobTypeCountResult
{
    /// <summary>Job type as stored in the database (integer form of the job type enum).</summary>
    public int JobType { get; set; }

    /// <summary>Number of jobs having that job type.</summary>
    public int Count { get; set; }
}
/// <summary>
/// One row of a "count grouped by status and job type" query.
/// </summary>
private sealed class StatusTypeCountResult
{
    /// <summary>Status as stored in the database (integer form of the status enum).</summary>
    public int Status { get; set; }

    /// <summary>Job type as stored in the database (integer form of the job type enum).</summary>
    public int JobType { get; set; }

    /// <summary>Number of jobs having that status and job type.</summary>
    public int Count { get; set; }
}
/// <summary>
/// Holder for a single total-count value.
/// NOTE(review): no use of this type is visible in this file — confirm whether it can be removed.
/// </summary>
private sealed class TotalCountResult
{
    /// <summary>The counted number of rows.</summary>
    public int Count { get; set; }
}
/// <summary>
/// Converts a domain job into its database entity; enum values are stored as integers.
/// </summary>
/// <param name="job">Domain job to convert.</param>
/// <returns>A new entity carrying the same values.</returns>
private static JobEntity MapToEntity(Job job) => new JobEntity
{
    // Identity and ownership
    Id = job.Id,
    BundleRequestId = job.BundleRequestId,
    UserId = job.UserId,
    RequestId = job.RequestId,
    GeneticRequestId = job.GeneticRequestId,

    // Classification and input
    Status = (int)job.Status,
    JobType = (int)job.JobType,
    Priority = job.Priority,
    ConfigJson = job.ConfigJson,
    StartDate = job.StartDate,
    EndDate = job.EndDate,

    // Execution state
    ProgressPercentage = job.ProgressPercentage,
    AssignedWorkerId = job.AssignedWorkerId,
    LastHeartbeat = job.LastHeartbeat,
    CreatedAt = job.CreatedAt,
    StartedAt = job.StartedAt,
    CompletedAt = job.CompletedAt,

    // Outcome
    ResultJson = job.ResultJson,
    ErrorMessage = job.ErrorMessage,

    // Retry bookkeeping
    RetryCount = job.RetryCount,
    MaxRetries = job.MaxRetries,
    RetryAfter = job.RetryAfter,
    IsRetryable = job.IsRetryable,
    // Nullable enum to nullable int: null stays null.
    FailureCategory = (int?)job.FailureCategory
};
/// <summary>
/// Converts a database entity into a domain job; stored integers are cast back to their enums.
/// </summary>
/// <param name="entity">Entity to convert.</param>
/// <returns>A new domain job carrying the same values.</returns>
private static Job MapToDomain(JobEntity entity) => new Job
{
    // Identity and ownership
    Id = entity.Id,
    BundleRequestId = entity.BundleRequestId,
    UserId = entity.UserId,
    RequestId = entity.RequestId,
    GeneticRequestId = entity.GeneticRequestId,

    // Classification and input
    Status = (JobStatus)entity.Status,
    JobType = (JobType)entity.JobType,
    Priority = entity.Priority,
    ConfigJson = entity.ConfigJson,
    StartDate = entity.StartDate,
    EndDate = entity.EndDate,

    // Execution state
    ProgressPercentage = entity.ProgressPercentage,
    AssignedWorkerId = entity.AssignedWorkerId,
    LastHeartbeat = entity.LastHeartbeat,
    CreatedAt = entity.CreatedAt,
    StartedAt = entity.StartedAt,
    CompletedAt = entity.CompletedAt,

    // Outcome
    ResultJson = entity.ResultJson,
    ErrorMessage = entity.ErrorMessage,

    // Retry bookkeeping
    RetryCount = entity.RetryCount,
    MaxRetries = entity.MaxRetries,
    RetryAfter = entity.RetryAfter,
    IsRetryable = entity.IsRetryable,
    // Nullable int to nullable enum: null stays null.
    FailureCategory = (FailureCategory?)entity.FailureCategory
};
}