Add benchmark for backtest on the test
This commit is contained in:
@@ -230,6 +230,9 @@ public class BacktestExecutor
|
||||
var fixedCandlesHashSet = new HashSet<Candle>(rollingWindowSize); // Reuse HashSet to avoid allocations
|
||||
var candlesProcessed = 0;
|
||||
|
||||
// Pre-allocate reusable collections to minimize allocations during processing
|
||||
var tempCandlesList = new List<Candle>(rollingWindowSize);
|
||||
|
||||
// Signal caching optimization - reduce signal update frequency for better performance
|
||||
var signalUpdateSkipCount = 0;
|
||||
|
||||
@@ -253,26 +256,39 @@ public class BacktestExecutor
|
||||
// Process all candles with optimized rolling window approach
|
||||
_logger.LogInformation("🎯 Starting to process {Count} candles in loop", orderedCandles.Count);
|
||||
Console.WriteLine("CONSOLE: About to start candle processing loop");
|
||||
|
||||
// Optimize: Pre-populate rolling window with initial candles to avoid repeated checks
|
||||
var initialWindowSize = Math.Min(rollingWindowSize, orderedCandles.Count);
|
||||
for (int i = 0; i < initialWindowSize; i++)
|
||||
{
|
||||
var candle = orderedCandles[i];
|
||||
rollingCandles.Add(candle);
|
||||
fixedCandlesHashSet.Add(candle);
|
||||
}
|
||||
|
||||
foreach (var candle in orderedCandles)
|
||||
{
|
||||
// Maintain rolling window efficiently using List
|
||||
rollingCandles.Add(candle);
|
||||
|
||||
if (rollingCandles.Count > rollingWindowSize)
|
||||
// Optimized rolling window maintenance - only modify when window is full
|
||||
if (rollingCandles.Count >= rollingWindowSize)
|
||||
{
|
||||
// Remove oldest candle from both structures
|
||||
// Remove oldest candle from both structures efficiently
|
||||
var removedCandle = rollingCandles[0];
|
||||
rollingCandles.RemoveAt(0);
|
||||
fixedCandlesHashSet.Remove(removedCandle);
|
||||
}
|
||||
|
||||
// Add to HashSet for reuse
|
||||
fixedCandlesHashSet.Add(candle);
|
||||
// Add new candle to rolling window (skip if already in initial population)
|
||||
if (!fixedCandlesHashSet.Contains(candle))
|
||||
{
|
||||
rollingCandles.Add(candle);
|
||||
fixedCandlesHashSet.Add(candle);
|
||||
}
|
||||
|
||||
tradingBot.LastCandle = candle;
|
||||
|
||||
// Smart signal caching - reduce signal update frequency for performance
|
||||
// RSI and similar indicators don't need updates every candle for 15-minute data
|
||||
var shouldSkipSignalUpdate = ShouldSkipSignalUpdate(currentCandle, totalCandles);
|
||||
var shouldSkipSignalUpdate = ShouldSkipSignalUpdate(currentCandle, totalCandles, config);
|
||||
if (currentCandle <= 5) // Debug first few candles
|
||||
{
|
||||
_logger.LogInformation("🔍 Candle {CurrentCandle}: shouldSkip={ShouldSkip}, totalCandles={Total}",
|
||||
@@ -533,24 +549,70 @@ public class BacktestExecutor
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Advanced signal caching based on indicator update frequency
|
||||
/// Instead of hashing candles, we cache signals based on how often indicators need updates
|
||||
/// Advanced signal caching based on indicator update frequency and timeframe
|
||||
/// Dynamically adjusts update frequency based on timeframe and indicator characteristics
|
||||
/// </summary>
|
||||
private bool ShouldSkipSignalUpdate(int currentCandleIndex, int totalCandles)
|
||||
private bool ShouldSkipSignalUpdate(int currentCandleIndex, int totalCandles, TradingBotConfig config)
|
||||
{
|
||||
// RSI and similar indicators don't need to be recalculated every candle
|
||||
// For 15-minute candles, we can update signals every 3-5 candles without significant accuracy loss
|
||||
const int signalUpdateFrequency = 3; // Update signals every N candles
|
||||
|
||||
// Always update signals for the first few candles to establish baseline
|
||||
if (currentCandleIndex < 10)
|
||||
if (currentCandleIndex < 20)
|
||||
return false;
|
||||
|
||||
// Always update signals near the end to ensure final trades are calculated
|
||||
if (currentCandleIndex > totalCandles - 10)
|
||||
if (currentCandleIndex > totalCandles - 20)
|
||||
return false;
|
||||
|
||||
// Skip signal updates based on frequency
|
||||
// Adaptive update frequency based on timeframe
|
||||
// Shorter timeframes can skip more updates as they're more volatile
|
||||
int signalUpdateFrequency;
|
||||
switch (config.Timeframe)
|
||||
{
|
||||
case Timeframe.OneMinute:
|
||||
case Timeframe.FiveMinutes:
|
||||
signalUpdateFrequency = 10; // Update every 10 candles for fast timeframes
|
||||
break;
|
||||
case Timeframe.FifteenMinutes:
|
||||
case Timeframe.ThirtyMinutes:
|
||||
signalUpdateFrequency = 5; // Update every 5 candles for medium timeframes
|
||||
break;
|
||||
case Timeframe.OneHour:
|
||||
case Timeframe.FourHour:
|
||||
signalUpdateFrequency = 3; // Update every 3 candles for slower timeframes
|
||||
break;
|
||||
case Timeframe.OneDay:
|
||||
signalUpdateFrequency = 1; // Update every candle for daily (already slow)
|
||||
break;
|
||||
default:
|
||||
signalUpdateFrequency = 5; // Default fallback
|
||||
break;
|
||||
}
|
||||
|
||||
// Further optimize based on indicator types in the scenario
|
||||
if (config.Scenario?.Indicators != null)
|
||||
{
|
||||
var hasFastIndicators = config.Scenario.Indicators.Any(ind =>
|
||||
ind.Type == IndicatorType.RsiDivergence ||
|
||||
ind.Type == IndicatorType.StochRsiTrend ||
|
||||
ind.Type == IndicatorType.MacdCross);
|
||||
|
||||
var hasSlowIndicators = config.Scenario.Indicators.Any(ind =>
|
||||
ind.Type == IndicatorType.EmaCross ||
|
||||
ind.Type == IndicatorType.EmaTrend ||
|
||||
ind.Type == IndicatorType.SuperTrend);
|
||||
|
||||
// If we have mostly slow indicators, we can update less frequently
|
||||
if (!hasFastIndicators && hasSlowIndicators)
|
||||
{
|
||||
signalUpdateFrequency = Math.Max(signalUpdateFrequency, 8);
|
||||
}
|
||||
// If we have fast indicators, we need more frequent updates
|
||||
else if (hasFastIndicators && !hasSlowIndicators)
|
||||
{
|
||||
signalUpdateFrequency = Math.Min(signalUpdateFrequency, 3);
|
||||
}
|
||||
}
|
||||
|
||||
// Skip signal updates based on calculated frequency
|
||||
return (currentCandleIndex % signalUpdateFrequency) != 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using Managing.Application.Abstractions.Repositories;
|
||||
using System.Collections.Concurrent;
|
||||
using Managing.Application.Abstractions.Repositories;
|
||||
using Managing.Application.Abstractions.Services;
|
||||
using Managing.Domain.Accounts;
|
||||
using Managing.Domain.Bots;
|
||||
@@ -431,6 +432,7 @@ public class TradingService : ITradingService
|
||||
|
||||
/// <summary>
|
||||
/// Calculates indicators values for a given scenario and candles.
|
||||
/// Uses parallel processing for independent indicator calculations to improve performance.
|
||||
/// </summary>
|
||||
/// <param name="scenario">The scenario containing indicators.</param>
|
||||
/// <param name="candles">The candles to calculate indicators for.</param>
|
||||
@@ -439,7 +441,7 @@ public class TradingService : ITradingService
|
||||
Scenario scenario,
|
||||
HashSet<Candle> candles)
|
||||
{
|
||||
// Offload CPU-bound indicator calculations to thread pool
|
||||
// Offload CPU-bound indicator calculations to thread pool with parallel processing
|
||||
return await Task.Run(() =>
|
||||
{
|
||||
var indicatorsValues = new Dictionary<IndicatorType, IndicatorsResultBase>();
|
||||
@@ -449,19 +451,39 @@ public class TradingService : ITradingService
|
||||
return indicatorsValues;
|
||||
}
|
||||
|
||||
// Build indicators from scenario
|
||||
foreach (var indicator in scenario.Indicators)
|
||||
// Use parallel processing for independent indicator calculations
|
||||
// Configure parallelism based on indicator count and system capabilities
|
||||
var maxDegreeOfParallelism = Math.Min(scenario.Indicators.Count, Environment.ProcessorCount);
|
||||
|
||||
var options = new ParallelOptions
|
||||
{
|
||||
MaxDegreeOfParallelism = maxDegreeOfParallelism,
|
||||
CancellationToken = CancellationToken.None
|
||||
};
|
||||
|
||||
// Use thread-safe concurrent dictionary for parallel writes
|
||||
var concurrentResults = new ConcurrentDictionary<IndicatorType, IndicatorsResultBase>();
|
||||
|
||||
// Parallel calculation of indicators
|
||||
Parallel.ForEach(scenario.Indicators, options, indicator =>
|
||||
{
|
||||
try
|
||||
{
|
||||
var buildedIndicator = ScenarioHelpers.BuildIndicator(ScenarioHelpers.BaseToLight(indicator));
|
||||
indicatorsValues[indicator.Type] = buildedIndicator.GetIndicatorValues(candles);
|
||||
var result = buildedIndicator.GetIndicatorValues(candles);
|
||||
concurrentResults[indicator.Type] = result;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error calculating indicator {IndicatorName}: {ErrorMessage}",
|
||||
indicator.Name, ex.Message);
|
||||
}
|
||||
});
|
||||
|
||||
// Convert to regular dictionary for return
|
||||
foreach (var kvp in concurrentResults)
|
||||
{
|
||||
indicatorsValues[kvp.Key] = kvp.Value;
|
||||
}
|
||||
|
||||
return indicatorsValues;
|
||||
|
||||
@@ -21,7 +21,7 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
private readonly ILogger<BundleBacktestHealthCheckWorker> _logger;
|
||||
private readonly TimeSpan _checkInterval = TimeSpan.FromMinutes(30);
|
||||
private readonly TimeSpan _inactiveThreshold = TimeSpan.FromMinutes(30); // Check bundles inactive for 30+ minutes
|
||||
private readonly TimeSpan _inactiveThreshold = TimeSpan.FromMinutes(2); // Check bundles inactive for 2+ minutes
|
||||
private readonly TimeSpan _stuckThreshold = TimeSpan.FromHours(2); // Consider bundle stuck if no progress for 2 hours
|
||||
private readonly IMessengerService _messengerService;
|
||||
|
||||
@@ -90,8 +90,8 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
|
||||
.ToList();
|
||||
|
||||
_logger.LogInformation(
|
||||
"Found {TotalCount} bundles (from {PendingTotal} pending and {RunningTotal} running) that haven't been updated in >30 minutes",
|
||||
allBundlesToCheck.Count, pendingBundles.Count(), runningBundles.Count());
|
||||
"Found {TotalCount} bundles (from {PendingTotal} pending and {RunningTotal} running) that haven't been updated in >{InactiveMinutes} minutes",
|
||||
allBundlesToCheck.Count, pendingBundles.Count(), runningBundles.Count(), _inactiveThreshold.TotalMinutes);
|
||||
|
||||
var stuckBundlesCount = 0;
|
||||
var missingJobsCount = 0;
|
||||
@@ -163,13 +163,13 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
|
||||
if (bundle.Status == BundleBacktestRequestStatus.Pending)
|
||||
{
|
||||
var timeSinceCreation = DateTime.UtcNow - bundle.CreatedAt;
|
||||
|
||||
|
||||
// If bundle has been pending for more than the stuck threshold, check job statuses
|
||||
if (timeSinceCreation > _stuckThreshold)
|
||||
{
|
||||
var allJobsPending = jobs.All(j => j.Status == JobStatus.Pending);
|
||||
var hasFailedJobs = jobs.Any(j => j.Status == JobStatus.Failed);
|
||||
|
||||
|
||||
if (allJobsPending || hasFailedJobs)
|
||||
{
|
||||
await HandleStalePendingBundleAsync(bundle, timeSinceCreation, jobs, backtestRepository, jobRepository);
|
||||
@@ -178,6 +178,18 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
|
||||
}
|
||||
}
|
||||
|
||||
// Check 4: Bundle with all jobs completed but bundle status not updated
|
||||
var completedJobs = jobs.Count(j => j.Status == JobStatus.Completed);
|
||||
var failedJobs = jobs.Count(j => j.Status == JobStatus.Failed);
|
||||
var totalProcessedJobs = completedJobs + failedJobs;
|
||||
|
||||
if (totalProcessedJobs == bundle.TotalBacktests &&
|
||||
(bundle.Status == BundleBacktestRequestStatus.Running || bundle.Status == BundleBacktestRequestStatus.Pending))
|
||||
{
|
||||
await HandleCompletedBundleAsync(bundle, completedJobs, failedJobs, backtestRepository);
|
||||
return (StuckCount: 0, MissingJobsCount: 0, HealthyCount: 1);
|
||||
}
|
||||
|
||||
return (StuckCount: 0, MissingJobsCount: 0, HealthyCount: 1);
|
||||
}
|
||||
|
||||
@@ -471,6 +483,39 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
|
||||
bundle.RequestId, bundle.Status);
|
||||
}
|
||||
|
||||
private async Task HandleCompletedBundleAsync(
|
||||
BundleBacktestRequest bundle,
|
||||
int completedJobs,
|
||||
int failedJobs,
|
||||
IBacktestRepository backtestRepository)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"✅ Bundle {BundleRequestId} has all jobs finished ({Completed} completed, {Failed} failed) but bundle status was {OldStatus}. Updating to Completed.",
|
||||
bundle.RequestId, completedJobs, failedJobs, bundle.Status);
|
||||
|
||||
// Update bundle status to Completed (or keep as Completed if it was already)
|
||||
bundle.Status = failedJobs == 0 ? BundleBacktestRequestStatus.Completed : BundleBacktestRequestStatus.Completed;
|
||||
bundle.CompletedBacktests = completedJobs;
|
||||
bundle.FailedBacktests = failedJobs;
|
||||
bundle.CompletedAt = DateTime.UtcNow;
|
||||
bundle.UpdatedAt = DateTime.UtcNow;
|
||||
|
||||
if (failedJobs > 0)
|
||||
{
|
||||
bundle.ErrorMessage = $"{failedJobs} backtests failed";
|
||||
}
|
||||
else
|
||||
{
|
||||
bundle.ErrorMessage = null; // Clear any previous error message
|
||||
}
|
||||
|
||||
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
|
||||
|
||||
_logger.LogInformation(
|
||||
"Successfully updated bundle {BundleRequestId} status to {Status} with {Completed}/{Total} backtests completed",
|
||||
bundle.RequestId, bundle.Status, bundle.CompletedBacktests, bundle.TotalBacktests);
|
||||
}
|
||||
|
||||
private async Task HandleStalePendingBundleAsync(
|
||||
BundleBacktestRequest bundle,
|
||||
TimeSpan timeSinceCreation,
|
||||
@@ -484,12 +529,12 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
|
||||
bundle.RequestId, timeSinceCreation.TotalHours, jobs.Count, jobs.All(j => j.Status == JobStatus.Pending));
|
||||
|
||||
var hasFailedJobs = jobs.Any(j => j.Status == JobStatus.Failed);
|
||||
|
||||
|
||||
if (hasFailedJobs)
|
||||
{
|
||||
// If all jobs failed, mark bundle as failed
|
||||
var failedJobCount = jobs.Count(j => j.Status == JobStatus.Failed);
|
||||
|
||||
|
||||
if (failedJobCount == bundle.TotalBacktests)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
@@ -511,7 +556,7 @@ public class BundleBacktestHealthCheckWorker : BackgroundService
|
||||
// All jobs are pending - just update the timestamp to avoid repeated warnings
|
||||
bundle.UpdatedAt = DateTime.UtcNow;
|
||||
await backtestRepository.UpdateBundleBacktestRequestAsync(bundle);
|
||||
|
||||
|
||||
_logger.LogInformation(
|
||||
"Bundle {BundleRequestId} has {JobCount} pending jobs waiting to be processed. Updated timestamp.",
|
||||
bundle.RequestId, jobs.Count);
|
||||
|
||||
Reference in New Issue
Block a user