using InfluxDB.Client.Api.Domain; using Managing.Application.Abstractions.Repositories; using Managing.Domain.Statistics; using Managing.Infrastructure.Databases.InfluxDb.Abstractions; using Managing.Infrastructure.Databases.InfluxDb.Models; using Microsoft.Extensions.Logging; namespace Managing.Infrastructure.Databases; public class AgentBalanceRepository : IAgentBalanceRepository { private readonly string _balanceBucket = "agent-balances-bucket"; private readonly IInfluxDbRepository _influxDbRepository; private readonly ILogger _logger; public AgentBalanceRepository(IInfluxDbRepository influxDbRepository, ILogger logger) { _influxDbRepository = influxDbRepository; _logger = logger; } public void InsertAgentBalance(AgentBalance balance) { _influxDbRepository.Write(write => { var balanceDto = new AgentBalanceDto { AgentName = balance.AgentName, TotalValue = balance.TotalValue, TotalAccountUsdValue = balance.TotalAccountUsdValue, BotsAllocationUsdValue = balance.BotsAllocationUsdValue, PnL = balance.PnL, Time = balance.Time }; write.WriteMeasurement( balanceDto, WritePrecision.Ns, _balanceBucket, _influxDbRepository.Organization); }); } public async Task> GetAgentBalances(string agentName, DateTime start, DateTime? end = null) { var results = await _influxDbRepository.QueryAsync(async query => { var flux = $"from(bucket:\"{_balanceBucket}\") " + $"|> range(start: {start:s}Z" + (end.HasValue ? $", stop: {end.Value:s}Z" : "") + $") " + $"|> filter(fn: (r) => r[\"agent_name\"] == \"{agentName}\") " + $"|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")"; var result = await query.QueryAsync(flux, _influxDbRepository.Organization); return result.Select(balance => new AgentBalance { AgentName = balance.AgentName, TotalValue = balance.TotalValue, TotalAccountUsdValue = balance.TotalAccountUsdValue, BotsAllocationUsdValue = balance.BotsAllocationUsdValue, PnL = balance.PnL, Time = balance.Time }).ToList(); }); return results; } public async Task<(IList result, int totalCount)> GetAllAgentBalancesWithHistory( DateTime start, DateTime? end) { var results = await _influxDbRepository.QueryAsync(async query => { // Get all balances within the time range, pivoted so each row is a full AgentBalanceDto var flux = $@" from(bucket: ""{_balanceBucket}"") |> range(start: {start:s}Z{(end.HasValue ? $", stop: {end.Value:s}Z" : "")}) |> filter(fn: (r) => r._measurement == ""agent_balance"") |> pivot(rowKey: [""_time""], columnKey: [""_field""], valueColumn: ""_value"") "; var balances = await query.QueryAsync(flux, _influxDbRepository.Organization); // Group balances by agent name var agentGroups = balances .GroupBy(b => b.AgentName) .Select(g => new AgentBalanceHistory { AgentName = g.Key, AgentBalances = g.Select(b => new AgentBalance { AgentName = b.AgentName, TotalValue = b.TotalValue, TotalAccountUsdValue = b.TotalAccountUsdValue, BotsAllocationUsdValue = b.BotsAllocationUsdValue, PnL = b.PnL, Time = b.Time }).OrderBy(b => b.Time).ToList() }).ToList(); return (agentGroups, agentGroups.Count); }); return results; } }