Update pricing timing

This commit is contained in:
2025-09-14 22:27:54 +07:00
parent daeb26375b
commit 2847778c7c
10 changed files with 145 additions and 351 deletions

View File

@@ -1,229 +0,0 @@
using Managing.Application.Abstractions.Grains;
using Managing.Application.Abstractions.Repositories;
using Managing.Application.Abstractions.Services;
using Managing.Common;
using Managing.Domain.Accounts;
using Managing.Domain.Candles;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Orleans.Streams;
using static Managing.Common.Enums;
namespace Managing.Application.Grains;
/// <summary>
/// StatelessWorker grain for fetching 5-minute price data from external APIs and publishing to Orleans streams.
/// This grain runs every 5 minutes and processes all exchange/ticker combinations for the 5-minute timeframe.
/// </summary>
[StatelessWorker]
public class PriceFetcher15MinGrain : Grain, IPriceFetcher15MinGrain, IRemindable
{
private readonly ILogger<PriceFetcher15MinGrain> _logger;
private readonly IExchangeService _exchangeService;
private readonly ICandleRepository _candleRepository;
private readonly IGrainFactory _grainFactory;
private const string FetchPricesReminderName = "Fetch15minPricesReminder";
private IDisposable _timer;
// Predefined lists of trading parameters to fetch
private static readonly TradingExchanges[] SupportedExchanges =
{
TradingExchanges.Evm
};
private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers;
private static readonly Timeframe TargetTimeframe = Timeframe.FifteenMinutes;
public PriceFetcher15MinGrain(
ILogger<PriceFetcher15MinGrain> logger,
IExchangeService exchangeService,
ICandleRepository candleRepository,
IGrainFactory grainFactory)
{
_logger = logger;
_exchangeService = exchangeService;
_candleRepository = candleRepository;
_grainFactory = grainFactory;
}
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("{0} activated", nameof(PriceFetcher15MinGrain));
// Register a reminder to enable timer if not existing
await this.RegisterOrUpdateReminder(
FetchPricesReminderName,
TimeSpan.FromSeconds(5),
TimeSpan.FromMinutes(7.5));
await base.OnActivateAsync(cancellationToken);
}
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
_logger.LogInformation("{0} deactivating. Reason: {Reason}",
nameof(PriceFetcher15MinGrain), reason.Description);
StopAndDisposeTimer();
await base.OnDeactivateAsync(reason, cancellationToken);
}
public async Task<bool> FetchAndPublishPricesAsync()
{
try
{
_logger.LogInformation("Starting {timeframe} price fetch cycle", TargetTimeframe);
var fetchTasks = new List<Task>();
// Create fetch tasks for all exchange/ticker combinations for 15-minute timeframe
foreach (var exchange in SupportedExchanges)
{
foreach (var ticker in SupportedTickers)
{
fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe));
}
}
// Execute all fetch operations in parallel
await Task.WhenAll(fetchTasks);
_logger.LogInformation("{0} - Completed {1} price fetch cycle for {2} combinations",
nameof(PriceFetcher15MinGrain), TargetTimeframe, fetchTasks.Count);
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error during price fetch cycle");
return false;
}
}
private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
{
try
{
// Create a dummy account for API calls (this may need to be adjusted based on your implementation)
var account = new Account
{
Name = "PriceFetcher",
Exchange = exchange,
Type = AccountType.Watch
};
// Get the last candle date from database
var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe,
DateTime.UtcNow.AddDays(-30), DateTime.UtcNow.AddDays(1), 20);
var isFirstCall = !existingCandles.Any();
var startDate = !isFirstCall
? existingCandles.Last().Date
: new DateTime(2017, 1, 1);
// Fetch new candles from external API
var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, isFirstCall);
if (newCandles?.Any() == true)
{
var streamKey = CandleHelpers.GetCandleStoreGrainKey(exchange, ticker, timeframe);
_logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}",
newCandles.Count, streamKey);
// Process all new candles
var processedCandles = newCandles.OrderBy(c => c.Date)
.Where(c => c.Date <= DateTime.UtcNow) // Avoid duplicates
.ToList();
foreach (var candle in processedCandles)
{
// Ensure candle has correct metadata
candle.Exchange = exchange;
candle.Ticker = ticker;
candle.Timeframe = timeframe;
}
// Save all candles to database in a single batch
if (processedCandles.Any())
{
await _candleRepository.InsertCandles(processedCandles).ConfigureAwait(false);
_logger.LogDebug(
"[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} candles for {StreamKey}",
ticker, exchange, timeframe,
processedCandles.Count, streamKey);
}
var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
var stream = streamProvider.GetStream<Candle>(streamKey);
// Publish to stream (if needed)
foreach (var candle in processedCandles)
{
await stream.OnNextAsync(candle);
_logger.LogTrace("Published candle for {StreamKey} at {Date}",
streamKey, candle.Date);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}",
exchange, ticker, timeframe);
}
}
/// <summary>
/// Starts the Orleans timer for periodic price fetching
/// </summary>
private void RegisterAndStartTimer()
{
if (_timer != null) return;
// Calculate the next execution time aligned to X-minute boundaries
var now = DateTime.UtcNow;
var dueTime = CandleHelpers.GetDueTimeForTimeframe(TargetTimeframe, now);
var period = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(TargetTimeframe));
_logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3} seconds",
nameof(PriceFetcher15MinGrain), dueTime, now.Add(dueTime), period);
_timer = this.RegisterGrainTimer(
async _ => await FetchAndPublishPricesAsync(),
new GrainTimerCreationOptions
{
Period = period,
DueTime = dueTime,
KeepAlive = true
});
_logger.LogInformation("{0} timer registered and started", nameof(PriceFetcher15MinGrain));
}
private void StopAndDisposeTimer()
{
if (_timer != null)
{
_timer?.Dispose();
_timer = null;
_logger.LogInformation("{0} timer stopped and disposed", nameof(PriceFetcher15MinGrain));
}
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
if (reminderName == FetchPricesReminderName)
{
// Only enable timer if not existing anymore
if (_timer == null)
{
RegisterAndStartTimer();
}
}
return Task.CompletedTask;
}
}

View File

@@ -1,5 +1,6 @@
using Managing.Application.Abstractions.Grains;
using Microsoft.Extensions.Hosting;
using static Managing.Common.Enums;
namespace Managing.Application.Grains;
@@ -14,14 +15,26 @@ public class PriceFetcherInitializer : IHostedService
public async Task StartAsync(CancellationToken cancellationToken)
{
var fiveMinute = _grainFactory.GetGrain<IPriceFetcher15MinGrain>(0);
Console.WriteLine("GrainId : {0}", fiveMinute.GetGrainId());
// Initialize grains for different timeframes
var timeframes = new[]
{
Timeframe.FifteenMinutes,
Timeframe.OneHour,
Timeframe.FourHour,
Timeframe.OneDay
};
// Actually call a method on the grain to activate it
// This will trigger OnActivateAsync and register the reminder
await fiveMinute.FetchAndPublishPricesAsync();
foreach (var timeframe in timeframes)
{
var grain = _grainFactory.GetGrain<IPriceFetcherGrain>(timeframe.ToString());
Console.WriteLine("GrainId for {0}: {1}", timeframe, grain.GetGrainId());
Console.WriteLine("PriceFetcher5MinGrain activated and initial fetch completed");
// Actually call a method on the grain to activate it
// This will trigger OnActivateAsync and register the reminder
await grain.FetchAndPublishPricesAsync();
Console.WriteLine("PriceFetcherGrain for {0} activated and initial fetch completed", timeframe);
}
}
public Task StopAsync(CancellationToken cancellationToken)