Files
managing-apps/src/Managing.Infrastructure.Database/InfluxDb/AgentBalanceRepository.cs
2025-10-07 01:21:25 +07:00

109 lines
4.1 KiB
C#

using InfluxDB.Client.Api.Domain;
using Managing.Application.Abstractions.Repositories;
using Managing.Domain.Statistics;
using Managing.Infrastructure.Databases.InfluxDb.Abstractions;
using Managing.Infrastructure.Databases.InfluxDb.Models;
using Microsoft.Extensions.Logging;
namespace Managing.Infrastructure.Databases.InfluxDb;
/// <summary>
/// Writes agent balance snapshots to, and reads them back from, the
/// <c>agent-balances-bucket</c> InfluxDB bucket through <see cref="IInfluxDbRepository"/>.
/// </summary>
public class AgentBalanceRepository : IAgentBalanceRepository
{
    private readonly string _balanceBucket = "agent-balances-bucket";
    private readonly IInfluxDbRepository _influxDbRepository;
    private readonly ILogger<AgentBalanceRepository> _logger;

    /// <summary>
    /// Creates the repository.
    /// </summary>
    /// <param name="influxDbRepository">Gateway used for every write and query; also supplies the organization.</param>
    /// <param name="logger">Logger kept for diagnostics (not used by the current methods).</param>
    public AgentBalanceRepository(IInfluxDbRepository influxDbRepository, ILogger<AgentBalanceRepository> logger)
    {
        _influxDbRepository = influxDbRepository;
        _logger = logger;
    }

    /// <summary>
    /// Writes a single balance snapshot as a measurement with nanosecond precision.
    /// </summary>
    /// <param name="balance">The snapshot to persist; its fields are copied one-to-one into the DTO.</param>
    public void InsertAgentBalance(AgentBalance balance)
    {
        _influxDbRepository.Write(write =>
        {
            var balanceDto = new AgentBalanceDto
            {
                UserId = balance.UserId,
                TotalBalanceValue = balance.TotalBalanceValue,
                UsdcWalletValue = balance.UsdcWalletValue,
                UsdcInPositionsValue = balance.UsdcInPositionsValue,
                BotsAllocationUsdValue = balance.BotsAllocationUsdValue,
                PnL = balance.PnL,
                Time = balance.Time
            };

            write.WriteMeasurement(
                balanceDto,
                WritePrecision.Ns,
                _balanceBucket,
                _influxDbRepository.Organization);
        });
    }

    /// <summary>
    /// Reads the balance history of one user, down-sampled with
    /// <c>aggregateWindow(fn: last)</c> so that long ranges return a bounded number of points.
    /// </summary>
    /// <param name="userId">Value matched against the <c>user_id</c> column.</param>
    /// <param name="start">
    /// Inclusive start of the range. <see cref="DateTimeKind.Local"/> values are converted to UTC;
    /// <see cref="DateTimeKind.Utc"/> and <see cref="DateTimeKind.Unspecified"/> values are used as-is
    /// and interpreted as UTC.
    /// </param>
    /// <param name="end">
    /// Optional stop of the range, normalized like <paramref name="start"/>. When omitted, no
    /// <c>stop</c> is sent to InfluxDB and the current UTC time is used only to pick the sampling interval.
    /// </param>
    /// <returns>The balances mapped from the query result rows.</returns>
    public async Task<IList<AgentBalance>> GetAgentBalancesByUserId(int userId, DateTime start, DateTime? end = null)
    {
        // The query appends a literal "Z" to the formatted timestamps, so the values must
        // really be UTC; otherwise a local time would be sent as if it were UTC and the
        // window would be shifted by the machine's UTC offset.
        var startUtc = AsUtc(start);
        var endUtc = end.HasValue ? AsUtc(end.Value) : (DateTime?)null;

        var results = await _influxDbRepository.QueryAsync(async query =>
        {
            var effectiveEnd = endUtc ?? DateTime.UtcNow;
            var samplingInterval = SelectSamplingInterval(effectiveEnd - startUtc);

            var stopClause = endUtc.HasValue ? $", stop: {endUtc.Value:s}Z" : "";

            // Format the id with the invariant culture so the Flux literal never depends
            // on the current thread culture (e.g. a culture-specific negative sign).
            var userIdLiteral = userId.ToString(System.Globalization.CultureInfo.InvariantCulture);

            var flux = $"from(bucket:\"{_balanceBucket}\") " +
                       $"|> range(start: {startUtc:s}Z{stopClause}) " +
                       $"|> filter(fn: (r) => r[\"user_id\"] == \"{userIdLiteral}\") " +
                       $"|> aggregateWindow(every: {samplingInterval}, fn: last, createEmpty: false) " +
                       $"|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")";

            var result = await query.QueryAsync<AgentBalanceDto>(flux, _influxDbRepository.Organization);

            return result.Select(balance => new AgentBalance
            {
                UserId = balance.UserId,
                TotalBalanceValue = balance.TotalBalanceValue,
                UsdcWalletValue = balance.UsdcWalletValue,
                UsdcInPositionsValue = balance.UsdcInPositionsValue,
                BotsAllocationUsdValue = balance.BotsAllocationUsdValue,
                PnL = balance.PnL,
                Time = balance.Time
            }).ToList();
        });

        return results;
    }

    /// <summary>
    /// Returns <paramref name="value"/> expressed in UTC. Only <see cref="DateTimeKind.Local"/>
    /// values are converted; <see cref="DateTimeKind.Unspecified"/> values are deliberately left
    /// untouched because <see cref="DateTime.ToUniversalTime"/> would treat them as local time.
    /// </summary>
    private static DateTime AsUtc(DateTime value) =>
        value.Kind == DateTimeKind.Local ? value.ToUniversalTime() : value;

    /// <summary>
    /// Picks the Flux window duration for a requested time span so that the number of
    /// returned points stays in the low hundreds.
    /// </summary>
    private static string SelectSamplingInterval(TimeSpan timeRange) => timeRange.TotalDays switch
    {
        <= 1.0 => "5m",    // up to 1 day: 5-minute windows (max ~288 points)
        <= 7.0 => "30m",   // 1-7 days: 30-minute windows (max ~336 points)
        <= 30.0 => "2h",   // 7-30 days: 2-hour windows (max ~360 points)
        <= 90.0 => "6h",   // 30-90 days: 6-hour windows (max ~360 points)
        <= 365.0 => "1d",  // 90-365 days: 1-day windows (max ~365 points)
        _ => "7d"          // more than a year: 7-day windows (~52 points per year)
    };
}