From ffb98fe3597c998d70733fb147aa5f6293ad18a3 Mon Sep 17 00:00:00 2001 From: cryptooda Date: Wed, 8 Oct 2025 21:32:48 +0700 Subject: [PATCH] Fix update agent save + revert market in redis --- .../AgentGrainTests.cs | 4 + src/Managing.Application.Tests/BotsTests.cs | 5 +- .../Abstractions/Grains/IAgentGrain.cs | 6 + .../Bots/Grains/AgentGrain.cs | 19 ++ .../Bots/Grains/LiveTradingBotGrain.cs | 8 +- .../Bots/Models/AgentGrainState.cs | 6 + .../src/plugins/custom/gmx.ts | 169 ++++++------------ 7 files changed, 100 insertions(+), 117 deletions(-) diff --git a/src/Managing.Application.Tests/AgentGrainTests.cs b/src/Managing.Application.Tests/AgentGrainTests.cs index 82ddfaa0..ae2c200a 100644 --- a/src/Managing.Application.Tests/AgentGrainTests.cs +++ b/src/Managing.Application.Tests/AgentGrainTests.cs @@ -1,4 +1,5 @@ using Managing.Application.Abstractions; +using Managing.Application.Abstractions.Repositories; using Managing.Application.Abstractions.Services; using Managing.Application.Bots.Grains; using Managing.Application.Bots.Models; @@ -22,6 +23,7 @@ public class AgentGrainTests private readonly Mock _mockAccountService; private readonly Mock _mockTradingService; private readonly Mock _mockScopeFactory; + private readonly Mock _mockAgentBalanceRepository; public AgentGrainTests() { @@ -34,6 +36,7 @@ public class AgentGrainTests _mockAccountService = new Mock(); _mockTradingService = new Mock(); _mockScopeFactory = new Mock(); + _mockAgentBalanceRepository = new Mock(); // Setup default state _mockState.Setup(x => x.State).Returns(new AgentGrainState @@ -94,6 +97,7 @@ public class AgentGrainTests _mockUserService.Object, _mockAccountService.Object, _mockTradingService.Object, + _mockAgentBalanceRepository.Object, _mockScopeFactory.Object); } } \ No newline at end of file diff --git a/src/Managing.Application.Tests/BotsTests.cs b/src/Managing.Application.Tests/BotsTests.cs index 5200d220..8eaf947d 100644 --- a/src/Managing.Application.Tests/BotsTests.cs +++ b/src/Managing.Application.Tests/BotsTests.cs @@ -13,6 +13,7 @@ using Managing.Domain.Scenarios; using Managing.Domain.Strategies; using Managing.Domain.Strategies.Signals; using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.DependencyInjection; using Moq; using Newtonsoft.Json; using Xunit; @@ -41,8 +42,10 @@ namespace Managing.Application.Tests var backtestLogger = TradingBaseTests.CreateBacktesterLogger(); var botService = new Mock().Object; var agentService = new Mock().Object; + var _scopeFactory = new Mock(); _backtester = new Backtester(_exchangeService, backtestRepository, backtestLogger, - scenarioService, _accountService.Object, messengerService, kaigenService, hubContext, null, agentService); + scenarioService, _accountService.Object, messengerService, kaigenService, hubContext, null, + _scopeFactory.Object); _elapsedTimes = new List(); // Initialize cross-platform file paths diff --git a/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs b/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs index de39d0ae..41410f79 100644 --- a/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs +++ b/src/Managing.Application/Abstractions/Grains/IAgentGrain.cs @@ -61,5 +61,11 @@ namespace Managing.Application.Abstractions.Grains /// The account name to check balances for /// BalanceCheckResult indicating the status and reason for any failure Task CheckAndEnsureEthBalanceAsync(Guid requestingBotId, string accountName); + + /// + /// Forces an update of the agent summary. + /// + [OneWay] + Task ForceUpdateSummary(); } } \ No newline at end of file diff --git a/src/Managing.Application/Bots/Grains/AgentGrain.cs b/src/Managing.Application/Bots/Grains/AgentGrain.cs index 2b970674..9a5d80c3 100644 --- a/src/Managing.Application/Bots/Grains/AgentGrain.cs +++ b/src/Managing.Application/Bots/Grains/AgentGrain.cs @@ -161,6 +161,21 @@ public class AgentGrain : Grain, IAgentGrain } } + [OneWay] + public async Task ForceUpdateSummary() + { + // Check if last update was more than 2 minutes ago + if (_state.State.LastSummaryUpdateTime.HasValue && + DateTime.UtcNow - _state.State.LastSummaryUpdateTime.Value < TimeSpan.FromMinutes(2)) + { + _logger.LogDebug("Skipping summary update for agent {UserId} - last update was {TimeAgo} ago", + this.GetPrimaryKeyLong(), DateTime.UtcNow - _state.State.LastSummaryUpdateTime.Value); + return; + } + + await UpdateSummary(); + } + /// /// Updates the agent summary by recalculating from position data (used for initialization or manual refresh) /// @@ -274,6 +289,10 @@ public class AgentGrain : Grain, IAgentGrain // Save summary to database await _agentService.SaveOrUpdateAgentSummary(summary); + // Update last summary update time + _state.State.LastSummaryUpdateTime = DateTime.UtcNow; + await _state.WriteStateAsync(); + // Insert balance tracking data InsertBalanceTrackingData(totalBalance, botsAllocationUsdValue, netPnL, usdcWalletValue, usdcInPositionsValue); diff --git a/src/Managing.Application/Bots/Grains/LiveTradingBotGrain.cs b/src/Managing.Application/Bots/Grains/LiveTradingBotGrain.cs index 2af11657..bc8741d4 100644 --- a/src/Managing.Application/Bots/Grains/LiveTradingBotGrain.cs +++ b/src/Managing.Application/Bots/Grains/LiveTradingBotGrain.cs @@ -845,7 +845,7 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable // Pass the complete Bot object to BotService for saving var success = await ServiceScopeHelpers.WithScopedService(_scopeFactory, async (botService) => { return await botService.SaveBotStatisticsAsync(bot); }); - + if (success) { _logger.LogDebug( @@ -857,6 +857,12 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable { _logger.LogWarning("Failed to save bot statistics for bot {BotId}", _state.State.Identifier); } + + // await ServiceScopeHelpers.WithScopedService(_scopeFactory, async grainFactory => + // { + // var agentGrain = grainFactory.GetGrain(_state.State.User.Id); + // await agentGrain.ForceUpdateSummary(); + // }); } catch (Exception ex) { diff --git a/src/Managing.Application/Bots/Models/AgentGrainState.cs b/src/Managing.Application/Bots/Models/AgentGrainState.cs index 6bd9bc8e..dfa7bed6 100644 --- a/src/Managing.Application/Bots/Models/AgentGrainState.cs +++ b/src/Managing.Application/Bots/Models/AgentGrainState.cs @@ -32,6 +32,12 @@ namespace Managing.Application.Bots.Models /// [Id(5)] public decimal TotalFees { get; set; } = 0; + + /// + /// Timestamp of the last summary update to implement update throttling + /// + [Id(6)] + public DateTime? LastSummaryUpdateTime { get; set; } = null; } /// diff --git a/src/Managing.Web3Proxy/src/plugins/custom/gmx.ts b/src/Managing.Web3Proxy/src/plugins/custom/gmx.ts index 0c62a9c2..d1650555 100644 --- a/src/Managing.Web3Proxy/src/plugins/custom/gmx.ts +++ b/src/Managing.Web3Proxy/src/plugins/custom/gmx.ts @@ -2,7 +2,6 @@ import fp from 'fastify-plugin' import {FastifyReply, FastifyRequest} from 'fastify' import {z} from 'zod' import {GmxSdk} from '../../generated/gmxsdk/index.js' -import {createClient} from 'redis' import {arbitrum} from 'viem/chains'; import {getTokenBySymbol} from '../../generated/gmxsdk/configs/tokens.js'; @@ -51,11 +50,32 @@ interface CacheEntry { timestamp: number; } -const CACHE_TTL = 30 * 60; // 30 minutes in seconds (Redis TTL) +const CACHE_TTL = 30 * 60 * 1000; // 30 minutes in milliseconds +const marketsCache = new Map(); +const MAX_CACHE_SIZE = 5; // Limit cache size to prevent memory issues const OPERATION_TIMEOUT = 30000; // 30 seconds timeout for operations +const MEMORY_WARNING_THRESHOLD = 0.8; // Warn when memory usage exceeds 80% const MAX_GAS_FEE_USD = 1.5; // Maximum gas fee in USD (1 USDC) +// Memory monitoring function +function checkMemoryUsage() { + const used = process.memoryUsage(); + const total = used.heapTotal; + const usage = used.heapUsed / total; + + if (usage > MEMORY_WARNING_THRESHOLD) { + console.warn(`โš ๏ธ High memory usage detected: ${(usage * 100).toFixed(1)}% (${Math.round(used.heapUsed / 1024 / 1024)}MB / ${Math.round(total / 1024 / 1024)}MB)`); + + // Clear cache if memory usage is too high + if (usage > 0.9) { + console.warn(`๐Ÿงน Clearing markets cache due to high memory usage`); + marketsCache.clear(); + pendingRequests.clear(); + } + } +} + /** * Checks if the user has sufficient ETH balance for gas fees * @param sdk The GMX SDK client @@ -258,37 +278,20 @@ async function executeWithFallback( // Add a promise cache to prevent concurrent calls to the same endpoint const pendingRequests = new Map>(); -// Redis client helper function -async function getRedisClient() { - const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379'; - const redisPassword = process.env.REDIS_PASSWORD; - - const redisConfig: any = { - url: redisUrl, - socket: { - connectTimeout: 2000, // 2 second connection timeout - reconnectStrategy: false // Don't retry on health check - } - }; - - if (redisPassword) { - redisConfig.password = redisPassword; - } - - const redisClient = createClient(redisConfig); - - // Suppress error logs to avoid spam - redisClient.on('error', () => { - // Silently ignore errors - }); - - await redisClient.connect(); - return redisClient; -} - async function getMarketsInfoWithCache(sdk: GmxSdk): Promise<{ marketsInfoData: MarketsInfoData; tokensData: TokensData }> { + // Check memory usage before proceeding + checkMemoryUsage(); + const cacheKey = `markets_${sdk.chainId}`; const now = Date.now(); + const cached = marketsCache.get(cacheKey); + + if (cached && (now - cached.timestamp) < CACHE_TTL) { + if (!cached.data.marketsInfoData || !cached.data.tokensData) { + throw new Error("Invalid cached data: missing markets or tokens info"); + } + return cached.data as { marketsInfoData: MarketsInfoData; tokensData: TokensData }; + } // Check if there's already a pending request for this chain if (pendingRequests.has(cacheKey)) { @@ -297,39 +300,7 @@ async function getMarketsInfoWithCache(sdk: GmxSdk): Promise<{ marketsInfoData: // Create a new request and cache the promise const requestPromise = (async () => { - let redisClient = null; try { - // Try to get data from Redis cache first - try { - redisClient = await getRedisClient(); - const cachedData = await redisClient.get(cacheKey); - - if (cachedData) { - const cached: CacheEntry = JSON.parse(cachedData); - const ageMinutes = (now - cached.timestamp) / (1000 * 60); - - if (ageMinutes < CACHE_TTL / 60) { // Convert seconds to minutes for comparison - if (!cached.data.marketsInfoData || !cached.data.tokensData) { - throw new Error("Invalid cached data: missing markets or tokens info"); - } - console.log(`โœ… Markets info retrieved from Redis cache for chain ${sdk.chainId}`); - return cached.data as { marketsInfoData: MarketsInfoData; tokensData: TokensData }; - } - } - } catch (redisError) { - console.warn(`โš ๏ธ Redis cache unavailable, fetching fresh data: ${redisError instanceof Error ? redisError.message : 'Unknown error'}`); - } finally { - if (redisClient) { - try { - await redisClient.disconnect(); - } catch (disconnectError) { - // Silently ignore disconnect errors - } - redisClient = null; - } - } - - // Fetch fresh data from GMX console.log(`๐Ÿ”„ Fetching markets info for chain ${sdk.chainId}...`); const data = await withTimeout(sdk.markets.getMarketsInfo(), OPERATION_TIMEOUT); @@ -337,64 +308,34 @@ async function getMarketsInfoWithCache(sdk: GmxSdk): Promise<{ marketsInfoData: throw new Error("Invalid response from GMX: missing markets or tokens info"); } - // Try to cache the data in Redis - try { - if (!redisClient) { - redisClient = await getRedisClient(); - } - - const cacheEntry: CacheEntry = { - data: data as { marketsInfoData: MarketsInfoData; tokensData: TokensData }, - timestamp: now - }; - - await redisClient.setEx(cacheKey, CACHE_TTL, JSON.stringify(cacheEntry)); - console.log(`โœ… Markets info cached in Redis for chain ${sdk.chainId}`); - } catch (redisError) { - console.warn(`โš ๏ธ Failed to cache data in Redis: ${redisError instanceof Error ? redisError.message : 'Unknown error'}`); - } finally { - if (redisClient) { - try { - await redisClient.disconnect(); - } catch (disconnectError) { - // Silently ignore disconnect errors - } - } + // Implement cache size limit to prevent memory issues + if (marketsCache.size >= MAX_CACHE_SIZE) { + // Remove the oldest entry + const oldestKey = marketsCache.keys().next().value; + marketsCache.delete(oldestKey); } + + marketsCache.set(cacheKey, { + data: data as { marketsInfoData: MarketsInfoData; tokensData: TokensData }, + timestamp: now + }); + console.log(`โœ… Markets info cached for chain ${sdk.chainId}`); return data as { marketsInfoData: MarketsInfoData; tokensData: TokensData }; } catch (error) { console.error(`โŒ Failed to fetch markets info for chain ${sdk.chainId}:`, error); - // If RPC is failing, return empty data + // If RPC is failing, return empty data to prevent memory issues const emptyData = { marketsInfoData: {} as MarketsInfoData, tokensData: {} as TokensData }; - // Try to cache the empty data for a shorter time to allow retries - try { - if (!redisClient) { - redisClient = await getRedisClient(); - } - - const cacheEntry: CacheEntry = { - data: emptyData, - timestamp: now - (CACHE_TTL * 1000 - 60000) // Cache for only 1 minute - }; - - await redisClient.setEx(cacheKey, 60, JSON.stringify(cacheEntry)); // 1 minute TTL - } catch (redisError) { - console.warn(`โš ๏ธ Failed to cache empty data in Redis: ${redisError instanceof Error ? redisError.message : 'Unknown error'}`); - } finally { - if (redisClient) { - try { - await redisClient.disconnect(); - } catch (disconnectError) { - // Silently ignore disconnect errors - } - } - } + // Cache the empty data for a shorter time to allow retries + marketsCache.set(cacheKey, { + data: emptyData, + timestamp: now - (CACHE_TTL - 60000) // Cache for only 1 minute + }); console.log(`โš ๏ธ Returning empty markets data for chain ${sdk.chainId} due to RPC failure`); return emptyData; @@ -2037,14 +1978,12 @@ export const claimGmxFundingFeesImpl = async ( const marketAddresses: string[] = []; const tokenAddresses: string[] = []; - // Get markets info data once for all iterations - const { marketsInfoData } = await getMarketsInfoWithCache(sdk); - // Build arrays of markets and tokens that have claimable amounts Object.entries(claimableFundingData).forEach(([marketAddress, data]) => { if (data.claimableFundingAmountLong > 0) { marketAddresses.push(marketAddress); // Get the market info to find the long token address + const { marketsInfoData } = marketsCache.get(`markets_${sdk.chainId}`)?.data || {}; if (marketsInfoData?.[marketAddress]) { tokenAddresses.push(marketsInfoData[marketAddress].longToken.address); } @@ -2053,6 +1992,7 @@ export const claimGmxFundingFeesImpl = async ( if (data.claimableFundingAmountShort > 0) { marketAddresses.push(marketAddress); // Get the market info to find the short token address + const { marketsInfoData } = marketsCache.get(`markets_${sdk.chainId}`)?.data || {}; if (marketsInfoData?.[marketAddress]) { tokenAddresses.push(marketsInfoData[marketAddress].shortToken.address); } @@ -2316,11 +2256,10 @@ export const claimGmxUiFeesImpl = async ( const marketAddresses: string[] = []; const tokenAddresses: string[] = []; - // Get markets info data once for all iterations - const { marketsInfoData } = await getMarketsInfoWithCache(sdk); - // Build arrays of markets and tokens that have claimable amounts Object.entries(claimableUiFeeData).forEach(([marketAddress, data]) => { + const { marketsInfoData } = marketsCache.get(`markets_${sdk.chainId}`)?.data || {}; + if (!marketsInfoData?.[marketAddress]) { return; }