// Files
// managing-apps/src/Managing.Application/Bots/Grains/AgentGrain.cs
// 2025-09-28 20:57:42 +07:00
//
// 536 lines
// 21 KiB
// C#

#nullable enable
using Managing.Application.Abstractions;
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Services;
using Managing.Application.Bots.Models;
using Managing.Application.Orleans;
using Managing.Common;
using Managing.Core;
using Managing.Core.Exceptions;
using Managing.Domain.Bots;
using Managing.Domain.Statistics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using static Managing.Common.Enums;
namespace Managing.Application.Bots.Grains;
/// <summary>
/// Orleans grain for Agent operations.
/// Uses custom trading placement with load balancing and built-in fallback.
/// </summary>
[TradingPlacement] // Use custom trading placement with load balancing
public class AgentGrain : Grain, IAgentGrain
{
private readonly IPersistentState<AgentGrainState> _state;
private readonly ILogger<AgentGrain> _logger;
private readonly IBotService _botService;
private readonly IAgentService _agentService;
private readonly IExchangeService _exchangeService;
private readonly IUserService _userService;
private readonly IAccountService _accountService;
private readonly ITradingService _tradingService;
private readonly IServiceScopeFactory _scopeFactory;
/// <summary>
/// Creates the grain and stores its persistent state handle and injected services.
/// </summary>
/// <param name="state">Persistent grain state, bound to the "agent-state" name in the "agent-store" provider.</param>
/// <param name="logger">Logger used for all diagnostics emitted by this grain.</param>
/// <param name="botService">Bot service dependency.</param>
/// <param name="agentService">Service used to save and update agent summaries.</param>
/// <param name="exchangeService">Service used to read account balances.</param>
/// <param name="userService">Service used to load the user behind this grain's key.</param>
/// <param name="accountService">Service used to list accounts and execute token swaps.</param>
/// <param name="tradingService">Service used to load the user's positions.</param>
/// <param name="scopeFactory">Factory used to resolve scoped services on demand.</param>
public AgentGrain(
    [PersistentState("agent-state", "agent-store")]
    IPersistentState<AgentGrainState> state,
    ILogger<AgentGrain> logger,
    IBotService botService,
    IAgentService agentService,
    IExchangeService exchangeService,
    IUserService userService,
    IAccountService accountService,
    ITradingService tradingService,
    IServiceScopeFactory scopeFactory)
{
    // Infrastructure: state, logging and scope creation.
    (_state, _logger, _scopeFactory) = (state, logger, scopeFactory);

    // Application services.
    (_botService, _agentService, _exchangeService) = (botService, agentService, exchangeService);
    (_userService, _accountService, _tradingService) = (userService, accountService, tradingService);
}
/// <summary>
/// Logs that the grain has been activated (identified by its long primary key)
/// and then runs the base activation logic.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the base activation.</param>
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
    var userKey = this.GetPrimaryKeyLong();
    _logger.LogInformation("AgentGrain activated for user {UserId}", userKey);

    await base.OnActivateAsync(cancellationToken);
}
/// <summary>
/// Initializes a new agent: persists its name in grain state, saves an all-zero
/// <see cref="AgentSummary"/> for it, and tells the platform summary grain that one more agent exists.
/// </summary>
/// <param name="userId">Identifier of the user owning the agent; stored on the summary.</param>
/// <param name="agentName">Display name stored in grain state and on the summary.</param>
public async Task InitializeAsync(int userId, string agentName)
{
    // Step 1: remember the name in the grain's own persistent state.
    _state.State.AgentName = agentName;
    await _state.WriteStateAsync();

    // Step 2: seed the summary store with a zeroed record for this agent.
    var initialSummary = new AgentSummary
    {
        UserId = userId,
        AgentName = agentName,
        TotalPnL = 0,
        TotalROI = 0,
        Wins = 0,
        Losses = 0,
        Runtime = null,
        CreatedAt = DateTime.UtcNow,
        UpdatedAt = DateTime.UtcNow,
        ActiveStrategiesCount = 0,
        TotalVolume = 0,
        TotalBalance = 0
    };
    await _agentService.SaveOrUpdateAgentSummary(initialSummary);

    _logger.LogInformation("Agent {UserId} initialized with name {AgentName} and empty summary", userId, agentName);

    // Step 3: bump the platform-wide agent counter through a scoped grain factory.
    await ServiceScopeHelpers.WithScopedService<IGrainFactory>(_scopeFactory, async factory =>
    {
        var platformSummary = factory.GetGrain<IPlatformSummaryGrain>("platform-summary");
        await platformSummary.IncrementAgentCountAsync();

        _logger.LogDebug("Notified platform summary about new agent activation for user {UserId}", userId);
    });
}
/// <summary>
/// Renames the agent: persists the new name in grain state and updates only the
/// name on the stored agent summary.
/// </summary>
/// <param name="agentName">The new agent name.</param>
public async Task UpdateAgentNameAsync(string agentName)
{
    var userKey = this.GetPrimaryKeyLong();

    _state.State.AgentName = agentName;
    await _state.WriteStateAsync();

    // Name-only update on the summary; the rest of the summary is left untouched.
    await _agentService.UpdateAgentSummaryNameAsync((int)userKey, agentName);

    _logger.LogInformation("Agent {UserId} updated with name {AgentName}", userKey, agentName);
}
/// <summary>
/// Handles a position-opened event by logging it and recomputing the agent summary.
/// Any exception is logged and not propagated to the caller.
/// </summary>
/// <param name="evt">The event describing the opened position.</param>
public async Task OnPositionOpenedAsync(PositionOpenEvent evt)
{
    var userKey = this.GetPrimaryKeyLong();

    try
    {
        _logger.LogInformation(
            "Position opened event received for user {UserId}, position: {PositionId}",
            userKey,
            evt.PositionIdentifier);

        await UpdateSummary();
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error processing position opened event for user {UserId}", userKey);
    }
}
/// <summary>
/// Handles a position-closed event by logging it (including the realized PnL carried
/// by the event) and recomputing the agent summary.
/// Any exception is logged and not propagated to the caller.
/// </summary>
/// <param name="evt">The event describing the closed position.</param>
public async Task OnPositionClosedAsync(PositionClosedEvent evt)
{
    var userKey = this.GetPrimaryKeyLong();

    try
    {
        _logger.LogInformation(
            "Position closed event received for user {UserId}, position: {PositionId}, PnL: {PnL}",
            userKey,
            evt.PositionIdentifier,
            evt.RealizedPnL);

        await UpdateSummary();
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error processing position closed event for user {UserId}", userKey);
    }
}
/// <summary>
/// Handles a position-updated event by logging it and recomputing the agent summary.
/// Any exception is logged and not propagated to the caller.
/// </summary>
/// <param name="evt">The event describing the updated position.</param>
public async Task OnPositionUpdatedAsync(PositionUpdatedEvent evt)
{
    var userKey = this.GetPrimaryKeyLong();

    try
    {
        _logger.LogInformation(
            "Position updated event received for user {UserId}, position: {PositionId}",
            userKey,
            evt.PositionIdentifier);

        await UpdateSummary();
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error processing position updated event for user {UserId}", userKey);
    }
}
/// <summary>
/// Recomputes the agent summary from the user's positions, account balances and bots,
/// then saves it through the agent service.
/// </summary>
/// <remarks>
/// Every failure is logged and swallowed: callers never observe an exception from this method.
/// A failure while computing the total balance is handled separately and only resets the
/// balance to 0, so the rest of the summary is still saved.
/// </remarks>
private async Task UpdateSummary()
{
    var userId = (int)this.GetPrimaryKeyLong();

    try
    {
        // Materialize once: the position list is aggregated many times below.
        var positions = (await _tradingService.GetPositionByUserIdAsync(userId)).ToList();

        // Aggregated statistics from position data.
        var totalPnL = positions.Sum(p => p.ProfitAndLoss?.Realized ?? 0);
        var totalVolume = positions.Sum(p => p.Open.Price * p.Open.Quantity * p.Open.Leverage);
        var collateral = positions.Sum(p => p.Open.Price * p.Open.Quantity);
        var totalFees = positions.Sum(p => p.CalculateTotalFees());

        // Kept on the in-memory grain state; this method does not persist it itself.
        _state.State.TotalFees = totalFees;

        // NOTE(review): a position with zero (or missing) realized PnL is counted as a loss,
        // which presumably includes positions that are still open - confirm this is intended.
        var totalWins = positions.Count(p => (p.ProfitAndLoss?.Realized ?? 0) > 0);
        var totalLosses = positions.Count(p => (p.ProfitAndLoss?.Realized ?? 0) <= 0);

        // ROI is net of fees and expressed in percent; without collateral it is defined as 0.
        var netPnL = totalPnL - totalFees;
        var totalROI = collateral > 0 ? netPnL / collateral * 100 : 0;

        // Total balance = USDC value of every account + open-position notional and realized PnL.
        decimal totalBalance = 0;
        try
        {
            var user = await _userService.GetUserByIdAsync(userId);
            if (user != null)
            {
                var userAccounts = await _accountService.GetAccountsByUserAsync(user, hideSecrets: true, true);
                foreach (var account in userAccounts)
                {
                    var accountBalance = await GetOrRefreshBalanceDataAsync(account.Name);
                    totalBalance += accountBalance?.UsdcValue ?? 0;
                }

                foreach (var position in positions.Where(p => !p.IsFinished()))
                {
                    totalBalance += position.Open.Price * position.Open.Quantity;
                    totalBalance += position.ProfitAndLoss?.Realized ?? 0;
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error calculating total balance for agent {UserId}", userId);
            totalBalance = 0; // Report 0 rather than a partial sum when the calculation fails.
        }

        // Materialize once: the bot list is used for both the runtime and the active count.
        var bots = (await ServiceScopeHelpers.WithScopedService<IBotService, IEnumerable<Bot>>(
            _scopeFactory,
            async botService => await botService.GetBotsByUser(userId))).ToList();

        // Runtime is the earliest bot startup time, reported only once the agent has positions.
        // The bot list must be non-empty too: Min over an empty sequence of non-nullable values
        // throws, which would abort the whole summary update.
        DateTime? runtime = null;
        if (positions.Count > 0 && bots.Count > 0)
        {
            runtime = bots.Min(b => b.StartupTime);
        }

        var activeStrategiesCount = bots.Count(b => b.Status == BotStatus.Running);

        var summary = new AgentSummary
        {
            UserId = userId,
            AgentName = _state.State.AgentName,
            TotalPnL = totalPnL, // Gross realized PnL; fees are reported separately in TotalFees.
            Wins = totalWins,
            Losses = totalLosses,
            TotalROI = totalROI,
            Runtime = runtime,
            ActiveStrategiesCount = activeStrategiesCount,
            TotalVolume = totalVolume,
            TotalBalance = totalBalance,
            TotalFees = totalFees,
        };

        await _agentService.SaveOrUpdateAgentSummary(summary);

        _logger.LogDebug("Updated agent summary from position data for user {UserId}: NetPnL={NetPnL}, TotalPnL={TotalPnL}, Fees={Fees}, Volume={Volume}, Wins={Wins}, Losses={Losses}",
            userId, netPnL, totalPnL, totalFees, totalVolume, totalWins, totalLosses);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error calculating agent summary for user {UserId}", userId);
    }
}
/// <summary>
/// Adds a bot to this agent's bot set. When the bot was not already registered, the
/// state is persisted and the agent summary is recomputed; otherwise nothing happens.
/// </summary>
/// <param name="botId">Identifier of the bot to register.</param>
public async Task RegisterBotAsync(Guid botId)
{
    var newlyAdded = _state.State.BotIds.Add(botId);
    if (!newlyAdded)
    {
        return;
    }

    await _state.WriteStateAsync();
    _logger.LogInformation("Bot {BotId} registered to Agent {UserId}", botId, this.GetPrimaryKeyLong());
    await UpdateSummary();
}
/// <summary>
/// Removes a bot from this agent's bot set. When the bot was actually registered, the
/// state is persisted and the agent summary is recomputed; otherwise nothing happens.
/// </summary>
/// <param name="botId">Identifier of the bot to unregister.</param>
public async Task UnregisterBotAsync(Guid botId)
{
    var wasRegistered = _state.State.BotIds.Remove(botId);
    if (!wasRegistered)
    {
        return;
    }

    await _state.WriteStateAsync();
    _logger.LogInformation("Bot {BotId} unregistered from Agent {UserId}", botId, this.GetPrimaryKeyLong());
    await UpdateSummary();
}
/// <summary>Minimum delay enforced between two automatic USDC to ETH swaps.</summary>
private static readonly TimeSpan SwapCooldown = TimeSpan.FromMinutes(5);

/// <summary>Builds a successful balance check result carrying the given message.</summary>
private static BalanceCheckResult PassedCheck(string message) => new BalanceCheckResult
{
    IsSuccessful = true,
    FailureReason = BalanceCheckFailureReason.None,
    Message = message,
    ShouldStopBot = false
};

/// <summary>Builds a failed balance check result with the given reason, message and stop flag.</summary>
private static BalanceCheckResult FailedCheck(
    BalanceCheckFailureReason reason,
    string message,
    bool shouldStopBot) => new BalanceCheckResult
{
    IsSuccessful = false,
    FailureReason = reason,
    Message = message,
    ShouldStopBot = shouldStopBot
};

/// <summary>
/// Verifies that the account holds enough ETH to trade and, when it does not, tries to
/// top it up by swapping <c>Constants.GMX.Config.AutoSwapAmount</c> USDC to ETH.
/// </summary>
/// <remarks>
/// Checks run in this order; the first one that applies decides the result:
/// swap already in progress, swap cooldown, balance fetch failure, USDC below the minimum
/// position amount, ETH sufficient for trading, ETH below the minimum swap threshold,
/// USDC insufficient for the swap, and finally the swap itself.
/// The swap-in-progress flag is persisted before the swap and always cleared (and persisted
/// again) afterwards, together with the last swap time and cache invalidation on success.
/// </remarks>
/// <param name="requestingBotId">Bot asking for the check; used for logging only.</param>
/// <param name="accountName">Name of the account whose balances are checked and swapped.</param>
/// <returns>
/// The outcome of the check, including whether the requesting bot should stop.
/// </returns>
public async Task<BalanceCheckResult> CheckAndEnsureEthBalanceAsync(Guid requestingBotId, string accountName)
{
    var userId = (int)this.GetPrimaryKeyLong();

    // Only one swap at a time per agent.
    if (_state.State.IsSwapInProgress)
    {
        _logger.LogInformation("Swap already in progress for agent {UserId}, bot {RequestingBotId} will wait",
            userId, requestingBotId);
        return FailedCheck(BalanceCheckFailureReason.SwapInProgress,
            "Swap operation already in progress", shouldStopBot: false);
    }

    // Enforce the cooldown between swaps.
    if (_state.State.LastSwapTime.HasValue &&
        DateTime.UtcNow - _state.State.LastSwapTime.Value < SwapCooldown)
    {
        _logger.LogInformation(
            "Swap cooldown period active for agent {UserId}, bot {RequestingBotId} will wait",
            userId, requestingBotId);
        return FailedCheck(BalanceCheckFailureReason.SwapCooldownActive,
            "Swap cooldown period active", shouldStopBot: false);
    }

    // Get or refresh cached balance data.
    var balanceData = await GetOrRefreshBalanceDataAsync(accountName);
    if (balanceData == null)
    {
        _logger.LogError("Failed to get balance data for account {AccountName}, user {UserId}",
            accountName, userId);
        return FailedCheck(BalanceCheckFailureReason.BalanceFetchError,
            "Failed to fetch balance data", shouldStopBot: false);
    }

    _logger.LogInformation(
        "Agent {UserId} balance check - ETH: {EthValue:F2} USD, USDC: {UsdcValue:F2} USD (cached: {IsCached})",
        userId, balanceData.EthValueInUsd, balanceData.UsdcValue,
        _state.State.CachedBalanceData?.IsValid == true);

    // USDC below the minimum position amount: the bot cannot trade at all, so stop it.
    if (balanceData.UsdcValue < Constants.GMX.Config.MinimumPositionAmount)
    {
        _logger.LogWarning(
            "USDC balance is below minimum required amount - ETH: {EthValue:F2} USD, USDC: {UsdcValue:F2} USD (minimum: {Minimum})",
            balanceData.EthValueInUsd, balanceData.UsdcValue, Constants.GMX.Config.MinimumPositionAmount);
        return FailedCheck(BalanceCheckFailureReason.InsufficientUsdcBelowMinimum,
            $"USDC balance below minimum required amount ({Constants.GMX.Config.MinimumPositionAmount} USD)",
            shouldStopBot: true);
    }

    // Enough ETH to trade: nothing to do.
    if (balanceData.EthValueInUsd >= Constants.GMX.Config.MinimumTradeEthBalanceUsd)
    {
        return PassedCheck("Balance check successful - Enough ETH balance for trading");
    }

    // Too little ETH to even attempt a swap.
    if (balanceData.EthValueInUsd < Constants.GMX.Config.MinimumSwapEthBalanceUsd)
    {
        return FailedCheck(BalanceCheckFailureReason.InsufficientEthBelowMinimum,
            "ETH balance below minimum required amount", shouldStopBot: true);
    }

    // The swap must leave at least the minimum position amount in USDC.
    if (balanceData.UsdcValue <
        (Constants.GMX.Config.MinimumPositionAmount + (decimal)Constants.GMX.Config.AutoSwapAmount))
    {
        _logger.LogWarning(
            "Insufficient USDC balance for swap - ETH: {EthValue:F2} USD, USDC: {UsdcValue:F2} USD (need {AutoSwapAmount} USD for swap)",
            balanceData.EthValueInUsd, balanceData.UsdcValue, Constants.GMX.Config.AutoSwapAmount);
        return FailedCheck(BalanceCheckFailureReason.InsufficientUsdcForSwap,
            $"Insufficient USDC balance for swap (need {Constants.GMX.Config.AutoSwapAmount} USD)",
            shouldStopBot: true);
    }

    // Mark the swap as in progress before doing any external call.
    _state.State.IsSwapInProgress = true;
    await _state.WriteStateAsync();

    try
    {
        _logger.LogInformation(
            "Initiating USDC to ETH swap for agent {UserId} - swapping {AutoSwapAmount} USDC",
            userId, Constants.GMX.Config.AutoSwapAmount);

        // The swap needs the user; fail explicitly instead of handing null to the account service.
        var user = await _userService.GetUserByIdAsync(userId);
        if (user == null)
        {
            _logger.LogError("User {UserId} not found, cannot perform autoswap", userId);
            return FailedCheck(BalanceCheckFailureReason.SwapExecutionError,
                "User not found for swap", shouldStopBot: true);
        }

        var swapInfo = await _accountService.SwapGmxTokensAsync(user, accountName,
            Ticker.USDC, Ticker.ETH, Constants.GMX.Config.AutoSwapAmount);

        if (!swapInfo.Success)
        {
            _logger.LogError("Failed to swap USDC to ETH for agent {UserId}: {Error}",
                userId, swapInfo.Error ?? swapInfo.Message);
            return FailedCheck(BalanceCheckFailureReason.SwapExecutionError,
                swapInfo.Error ?? swapInfo.Message ?? "Swap execution failed",
                shouldStopBot: true);
        }

        _logger.LogInformation(
            "Successfully swapped {AutoSwapAmount} USDC to ETH for agent {UserId}, transaction hash: {Hash}",
            Constants.GMX.Config.AutoSwapAmount, userId, swapInfo.Hash);

        // Start the cooldown and drop the cached balances, which are stale after the swap.
        // Both are persisted by the write in the finally block.
        _state.State.LastSwapTime = DateTime.UtcNow;
        _state.State.CachedBalanceData = null;

        return PassedCheck("Swap completed successfully");
    }
    catch (InvalidOperationException ex) when (ex.InnerException is InsufficientFundsException insufficientFundsEx)
    {
        // Insufficient funds surfaced by the swap (wrapped in an InvalidOperationException).
        _logger.LogError(insufficientFundsEx,
            "Insufficient funds during autoswap for agent {UserId}: {ErrorMessage}",
            userId, insufficientFundsEx.Message);
        return FailedCheck(BalanceCheckFailureReason.SwapExecutionError,
            insufficientFundsEx.UserMessage ??
            "Insufficient ETH for gas fees during autoswap. Bot cannot continue trading.",
            shouldStopBot: true);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error during autoswap for agent {UserId}, bot {RequestingBotId}",
            userId, requestingBotId);
        return FailedCheck(BalanceCheckFailureReason.SwapExecutionError, ex.Message, shouldStopBot: true);
    }
    finally
    {
        // Always clear the in-progress flag, whatever the outcome of the swap.
        _state.State.IsSwapInProgress = false;
        await _state.WriteStateAsync();
    }
}
/// <summary>
/// Returns the cached balance data when it is still valid and belongs to
/// <paramref name="accountName"/>; otherwise fetches fresh balances from the exchange
/// service, caches them in grain state (persisted) and returns them.
/// </summary>
/// <param name="accountName">Name of the user's account to read balances for.</param>
/// <returns>
/// The balance data, or <c>null</c> when the user or account cannot be found or when
/// fetching fails (the error is logged, not thrown).
/// </returns>
private async Task<CachedBalanceData?> GetOrRefreshBalanceDataAsync(string accountName)
{
    var userId = (int)this.GetPrimaryKeyLong();

    try
    {
        // The cache holds a single account; it is only reused for that same account.
        var cached = _state.State.CachedBalanceData;
        if (cached?.IsValid == true && cached.AccountName == accountName)
        {
            _logger.LogDebug("Using cached balance data for account {AccountName}", accountName);
            return cached;
        }

        _logger.LogInformation("Fetching fresh balance data for account {AccountName}", accountName);

        var user = await _userService.GetUserByIdAsync(userId);
        if (user == null)
        {
            _logger.LogError("User {UserId} not found for balance check", userId);
            return null;
        }

        var userAccounts = await _accountService.GetAccountsByUserAsync(user, hideSecrets: true, true);
        var account = userAccounts.FirstOrDefault(a => a.Name == accountName);
        if (account == null)
        {
            _logger.LogError("Account {AccountName} not found for user {UserId}", accountName, userId);
            return null;
        }

        // Token names are matched case-insensitively with an ordinal comparison so the
        // result does not depend on the current culture.
        var balances = await _exchangeService.GetBalances(account);
        var ethBalance = balances.FirstOrDefault(
            b => string.Equals(b.TokenName, "ETH", StringComparison.OrdinalIgnoreCase));
        var usdcBalance = balances.FirstOrDefault(
            b => string.Equals(b.TokenName, "USDC", StringComparison.OrdinalIgnoreCase));

        // ETH is valued as amount * price; USDC uses the value reported on the balance entry.
        var ethValueInUsd = ethBalance?.Amount * ethBalance?.Price ?? 0;
        var usdcValue = usdcBalance?.Value ?? 0;

        var balanceData = new CachedBalanceData
        {
            LastFetched = DateTime.UtcNow,
            AccountName = accountName,
            EthValueInUsd = ethValueInUsd,
            UsdcValue = usdcValue
        };

        _state.State.CachedBalanceData = balanceData;
        await _state.WriteStateAsync();

        return balanceData;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error fetching balance data for account {AccountName}, user {UserId}",
            accountName, userId);
        return null;
    }
}
}