Files
managing-apps/src/Managing.Application/Bots/Grains/LiveTradingBotGrain.cs
cryptooda 97103fbfe8 Add master strategy validation in LiveTradingBotGrain
- Introduced a check to ensure the master strategy is retrieved successfully before proceeding with key validation.
- Added logging for cases where the master strategy is not found, improving traceability in the bot's operation.
2025-11-19 23:39:38 +07:00

1214 lines
48 KiB
C#

using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Services;
using Managing.Application.Orleans;
using Managing.Application.Shared;
using Managing.Core;
using Managing.Domain.Accounts;
using Managing.Domain.Bots;
using Managing.Domain.Indicators;
using Managing.Domain.Shared.Helpers;
using Managing.Domain.Trades;
using Managing.Domain.Users;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Streams;
using static Managing.Common.Enums;
namespace Managing.Application.Bots.Grains;
/// <summary>
/// Orleans grain for live trading bot operations.
/// Uses composition with TradingBotBase to maintain separation of concerns.
/// This grain handles live trading scenarios with real-time market data and execution.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
{
// Persisted grain state, injected through the constructor's [PersistentState] parameter.
private readonly IPersistentState<TradingBotGrainState> _state;
private readonly ILogger<LiveTradingBotGrain> _logger;
// Factory used to create DI scopes for scoped services.
private readonly IServiceScopeFactory _scopeFactory;
// Queried in ExecuteBotCycle for the user's owned keys (copy trading authorization check).
private readonly IKaigenService _kaigenService;
// In-memory trading bot instance; null while the bot is not running in this activation.
private TradingBotBase? _tradingBot;
// Handle of the grain timer that drives ExecuteBotCycle; null when no timer is registered.
private IDisposable? _timer;
// Name of the reminder registered in RegisterReminder and matched in ReceiveReminder.
private string _reminderName = "RebootReminder";
// Handle of the current copy trading stream subscription, if any.
private StreamSubscriptionHandle<Position>? _copyTradingStreamHandle;
/// <summary>
/// Creates the grain and captures its injected dependencies.
/// </summary>
/// <param name="state">Persistent grain state stored under "live-trading-bot" in the "bot-store" provider.</param>
/// <param name="logger">Logger for this grain.</param>
/// <param name="scopeFactory">Factory used to create DI scopes for scoped services.</param>
/// <param name="kaigenService">Service used to look up the keys owned by a user.</param>
public LiveTradingBotGrain(
    [PersistentState("live-trading-bot", "bot-store")]
    IPersistentState<TradingBotGrainState> state,
    ILogger<LiveTradingBotGrain> logger,
    IServiceScopeFactory scopeFactory,
    IKaigenService kaigenService)
{
    (_state, _logger, _scopeFactory, _kaigenService) = (state, logger, scopeFactory, kaigenService);
}
/// <summary>
/// Activation hook: logs the activation, resumes the bot when its recorded status requires it,
/// then runs the base activation logic.
/// </summary>
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
    var grainId = this.GetPrimaryKey();
    _logger.LogInformation("LiveTradingBotGrain {GrainId} activated", grainId);

    await ResumeBotIfRequiredAsync();

    await base.OnActivateAsync(cancellationToken);
}
/// <summary>
/// Deactivation hook: logs the reason, disposes the in-memory timer and runs the base deactivation logic.
/// </summary>
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
    var grainId = this.GetPrimaryKey();
    _logger.LogInformation(
        "LiveTradingBotGrain {GrainId} deactivating. Reason: {Reason}",
        grainId,
        reason.Description);

    StopAndDisposeTimer();

    await base.OnDeactivateAsync(reason, cancellationToken);
}
/// <summary>
/// Initializes a new live bot: persists its configuration and owner in grain state, registers it
/// with the bot registry and the owner's agent grain, and saves it with status <c>Saved</c>.
/// </summary>
/// <param name="config">Bot configuration; must be non-null, have a name and not be a backtest configuration.</param>
/// <param name="user">Owner of the bot; must be non-null.</param>
/// <exception cref="InvalidOperationException">The configuration is missing, unnamed, or flagged for backtesting.</exception>
/// <exception cref="ArgumentNullException"><paramref name="user"/> is null.</exception>
public async Task CreateAsync(TradingBotConfig config, User user)
{
    if (config == null || string.IsNullOrEmpty(config.Name))
    {
        throw new InvalidOperationException("Bot configuration is not properly initialized");
    }

    if (config.IsForBacktest)
    {
        throw new InvalidOperationException("LiveTradingBotGrain cannot be used for backtesting");
    }

    // Fail fast before anything is persisted: user.Id is dereferenced below, and a null user
    // would otherwise leave written grain state behind without any registry/agent registration.
    ArgumentNullException.ThrowIfNull(user);

    // This is a new bot, so we can assume it's not registered or active.
    _state.State.Config = config;
    _state.State.User = user;
    _state.State.CreateDate = DateTime.UtcNow;
    _state.State.Identifier = this.GetPrimaryKey();
    await _state.WriteStateAsync();

    var botRegistry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
    await botRegistry.RegisterBot(_state.State.Identifier, user.Id);

    // Register the bot with the user's agent
    var agentGrain = GrainFactory.GetGrain<IAgentGrain>(user.Id);
    await agentGrain.RegisterBotAsync(_state.State.Identifier);

    await SaveBotAsync(BotStatus.Saved);
    _logger.LogInformation("LiveTradingBotGrain {GrainId} created successfully", this.GetPrimaryKey());
}
/// <summary>
/// Reconciles the in-memory bot with its recorded status. A bot marked Running in the registry is
/// resumed if no instance exists. A bot marked Stopped in the registry is checked against the
/// database: when the database says Running, the registry is corrected and the bot resumed;
/// otherwise the reminder is unregistered. Any other registry status is left untouched.
/// </summary>
private async Task ResumeBotIfRequiredAsync()
{
    var registry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
    var grainId = this.GetPrimaryKey();
    var registryStatus = await registry.GetBotStatus(grainId);

    _logger.LogInformation("LiveTradingBotGrain {GrainId} activated. Registry status: {Status}",
        grainId, registryStatus);

    if (registryStatus == BotStatus.Running)
    {
        // Bot is running in registry: resume only when no instance is alive (no DB check needed)
        if (_tradingBot == null)
        {
            await ResumeBotInternalAsync(registryStatus);
        }

        return;
    }

    if (registryStatus != BotStatus.Stopped)
    {
        return;
    }

    // Registry says stopped, but check database to see if it should be running
    var databaseStatus = await GetDatabaseBotStatus(grainId);
    _logger.LogInformation(
        "LiveTradingBotGrain {GrainId} registry: {RegistryStatus}, database: {DatabaseStatus}",
        grainId, registryStatus, databaseStatus);

    if (databaseStatus != BotStatus.Running)
    {
        // Both registry and database agree it should be stopped
        await UnregisterReminder();
        _logger.LogInformation("LiveTradingBotGrain {GrainId} status is stopped, reminder unregistered", grainId);
        return;
    }

    // Database says running but registry says stopped - trust database
    _logger.LogWarning(
        "Status mismatch detected for bot {BotId}. Registry: {RegistryStatus}, Database: {DatabaseStatus}. Trusting database and updating registry.",
        grainId, registryStatus, databaseStatus);

    // Update registry to match database (source of truth), then resume the bot
    await registry.UpdateBotStatus(grainId, databaseStatus);
    await ResumeBotInternalAsync(databaseStatus);
}
/// <summary>
/// Reads the bot's status from the database (treated as the source of truth).
/// Returns <c>Saved</c> when the bot is not found or the lookup throws.
/// </summary>
/// <param name="botId">Identifier of the bot to look up.</param>
private async Task<BotStatus> GetDatabaseBotStatus(Guid botId)
{
    try
    {
        var persistedBot = await ServiceScopeHelpers.WithScopedService<IBotService, Bot>(
            _scopeFactory,
            async service => await service.GetBotByIdentifier(botId));

        var status = persistedBot?.Status ?? BotStatus.Saved;
        return status;
    }
    catch (Exception ex)
    {
        // Lookup failures are logged and mapped to Saved rather than propagated
        _logger.LogError(ex, "Failed to get database status for bot {BotId}, defaulting to Saved", botId);
        return BotStatus.Saved;
    }
}
/// <summary>
/// Creates and starts the in-memory trading bot from persisted grain state, then registers the
/// timer and reminder, subscribes to the copy trading stream and marks the bot as Running in
/// both the database and the registry. Does nothing when a bot instance already exists.
/// </summary>
/// <param name="previousStatus">
/// Status the bot had before this call; passed to <c>TradingBotBase.Start</c> and used to decide
/// whether <c>LastStartTime</c> is refreshed.
/// </param>
/// <exception cref="InvalidOperationException">
/// No account name is configured and no fallback account can be determined (no user, or the user has no accounts).
/// </exception>
private async Task ResumeBotInternalAsync(BotStatus previousStatus)
{
    // Idempotency check
    if (_tradingBot != null)
    {
        return;
    }

    try
    {
        // Handle fallback for empty AccountName before creating the trading bot instance
        var config = _state.State.Config;
        if (string.IsNullOrEmpty(config.AccountName))
        {
            // Fallback: Get the first account for the user
            if (_state.State.User == null)
            {
                throw new InvalidOperationException(
                    $"Bot '{config.Name}' (ID: {_state.State.Identifier}) has no user information. Cannot determine fallback account.");
            }

            var firstAccount = await ServiceScopeHelpers.WithScopedService<IAccountService, Account>(
                _scopeFactory,
                async accountService =>
                {
                    var userAccounts = await accountService.GetAccountsByUserAsync(_state.State.User, true, true);
                    var account = userAccounts.FirstOrDefault();
                    if (account == null)
                    {
                        throw new InvalidOperationException(
                            $"User '{_state.State.User.Name}' has no accounts configured.");
                    }

                    return account;
                });

            // Update the configuration with the fallback account name
            config.AccountName = firstAccount.Name;
            _state.State.Config = config;
            await _state.WriteStateAsync();
            _logger.LogInformation(
                "Bot '{BotName}' (ID: {BotId}) using fallback account '{AccountName}' for user '{UserName}'",
                config.Name, _state.State.Identifier, firstAccount.Name, _state.State.User.Name);
        }

        // Create and initialize trading bot instance
        _tradingBot = CreateTradingBotInstance(config);
        await _tradingBot.Start(previousStatus);

        // Set startup time only once (first successful start)
        if (_state.State.StartupTime == default)
        {
            _state.State.StartupTime = DateTime.UtcNow;
        }

        if (previousStatus != BotStatus.Running)
        {
            _state.State.LastStartTime = DateTime.UtcNow;
        }

        await _state.WriteStateAsync();

        // Start the in-memory timer and persistent reminder
        RegisterAndStartTimer();
        await RegisterReminder();

        // Subscribe to copy trading stream if configured
        await SubscribeToCopyTradingStreamAsync();

        // Update both database and registry status
        await SaveBotAsync(BotStatus.Running);
        await UpdateBotRegistryStatus(BotStatus.Running);
        _logger.LogInformation("LiveTradingBotGrain {GrainId} resumed successfully", this.GetPrimaryKey());
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to resume bot {GrainId}", this.GetPrimaryKey());

        // Clean up on failure. The timer is registered with KeepAlive = true, so if the failure
        // happened after RegisterAndStartTimer it must be disposed here; otherwise it would keep
        // ticking (and keep this activation alive) with no bot instance to run.
        StopAndDisposeTimer();
        _tradingBot = null;

        await UpdateBotRegistryStatus(BotStatus.Stopped);
        throw;
    }
}
/// <summary>
/// Starts the bot. When the registry already reports Running and an in-memory instance exists,
/// only the reminder and missing runtime timestamps are refreshed; otherwise the bot is resumed.
/// On resume failure the registry status is set to Stopped and the exception is rethrown.
/// </summary>
public async Task StartAsync()
{
    var registry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
    var grainId = this.GetPrimaryKey();
    var registryStatus = await registry.GetBotStatus(grainId);

    var alreadyRunning = registryStatus == BotStatus.Running && _tradingBot != null;
    if (!alreadyRunning)
    {
        try
        {
            // Resume the bot - this handles registry status update internally
            await ResumeBotInternalAsync(registryStatus);
            _logger.LogInformation("LiveTradingBotGrain {GrainId} started successfully", this.GetPrimaryKey());
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to start LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());
            // Ensure registry status is correct on failure
            await UpdateBotRegistryStatus(BotStatus.Stopped);
            throw;
        }

        return;
    }

    await RegisterReminder();

    // Ensure runtime timestamps are consistent if already running
    if (_state.State.LastStartTime is null)
    {
        _state.State.LastStartTime = DateTime.UtcNow;
        _state.State.LastStopTime = null;
        await _state.WriteStateAsync();
        await SaveBotAsync(BotStatus.Running);
    }

    _logger.LogInformation("LiveTradingBotGrain {GrainId} is already running", this.GetPrimaryKey());
}
/// <summary>
/// Registers (or updates) the reminder with a 1 minute due time and a 5 minute period.
/// </summary>
private async Task RegisterReminder()
{
    var dueTime = TimeSpan.FromMinutes(1);
    var period = TimeSpan.FromMinutes(5);

    await this.RegisterOrUpdateReminder(_reminderName, dueTime, period);
}
/// <summary>
/// Registers the grain timer that periodically runs <c>ExecuteBotCycle</c>.
/// No-op when there is no bot instance or a timer is already registered.
/// </summary>
private void RegisterAndStartTimer()
{
    if (_tradingBot == null || _timer != null)
    {
        return;
    }

    var schedule = GrainHelpers.GetDynamicRandomizedTimerOptions(TimeSpan.FromMinutes(1), 100);
    var timerOptions = new GrainTimerCreationOptions
    {
        Period = schedule.period,
        DueTime = schedule.dueTime,
        KeepAlive = true
    };

    _timer = this.RegisterGrainTimer(async _ => await ExecuteBotCycle(), timerOptions);
}
/// <summary>
/// Stops the bot as a regular (non-restart) stop, forwarding the given reason.
/// </summary>
/// <param name="reason">Reason for stopping, forwarded to the internal stop routine.</param>
public async Task StopAsync(string reason)
{
    await StopAsyncInternal(isRestarting: false, reason: reason);
}
/// <summary>
/// Stops the bot: optionally closes open positions, disposes the timer, unregisters the reminder,
/// unsubscribes from the copy trading stream, accumulates runtime, persists state, stops the
/// in-memory bot (if any) and marks the bot as Stopped in the database and the registry.
/// </summary>
/// <param name="isRestarting">When true, open positions are not closed before stopping.</param>
/// <param name="reason">Optional reason forwarded to <c>TradingBotBase.StopBot</c>.</param>
private async Task StopAsyncInternal(bool isRestarting, string? reason = null)
{
    // Only close open positions if this is not part of a restart operation
    if (!isRestarting)
    {
        await CloseAllOpenPositionsAsync();
    }

    // The check is now against the registry status
    var botRegistry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
    var botStatus = await botRegistry.GetBotStatus(this.GetPrimaryKey());
    if (botStatus == BotStatus.Stopped)
    {
        _logger.LogInformation("Bot {GrainId} is already stopped", this.GetPrimaryKey());
        await UnregisterReminder();
        return;
    }

    try
    {
        StopAndDisposeTimer();
        await UnregisterReminder();

        // Unsubscribe from copy trading stream
        await UnsubscribeFromCopyTradingStreamAsync();

        // Track runtime: accumulate current session runtime when stopping
        if (_state.State.LastStartTime.HasValue)
        {
            var stoppedAt = DateTime.UtcNow;
            var currentSessionSeconds = (long)(stoppedAt - _state.State.LastStartTime.Value).TotalSeconds;
            _state.State.AccumulatedRunTimeSeconds += currentSessionSeconds;
            _state.State.LastStopTime = stoppedAt;
            _state.State.LastStartTime = null; // Clear since bot is no longer running
            _logger.LogInformation(
                "Bot {GrainId} accumulated {Seconds} seconds of runtime. Total: {TotalSeconds} seconds",
                this.GetPrimaryKey(), currentSessionSeconds, _state.State.AccumulatedRunTimeSeconds);
        }

        // Sync state from the volatile TradingBotBase before destroying it
        SyncStateFromBase();
        await _state.WriteStateAsync();
        await SaveBotAsync(BotStatus.Stopped);

        // The in-memory bot may be absent even though the registry is not Stopped (e.g. the bot
        // was never started in this activation). Awaiting the result of a null-conditional call
        // would throw a NullReferenceException and skip the registry update below.
        if (_tradingBot != null)
        {
            await _tradingBot.StopBot(reason);
            _tradingBot = null;
        }

        await UpdateBotRegistryStatus(BotStatus.Stopped);
        _logger.LogInformation("LiveTradingBotGrain {GrainId} stopped successfully", this.GetPrimaryKey());
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to stop LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());
        throw;
    }
}
/// <summary>
/// Disposes the grain timer, if one is registered, and clears the handle.
/// </summary>
private void StopAndDisposeTimer()
{
    // Disposing the handle stops the timer; both steps are no-ops when no timer exists
    _timer?.Dispose();
    _timer = null;
}
/// <summary>
/// Unregisters the reminder if it is currently registered.
/// </summary>
private async Task UnregisterReminder()
{
    var existingReminder = await this.GetReminder(_reminderName);
    if (existingReminder == null)
    {
        return;
    }

    await this.UnregisterReminder(existingReminder);
}
/// <summary>
/// Subscribes to the master bot's copy trading stream if this bot is configured for copy trading.
/// The call is idempotent: it does nothing when a handle is already held, and it resumes an
/// existing stream subscription instead of creating an additional one. Failures are logged and
/// not propagated.
/// </summary>
private async Task SubscribeToCopyTradingStreamAsync()
{
    // Only subscribe if this is a copy trading bot and we have a master bot identifier
    if (!_state.State.Config.IsForCopyTrading || !_state.State.Config.MasterBotIdentifier.HasValue)
    {
        return;
    }

    // Already attached in this activation: a second subscription would deliver every position twice
    if (_copyTradingStreamHandle != null)
    {
        return;
    }

    try
    {
        var streamProvider = this.GetStreamProvider("ManagingStreamProvider");
        var streamId = StreamId.Create("CopyTrading", _state.State.Config.MasterBotIdentifier.Value);
        var stream = streamProvider.GetStream<Position>(streamId);

        // Explicit subscriptions can outlive an activation. Re-attach the handler to an existing
        // subscription rather than adding a new one, and drop any surplus subscriptions.
        var existingHandles = await stream.GetAllSubscriptionHandles();
        if (existingHandles.Count > 0)
        {
            _copyTradingStreamHandle = await existingHandles[0].ResumeAsync(OnCopyTradingPositionReceivedAsync);
            for (var i = 1; i < existingHandles.Count; i++)
            {
                await existingHandles[i].UnsubscribeAsync();
            }
        }
        else
        {
            _copyTradingStreamHandle = await stream.SubscribeAsync(OnCopyTradingPositionReceivedAsync);
        }

        _logger.LogInformation("LiveTradingBotGrain {GrainId} subscribed to copy trading stream for master bot {MasterBotId}",
            this.GetPrimaryKey(), _state.State.Config.MasterBotIdentifier.Value);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to subscribe to copy trading stream for bot {GrainId}", this.GetPrimaryKey());
    }
}
/// <summary>
/// Drops the copy trading stream subscription, if one is held, and clears the handle.
/// </summary>
private async Task UnsubscribeFromCopyTradingStreamAsync()
{
    if (_copyTradingStreamHandle == null)
    {
        return;
    }

    await _copyTradingStreamHandle.UnsubscribeAsync();
    _copyTradingStreamHandle = null;

    _logger.LogInformation("LiveTradingBotGrain {GrainId} unsubscribed from copy trading stream", this.GetPrimaryKey());
}
/// <summary>
/// Stream callback for positions published by the master bot. Copies the position into this bot
/// when it is running; otherwise logs a warning. Exceptions are logged and not rethrown.
/// </summary>
/// <param name="masterPosition">Position received from the master bot's stream.</param>
/// <param name="token">Stream sequence token (unused).</param>
private async Task OnCopyTradingPositionReceivedAsync(Position masterPosition, StreamSequenceToken token)
{
    try
    {
        var bot = _tradingBot;
        if (bot != null)
        {
            _logger.LogInformation("📡 Copy trading: Received position {PositionId} from master bot for bot {GrainId}",
                masterPosition.Identifier, this.GetPrimaryKey());

            // Create a copy of the position for this bot
            await bot.CopyPositionFromMasterAsync(masterPosition);

            _logger.LogInformation("✅ Copy trading: Successfully copied position {PositionId} for bot {GrainId}",
                masterPosition.Identifier, this.GetPrimaryKey());
        }
        else
        {
            _logger.LogWarning("Received copy trading position {PositionId} but trading bot is not running for bot {GrainId}",
                masterPosition.Identifier, this.GetPrimaryKey());
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to handle copy trading position {PositionId} for bot {GrainId}",
            masterPosition.Identifier, this.GetPrimaryKey());
    }
}
/// <summary>
/// Builds a <c>TradingBotBase</c> for the given configuration (composition) and seeds it with
/// the persisted grain state.
/// </summary>
/// <param name="config">Configuration for the new instance; must have an account name.</param>
/// <exception cref="InvalidOperationException">The configuration has no account name.</exception>
private TradingBotBase CreateTradingBotInstance(TradingBotConfig config)
{
    var hasAccountName = !string.IsNullOrEmpty(config.AccountName);
    if (!hasAccountName)
    {
        throw new InvalidOperationException("Account name is required for live trading");
    }

    using var scope = _scopeFactory.CreateScope();
    var botLogger = scope.ServiceProvider.GetRequiredService<ILogger<TradingBotBase>>();
    var streams = this.GetStreamProvider("ManagingStreamProvider");

    var instance = new TradingBotBase(botLogger, _scopeFactory, config, streams);

    // Load state into the trading bot instance
    LoadStateIntoTradingBot(instance);

    return instance;
}
/// <summary>
/// Copies the persisted grain state (signals, positions, balances, counters, identifier,
/// last closing time and last candle) onto the given trading bot instance.
/// </summary>
/// <param name="tradingBot">Instance that receives the persisted values.</param>
private void LoadStateIntoTradingBot(TradingBotBase tradingBot)
{
    var persisted = _state.State;

    tradingBot.Signals = persisted.Signals;
    tradingBot.Positions = persisted.Positions;
    tradingBot.WalletBalances = persisted.WalletBalances;
    tradingBot.PreloadedCandlesCount = persisted.PreloadedCandlesCount;
    tradingBot.ExecutionCount = persisted.ExecutionCount;
    tradingBot.Identifier = persisted.Identifier;
    tradingBot.LastPositionClosingTime = persisted.LastPositionClosingTime;
    tradingBot.LastCandle = persisted.LastCandle;
}
/// <summary>
/// Executes one cycle of the trading bot. Invoked by the grain timer.
/// Steps: (1) for copy trading bots, verify the user still owns a key for the master strategy
/// and stop the bot if not; (2) when the bot has no open/new positions, run the coordinated ETH
/// balance check through the agent grain; (3) run the bot, sync its state back into grain state,
/// persist it and save statistics. All exceptions are caught and logged; none are rethrown.
/// </summary>
private async Task ExecuteBotCycle()
{
try
{
// Nothing to do when the bot is not running in this activation
if (_tradingBot == null)
{
return;
}
// Check if copy trading authorization is still valid
if (_state.State.Config.IsForCopyTrading && _state.State.Config.MasterBotIdentifier.HasValue)
{
try
{
var ownedKeys = await _kaigenService.GetOwnedKeysAsync(_state.State.User);
var masterStrategy = await ServiceScopeHelpers.WithScopedService<IBotService, Bot>(
_scopeFactory,
async botService => await botService.GetBotByIdentifier(_state.State.Config.MasterBotIdentifier.Value));
// Without the master strategy the key check cannot be performed: skip this cycle entirely
if (masterStrategy == null)
{
_logger.LogWarning("Master strategy {MasterBotId} not found", _state.State.Config.MasterBotIdentifier.Value);
return;
}
// Authorized when the user owns at least one key whose agent name matches the master strategy owner's (case-insensitive)
var hasMasterStrategyKey = ownedKeys.Items.Any(key =>
string.Equals(key.AgentName, masterStrategy.User.AgentName, StringComparison.OrdinalIgnoreCase) &&
key.Owned >= 1);
if (!hasMasterStrategyKey)
{
_logger.LogWarning(
"Copy trading bot {GrainId} no longer has authorization for master strategy {MasterBotId}. Stopping bot.",
this.GetPrimaryKey(), _state.State.Config.MasterBotIdentifier.Value);
await StopAsync("Copy trading authorization revoked - user no longer owns keys for master strategy");
return;
}
}
catch (Exception ex)
{
// A failed authorization check is reported but does not block the cycle
_logger.LogError(ex,
"Failed to verify copy trading authorization for bot {GrainId} with master strategy {MasterBotId}. Continuing execution.",
this.GetPrimaryKey(), _state.State.Config.MasterBotIdentifier.Value);
SentrySdk.CaptureException(ex);
}
}
if (_tradingBot.Positions.Any(p => p.Value.IsOpen() || p.Value.Status.Equals(PositionStatus.New)))
{
_logger.LogInformation(
"Bot {BotId} has open positions. Trading loop will continue managing existing positions.",
_state.State.Identifier);
}
else
{
// If no open positions, ensure ETH balance is sufficient for new positions
// Use coordinated balance checking and swap management through AgentGrain
try
{
var agentGrain = GrainFactory.GetGrain<IAgentGrain>(_state.State.User.Id);
var balanceCheckResult =
await agentGrain.CheckAndEnsureEthBalanceAsync(_state.State.Identifier,
_tradingBot.Account.Name);
// An unsuccessful check only ends the cycle when the result asks for the bot to be stopped
if (!balanceCheckResult.IsSuccessful)
{
// Check if the bot should stop due to this failure
if (balanceCheckResult.ShouldStopBot)
{
await StopAsync(balanceCheckResult.Message);
return;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during coordinated balance check for bot {BotId}",
_state.State.Identifier);
SentrySdk.CaptureException(ex);
// Continue execution to avoid stopping the bot due to coordination errors
}
}
// Execute the bot's Run method
await _tradingBot.Run();
// Copy the bot's in-memory state into grain state and persist it
SyncStateFromBase();
await _state.WriteStateAsync();
// Save bot statistics to database
await SaveBotAsync(BotStatus.Running);
}
catch (ObjectDisposedException)
{
// Gracefully handle disposed service provider during shutdown
_logger.LogInformation("Service provider disposed during shutdown for LiveTradingBotGrain {GrainId}",
this.GetPrimaryKey());
return;
}
catch (Exception ex)
{
// TODO : Turn off the bot if an error occurs
_logger.LogError(ex, "Error during bot execution cycle for LiveTradingBotGrain {GrainId}",
this.GetPrimaryKey());
}
}
/// <summary>
/// Creates a manual signal in the given direction on the running bot. When the bot has no last
/// candle yet, the latest candle is loaded and persisted first.
/// </summary>
/// <param name="direction">Direction of the manual signal.</param>
/// <returns>The signal created by the trading bot.</returns>
/// <exception cref="InvalidOperationException">The bot is not running.</exception>
public async Task<LightSignal> CreateManualSignalAsync(TradeDirection direction)
{
    try
    {
        var bot = _tradingBot ?? throw new InvalidOperationException("Bot is not running");

        // Ensure LastCandle is available for manual position opening
        var candleMissing = bot.LastCandle == null;
        if (candleMissing)
        {
            _logger.LogInformation("LastCandle is null, loading latest candle data for manual position opening");
            await bot.LoadLastCandle();

            // Sync the loaded candle to grain state
            SyncStateFromBase();
            await _state.WriteStateAsync();
        }

        var signal = await bot.CreateManualSignal(direction);
        return signal;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to open manual position for LiveTradingBotGrain {GrainId}",
            this.GetPrimaryKey());
        throw;
    }
}
/// <summary>
/// Returns a snapshot of the bot. A running bot reports live positions, signals, balances and
/// computed PnL / win rate; a non-running bot reports persisted grain state with PnL and win
/// rate set to 0.
/// </summary>
public Task<LiveTradingBotModel> GetBotDataAsync()
{
    try
    {
        var persisted = _state.State;
        LiveTradingBotModel snapshot;

        if (_tradingBot is { } liveBot)
        {
            // For running bots, return live data
            snapshot = new LiveTradingBotModel
            {
                Identifier = persisted.Identifier,
                Name = persisted.Config?.Name ?? "Unknown",
                Config = persisted.Config,
                Positions = liveBot.Positions,
                Signals = liveBot.Signals,
                WalletBalances = liveBot.WalletBalances,
                ProfitAndLoss = TradingBox.GetTotalNetPnL(liveBot.Positions),
                WinRate = TradingBox.GetWinRate(liveBot.Positions),
                ExecutionCount = persisted.ExecutionCount,
                StartupTime = persisted.StartupTime,
                CreateDate = persisted.CreateDate
            };
        }
        else
        {
            // For non-running bots, return data from grain state only
            snapshot = new LiveTradingBotModel
            {
                Identifier = persisted.Identifier,
                Name = persisted.Config?.Name ?? "Unknown",
                Config = persisted.Config,
                Positions = persisted.Positions ?? new Dictionary<Guid, Position>(),
                Signals = persisted.Signals,
                WalletBalances = persisted.WalletBalances ?? new Dictionary<DateTime, decimal>(),
                ProfitAndLoss = 0, // Calculate from persisted positions if needed
                WinRate = 0, // Calculate from persisted positions if needed
                ExecutionCount = persisted.ExecutionCount,
                StartupTime = persisted.StartupTime,
                CreateDate = persisted.CreateDate
            };
        }

        return Task.FromResult(snapshot);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to get bot data for LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());
        throw;
    }
}
/// <summary>
/// Copies the running bot's in-memory data (signals, positions, balances, counters, identifier,
/// last closing time, last candle and config) into grain state. No-op when no bot is running.
/// Does not persist the state.
/// </summary>
private void SyncStateFromBase()
{
    if (_tradingBot is not { } bot)
    {
        return;
    }

    var target = _state.State;
    target.Signals = bot.Signals;
    target.Positions = bot.Positions;
    target.WalletBalances = bot.WalletBalances;
    target.PreloadedCandlesCount = bot.PreloadedCandlesCount;
    target.ExecutionCount = bot.ExecutionCount;
    target.Identifier = bot.Identifier;
    target.LastPositionClosingTime = bot.LastPositionClosingTime;
    target.LastCandle = bot.LastCandle;
    target.Config = bot.Config;
}
/// <summary>
/// Applies a new configuration. For a running bot the change is first applied to the in-memory
/// bot and only persisted when that succeeds; for a non-running bot it is persisted directly.
/// In both cases the bot is saved with its current registry status.
/// </summary>
/// <param name="newConfig">Configuration to apply.</param>
/// <returns>True when the configuration was applied and persisted; false when the running bot rejected it.</returns>
public async Task<bool> UpdateConfiguration(TradingBotConfig newConfig)
{
    if (_tradingBot != null)
    {
        var accepted = await _tradingBot.UpdateConfiguration(newConfig);
        if (!accepted)
        {
            return accepted;
        }

        var liveRegistry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
        var liveStatus = await liveRegistry.GetBotStatus(this.GetPrimaryKey());
        _state.State.Config = newConfig;
        await _state.WriteStateAsync();
        await SaveBotAsync(liveStatus);
        return accepted;
    }

    // For non-running bots, just update the configuration
    _state.State.Config = newConfig;
    await _state.WriteStateAsync();

    var registry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
    var currentStatus = await registry.GetBotStatus(this.GetPrimaryKey());
    await SaveBotAsync(currentStatus);
    return true;
}
/// <summary>
/// Returns the account of the running bot.
/// </summary>
/// <exception cref="InvalidOperationException">The bot is not running.</exception>
public Task<Account> GetAccount()
{
    return _tradingBot != null
        ? Task.FromResult(_tradingBot.Account)
        : throw new InvalidOperationException("Bot is not running - cannot get account information");
}
/// <summary>
/// Returns the configuration stored in grain state.
/// </summary>
public Task<TradingBotConfig> GetConfiguration() => Task.FromResult(_state.State.Config);
/// <summary>
/// Closes the given position on the running bot at the last candle's close price.
/// </summary>
/// <param name="positionId">Identifier of the position to close.</param>
/// <returns>The position that was passed to <c>CloseTrade</c>.</returns>
/// <exception cref="InvalidOperationException">
/// The bot is not running, the position or its signal is not found, or no candle is available
/// to price the close.
/// </exception>
public async Task<Position> ClosePositionAsync(Guid positionId)
{
    if (_tradingBot == null)
    {
        throw new InvalidOperationException("Bot is not running");
    }

    if (!_tradingBot.Positions.TryGetValue(positionId, out var position))
    {
        throw new InvalidOperationException($"Position with ID {positionId} not found");
    }

    if (!_tradingBot.Signals.TryGetValue(position.SignalIdentifier, out var signal) || signal == null)
    {
        throw new InvalidOperationException($"Signal with ID {position.SignalIdentifier} not found");
    }

    // LastCandle can still be null before the first completed cycle; load it the same way
    // CreateManualSignalAsync does instead of failing with a NullReferenceException.
    if (_tradingBot.LastCandle == null)
    {
        _logger.LogInformation("LastCandle is null, loading latest candle data before closing position {PositionId}",
            positionId);
        await _tradingBot.LoadLastCandle();
    }

    if (_tradingBot.LastCandle == null)
    {
        throw new InvalidOperationException(
            $"Cannot close position {positionId}: no candle data available to determine the closing price");
    }

    await _tradingBot.CloseTrade(signal, position, position.Open, _tradingBot.LastCandle.Close, true);
    return position;
}
/// <summary>
/// Restarts the bot: stops it without closing open positions, waits 100 ms, starts it again and
/// logs the resulting registry status. Failures are logged and rethrown.
/// </summary>
public async Task RestartAsync()
{
    _logger.LogInformation("Restarting LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());

    try
    {
        // Use internal stop method that bypasses open position check for restart
        await StopAsyncInternal(isRestarting: true);

        // Add a small delay to ensure stop operations complete
        await Task.Delay(100);

        await StartAsync();

        // Verify the restart was successful
        var registry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
        var statusAfterRestart = await registry.GetBotStatus(this.GetPrimaryKey());
        _logger.LogInformation(
            "LiveTradingBotGrain {GrainId} restart completed with final status: {Status}",
            this.GetPrimaryKey(),
            statusAfterRestart);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to restart LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());
        throw;
    }
}
/// <summary>
/// Deletes the bot: refuses when the database still shows open positions, otherwise stops the
/// bot, unregisters it from the registry and from the owner's agent grain, and clears grain state.
/// </summary>
/// <exception cref="InvalidOperationException">The bot still has open positions in the database.</exception>
public async Task DeleteAsync()
{
    // Check if bot has open positions in database before allowing deletion
    if (await HasOpenPositionsInDatabaseAsync())
    {
        _logger.LogWarning("Cannot delete LiveTradingBotGrain {GrainId} - bot has open positions in database",
            this.GetPrimaryKey());
        throw new InvalidOperationException(
            "Cannot delete bot while it has open positions. Please close all positions first.");
    }

    try
    {
        // Stop the bot first if it's running
        await StopAsync("Deleting bot");

        // Unregister from the bot registry
        var registry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
        await registry.UnregisterBot(_state.State.Identifier);

        // Unregister from the user's agent
        var owner = _state.State.User;
        if (owner != null)
        {
            var ownerAgent = GrainFactory.GetGrain<IAgentGrain>(owner.Id);
            await ownerAgent.UnregisterBotAsync(_state.State.Identifier);
        }

        // Clear the state
        _tradingBot = null;
        await _state.ClearStateAsync();
        _logger.LogInformation("LiveTradingBotGrain {GrainId} deleted successfully", this.GetPrimaryKey());
    }
    catch (Exception ex) when (ex is not InvalidOperationException)
    {
        // InvalidOperationException propagates untouched and unlogged; everything else is logged and rethrown
        _logger.LogError(ex, "Failed to delete LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());
        throw;
    }
}
/// <summary>
/// Updates the bot status in the central BotRegistry and verifies it by reading it back,
/// retrying up to three times with a growing delay (100 ms, 200 ms).
/// </summary>
/// <param name="status">Status to store in the registry.</param>
/// <remarks>
/// An exception on the final attempt is rethrown. If every attempt completes but the read-back
/// never matches, an error is logged and the method returns without throwing.
/// </remarks>
private async Task UpdateBotRegistryStatus(BotStatus status)
{
    const int maxRetries = 3;
    var botId = this.GetPrimaryKey();

    for (int attempt = 1; attempt <= maxRetries; attempt++)
    {
        try
        {
            var botRegistry = GrainFactory.GetGrain<ILiveBotRegistryGrain>(0);
            await botRegistry.UpdateBotStatus(botId, status);

            // Verify the update was successful
            var actualStatus = await botRegistry.GetBotStatus(botId);
            if (actualStatus == status)
            {
                _logger.LogDebug(
                    "Bot {BotId} status successfully updated to {Status} in BotRegistry (attempt {Attempt})",
                    botId, status, attempt);
                return;
            }

            _logger.LogWarning(
                "Bot {BotId} status update verification failed. Expected: {Expected}, Actual: {Actual} (attempt {Attempt})",
                botId, status, actualStatus, attempt);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to update bot {BotId} status to {Status} in BotRegistry (attempt {Attempt})",
                botId, status, attempt);
            if (attempt == maxRetries)
            {
                throw;
            }
        }

        // Wait before retry
        if (attempt < maxRetries)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(100 * attempt));
        }
    }

    // Reached only when every attempt ran without throwing but verification never matched.
    // Surface it explicitly instead of returning as if the update had succeeded.
    _logger.LogError(
        "Bot {BotId} status could not be verified as {Status} in BotRegistry after {MaxRetries} attempts",
        botId, status, maxRetries);
}
/// <summary>
/// Reminder callback. Logs the tick and, for this grain's reminder, re-runs the resume check.
/// </summary>
/// <param name="reminderName">Name of the reminder that fired.</param>
/// <param name="status">Tick status supplied by the runtime (unused).</param>
public async Task ReceiveReminder(string reminderName, TickStatus status)
{
    _logger.LogInformation(
        "Reminder '{ReminderName}' received for grain {GrainId}.",
        reminderName,
        this.GetPrimaryKey());

    if (reminderName != _reminderName)
    {
        return;
    }

    // Single call to the method that handles all the resume logic
    await ResumeBotIfRequiredAsync();
}
/// <summary>
/// Saves the current bot statistics to the database using BotService.
/// When no bot instance is running, a Bot with zeroed trade statistics is saved; otherwise the
/// statistics are computed from the bot's positions that are valid for metrics.
/// All exceptions are caught and logged; this method does not throw.
/// </summary>
/// <param name="status">Status to store on the saved Bot.</param>
private async Task SaveBotAsync(BotStatus status)
{
try
{
Bot bot = null;
if (_tradingBot == null)
{
// Save bot statistics for saved bots
// NOTE(review): this branch sets every trade statistic to 0 and omits the runtime timestamps.
// Whether SaveBotStatisticsAsync overwrites previously stored values with these is not visible here - confirm.
bot = new Bot
{
Identifier = _state.State.Identifier,
Name = _state.State.Config.Name,
Ticker = _state.State.Config.Ticker,
User = _state.State.User,
Status = status,
CreateDate = _state.State.CreateDate,
StartupTime = _state.State.StartupTime,
TradeWins = 0,
TradeLosses = 0,
Pnl = 0,
Roi = 0,
Volume = 0,
Fees = 0
};
}
else
{
// Ensure we have a User reference; fetch from DB if missing
if (_state.State.User == null)
{
try
{
var existingBot = await ServiceScopeHelpers.WithScopedService<IBotService, Bot>(
_scopeFactory,
async botService => await botService.GetBotByIdentifier(_state.State.Identifier));
if (existingBot?.User != null)
{
_state.State.User = existingBot.User;
}
}
catch (Exception ex)
{
// Best effort: the save continues even when the user cannot be loaded
_logger.LogWarning(ex, "Unable to load user for bot {BotId} while saving stats",
_state.State.Identifier);
}
}
// Load this bot's positions and keep only those valid for metrics
var positionForMetrics = await ServiceScopeHelpers.WithScopedService<ITradingService, List<Position>>(
_scopeFactory,
async tradingService =>
{
return (await tradingService.GetPositionsByInitiatorIdentifierAsync(this.GetPrimaryKey()))
.Where(p => p.IsValidForMetrics()).ToList();
});
// Calculate statistics using TradingBox.CalculateAgentSummaryMetrics
var agentMetrics = TradingBox.CalculateAgentSummaryMetrics(positionForMetrics);
// Calculate long and short position counts
var longPositionCount = positionForMetrics
.Count(p => p.OriginDirection == TradeDirection.Long);
var shortPositionCount = positionForMetrics
.Count(p => p.OriginDirection == TradeDirection.Short);
// Create complete Bot object with all statistics
bot = new Bot
{
Identifier = _state.State.Identifier,
Name = _state.State.Config.Name,
Ticker = _state.State.Config.Ticker,
User = _state.State.User,
Status = status,
StartupTime = _state.State.StartupTime,
LastStartTime = _state.State.LastStartTime,
LastStopTime = _state.State.LastStopTime,
AccumulatedRunTimeSeconds = _state.State.AccumulatedRunTimeSeconds,
CreateDate = _state.State.CreateDate,
TradeWins = agentMetrics.Wins,
TradeLosses = agentMetrics.Losses,
Pnl = agentMetrics.TotalPnL, // Gross PnL before fees
NetPnL = agentMetrics.NetPnL, // Net PnL after fees
Roi = agentMetrics.TotalROI,
Volume = agentMetrics.TotalVolume,
Fees = agentMetrics.TotalFees,
LongPositionCount = longPositionCount,
ShortPositionCount = shortPositionCount
};
}
// Pass the complete Bot object to BotService for saving
var success = await ServiceScopeHelpers.WithScopedService<IBotService, bool>(_scopeFactory,
async (botService) => { return await botService.SaveBotStatisticsAsync(bot); });
if (success)
{
_logger.LogDebug(
"Successfully saved bot statistics for bot {BotId}: Wins={Wins}, Losses={Losses}, PnL={PnL}, ROI={ROI}%, Volume={Volume}, Fees={Fees}, Long={LongPositions}, Short={ShortPositions}",
_state.State.Identifier, bot.TradeWins, bot.TradeLosses, bot.Pnl, bot.Roi, bot.Volume, bot.Fees,
bot.LongPositionCount, bot.ShortPositionCount);
}
else
{
_logger.LogWarning("Failed to save bot statistics for bot {BotId}", _state.State.Identifier);
}
// Disabled: forcing an agent summary update after each save.
// await ServiceScopeHelpers.WithScopedService<IGrainFactory>(_scopeFactory, async grainFactory =>
// {
// var agentGrain = grainFactory.GetGrain<IAgentGrain>(_state.State.User.Id);
// await agentGrain.ForceUpdateSummary();
// });
}
catch (Exception ex)
{
// Saving statistics is best effort: failures are logged and swallowed so callers are not interrupted
_logger.LogError(ex, "Failed to save bot statistics for bot {BotId}", _state.State.Identifier);
}
}
/// <summary>
/// Sends a swap result notification (success or failure) to the user's Telegram channel through
/// the webhook service. Failures to send are logged and not propagated.
/// </summary>
/// <param name="isSuccess">Whether the swap succeeded; selects the message template.</param>
/// <param name="amount">Swapped amount shown in the message.</param>
/// <param name="transactionHash">Transaction hash shown in the success message.</param>
/// <param name="errorMessage">Error text shown in the failure message.</param>
private async Task NotifyUserAboutSwap(bool isSuccess, decimal amount, string? transactionHash,
    string? errorMessage = null)
{
    try
    {
        string message;
        if (isSuccess)
        {
            message = $"🔄 Automatic Swap Successful\n\n" +
                      $"🎯 Bot: {_tradingBot?.Identifier}\n" +
                      $"💰 Amount: {amount} USDC → ETH\n" +
                      $"✅ Status: Success\n" +
                      $"🔗 Transaction: {transactionHash}\n" +
                      $"⏰ Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC";
        }
        else
        {
            message = $"❌ Automatic Swap Failed\n\n" +
                      $"🎯 Bot: {_tradingBot?.Identifier}\n" +
                      $"💰 Amount: {amount} USDC → ETH\n" +
                      $"❌ Status: Failed\n" +
                      $"⚠️ Error: {errorMessage}\n" +
                      $"⏰ Time: {DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} UTC";
        }

        // Send notification via webhook service
        await ServiceScopeHelpers.WithScopedService<IWebhookService>(
            _scopeFactory,
            async webhooks =>
            {
                await webhooks.SendMessage(message, _state.State.User?.TelegramChannel);
            });
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to send swap notification for bot {BotId}", _tradingBot?.Identifier);
    }
}
/// <summary>
/// Pings the grain. Calling any grain method activates the grain, and activation
/// (OnActivateAsync) runs ResumeBotIfRequiredAsync, so the ping itself only logs and acknowledges.
/// </summary>
/// <returns>True when the ping was handled; false when logging the ping threw.</returns>
public Task<bool> PingAsync()
{
    bool acknowledged;
    try
    {
        _logger.LogInformation("Ping received for LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());

        // Nothing else to do here: resuming and reminder handling happen during activation
        acknowledged = true;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error during ping for LiveTradingBotGrain {GrainId}", this.GetPrimaryKey());
        acknowledged = false;
    }

    return Task.FromResult(acknowledged);
}
/// <summary>
/// Reports whether any position tracked by this bot is open.
/// Reads the in-memory trading bot's positions when one exists, otherwise the positions held in grain state.
/// </summary>
/// <returns>
/// A completed task holding <c>true</c> when at least one position reports <c>IsOpen()</c>;
/// <c>false</c> when there is none, when the position collection is null, or when the check throws.
/// </returns>
public Task<bool> HasOpenPositionsAsync()
{
    bool anyOpen;

    try
    {
        if (_tradingBot != null)
        {
            // Running bot: the live position collection is the one to inspect.
            anyOpen = _tradingBot.Positions?.Values.Any(p => p.IsOpen()) ?? false;
            _logger.LogDebug("Bot {GrainId} has open positions: {HasOpenPositions} (from live data)",
                this.GetPrimaryKey(), anyOpen);
        }
        else
        {
            // No bot instance in memory: fall back to the positions stored in grain state.
            anyOpen = _state.State.Positions?.Values.Any(p => p.IsOpen()) ?? false;
            _logger.LogDebug("Bot {GrainId} has open positions: {HasOpenPositions} (from grain state)",
                this.GetPrimaryKey(), anyOpen);
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error checking open positions for LiveTradingBotGrain {GrainId}",
            this.GetPrimaryKey());

        // Default to false on error to avoid blocking autoswap.
        anyOpen = false;
    }

    return Task.FromResult(anyOpen);
}
/// <summary>
/// Loads this bot's positions through <see cref="ITradingService"/> and closes each one that is
/// open or still in <c>PositionStatus.New</c>, one at a time.
/// Failures are logged and never rethrown, neither for a single position nor for the whole
/// operation, so the caller can carry on with stopping the bot.
/// </summary>
private async Task CloseAllOpenPositionsAsync()
{
    try
    {
        var grainId = this.GetPrimaryKey();

        var botPositions = await ServiceScopeHelpers.WithScopedService<ITradingService, IEnumerable<Position>>(
            _scopeFactory,
            async tradingService => await tradingService.GetPositionsByInitiatorIdentifierAsync(grainId));

        var toClose = botPositions?
            .Where(p => p.IsOpen() || p.Status.Equals(PositionStatus.New))
            .ToList() ?? new List<Position>();

        if (toClose.Count == 0)
        {
            _logger.LogDebug("Bot {GrainId} has no open positions to close", grainId);
            return;
        }

        _logger.LogInformation(
            "Bot {GrainId} has {Count} open positions that will be closed before stopping: {Positions}",
            grainId, toClose.Count, string.Join(", ", toClose.Select(p => p.Identifier)));

        foreach (var current in toClose)
        {
            try
            {
                _logger.LogInformation("Closing position {PositionId} for bot {GrainId}", current.Identifier, grainId);
                await ClosePositionAsync(current.Identifier);
                _logger.LogInformation("Successfully closed position {PositionId} for bot {GrainId}", current.Identifier, grainId);
            }
            catch (Exception ex)
            {
                // One failed close must not prevent the remaining positions from being attempted.
                _logger.LogError(ex, "Failed to close position {PositionId} for bot {GrainId}", current.Identifier, grainId);
            }
        }

        _logger.LogInformation("Finished closing all open positions for bot {GrainId}", grainId);
    }
    catch (Exception ex)
    {
        // Deliberately swallowed: the stop process continues even if position closing fails.
        _logger.LogError(ex, "Error closing open positions for bot {GrainId}", this.GetPrimaryKey());
    }
}
/// <summary>
/// Checks for open positions in database by bot identifier (initiator identifier).
/// This is the source of truth for preventing bot stop when there are unfinished positions.
/// A position counts as unfinished when it reports <c>IsOpen()</c> or its status is <c>PositionStatus.New</c>.
/// </summary>
/// <returns>
/// <c>true</c> when at least one unfinished position exists, or when the lookup throws
/// (the bot is then treated as not safe to stop); otherwise <c>false</c>.
/// </returns>
private async Task<bool> HasOpenPositionsInDatabaseAsync()
{
    try
    {
        var botId = this.GetPrimaryKey();

        var positions = await ServiceScopeHelpers.WithScopedService<ITradingService, IEnumerable<Position>>(
            _scopeFactory,
            async tradingService => await tradingService.GetPositionsByInitiatorIdentifierAsync(botId));

        // Materialize the blocking positions once, with the exact predicate used for the decision.
        // This keeps the warning's count and identifier list in sync with the returned value
        // (positions in status New are included) and avoids enumerating the sequence twice.
        var blockingPositions = positions?
            .Where(p => p.IsOpen() || p.Status.Equals(PositionStatus.New))
            .ToList() ?? new List<Position>();

        var hasOpenPositions = blockingPositions.Count > 0;

        _logger.LogDebug("Bot {GrainId} has open positions in database: {HasOpenPositions}",
            botId, hasOpenPositions);

        if (hasOpenPositions)
        {
            _logger.LogWarning(
                "Bot {GrainId} cannot be stopped - has {Count} open positions in database: {Positions}",
                botId, blockingPositions.Count, string.Join(", ", blockingPositions.Select(p => p.Identifier)));
        }

        return hasOpenPositions;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error checking database positions for bot {GrainId}", this.GetPrimaryKey());

        // Default to true on error to err on the side of caution - don't stop bot if we can't verify
        return true;
    }
}
/// <summary>
/// Returns the user stored in this bot's grain state.
/// </summary>
/// <returns>A completed task holding the user from grain state.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown synchronously when the grain state holds no user.
/// </exception>
public Task<User> GetUserAsync()
{
    var owner = _state.State.User ?? throw new InvalidOperationException(
        $"Bot '{_state.State.Config?.Name}' (ID: {_state.State.Identifier}) has no user information.");

    return Task.FromResult(owner);
}
}