#nullable enable using Managing.Application.Abstractions; using Managing.Application.Abstractions.Grains; using Managing.Application.Abstractions.Repositories; using Managing.Application.Abstractions.Services; using Managing.Application.Bots.Models; using Managing.Application.Orleans; using Managing.Common; using Managing.Core; using Managing.Core.Exceptions; using Managing.Domain.Bots; using Managing.Domain.Shared.Helpers; using Managing.Domain.Statistics; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Concurrency; using static Managing.Common.Enums; namespace Managing.Application.Bots.Grains; /// /// Orleans grain for Agent operations. /// Uses custom trading placement with load balancing and built-in fallback. /// [TradingPlacement] // Use custom trading placement with load balancing public class AgentGrain : Grain, IAgentGrain { private readonly IPersistentState _state; private readonly ILogger _logger; private readonly IBotService _botService; private readonly IAgentService _agentService; private readonly IExchangeService _exchangeService; private readonly IUserService _userService; private readonly IAccountService _accountService; private readonly ITradingService _tradingService; private readonly IAgentBalanceRepository _agentBalanceRepository; private readonly IServiceScopeFactory _scopeFactory; public AgentGrain( [PersistentState("agent-state", "agent-store")] IPersistentState state, ILogger logger, IBotService botService, IAgentService agentService, IExchangeService exchangeService, IUserService userService, IAccountService accountService, ITradingService tradingService, IAgentBalanceRepository agentBalanceRepository, IServiceScopeFactory scopeFactory) { _state = state; _logger = logger; _botService = botService; _agentService = agentService; _exchangeService = exchangeService; _userService = userService; _accountService = accountService; _tradingService = tradingService; _agentBalanceRepository = agentBalanceRepository; _scopeFactory = scopeFactory; } public override async Task OnActivateAsync(CancellationToken cancellationToken) { _logger.LogInformation("AgentGrain activated for user {UserId}", this.GetPrimaryKeyLong()); await base.OnActivateAsync(cancellationToken); } public async Task InitializeAsync(int userId, string agentName) { _state.State.AgentName = agentName; await _state.WriteStateAsync(); // Create an empty AgentSummary for the new agent var emptySummary = new AgentSummary { UserId = userId, AgentName = agentName, TotalPnL = 0, TotalROI = 0, Wins = 0, Losses = 0, Runtime = null, CreatedAt = DateTime.UtcNow, UpdatedAt = DateTime.UtcNow, ActiveStrategiesCount = 0, TotalVolume = 0, TotalBalance = 0, TotalFees = 0 }; await _agentService.SaveOrUpdateAgentSummary(emptySummary); _logger.LogInformation("Agent {UserId} initialized with name {AgentName} and empty summary", userId, agentName); // Notify platform summary about new agent activation await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory => { var platformGrain = grainFactory.GetGrain("platform-summary"); await platformGrain.RefreshAgentCountAsync(); _logger.LogDebug("Notified platform summary about new agent activation for user {UserId}", userId); }); } public async Task UpdateAgentNameAsync(string agentName) { _state.State.AgentName = agentName; await _state.WriteStateAsync(); // Use the efficient method to update only the agent name in the summary await _agentService.UpdateAgentSummaryNameAsync((int)this.GetPrimaryKeyLong(), agentName); _logger.LogInformation("Agent {UserId} updated with name {AgentName}", this.GetPrimaryKeyLong(), agentName); } public async Task OnPositionOpenedAsync(PositionOpenEvent evt) { try { _logger.LogInformation("Position opened event received for user {UserId}, position: {PositionId}", this.GetPrimaryKeyLong(), evt.PositionIdentifier); await UpdateSummary(); } catch (Exception ex) { _logger.LogError(ex, "Error processing position opened event for user {UserId}", this.GetPrimaryKeyLong()); } } public async Task OnPositionClosedAsync(PositionClosedEvent evt) { try { _logger.LogInformation( "Position closed event received for user {UserId}, position: {PositionId}, PnL: {PnL}", this.GetPrimaryKeyLong(), evt.PositionIdentifier, evt.RealizedPnL); await UpdateSummary(); } catch (Exception ex) { _logger.LogError(ex, "Error processing position closed event for user {UserId}", this.GetPrimaryKeyLong()); } } public async Task OnPositionUpdatedAsync(PositionUpdatedEvent evt) { try { _logger.LogInformation("Position updated event received for user {UserId}, position: {PositionId}", this.GetPrimaryKeyLong(), evt.PositionIdentifier); await UpdateSummary(); } catch (Exception ex) { _logger.LogError(ex, "Error processing position updated event for user {UserId}", this.GetPrimaryKeyLong()); } } [OneWay] public async Task ForceUpdateSummary() { // Check if last update was more than 2 minutes ago if (_state.State.LastSummaryUpdateTime.HasValue && DateTime.UtcNow - _state.State.LastSummaryUpdateTime.Value < TimeSpan.FromMinutes(2)) { _logger.LogDebug("Skipping summary update for agent {UserId} - last update was {TimeAgo} ago", this.GetPrimaryKeyLong(), DateTime.UtcNow - _state.State.LastSummaryUpdateTime.Value); return; } await UpdateSummary(); } /// /// Forces an immediate update of the agent summary without cooldown check (for critical updates like after topup) /// Invalidates cached balance data to ensure fresh balance fetch /// public async Task ForceUpdateSummaryImmediate() { // Invalidate cached balance data to force fresh fetch _state.State.CachedBalanceData = null; await _state.WriteStateAsync(); _logger.LogInformation("Force updating agent summary immediately for user {UserId} (cache invalidated)", this.GetPrimaryKeyLong()); // Update summary immediately without cooldown check await UpdateSummary(); } /// /// Updates the agent summary by recalculating from position data (used for initialization or manual refresh) /// [OneWay] public async Task UpdateSummary() { try { // Get all positions for this agent's bots as initiator var positions = (await _tradingService.GetPositionByUserIdAsync((int)this.GetPrimaryKeyLong())) .Where(p => p.IsValidForMetrics()).ToList(); var metrics = TradingBox.CalculateAgentSummaryMetrics(positions); // Store total fees in grain state for caching _state.State.TotalFees = metrics.TotalFees; // Calculate total balance (USDC wallet + USDC in open positions value) decimal totalBalance = 0; decimal usdcWalletValue = 0; decimal usdcInPositionsValue = 0; try { var userId = (int)this.GetPrimaryKeyLong(); var user = await _userService.GetUserByIdAsync(userId); var userAccounts = await _accountService.GetAccountsByUserAsync(user, hideSecrets: true, true); foreach (var account in userAccounts) { // Get USDC balance var usdcBalances = await GetOrRefreshBalanceDataAsync(account.Name); var usdcBalance = usdcBalances?.UsdcValue ?? 0; usdcWalletValue += usdcBalance; } foreach (var position in positions.Where(p => p.IsOpen())) { var positionUsd = position.Open.Price * position.Open.Quantity; var net = position.ProfitAndLoss?.Net ?? 0; usdcInPositionsValue += positionUsd + net; } totalBalance = usdcWalletValue + usdcInPositionsValue; } catch (Exception ex) { _logger.LogError(ex, "Error calculating total balance for agent {UserId}", this.GetPrimaryKeyLong()); totalBalance = 0; // Set to 0 if calculation fails usdcWalletValue = 0; usdcInPositionsValue = 0; } var activeStrategies = await ServiceScopeHelpers.WithScopedService>(_scopeFactory, async (botService) => { return (await botService.GetBotsByUser((int)this.GetPrimaryKeyLong())) .Where(b => b.Status == BotStatus.Running) .ToList(); }); // Calculate Runtime based on the earliest position date DateTime? runtime = null; var botsAllocationUsdValue = 0m; if (activeStrategies.Any()) { runtime = activeStrategies.Min(p => p.StartupTime); // Calculate bots allocation USD value directly from Bot entities (avoid calling grains to prevent deadlocks) // Bot entities already contain BotTradingBalance from the database botsAllocationUsdValue = activeStrategies.Sum(bot => bot.BotTradingBalance); } var summary = new AgentSummary { UserId = (int)this.GetPrimaryKeyLong(), AgentName = _state.State.AgentName, TotalPnL = metrics.TotalPnL, // Gross PnL before fees NetPnL = metrics.NetPnL, // Net PnL after fees Wins = metrics.Wins, Losses = metrics.Losses, TotalROI = metrics.TotalROI, Runtime = runtime, ActiveStrategiesCount = activeStrategies.Count(), TotalVolume = metrics.TotalVolume, TotalBalance = totalBalance, TotalFees = metrics.TotalFees, }; // Save summary to database await _agentService.SaveOrUpdateAgentSummary(summary); // Update last summary update time _state.State.LastSummaryUpdateTime = DateTime.UtcNow; await _state.WriteStateAsync(); // Insert balance tracking data InsertBalanceTrackingData(totalBalance, botsAllocationUsdValue, metrics.NetPnL, usdcWalletValue, usdcInPositionsValue); _logger.LogDebug( "Updated agent summary from position data for user {UserId}: NetPnL={NetPnL}, TotalPnL={TotalPnL}, Fees={Fees}, Volume={Volume}, Wins={Wins}, Losses={Losses}", this.GetPrimaryKeyLong(), metrics.NetPnL, metrics.TotalPnL, metrics.TotalFees, metrics.TotalVolume, metrics.Wins, metrics.Losses); } catch (Exception ex) { _logger.LogError(ex, "Error calculating agent summary for user {UserId}", this.GetPrimaryKeyLong()); } } public async Task RegisterBotAsync(Guid botId) { if (_state.State.BotIds.Add(botId)) { await _state.WriteStateAsync(); _logger.LogInformation("Bot {BotId} registered to Agent {UserId}", botId, this.GetPrimaryKeyLong()); await UpdateSummary(); } } public async Task UnregisterBotAsync(Guid botId) { if (_state.State.BotIds.Remove(botId)) { await _state.WriteStateAsync(); _logger.LogInformation("Bot {BotId} unregistered from Agent {UserId}", botId, this.GetPrimaryKeyLong()); await UpdateSummary(); } } public async Task CheckAndEnsureEthBalanceAsync(Guid requestingBotId, string accountName) { // Get user settings var userId = (int)this.GetPrimaryKeyLong(); var user = await _userService.GetUserByIdAsync(userId); // Check if a swap is already in progress if (_state.State.IsSwapInProgress) { _logger.LogInformation("Swap already in progress for agent {UserId}, bot {RequestingBotId} will wait", this.GetPrimaryKeyLong(), requestingBotId); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.SwapInProgress, Message = "Swap operation already in progress", ShouldStopBot = false }; } // Check cooldown period (5 minutes between swaps) if (_state.State.LastSwapTime.HasValue && DateTime.UtcNow - _state.State.LastSwapTime.Value < TimeSpan.FromMinutes(5)) { _logger.LogInformation( "Swap cooldown period active for agent {UserId}, bot {RequestingBotId} will wait", this.GetPrimaryKeyLong(), requestingBotId); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.SwapCooldownActive, Message = "Swap cooldown period active", ShouldStopBot = false }; } // Get or refresh cached balance data var balanceData = await GetOrRefreshBalanceDataAsync(accountName); if (balanceData == null || balanceData.IsValid == false) { _logger.LogError("Failed to get balance data for account {AccountName}, user {UserId}", accountName, this.GetPrimaryKeyLong()); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.BalanceFetchError, Message = "Failed to fetch balance data", ShouldStopBot = false }; } // Check low ETH amount alert threshold var lowEthAlertThreshold = user.LowEthAmountAlert ?? Constants.GMX.Config.MinimumTradeEthBalanceUsd; if (balanceData.EthValueInUsd < lowEthAlertThreshold) { _logger.LogWarning( "ETH balance below alert threshold for user {UserId} - ETH: {EthValue:F2} USD (threshold: {Threshold:F2} USD)", userId, balanceData.EthValueInUsd, lowEthAlertThreshold); } _logger.LogInformation( "Agent {UserId} balance check - ETH: {EthValue:F2} USD, USDC: {UsdcValue:F2} USD (cached: {IsCached})", this.GetPrimaryKeyLong(), balanceData.EthValueInUsd, balanceData.UsdcValue, _state.State.CachedBalanceData?.IsValid == true); // Check USDC minimum balance first (this will stop the bot if insufficient) if (balanceData.UsdcValue < Constants.GMX.Config.MinimumPositionAmount) { _logger.LogWarning( "USDC balance is below minimum required amount - ETH: {EthValue:F2} USD, USDC: {UsdcValue:F2} USD (minimum: {Minimum})", balanceData.EthValueInUsd, balanceData.UsdcValue, Constants.GMX.Config.MinimumPositionAmount); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.InsufficientUsdcBelowMinimum, Message = $"USDC balance below minimum required amount ({Constants.GMX.Config.MinimumPositionAmount} USD)", ShouldStopBot = true }; } // If ETH balance is sufficient, return success // Use user's low ETH alert threshold as the minimum trading balance var minTradeEthBalance = user.LowEthAmountAlert ?? Constants.GMX.Config.MinimumTradeEthBalanceUsd; if (balanceData.EthValueInUsd >= minTradeEthBalance) { return new BalanceCheckResult { IsSuccessful = true, FailureReason = BalanceCheckFailureReason.None, Message = "Balance check successful - Enough ETH balance for trading", ShouldStopBot = false }; } // Check if ETH is below absolute minimum (half of the alert threshold) var minSwapEthBalance = minTradeEthBalance * 0.67m; // 67% of alert threshold if (balanceData.EthValueInUsd < minSwapEthBalance) { return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.InsufficientEthBelowMinimum, Message = $"ETH balance below minimum required amount ({minSwapEthBalance:F2} USD)", ShouldStopBot = true }; } // Check if autoswap is enabled for this user if (!user.EnableAutoswap) { _logger.LogInformation("Autoswap is disabled for user {UserId}, skipping swap", userId); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.None, Message = "Autoswap is disabled for this user", ShouldStopBot = false }; } // Get autoswap amount from user settings or use default var autoswapAmount = user.AutoswapAmount ?? (decimal)Constants.GMX.Config.AutoSwapAmount; // Check if we have enough USDC for swap if (balanceData.UsdcValue < (Constants.GMX.Config.MinimumPositionAmount + autoswapAmount)) { _logger.LogWarning( "Insufficient USDC balance for swap - ETH: {EthValue:F2} USD, USDC: {UsdcValue:F2} USD (need {AutoSwapAmount} USD for swap)", balanceData.EthValueInUsd, balanceData.UsdcValue, autoswapAmount); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.InsufficientUsdcForSwap, Message = $"Insufficient USDC balance for swap (need {autoswapAmount} USD)", ShouldStopBot = true }; } // Check if any bot has open positions before executing autoswap var hasOpenPositions = await HasAnyBotWithOpenPositionsAsync(); if (hasOpenPositions) { _logger.LogWarning( "Cannot execute autoswap - ETH: {EthValue:F2} USD, USDC: {UsdcValue:F2} USD (bots have open positions)", balanceData.EthValueInUsd, balanceData.UsdcValue); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.BotsHaveOpenPositions, Message = "Cannot execute autoswap while bots have open positions", ShouldStopBot = false // Don't stop the bot, just skip this execution cycle }; } // Mark swap as in progress _state.State.IsSwapInProgress = true; await _state.WriteStateAsync(); try { _logger.LogInformation("Initiating USDC to ETH swap for agent {UserId} - swapping {Amount} USDC", this.GetPrimaryKeyLong(), autoswapAmount); // Perform the swap using user's autoswap amount var swapInfo = await _tradingService.SwapGmxTokensAsync(user, accountName, Ticker.USDC, Ticker.ETH, (double)autoswapAmount); if (swapInfo.Success) { _logger.LogInformation( "Successfully swapped {Amount} USDC to ETH for agent {UserId}, transaction hash: {Hash}", autoswapAmount, userId, swapInfo.Hash); // Update last swap time and invalidate cache _state.State.LastSwapTime = DateTime.UtcNow; _state.State.CachedBalanceData = null; // Invalidate cache after successful swap return new BalanceCheckResult { IsSuccessful = true, FailureReason = BalanceCheckFailureReason.None, Message = "Swap completed successfully", ShouldStopBot = false }; } else { _logger.LogError("Failed to swap USDC to ETH for agent {UserId}: {Error}", userId, swapInfo.Error ?? swapInfo.Message); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.SwapExecutionError, Message = swapInfo.Error ?? swapInfo.Message ?? "Swap execution failed", ShouldStopBot = true }; } } catch (InvalidOperationException ex) when (ex.InnerException is InsufficientFundsException insufficientFundsEx) { // Handle allowance exception (insufficient ETH for gas fees) _logger.LogError(insufficientFundsEx, "Insufficient funds during autoswap for agent {UserId}: {ErrorMessage}", this.GetPrimaryKeyLong(), insufficientFundsEx.Message); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.SwapExecutionError, Message = insufficientFundsEx.UserMessage ?? "Insufficient ETH for gas fees during autoswap. Bot cannot continue trading.", ShouldStopBot = true }; } catch (Exception ex) { _logger.LogError(ex, "Error during autoswap for agent {UserId}, bot {RequestingBotId}", this.GetPrimaryKeyLong(), requestingBotId); return new BalanceCheckResult { IsSuccessful = false, FailureReason = BalanceCheckFailureReason.SwapExecutionError, Message = ex.Message, ShouldStopBot = true }; } finally { // Always clear the swap in progress flag _state.State.IsSwapInProgress = false; await _state.WriteStateAsync(); } } /// /// Checks if any of the user's bots have open positions /// private async Task HasAnyBotWithOpenPositionsAsync() { try { // Get all bot IDs for this user from the registry var botRegistry = GrainFactory.GetGrain(0); var userBots = await botRegistry.GetBotsForUser((int)this.GetPrimaryKeyLong()); if (!userBots.Any()) { _logger.LogDebug("No bots found for user {UserId}", this.GetPrimaryKeyLong()); return false; } // Check each bot for open positions foreach (var botEntry in userBots.Where(b => b.Status == BotStatus.Running)) { try { var botGrain = GrainFactory.GetGrain(botEntry.Identifier); var hasOpenPositions = await botGrain.HasOpenPositionsAsync(); if (hasOpenPositions) { _logger.LogInformation("Bot {BotId} has open positions, blocking autoswap for user {UserId}", botEntry.Identifier, this.GetPrimaryKeyLong()); return true; } } catch (Exception ex) { _logger.LogWarning(ex, "Error checking open positions for bot {BotId}, skipping", botEntry.Identifier); // Continue checking other bots even if one fails } } _logger.LogDebug("No bots with open positions found for user {UserId}", this.GetPrimaryKeyLong()); return false; } catch (Exception ex) { _logger.LogError(ex, "Error checking for open positions across all bots for user {UserId}", this.GetPrimaryKeyLong()); return false; // Default to false on error to avoid blocking autoswap } } /// /// Gets cached balance data or fetches fresh data if cache is invalid/expired /// private async Task GetOrRefreshBalanceDataAsync(string accountName) { try { // Check if we have valid cached data for the same account if (_state.State.CachedBalanceData?.IsValid == true && _state.State.CachedBalanceData.AccountName == accountName) { _logger.LogDebug("Using cached balance data for account {AccountName}", accountName); return _state.State.CachedBalanceData; } // Fetch fresh balance data _logger.LogInformation("Fetching fresh balance data for account {AccountName}", accountName); var userId = (int)this.GetPrimaryKeyLong(); var user = await _userService.GetUserByIdAsync(userId); if (user == null) { _logger.LogError("User {UserId} not found for balance check", userId); return null; } var userAccounts = await _accountService.GetAccountsByUserAsync(user, hideSecrets: true, false); var account = userAccounts.FirstOrDefault(a => a.Name == accountName); if (account == null) { _logger.LogError("Account {AccountName} not found for user {UserId}", accountName, userId); return null; } // Get current balances var balances = await _exchangeService.GetBalances(account); var ethBalance = balances.FirstOrDefault(b => b.TokenName?.ToUpper() == "ETH"); var usdcBalance = balances.FirstOrDefault(b => b.TokenName?.ToUpper() == "USDC"); var ethValueInUsd = ethBalance?.Amount * ethBalance?.Price ?? 0; var usdc = usdcBalance?.Amount ?? 0; // Cache the balance data var balanceData = new CachedBalanceData { LastFetched = DateTime.UtcNow, AccountName = accountName, EthValueInUsd = ethValueInUsd, UsdcValue = usdc }; _state.State.CachedBalanceData = balanceData; await _state.WriteStateAsync(); return balanceData; } catch (Exception ex) { _logger.LogError(ex, "Error fetching balance data for account {AccountName}, user {UserId}", accountName, this.GetPrimaryKeyLong()); throw; } } /// /// Inserts balance tracking data into the AgentBalanceRepository /// private void InsertBalanceTrackingData(decimal totalAccountUsdValue, decimal botsAllocationUsdValue, decimal pnl, decimal usdcWalletValue, decimal usdcInPositionsValue) { try { var agentBalance = new AgentBalance { UserId = (int)this.GetPrimaryKeyLong(), TotalBalanceValue = totalAccountUsdValue, UsdcWalletValue = usdcWalletValue, UsdcInPositionsValue = usdcInPositionsValue, BotsAllocationUsdValue = botsAllocationUsdValue, PnL = pnl, Time = DateTime.UtcNow }; _agentBalanceRepository.InsertAgentBalance(agentBalance); _logger.LogDebug( "Inserted balance tracking data for user {UserId}: TotalBalanceValue={TotalBalanceValue}, BotsAllocationUsdValue={BotsAllocationUsdValue}, PnL={PnL}", agentBalance.UserId, agentBalance.TotalBalanceValue, agentBalance.BotsAllocationUsdValue, agentBalance.PnL); } catch (Exception ex) { _logger.LogError(ex, "Error inserting balance tracking data for user {UserId}", (int)this.GetPrimaryKeyLong()); } } }