Add genetic backtest to worker
@@ -77,10 +77,10 @@ public class BacktestComputeWorker : BackgroundService
         try
         {
             using var scope = _scopeFactory.CreateScope();
-            var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
+            var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
 
-            // Try to claim a job
-            var job = await jobRepository.ClaimNextJobAsync(_options.WorkerId);
+            // Try to claim a backtest job (exclude genetic jobs)
+            var job = await jobRepository.ClaimNextJobAsync(_options.WorkerId, JobType.Backtest);
 
             if (job == null)
             {
@@ -114,11 +114,11 @@ public class BacktestComputeWorker : BackgroundService
     }
 
     private async Task ProcessJobAsync(
-        BacktestJob job,
+        Job job,
         CancellationToken cancellationToken)
     {
         using var scope = _scopeFactory.CreateScope();
-        var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
+        var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
         var executor = scope.ServiceProvider.GetRequiredService<BacktestExecutor>();
         var userService = scope.ServiceProvider.GetRequiredService<IUserService>();
         var exchangeService = scope.ServiceProvider.GetRequiredService<IExchangeService>();
@@ -184,7 +184,7 @@ public class BacktestComputeWorker : BackgroundService
                 progressCallback: progressCallback);
 
             // Update job with result
-            job.Status = BacktestJobStatus.Completed;
+            job.Status = JobStatus.Completed;
             job.ProgressPercentage = 100;
             job.ResultJson = JsonSerializer.Serialize(result);
             job.CompletedAt = DateTime.UtcNow;
@@ -207,24 +207,7 @@ public class BacktestComputeWorker : BackgroundService
             _logger.LogError(ex, "Error processing backtest job {JobId}", job.Id);
             SentrySdk.CaptureException(ex);
 
-            // Update job status to failed
-            try
-            {
-                job.Status = BacktestJobStatus.Failed;
-                job.ErrorMessage = ex.Message;
-                job.CompletedAt = DateTime.UtcNow;
-                await jobRepository.UpdateAsync(job);
-
-                // Update bundle request if this is part of a bundle
-                if (job.BundleRequestId.HasValue)
-                {
-                    await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
-                }
-            }
-            catch (Exception updateEx)
-            {
-                _logger.LogError(updateEx, "Error updating job {JobId} status to failed", job.Id);
-            }
+            await HandleJobFailure(job, ex, jobRepository, scope.ServiceProvider);
         }
     }
 
@@ -233,14 +216,15 @@ public class BacktestComputeWorker : BackgroundService
         try
         {
             var backtestRepository = serviceProvider.GetRequiredService<IBacktestRepository>();
-            var jobRepository = serviceProvider.GetRequiredService<IBacktestJobRepository>();
+            var jobRepository = serviceProvider.GetRequiredService<IJobRepository>();
             var userService = serviceProvider.GetRequiredService<IUserService>();
             var webhookService = serviceProvider.GetRequiredService<IWebhookService>();
 
             // Get all jobs for this bundle
             var jobs = await jobRepository.GetByBundleRequestIdAsync(bundleRequestId);
-            var completedJobs = jobs.Count(j => j.Status == BacktestJobStatus.Completed);
-            var failedJobs = jobs.Count(j => j.Status == BacktestJobStatus.Failed);
-            var runningJobs = jobs.Count(j => j.Status == BacktestJobStatus.Running);
+            var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
+            var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
+            var runningJobs = jobs.Count(j => j.Status == JobStatus.Running);
             var totalJobs = jobs.Count();
 
             if (totalJobs == 0)
@@ -265,6 +249,8 @@ public class BacktestComputeWorker : BackgroundService
                 return;
             }
 
+            var previousStatus = bundleRequest.Status;
+
             // Update bundle request progress
             bundleRequest.CompletedBacktests = completedJobs;
             bundleRequest.FailedBacktests = failedJobs;
@@ -298,7 +284,7 @@ public class BacktestComputeWorker : BackgroundService
 
             // Update results list from completed jobs
             var completedJobResults = jobs
-                .Where(j => j.Status == BacktestJobStatus.Completed && !string.IsNullOrEmpty(j.ResultJson))
+                .Where(j => j.Status == JobStatus.Completed && !string.IsNullOrEmpty(j.ResultJson))
                 .Select(j =>
                 {
                     try
@@ -318,6 +304,28 @@ public class BacktestComputeWorker : BackgroundService
 
             await backtestRepository.UpdateBundleBacktestRequestAsync(bundleRequest);
 
+            // Send webhook notification if bundle request just completed
+            if (previousStatus != BundleBacktestRequestStatus.Completed &&
+                bundleRequest.Status == BundleBacktestRequestStatus.Completed &&
+                !string.IsNullOrEmpty(user.TelegramChannel))
+            {
+                var message = $"✅ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed successfully. " +
+                              $"Completed: {completedJobs}/{totalJobs} backtests" +
+                              (failedJobs > 0 ? $", Failed: {failedJobs}" : "") +
+                              $". Results: {completedJobResults.Count} backtest(s) generated.";
+
+                await webhookService.SendMessage(message, user.TelegramChannel);
+            }
+            else if (previousStatus != BundleBacktestRequestStatus.Failed &&
+                     bundleRequest.Status == BundleBacktestRequestStatus.Failed &&
+                     !string.IsNullOrEmpty(user.TelegramChannel))
+            {
+                var message = $"❌ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) failed. " +
+                              $"All {totalJobs} backtests failed. Error: {bundleRequest.ErrorMessage}";
+
+                await webhookService.SendMessage(message, user.TelegramChannel);
+            }
+
             _logger.LogInformation(
                 "Updated bundle request {BundleRequestId} progress: {Completed}/{Total} completed, {Failed} failed, {Running} running",
                 bundleRequestId, completedJobs, totalJobs, failedJobs, runningJobs);
@@ -337,13 +345,58 @@ public class BacktestComputeWorker : BackgroundService
                 await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); // Check every minute
 
                 using var scope = _scopeFactory.CreateScope();
-                var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
+                var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
 
-                var resetCount = await jobRepository.ResetStaleJobsAsync(_options.StaleJobTimeoutMinutes);
+                // Get stale jobs for this worker
+                var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
+                var staleJobs = runningJobs
+                    .Where(j => j.JobType == JobType.Backtest &&
+                                (j.LastHeartbeat == null ||
+                                 j.LastHeartbeat < DateTime.UtcNow.AddMinutes(-_options.StaleJobTimeoutMinutes)))
+                    .ToList();
 
-                if (resetCount > 0)
+                foreach (var job in staleJobs)
                 {
-                    _logger.LogInformation("Reset {Count} stale backtest jobs back to Pending status", resetCount);
+                    // If it's stale but retryable, reset to pending with retry count
+                    if (job.RetryCount < job.MaxRetries)
+                    {
+                        job.Status = JobStatus.Pending;
+                        job.RetryCount++;
+                        var backoffMinutes = Math.Min(Math.Pow(2, job.RetryCount), _options.MaxRetryDelayMinutes);
+                        job.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
+                        job.ErrorMessage = $"Worker timeout - retry {job.RetryCount}/{job.MaxRetries}";
+                        job.FailureCategory = FailureCategory.SystemError;
+                        _logger.LogWarning(
+                            "Stale job {JobId} will be retried (attempt {RetryCount}/{MaxRetries}) after {RetryAfter}",
+                            job.Id, job.RetryCount, job.MaxRetries, job.RetryAfter);
+                    }
+                    else
+                    {
+                        // Exceeded retries - mark as failed
+                        job.Status = JobStatus.Failed;
+                        job.ErrorMessage = "Worker timeout - exceeded max retries";
+                        job.FailureCategory = FailureCategory.SystemError;
+                        job.IsRetryable = false;
+                        job.CompletedAt = DateTime.UtcNow;
+
+                        // Notify permanent failure
+                        await NotifyPermanentFailure(job, new TimeoutException("Worker timeout"), scope.ServiceProvider);
+
+                        // Update bundle request if this is part of a bundle
+                        if (job.BundleRequestId.HasValue)
+                        {
+                            await UpdateBundleRequestProgress(job.BundleRequestId.Value, scope.ServiceProvider);
+                        }
+                    }
+
+                    job.AssignedWorkerId = null;
+                    job.LastHeartbeat = null;
+                    await jobRepository.UpdateAsync(job);
+                }
+
+                if (staleJobs.Count > 0)
+                {
+                    _logger.LogInformation("Processed {Count} stale backtest jobs", staleJobs.Count);
                 }
             }
             catch (Exception ex)
@@ -362,7 +415,7 @@ public class BacktestComputeWorker : BackgroundService
                 await Task.Delay(TimeSpan.FromSeconds(_options.HeartbeatIntervalSeconds), cancellationToken);
 
                 using var scope = _scopeFactory.CreateScope();
-                var jobRepository = scope.ServiceProvider.GetRequiredService<IBacktestJobRepository>();
+                var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
 
                 // Update heartbeat for all jobs assigned to this worker
                 var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
@@ -380,6 +433,118 @@ public class BacktestComputeWorker : BackgroundService
         }
     }
 
+    private async Task HandleJobFailure(
+        Job job,
+        Exception ex,
+        IJobRepository jobRepository,
+        IServiceProvider serviceProvider)
+    {
+        try
+        {
+            // Categorize the failure
+            var failureCategory = CategorizeFailure(ex);
+            var isRetryable = IsRetryableFailure(ex, failureCategory);
+
+            // Check if we should retry
+            if (isRetryable && job.RetryCount < job.MaxRetries)
+            {
+                // Calculate exponential backoff: 2^retryCount minutes, capped at MaxRetryDelayMinutes
+                var backoffMinutes = Math.Min(Math.Pow(2, job.RetryCount), _options.MaxRetryDelayMinutes);
+                job.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
+                job.RetryCount++;
+                job.Status = JobStatus.Pending; // Reset to pending for retry
+                job.AssignedWorkerId = null; // Allow any worker to pick it up
+                job.ErrorMessage = $"Retry {job.RetryCount}/{job.MaxRetries}: {ex.Message}";
+                job.FailureCategory = failureCategory;
+                job.IsRetryable = true;
+
+                _logger.LogWarning(
+                    "Job {JobId} will be retried (attempt {RetryCount}/{MaxRetries}) after {RetryAfter}. Error: {Error}",
+                    job.Id, job.RetryCount, job.MaxRetries, job.RetryAfter, ex.Message);
+            }
+            else
+            {
+                // Permanent failure - mark as failed
+                job.Status = JobStatus.Failed;
+                job.ErrorMessage = ex.Message;
+                job.FailureCategory = failureCategory;
+                job.IsRetryable = false;
+                job.CompletedAt = DateTime.UtcNow;
+
+                _logger.LogError(
+                    "Job {JobId} failed permanently after {RetryCount} retries. Error: {Error}",
+                    job.Id, job.RetryCount, ex.Message);
+
+                // Send notification for permanent failure
+                await NotifyPermanentFailure(job, ex, serviceProvider);
+
+                // Update bundle request if this is part of a bundle
+                if (job.BundleRequestId.HasValue)
+                {
+                    await UpdateBundleRequestProgress(job.BundleRequestId.Value, serviceProvider);
+                }
+            }
+
+            job.LastHeartbeat = DateTime.UtcNow;
+            await jobRepository.UpdateAsync(job);
+        }
+        catch (Exception updateEx)
+        {
+            _logger.LogError(updateEx, "Failed to update job {JobId} status after failure", job.Id);
+        }
+    }
+
+    private FailureCategory CategorizeFailure(Exception ex)
+    {
+        return ex switch
+        {
+            TimeoutException => FailureCategory.Transient,
+            TaskCanceledException => FailureCategory.Transient,
+            HttpRequestException => FailureCategory.Transient,
+            InvalidOperationException when ex.Message.Contains("candles") || ex.Message.Contains("No candles") => FailureCategory.DataError,
+            InvalidOperationException when ex.Message.Contains("User") || ex.Message.Contains("not found") => FailureCategory.UserError,
+            OutOfMemoryException => FailureCategory.SystemError,
+            _ => FailureCategory.Unknown
+        };
+    }
+
+    private bool IsRetryableFailure(Exception ex, FailureCategory category)
+    {
+        // Don't retry user errors or data errors (missing candles, invalid config)
+        if (category == FailureCategory.UserError || category == FailureCategory.DataError)
+            return false;
+
+        // Retry transient and system errors
+        return category == FailureCategory.Transient || category == FailureCategory.SystemError;
+    }
+
+    private async Task NotifyPermanentFailure(
+        Job job,
+        Exception ex,
+        IServiceProvider serviceProvider)
+    {
+        try
+        {
+            var webhookService = serviceProvider.GetRequiredService<IWebhookService>();
+            const string alertsChannel = "2676086723";
+
+            var jobTypeName = job.JobType == JobType.Genetic ? "Genetic" : "Backtest";
+            var message = $"🚨 **{jobTypeName} Job Failed Permanently**\n" +
+                          $"Job ID: `{job.Id}`\n" +
+                          $"User ID: {job.UserId}\n" +
+                          $"Retry Attempts: {job.RetryCount}/{job.MaxRetries}\n" +
+                          $"Failure Category: {job.FailureCategory}\n" +
+                          $"Error: {ex.Message}\n" +
+                          $"Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC";
+
+            await webhookService.SendMessage(message, alertsChannel);
+        }
+        catch (Exception notifyEx)
+        {
+            _logger.LogError(notifyEx, "Failed to send permanent failure notification for job {JobId}", job.Id);
+        }
+    }
+
     public override void Dispose()
     {
         _semaphore?.Dispose();
@@ -418,5 +583,15 @@ public class BacktestComputeWorkerOptions
     /// Timeout in minutes for considering a job stale
     /// </summary>
     public int StaleJobTimeoutMinutes { get; set; } = 5;
+
+    /// <summary>
+    /// Default maximum retry attempts for failed jobs
+    /// </summary>
+    public int DefaultMaxRetries { get; set; } = 3;
+
+    /// <summary>
+    /// Maximum retry delay in minutes (cap for exponential backoff)
+    /// </summary>
+    public int MaxRetryDelayMinutes { get; set; } = 60;
 }
src/Managing.Application/Workers/GeneticComputeWorker.cs (new file, 429 lines)
@@ -0,0 +1,429 @@
using System.Text.Json;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Application.Backtests;
using Managing.Domain.Backtests;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using static Managing.Common.Enums;

namespace Managing.Application.Workers;

/// <summary>
/// Background worker that processes genetic algorithm jobs from the queue.
/// Polls for pending genetic jobs, claims them using advisory locks, and processes them.
/// </summary>
public class GeneticComputeWorker : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<GeneticComputeWorker> _logger;
    private readonly GeneticComputeWorkerOptions _options;
    private readonly SemaphoreSlim _semaphore;

    public GeneticComputeWorker(
        IServiceScopeFactory scopeFactory,
        ILogger<GeneticComputeWorker> logger,
        IOptions<GeneticComputeWorkerOptions> options)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
        _options = options.Value;
        _semaphore = new SemaphoreSlim(_options.MaxConcurrentGenetics, _options.MaxConcurrentGenetics);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "GeneticComputeWorker starting. WorkerId: {WorkerId}, MaxConcurrent: {MaxConcurrent}, PollInterval: {PollInterval}s",
            _options.WorkerId, _options.MaxConcurrentGenetics, _options.JobPollIntervalSeconds);

        // Background task for stale job recovery
        var staleJobRecoveryTask = Task.Run(() => StaleJobRecoveryLoop(stoppingToken), stoppingToken);

        // Background task for heartbeat updates
        var heartbeatTask = Task.Run(() => HeartbeatLoop(stoppingToken), stoppingToken);

        // Main job processing loop
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessJobsAsync(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in GeneticComputeWorker main loop");
                SentrySdk.CaptureException(ex);
            }

            await Task.Delay(TimeSpan.FromSeconds(_options.JobPollIntervalSeconds), stoppingToken);
        }

        _logger.LogInformation("GeneticComputeWorker stopping");
    }

    private async Task ProcessJobsAsync(CancellationToken cancellationToken)
    {
        // Check if we have capacity
        if (!await _semaphore.WaitAsync(0, cancellationToken))
        {
            // At capacity, skip this iteration
            return;
        }

        try
        {
            using var scope = _scopeFactory.CreateScope();
            var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();

            // Try to claim a genetic job
            var job = await jobRepository.ClaimNextJobAsync(_options.WorkerId, JobType.Genetic);

            if (job == null)
            {
                // No jobs available, release semaphore
                _semaphore.Release();
                return;
            }

            _logger.LogInformation("Claimed genetic job {JobId} for worker {WorkerId}", job.Id, _options.WorkerId);

            // Process the job asynchronously (don't await, let it run in background)
            // Create a new scope for the job processing to ensure proper lifetime management
            _ = Task.Run(async () =>
            {
                try
                {
                    await ProcessJobAsync(job, cancellationToken);
                }
                finally
                {
                    _semaphore.Release();
                }
            }, cancellationToken);
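
            // Note: WaitAsync(0) above is a non-blocking try-acquire - it returns false immediately
            // when all MaxConcurrentGenetics slots are busy, so that poll cycle is simply skipped.
            // The slot taken for this job is returned in the finally block of the Task.Run delegate,
            // whether ProcessJobAsync completes or throws.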
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error claiming or processing genetic job");
            _semaphore.Release();
            throw;
        }
    }
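
    // ClaimNextJobAsync is expected to hand a given pending job to exactly one worker (the class
    // summary mentions advisory locks). A minimal sketch of such a claim, assuming a PostgreSQL
    // jobs table - table and column names here are illustrative, not taken from this repository:
    //
    //   UPDATE jobs SET status = 'Running', assigned_worker_id = @workerId, last_heartbeat = now()
    //   WHERE id = (SELECT id FROM jobs
    //               WHERE status = 'Pending' AND job_type = @jobType
    //                 AND (retry_after IS NULL OR retry_after <= now())
    //                 AND pg_try_advisory_xact_lock(id)
    //               ORDER BY created_at LIMIT 1)
    //   RETURNING *;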

    private async Task ProcessJobAsync(
        Job job,
        CancellationToken cancellationToken)
    {
        using var scope = _scopeFactory.CreateScope();
        var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
        var geneticExecutor = scope.ServiceProvider.GetRequiredService<GeneticExecutor>();

        try
        {
            _logger.LogInformation(
                "Processing genetic job {JobId} (GeneticRequestId: {GeneticRequestId}, UserId: {UserId})",
                job.Id, job.GeneticRequestId, job.UserId);

            if (string.IsNullOrEmpty(job.GeneticRequestId))
            {
                throw new InvalidOperationException("GeneticRequestId is required for genetic jobs");
            }

            // Progress callback to update job progress
            Func<int, Task> progressCallback = async (percentage) =>
            {
                try
                {
                    job.ProgressPercentage = percentage;
                    job.LastHeartbeat = DateTime.UtcNow;
                    await jobRepository.UpdateAsync(job);
                }
                catch (Exception ex)
                {
                    _logger.LogWarning(ex, "Error updating job progress for job {JobId}", job.Id);
                }
            };

            // Execute the genetic algorithm
            var result = await geneticExecutor.ExecuteAsync(
                job.GeneticRequestId,
                progressCallback,
                cancellationToken);

            // Update job with result
            job.Status = JobStatus.Completed;
            job.ProgressPercentage = 100;
            job.ResultJson = JsonSerializer.Serialize(new
            {
                BestFitness = result.BestFitness,
                BestIndividual = result.BestIndividual,
                ProgressInfo = result.ProgressInfo
            });
            job.CompletedAt = DateTime.UtcNow;
            job.LastHeartbeat = DateTime.UtcNow;

            await jobRepository.UpdateAsync(job);

            _logger.LogInformation(
                "Completed genetic job {JobId}. Best Fitness: {BestFitness}",
                job.Id, result.BestFitness);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing genetic job {JobId}", job.Id);
            SentrySdk.CaptureException(ex);

            await HandleJobFailure(job, ex, jobRepository, scope.ServiceProvider);
        }
    }

    private async Task StaleJobRecoveryLoop(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); // Check every minute

                using var scope = _scopeFactory.CreateScope();
                var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();

                // Reset stale genetic jobs only
                var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
                var staleJobs = runningJobs
                    .Where(j => j.JobType == JobType.Genetic &&
                                (j.LastHeartbeat == null ||
                                 j.LastHeartbeat < DateTime.UtcNow.AddMinutes(-_options.StaleJobTimeoutMinutes)))
                    .ToList();

                foreach (var job in staleJobs)
                {
                    // If it's stale but retryable, reset to pending with retry count
                    if (job.RetryCount < job.MaxRetries)
                    {
                        job.Status = JobStatus.Pending;
                        job.RetryCount++;
                        var backoffMinutes = Math.Min(Math.Pow(2, job.RetryCount), _options.MaxRetryDelayMinutes);
                        job.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
                        job.ErrorMessage = $"Worker timeout - retry {job.RetryCount}/{job.MaxRetries}";
                        job.FailureCategory = FailureCategory.SystemError;
                        _logger.LogWarning(
                            "Stale job {JobId} will be retried (attempt {RetryCount}/{MaxRetries}) after {RetryAfter}",
                            job.Id, job.RetryCount, job.MaxRetries, job.RetryAfter);
                    }
                    else
                    {
                        // Exceeded retries - mark as failed
                        job.Status = JobStatus.Failed;
                        job.ErrorMessage = "Worker timeout - exceeded max retries";
                        job.FailureCategory = FailureCategory.SystemError;
                        job.IsRetryable = false;
                        job.CompletedAt = DateTime.UtcNow;

                        // Notify permanent failure
                        await NotifyPermanentFailure(job, new TimeoutException("Worker timeout"), scope.ServiceProvider);
                    }

                    job.AssignedWorkerId = null;
                    job.LastHeartbeat = null;
                    await jobRepository.UpdateAsync(job);
                }

                if (staleJobs.Count > 0)
                {
                    _logger.LogInformation("Processed {Count} stale genetic jobs", staleJobs.Count);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in stale job recovery loop");
            }
        }
    }

    private async Task HeartbeatLoop(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(_options.HeartbeatIntervalSeconds), cancellationToken);

                using var scope = _scopeFactory.CreateScope();
                var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();

                // Update heartbeat for all genetic jobs assigned to this worker
                var runningJobs = await jobRepository.GetRunningJobsByWorkerIdAsync(_options.WorkerId);
                var geneticJobs = runningJobs.Where(j => j.JobType == JobType.Genetic);

                foreach (var job in geneticJobs)
                {
                    job.LastHeartbeat = DateTime.UtcNow;
                    await jobRepository.UpdateAsync(job);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error in heartbeat loop");
            }
        }
    }
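
    // Note: heartbeats for claimed jobs are written every HeartbeatIntervalSeconds (30 seconds by
    // default), while StaleJobRecoveryLoop only treats a job as stale after StaleJobTimeoutMinutes
    // (10 minutes by default) without one, so several heartbeats can be missed before a running job
    // is reclaimed and retried.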

    private async Task HandleJobFailure(
        Job job,
        Exception ex,
        IJobRepository jobRepository,
        IServiceProvider serviceProvider)
    {
        try
        {
            // Categorize the failure
            var failureCategory = CategorizeFailure(ex);
            var isRetryable = IsRetryableFailure(ex, failureCategory);

            // Check if we should retry
            if (isRetryable && job.RetryCount < job.MaxRetries)
            {
                // Calculate exponential backoff: 2^retryCount minutes, capped at MaxRetryDelayMinutes
                var backoffMinutes = Math.Min(Math.Pow(2, job.RetryCount), _options.MaxRetryDelayMinutes);
                job.RetryAfter = DateTime.UtcNow.AddMinutes(backoffMinutes);
                job.RetryCount++;
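                // For example, assuming job.MaxRetries comes from DefaultMaxRetries (3) and
                // MaxRetryDelayMinutes is the default 60: the delay uses the pre-increment RetryCount,
                // so the job waits 2^0 = 1 minute before the first retry, then 2 and 4 minutes;
                // the 60-minute cap only matters for larger retry budgets.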
                job.Status = JobStatus.Pending; // Reset to pending for retry
                job.AssignedWorkerId = null; // Allow any worker to pick it up
                job.ErrorMessage = $"Retry {job.RetryCount}/{job.MaxRetries}: {ex.Message}";
                job.FailureCategory = failureCategory;
                job.IsRetryable = true;

                _logger.LogWarning(
                    "Job {JobId} will be retried (attempt {RetryCount}/{MaxRetries}) after {RetryAfter}. Error: {Error}",
                    job.Id, job.RetryCount, job.MaxRetries, job.RetryAfter, ex.Message);
            }
            else
            {
                // Permanent failure - mark as failed
                job.Status = JobStatus.Failed;
                job.ErrorMessage = ex.Message;
                job.FailureCategory = failureCategory;
                job.IsRetryable = false;
                job.CompletedAt = DateTime.UtcNow;

                _logger.LogError(
                    "Job {JobId} failed permanently after {RetryCount} retries. Error: {Error}",
                    job.Id, job.RetryCount, ex.Message);

                // Send notification for permanent failure
                await NotifyPermanentFailure(job, ex, serviceProvider);
            }

            job.LastHeartbeat = DateTime.UtcNow;
            await jobRepository.UpdateAsync(job);
        }
        catch (Exception updateEx)
        {
            _logger.LogError(updateEx, "Failed to update job {JobId} status after failure", job.Id);
        }
    }

    private FailureCategory CategorizeFailure(Exception ex)
    {
        return ex switch
        {
            TimeoutException => FailureCategory.Transient,
            TaskCanceledException => FailureCategory.Transient,
            HttpRequestException => FailureCategory.Transient,
            InvalidOperationException when ex.Message.Contains("candles") || ex.Message.Contains("data") => FailureCategory.DataError,
            InvalidOperationException when ex.Message.Contains("User") || ex.Message.Contains("not found") => FailureCategory.UserError,
            OutOfMemoryException => FailureCategory.SystemError,
            _ => FailureCategory.Unknown
        };
    }

    private bool IsRetryableFailure(Exception ex, FailureCategory category)
    {
        // Don't retry user errors or data errors (missing candles, invalid config)
        if (category == FailureCategory.UserError || category == FailureCategory.DataError)
            return false;

        // Retry transient and system errors
        return category == FailureCategory.Transient || category == FailureCategory.SystemError;
    }

    private async Task NotifyPermanentFailure(
        Job job,
        Exception ex,
        IServiceProvider serviceProvider)
    {
        try
        {
            var webhookService = serviceProvider.GetRequiredService<IWebhookService>();
            const string alertsChannel = "2676086723";

            var jobTypeName = job.JobType == JobType.Genetic ? "Genetic" : "Backtest";
            var message = $"🚨 **{jobTypeName} Job Failed Permanently**\n" +
                          $"Job ID: `{job.Id}`\n" +
                          $"User ID: {job.UserId}\n" +
                          $"Retry Attempts: {job.RetryCount}/{job.MaxRetries}\n" +
                          $"Failure Category: {job.FailureCategory}\n" +
                          $"Error: {ex.Message}\n" +
                          $"Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC";

            await webhookService.SendMessage(message, alertsChannel);
        }
        catch (Exception notifyEx)
        {
            _logger.LogError(notifyEx, "Failed to send permanent failure notification for job {JobId}", job.Id);
        }
    }

    public override void Dispose()
    {
        _semaphore?.Dispose();
        base.Dispose();
    }
}

/// <summary>
/// Configuration options for GeneticComputeWorker
/// </summary>
public class GeneticComputeWorkerOptions
{
    public const string SectionName = "GeneticComputeWorker";

    /// <summary>
    /// Unique identifier for this worker instance
    /// </summary>
    public string WorkerId { get; set; } = Environment.MachineName + "-genetic";

    /// <summary>
    /// Maximum number of concurrent genetic algorithm jobs to process
    /// </summary>
    public int MaxConcurrentGenetics { get; set; } = 2;

    /// <summary>
    /// Interval in seconds between job polling attempts
    /// </summary>
    public int JobPollIntervalSeconds { get; set; } = 5;

    /// <summary>
    /// Interval in seconds between heartbeat updates
    /// </summary>
    public int HeartbeatIntervalSeconds { get; set; } = 30;

    /// <summary>
    /// Timeout in minutes for considering a job stale
    /// </summary>
    public int StaleJobTimeoutMinutes { get; set; } = 10;

    /// <summary>
    /// Default maximum retry attempts for failed jobs
    /// </summary>
    public int DefaultMaxRetries { get; set; } = 3;

    /// <summary>
    /// Maximum retry delay in minutes (cap for exponential backoff)
    /// </summary>
    public int MaxRetryDelayMinutes { get; set; } = 60;
}
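
// Example appsettings.json section that would bind these options (assuming the host registers them
// against GeneticComputeWorkerOptions.SectionName; that registration is not part of this diff).
// Values shown are the defaults above; WorkerId defaults to "<machine-name>-genetic" when omitted:
//
//   "GeneticComputeWorker": {
//     "MaxConcurrentGenetics": 2,
//     "JobPollIntervalSeconds": 5,
//     "HeartbeatIntervalSeconds": 30,
//     "StaleJobTimeoutMinutes": 10,
//     "DefaultMaxRetries": 3,
//     "MaxRetryDelayMinutes": 60
//   }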