Add test for executor

commit e8e2ec5a43
parent d02a07f86b
Date: 2025-11-11 02:15:57 +07:00
18 changed files with 81418 additions and 170 deletions


@@ -24,6 +24,7 @@ public class BacktestComputeWorker : BackgroundService
     private readonly BacktestComputeWorkerOptions _options;
     private readonly SemaphoreSlim _instanceSemaphore;
     private readonly ConcurrentDictionary<Guid, Task> _runningJobTasks = new();
+    private readonly ConcurrentDictionary<Guid, JobProgressTracker> _jobProgressTrackers = new();
     private readonly CancellationTokenSource _shutdownCts = new();

     public BacktestComputeWorker(
@@ -54,6 +55,9 @@ public class BacktestComputeWorker : BackgroundService
         // Background task for heartbeat updates
         var heartbeatTask = Task.Run(() => HeartbeatLoop(cancellationToken), cancellationToken);

+        // Background task for progress persistence
+        var progressPersistenceTask = Task.Run(() => ProgressPersistenceLoop(cancellationToken), cancellationToken);
+
         // Main job processing loop
         try
         {
@@ -230,30 +234,27 @@ public class BacktestComputeWorker : BackgroundService
$"No candles found for {config.Ticker} on {config.Timeframe} from {job.StartDate} to {job.EndDate}");
}
// Progress callback to update job progress
Func<int, Task> progressCallback = async (percentage) =>
// Create progress tracker for this job
var progressTracker = new JobProgressTracker(job.Id, _logger);
_jobProgressTrackers.TryAdd(job.Id, progressTracker);
// Progress callback that only updates in-memory progress (non-blocking)
Func<int, Task> progressCallback = (percentage) =>
{
try
// Check if job has been running too long
var elapsed = DateTime.UtcNow - jobStartTime;
if (elapsed.TotalMinutes > _options.JobTimeoutMinutes)
{
// Check if job has been running too long
var elapsed = DateTime.UtcNow - jobStartTime;
if (elapsed.TotalMinutes > _options.JobTimeoutMinutes)
{
_logger.LogWarning(
"Job {JobId} has been running for {ElapsedMinutes} minutes, exceeding timeout of {TimeoutMinutes} minutes",
job.Id, elapsed.TotalMinutes, _options.JobTimeoutMinutes);
throw new TimeoutException($"Job exceeded timeout of {_options.JobTimeoutMinutes} minutes");
}
job.ProgressPercentage = percentage;
job.LastHeartbeat = DateTime.UtcNow;
await jobRepository.UpdateAsync(job);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Error updating job progress for job {JobId}", job.Id);
throw; // Re-throw timeout exceptions
_logger.LogWarning(
"Job {JobId} has been running for {ElapsedMinutes} minutes, exceeding timeout of {TimeoutMinutes} minutes",
job.Id, elapsed.TotalMinutes, _options.JobTimeoutMinutes);
throw new TimeoutException($"Job exceeded timeout of {_options.JobTimeoutMinutes} minutes");
}
// Update progress in memory only - persistence happens in background
progressTracker.UpdateProgress(percentage);
return Task.CompletedTask; // Non-blocking
};
// Execute the backtest with timeout
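The rewritten callback above is deliberately cheap: it only writes to the in-memory JobProgressTracker, so the executor can report progress as often as it likes without a database round-trip per call. As a rough illustration of the caller side (not part of this diff; `candles`, the once-per-percent throttle, and the loop shape are assumptions), the callback might be driven like this:

    // Hypothetical caller-side sketch; only the names visible in this diff are real.
    var lastReported = -1;
    for (var i = 0; i < candles.Count; i++)
    {
        // ... evaluate the strategy on candles[i] ...

        var pct = (int)((i + 1) * 100L / candles.Count);
        if (pct != lastReported)
        {
            lastReported = pct;
            await progressCallback(pct); // Updates JobProgressTracker in memory only
        }
    }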
@@ -270,6 +271,7 @@ public class BacktestComputeWorker : BackgroundService
                     save: true,
                     withCandles: false,
                     requestId: job.RequestId,
+                    bundleRequestId: job.BundleRequestId,
                     metadata: null,
                     progressCallback: progressCallback);
             }
@@ -293,6 +295,9 @@ public class BacktestComputeWorker : BackgroundService
                 await jobRepository.UpdateAsync(job);

+                // Clean up progress tracker
+                _jobProgressTrackers.TryRemove(job.Id, out _);
+
                 // Increment backtest count for the user's agent summary
                 try
                 {
@@ -310,11 +315,7 @@ public class BacktestComputeWorker : BackgroundService
"Completed backtest job {JobId}. Score: {Score}, PnL: {PnL}, Duration: {DurationMinutes:F1} minutes",
job.Id, result.Score, result.FinalPnl, elapsedTime.TotalMinutes);
// Update bundle request if this is part of a bundle
if (job.BundleRequestId.HasValue)
{
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
}
// Bundle request is now updated in the BacktestExecutor
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
@@ -326,6 +327,9 @@ public class BacktestComputeWorker : BackgroundService
                 _logger.LogError(ex, "Error processing backtest job {JobId}", job.Id);
                 SentrySdk.CaptureException(ex);

+                // Clean up progress tracker on failure
+                _jobProgressTrackers.TryRemove(job.Id, out _);
+
                 await HandleJobFailure(job, ex, jobRepository, scope.ServiceProvider);
             }
         }
@@ -624,7 +628,7 @@ public class BacktestComputeWorker : BackgroundService
             // Update heartbeat for all jobs assigned to this worker
             var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
             foreach (var job in runningJobs)
             {
                 job.LastHeartbeat = DateTime.UtcNow;
@@ -643,6 +647,62 @@ public class BacktestComputeWorker : BackgroundService
         }
     }

+    private async Task ProgressPersistenceLoop(CancellationToken cancellationToken)
+    {
+        while (!cancellationToken.IsCancellationRequested)
+        {
+            try
+            {
+                await Task.Delay(TimeSpan.FromSeconds(2), cancellationToken); // Check every 2 seconds
+
+                using var scope = _scopeFactory.CreateScope();
+                var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
+
+                // Process all progress trackers that need persistence
+                var trackersToPersist = _jobProgressTrackers
+                    .Where(kvp => kvp.Value.ShouldPersist())
+                    .ToList();
+
+                if (trackersToPersist.Count > 0)
+                {
+                    _logger.LogDebug("Persisting progress for {Count} jobs", trackersToPersist.Count);
+
+                    foreach (var (jobId, tracker) in trackersToPersist)
+                    {
+                        try
+                        {
+                            var (percentage, lastUpdate) = tracker.GetProgressForPersistence();
+
+                            // Get and update the job
+                            var job = await jobRepository.GetByIdAsync(jobId);
+                            if (job != null && job.Status == JobStatus.Running)
+                            {
+                                job.ProgressPercentage = percentage;
+                                job.LastHeartbeat = lastUpdate;
+                                await jobRepository.UpdateAsync(job);
+
+                                _logger.LogDebug("Persisted progress {Percentage}% for job {JobId}", percentage, jobId);
+                            }
+                        }
+                        catch (Exception ex)
+                        {
+                            _logger.LogWarning(ex, "Error persisting progress for job {JobId}", jobId);
+                        }
+                    }
+                }
+            }
+            catch (OperationCanceledException)
+            {
+                // Expected during shutdown, don't log as error
+                break;
+            }
+            catch (Exception ex)
+            {
+                _logger.LogError(ex, "Error in progress persistence loop");
+            }
+        }
+    }
+
     private async Task HandleJobFailure(
         Job job,
         Exception ex,
@@ -764,6 +824,73 @@ public class BacktestComputeWorker : BackgroundService
     }
 }

+/// <summary>
+/// Tracks job progress with batched database updates for performance optimization
+/// </summary>
+public class JobProgressTracker
+{
+    private readonly object _lock = new();
+    private int _lastPersistedPercentage;
+    private DateTime _lastPersistedTime;
+    private readonly ILogger _logger;
+
+    public Guid JobId { get; }
+    public int CurrentPercentage { get; private set; }
+    public DateTime LastUpdateTime { get; private set; }
+
+    public JobProgressTracker(Guid jobId, ILogger logger)
+    {
+        JobId = jobId;
+        _logger = logger;
+        _lastPersistedTime = DateTime.UtcNow;
+    }
+
+    /// <summary>
+    /// Updates progress in memory only - thread safe
+    /// </summary>
+    public void UpdateProgress(int percentage)
+    {
+        lock (_lock)
+        {
+            CurrentPercentage = percentage;
+            LastUpdateTime = DateTime.UtcNow;
+        }
+    }
+
+    /// <summary>
+    /// Checks if progress should be persisted to database based on time/percentage thresholds
+    /// </summary>
+    public bool ShouldPersist(int progressUpdateIntervalMs = 5000, int percentageThreshold = 5)
+    {
+        lock (_lock)
+        {
+            var timeSinceLastPersist = (DateTime.UtcNow - _lastPersistedTime).TotalMilliseconds;
+            var percentageSinceLastPersist = CurrentPercentage - _lastPersistedPercentage;
+
+            return timeSinceLastPersist >= progressUpdateIntervalMs ||
+                   percentageSinceLastPersist >= percentageThreshold ||
+                   CurrentPercentage >= 100; // Always persist completion
+        }
+    }
+
+    /// <summary>
+    /// Gets current progress and marks as persisted
+    /// </summary>
+    public (int percentage, DateTime lastUpdate) GetProgressForPersistence()
+    {
+        lock (_lock)
+        {
+            var percentage = CurrentPercentage;
+            var lastUpdate = LastUpdateTime;
+
+            _lastPersistedPercentage = percentage;
+            _lastPersistedTime = DateTime.UtcNow;
+
+            return (percentage, lastUpdate);
+        }
+    }
+}
+
 /// <summary>
 /// Configuration options for BacktestComputeWorker
 /// </summary>
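The commit title mentions adding a test for the executor, but no test file appears in the excerpt above. Because JobProgressTracker is self-contained, its batching rules can be exercised directly; the following is only a sketch of such a test (xUnit-style asserts, the test class name, and NullLogger.Instance are assumptions, not part of this commit):

    using System;
    using Microsoft.Extensions.Logging.Abstractions;
    using Xunit;

    public class JobProgressTrackerTests
    {
        [Fact]
        public void ShouldPersist_RespectsPercentageAndTimeThresholds()
        {
            var tracker = new JobProgressTracker(Guid.NewGuid(), NullLogger.Instance);

            // Below the 5% delta and before the time interval elapses: no persistence needed.
            tracker.UpdateProgress(2);
            Assert.False(tracker.ShouldPersist(progressUpdateIntervalMs: 60_000, percentageThreshold: 5));

            // Crossing the 5% delta triggers persistence.
            tracker.UpdateProgress(7);
            Assert.True(tracker.ShouldPersist(progressUpdateIntervalMs: 60_000, percentageThreshold: 5));

            // Reading the value marks it persisted and resets the delta.
            var (percentage, _) = tracker.GetProgressForPersistence();
            Assert.Equal(7, percentage);
            Assert.False(tracker.ShouldPersist(progressUpdateIntervalMs: 60_000, percentageThreshold: 5));
        }

        [Fact]
        public void ShouldPersist_AlwaysPersistsCompletion()
        {
            var tracker = new JobProgressTracker(Guid.NewGuid(), NullLogger.Instance);

            tracker.UpdateProgress(100);
            Assert.True(tracker.ShouldPersist(progressUpdateIntervalMs: 60_000, percentageThreshold: 5));
        }
    }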