Bundle from worker to grain
This commit is contained in:
411
src/Managing.Application/Grains/BundleBacktestGrain.cs
Normal file
411
src/Managing.Application/Grains/BundleBacktestGrain.cs
Normal file
@@ -0,0 +1,411 @@
|
||||
using System.Text.Json;
|
||||
using Managing.Application.Abstractions.Grains;
|
||||
using Managing.Application.Abstractions.Services;
|
||||
using Managing.Domain.Backtests;
|
||||
using Managing.Domain.Bots;
|
||||
using Managing.Domain.MoneyManagements;
|
||||
using Managing.Domain.Scenarios;
|
||||
using Managing.Domain.Strategies;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans.Concurrency;
|
||||
|
||||
namespace Managing.Application.Grains;
|
||||
|
||||
/// <summary>
|
||||
/// Stateless worker grain for processing bundle backtest requests
|
||||
/// Uses the bundle request ID as the primary key (Guid)
|
||||
/// Implements IRemindable for automatic retry of failed bundles
|
||||
/// </summary>
|
||||
[StatelessWorker]
|
||||
public class BundleBacktestGrain : Grain, IBundleBacktestGrain, IRemindable
|
||||
{
|
||||
private readonly ILogger<BundleBacktestGrain> _logger;
|
||||
private readonly IServiceScopeFactory _scopeFactory;
|
||||
|
||||
// Reminder configuration
|
||||
private const string RETRY_REMINDER_NAME = "BundleBacktestRetry";
|
||||
private static readonly TimeSpan RETRY_INTERVAL = TimeSpan.FromMinutes(30);
|
||||
|
||||
public BundleBacktestGrain(
|
||||
ILogger<BundleBacktestGrain> logger,
|
||||
IServiceScopeFactory scopeFactory)
|
||||
{
|
||||
_logger = logger;
|
||||
_scopeFactory = scopeFactory;
|
||||
}
|
||||
|
||||
public async Task ProcessBundleRequestAsync()
|
||||
{
|
||||
// Get the RequestId from the grain's primary key
|
||||
var bundleRequestId = this.GetPrimaryKey();
|
||||
|
||||
try
|
||||
{
|
||||
// Create a new service scope to get fresh instances of services with scoped DbContext
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var backtester = scope.ServiceProvider.GetRequiredService<IBacktester>();
|
||||
var messengerService = scope.ServiceProvider.GetRequiredService<IMessengerService>();
|
||||
|
||||
// Get the specific bundle request by ID
|
||||
var bundleRequest = await GetBundleRequestById(backtester, bundleRequestId);
|
||||
if (bundleRequest == null)
|
||||
{
|
||||
_logger.LogError("Bundle request {RequestId} not found", bundleRequestId);
|
||||
return;
|
||||
}
|
||||
|
||||
// Process only this specific bundle request
|
||||
await ProcessBundleRequest(bundleRequest, backtester, messengerService);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error in BundleBacktestGrain for request {RequestId}", bundleRequestId);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<BundleBacktestRequest> GetBundleRequestById(IBacktester backtester, Guid bundleRequestId)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get pending and failed bundle backtest requests for retry capability
|
||||
var pendingRequests = await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Pending);
|
||||
var failedRequests = await backtester.GetBundleBacktestRequestsByStatusAsync(BundleBacktestRequestStatus.Failed);
|
||||
|
||||
var allRequests = pendingRequests.Concat(failedRequests);
|
||||
return allRequests.FirstOrDefault(r => r.RequestId == bundleRequestId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to get bundle request {RequestId}", bundleRequestId);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessBundleRequest(
|
||||
BundleBacktestRequest bundleRequest,
|
||||
IBacktester backtester,
|
||||
IMessengerService messengerService)
|
||||
{
|
||||
try
|
||||
{
|
||||
_logger.LogInformation("Starting to process bundle backtest request {RequestId}", bundleRequest.RequestId);
|
||||
|
||||
// Update status to running
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Running;
|
||||
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
|
||||
// Deserialize the backtest requests as strongly-typed objects
|
||||
var backtestRequests = JsonSerializer.Deserialize<List<RunBacktestRequest>>(bundleRequest.BacktestRequestsJson);
|
||||
if (backtestRequests == null)
|
||||
{
|
||||
throw new InvalidOperationException("Failed to deserialize backtest requests");
|
||||
}
|
||||
|
||||
// Process each backtest request sequentially
|
||||
for (int i = 0; i < backtestRequests.Count; i++)
|
||||
{
|
||||
await ProcessSingleBacktest(backtester, backtestRequests[i], bundleRequest, i);
|
||||
}
|
||||
|
||||
// Update final status and send notifications
|
||||
await UpdateFinalStatus(bundleRequest, backtester, messengerService);
|
||||
|
||||
_logger.LogInformation("Completed processing bundle backtest request {RequestId} with status {Status}",
|
||||
bundleRequest.RequestId, bundleRequest.Status);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing bundle backtest request {RequestId}", bundleRequest.RequestId);
|
||||
await HandleBundleRequestError(bundleRequest, backtester, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessSingleBacktest(
|
||||
IBacktester backtester,
|
||||
RunBacktestRequest runBacktestRequest,
|
||||
BundleBacktestRequest bundleRequest,
|
||||
int index)
|
||||
{
|
||||
try
|
||||
{
|
||||
// Get total count from deserialized requests instead of string splitting
|
||||
var backtestRequests = JsonSerializer.Deserialize<List<RunBacktestRequest>>(bundleRequest.BacktestRequestsJson);
|
||||
var totalCount = backtestRequests?.Count ?? 0;
|
||||
|
||||
// Update current backtest being processed
|
||||
bundleRequest.CurrentBacktest = $"Backtest {index + 1} of {totalCount}";
|
||||
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
|
||||
// Run the backtest directly with the strongly-typed request
|
||||
var backtestId = await RunSingleBacktest(backtester, runBacktestRequest, bundleRequest, index);
|
||||
if (!string.IsNullOrEmpty(backtestId))
|
||||
{
|
||||
bundleRequest.Results.Add(backtestId);
|
||||
}
|
||||
|
||||
// Update progress
|
||||
bundleRequest.CompletedBacktests++;
|
||||
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
|
||||
_logger.LogInformation("Completed backtest {Index} for bundle request {RequestId}",
|
||||
index + 1, bundleRequest.RequestId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error processing backtest {Index} for bundle request {RequestId}",
|
||||
index + 1, bundleRequest.RequestId);
|
||||
bundleRequest.FailedBacktests++;
|
||||
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<string> RunSingleBacktest(
|
||||
IBacktester backtester,
|
||||
RunBacktestRequest runBacktestRequest,
|
||||
BundleBacktestRequest bundleRequest,
|
||||
int index)
|
||||
{
|
||||
if (runBacktestRequest?.Config == null)
|
||||
{
|
||||
_logger.LogError("Invalid RunBacktestRequest in bundle (null config)");
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
// Map MoneyManagement
|
||||
MoneyManagement moneyManagement = null;
|
||||
if (!string.IsNullOrEmpty(runBacktestRequest.Config.MoneyManagementName))
|
||||
{
|
||||
_logger.LogWarning("MoneyManagementName provided but cannot resolve in grain context: {Name}",
|
||||
runBacktestRequest.Config.MoneyManagementName);
|
||||
}
|
||||
else if (runBacktestRequest.Config.MoneyManagement != null)
|
||||
{
|
||||
var mmReq = runBacktestRequest.Config.MoneyManagement;
|
||||
moneyManagement = new MoneyManagement
|
||||
{
|
||||
Name = mmReq.Name,
|
||||
Timeframe = mmReq.Timeframe,
|
||||
StopLoss = mmReq.StopLoss,
|
||||
TakeProfit = mmReq.TakeProfit,
|
||||
Leverage = mmReq.Leverage
|
||||
};
|
||||
moneyManagement.FormatPercentage();
|
||||
}
|
||||
|
||||
// Map Scenario
|
||||
LightScenario scenario = null;
|
||||
if (runBacktestRequest.Config.Scenario != null)
|
||||
{
|
||||
var sReq = runBacktestRequest.Config.Scenario;
|
||||
scenario = new LightScenario(sReq.Name, sReq.LoopbackPeriod)
|
||||
{
|
||||
Indicators = sReq.Indicators?.Select(i => new LightIndicator(i.Name, i.Type)
|
||||
{
|
||||
SignalType = i.SignalType,
|
||||
MinimumHistory = i.MinimumHistory,
|
||||
Period = i.Period,
|
||||
FastPeriods = i.FastPeriods,
|
||||
SlowPeriods = i.SlowPeriods,
|
||||
SignalPeriods = i.SignalPeriods,
|
||||
Multiplier = i.Multiplier,
|
||||
SmoothPeriods = i.SmoothPeriods,
|
||||
StochPeriods = i.StochPeriods,
|
||||
CyclePeriods = i.CyclePeriods
|
||||
}).ToList() ?? new List<LightIndicator>()
|
||||
};
|
||||
}
|
||||
|
||||
// Map TradingBotConfig
|
||||
var backtestConfig = new TradingBotConfig
|
||||
{
|
||||
AccountName = runBacktestRequest.Config.AccountName,
|
||||
MoneyManagement = moneyManagement,
|
||||
Ticker = runBacktestRequest.Config.Ticker,
|
||||
ScenarioName = runBacktestRequest.Config.ScenarioName,
|
||||
Scenario = scenario,
|
||||
Timeframe = runBacktestRequest.Config.Timeframe,
|
||||
IsForWatchingOnly = runBacktestRequest.Config.IsForWatchingOnly,
|
||||
BotTradingBalance = runBacktestRequest.Config.BotTradingBalance,
|
||||
IsForBacktest = true,
|
||||
CooldownPeriod = runBacktestRequest.Config.CooldownPeriod ?? 1,
|
||||
MaxLossStreak = runBacktestRequest.Config.MaxLossStreak,
|
||||
MaxPositionTimeHours = runBacktestRequest.Config.MaxPositionTimeHours,
|
||||
FlipOnlyWhenInProfit = runBacktestRequest.Config.FlipOnlyWhenInProfit,
|
||||
FlipPosition = runBacktestRequest.Config.FlipPosition,
|
||||
Name = $"{bundleRequest.Name} #{index + 1}",
|
||||
CloseEarlyWhenProfitable = runBacktestRequest.Config.CloseEarlyWhenProfitable,
|
||||
UseSynthApi = runBacktestRequest.Config.UseSynthApi,
|
||||
UseForPositionSizing = runBacktestRequest.Config.UseForPositionSizing,
|
||||
UseForSignalFiltering = runBacktestRequest.Config.UseForSignalFiltering,
|
||||
UseForDynamicStopLoss = runBacktestRequest.Config.UseForDynamicStopLoss
|
||||
};
|
||||
|
||||
// Run the backtest
|
||||
var result = await backtester.RunTradingBotBacktest(
|
||||
backtestConfig,
|
||||
runBacktestRequest.StartDate,
|
||||
runBacktestRequest.EndDate,
|
||||
bundleRequest.User,
|
||||
true,
|
||||
runBacktestRequest.WithCandles,
|
||||
bundleRequest.RequestId.ToString()
|
||||
);
|
||||
|
||||
_logger.LogInformation("Processed backtest for bundle request {RequestId}", bundleRequest.RequestId);
|
||||
return result.Id;
|
||||
}
|
||||
|
||||
private async Task UpdateFinalStatus(
|
||||
BundleBacktestRequest bundleRequest,
|
||||
IBacktester backtester,
|
||||
IMessengerService messengerService)
|
||||
{
|
||||
if (bundleRequest.FailedBacktests == 0)
|
||||
{
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
|
||||
await NotifyUser(bundleRequest, messengerService);
|
||||
}
|
||||
else if (bundleRequest.CompletedBacktests == 0)
|
||||
{
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Failed;
|
||||
bundleRequest.ErrorMessage = "All backtests failed";
|
||||
}
|
||||
else
|
||||
{
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Completed;
|
||||
bundleRequest.ErrorMessage = $"{bundleRequest.FailedBacktests} backtests failed";
|
||||
await NotifyUser(bundleRequest, messengerService);
|
||||
}
|
||||
|
||||
bundleRequest.CompletedAt = DateTime.UtcNow;
|
||||
bundleRequest.CurrentBacktest = null;
|
||||
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
|
||||
// Unregister retry reminder since bundle completed
|
||||
await UnregisterRetryReminder();
|
||||
}
|
||||
|
||||
private async Task HandleBundleRequestError(
|
||||
BundleBacktestRequest bundleRequest,
|
||||
IBacktester backtester,
|
||||
Exception ex)
|
||||
{
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Failed;
|
||||
bundleRequest.ErrorMessage = ex.Message;
|
||||
bundleRequest.CompletedAt = DateTime.UtcNow;
|
||||
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
|
||||
// Register retry reminder for failed bundle
|
||||
await RegisterRetryReminder();
|
||||
}
|
||||
|
||||
private async Task NotifyUser(BundleBacktestRequest bundleRequest, IMessengerService messengerService)
|
||||
{
|
||||
if (bundleRequest.User?.TelegramChannel != null)
|
||||
{
|
||||
var message = bundleRequest.FailedBacktests == 0
|
||||
? $"✅ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed successfully."
|
||||
: $"⚠️ Bundle backtest '{bundleRequest.Name}' (ID: {bundleRequest.RequestId}) completed with {bundleRequest.FailedBacktests} failed backtests.";
|
||||
|
||||
await messengerService.SendMessage(message, bundleRequest.User.TelegramChannel);
|
||||
}
|
||||
}
|
||||
|
||||
#region IRemindable Implementation
|
||||
|
||||
/// <summary>
|
||||
/// Handles reminder callbacks for automatic retry of failed bundle backtests
|
||||
/// </summary>
|
||||
public async Task ReceiveReminder(string reminderName, TickStatus status)
|
||||
{
|
||||
if (reminderName != RETRY_REMINDER_NAME)
|
||||
{
|
||||
_logger.LogWarning("Unknown reminder {ReminderName} received", reminderName);
|
||||
return;
|
||||
}
|
||||
|
||||
var bundleRequestId = this.GetPrimaryKey();
|
||||
_logger.LogInformation("Retry reminder triggered for bundle request {RequestId}", bundleRequestId);
|
||||
|
||||
try
|
||||
{
|
||||
using var scope = _scopeFactory.CreateScope();
|
||||
var backtester = scope.ServiceProvider.GetRequiredService<IBacktester>();
|
||||
|
||||
// Get the bundle request
|
||||
var bundleRequest = await GetBundleRequestById(backtester, bundleRequestId);
|
||||
if (bundleRequest == null)
|
||||
{
|
||||
_logger.LogWarning("Bundle request {RequestId} not found during retry", bundleRequestId);
|
||||
await UnregisterRetryReminder();
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if bundle is still failed
|
||||
if (bundleRequest.Status != BundleBacktestRequestStatus.Failed)
|
||||
{
|
||||
_logger.LogInformation("Bundle request {RequestId} is no longer failed (status: {Status}), unregistering reminder",
|
||||
bundleRequestId, bundleRequest.Status);
|
||||
await UnregisterRetryReminder();
|
||||
return;
|
||||
}
|
||||
|
||||
// Retry the bundle processing
|
||||
_logger.LogInformation("Retrying failed bundle request {RequestId}", bundleRequestId);
|
||||
|
||||
// Reset status to pending for retry
|
||||
bundleRequest.Status = BundleBacktestRequestStatus.Pending;
|
||||
bundleRequest.ErrorMessage = null;
|
||||
bundleRequest.CurrentBacktest = null;
|
||||
await backtester.UpdateBundleBacktestRequestAsync(bundleRequest);
|
||||
|
||||
// Process the bundle again
|
||||
await ProcessBundleRequestAsync();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Error during bundle backtest retry for request {RequestId}", bundleRequestId);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Registers a retry reminder for this bundle request
|
||||
/// </summary>
|
||||
private async Task RegisterRetryReminder()
|
||||
{
|
||||
try
|
||||
{
|
||||
await this.RegisterOrUpdateReminder(RETRY_REMINDER_NAME, RETRY_INTERVAL, RETRY_INTERVAL);
|
||||
_logger.LogInformation("Registered retry reminder for bundle request {RequestId}", this.GetPrimaryKey());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to register retry reminder for bundle request {RequestId}", this.GetPrimaryKey());
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unregisters the retry reminder for this bundle request
|
||||
/// </summary>
|
||||
private async Task UnregisterRetryReminder()
|
||||
{
|
||||
try
|
||||
{
|
||||
var reminder = await this.GetReminder(RETRY_REMINDER_NAME);
|
||||
if (reminder != null)
|
||||
{
|
||||
await this.UnregisterReminder(reminder);
|
||||
_logger.LogInformation("Unregistered retry reminder for bundle request {RequestId}", this.GetPrimaryKey());
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to unregister retry reminder for bundle request {RequestId}", this.GetPrimaryKey());
|
||||
}
|
||||
}
|
||||
|
||||
#endregion
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user