using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Writes;
using Managing.Application.Abstractions.Repositories;
using Managing.Core;
using Managing.Domain.Candles;
using Managing.Infrastructure.Databases.InfluxDb;
using Managing.Infrastructure.Databases.InfluxDb.Abstractions;
using Managing.Infrastructure.Databases.InfluxDb.Models;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;

namespace Managing.Infrastructure.Databases;

/// <summary>
/// <see cref="ICandleRepository"/> implementation that queries and writes price data
/// through an <see cref="IInfluxDbRepository"/>, always targeting the "prices-bucket" bucket.
/// </summary>
// NOTE(review): several declarations below read `Task>`, `new List()` and `ParseEnum(`
// with no type arguments. The generic type arguments appear to have been lost before this
// text reached review; they are left untouched here -- restore them from version control
// before building.
public class CandleRepository : ICandleRepository
{
    // Bucket name used by every query and write in this class.
    private readonly string _priceBucket = "prices-bucket";

    // Gateway used to run Flux queries and writes; also supplies the organization.
    private readonly IInfluxDbRepository _influxDbRepository;

    // Stored by the constructor but not referenced by any member visible in this file.
    private readonly ILogger _logger;

    /// <summary>
    /// Stores the supplied dependencies. No null checks are performed.
    /// </summary>
    /// <param name="influxDbRepository">Repository used for all queries and writes.</param>
    /// <param name="logger">Logger instance; only stored, not used in this file.</param>
    public CandleRepository(IInfluxDbRepository influxDbRepository, ILogger logger)
    {
        _influxDbRepository = influxDbRepository;
        _logger = logger;
    }

    /// <summary>
    /// Queries the price bucket for rows matching the given exchange, ticker and timeframe,
    /// starting at <paramref name="start"/> with no explicit stop, and returns the rows
    /// after mapping each one through <c>PriceHelpers.Map</c>.
    /// </summary>
    /// <param name="exchange">Value compared against the "exchange" column.</param>
    /// <param name="ticker">Value compared against the "ticker" column.</param>
    /// <param name="timeframe">Value compared against the "timeframe" column.</param>
    /// <param name="start">Lower bound passed to the Flux <c>range</c> call.</param>
    /// <returns>The mapped query results, materialised with <c>ToList()</c>.</returns>
    public async Task> GetCandles(
        TradingExchanges exchange,
        Ticker ticker,
        Timeframe timeframe,
        DateTime start)
    {
        var results = await _influxDbRepository.QueryAsync(async query =>
        {
            // `{start:s}Z` formats the value with the sortable ("s") specifier and appends a
            // literal 'Z'; no conversion to UTC happens here.
            // NOTE(review): presumably callers always pass UTC values -- confirm.
            // Unlike GetTickersAsync, this query has no `_measurement` filter.
            // The final pivot turns each `_field` value into its own column, one row per `_time`.
            var flux = $"from(bucket:\"{_priceBucket}\") " +
                       $"|> range(start: {start:s}Z) " +
                       $"|> filter(fn: (r) => r[\"exchange\"] == \"{exchange}\")" +
                       $"|> filter(fn: (r) => r[\"ticker\"] == \"{ticker}\")" +
                       $"|> filter(fn: (r) => r[\"timeframe\"] == \"{timeframe}\")" +
                       $"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")";

            var prices = await query.QueryAsync(flux, _influxDbRepository.Organization);
            return prices.Select(price => PriceHelpers.Map(price)).ToList();
        });

        return results;
    }

    /// <summary>
    /// Same query as the four-argument overload, but bounded: the Flux <c>range</c> call
    /// receives both <paramref name="start"/> and <paramref name="end"/>.
    /// </summary>
    /// <param name="exchange">Value compared against the "exchange" column.</param>
    /// <param name="ticker">Value compared against the "ticker" column.</param>
    /// <param name="timeframe">Value compared against the "timeframe" column.</param>
    /// <param name="start">Value passed as <c>start</c> to the Flux <c>range</c> call.</param>
    /// <param name="end">Value passed as <c>stop</c> to the Flux <c>range</c> call.</param>
    /// <returns>The mapped query results, materialised with <c>ToList()</c>.</returns>
    public async Task> GetCandles(
        TradingExchanges exchange,
        Ticker ticker,
        Timeframe timeframe,
        DateTime start,
        DateTime end)
    {
        var results = await _influxDbRepository.QueryAsync(async query =>
        {
            // Both bounds are formatted with the "s" specifier plus a literal 'Z', without
            // any UTC conversion.
            // NOTE(review): presumably callers always pass UTC values -- confirm.
            // As in the other overload, there is no `_measurement` filter.
            var flux = $"from(bucket:\"{_priceBucket}\") " +
                       $"|> range(start: {start:s}Z, stop: {end:s}Z) " +
                       $"|> filter(fn: (r) => r[\"exchange\"] == \"{exchange}\")" +
                       $"|> filter(fn: (r) => r[\"ticker\"] == \"{ticker}\")" +
                       $"|> filter(fn: (r) => r[\"timeframe\"] == \"{timeframe}\")" +
                       $"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")";

            var prices = await query.QueryAsync(flux, _influxDbRepository.Organization);
            return prices.Select(price => PriceHelpers.Map(price)).ToList();
        });

        return results;
    }

    /// <summary>
    /// Collects the "ticker" values found in the "price" measurement for the given exchange
    /// and timeframe, from <paramref name="start"/> up to the server's <c>now()</c>.
    /// </summary>
    /// <param name="exchange">Value compared against the "exchange" column.</param>
    /// <param name="timeframe">Value compared against the "timeframe" column.</param>
    /// <param name="start">Lower bound passed to the Flux <c>range</c> call.</param>
    /// <returns>
    /// One entry per returned record, each parsed from the record's "ticker" value with
    /// <c>MiscExtensions.ParseEnum</c>.
    /// </returns>
    public async Task> GetTickersAsync(
        TradingExchanges exchange,
        Timeframe timeframe,
        DateTime start)
    {
        var results = await _influxDbRepository.QueryAsync(async query =>
        {
            // `{start:s}Z` appends a literal 'Z' without converting to UTC.
            // NOTE(review): presumably callers always pass UTC values -- confirm.
            // NOTE(review): Flux `distinct()` defaults to the `_value` column, which the
            // preceding keep(columns: ["ticker"]) looks like it drops -- confirm the query
            // behaves as intended (an explicit column: "ticker" may be what was meant).
            var flux = $"from(bucket:\"{_priceBucket}\") " +
                       $"|> range(start: {start:s}Z, stop: now()) " +
                       $"|> filter(fn: (r) => r[\"_measurement\"] == \"price\")" +
                       $"|> filter(fn: (r) => r[\"exchange\"] == \"{exchange}\")" +
                       $"|> filter(fn: (r) => r[\"timeframe\"] == \"{timeframe}\")" +
                       $"|> keep(columns: [\"ticker\"])" +
                       $"|> distinct()";

            var tickers = new List();
            var records = await query.QueryAsync(flux, _influxDbRepository.Organization);

            // Walk every record of every returned table and parse its "ticker" value.
            records.ForEach(table =>
            {
                var fluxRecords = table.Records;
                fluxRecords.ForEach(fluxRecord =>
                {
                    // The value is dereferenced without a null check: a record that has no
                    // "ticker" value would throw on ToString().
                    tickers.AddItem(
                        MiscExtensions.ParseEnum(fluxRecord.GetValueByKey("ticker").ToString()));
                });
            });

            return tickers;
        });

        return results;
    }

    /// <summary>
    /// Maps <paramref name="candle"/> to a <c>PriceDto</c> and writes it as a measurement,
    /// with nanosecond precision, to the price bucket of the repository's organization.
    /// </summary>
    /// <param name="candle">Candle to persist.</param>
    public void InsertCandle(Candle candle)
    {
        _influxDbRepository.Write(write =>
        {
            PriceDto price = PriceHelpers.Map(candle);
            write.WriteMeasurement(
                price,
                WritePrecision.Ns,
                _priceBucket,
                _influxDbRepository.Organization);
        });
    }

    /// <summary>
    /// Builds a point with an empty measurement name and writes it to the price bucket.
    /// </summary>
    /// <param name="candle">Candle whose mapped <c>OpenTime</c> is passed to <c>Timestamp</c>.</param>
    // NOTE(review): this looks like leftover scratch code (empty measurement name, empty tag
    // key and value, no fields) -- confirm whether it is still needed.
    public void Test(Candle candle)
    {
        _influxDbRepository.Write(write =>
        {
            var point = PointData.Measurement("");
            PriceDto price = PriceHelpers.Map(candle);

            // The return values of Tag(...) and Timestamp(...) are discarded.
            // NOTE(review): InfluxDB.Client's PointData appears to be immutable (these calls
            // return new instances); if so, `point` is written without the tag or the
            // timestamp -- confirm against the client library.
            point.Tag("", "");
            point.Timestamp(price.OpenTime, WritePrecision.Ns);

            write.WritePoint(
                point,
                _priceBucket,
                _influxDbRepository.Organization);
        });
    }
}