Fix update agent save + revert market in redis
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
using Managing.Application.Abstractions;
|
||||
using Managing.Application.Abstractions.Repositories;
|
||||
using Managing.Application.Abstractions.Services;
|
||||
using Managing.Application.Bots.Grains;
|
||||
using Managing.Application.Bots.Models;
|
||||
@@ -22,6 +23,7 @@ public class AgentGrainTests
|
||||
private readonly Mock<IAccountService> _mockAccountService;
|
||||
private readonly Mock<ITradingService> _mockTradingService;
|
||||
private readonly Mock<IServiceScopeFactory> _mockScopeFactory;
|
||||
private readonly Mock<IAgentBalanceRepository> _mockAgentBalanceRepository;
|
||||
|
||||
public AgentGrainTests()
|
||||
{
|
||||
@@ -34,6 +36,7 @@ public class AgentGrainTests
|
||||
_mockAccountService = new Mock<IAccountService>();
|
||||
_mockTradingService = new Mock<ITradingService>();
|
||||
_mockScopeFactory = new Mock<IServiceScopeFactory>();
|
||||
_mockAgentBalanceRepository = new Mock<IAgentBalanceRepository>();
|
||||
|
||||
// Setup default state
|
||||
_mockState.Setup(x => x.State).Returns(new AgentGrainState
|
||||
@@ -94,6 +97,7 @@ public class AgentGrainTests
|
||||
_mockUserService.Object,
|
||||
_mockAccountService.Object,
|
||||
_mockTradingService.Object,
|
||||
_mockAgentBalanceRepository.Object,
|
||||
_mockScopeFactory.Object);
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ using Managing.Domain.Scenarios;
|
||||
using Managing.Domain.Strategies;
|
||||
using Managing.Domain.Strategies.Signals;
|
||||
using Microsoft.AspNetCore.SignalR;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Moq;
|
||||
using Newtonsoft.Json;
|
||||
using Xunit;
|
||||
@@ -41,8 +42,10 @@ namespace Managing.Application.Tests
|
||||
var backtestLogger = TradingBaseTests.CreateBacktesterLogger();
|
||||
var botService = new Mock<IBotService>().Object;
|
||||
var agentService = new Mock<IAgentService>().Object;
|
||||
var _scopeFactory = new Mock<IServiceScopeFactory>();
|
||||
_backtester = new Backtester(_exchangeService, backtestRepository, backtestLogger,
|
||||
scenarioService, _accountService.Object, messengerService, kaigenService, hubContext, null, agentService);
|
||||
scenarioService, _accountService.Object, messengerService, kaigenService, hubContext, null,
|
||||
_scopeFactory.Object);
|
||||
_elapsedTimes = new List<double>();
|
||||
|
||||
// Initialize cross-platform file paths
|
||||
|
||||
@@ -61,5 +61,11 @@ namespace Managing.Application.Abstractions.Grains
|
||||
/// <param name="accountName">The account name to check balances for</param>
|
||||
/// <returns>BalanceCheckResult indicating the status and reason for any failure</returns>
|
||||
Task<BalanceCheckResult> CheckAndEnsureEthBalanceAsync(Guid requestingBotId, string accountName);
|
||||
|
||||
/// <summary>
|
||||
/// Forces an update of the agent summary.
|
||||
/// </summary>
|
||||
[OneWay]
|
||||
Task ForceUpdateSummary();
|
||||
}
|
||||
}
|
||||
@@ -161,6 +161,21 @@ public class AgentGrain : Grain, IAgentGrain
|
||||
}
|
||||
}
|
||||
|
||||
[OneWay]
|
||||
public async Task ForceUpdateSummary()
|
||||
{
|
||||
// Check if last update was more than 2 minutes ago
|
||||
if (_state.State.LastSummaryUpdateTime.HasValue &&
|
||||
DateTime.UtcNow - _state.State.LastSummaryUpdateTime.Value < TimeSpan.FromMinutes(2))
|
||||
{
|
||||
_logger.LogDebug("Skipping summary update for agent {UserId} - last update was {TimeAgo} ago",
|
||||
this.GetPrimaryKeyLong(), DateTime.UtcNow - _state.State.LastSummaryUpdateTime.Value);
|
||||
return;
|
||||
}
|
||||
|
||||
await UpdateSummary();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Updates the agent summary by recalculating from position data (used for initialization or manual refresh)
|
||||
/// </summary>
|
||||
@@ -274,6 +289,10 @@ public class AgentGrain : Grain, IAgentGrain
|
||||
// Save summary to database
|
||||
await _agentService.SaveOrUpdateAgentSummary(summary);
|
||||
|
||||
// Update last summary update time
|
||||
_state.State.LastSummaryUpdateTime = DateTime.UtcNow;
|
||||
await _state.WriteStateAsync();
|
||||
|
||||
// Insert balance tracking data
|
||||
InsertBalanceTrackingData(totalBalance, botsAllocationUsdValue, netPnL, usdcWalletValue,
|
||||
usdcInPositionsValue);
|
||||
|
||||
@@ -845,7 +845,7 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
|
||||
// Pass the complete Bot object to BotService for saving
|
||||
var success = await ServiceScopeHelpers.WithScopedService<IBotService, bool>(_scopeFactory,
|
||||
async (botService) => { return await botService.SaveBotStatisticsAsync(bot); });
|
||||
|
||||
|
||||
if (success)
|
||||
{
|
||||
_logger.LogDebug(
|
||||
@@ -857,6 +857,12 @@ public class LiveTradingBotGrain : Grain, ILiveTradingBotGrain, IRemindable
|
||||
{
|
||||
_logger.LogWarning("Failed to save bot statistics for bot {BotId}", _state.State.Identifier);
|
||||
}
|
||||
|
||||
// await ServiceScopeHelpers.WithScopedService<IGrainFactory>(_scopeFactory, async grainFactory =>
|
||||
// {
|
||||
// var agentGrain = grainFactory.GetGrain<IAgentGrain>(_state.State.User.Id);
|
||||
// await agentGrain.ForceUpdateSummary();
|
||||
// });
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
|
||||
@@ -32,6 +32,12 @@ namespace Managing.Application.Bots.Models
|
||||
/// </summary>
|
||||
[Id(5)]
|
||||
public decimal TotalFees { get; set; } = 0;
|
||||
|
||||
/// <summary>
|
||||
/// Timestamp of the last summary update to implement update throttling
|
||||
/// </summary>
|
||||
[Id(6)]
|
||||
public DateTime? LastSummaryUpdateTime { get; set; } = null;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
||||
@@ -2,7 +2,6 @@ import fp from 'fastify-plugin'
|
||||
import {FastifyReply, FastifyRequest} from 'fastify'
|
||||
import {z} from 'zod'
|
||||
import {GmxSdk} from '../../generated/gmxsdk/index.js'
|
||||
import {createClient} from 'redis'
|
||||
|
||||
import {arbitrum} from 'viem/chains';
|
||||
import {getTokenBySymbol} from '../../generated/gmxsdk/configs/tokens.js';
|
||||
@@ -51,11 +50,32 @@ interface CacheEntry {
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
const CACHE_TTL = 30 * 60; // 30 minutes in seconds (Redis TTL)
|
||||
const CACHE_TTL = 30 * 60 * 1000; // 30 minutes in milliseconds
|
||||
const marketsCache = new Map<string, CacheEntry>();
|
||||
const MAX_CACHE_SIZE = 5; // Limit cache size to prevent memory issues
|
||||
const OPERATION_TIMEOUT = 30000; // 30 seconds timeout for operations
|
||||
|
||||
const MEMORY_WARNING_THRESHOLD = 0.8; // Warn when memory usage exceeds 80%
|
||||
const MAX_GAS_FEE_USD = 1.5; // Maximum gas fee in USD (1 USDC)
|
||||
|
||||
// Memory monitoring function
|
||||
function checkMemoryUsage() {
|
||||
const used = process.memoryUsage();
|
||||
const total = used.heapTotal;
|
||||
const usage = used.heapUsed / total;
|
||||
|
||||
if (usage > MEMORY_WARNING_THRESHOLD) {
|
||||
console.warn(`⚠️ High memory usage detected: ${(usage * 100).toFixed(1)}% (${Math.round(used.heapUsed / 1024 / 1024)}MB / ${Math.round(total / 1024 / 1024)}MB)`);
|
||||
|
||||
// Clear cache if memory usage is too high
|
||||
if (usage > 0.9) {
|
||||
console.warn(`🧹 Clearing markets cache due to high memory usage`);
|
||||
marketsCache.clear();
|
||||
pendingRequests.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the user has sufficient ETH balance for gas fees
|
||||
* @param sdk The GMX SDK client
|
||||
@@ -258,37 +278,20 @@ async function executeWithFallback<T>(
|
||||
// Add a promise cache to prevent concurrent calls to the same endpoint
|
||||
const pendingRequests = new Map<string, Promise<{ marketsInfoData: MarketsInfoData; tokensData: TokensData }>>();
|
||||
|
||||
// Redis client helper function
|
||||
async function getRedisClient() {
|
||||
const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379';
|
||||
const redisPassword = process.env.REDIS_PASSWORD;
|
||||
|
||||
const redisConfig: any = {
|
||||
url: redisUrl,
|
||||
socket: {
|
||||
connectTimeout: 2000, // 2 second connection timeout
|
||||
reconnectStrategy: false // Don't retry on health check
|
||||
}
|
||||
};
|
||||
|
||||
if (redisPassword) {
|
||||
redisConfig.password = redisPassword;
|
||||
}
|
||||
|
||||
const redisClient = createClient(redisConfig);
|
||||
|
||||
// Suppress error logs to avoid spam
|
||||
redisClient.on('error', () => {
|
||||
// Silently ignore errors
|
||||
});
|
||||
|
||||
await redisClient.connect();
|
||||
return redisClient;
|
||||
}
|
||||
|
||||
async function getMarketsInfoWithCache(sdk: GmxSdk): Promise<{ marketsInfoData: MarketsInfoData; tokensData: TokensData }> {
|
||||
// Check memory usage before proceeding
|
||||
checkMemoryUsage();
|
||||
|
||||
const cacheKey = `markets_${sdk.chainId}`;
|
||||
const now = Date.now();
|
||||
const cached = marketsCache.get(cacheKey);
|
||||
|
||||
if (cached && (now - cached.timestamp) < CACHE_TTL) {
|
||||
if (!cached.data.marketsInfoData || !cached.data.tokensData) {
|
||||
throw new Error("Invalid cached data: missing markets or tokens info");
|
||||
}
|
||||
return cached.data as { marketsInfoData: MarketsInfoData; tokensData: TokensData };
|
||||
}
|
||||
|
||||
// Check if there's already a pending request for this chain
|
||||
if (pendingRequests.has(cacheKey)) {
|
||||
@@ -297,39 +300,7 @@ async function getMarketsInfoWithCache(sdk: GmxSdk): Promise<{ marketsInfoData:
|
||||
|
||||
// Create a new request and cache the promise
|
||||
const requestPromise = (async () => {
|
||||
let redisClient = null;
|
||||
try {
|
||||
// Try to get data from Redis cache first
|
||||
try {
|
||||
redisClient = await getRedisClient();
|
||||
const cachedData = await redisClient.get(cacheKey);
|
||||
|
||||
if (cachedData) {
|
||||
const cached: CacheEntry = JSON.parse(cachedData);
|
||||
const ageMinutes = (now - cached.timestamp) / (1000 * 60);
|
||||
|
||||
if (ageMinutes < CACHE_TTL / 60) { // Convert seconds to minutes for comparison
|
||||
if (!cached.data.marketsInfoData || !cached.data.tokensData) {
|
||||
throw new Error("Invalid cached data: missing markets or tokens info");
|
||||
}
|
||||
console.log(`✅ Markets info retrieved from Redis cache for chain ${sdk.chainId}`);
|
||||
return cached.data as { marketsInfoData: MarketsInfoData; tokensData: TokensData };
|
||||
}
|
||||
}
|
||||
} catch (redisError) {
|
||||
console.warn(`⚠️ Redis cache unavailable, fetching fresh data: ${redisError instanceof Error ? redisError.message : 'Unknown error'}`);
|
||||
} finally {
|
||||
if (redisClient) {
|
||||
try {
|
||||
await redisClient.disconnect();
|
||||
} catch (disconnectError) {
|
||||
// Silently ignore disconnect errors
|
||||
}
|
||||
redisClient = null;
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch fresh data from GMX
|
||||
console.log(`🔄 Fetching markets info for chain ${sdk.chainId}...`);
|
||||
const data = await withTimeout(sdk.markets.getMarketsInfo(), OPERATION_TIMEOUT);
|
||||
|
||||
@@ -337,64 +308,34 @@ async function getMarketsInfoWithCache(sdk: GmxSdk): Promise<{ marketsInfoData:
|
||||
throw new Error("Invalid response from GMX: missing markets or tokens info");
|
||||
}
|
||||
|
||||
// Try to cache the data in Redis
|
||||
try {
|
||||
if (!redisClient) {
|
||||
redisClient = await getRedisClient();
|
||||
}
|
||||
|
||||
const cacheEntry: CacheEntry = {
|
||||
data: data as { marketsInfoData: MarketsInfoData; tokensData: TokensData },
|
||||
timestamp: now
|
||||
};
|
||||
|
||||
await redisClient.setEx(cacheKey, CACHE_TTL, JSON.stringify(cacheEntry));
|
||||
console.log(`✅ Markets info cached in Redis for chain ${sdk.chainId}`);
|
||||
} catch (redisError) {
|
||||
console.warn(`⚠️ Failed to cache data in Redis: ${redisError instanceof Error ? redisError.message : 'Unknown error'}`);
|
||||
} finally {
|
||||
if (redisClient) {
|
||||
try {
|
||||
await redisClient.disconnect();
|
||||
} catch (disconnectError) {
|
||||
// Silently ignore disconnect errors
|
||||
}
|
||||
}
|
||||
// Implement cache size limit to prevent memory issues
|
||||
if (marketsCache.size >= MAX_CACHE_SIZE) {
|
||||
// Remove the oldest entry
|
||||
const oldestKey = marketsCache.keys().next().value;
|
||||
marketsCache.delete(oldestKey);
|
||||
}
|
||||
|
||||
marketsCache.set(cacheKey, {
|
||||
data: data as { marketsInfoData: MarketsInfoData; tokensData: TokensData },
|
||||
timestamp: now
|
||||
});
|
||||
|
||||
console.log(`✅ Markets info cached for chain ${sdk.chainId}`);
|
||||
return data as { marketsInfoData: MarketsInfoData; tokensData: TokensData };
|
||||
} catch (error) {
|
||||
console.error(`❌ Failed to fetch markets info for chain ${sdk.chainId}:`, error);
|
||||
|
||||
// If RPC is failing, return empty data
|
||||
// If RPC is failing, return empty data to prevent memory issues
|
||||
const emptyData = {
|
||||
marketsInfoData: {} as MarketsInfoData,
|
||||
tokensData: {} as TokensData
|
||||
};
|
||||
|
||||
// Try to cache the empty data for a shorter time to allow retries
|
||||
try {
|
||||
if (!redisClient) {
|
||||
redisClient = await getRedisClient();
|
||||
}
|
||||
|
||||
const cacheEntry: CacheEntry = {
|
||||
data: emptyData,
|
||||
timestamp: now - (CACHE_TTL * 1000 - 60000) // Cache for only 1 minute
|
||||
};
|
||||
|
||||
await redisClient.setEx(cacheKey, 60, JSON.stringify(cacheEntry)); // 1 minute TTL
|
||||
} catch (redisError) {
|
||||
console.warn(`⚠️ Failed to cache empty data in Redis: ${redisError instanceof Error ? redisError.message : 'Unknown error'}`);
|
||||
} finally {
|
||||
if (redisClient) {
|
||||
try {
|
||||
await redisClient.disconnect();
|
||||
} catch (disconnectError) {
|
||||
// Silently ignore disconnect errors
|
||||
}
|
||||
}
|
||||
}
|
||||
// Cache the empty data for a shorter time to allow retries
|
||||
marketsCache.set(cacheKey, {
|
||||
data: emptyData,
|
||||
timestamp: now - (CACHE_TTL - 60000) // Cache for only 1 minute
|
||||
});
|
||||
|
||||
console.log(`⚠️ Returning empty markets data for chain ${sdk.chainId} due to RPC failure`);
|
||||
return emptyData;
|
||||
@@ -2037,14 +1978,12 @@ export const claimGmxFundingFeesImpl = async (
|
||||
const marketAddresses: string[] = [];
|
||||
const tokenAddresses: string[] = [];
|
||||
|
||||
// Get markets info data once for all iterations
|
||||
const { marketsInfoData } = await getMarketsInfoWithCache(sdk);
|
||||
|
||||
// Build arrays of markets and tokens that have claimable amounts
|
||||
Object.entries(claimableFundingData).forEach(([marketAddress, data]) => {
|
||||
if (data.claimableFundingAmountLong > 0) {
|
||||
marketAddresses.push(marketAddress);
|
||||
// Get the market info to find the long token address
|
||||
const { marketsInfoData } = marketsCache.get(`markets_${sdk.chainId}`)?.data || {};
|
||||
if (marketsInfoData?.[marketAddress]) {
|
||||
tokenAddresses.push(marketsInfoData[marketAddress].longToken.address);
|
||||
}
|
||||
@@ -2053,6 +1992,7 @@ export const claimGmxFundingFeesImpl = async (
|
||||
if (data.claimableFundingAmountShort > 0) {
|
||||
marketAddresses.push(marketAddress);
|
||||
// Get the market info to find the short token address
|
||||
const { marketsInfoData } = marketsCache.get(`markets_${sdk.chainId}`)?.data || {};
|
||||
if (marketsInfoData?.[marketAddress]) {
|
||||
tokenAddresses.push(marketsInfoData[marketAddress].shortToken.address);
|
||||
}
|
||||
@@ -2316,11 +2256,10 @@ export const claimGmxUiFeesImpl = async (
|
||||
const marketAddresses: string[] = [];
|
||||
const tokenAddresses: string[] = [];
|
||||
|
||||
// Get markets info data once for all iterations
|
||||
const { marketsInfoData } = await getMarketsInfoWithCache(sdk);
|
||||
|
||||
// Build arrays of markets and tokens that have claimable amounts
|
||||
Object.entries(claimableUiFeeData).forEach(([marketAddress, data]) => {
|
||||
const { marketsInfoData } = marketsCache.get(`markets_${sdk.chainId}`)?.data || {};
|
||||
|
||||
if (!marketsInfoData?.[marketAddress]) {
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user