Add admin endpoint to delete bundle backtest requests and implement related UI functionality + add job resilience
@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Text.Json;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
@@ -22,6 +23,8 @@ public class BacktestComputeWorker : BackgroundService
private readonly ILogger<BacktestComputeWorker> _logger;
private readonly BacktestComputeWorkerOptions _options;
private readonly SemaphoreSlim _instanceSemaphore;
private readonly ConcurrentDictionary<Guid, Task> _runningJobTasks = new();
private readonly CancellationTokenSource _shutdownCts = new();

public BacktestComputeWorker(
IServiceScopeFactory scopeFactory,
@@ -37,32 +40,78 @@ public class BacktestComputeWorker : BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
"BacktestComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrentPerUser: {MaxConcurrentPerUser}, MaxConcurrentPerInstance: {MaxConcurrentPerInstance}, PollInterval: {PollInterval}s",
_options.WorkerId, _options.MaxConcurrentPerUser, _options.MaxConcurrentPerInstance, _options.JobPollIntervalSeconds);
"BacktestComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrentPerUser: {MaxConcurrentPerUser}, MaxConcurrentPerInstance: {MaxConcurrentPerInstance}, PollInterval: {PollInterval}s, JobTimeout: {JobTimeoutMinutes}min",
_options.WorkerId, _options.MaxConcurrentPerUser, _options.MaxConcurrentPerInstance,
_options.JobPollIntervalSeconds, _options.JobTimeoutMinutes);

// Link cancellation tokens
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, _shutdownCts.Token);
var cancellationToken = linkedCts.Token;

// Background task for stale job recovery
var staleJobRecoveryTask = Task.Run(() => StaleJobRecoveryLoop(stoppingToken), stoppingToken);
var staleJobRecoveryTask = Task.Run(() => StaleJobRecoveryLoop(cancellationToken), cancellationToken);

// Background task for heartbeat updates
var heartbeatTask = Task.Run(() => HeartbeatLoop(stoppingToken), stoppingToken);
var heartbeatTask = Task.Run(() => HeartbeatLoop(cancellationToken), cancellationToken);

// Main job processing loop
while (!stoppingToken.IsCancellationRequested)
try
{
try
while (!cancellationToken.IsCancellationRequested)
{
await ProcessJobsAsync(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in BacktestComputeWorker main loop");
SentrySdk.CaptureException(ex);
}
try
{
await ProcessJobsAsync(cancellationToken);
}
catch (OperationCanceledException)
{
// Expected during shutdown
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in BacktestComputeWorker main loop");
SentrySdk.CaptureException(ex);
}

await Task.Delay(TimeSpan.FromSeconds(_options.JobPollIntervalSeconds), stoppingToken);
try
{
await Task.Delay(TimeSpan.FromSeconds(_options.JobPollIntervalSeconds), cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
}
}
finally
{
_logger.LogInformation("BacktestComputeWorker stopping, waiting for {Count} running jobs to complete or timeout",
_runningJobTasks.Count);

// Signal shutdown
_shutdownCts.Cancel();

// Wait for running jobs with timeout
var waitTasks = _runningJobTasks.Values.ToArray();
if (waitTasks.Length > 0)
{
var timeoutTask = Task.Delay(TimeSpan.FromMinutes(_options.GracefulShutdownTimeoutMinutes), CancellationToken.None);
var completedTask = await Task.WhenAny(Task.WhenAll(waitTasks), timeoutTask);

if (completedTask == timeoutTask)
{
_logger.LogWarning("Graceful shutdown timeout reached, {Count} jobs may still be running",
_runningJobTasks.Count);
}
else
{
_logger.LogInformation("All running jobs completed during graceful shutdown");
}
}

_logger.LogInformation("BacktestComputeWorker stopped");
}

_logger.LogInformation("BacktestComputeWorker stopping");
}

private async Task ProcessJobsAsync(CancellationToken cancellationToken)
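The hunk above links the host's stoppingToken with the worker's internal _shutdownCts, tracks in-flight job tasks, and gives them a bounded window to finish in the finally block. A minimal, self-contained sketch of that shutdown pattern (class and method names such as GracefulWorker and DoJobAsync are illustrative, not from this codebase):

// Sketch only: linked cancellation plus a bounded wait for in-flight work.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class GracefulWorker : IDisposable
{
    private readonly ConcurrentDictionary<Guid, Task> _running = new();
    private readonly CancellationTokenSource _shutdownCts = new();

    public async Task RunAsync(CancellationToken stoppingToken)
    {
        // One token that fires on either host shutdown or an internal shutdown request.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, _shutdownCts.Token);
        var token = linked.Token;
        try
        {
            while (!token.IsCancellationRequested)
            {
                var id = Guid.NewGuid();
                var task = Task.Run(() => DoJobAsync(id, token), token);
                _running.TryAdd(id, task);
                // Drop the entry once the job finishes, whatever its outcome.
                _ = task.ContinueWith(t => _running.TryRemove(id, out _), TaskScheduler.Default);

                try { await Task.Delay(TimeSpan.FromSeconds(5), token); }
                catch (OperationCanceledException) { break; }
            }
        }
        finally
        {
            // Signal in-flight jobs, then wait for them with an upper bound.
            _shutdownCts.Cancel();
            var pending = _running.Values.ToArray();
            if (pending.Length > 0)
            {
                var timeout = Task.Delay(TimeSpan.FromMinutes(5), CancellationToken.None);
                await Task.WhenAny(Task.WhenAll(pending), timeout);
            }
        }
    }

    private static Task DoJobAsync(Guid id, CancellationToken token) =>
        Task.Delay(TimeSpan.FromSeconds(1), token); // placeholder for real work

    public void Dispose() => _shutdownCts.Dispose();
}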
@@ -99,22 +148,32 @@ public class BacktestComputeWorker : BackgroundService

// Process the job asynchronously (don't await, let it run in background)
// Create a new scope for the job processing to ensure proper lifetime management
_ = Task.Run(async () =>
var jobTask = Task.Run(async () =>
{
try
{
await ProcessJobAsync(job, cancellationToken);
}
catch (OperationCanceledException)
{
// Handle cancellation gracefully
_logger.LogInformation("Job {JobId} was cancelled during processing", job.Id);
await HandleJobCancellation(job);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing job {JobId}", job.Id);
throw;
// Error handling is done in ProcessJobAsync
}
finally
{
_runningJobTasks.TryRemove(job.Id, out _);
_instanceSemaphore.Release();
}
}, cancellationToken);

// Track the running job task
_runningJobTasks.TryAdd(job.Id, jobTask);
}
catch (Exception ex)
{
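This hunk runs each claimed job on a tracked background task: the per-instance SemaphoreSlim caps concurrency and is released in the same finally that removes the task from _runningJobTasks. A hedged sketch of that dispatch shape; JobDispatcher and the processJob delegate are illustrative placeholders, not the real worker API:

// Sketch only: semaphore-bounded, tracked fire-and-forget dispatch.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class JobDispatcher
{
    private readonly SemaphoreSlim _instanceSemaphore = new(initialCount: 4, maxCount: 4);
    private readonly ConcurrentDictionary<Guid, Task> _runningJobTasks = new();

    public async Task DispatchAsync(Guid jobId, Func<CancellationToken, Task> processJob, CancellationToken ct)
    {
        // Take a per-instance slot before starting the job.
        await _instanceSemaphore.WaitAsync(ct);

        var jobTask = Task.Run(async () =>
        {
            try
            {
                await processJob(ct);
            }
            catch (OperationCanceledException)
            {
                // A real worker would reschedule here (cf. HandleJobCancellation above).
            }
            finally
            {
                // Always free the slot and stop tracking, even on failure.
                _runningJobTasks.TryRemove(jobId, out _);
                _instanceSemaphore.Release();
            }
        });

        // Track the running task so shutdown can await it.
        _runningJobTasks.TryAdd(jobId, jobTask);
    }
}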
@@ -135,6 +194,8 @@ public class BacktestComputeWorker : BackgroundService
var exchangeService = scope.ServiceProvider.GetRequiredService<IExchangeService>();
var agentSummaryRepository = scope.ServiceProvider.GetRequiredService<IAgentSummaryRepository>();

var jobStartTime = DateTime.UtcNow;

try
{
_logger.LogInformation(
@@ -174,6 +235,16 @@ public class BacktestComputeWorker : BackgroundService
{
try
{
// Check if job has been running too long
var elapsed = DateTime.UtcNow - jobStartTime;
if (elapsed.TotalMinutes > _options.JobTimeoutMinutes)
{
_logger.LogWarning(
"Job {JobId} has been running for {ElapsedMinutes} minutes, exceeding timeout of {TimeoutMinutes} minutes",
job.Id, elapsed.TotalMinutes, _options.JobTimeoutMinutes);
throw new TimeoutException($"Job exceeded timeout of {_options.JobTimeoutMinutes} minutes");
}

job.ProgressPercentage = percentage;
job.LastHeartbeat = DateTime.UtcNow;
await jobRepository.UpdateAsync(job);
@@ -181,19 +252,37 @@ public class BacktestComputeWorker : BackgroundService
catch (Exception ex)
{
_logger.LogWarning(ex, "Error updating job progress for job {JobId}", job.Id);
throw; // Re-throw timeout exceptions
}
};

// Execute the backtest
var result = await executor.ExecuteAsync(
config,
candles,
user,
save: true,
withCandles: false,
requestId: job.RequestId,
metadata: null,
progressCallback: progressCallback);
// Execute the backtest with timeout
var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(_options.JobTimeoutMinutes));
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);

LightBacktest result;
try
{
result = await executor.ExecuteAsync(
config,
candles,
user,
save: true,
withCandles: false,
requestId: job.RequestId,
metadata: null,
progressCallback: progressCallback);
}
catch (OperationCanceledException) when (timeoutCts.Token.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
var elapsed = DateTime.UtcNow - jobStartTime;
throw new TimeoutException($"Job {job.Id} exceeded timeout of {_options.JobTimeoutMinutes} minutes (ran for {elapsed.TotalMinutes:F1} minutes)");
}
finally
{
timeoutCts.Dispose();
linkedCts.Dispose();
}

// Update job with result
job.Status = JobStatus.Completed;
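The execution above bounds each backtest by linking a timeout CancellationTokenSource with the worker's token and uses an exception filter to tell "timed out" apart from "worker is shutting down". A standalone sketch of the same pattern, assuming a hypothetical RunWithTimeoutAsync helper and a runBacktestAsync delegate in place of executor.ExecuteAsync:

// Sketch only: timeout-bounded execution with a linked cancellation source.
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutBoundedExecution
{
    public static async Task<T> RunWithTimeoutAsync<T>(
        Func<CancellationToken, Task<T>> runBacktestAsync,
        TimeSpan jobTimeout,
        CancellationToken cancellationToken)
    {
        // The timeout source cancels itself after jobTimeout; the linked source
        // also observes the ambient worker/shutdown token.
        using var timeoutCts = new CancellationTokenSource(jobTimeout);
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token);

        try
        {
            return await runBacktestAsync(linkedCts.Token);
        }
        catch (OperationCanceledException)
            when (timeoutCts.Token.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
        {
            // Only the timeout fired: surface it as a TimeoutException so retry
            // logic can classify it, instead of treating it as a shutdown.
            throw new TimeoutException($"Job exceeded timeout of {jobTimeout.TotalMinutes:F0} minutes");
        }
    }
}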
@@ -216,9 +305,10 @@ public class BacktestComputeWorker : BackgroundService
// Don't fail the job if this update fails
}

var elapsedTime = DateTime.UtcNow - jobStartTime;
_logger.LogInformation(
"Completed backtest job {JobId}. Score: {Score}, PnL: {PnL}",
job.Id, result.Score, result.FinalPnl);
"Completed backtest job {JobId}. Score: {Score}, PnL: {PnL}, Duration: {DurationMinutes:F1} minutes",
job.Id, result.Score, result.FinalPnl, elapsedTime.TotalMinutes);

// Update bundle request if this is part of a bundle
if (job.BundleRequestId.HasValue)
@@ -226,6 +316,11 @@ public class BacktestComputeWorker : BackgroundService
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
_logger.LogInformation("Job {JobId} was cancelled", job.Id);
throw; // Re-throw to be handled by the caller
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing backtest job {JobId}", job.Id);
@@ -234,6 +329,62 @@ public class BacktestComputeWorker : BackgroundService
await HandleJobFailure(job, ex, jobRepository, scope.ServiceProvider);
}
}

private async Task HandleJobCancellation(Job job)
{
try
{
using var scope = _scopeFactory.CreateScope();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();

// Reload job to get latest state
var currentJob = await jobRepository.GetByIdAsync(job.Id);
if (currentJob == null)
{
_logger.LogWarning("Job {JobId} not found when handling cancellation", job.Id);
return;
}

// If job is still running, mark it for retry
if (currentJob.Status == JobStatus.Running)
{
if (currentJob.RetryCount < currentJob.MaxRetries)
{
currentJob.Status = JobStatus.Pending;
currentJob.RetryCount++;
var backoffMinutes = Math.Min(Math.Pow(2, currentJob.RetryCount), _options.MaxRetryDelayMinutes);
currentJob.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
currentJob.ErrorMessage = $"Worker shutdown - retry {currentJob.RetryCount}/{currentJob.MaxRetries}";
currentJob.FailureCategory = FailureCategory.Transient;
currentJob.AssignedWorkerId = null;
currentJob.LastHeartbeat = null;

_logger.LogWarning(
"Job {JobId} cancelled during shutdown, will be retried (attempt {RetryCount}/{MaxRetries})",
currentJob.Id, currentJob.RetryCount, currentJob.MaxRetries);
}
else
{
currentJob.Status = JobStatus.Failed;
currentJob.ErrorMessage = "Worker shutdown - exceeded max retries";
currentJob.FailureCategory = FailureCategory.SystemError;
currentJob.IsRetryable = false;
currentJob.CompletedAt = DateTime.UtcNow;
currentJob.AssignedWorkerId = null;

_logger.LogError("Job {JobId} cancelled during shutdown and exceeded max retries", currentJob.Id);

await NotifyPermanentFailure(currentJob, new OperationCanceledException("Worker shutdown"), scope.ServiceProvider);
}

await jobRepository.UpdateAsync(currentJob);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error handling job cancellation for job {JobId}", job.Id);
}
}

private async Task UpdateBundleRequestProgress(Guid bundleRequestId, IServiceProvider serviceProvider)
{
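HandleJobCancellation reschedules interrupted jobs with capped exponential backoff: the delay doubles with each retry and is clamped to MaxRetryDelayMinutes. The arithmetic, isolated as a sketch (the helper name is illustrative; the default cap mirrors the options class below):

// Sketch only: capped exponential backoff as used when re-queueing a job.
using System;

public static class RetryBackoff
{
    public static DateTime NextRetryUtc(int retryCount, int maxRetryDelayMinutes = 60)
    {
        // Delay doubles per attempt (2, 4, 8, ... minutes), capped at the configured maximum.
        var backoffMinutes = Math.Min(Math.Pow(2, retryCount), maxRetryDelayMinutes);
        return DateTime.UtcNow.AddMinutes(backoffMinutes);
    }
}
// Example: after the 3rd retry, Math.Min(2^3, 60) = 8, so the job becomes eligible
// again roughly 8 minutes from now; from the 6th retry onward the 60-minute cap
// applies (2^6 = 64 > 60).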
@@ -373,14 +524,35 @@ public class BacktestComputeWorker : BackgroundService

// Get stale jobs for this worker
var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
var now = DateTime.UtcNow;
var staleJobs = runningJobs
.Where(j => j.JobType == JobType.Backtest &&
(j.LastHeartbeat == null ||
j.LastHeartbeat < DateTime.UtcNow.AddMinutes(-_options.StaleJobTimeoutMinutes)))
(
// Stale heartbeat (no heartbeat in timeout period)
j.LastHeartbeat == null ||
j.LastHeartbeat < now.AddMinutes(-_options.StaleJobTimeoutMinutes) ||
// Job running too long (even with recent heartbeat)
(j.StartedAt.HasValue &&
j.StartedAt.Value < now.AddMinutes(-_options.JobTimeoutMinutes))
))
.ToList();

foreach (var job in staleJobs)
{
var elapsed = job.StartedAt.HasValue
? (DateTime.UtcNow - job.StartedAt.Value).TotalMinutes
: (double?)null;
var lastHeartbeatAge = job.LastHeartbeat.HasValue
? (DateTime.UtcNow - job.LastHeartbeat.Value).TotalMinutes
: (double?)null;

_logger.LogWarning(
"Detected stale job {JobId}: Started {StartedAt}, LastHeartbeat: {LastHeartbeat} ({HeartbeatAge} min ago), Elapsed: {Elapsed} min",
job.Id, job.StartedAt, job.LastHeartbeat, lastHeartbeatAge, elapsed);

// Remove from running tasks if still tracked
_runningJobTasks.TryRemove(job.Id, out _);

// If it's stale but retryable, reset to pending with retry count
if (job.RetryCount < job.MaxRetries)
{
@@ -388,7 +560,7 @@ public class BacktestComputeWorker : BackgroundService
job.RetryCount++;
var backoffMinutes = Math.Min(Math.Pow(2, job.RetryCount), _options.MaxRetryDelayMinutes);
job.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
job.ErrorMessage = $"Worker timeout - retry {job.RetryCount}/{job.MaxRetries}";
job.ErrorMessage = $"Job timeout/stale - retry {job.RetryCount}/{job.MaxRetries} (ran for {elapsed:F1} min)";
job.FailureCategory = FailureCategory.SystemError;
_logger.LogWarning(
"Stale job {JobId} will be retried (attempt {RetryCount}/{MaxRetries}) after {RetryAfter}",
@@ -398,13 +570,17 @@ public class BacktestComputeWorker : BackgroundService
{
// Exceeded retries - mark as failed
job.Status = JobStatus.Failed;
job.ErrorMessage = "Worker timeout - exceeded max retries";
job.ErrorMessage = $"Job timeout/stale - exceeded max retries (ran for {elapsed:F1} min)";
job.FailureCategory = FailureCategory.SystemError;
job.IsRetryable = false;
job.CompletedAt = DateTime.UtcNow;

_logger.LogError(
"Stale job {JobId} exceeded max retries after running for {Elapsed} minutes",
job.Id, elapsed);

// Notify permanent failure
await NotifyPermanentFailure(job, new TimeoutException("Worker timeout"), scope.ServiceProvider);
await NotifyPermanentFailure(job, new TimeoutException($"Job timeout after {elapsed:F1} minutes"), scope.ServiceProvider);

// Update bundle request if this is part of a bundle
if (job.BundleRequestId.HasValue)
@@ -423,6 +599,11 @@ public class BacktestComputeWorker : BackgroundService
_logger.LogInformation("Processed {Count} stale backtest jobs", staleJobs.Count);
}
}
catch (OperationCanceledException)
{
// Expected during shutdown, don't log as error
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in stale job recovery loop");
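The recovery loop now reclaims a running job for two reasons: a missing or stale heartbeat, or a start time older than JobTimeoutMinutes even when heartbeats are still arriving. A small sketch of that predicate, using an illustrative JobInfo record as a stand-in for the real Job entity:

// Sketch only: the two-part staleness check introduced in the hunks above.
using System;

public sealed record JobInfo(DateTime? StartedAt, DateTime? LastHeartbeat);

public static class StaleJobDetector
{
    public static bool IsStale(JobInfo job, DateTime now, int staleHeartbeatMinutes, int jobTimeoutMinutes)
    {
        // No heartbeat recorded, or the last one is older than the stale threshold.
        var heartbeatStale =
            job.LastHeartbeat == null ||
            job.LastHeartbeat < now.AddMinutes(-staleHeartbeatMinutes);

        // Started too long ago, even if heartbeats are still being written.
        var ranTooLong =
            job.StartedAt.HasValue &&
            job.StartedAt.Value < now.AddMinutes(-jobTimeoutMinutes);

        return heartbeatStale || ranTooLong;
    }
}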
@@ -450,6 +631,11 @@ public class BacktestComputeWorker : BackgroundService
await jobRepository.UpdateAsync(job);
}
}
catch (OperationCanceledException)
{
// Expected during shutdown, don't log as error
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in heartbeat loop");
@@ -571,6 +757,8 @@ public class BacktestComputeWorker : BackgroundService

public override void Dispose()
{
_shutdownCts?.Cancel();
_shutdownCts?.Dispose();
_instanceSemaphore?.Dispose();
base.Dispose();
}
@@ -622,5 +810,15 @@ public class BacktestComputeWorkerOptions
/// Maximum retry delay in minutes (cap for exponential backoff)
/// </summary>
public int MaxRetryDelayMinutes { get; set; } = 60;

/// <summary>
/// Maximum time in minutes a job can run before being considered timed out
/// </summary>
public int JobTimeoutMinutes { get; set; } = 60;

/// <summary>
/// Timeout in minutes to wait for running jobs during graceful shutdown
/// </summary>
public int GracefulShutdownTimeoutMinutes { get; set; } = 5;
}
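For context, a hedged sketch of how the extended options class could be bound with the standard .NET options pattern. The "BacktestComputeWorker" section name and the host bootstrap are assumptions for illustration; only the property names and defaults come from the diff above.

// Sketch only: options binding and worker registration (assumed wiring).
// Namespace usings for BacktestComputeWorker / BacktestComputeWorkerOptions omitted.
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Bind the worker options from configuration (section name is an assumption), e.g.:
// "BacktestComputeWorker": {
//   "JobTimeoutMinutes": 60,
//   "GracefulShutdownTimeoutMinutes": 5,
//   "MaxRetryDelayMinutes": 60
// }
builder.Services.Configure<BacktestComputeWorkerOptions>(
    builder.Configuration.GetSection("BacktestComputeWorker"));

// Register the background worker with the host.
builder.Services.AddHostedService<BacktestComputeWorker>();

await builder.Build().RunAsync();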