using System.Data.Common;
using System.Linq.Expressions;
using Managing.Application.Abstractions.Repositories;
using Managing.Domain.Backtests;
using Managing.Infrastructure.Databases.PostgreSql.Entities;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;
using Npgsql;
using NpgsqlTypes;
using static Managing.Common.Enums;

namespace Managing.Infrastructure.Databases.PostgreSql;

/// <summary>
/// <see cref="IJobRepository"/> implementation that stores jobs in the "Jobs" table through
/// <see cref="ManagingDbContext"/>, using raw ADO.NET commands where row-level locking or plain
/// aggregation is required.
/// </summary>
public class PostgreSqlJobRepository : IJobRepository
{
    // Column list shared by both claim queries; the "Jobs" table is always aliased as j.
    private const string SelectJobSql = @"
        SELECT j.""Id"", j.""BundleRequestId"", j.""UserId"", j.""Status"", j.""JobType"", j.""Priority"",
               j.""ConfigJson"", j.""StartDate"", j.""EndDate"", j.""ProgressPercentage"",
               j.""AssignedWorkerId"", j.""LastHeartbeat"", j.""CreatedAt"", j.""StartedAt"",
               j.""CompletedAt"", j.""ResultJson"", j.""ErrorMessage"", j.""RequestId"",
               j.""GeneticRequestId"", j.""RetryCount"", j.""MaxRetries"", j.""RetryAfter"",
               j.""IsRetryable"", j.""FailureCategory""
        FROM ""Jobs"" j";

    // Picks one random pending job whose owner has fewer than @maxConcurrentPerUser running jobs
    // of the same type; the capacity filter is part of the same statement that locks the row.
    private const string ClaimRandomJobSql = SelectJobSql + @"
        WHERE j.""Status"" = @status
          AND j.""JobType"" = @jobType
          AND (
              SELECT COUNT(*)
              FROM ""Jobs"" running
              WHERE running.""UserId"" = j.""UserId""
                AND running.""Status"" = @runningStatus
                AND running.""JobType"" = @jobType
          ) < @maxConcurrentPerUser
        ORDER BY RANDOM()
        LIMIT 1
        FOR UPDATE SKIP LOCKED";

    private readonly ManagingDbContext _context;
    private readonly ILogger<PostgreSqlJobRepository> _logger;

    public PostgreSqlJobRepository(
        ManagingDbContext context,
        ILogger<PostgreSqlJobRepository> logger)
    {
        _context = context;
        _logger = logger;
    }

    /// <summary>Inserts a new job row and returns the persisted job mapped back to the domain model.</summary>
    public async Task<Job> CreateAsync(Job job)
    {
        var entity = MapToEntity(job);
        _context.Jobs.Add(entity);
        await _context.SaveChangesAsync();
        return MapToDomain(entity);
    }

    /// <summary>
    /// Claims the pending job with the highest priority (oldest first on ties), optionally restricted
    /// to <paramref name="jobType"/>, and marks it as running for <paramref name="workerId"/>.
    /// </summary>
    /// <returns>The claimed job, or <c>null</c> when no pending job could be locked.</returns>
    public async Task<Job?> ClaimNextJobAsync(string workerId, JobType? jobType = null)
    {
        // Use execution strategy to support retry with transactions
        var strategy = _context.Database.CreateExecutionStrategy();

        return await strategy.ExecuteAsync(async () =>
        {
            try
            {
                // SQL and parameters are built inside the retried delegate so that every attempt
                // gets fresh parameter instances.
                // NOTE(review): "RetryAfter" is not part of the filter; confirm whether pending jobs
                // with a future RetryAfter are expected to be claimable.
                var sql = SelectJobSql + @"
        WHERE j.""Status"" = @status";
                var parameters = new List<NpgsqlParameter>
                {
                    IntParameter("status", (int)JobStatus.Pending)
                };

                if (jobType.HasValue)
                {
                    sql += @"
          AND j.""JobType"" = @jobType";
                    parameters.Add(IntParameter("jobType", (int)jobType.Value));
                }

                sql += @"
        ORDER BY j.""Priority"" DESC, j.""CreatedAt"" ASC
        LIMIT 1
        FOR UPDATE SKIP LOCKED";

                _logger.LogDebug("Claiming job with SQL: {Sql}, Parameters: Status={Status}, JobType={JobType}",
                    sql, (int)JobStatus.Pending, jobType.HasValue ? (int)jobType.Value : (int?)null);

                var claimed = await LockAndAssignAsync(sql, parameters, workerId);
                if (claimed is null)
                {
                    _logger.LogDebug("No job found to claim for worker {WorkerId}", workerId);
                    return null;
                }

                _logger.LogInformation("Claimed job {JobId} for worker {WorkerId}", claimed.Id, workerId);
                return MapToDomain(claimed);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error claiming job for worker {WorkerId}", workerId);
                throw;
            }
        });
    }

    /// <summary>
    /// Copies the mutable fields of <paramref name="job"/> onto the stored row with the same id.
    /// Logs a warning and does nothing when the row does not exist.
    /// </summary>
    public async Task UpdateAsync(Job job)
    {
        // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
        var entity = await _context.Jobs
            .AsTracking()
            .FirstOrDefaultAsync(e => e.Id == job.Id);

        if (entity is null)
        {
            _logger.LogWarning("Job {JobId} not found for update", job.Id);
            return;
        }

        entity.Status = (int)job.Status;
        entity.JobType = (int)job.JobType;
        entity.ProgressPercentage = job.ProgressPercentage;
        entity.AssignedWorkerId = job.AssignedWorkerId;
        entity.LastHeartbeat = job.LastHeartbeat;
        entity.StartedAt = job.StartedAt;
        entity.CompletedAt = job.CompletedAt;
        entity.ResultJson = job.ResultJson;
        entity.ErrorMessage = job.ErrorMessage;
        entity.RequestId = job.RequestId;
        entity.GeneticRequestId = job.GeneticRequestId;
        entity.Priority = job.Priority;
        entity.RetryCount = job.RetryCount;
        entity.MaxRetries = job.MaxRetries;
        entity.RetryAfter = job.RetryAfter;
        entity.IsRetryable = job.IsRetryable;
        entity.FailureCategory = job.FailureCategory.HasValue ? (int)job.FailureCategory.Value : null;

        await _context.SaveChangesAsync();
    }

    /// <summary>Gets all jobs that belong to the given bundle request.</summary>
    public async Task<IEnumerable<Job>> GetByBundleRequestIdAsync(Guid bundleRequestId)
    {
        var entities = await _context.Jobs
            .Where(j => j.BundleRequestId == bundleRequestId)
            .ToListAsync();

        return entities.Select(MapToDomain);
    }

    /// <summary>Gets all jobs owned by the given user.</summary>
    public async Task<IEnumerable<Job>> GetByUserIdAsync(int userId)
    {
        var entities = await _context.Jobs
            .Where(j => j.UserId == userId)
            .ToListAsync();

        return entities.Select(MapToDomain);
    }

    /// <summary>
    /// Gets all running jobs assigned to a specific worker
    /// </summary>
    public async Task<IEnumerable<Job>> GetRunningJobsByWorkerIdAsync(string workerId)
    {
        var entities = await _context.Jobs
            .Where(j => j.AssignedWorkerId == workerId && j.Status == (int)JobStatus.Running)
            .ToListAsync();

        return entities.Select(MapToDomain);
    }

    /// <summary>Gets all jobs that belong to the given genetic request.</summary>
    public async Task<IEnumerable<Job>> GetByGeneticRequestIdAsync(string geneticRequestId)
    {
        var entities = await _context.Jobs
            .Where(j => j.GeneticRequestId == geneticRequestId)
            .ToListAsync();

        return entities.Select(MapToDomain);
    }

    /// <summary>
    /// Counts running jobs of <paramref name="jobType"/> per user, keyed by user id.
    /// </summary>
    /// <remarks>
    /// <paramref name="workerId"/> is not used by the query: counts span all workers
    /// (global limit per user).
    /// </remarks>
    public async Task<Dictionary<int, int>> GetRunningJobCountsByUserIdAsync(string workerId, JobType jobType)
    {
        var counts = await _context.Jobs
            .Where(j => j.Status == (int)JobStatus.Running && j.JobType == (int)jobType)
            .GroupBy(j => j.UserId)
            .Select(g => new { UserId = g.Key, Count = g.Count() })
            .ToListAsync();

        return counts.ToDictionary(x => x.UserId, x => x.Count);
    }

    /// <summary>
    /// Claims a random pending job of <paramref name="jobType"/> whose owner currently has fewer than
    /// <paramref name="maxConcurrentPerUser"/> running jobs of that type, and marks it as running for
    /// <paramref name="workerId"/>.
    /// </summary>
    /// <returns>The claimed job, or <c>null</c> when no eligible job could be locked.</returns>
    public async Task<Job?> ClaimRandomJobAsync(string workerId, JobType jobType, int maxConcurrentPerUser)
    {
        // Use execution strategy to support retry with transactions
        var strategy = _context.Database.CreateExecutionStrategy();

        return await strategy.ExecuteAsync(async () =>
        {
            try
            {
                // NOTE(review): the per-user count is evaluated in the same statement as the row lock,
                // but concurrent transactions may each still observe a count below the limit before
                // either commits; confirm whether a strict per-user cap is required.
                var parameters = new List<NpgsqlParameter>
                {
                    IntParameter("status", (int)JobStatus.Pending),
                    IntParameter("jobType", (int)jobType),
                    IntParameter("runningStatus", (int)JobStatus.Running),
                    IntParameter("maxConcurrentPerUser", maxConcurrentPerUser)
                };

                _logger.LogDebug("Claiming random job atomically (maxConcurrentPerUser: {MaxConcurrent})",
                    maxConcurrentPerUser);

                var claimed = await LockAndAssignAsync(ClaimRandomJobSql, parameters, workerId);
                if (claimed is null)
                {
                    _logger.LogDebug("No random job found to claim for worker {WorkerId}", workerId);
                    return null;
                }

                _logger.LogInformation(
                    "Claimed random job {JobId} (UserId: {UserId}) for worker {WorkerId}",
                    claimed.Id, claimed.UserId, workerId);
                return MapToDomain(claimed);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error claiming random job for worker {WorkerId}", workerId);
                throw;
            }
        });
    }

    /// <summary>
    /// Returns one page of jobs matching the optional filters together with the total number of
    /// matching rows (counted before paging).
    /// </summary>
    /// <param name="page">1-based page number; values below 1 are treated as 1.</param>
    /// <param name="pageSize">Maximum rows per page; negative values are treated as 0.</param>
    /// <param name="sortBy">
    /// Case-insensitive sort key: createdat, startedat, completedat, priority, status or jobtype.
    /// Anything else (including null) sorts by CreatedAt descending.
    /// </param>
    /// <param name="sortOrder">"asc" (case-insensitive) for ascending; anything else is descending.</param>
    public async Task<(IEnumerable<Job> Jobs, int TotalCount)> GetPaginatedAsync(
        int page,
        int pageSize,
        string sortBy = "CreatedAt",
        string sortOrder = "desc",
        JobStatus? status = null,
        JobType? jobType = null,
        int? userId = null,
        string? workerId = null,
        Guid? bundleRequestId = null)
    {
        // A negative OFFSET/LIMIT would be rejected by the database, so clamp instead.
        page = Math.Max(page, 1);
        pageSize = Math.Max(pageSize, 0);

        var query = _context.Jobs.AsQueryable();

        if (status.HasValue)
        {
            query = query.Where(j => j.Status == (int)status.Value);
        }

        if (jobType.HasValue)
        {
            query = query.Where(j => j.JobType == (int)jobType.Value);
        }

        if (userId.HasValue)
        {
            query = query.Where(j => j.UserId == userId.Value);
        }

        if (!string.IsNullOrEmpty(workerId))
        {
            query = query.Where(j => j.AssignedWorkerId == workerId);
        }

        if (bundleRequestId.HasValue)
        {
            query = query.Where(j => j.BundleRequestId == bundleRequestId.Value);
        }

        // Get total count before pagination
        var totalCount = await query.CountAsync();

        // Ordinal / invariant comparisons keep the key matching independent of the current culture.
        var ascending = string.Equals(sortOrder, "asc", StringComparison.OrdinalIgnoreCase);

        IOrderedQueryable<JobEntity> Sort<TKey>(Expression<Func<JobEntity, TKey>> key) =>
            ascending ? query.OrderBy(key) : query.OrderByDescending(key);

        var ordered = sortBy?.ToLowerInvariant() switch
        {
            "createdat" => Sort(j => j.CreatedAt),
            "startedat" => Sort(j => j.StartedAt),
            "completedat" => Sort(j => j.CompletedAt),
            "priority" => Sort(j => j.Priority),
            "status" => Sort(j => j.Status),
            "jobtype" => Sort(j => j.JobType),
            _ => query.OrderByDescending(j => j.CreatedAt) // Default sort
        };

        // Id tie-breaker: without it rows with equal sort keys may move between pages.
        var entities = await ordered
            .ThenBy(j => j.Id)
            .Skip((page - 1) * pageSize)
            .Take(pageSize)
            .ToListAsync();

        return (entities.Select(MapToDomain), totalCount);
    }

    /// <summary>Gets a job by id, or <c>null</c> when it does not exist.</summary>
    public async Task<Job?> GetByIdAsync(Guid jobId)
    {
        var entity = await _context.Jobs.FirstOrDefaultAsync(j => j.Id == jobId);
        return entity is null ? null : MapToDomain(entity);
    }

    /// <summary>
    /// Gets running jobs whose last heartbeat is missing or older than
    /// <paramref name="timeoutMinutes"/> minutes.
    /// </summary>
    public async Task<IEnumerable<Job>> GetStaleJobsAsync(int timeoutMinutes = 5)
    {
        var entities = await StaleJobs(timeoutMinutes).ToListAsync();
        return entities.Select(MapToDomain);
    }

    /// <summary>
    /// Puts stale running jobs (see <see cref="GetStaleJobsAsync"/>) back to Pending and clears their
    /// worker assignment and heartbeat.
    /// </summary>
    /// <returns>The number of jobs that were reset.</returns>
    public async Task<int> ResetStaleJobsAsync(int timeoutMinutes = 5)
    {
        // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
        var staleJobs = await StaleJobs(timeoutMinutes)
            .AsTracking()
            .ToListAsync();

        if (staleJobs.Count == 0)
        {
            return 0;
        }

        foreach (var job in staleJobs)
        {
            job.Status = (int)JobStatus.Pending;
            job.AssignedWorkerId = null;
            job.LastHeartbeat = null;
        }

        await _context.SaveChangesAsync();
        _logger.LogInformation("Reset {Count} stale jobs back to Pending status", staleJobs.Count);

        return staleJobs.Count;
    }

    /// <summary>
    /// Builds job counts grouped by status, by job type and by (status, job type), plus the total
    /// number of jobs.
    /// </summary>
    public async Task<JobSummary> GetSummaryAsync()
    {
        // Use ADO.NET directly for aggregation queries to avoid EF Core mapping issues.
        // The connection is opened and closed through the DatabaseFacade rather than directly,
        // so a connection that is already open is not re-opened or closed underneath other users.
        await _context.Database.OpenConnectionAsync();
        try
        {
            var connection = _context.Database.GetDbConnection();

            var statusCounts = await ReadRowsAsync(
                connection,
                @"
                SELECT ""Status"", COUNT(*) as Count
                FROM ""Jobs""
                GROUP BY ""Status""
                ORDER BY ""Status""",
                row => new JobStatusCount
                {
                    Status = (JobStatus)row.GetInt32(0),
                    Count = row.GetInt32(1)
                });

            var jobTypeCounts = await ReadRowsAsync(
                connection,
                @"
                SELECT ""JobType"", COUNT(*) as Count
                FROM ""Jobs""
                GROUP BY ""JobType""
                ORDER BY ""JobType""",
                row => new JobTypeCount
                {
                    JobType = (JobType)row.GetInt32(0),
                    Count = row.GetInt32(1)
                });

            var statusTypeCounts = await ReadRowsAsync(
                connection,
                @"
                SELECT ""Status"", ""JobType"", COUNT(*) as Count
                FROM ""Jobs""
                GROUP BY ""Status"", ""JobType""
                ORDER BY ""Status"", ""JobType""",
                row => new JobStatusTypeCount
                {
                    Status = (JobStatus)row.GetInt32(0),
                    JobType = (JobType)row.GetInt32(1),
                    Count = row.GetInt32(2)
                });

            int totalJobs;
            await using (var command = connection.CreateCommand())
            {
                command.CommandText = @"
                SELECT COUNT(*) as Count
                FROM ""Jobs""";
                var result = await command.ExecuteScalarAsync();
                totalJobs = result is null ? 0 : Convert.ToInt32(result);
            }

            return new JobSummary
            {
                StatusCounts = statusCounts,
                JobTypeCounts = jobTypeCounts,
                StatusTypeCounts = statusTypeCounts,
                TotalJobs = totalJobs
            };
        }
        finally
        {
            await _context.Database.CloseConnectionAsync();
        }
    }

    /// <summary>Deletes the job with the given id.</summary>
    /// <exception cref="InvalidOperationException">No job with <paramref name="jobId"/> exists.</exception>
    public async Task DeleteAsync(Guid jobId)
    {
        // Use AsTracking() to enable change tracking since DbContext uses NoTracking by default
        var entity = await _context.Jobs
            .AsTracking()
            .FirstOrDefaultAsync(e => e.Id == jobId);

        if (entity is null)
        {
            _logger.LogWarning("Job {JobId} not found for deletion", jobId);
            throw new InvalidOperationException($"Job with ID {jobId} not found.");
        }

        _context.Jobs.Remove(entity);
        await _context.SaveChangesAsync();

        _logger.LogInformation("Deleted job {JobId}", jobId);
    }

    // Runs the locking SELECT inside a new transaction and, when a row comes back, marks it as
    // Running for the worker and commits. Returns null (after committing) when no row was selected.
    private async Task<JobEntity?> LockAndAssignAsync(
        string sql,
        IEnumerable<NpgsqlParameter> parameters,
        string workerId)
    {
        await using var transaction = await _context.Database.BeginTransactionAsync();
        JobEntity? entity = null;
        try
        {
            entity = await ReadSingleJobAsync(sql, parameters, transaction.GetDbTransaction());
            if (entity is null)
            {
                await transaction.CommitAsync();
                return null;
            }

            // Attach first so the assignments below are picked up as changes by SaveChangesAsync.
            _context.Jobs.Attach(entity);
            var now = DateTime.UtcNow;
            entity.Status = (int)JobStatus.Running;
            entity.AssignedWorkerId = workerId;
            entity.StartedAt = now;
            entity.LastHeartbeat = now;

            await _context.SaveChangesAsync();
            await transaction.CommitAsync();
            return entity;
        }
        catch
        {
            await RollbackQuietlyAsync(transaction);
            throw;
        }
        finally
        {
            // Stop tracking the hand-built entity: a retry of this operation, or a later claim of the
            // same job id on this context, would otherwise fail when attaching a second instance.
            if (entity is not null)
            {
                _context.Entry(entity).State = EntityState.Detached;
            }
        }
    }

    // Executes the query with raw ADO.NET (EF Core would wrap it, which breaks FOR UPDATE SKIP LOCKED)
    // and maps the first row, if any. The reader is disposed before returning so the connection is
    // free for the following SaveChangesAsync.
    private async Task<JobEntity?> ReadSingleJobAsync(
        string sql,
        IEnumerable<NpgsqlParameter> parameters,
        DbTransaction dbTransaction)
    {
        var connection = _context.Database.GetDbConnection();
        await using var command = connection.CreateCommand();
        command.Transaction = dbTransaction;
        command.CommandText = sql;
        command.Parameters.AddRange(parameters.ToArray());

        await using var reader = await command.ExecuteReaderAsync();
        return await reader.ReadAsync() ? ReadJobEntity(reader) : null;
    }

    // Rolls back without letting a rollback failure replace the exception that triggered it.
    private async Task RollbackQuietlyAsync(IDbContextTransaction transaction)
    {
        try
        {
            await transaction.RollbackAsync();
        }
        catch (Exception rollbackError)
        {
            _logger.LogWarning(rollbackError, "Rollback failed while claiming a job");
        }
    }

    // Stale = Running with no heartbeat, or a heartbeat older than the timeout.
    private IQueryable<JobEntity> StaleJobs(int timeoutMinutes)
    {
        var timeoutThreshold = DateTime.UtcNow.AddMinutes(-timeoutMinutes);

        return _context.Jobs
            .Where(j => j.Status == (int)JobStatus.Running &&
                        (j.LastHeartbeat == null || j.LastHeartbeat < timeoutThreshold));
    }

    // Runs a text query on the given connection and projects every row with the supplied mapper.
    private static async Task<List<T>> ReadRowsAsync<T>(
        DbConnection connection,
        string sql,
        Func<DbDataReader, T> map)
    {
        await using var command = connection.CreateCommand();
        command.CommandText = sql;

        await using var reader = await command.ExecuteReaderAsync();
        var rows = new List<T>();
        while (await reader.ReadAsync())
        {
            rows.Add(map(reader));
        }

        return rows;
    }

    private static NpgsqlParameter IntParameter(string name, int value) =>
        new(name, NpgsqlDbType.Integer) { Value = value };

    // Maps the current row of a claim query (columns looked up by name) to a JobEntity.
    private static JobEntity ReadJobEntity(DbDataReader reader)
    {
        string? NullableString(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? null : reader.GetString(ordinal);
        }

        DateTime? NullableDateTime(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? null : reader.GetDateTime(ordinal);
        }

        Guid? NullableGuid(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? null : reader.GetGuid(ordinal);
        }

        int? NullableInt32(string column)
        {
            var ordinal = reader.GetOrdinal(column);
            return reader.IsDBNull(ordinal) ? null : reader.GetInt32(ordinal);
        }

        return new JobEntity
        {
            Id = reader.GetGuid(reader.GetOrdinal("Id")),
            BundleRequestId = NullableGuid("BundleRequestId"),
            UserId = reader.GetInt32(reader.GetOrdinal("UserId")),
            Status = reader.GetInt32(reader.GetOrdinal("Status")),
            JobType = reader.GetInt32(reader.GetOrdinal("JobType")),
            Priority = reader.GetInt32(reader.GetOrdinal("Priority")),
            ConfigJson = reader.GetString(reader.GetOrdinal("ConfigJson")),
            StartDate = reader.GetDateTime(reader.GetOrdinal("StartDate")),
            EndDate = reader.GetDateTime(reader.GetOrdinal("EndDate")),
            ProgressPercentage = reader.GetInt32(reader.GetOrdinal("ProgressPercentage")),
            AssignedWorkerId = NullableString("AssignedWorkerId"),
            LastHeartbeat = NullableDateTime("LastHeartbeat"),
            CreatedAt = reader.GetDateTime(reader.GetOrdinal("CreatedAt")),
            StartedAt = NullableDateTime("StartedAt"),
            CompletedAt = NullableDateTime("CompletedAt"),
            ResultJson = NullableString("ResultJson"),
            ErrorMessage = NullableString("ErrorMessage"),
            RequestId = NullableString("RequestId"),
            GeneticRequestId = NullableString("GeneticRequestId"),
            RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")),
            MaxRetries = reader.GetInt32(reader.GetOrdinal("MaxRetries")),
            RetryAfter = NullableDateTime("RetryAfter"),
            IsRetryable = reader.GetBoolean(reader.GetOrdinal("IsRetryable")),
            FailureCategory = NullableInt32("FailureCategory")
        };
    }

    // Domain -> persistence mapping; enums are stored as their integer values.
    private static JobEntity MapToEntity(Job job)
    {
        return new JobEntity
        {
            Id = job.Id,
            BundleRequestId = job.BundleRequestId,
            UserId = job.UserId,
            Status = (int)job.Status,
            JobType = (int)job.JobType,
            Priority = job.Priority,
            ConfigJson = job.ConfigJson,
            StartDate = job.StartDate,
            EndDate = job.EndDate,
            ProgressPercentage = job.ProgressPercentage,
            AssignedWorkerId = job.AssignedWorkerId,
            LastHeartbeat = job.LastHeartbeat,
            CreatedAt = job.CreatedAt,
            StartedAt = job.StartedAt,
            CompletedAt = job.CompletedAt,
            ResultJson = job.ResultJson,
            ErrorMessage = job.ErrorMessage,
            RequestId = job.RequestId,
            GeneticRequestId = job.GeneticRequestId,
            RetryCount = job.RetryCount,
            MaxRetries = job.MaxRetries,
            RetryAfter = job.RetryAfter,
            IsRetryable = job.IsRetryable,
            FailureCategory = job.FailureCategory.HasValue ? (int)job.FailureCategory.Value : null
        };
    }

    // Persistence -> domain mapping; integer columns are cast back to their enums.
    private static Job MapToDomain(JobEntity entity)
    {
        return new Job
        {
            Id = entity.Id,
            BundleRequestId = entity.BundleRequestId,
            UserId = entity.UserId,
            Status = (JobStatus)entity.Status,
            JobType = (JobType)entity.JobType,
            Priority = entity.Priority,
            ConfigJson = entity.ConfigJson,
            StartDate = entity.StartDate,
            EndDate = entity.EndDate,
            ProgressPercentage = entity.ProgressPercentage,
            AssignedWorkerId = entity.AssignedWorkerId,
            LastHeartbeat = entity.LastHeartbeat,
            CreatedAt = entity.CreatedAt,
            StartedAt = entity.StartedAt,
            CompletedAt = entity.CompletedAt,
            ResultJson = entity.ResultJson,
            ErrorMessage = entity.ErrorMessage,
            RequestId = entity.RequestId,
            GeneticRequestId = entity.GeneticRequestId,
            RetryCount = entity.RetryCount,
            MaxRetries = entity.MaxRetries,
            RetryAfter = entity.RetryAfter,
            IsRetryable = entity.IsRetryable,
            FailureCategory = entity.FailureCategory.HasValue
                ? (FailureCategory)entity.FailureCategory.Value
                : null
        };
    }
}