Files
managing-apps/src/Managing.Infrastructure.Database/InfluxDb/CandleRepository.cs

161 lines
6.1 KiB
C#

using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Writes;
using Managing.Application.Abstractions.Repositories;
using Managing.Core;
using Managing.Domain.Candles;
using Managing.Infrastructure.Databases.InfluxDb;
using Managing.Infrastructure.Databases.InfluxDb.Abstractions;
using Managing.Infrastructure.Databases.InfluxDb.Models;
using Microsoft.Extensions.Logging;
using static Managing.Common.Enums;
namespace Managing.Infrastructure.Databases;
/// <summary>
/// <see cref="ICandleRepository"/> implementation that reads and writes candles in InfluxDB
/// through an <see cref="IInfluxDbRepository"/>.
/// </summary>
public class CandleRepository : ICandleRepository
{
    // Name of the InfluxDB bucket every query and write in this class targets.
    private readonly string _priceBucket = "prices-bucket";
    private readonly IInfluxDbRepository _influxDbRepository;

    // NOTE(review): assigned in the constructor but not used by any member of this class.
    private readonly ILogger<CandleRepository> _logger;

    /// <summary>
    /// Creates the repository.
    /// </summary>
    /// <param name="influxDbRepository">Gateway used to run Flux queries and writes.</param>
    /// <param name="logger">Logger for this repository.</param>
    public CandleRepository(IInfluxDbRepository influxDbRepository, ILogger<CandleRepository> logger)
    {
        _influxDbRepository = influxDbRepository;
        _logger = logger;
    }

    /// <summary>
    /// Returns the candles of one exchange/ticker/timeframe from <paramref name="start"/> onward
    /// (the Flux <c>range</c> is given a start only, no stop).
    /// </summary>
    /// <param name="exchange">Value matched against the <c>exchange</c> column.</param>
    /// <param name="ticker">Value matched against the <c>ticker</c> column.</param>
    /// <param name="timeframe">Value matched against the <c>timeframe</c> column.</param>
    /// <param name="start">
    /// Range start. It is formatted with the "s" specifier and a literal <c>Z</c> is appended without
    /// any time-zone conversion — presumably callers pass UTC values; verify against callers.
    /// </param>
    /// <param name="limit">When not null, <c>tail(n: limit)</c> is appended to the query.</param>
    /// <returns>The mapped candles as a set.</returns>
    public Task<HashSet<Candle>> GetCandles(
        TradingExchanges exchange,
        Ticker ticker,
        Timeframe timeframe,
        DateTime start,
        int? limit = null)
    {
        return QueryCandlesAsync($"start: {start:s}Z", exchange, ticker, timeframe, limit);
    }

    /// <summary>
    /// Returns the candles of one exchange/ticker/timeframe between <paramref name="start"/> and
    /// <paramref name="end"/>.
    /// </summary>
    /// <param name="exchange">Value matched against the <c>exchange</c> column.</param>
    /// <param name="ticker">Value matched against the <c>ticker</c> column.</param>
    /// <param name="timeframe">Value matched against the <c>timeframe</c> column.</param>
    /// <param name="start">Range start; formatted as "s" + <c>Z</c> with no conversion (assumed UTC — TODO confirm).</param>
    /// <param name="end">Range stop; formatted the same way as <paramref name="start"/>.</param>
    /// <param name="limit">When not null, <c>tail(n: limit)</c> is appended to the query.</param>
    /// <returns>The mapped candles as a set.</returns>
    public Task<HashSet<Candle>> GetCandles(
        TradingExchanges exchange,
        Ticker ticker,
        Timeframe timeframe,
        DateTime start,
        DateTime end,
        int? limit = null)
    {
        return QueryCandlesAsync($"start: {start:s}Z, stop: {end:s}Z", exchange, ticker, timeframe, limit);
    }

    /// <summary>
    /// Lists the tickers found in the <c>price</c> measurement for an exchange and timeframe,
    /// from <paramref name="start"/> until <c>now()</c>.
    /// </summary>
    /// <param name="exchange">Value matched against the <c>exchange</c> column.</param>
    /// <param name="timeframe">Value matched against the <c>timeframe</c> column.</param>
    /// <param name="start">Range start; formatted as "s" + <c>Z</c> with no conversion (assumed UTC — TODO confirm).</param>
    /// <returns>One <see cref="Ticker"/> per record returned by the query, in result order.</returns>
    public async Task<IList<Ticker>> GetTickersAsync(
        TradingExchanges exchange,
        Timeframe timeframe,
        DateTime start)
    {
        var flux = $"from(bucket:\"{_priceBucket}\") " +
                   $"|> range(start: {start:s}Z, stop: now()) " +
                   $"|> filter(fn: (r) => r[\"_measurement\"] == \"price\")" +
                   $"|> filter(fn: (r) => r[\"exchange\"] == \"{exchange}\")" +
                   $"|> filter(fn: (r) => r[\"timeframe\"] == \"{timeframe}\")" +
                   $"|> keep(columns: [\"ticker\"])" +
                   $"|> distinct()";

        var results = await _influxDbRepository.QueryAsync(async query =>
        {
            var tickers = new List<Ticker>();
            var tables = await query.QueryAsync(flux, _influxDbRepository.Organization);

            foreach (var table in tables)
            {
                foreach (var record in table.Records)
                {
                    // The "ticker" column holds the enum name as text; it is parsed back to Ticker.
                    tickers.AddItem(
                        MiscExtensions.ParseEnum<Ticker>(record.GetValueByKey("ticker").ToString()));
                }
            }

            return tickers;
        });

        return results;
    }

    /// <summary>
    /// Writes a single candle; equivalent to calling <see cref="InsertCandles"/> with one element.
    /// </summary>
    /// <param name="candle">Candle to map and write.</param>
    /// <returns>An already-completed task.</returns>
    public Task InsertCandle(Candle candle)
    {
        return InsertCandles(new[] { candle });
    }

    /// <summary>
    /// Maps each candle to a <see cref="PriceDto"/> and writes it as a measurement with nanosecond
    /// precision into the price bucket.
    /// </summary>
    /// <param name="candles">Candles to write; enumerated inside the write callback.</param>
    /// <returns>An already-completed task (the write is handed to the repository synchronously).</returns>
    public Task InsertCandles(IEnumerable<Candle> candles)
    {
        _influxDbRepository.Write(write =>
        {
            foreach (var candle in candles)
            {
                PriceDto price = PriceHelpers.Map(candle);
                write.WriteMeasurement(
                    price,
                    WritePrecision.Ns,
                    _priceBucket,
                    _influxDbRepository.Organization);
            }
        });

        return Task.CompletedTask;
    }

    /// <summary>
    /// Writes a point built from the candle's open time.
    /// </summary>
    /// <remarks>
    /// NOTE(review): this looks like scratch/debug code — the measurement name and the tag are empty
    /// strings and no field is set, so the client may reject or skip the point; confirm whether this
    /// method is still needed.
    /// </remarks>
    /// <param name="candle">Candle whose mapped <c>OpenTime</c> is used as the point timestamp.</param>
    public void Test(Candle candle)
    {
        _influxDbRepository.Write(write =>
        {
            var point = PointData.Measurement("");
            PriceDto price = PriceHelpers.Map(candle);

            // The builder methods return the resulting point, so the result must be kept;
            // calling them as bare statements would leave the original point unchanged.
            point = point
                .Tag("", "")
                .Timestamp(price.OpenTime, WritePrecision.Ns);

            write.WritePoint(
                point,
                _priceBucket,
                _influxDbRepository.Organization);
        });
    }

    /// <summary>
    /// Builds and runs the candle query shared by both <c>GetCandles</c> overloads.
    /// </summary>
    /// <param name="rangeArguments">Text placed inside <c>range(...)</c>, e.g. <c>start: ...Z</c>.</param>
    /// <param name="exchange">Value matched against the <c>exchange</c> column.</param>
    /// <param name="ticker">Value matched against the <c>ticker</c> column.</param>
    /// <param name="timeframe">Value matched against the <c>timeframe</c> column.</param>
    /// <param name="limit">When not null, <c>tail(n: limit)</c> is appended to the query.</param>
    /// <returns>The query rows mapped through <c>PriceHelpers.Map</c> and collected into a set.</returns>
    private async Task<HashSet<Candle>> QueryCandlesAsync(
        string rangeArguments,
        TradingExchanges exchange,
        Ticker ticker,
        Timeframe timeframe,
        int? limit)
    {
        // NOTE(review): unlike GetTickersAsync, this pipeline has no _measurement filter — confirm
        // the bucket only ever contains the price measurement.
        var flux = $"from(bucket:\"{_priceBucket}\") " +
                   $"|> range({rangeArguments}) " +
                   $"|> filter(fn: (r) => r[\"exchange\"] == \"{exchange}\")" +
                   $"|> filter(fn: (r) => r[\"ticker\"] == \"{ticker}\")" +
                   $"|> filter(fn: (r) => r[\"timeframe\"] == \"{timeframe}\")" +
                   $"|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")" +
                   $"|> keep(columns: [\"_time\", \"exchange\", \"ticker\", \"timeframe\", \"openTime\", \"closeTime\", \"open\", \"close\", \"high\", \"low\", \"baseVolume\", \"quoteVolume\", \"TradeCount\", \"takerBuyBaseVolume\", \"takerBuyQuoteVolume\"])";

        if (limit is not null)
        {
            flux += $"|> tail(n:{limit})";
        }

        var results = await _influxDbRepository.QueryAsync(async query =>
        {
            var prices = await query.QueryAsync<PriceDto>(flux, _influxDbRepository.Organization);
            return prices.Select(price => PriceHelpers.Map(price)).ToHashSet();
        });

        return results;
    }
}