From c60bc4123a7574e94b00e8079691123e4503da1e Mon Sep 17 00:00:00 2001 From: cryptooda Date: Sun, 14 Sep 2025 22:29:15 +0700 Subject: [PATCH] Add price grain generic --- .../Grains/IPriceFetcherGrain.cs | 18 ++ .../Grains/PriceFetcherGrain.cs | 244 ++++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 src/Managing.Application.Abstractions/Grains/IPriceFetcherGrain.cs create mode 100644 src/Managing.Application/Grains/PriceFetcherGrain.cs diff --git a/src/Managing.Application.Abstractions/Grains/IPriceFetcherGrain.cs b/src/Managing.Application.Abstractions/Grains/IPriceFetcherGrain.cs new file mode 100644 index 00000000..b80795cd --- /dev/null +++ b/src/Managing.Application.Abstractions/Grains/IPriceFetcherGrain.cs @@ -0,0 +1,18 @@ +using Orleans; + +namespace Managing.Application.Abstractions.Grains; + +/// +/// Orleans grain interface for price fetching operations. +/// This stateless worker grain handles fetching price data from external APIs +/// and publishing to Orleans streams for a specific timeframe. +/// +public interface IPriceFetcherGrain : IGrainWithStringKey +{ + /// + /// Fetches price data for all supported exchange/ticker combinations + /// and publishes new candles to their respective streams. + /// + /// True if the operation completed successfully, false otherwise + Task FetchAndPublishPricesAsync(); +} diff --git a/src/Managing.Application/Grains/PriceFetcherGrain.cs b/src/Managing.Application/Grains/PriceFetcherGrain.cs new file mode 100644 index 00000000..7a05bdc3 --- /dev/null +++ b/src/Managing.Application/Grains/PriceFetcherGrain.cs @@ -0,0 +1,244 @@ +using Managing.Application.Abstractions.Grains; +using Managing.Application.Abstractions.Repositories; +using Managing.Application.Abstractions.Services; +using Managing.Application.Shared; +using Managing.Common; +using Managing.Domain.Accounts; +using Managing.Domain.Candles; +using Microsoft.Extensions.Logging; +using Orleans.Concurrency; +using Orleans.Streams; +using static Managing.Common.Enums; + +namespace Managing.Application.Grains; + +/// +/// StatelessWorker grain for fetching price data from external APIs and publishing to Orleans streams. +/// This grain runs periodically and processes all exchange/ticker combinations for a specific timeframe. +/// The timeframe is passed as the PrimaryKeyString to identify which timeframe this grain handles. +/// +[StatelessWorker] +public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable +{ + private readonly ILogger _logger; + private readonly IExchangeService _exchangeService; + private readonly ICandleRepository _candleRepository; + private readonly IGrainFactory _grainFactory; + + private string FetchPricesReminderName => $"Fetch{TargetTimeframe}PricesReminder"; + private IDisposable _timer; + + // Predefined lists of trading parameters to fetch + private static readonly TradingExchanges[] SupportedExchanges = + { + TradingExchanges.Evm + }; + + private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers; + + private Timeframe TargetTimeframe { get; set; } + + public PriceFetcherGrain( + ILogger logger, + IExchangeService exchangeService, + ICandleRepository candleRepository, + IGrainFactory grainFactory) + { + _logger = logger; + _exchangeService = exchangeService; + _candleRepository = candleRepository; + _grainFactory = grainFactory; + } + + public override async Task OnActivateAsync(CancellationToken cancellationToken) + { + // Parse timeframe from PrimaryKeyString + var timeframeString = this.GetPrimaryKeyString(); + if (!Enum.TryParse(timeframeString, true, out var timeframe)) + { + _logger.LogError("Invalid timeframe '{TimeframeString}' provided as PrimaryKeyString", timeframeString); + throw new ArgumentException($"Invalid timeframe: {timeframeString}", nameof(timeframeString)); + } + + TargetTimeframe = timeframe; + + _logger.LogInformation("{0} activated for timeframe {1}", nameof(PriceFetcherGrain), TargetTimeframe); + + // Register a reminder to enable timer if not existing + var intervalMinutes = GrainHelpers.GetIntervalMinutes(TargetTimeframe); + await this.RegisterOrUpdateReminder( + FetchPricesReminderName, + TimeSpan.FromSeconds(10), + TimeSpan.FromMinutes(intervalMinutes)); + + await base.OnActivateAsync(cancellationToken); + } + + public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken) + { + _logger.LogInformation("{0} deactivating for timeframe {1}. Reason: {Reason}", + nameof(PriceFetcherGrain), TargetTimeframe, reason.Description); + + StopAndDisposeTimer(); + await base.OnDeactivateAsync(reason, cancellationToken); + } + + public async Task FetchAndPublishPricesAsync() + { + try + { + _logger.LogInformation("Starting {timeframe} price fetch cycle", TargetTimeframe); + + var fetchTasks = new List(); + + // Create fetch tasks for all exchange/ticker combinations for the target timeframe + foreach (var exchange in SupportedExchanges) + { + foreach (var ticker in SupportedTickers) + { + fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe)); + } + } + + // Execute all fetch operations in parallel + await Task.WhenAll(fetchTasks); + + _logger.LogInformation("{0} - Completed {1} price fetch cycle for {2} combinations", + nameof(PriceFetcherGrain), TargetTimeframe, fetchTasks.Count); + + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during price fetch cycle for timeframe {Timeframe}", TargetTimeframe); + return false; + } + } + + private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe) + { + try + { + // Create a dummy account for API calls (this may need to be adjusted based on your implementation) + var account = new Account + { + Name = "PriceFetcher", + Exchange = exchange, + Type = AccountType.Watch + }; + + // Get the last candle date from database + var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe, + DateTime.UtcNow.AddDays(-30), DateTime.UtcNow.AddDays(1), 20); + + var isFirstCall = !existingCandles.Any(); + + var startDate = !isFirstCall + ? existingCandles.Last().Date + : new DateTime(2017, 1, 1); + + // Fetch new candles from external API + var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, isFirstCall); + + if (newCandles?.Any() == true) + { + var streamKey = CandleHelpers.GetCandleStoreGrainKey(exchange, ticker, timeframe); + + _logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}", + newCandles.Count, streamKey); + + // Process all new candles + var processedCandles = newCandles.OrderBy(c => c.Date) + .Where(c => c.Date <= DateTime.UtcNow) // Avoid duplicates + .ToList(); + + foreach (var candle in processedCandles) + { + // Ensure candle has correct metadata + candle.Exchange = exchange; + candle.Ticker = ticker; + candle.Timeframe = timeframe; + } + + // Save all candles to database in a single batch + if (processedCandles.Any()) + { + await _candleRepository.InsertCandles(processedCandles).ConfigureAwait(false); + + _logger.LogDebug( + "[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} candles for {StreamKey}", + ticker, exchange, timeframe, + processedCandles.Count, streamKey); + } + + var streamProvider = this.GetStreamProvider("DefaultStreamProvider"); + var stream = streamProvider.GetStream(streamKey); + + // Publish to stream (if needed) + foreach (var candle in processedCandles) + { + await stream.OnNextAsync(candle); + _logger.LogTrace("Published candle for {StreamKey} at {Date}", + streamKey, candle.Date); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}", + exchange, ticker, timeframe); + } + } + + /// + /// Starts the Orleans timer for periodic price fetching + /// + private void RegisterAndStartTimer() + { + if (_timer != null) return; + + // Calculate the next execution time aligned to X-minute boundaries + var now = DateTime.UtcNow; + var dueTime = CandleHelpers.GetDueTimeForTimeframe(TargetTimeframe, now); + var period = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(TargetTimeframe)); + _logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3} seconds", + nameof(PriceFetcherGrain), dueTime, now.Add(dueTime), period); + + _timer = this.RegisterGrainTimer( + async _ => await FetchAndPublishPricesAsync(), + new GrainTimerCreationOptions + { + Period = period, + DueTime = dueTime, + KeepAlive = true + }); + + _logger.LogInformation("{0} timer registered and started for timeframe {1}", nameof(PriceFetcherGrain), + TargetTimeframe); + } + + private void StopAndDisposeTimer() + { + if (_timer != null) + { + _timer?.Dispose(); + _timer = null; + _logger.LogInformation("{0} timer stopped and disposed for timeframe {1}", nameof(PriceFetcherGrain), + TargetTimeframe); + } + } + + public Task ReceiveReminder(string reminderName, TickStatus status) + { + if (reminderName == FetchPricesReminderName) + { + // Only enable timer if not existing anymore + if (_timer == null) + { + RegisterAndStartTimer(); + } + } + + return Task.CompletedTask; + } +} \ No newline at end of file