Fix stuck the bundle backtests
This commit is contained in:
@@ -340,7 +340,7 @@ public class BacktestComputeWorker : BackgroundService
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
|
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider, job);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -421,7 +421,7 @@ public class BacktestComputeWorker : BackgroundService
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task UpdateBundleRequestProgress(Guid bundleRequestId, IServiceProvider serviceProvider)
|
private async Task UpdateBundleRequestProgress(Guid bundleRequestId, IServiceProvider serviceProvider, Job? updatedJob = null)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@@ -432,10 +432,49 @@ public class BacktestComputeWorker : BackgroundService
|
|||||||
|
|
||||||
// Get all jobs for this bundle
|
// Get all jobs for this bundle
|
||||||
var jobs = await jobRepository.GetByBundleRequestIdAsync(bundleRequestId);
|
var jobs = await jobRepository.GetByBundleRequestIdAsync(bundleRequestId);
|
||||||
|
var totalJobs = jobs.Count();
|
||||||
|
|
||||||
|
// If we have an updated job, use its current status instead of querying again
|
||||||
|
// This avoids race conditions where the job was just updated but not yet reflected in the query
|
||||||
var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
|
var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
|
||||||
var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
|
var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
|
||||||
var runningJobs = jobs.Count(j => j.Status == JobStatus.Running);
|
var runningJobs = jobs.Count(j => j.Status == JobStatus.Running);
|
||||||
var totalJobs = jobs.Count();
|
|
||||||
|
// Adjust counts based on the updated job if provided
|
||||||
|
if (updatedJob != null)
|
||||||
|
{
|
||||||
|
var existingJob = jobs.FirstOrDefault(j => j.Id == updatedJob.Id);
|
||||||
|
if (existingJob != null)
|
||||||
|
{
|
||||||
|
// Remove the old status count
|
||||||
|
switch (existingJob.Status)
|
||||||
|
{
|
||||||
|
case JobStatus.Completed:
|
||||||
|
completedJobs--;
|
||||||
|
break;
|
||||||
|
case JobStatus.Failed:
|
||||||
|
failedJobs--;
|
||||||
|
break;
|
||||||
|
case JobStatus.Running:
|
||||||
|
runningJobs--;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the new status count
|
||||||
|
switch (updatedJob.Status)
|
||||||
|
{
|
||||||
|
case JobStatus.Completed:
|
||||||
|
completedJobs++;
|
||||||
|
break;
|
||||||
|
case JobStatus.Failed:
|
||||||
|
failedJobs++;
|
||||||
|
break;
|
||||||
|
case JobStatus.Running:
|
||||||
|
runningJobs++;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (totalJobs == 0)
|
if (totalJobs == 0)
|
||||||
{
|
{
|
||||||
@@ -467,7 +506,7 @@ public class BacktestComputeWorker : BackgroundService
|
|||||||
bundleRequest.UpdatedAt = DateTime.UtcNow;
|
bundleRequest.UpdatedAt = DateTime.UtcNow;
|
||||||
|
|
||||||
// CRITICAL: If bundle is already in a final state (Completed/Failed with CompletedAt set),
|
// CRITICAL: If bundle is already in a final state (Completed/Failed with CompletedAt set),
|
||||||
// don't overwrite it unless we're detecting a legitimate change
|
// don't overwrite it unless we're detecting a legitimate inconsistency that needs fixing
|
||||||
if (bundleRequest.CompletedAt.HasValue &&
|
if (bundleRequest.CompletedAt.HasValue &&
|
||||||
(bundleRequest.Status == BundleBacktestRequestStatus.Completed ||
|
(bundleRequest.Status == BundleBacktestRequestStatus.Completed ||
|
||||||
bundleRequest.Status == BundleBacktestRequestStatus.Failed))
|
bundleRequest.Status == BundleBacktestRequestStatus.Failed))
|
||||||
@@ -629,7 +668,7 @@ public class BacktestComputeWorker : BackgroundService
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, scope.ServiceProvider);
|
await UpdateBundleRequestProgress(stuckJob.BundleRequestId.Value, scope.ServiceProvider, stuckJob);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@@ -706,7 +745,7 @@ public class BacktestComputeWorker : BackgroundService
|
|||||||
// Update bundle request if this is part of a bundle
|
// Update bundle request if this is part of a bundle
|
||||||
if (job.BundleRequestId.HasValue)
|
if (job.BundleRequestId.HasValue)
|
||||||
{
|
{
|
||||||
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
|
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider, job);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -868,7 +907,7 @@ public class BacktestComputeWorker : BackgroundService
|
|||||||
// Update bundle request if this is part of a bundle
|
// Update bundle request if this is part of a bundle
|
||||||
if (job.BundleRequestId.HasValue)
|
if (job.BundleRequestId.HasValue)
|
||||||
{
|
{
|
||||||
await UpdateBundleRequestProgress(job.BundleRequestId.Value, serviceProvider);
|
await UpdateBundleRequestProgress(job.BundleRequestId.Value, serviceProvider, job);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user