Add jobs
This commit is contained in:
422
src/Managing.Application/Workers/BacktestComputeWorker.cs
Normal file
422
src/Managing.Application/Workers/BacktestComputeWorker.cs
Normal file
@@ -0,0 +1,422 @@
|
||||
using System.Text.Json;
|
||||
using Managing.Application.Abstractions.Repositories;
|
||||
using Managing.Application.Abstractions.Services;
|
||||
using Managing.Application.Backtests;
|
||||
using Managing.Domain.Backtests;
|
||||
using Managing.Domain.Bots;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
using static Managing.Common.Enums;
|
||||
|
||||
namespace Managing.Application.Workers;
|
||||
|
||||
/// <summary>
|
||||
/// Background worker that processes backtest jobs from the queue.
|
||||
/// Polls for pending jobs, claims them using advisory locks, and processes them.
|
||||
/// </summary>
|
||||
public class BacktestComputeWorker : BackgroundService
|
||||
{
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly ILogger<BacktestComputeWorker> _logger;
|
||||
private readonly BacktestComputeWorkerOptions _options;
|
||||
private readonly SemaphoreSlim _semaphore;
|
||||
|
||||
public BacktestComputeWorker(
|
||||
IServiceScopeFactory scopeFactory,
|
||||
ILogger<BacktestComputeWorker> logger,
|
||||
IOptions<BacktestComputeWorkerOptions> options)
|
||||
{
|
||||
_scopeFactory = scopeFactory;
|
||||
_logger = logger;
|
||||
_options = options.Value;
|
||||
_semaphore = new SemaphoreSlim(_options.MaxConcurrentBacktests, _options.MaxConcurrentBacktests);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"BacktestComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrent: {MaxConcurrent}, PollInterval: {PollInterval}s",
|
||||
_options.WorkerId, _options.MaxConcurrentBacktests, _options.JobPollIntervalSeconds);
|
||||
|
||||
// Background task for stale job recovery
|
||||
var staleJobRecoveryTask = Task.Run(() => StaleJobRecoveryLoop(stoppingToken), stoppingToken);
|
||||
|
||||
// Background task for heartbeat updates
|
||||
var heartbeatTask = Task.Run(() => HeartbeatLoop(stoppingToken), stoppingToken);
|
||||
|
||||
// Main job processing loop
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await ProcessJobsAsync(stoppingToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in BacktestComputeWorker main loop");
|
||||
SentrySdk.CaptureException(ex);
|
||||
}
|
||||
|
||||
await Task.Delay(TimeSpan.FromSeconds(_options.JobPollIntervalSeconds), stoppingToken);
|
||||
}
|
||||
|
||||
_logger.LogInformation("BacktestComputeWorker stopping");
|
||||
}
|
||||
|
||||
private async Task ProcessJobsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
// Check if we have capacity
|
||||
if (!await _semaphore.WaitAsync(0, cancellationToken))
|
||||
{
|
||||
// At capacity, skip this iteration
|
||||
return;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
|
||||
|
||||
// Try to claim a job
|
||||
var job = await jobRepository.ClaimNextJobAsync(_options.WorkerId);
|
||||
|
||||
if (job == null)
|
||||
{
|
||||
// No jobs available, release semaphore
|
||||
_semaphore.Release();
|
||||
return;
|
||||
}
|
||||
|
||||
_logger.LogInformation("Claimed backtest job {JobId} for worker {WorkerId}", job.Id, _options.WorkerId);
|
||||
|
||||
// Process the job asynchronously (don't await, let it run in background)
|
||||
// Create a new scope for the job processing to ensure proper lifetime management
|
||||
_ = Task.Run(async () =>
|
||||
{
|
||||
try
|
||||
{
|
||||
await ProcessJobAsync(job, cancellationToken);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_semaphore.Release();
|
||||
}
|
||||
}, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error claiming or processing job");
|
||||
_semaphore.Release();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessJobAsync(
|
||||
BacktestJob job,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
|
||||
var executor = scope.ServiceProvider.GetRequiredService<BacktestExecutor>();
|
||||
var userService = scope.ServiceProvider.GetRequiredService<IUserService>();
|
||||
var exchangeService = scope.ServiceProvider.GetRequiredService<IExchangeService>();
|
||||
|
||||
try
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"Processing backtest job {JobId} (BundleRequestId: {BundleRequestId}, UserId: {UserId})",
|
||||
job.Id, job.BundleRequestId, job.UserId);
|
||||
|
||||
// Deserialize config
|
||||
var config = JsonSerializer.Deserialize<TradingBotConfig>(job.ConfigJson);
|
||||
if (config == null)
|
||||
{
|
||||
throw new InvalidOperationException("Failed to deserialize TradingBotConfig from job");
|
||||
}
|
||||
|
||||
// Load user
|
||||
var user = await userService.GetUserByIdAsync(job.UserId);
|
||||
if (user == null)
|
||||
{
|
||||
throw new InvalidOperationException($"User {job.UserId} not found");
|
||||
}
|
||||
|
||||
// Load candles
|
||||
var candles = await exchangeService.GetCandlesInflux(
|
||||
TradingExchanges.Evm,
|
||||
config.Ticker,
|
||||
job.StartDate,
|
||||
config.Timeframe,
|
||||
job.EndDate);
|
||||
|
||||
if (candles == null || candles.Count == 0)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
$"No candles found for {config.Ticker} on {config.Timeframe} from {job.StartDate} to {job.EndDate}");
|
||||
}
|
||||
|
||||
// Progress callback to update job progress
|
||||
Func<int, Task> progressCallback = async (percentage) =>
|
||||
{
|
||||
try
|
||||
{
|
||||
job.ProgressPercentage = percentage;
|
||||
job.LastHeartbeat = DateTime.UtcNow;
|
||||
await jobRepository.UpdateAsync(job);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error updating job progress for job {JobId}", job.Id);
|
||||
}
|
||||
};
|
||||
|
||||
// Execute the backtest
|
||||
var result = await executor.ExecuteAsync(
|
||||
config,
|
||||
candles,
|
||||
user,
|
||||
save: true,
|
||||
withCandles: false,
|
||||
requestId: job.RequestId,
|
||||
metadata: null,
|
||||
progressCallback: progressCallback);
|
||||
|
||||
// Update job with result
|
||||
job.Status = BacktestJobStatus.Completed;
|
||||
job.ProgressPercentage = 100;
|
||||
job.ResultJson = JsonSerializer.Serialize(result);
|
||||
job.CompletedAt = DateTime.UtcNow;
|
||||
job.LastHeartbeat = DateTime.UtcNow;
|
||||
|
||||
await jobRepository.UpdateAsync(job);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Completed backtest job {JobId}. Score: {Score}, PnL: {PnL}",
|
||||
job.Id, result.Score, result.FinalPnl);
|
||||
|
||||
// Update bundle request if this is part of a bundle
|
||||
if (job.BundleRequestId.HasValue)
|
||||
{
|
||||
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing backtest job {JobId}", job.Id);
|
||||
SentrySdk.CaptureException(ex);
|
||||
|
||||
// Update job status to failed
|
||||
try
|
||||
{
|
||||
job.Status = BacktestJobStatus.Failed;
|
||||
job.ErrorMessage = ex.Message;
|
||||
job.CompletedAt = DateTime.UtcNow;
|
||||
await jobRepository.UpdateAsync(job);
|
||||
|
||||
// Update bundle request if this is part of a bundle
|
||||
if (job.BundleRequestId.HasValue)
|
||||
{
|
||||
await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
|
||||
}
|
||||
}
|
||||
catch (Exception updateEx)
|
||||
{
|
||||
_logger.LogError(updateEx, "Error updating job {JobId} status to failed", job.Id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task UpdateBundleRequestProgress(Guid bundleRequestId, IServiceProvider serviceProvider)
|
||||
{
|
||||
try
|
||||
{
|
||||
var backtestRepository = serviceProvider.GetRequiredService<IBacktestRepository>();
|
||||
var jobRepository = serviceProvider.GetRequiredService<IBacktestJobRepository>();
|
||||
var userService = serviceProvider.GetRequiredService<IUserService>();
|
||||
|
||||
// Get all jobs for this bundle
|
||||
var jobs = await jobRepository.GetByBundleRequestIdAsync(bundleRequestId);
|
||||
var completedJobs = jobs.Count(j => j.Status == BacktestJobStatus.Completed);
|
||||
var failedJobs = jobs.Count(j => j.Status == BacktestJobStatus.Failed);
|
||||
var runningJobs = jobs.Count(j => j.Status == BacktestJobStatus.Running);
|
||||
var totalJobs = jobs.Count();
|
||||
|
||||
if (totalJobs == 0)
|
||||
{
|
||||
return; // No jobs yet
|
||||
}
|
||||
|
||||
// Get user from first job
|
||||
var firstJob = jobs.First();
|
||||
var user = await userService.GetUserByIdAsync(firstJob.UserId);
|
||||
if (user == null)
|
||||
{
|
||||
_logger.LogWarning("User {UserId} not found for bundle request {BundleRequestId}", firstJob.UserId, bundleRequestId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Get bundle request
|
||||
var bundleRequest = backtestRepository.GetBundleBacktestRequestByIdForUser(user, bundleRequestId);
|
||||
if (bundleRequest == null)
|
||||
{
|
||||
_logger.LogWarning("Bundle request {BundleRequestId} not found for user {UserId}", bundleRequestId, user.Id);
|
||||
return;
|
||||
}
|
||||
|
||||
// Update bundle request progress
|
||||
bundleRequest.CompletedBacktests = completedJobs;
|
||||
bundleRequest.FailedBacktests = failedJobs;
|
||||
|
||||
// Update status based on job states
|
||||
if (completedJobs + failedJobs == totalJobs)
|
||||
{
|
||||
// All jobs completed or failed
|
||||
if (failedJobs == 0)
|
||||
{
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
|
||||
}
|
||||
else if (completedJobs == 0)
|
||||
{
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Failed;
|
||||
bundleRequest.ErrorMessage = "All backtests failed";
|
||||
}
|
||||
else
|
||||
{
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
|
||||
bundleRequest.ErrorMessage = $"{failedJobs} backtests failed";
|
||||
}
|
||||
bundleRequest.CompletedAt = DateTime.UtcNow;
|
||||
bundleRequest.CurrentBacktest = null;
|
||||
}
|
||||
else if (runningJobs > 0)
|
||||
{
|
||||
// Some jobs still running
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Running;
|
||||
}
|
||||
|
||||
// Update results list from completed jobs
|
||||
var completedJobResults = jobs
|
||||
.Where(j => j.Status == BacktestJobStatus.Completed && !string.IsNullOrEmpty(j.ResultJson))
|
||||
.Select(j =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = JsonSerializer.Deserialize<LightBacktest>(j.ResultJson);
|
||||
return result?.Id;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return null;
|
||||
}
|
||||
})
|
||||
.Where(id => !string.IsNullOrEmpty(id))
|
||||
.ToList();
|
||||
|
||||
bundleRequest.Results = completedJobResults!;
|
||||
|
||||
await backtestRepository.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Updated bundle request {BundleRequestId} progress: {Completed}/{Total} completed, {Failed} failed, {Running} running",
|
||||
bundleRequestId, completedJobs, totalJobs, failedJobs, runningJobs);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex, "Error updating bundle request {BundleRequestId} progress", bundleRequestId);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task StaleJobRecoveryLoop(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); // Check every minute
|
||||
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
|
||||
|
||||
var resetCount = await jobRepository.ResetStaleJobsAsync(_options.StaleJobTimeoutMinutes);
|
||||
|
||||
if (resetCount > 0)
|
||||
{
|
||||
_logger.LogInformation("Reset {Count} stale backtest jobs back to Pending status", resetCount);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in stale job recovery loop");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HeartbeatLoop(CancellationToken cancellationToken)
|
||||
{
|
||||
while (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(_options.HeartbeatIntervalSeconds), cancellationToken);
|
||||
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
|
||||
|
||||
// Update heartbeat for all jobs assigned to this worker
|
||||
var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
|
||||
|
||||
foreach (var job in runningJobs)
|
||||
{
|
||||
job.LastHeartbeat = DateTime.UtcNow;
|
||||
await jobRepository.UpdateAsync(job);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in heartbeat loop");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
_semaphore?.Dispose();
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Configuration options for BacktestComputeWorker
|
||||
/// </summary>
|
||||
public class BacktestComputeWorkerOptions
|
||||
{
|
||||
public const string SectionName = "BacktestComputeWorker";
|
||||
|
||||
/// <summary>
|
||||
/// Unique identifier for this worker instance
|
||||
/// </summary>
|
||||
public string WorkerId { get; set; } = Environment.MachineName;
|
||||
|
||||
/// <summary>
|
||||
/// Maximum number of concurrent backtests to process
|
||||
/// </summary>
|
||||
public int MaxConcurrentBacktests { get; set; } = 6;
|
||||
|
||||
/// <summary>
|
||||
/// Interval in seconds between job polling attempts
|
||||
/// </summary>
|
||||
public int JobPollIntervalSeconds { get; set; } = 5;
|
||||
|
||||
/// <summary>
|
||||
/// Interval in seconds between heartbeat updates
|
||||
/// </summary>
|
||||
public int HeartbeatIntervalSeconds { get; set; } = 30;
|
||||
|
||||
/// <summary>
|
||||
/// Timeout in minutes for considering a job stale
|
||||
/// </summary>
|
||||
public int StaleJobTimeoutMinutes { get; set; } = 5;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user