Add the bundle healthcheck worker

This commit is contained in:
2025-11-11 05:31:06 +07:00
parent 8a27155418
commit 0a676d1fb7

View File

@@ -0,0 +1,543 @@
using System.Text.Json;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Application.Backtests;
using Managing.Domain.Backtests;
using Managing.Domain.Bots;
using Managing.Domain.MoneyManagements;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Application.Workers;
/// <summary>
/// Background worker that monitors BundleBacktestRequest health.
/// Runs every 30 minutes to detect and handle stuck bundles that are missing jobs or have stalled progress.
/// </summary>
public class BundleBacktestHealthCheckWorker : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<BundleBacktestHealthCheckWorker> _logger;
private readonly TimeSpan _checkInterval = TimeSpan.FromMinutes(30);
private readonly TimeSpan _inactiveThreshold = TimeSpan.FromMinutes(30); // Check bundles inactive for 30+ minutes
private readonly TimeSpan _stuckThreshold = TimeSpan.FromHours(2); // Consider bundle stuck if no progress for 2 hours
private readonly IMessengerService _messengerService;
public BundleBacktestHealthCheckWorker(
IServiceScopeFactory scopeFactory,
ILogger<BundleBacktestHealthCheckWorker> logger,
IMessengerService messengerService)
{
_scopeFactory = scopeFactory;
_logger = logger;
_messengerService = messengerService;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
"BundleBacktestHealthCheckWorker starting. Check interval: {CheckInterval} minutes, Inactive threshold: {InactiveThreshold} minutes, Stuck threshold: {StuckThreshold} hours",
_checkInterval.TotalMinutes, _inactiveThreshold.TotalMinutes, _stuckThreshold.TotalHours);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await CheckBundleHealthAsync(stoppingToken);
}
catch (OperationCanceledException)
{
_logger.LogInformation("BundleBacktestHealthCheckWorker stopping due to cancellation");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in BundleBacktestHealthCheckWorker health check cycle");
}
try
{
await Task.Delay(_checkInterval, stoppingToken);
}
catch (OperationCanceledException)
{
_logger.LogInformation("BundleBacktestHealthCheckWorker stopping during delay");
break;
}
}
_logger.LogInformation("BundleBacktestHealthCheckWorker stopped");
}
private async Task CheckBundleHealthAsync(CancellationToken stoppingToken)
{
using var scope = _scopeFactory.CreateScope();
var backtestRepository = scope.ServiceProvider.GetRequiredService<IBacktestRepository>();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
_logger.LogInformation("Starting bundle health check...");
// Check bundles in Pending and Running status
var pendingBundles = await backtestRepository.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Pending);
var runningBundles = await backtestRepository.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Running);
// Only check bundles that haven't been updated in more than the inactive threshold
var inactiveThresholdTime = DateTime.UtcNow.Add(-_inactiveThreshold);
var allBundlesToCheck = pendingBundles.Concat(runningBundles)
.Where(b => b.UpdatedAt < inactiveThresholdTime)
.ToList();
_logger.LogInformation(
"Found {TotalCount} bundles (from {PendingTotal} pending and {RunningTotal} running) that haven't been updated in >30 minutes",
allBundlesToCheck.Count, pendingBundles.Count(), runningBundles.Count());
var stuckBundlesCount = 0;
var missingJobsCount = 0;
var healthyBundlesCount = 0;
foreach (var bundle in allBundlesToCheck)
{
if (stoppingToken.IsCancellationRequested)
break;
try
{
var (stuckCount, missingJobs, healthyCount) = await CheckSingleBundleHealthAsync(
bundle, backtestRepository, jobRepository);
stuckBundlesCount += stuckCount;
missingJobsCount += missingJobs;
healthyBundlesCount += healthyCount;
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error checking health for bundle {BundleRequestId}",
bundle.RequestId);
}
}
_logger.LogInformation(
"Bundle health check completed. Healthy: {HealthyCount}, Stuck: {StuckCount}, Missing Jobs: {MissingJobsCount}",
healthyBundlesCount, stuckBundlesCount, missingJobsCount);
}
private async Task<(int StuckCount, int MissingJobsCount, int HealthyCount)> CheckSingleBundleHealthAsync(
BundleBacktestRequest bundle,
IBacktestRepository backtestRepository,
IJobRepository jobRepository)
{
// Get all jobs associated with this bundle
var jobs = (await jobRepository.GetByBundleRequestIdAsync(bundle.RequestId)).ToList();
// Calculate expected number of jobs based on TotalBacktests
var expectedJobCount = bundle.TotalBacktests;
var actualJobCount = jobs.Count;
_logger.LogDebug(
"Bundle {BundleRequestId} ({Status}): Expected {Expected} jobs, Found {Actual} jobs, Completed {Completed}/{Total}",
bundle.RequestId, bundle.Status, expectedJobCount, actualJobCount, bundle.CompletedBacktests, bundle.TotalBacktests);
// Check 1: Missing jobs - bundle has no jobs or fewer jobs than expected
if (actualJobCount == 0 || actualJobCount < expectedJobCount)
{
await HandleBundleWithMissingJobsAsync(bundle, actualJobCount, expectedJobCount, backtestRepository);
return (StuckCount: 0, MissingJobsCount: 1, HealthyCount: 0);
}
// Check 2: Stuck bundle - Running for too long without progress
if (bundle.Status == BundleBacktestRequestStatus.Running)
{
var timeSinceUpdate = DateTime.UtcNow - bundle.UpdatedAt;
if (timeSinceUpdate > _stuckThreshold)
{
await HandleStuckBundleAsync(bundle, timeSinceUpdate, jobs, backtestRepository, jobRepository);
return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0);
}
}
// Check 3: Pending bundle that's been pending too long (jobs created but never started)
if (bundle.Status == BundleBacktestRequestStatus.Pending)
{
var timeSinceCreation = DateTime.UtcNow - bundle.CreatedAt;
// If bundle has been pending for more than the stuck threshold, check job statuses
if (timeSinceCreation > _stuckThreshold)
{
var allJobsPending = jobs.All(j => j.Status == JobStatus.Pending);
var hasFailedJobs = jobs.Any(j => j.Status == JobStatus.Failed);
if (allJobsPending || hasFailedJobs)
{
await HandleStalePendingBundleAsync(bundle, timeSinceCreation, jobs, backtestRepository, jobRepository);
return (StuckCount: 1, MissingJobsCount: 0, HealthyCount: 0);
}
}
}
return (StuckCount: 0, MissingJobsCount: 0, HealthyCount: 1);
}
private async Task HandleBundleWithMissingJobsAsync(
BundleBacktestRequest bundle,
int actualJobCount,
int expectedJobCount,
IBacktestRepository backtestRepository)
{
_logger.LogWarning(
"⚠️ Bundle {BundleRequestId} ({Status}) is missing jobs. Expected: {Expected}, Actual: {Actual}. " +
"Attempting to recreate missing jobs.",
bundle.RequestId, bundle.Status, expectedJobCount, actualJobCount);
try
{
using var scope = _scopeFactory.CreateScope();
var jobService = scope.ServiceProvider.GetRequiredService<JobService>();
var jobRepository = scope.ServiceProvider.GetRequiredService<IJobRepository>();
// Generate all backtest requests from bundle variants
var allBacktestRequests = await GenerateBacktestRequestsFromVariants(bundle);
if (allBacktestRequests == null || !allBacktestRequests.Any())
{
_logger.LogError(
"Failed to generate backtest requests from variants for bundle {BundleRequestId}",
bundle.RequestId);
bundle.ErrorMessage = $"Failed to regenerate jobs: Could not generate backtest requests from variants.";
bundle.UpdatedAt = DateTime.UtcNow;
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
return;
}
// Get existing jobs for this bundle
var existingJobs = (await jobRepository.GetByBundleRequestIdAsync(bundle.RequestId)).ToList();
// Create a set of existing job config signatures for quick lookup
var existingJobSignatures = existingJobs
.Select(j =>
{
var config = JsonSerializer.Deserialize<TradingBotConfig>(j.ConfigJson);
return config != null
? $"{config.Ticker}_{config.Timeframe}_{config.Name}_{j.StartDate:yyyyMMdd}_{j.EndDate:yyyyMMdd}"
: null;
})
.Where(s => s != null)
.ToHashSet();
// Filter out backtest requests that already have jobs
var missingRequests = allBacktestRequests
.Where(req =>
{
var signature = $"{req.Config.Ticker}_{req.Config.Timeframe}_{req.Config.Name}_{req.StartDate:yyyyMMdd}_{req.EndDate:yyyyMMdd}";
return !existingJobSignatures.Contains(signature);
})
.ToList();
if (missingRequests.Any())
{
_logger.LogInformation(
"Creating {MissingCount} missing jobs for bundle {BundleRequestId}",
missingRequests.Count, bundle.RequestId);
// Create jobs for missing requests
var createdJobs = await jobService.CreateBundleJobsAsync(bundle, missingRequests);
_logger.LogInformation(
"Successfully created {CreatedCount} missing jobs for bundle {BundleRequestId}",
createdJobs.Count, bundle.RequestId);
// Update bundle status back to Pending if it was in a failed state, or keep Running if it was running
if (bundle.Status == BundleBacktestRequestStatus.Failed)
{
bundle.Status = BundleBacktestRequestStatus.Pending;
bundle.ErrorMessage = null;
bundle.CompletedAt = null;
}
bundle.UpdatedAt = DateTime.UtcNow;
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
_logger.LogInformation(
"Recreated {CreatedCount} missing jobs for bundle {BundleRequestId}. Bundle status: {Status}",
createdJobs.Count, bundle.RequestId, bundle.Status);
}
else
{
_logger.LogWarning(
"No missing jobs found to recreate for bundle {BundleRequestId}. All {ExpectedCount} jobs already exist.",
bundle.RequestId, expectedJobCount);
bundle.UpdatedAt = DateTime.UtcNow;
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
}
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error recreating missing jobs for bundle {BundleRequestId}",
bundle.RequestId);
bundle.ErrorMessage = $"Error recreating jobs: {ex.Message}";
bundle.UpdatedAt = DateTime.UtcNow;
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
}
}
private async Task<List<RunBacktestRequest>> GenerateBacktestRequestsFromVariants(
BundleBacktestRequest bundleRequest)
{
// Generate backtest requests from bundle variants (duplicated logic from Backtester since it's private)
try
{
// Deserialize the variant configurations
var universalConfig =
JsonSerializer.Deserialize<BundleBacktestUniversalConfig>(bundleRequest.UniversalConfigJson);
var dateTimeRanges = JsonSerializer.Deserialize<List<DateTimeRange>>(bundleRequest.DateTimeRangesJson);
var moneyManagementVariants =
JsonSerializer.Deserialize<List<MoneyManagementVariant>>(bundleRequest.MoneyManagementVariantsJson);
var tickerVariants = JsonSerializer.Deserialize<List<Ticker>>(bundleRequest.TickerVariantsJson);
if (universalConfig == null || dateTimeRanges == null || moneyManagementVariants == null ||
tickerVariants == null)
{
_logger.LogError("Failed to deserialize variant configurations for bundle request {RequestId}",
bundleRequest.RequestId);
return new List<RunBacktestRequest>();
}
// Get the first account for the user
using var scope = _scopeFactory.CreateScope();
var accountService = scope.ServiceProvider.GetRequiredService<IAccountService>();
var accounts = await accountService.GetAccountsByUserAsync(bundleRequest.User, hideSecrets: true, getBalance: false);
var firstAccount = accounts.FirstOrDefault();
if (firstAccount == null)
{
_logger.LogError("No accounts found for user {UserId} in bundle request {RequestId}",
bundleRequest.User.Id, bundleRequest.RequestId);
return new List<RunBacktestRequest>();
}
var backtestRequests = new List<RunBacktestRequest>();
foreach (var dateRange in dateTimeRanges)
{
foreach (var mmVariant in moneyManagementVariants)
{
foreach (var ticker in tickerVariants)
{
var config = new TradingBotConfigRequest
{
AccountName = firstAccount.Name,
Ticker = ticker,
Timeframe = universalConfig.Timeframe,
IsForWatchingOnly = universalConfig.IsForWatchingOnly,
BotTradingBalance = universalConfig.BotTradingBalance,
Name = $"{bundleRequest.Name} #{backtestRequests.Count + 1}",
FlipPosition = universalConfig.FlipPosition,
CooldownPeriod = universalConfig.CooldownPeriod,
MaxLossStreak = universalConfig.MaxLossStreak,
Scenario = universalConfig.Scenario,
ScenarioName = universalConfig.ScenarioName,
MoneyManagement = mmVariant.MoneyManagement,
MaxPositionTimeHours = universalConfig.MaxPositionTimeHours,
CloseEarlyWhenProfitable = universalConfig.CloseEarlyWhenProfitable,
FlipOnlyWhenInProfit = universalConfig.FlipOnlyWhenInProfit,
UseSynthApi = universalConfig.UseSynthApi,
UseForPositionSizing = universalConfig.UseForPositionSizing,
UseForSignalFiltering = universalConfig.UseForSignalFiltering,
UseForDynamicStopLoss = universalConfig.UseForDynamicStopLoss
};
var backtestRequest = new RunBacktestRequest
{
Config = config,
StartDate = dateRange.StartDate,
EndDate = dateRange.EndDate,
Balance = universalConfig.BotTradingBalance,
WatchOnly = universalConfig.WatchOnly,
Save = universalConfig.Save,
WithCandles = false,
MoneyManagement = mmVariant.MoneyManagement != null
? new MoneyManagement
{
Name = mmVariant.MoneyManagement.Name,
Timeframe = mmVariant.MoneyManagement.Timeframe,
StopLoss = mmVariant.MoneyManagement.StopLoss,
TakeProfit = mmVariant.MoneyManagement.TakeProfit,
Leverage = mmVariant.MoneyManagement.Leverage
}
: null
};
backtestRequests.Add(backtestRequest);
}
}
}
return backtestRequests;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error generating backtest requests from variants for bundle request {RequestId}",
bundleRequest.RequestId);
return new List<RunBacktestRequest>();
}
}
private async Task HandleStuckBundleAsync(
BundleBacktestRequest bundle,
TimeSpan timeSinceUpdate,
List<Job> jobs,
IBacktestRepository backtestRepository,
IJobRepository jobRepository)
{
_logger.LogWarning(
"⚠️ Bundle {BundleRequestId} is stuck in Running status. No progress for {TimeSinceUpdate} hours. " +
"Completed: {Completed}/{Total}",
bundle.RequestId, timeSinceUpdate.TotalHours, bundle.CompletedBacktests, bundle.TotalBacktests);
// Check job statuses to understand why bundle is stuck
var jobStatusSummary = jobs
.GroupBy(j => j.Status)
.Select(g => $"{g.Key}: {g.Count()}")
.ToList();
_logger.LogInformation(
"Job status summary for stuck bundle {BundleRequestId}: {JobStatusSummary}",
bundle.RequestId, string.Join(", ", jobStatusSummary));
// Check if all jobs are actually completed but bundle wasn't updated
var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
var totalProcessedJobs = completedJobs + failedJobs;
if (totalProcessedJobs == bundle.TotalBacktests)
{
// All jobs are done, update bundle status accordingly
_logger.LogInformation(
"Bundle {BundleRequestId} has all jobs completed ({Completed} completed, {Failed} failed). Updating bundle status.",
bundle.RequestId, completedJobs, failedJobs);
bundle.Status = failedJobs == 0 ? BundleBacktestRequestStatus.Completed : BundleBacktestRequestStatus.Completed;
bundle.CompletedBacktests = completedJobs;
bundle.FailedBacktests = failedJobs;
bundle.CompletedAt = DateTime.UtcNow;
bundle.UpdatedAt = DateTime.UtcNow;
if (failedJobs > 0)
{
bundle.ErrorMessage = $"{failedJobs} backtests failed";
}
}
else
{
// Some jobs are still pending or running - bundle is genuinely stuck
// Reset any stale running jobs back to pending
var runningJobs = jobs.Where(j => j.Status == JobStatus.Running).ToList();
foreach (var job in runningJobs)
{
var timeSinceJobHeartbeat = job.LastHeartbeat.HasValue
? DateTime.UtcNow - job.LastHeartbeat.Value
: DateTime.UtcNow - job.CreatedAt;
if (timeSinceJobHeartbeat > TimeSpan.FromMinutes(30))
{
_logger.LogInformation(
"Resetting stale job {JobId} for bundle {BundleRequestId} back to Pending",
job.Id, bundle.RequestId);
job.Status = JobStatus.Pending;
job.AssignedWorkerId = null;
job.LastHeartbeat = null;
await jobRepository.UpdateAsync(job);
}
}
// Update bundle timestamp to give it another chance
bundle.UpdatedAt = DateTime.UtcNow;
bundle.ErrorMessage = $"Bundle was stuck. Reset {runningJobs.Count(j => j.Status == JobStatus.Pending)} stale jobs to pending.";
}
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
_logger.LogInformation(
"Handled stuck bundle {BundleRequestId}. New status: {Status}",
bundle.RequestId, bundle.Status);
}
private async Task HandleStalePendingBundleAsync(
BundleBacktestRequest bundle,
TimeSpan timeSinceCreation,
List<Job> jobs,
IBacktestRepository backtestRepository,
IJobRepository jobRepository)
{
_logger.LogWarning(
"⚠️ Bundle {BundleRequestId} has been in Pending status for {TimeSinceCreation} hours. " +
"Jobs: {JobCount}, All Pending: {AllPending}",
bundle.RequestId, timeSinceCreation.TotalHours, jobs.Count, jobs.All(j => j.Status == JobStatus.Pending));
var hasFailedJobs = jobs.Any(j => j.Status == JobStatus.Failed);
if (hasFailedJobs)
{
// If all jobs failed, mark bundle as failed
var failedJobCount = jobs.Count(j => j.Status == JobStatus.Failed);
if (failedJobCount == bundle.TotalBacktests)
{
_logger.LogInformation(
"All {FailedCount} jobs for bundle {BundleRequestId} have failed. Marking bundle as Failed.",
failedJobCount, bundle.RequestId);
bundle.Status = BundleBacktestRequestStatus.Failed;
bundle.FailedBacktests = failedJobCount;
bundle.ErrorMessage = $"All {failedJobCount} backtest jobs failed";
bundle.CompletedAt = DateTime.UtcNow;
bundle.UpdatedAt = DateTime.UtcNow;
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
await NotifyUserOfFailedBundle(bundle, "All jobs failed");
}
}
else
{
// All jobs are pending - just update the timestamp to avoid repeated warnings
bundle.UpdatedAt = DateTime.UtcNow;
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
_logger.LogInformation(
"Bundle {BundleRequestId} has {JobCount} pending jobs waiting to be processed. Updated timestamp.",
bundle.RequestId, jobs.Count);
}
}
private async Task NotifyUserOfFailedBundle(BundleBacktestRequest bundle, string reason)
{
try
{
if (bundle.User?.TelegramChannel != null)
{
var message = $"❌ Bundle backtest '{bundle.Name}' (ID: {bundle.RequestId}) failed: {reason}";
await _messengerService.SendMessage(message, bundle.User.TelegramChannel);
_logger.LogInformation(
"Sent failure notification to user {UserId} for bundle {BundleRequestId}",
bundle.User.Id, bundle.RequestId);
}
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to send notification to user for bundle {BundleRequestId}",
bundle.RequestId);
}
}
}