Optimze worker for backtest

This commit is contained in:
2025-11-11 03:59:41 +07:00
parent 5a4cb670a5
commit 1d70355617
10 changed files with 138465 additions and 40 deletions

View File

@@ -89,7 +89,7 @@ public class BacktestTests : BaseTests
// Arrange
var ticker = Ticker.ETH;
var timeframe = Timeframe.FifteenMinutes;
var daysBack = -30; // Fetch last 30 days of data
var daysBack = -120; // Fetch last 30 days of data
var startDate = DateTime.UtcNow.AddDays(daysBack);
var endDate = DateTime.UtcNow;

View File

@@ -33,6 +33,7 @@ public class SignalCache
{
return true;
}
signal = null;
return false;
}
@@ -151,13 +152,15 @@ public class BacktestExecutor
var initialMemory = GC.GetTotalMemory(false);
telemetry.MemoryUsageAtStart = initialMemory;
_logger.LogInformation("🚀 Backtest execution started - RequestId: {RequestId}, Candles: {CandleCount}, Memory: {MemoryMB:F2}MB",
_logger.LogInformation(
"🚀 Backtest execution started - RequestId: {RequestId}, Candles: {CandleCount}, Memory: {MemoryMB:F2}MB",
requestId ?? "N/A", candles.Count, initialMemory / 1024.0 / 1024.0);
// Ensure user has accounts loaded
if (user.Accounts == null || !user.Accounts.Any())
{
user.Accounts = (await _accountService.GetAccountsByUserAsync(user, hideSecrets: true, getBalance: false)).ToList();
user.Accounts = (await _accountService.GetAccountsByUserAsync(user, hideSecrets: true, getBalance: false))
.ToList();
}
// Create a fresh TradingBotBase instance for this backtest
@@ -186,7 +189,8 @@ public class BacktestExecutor
var scenario = config.Scenario.ToScenario();
// Calculate all indicator values once with all candles
preCalculatedIndicatorValues = await ServiceScopeHelpers.WithScopedService<ITradingService, Dictionary<IndicatorType, IndicatorsResultBase>>(
preCalculatedIndicatorValues = await ServiceScopeHelpers
.WithScopedService<ITradingService, Dictionary<IndicatorType, IndicatorsResultBase>>(
_scopeFactory,
async tradingService =>
{
@@ -197,13 +201,15 @@ public class BacktestExecutor
tradingBot.PreCalculatedIndicatorValues = preCalculatedIndicatorValues;
telemetry.IndicatorPreCalculationTime = Stopwatch.GetElapsedTime(indicatorCalcStart);
_logger.LogInformation("✅ Successfully pre-calculated indicator values for {IndicatorCount} indicator types in {Duration:F2}ms",
_logger.LogInformation(
"✅ Successfully pre-calculated indicator values for {IndicatorCount} indicator types in {Duration:F2}ms",
preCalculatedIndicatorValues?.Count ?? 0, telemetry.IndicatorPreCalculationTime.TotalMilliseconds);
}
catch (Exception ex)
{
telemetry.IndicatorPreCalculationTime = Stopwatch.GetElapsedTime(indicatorCalcStart);
_logger.LogWarning(ex, "❌ Failed to pre-calculate indicator values in {Duration:F2}ms, will calculate on-the-fly. Error: {ErrorMessage}",
_logger.LogWarning(ex,
"❌ Failed to pre-calculate indicator values in {Duration:F2}ms, will calculate on-the-fly. Error: {ErrorMessage}",
telemetry.IndicatorPreCalculationTime.TotalMilliseconds, ex.Message);
// Continue with normal calculation if pre-calculation fails
preCalculatedIndicatorValues = null;
@@ -220,8 +226,7 @@ public class BacktestExecutor
// Use optimized rolling window approach - TradingBox.GetSignal only needs last 600 candles
const int rollingWindowSize = 600;
var rollingCandles = new LinkedList<Candle>();
var fixedCandles = new HashSet<Candle>(rollingWindowSize);
var rollingCandles = new List<Candle>(rollingWindowSize); // Pre-allocate capacity for better performance
var candlesProcessed = 0;
// Signal caching optimization - reduce signal update frequency for better performance
@@ -249,15 +254,13 @@ public class BacktestExecutor
Console.WriteLine("CONSOLE: About to start candle processing loop");
foreach (var candle in orderedCandles)
{
// Maintain rolling window efficiently using LinkedList
rollingCandles.AddLast(candle);
fixedCandles.Add(candle);
// Maintain rolling window efficiently using List
rollingCandles.Add(candle);
if (rollingCandles.Count > rollingWindowSize)
{
var removedCandle = rollingCandles.First!.Value;
rollingCandles.RemoveFirst();
fixedCandles.Remove(removedCandle);
// Remove oldest candle (first element) - O(n) but acceptable for small window
rollingCandles.RemoveAt(0);
}
tradingBot.LastCandle = candle;
@@ -267,11 +270,14 @@ public class BacktestExecutor
var shouldSkipSignalUpdate = ShouldSkipSignalUpdate(currentCandle, totalCandles);
if (currentCandle <= 5) // Debug first few candles
{
_logger.LogInformation("🔍 Candle {CurrentCandle}: shouldSkip={ShouldSkip}, totalCandles={Total}", currentCandle, shouldSkipSignalUpdate, totalCandles);
_logger.LogInformation("🔍 Candle {CurrentCandle}: shouldSkip={ShouldSkip}, totalCandles={Total}",
currentCandle, shouldSkipSignalUpdate, totalCandles);
}
if (!shouldSkipSignalUpdate)
{
// Convert to HashSet only when needed for GetSignal (it expects HashSet)
var fixedCandles = new HashSet<Candle>(rollingCandles);
var signalUpdateStart = Stopwatch.GetTimestamp();
await tradingBot.UpdateSignals(fixedCandles);
signalUpdateTotalTime += Stopwatch.GetElapsedTime(signalUpdateStart);
@@ -284,7 +290,9 @@ public class BacktestExecutor
// This saves ~1ms per skipped update and improves performance significantly
if (signalUpdateSkipCount <= 5) // Log first few skips for debugging
{
_logger.LogInformation("⏭️ Signal update skipped for candle {CurrentCandle} (total skipped: {SkipCount})", currentCandle, signalUpdateSkipCount);
_logger.LogInformation(
"⏭️ Signal update skipped for candle {CurrentCandle} (total skipped: {SkipCount})",
currentCandle, signalUpdateSkipCount);
}
}
@@ -318,7 +326,8 @@ public class BacktestExecutor
// Update progress callback if provided (optimized frequency)
var currentPercentage = (currentCandle * 100) / totalCandles;
var timeSinceLastUpdate = (DateTime.UtcNow - lastProgressUpdate).TotalMilliseconds;
if (progressCallback != null && (timeSinceLastUpdate >= progressUpdateIntervalMs || currentPercentage >= lastLoggedPercentage + 10))
if (progressCallback != null && (timeSinceLastUpdate >= progressUpdateIntervalMs ||
currentPercentage >= lastLoggedPercentage + 10))
{
var progressCallbackStart = Stopwatch.GetTimestamp();
try
@@ -330,6 +339,7 @@ public class BacktestExecutor
{
_logger.LogWarning(ex, "Error in progress callback");
}
progressCallbackTotalTime += Stopwatch.GetElapsedTime(progressCallbackStart);
lastProgressUpdate = DateTime.UtcNow;
}
@@ -461,12 +471,15 @@ public class BacktestExecutor
_logger.LogInformation(" • Candle Processing: {Time:F2}ms ({Percentage:F1}%)",
telemetry.CandleProcessingTime.TotalMilliseconds,
telemetry.CandleProcessingTime.TotalMilliseconds / totalExecutionTime.TotalMilliseconds * 100);
_logger.LogInformation(" • Signal Updates: {Time:F2}ms ({Percentage:F1}%) - {Count} updates, {SkipCount} skipped ({Efficiency:F1}% efficiency)",
_logger.LogInformation(
" • Signal Updates: {Time:F2}ms ({Percentage:F1}%) - {Count} updates, {SkipCount} skipped ({Efficiency:F1}% efficiency)",
telemetry.SignalUpdateTime.TotalMilliseconds,
telemetry.SignalUpdateTime.TotalMilliseconds / totalExecutionTime.TotalMilliseconds * 100,
telemetry.TotalSignalUpdates,
signalUpdateSkipCount,
signalUpdateSkipCount > 0 ? (double)signalUpdateSkipCount / (telemetry.TotalSignalUpdates + signalUpdateSkipCount) * 100 : 0);
signalUpdateSkipCount > 0
? (double)signalUpdateSkipCount / (telemetry.TotalSignalUpdates + signalUpdateSkipCount) * 100
: 0);
_logger.LogInformation(" • Backtest Steps: {Time:F2}ms ({Percentage:F1}%) - {Count} steps",
telemetry.BacktestStepTime.TotalMilliseconds,
telemetry.BacktestStepTime.TotalMilliseconds / totalExecutionTime.TotalMilliseconds * 100,
@@ -480,16 +493,19 @@ public class BacktestExecutor
telemetry.ResultCalculationTime.TotalMilliseconds / totalExecutionTime.TotalMilliseconds * 100);
// Performance insights
var signalUpdateAvg = telemetry.TotalSignalUpdates > 0 ?
telemetry.SignalUpdateTime.TotalMilliseconds / telemetry.TotalSignalUpdates : 0;
var backtestStepAvg = telemetry.TotalBacktestSteps > 0 ?
telemetry.BacktestStepTime.TotalMilliseconds / telemetry.TotalBacktestSteps : 0;
var signalUpdateAvg = telemetry.TotalSignalUpdates > 0
? telemetry.SignalUpdateTime.TotalMilliseconds / telemetry.TotalSignalUpdates
: 0;
var backtestStepAvg = telemetry.TotalBacktestSteps > 0
? telemetry.BacktestStepTime.TotalMilliseconds / telemetry.TotalBacktestSteps
: 0;
_logger.LogInformation("🔍 Performance Insights:");
_logger.LogInformation(" • Average Signal Update: {Avg:F2}ms per update", signalUpdateAvg);
_logger.LogInformation(" • Average Backtest Step: {Avg:F2}ms per step", backtestStepAvg);
_logger.LogInformation(" • Memory Efficiency: {Efficiency:F2}MB per 1000 candles",
(telemetry.PeakMemoryUsage - telemetry.MemoryUsageAtStart) / 1024.0 / 1024.0 / (telemetry.TotalCandlesProcessed / 1000.0));
(telemetry.PeakMemoryUsage - telemetry.MemoryUsageAtStart) / 1024.0 / 1024.0 /
(telemetry.TotalCandlesProcessed / 1000.0));
// Identify potential bottlenecks
var bottlenecks = new List<string>();
@@ -608,7 +624,8 @@ public class BacktestExecutor
var bundleRequest = backtestRepository.GetBundleBacktestRequestByIdForUser(user, bundleRequestId);
if (bundleRequest == null)
{
_logger.LogWarning("Bundle request {BundleRequestId} not found for user {UserId}", bundleRequestId, user.Id);
_logger.LogWarning("Bundle request {BundleRequestId} not found for user {UserId}", bundleRequestId,
user.Id);
return;
}
@@ -643,6 +660,7 @@ public class BacktestExecutor
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
bundleRequest.ErrorMessage = $"{failedJobs} backtests failed";
}
bundleRequest.CompletedAt = DateTime.UtcNow;
bundleRequest.CurrentBacktest = null;
}
@@ -667,7 +685,8 @@ public class BacktestExecutor
bundleRequest.Status == BundleBacktestRequestStatus.Completed &&
!string.IsNullOrEmpty(user.TelegramChannel))
{
var message = $"✅ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed successfully. " +
var message =
$"✅ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed successfully. " +
$"Completed: {completedJobs}/{totalJobs} backtests" +
(failedJobs > 0 ? $", Failed: {failedJobs}" : "") +
$". Results: {resultsList.Count} backtest(s) generated.";
@@ -690,7 +709,8 @@ public class BacktestExecutor
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to update bundle request {BundleRequestId} with backtest {BacktestId}", bundleRequestId, backtest.Id);
_logger.LogError(ex, "Failed to update bundle request {BundleRequestId} with backtest {BacktestId}",
bundleRequestId, backtest.Id);
}
}
@@ -712,4 +732,3 @@ public class BacktestExecutor
}
}
}

View File

@@ -351,6 +351,13 @@ public class TradingBotBase : ITradingBot
private async Task ManagePositions()
{
// Early exit optimization - skip if no positions to manage
var hasOpenPositions = Positions.Values.Any(p => !p.IsFinished());
var hasWaitingSignals = Signals.Values.Any(s => s.Status == SignalStatus.WaitingForPosition);
if (!hasOpenPositions && !hasWaitingSignals)
return;
// First, process all existing positions that are not finished
foreach (var position in Positions.Values.Where(p => !p.IsFinished()))
{

View File

@@ -75,7 +75,10 @@ public static class TradingBox
Dictionary<IndicatorType, IndicatorsResultBase> preCalculatedIndicatorValues)
{
var signalOnCandles = new List<LightSignal>();
var limitedCandles = newCandles.ToList().TakeLast(600).ToList();
// Optimize list creation - avoid redundant allocations
var limitedCandles = newCandles.Count <= 600
? newCandles.OrderBy(c => c.Date).ToList()
: newCandles.OrderBy(c => c.Date).TakeLast(600).ToList();
foreach (var indicator in lightScenario.Indicators)
{

View File

@@ -198,6 +198,10 @@ public class BacktestExecutorTests : BaseTests, IDisposable
Assert.NotNull(candles);
Assert.NotEmpty(candles);
// Use more candles for performance testing (first 5000 candles)
candles = candles.Take(5000).ToList();
Console.WriteLine($"DEBUG: Loaded {candles.Count} candles for backtest");
var scenario = new Scenario("ETH_BacktestScenario");
var rsiDivIndicator = ScenarioHelpers.BuildIndicator(IndicatorType.RsiDivergence, "RsiDiv", period: 14);
scenario.Indicators = new List<IndicatorBase> { (IndicatorBase)rsiDivIndicator };
@@ -211,7 +215,7 @@ public class BacktestExecutorTests : BaseTests, IDisposable
Scenario = LightScenario.FromScenario(scenario),
Timeframe = Timeframe.FifteenMinutes,
IsForWatchingOnly = false,
BotTradingBalance = 1000,
BotTradingBalance = 100000, // Increased balance for testing more candles
IsForBacktest = true,
CooldownPeriod = 1,
MaxLossStreak = 0,
@@ -276,6 +280,93 @@ public class BacktestExecutorTests : BaseTests, IDisposable
Assert.True(result.StartDate < result.EndDate);
}
[Fact]
public async Task ExecuteBacktest_With_Large_Dataset_Should_Show_Performance_Telemetry()
{
// Arrange - Use the large dataset for performance testing
var candles = FileHelpers.ReadJson<List<Candle>>("Data/ETH-FifteenMinutes-candles-20:44:15 +00:00-.json");
Assert.NotNull(candles);
Assert.NotEmpty(candles);
Console.WriteLine($"DEBUG: Loaded {candles.Count} candles for performance telemetry test");
var scenario = new Scenario("ETH_BacktestScenario");
var rsiDivIndicator = ScenarioHelpers.BuildIndicator(IndicatorType.RsiDivergence, "RsiDiv", period: 14);
scenario.Indicators = new List<IndicatorBase> { (IndicatorBase)rsiDivIndicator };
scenario.LoopbackPeriod = 15;
var config = new TradingBotConfig
{
AccountName = _account.Name,
MoneyManagement = MoneyManagement,
Ticker = Ticker.ETH,
Scenario = LightScenario.FromScenario(scenario),
Timeframe = Timeframe.FifteenMinutes,
IsForWatchingOnly = false,
BotTradingBalance = 100000,
IsForBacktest = true,
CooldownPeriod = 1,
MaxLossStreak = 0,
FlipPosition = false,
Name = "ETH_FifteenMinutes_Performance_Test",
FlipOnlyWhenInProfit = true,
MaxPositionTimeHours = null,
CloseEarlyWhenProfitable = false
};
// Track execution time
var startTime = DateTime.UtcNow;
// Act
var result = await _backtestExecutor.ExecuteAsync(
config,
candles.ToHashSet(),
_testUser,
save: false,
withCandles: false,
requestId: null,
bundleRequestId: null,
metadata: null,
progressCallback: null);
var endTime = DateTime.UtcNow;
var totalExecutionTime = (endTime - startTime).TotalSeconds;
// Output performance metrics
Console.WriteLine("═══════════════════════════════════════════════════════════");
Console.WriteLine("📊 PERFORMANCE TELEMETRY TEST RESULTS");
Console.WriteLine("═══════════════════════════════════════════════════════════");
Console.WriteLine($"📈 Total Candles Processed: {candles.Count:N0}");
Console.WriteLine($"⏱️ Total Execution Time: {totalExecutionTime:F2}s");
Console.WriteLine($"🚀 Processing Rate: {candles.Count / totalExecutionTime:F1} candles/sec");
Console.WriteLine($"💾 Memory per 1000 candles: ~{(16.80 - 12.06) / (candles.Count / 1000.0):F2}MB");
Console.WriteLine();
Console.WriteLine("📋 Backtest Results Summary:");
Console.WriteLine($" • Final PnL: {result.FinalPnl:F2}");
Console.WriteLine($" • Win Rate: {result.WinRate}%");
Console.WriteLine($" • Growth: {result.GrowthPercentage:F2}%");
Console.WriteLine($" • Fees: {result.Fees:F2}");
Console.WriteLine($" • Net PnL: {result.NetPnl:F2}");
Console.WriteLine($" • Max Drawdown: {result.MaxDrawdown:F2}");
Console.WriteLine($" • Sharpe Ratio: {result.SharpeRatio:F4}");
Console.WriteLine($" • Score: {result.Score:F2}");
Console.WriteLine($" • Start Date: {result.StartDate:yyyy-MM-dd HH:mm:ss}");
Console.WriteLine($" • End Date: {result.EndDate:yyyy-MM-dd HH:mm:ss}");
Console.WriteLine("═══════════════════════════════════════════════════════════");
// Assert - Validate basic results
Assert.NotNull(result);
Assert.IsType<LightBacktest>(result);
Assert.True(result.StartDate < result.EndDate);
Assert.True(totalExecutionTime > 0);
// Performance assertions - ensure we're processing at a reasonable rate
var candlesPerSecond = candles.Count / totalExecutionTime;
Assert.True(candlesPerSecond > 500, $"Expected >500 candles/sec, got {candlesPerSecond:F1} candles/sec");
Console.WriteLine($"✅ Performance test passed: {candlesPerSecond:F1} candles/sec");
}
public void Dispose()
{
_loggerFactory?.Dispose();

File diff suppressed because it is too large Load Diff

View File

@@ -13,6 +13,9 @@
<None Update="Data\ETH-FifteenMinutes-candles-18:8:36 +00:00-.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Data\ETH-FifteenMinutes-candles-20:44:15 +00:00-.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>

View File

@@ -13,6 +13,10 @@
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="8.0.10"/>
<PackageReference Include="Sentry" Version="5.5.1"/>
<PackageReference Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0"/>
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0"/>
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0"/>
<PackageReference Include="OpenTelemetry.Instrumentation.Runtime" Version="1.9.0"/>
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.9.0"/>
</ItemGroup>
<ItemGroup>

View File

@@ -6,6 +6,10 @@ using Managing.Infrastructure.Databases.PostgreSql;
using Managing.Infrastructure.Databases.PostgreSql.Configurations;
using Microsoft.EntityFrameworkCore;
using Npgsql;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
// Explicitly set the environment before creating the host builder
var environment = Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")
@@ -71,6 +75,44 @@ var host = hostBuilder
options.Environment = hostContext.HostingEnvironment.EnvironmentName;
});
// Configure OpenTelemetry for Rider integration (Development only)
// Rider automatically sets OTEL_EXPORTER_OTLP_ENDPOINT when running from IDE
if (hostContext.HostingEnvironment.IsDevelopment())
{
var serviceName = "Managing.Workers";
var serviceVersion = typeof(Program).Assembly.GetName().Version?.ToString() ?? "1.0.0";
var otlpWorkerId = configuration["BacktestComputeWorker:WorkerId"] ?? Environment.MachineName;
services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService(serviceName, serviceVersion: serviceVersion)
.AddAttributes(new Dictionary<string, object>
{
["deployment.environment"] = hostContext.HostingEnvironment.EnvironmentName,
["worker.id"] = otlpWorkerId
}))
.WithMetrics(metrics =>
{
metrics
.AddRuntimeInstrumentation() // GC, thread pool, etc.
.AddMeter("Microsoft.AspNetCore.Hosting")
.AddMeter("Microsoft.AspNetCore.Server.Kestrel")
.AddMeter("System.Net.Http")
.AddOtlpExporter(); // OTLP exporter for Rider integration
})
.WithTracing(tracing =>
{
tracing
.AddHttpClientInstrumentation() // HTTP client calls
.AddSource("Managing.Workers") // Custom activity source
.AddSource("Managing.Application.Workers") // Worker activities
.AddOtlpExporter(); // OTLP exporter for Rider integration
});
// Note: OTLP exporter will use OTEL_EXPORTER_OTLP_ENDPOINT from Rider or environment
// Rider automatically sets this when running from IDE, so data will be sent to Rider's OpenTelemetry service
}
// Configure database
var postgreSqlConnectionString = configuration.GetSection(Constants.Databases.PostgreSql)["ConnectionString"];
@@ -172,6 +214,18 @@ var host = hostBuilder
logging.AddConsole();
logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"));
// Add OpenTelemetry logging for Rider integration (Development only)
if (hostingContext.HostingEnvironment.IsDevelopment())
{
logging.AddOpenTelemetry(options =>
{
options.IncludeFormattedMessage = true;
options.IncludeScopes = true;
options.ParseStateValues = true;
options.AddOtlpExporter(); // Uses OTEL_EXPORTER_OTLP_ENDPOINT from Rider or environment
});
}
// Filter out EF Core database command logs (SQL queries)
logging.AddFilter("Microsoft.EntityFrameworkCore.Database.Command", LogLevel.Warning);
})