Add Agent tracking balance

This commit is contained in:
2025-05-16 22:30:18 +07:00
parent b34e3aa886
commit 1cfb83f0b1
34 changed files with 764 additions and 115 deletions

View File

@@ -0,0 +1,107 @@
using InfluxDB.Client.Api.Domain;
using Managing.Application.Abstractions.Repositories;
using Managing.Domain.Statistics;
using Managing.Infrastructure.Databases.InfluxDb.Abstractions;
using Managing.Infrastructure.Databases.InfluxDb.Models;
using Microsoft.Extensions.Logging;
namespace Managing.Infrastructure.Databases;
public class AgentBalanceRepository : IAgentBalanceRepository
{
private readonly string _balanceBucket = "agent-balances-bucket";
private readonly IInfluxDbRepository _influxDbRepository;
private readonly ILogger<AgentBalanceRepository> _logger;
public AgentBalanceRepository(IInfluxDbRepository influxDbRepository, ILogger<AgentBalanceRepository> logger)
{
_influxDbRepository = influxDbRepository;
_logger = logger;
}
public void InsertAgentBalance(AgentBalance balance)
{
_influxDbRepository.Write(write =>
{
var balanceDto = new AgentBalanceDto
{
AgentName = balance.AgentName,
TotalValue = balance.TotalValue,
TotalAccountUsdValue = balance.TotalAccountUsdValue,
BotsAllocationUsdValue = balance.BotsAllocationUsdValue,
PnL = balance.PnL,
Time = balance.Time
};
write.WriteMeasurement(
balanceDto,
WritePrecision.Ns,
_balanceBucket,
_influxDbRepository.Organization);
});
}
public async Task<IList<AgentBalance>> GetAgentBalances(string agentName, DateTime start, DateTime? end = null)
{
var results = await _influxDbRepository.QueryAsync(async query =>
{
var flux = $"from(bucket:\"{_balanceBucket}\") " +
$"|> range(start: {start:s}Z" +
(end.HasValue ? $", stop: {end.Value:s}Z" : "") +
$") " +
$"|> filter(fn: (r) => r[\"agent_name\"] == \"{agentName}\")";
var result = await query.QueryAsync<AgentBalanceDto>(flux, _influxDbRepository.Organization);
return result.Select(balance => new AgentBalance
{
AgentName = balance.AgentName,
TotalValue = balance.TotalValue,
TotalAccountUsdValue = balance.TotalAccountUsdValue,
BotsAllocationUsdValue = balance.BotsAllocationUsdValue,
PnL = balance.PnL,
Time = balance.Time
}).ToList();
});
return results;
}
public async Task<(IList<AgentBalanceHistory> result, int totalCount)> GetAllAgentBalancesWithHistory(
DateTime start, DateTime? end)
{
var results = await _influxDbRepository.QueryAsync(async query =>
{
// Get all balances within the time range, pivoted so each row is a full AgentBalanceDto
var flux = $@"
from(bucket: ""{_balanceBucket}"")
|> range(start: {start:s}Z{(end.HasValue ? $", stop: {end.Value:s}Z" : "")})
|> filter(fn: (r) => r._measurement == ""agent_balance"")
|> pivot(rowKey: [""_time""], columnKey: [""_field""], valueColumn: ""_value"")
";
var balances = await query.QueryAsync<AgentBalanceDto>(flux, _influxDbRepository.Organization);
// Group balances by agent name
var agentGroups = balances
.GroupBy(b => b.AgentName)
.Select(g => new AgentBalanceHistory
{
AgentName = g.Key,
AgentBalances = g.Select(b => new AgentBalance
{
AgentName = b.AgentName,
TotalValue = b.TotalValue,
TotalAccountUsdValue = b.TotalAccountUsdValue,
BotsAllocationUsdValue = b.BotsAllocationUsdValue,
PnL = b.PnL,
Time = b.Time
}).OrderBy(b => b.Time).ToList()
}).ToList();
return (agentGroups, agentGroups.Count);
});
return results;
}
}

View File

@@ -0,0 +1,19 @@
using InfluxDB.Client.Core;
namespace Managing.Infrastructure.Databases.InfluxDb.Models;
[Measurement("agent_balance")]
public class AgentBalanceDto
{
[Column("agent_name", IsTag = true)] public string AgentName { get; set; }
[Column("total_value")] public decimal TotalValue { get; set; }
[Column("total_account_usd_value")] public decimal TotalAccountUsdValue { get; set; }
[Column("bots_allocation_usd_value")] public decimal BotsAllocationUsdValue { get; set; }
[Column("pnl")] public decimal PnL { get; set; }
[Column(IsTimestamp = true)] public DateTime Time { get; set; }
}