Add price grain generic
This commit is contained in:
@@ -0,0 +1,18 @@
|
|||||||
|
using Orleans;
|
||||||
|
|
||||||
|
namespace Managing.Application.Abstractions.Grains;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Orleans grain interface for price fetching operations.
|
||||||
|
/// This stateless worker grain handles fetching price data from external APIs
|
||||||
|
/// and publishing to Orleans streams for a specific timeframe.
|
||||||
|
/// </summary>
|
||||||
|
public interface IPriceFetcherGrain : IGrainWithStringKey
|
||||||
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Fetches price data for all supported exchange/ticker combinations
|
||||||
|
/// and publishes new candles to their respective streams.
|
||||||
|
/// </summary>
|
||||||
|
/// <returns>True if the operation completed successfully, false otherwise</returns>
|
||||||
|
Task<bool> FetchAndPublishPricesAsync();
|
||||||
|
}
|
||||||
244
src/Managing.Application/Grains/PriceFetcherGrain.cs
Normal file
244
src/Managing.Application/Grains/PriceFetcherGrain.cs
Normal file
@@ -0,0 +1,244 @@
|
|||||||
|
using Managing.Application.Abstractions.Grains;
|
||||||
|
using Managing.Application.Abstractions.Repositories;
|
||||||
|
using Managing.Application.Abstractions.Services;
|
||||||
|
using Managing.Application.Shared;
|
||||||
|
using Managing.Common;
|
||||||
|
using Managing.Domain.Accounts;
|
||||||
|
using Managing.Domain.Candles;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Orleans.Concurrency;
|
||||||
|
using Orleans.Streams;
|
||||||
|
using static Managing.Common.Enums;
|
||||||
|
|
||||||
|
namespace Managing.Application.Grains;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// StatelessWorker grain for fetching price data from external APIs and publishing to Orleans streams.
|
||||||
|
/// This grain runs periodically and processes all exchange/ticker combinations for a specific timeframe.
|
||||||
|
/// The timeframe is passed as the PrimaryKeyString to identify which timeframe this grain handles.
|
||||||
|
/// </summary>
|
||||||
|
[StatelessWorker]
|
||||||
|
public class PriceFetcherGrain : Grain, IPriceFetcherGrain, IRemindable
|
||||||
|
{
|
||||||
|
private readonly ILogger<PriceFetcherGrain> _logger;
|
||||||
|
private readonly IExchangeService _exchangeService;
|
||||||
|
private readonly ICandleRepository _candleRepository;
|
||||||
|
private readonly IGrainFactory _grainFactory;
|
||||||
|
|
||||||
|
private string FetchPricesReminderName => $"Fetch{TargetTimeframe}PricesReminder";
|
||||||
|
private IDisposable _timer;
|
||||||
|
|
||||||
|
// Predefined lists of trading parameters to fetch
|
||||||
|
private static readonly TradingExchanges[] SupportedExchanges =
|
||||||
|
{
|
||||||
|
TradingExchanges.Evm
|
||||||
|
};
|
||||||
|
|
||||||
|
private static readonly Ticker[] SupportedTickers = Constants.GMX.Config.SupportedTickers;
|
||||||
|
|
||||||
|
private Timeframe TargetTimeframe { get; set; }
|
||||||
|
|
||||||
|
public PriceFetcherGrain(
|
||||||
|
ILogger<PriceFetcherGrain> logger,
|
||||||
|
IExchangeService exchangeService,
|
||||||
|
ICandleRepository candleRepository,
|
||||||
|
IGrainFactory grainFactory)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_exchangeService = exchangeService;
|
||||||
|
_candleRepository = candleRepository;
|
||||||
|
_grainFactory = grainFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task OnActivateAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
// Parse timeframe from PrimaryKeyString
|
||||||
|
var timeframeString = this.GetPrimaryKeyString();
|
||||||
|
if (!Enum.TryParse<Timeframe>(timeframeString, true, out var timeframe))
|
||||||
|
{
|
||||||
|
_logger.LogError("Invalid timeframe '{TimeframeString}' provided as PrimaryKeyString", timeframeString);
|
||||||
|
throw new ArgumentException($"Invalid timeframe: {timeframeString}", nameof(timeframeString));
|
||||||
|
}
|
||||||
|
|
||||||
|
TargetTimeframe = timeframe;
|
||||||
|
|
||||||
|
_logger.LogInformation("{0} activated for timeframe {1}", nameof(PriceFetcherGrain), TargetTimeframe);
|
||||||
|
|
||||||
|
// Register a reminder to enable timer if not existing
|
||||||
|
var intervalMinutes = GrainHelpers.GetIntervalMinutes(TargetTimeframe);
|
||||||
|
await this.RegisterOrUpdateReminder(
|
||||||
|
FetchPricesReminderName,
|
||||||
|
TimeSpan.FromSeconds(10),
|
||||||
|
TimeSpan.FromMinutes(intervalMinutes));
|
||||||
|
|
||||||
|
await base.OnActivateAsync(cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
_logger.LogInformation("{0} deactivating for timeframe {1}. Reason: {Reason}",
|
||||||
|
nameof(PriceFetcherGrain), TargetTimeframe, reason.Description);
|
||||||
|
|
||||||
|
StopAndDisposeTimer();
|
||||||
|
await base.OnDeactivateAsync(reason, cancellationToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async Task<bool> FetchAndPublishPricesAsync()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_logger.LogInformation("Starting {timeframe} price fetch cycle", TargetTimeframe);
|
||||||
|
|
||||||
|
var fetchTasks = new List<Task>();
|
||||||
|
|
||||||
|
// Create fetch tasks for all exchange/ticker combinations for the target timeframe
|
||||||
|
foreach (var exchange in SupportedExchanges)
|
||||||
|
{
|
||||||
|
foreach (var ticker in SupportedTickers)
|
||||||
|
{
|
||||||
|
fetchTasks.Add(FetchAndPublish(exchange, ticker, TargetTimeframe));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Execute all fetch operations in parallel
|
||||||
|
await Task.WhenAll(fetchTasks);
|
||||||
|
|
||||||
|
_logger.LogInformation("{0} - Completed {1} price fetch cycle for {2} combinations",
|
||||||
|
nameof(PriceFetcherGrain), TargetTimeframe, fetchTasks.Count);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error during price fetch cycle for timeframe {Timeframe}", TargetTimeframe);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task FetchAndPublish(TradingExchanges exchange, Ticker ticker, Timeframe timeframe)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
// Create a dummy account for API calls (this may need to be adjusted based on your implementation)
|
||||||
|
var account = new Account
|
||||||
|
{
|
||||||
|
Name = "PriceFetcher",
|
||||||
|
Exchange = exchange,
|
||||||
|
Type = AccountType.Watch
|
||||||
|
};
|
||||||
|
|
||||||
|
// Get the last candle date from database
|
||||||
|
var existingCandles = await _candleRepository.GetCandles(exchange, ticker, timeframe,
|
||||||
|
DateTime.UtcNow.AddDays(-30), DateTime.UtcNow.AddDays(1), 20);
|
||||||
|
|
||||||
|
var isFirstCall = !existingCandles.Any();
|
||||||
|
|
||||||
|
var startDate = !isFirstCall
|
||||||
|
? existingCandles.Last().Date
|
||||||
|
: new DateTime(2017, 1, 1);
|
||||||
|
|
||||||
|
// Fetch new candles from external API
|
||||||
|
var newCandles = await _exchangeService.GetCandles(account, ticker, startDate, timeframe, isFirstCall);
|
||||||
|
|
||||||
|
if (newCandles?.Any() == true)
|
||||||
|
{
|
||||||
|
var streamKey = CandleHelpers.GetCandleStoreGrainKey(exchange, ticker, timeframe);
|
||||||
|
|
||||||
|
_logger.LogDebug("Fetched {CandleCount} new candles for {StreamKey}",
|
||||||
|
newCandles.Count, streamKey);
|
||||||
|
|
||||||
|
// Process all new candles
|
||||||
|
var processedCandles = newCandles.OrderBy(c => c.Date)
|
||||||
|
.Where(c => c.Date <= DateTime.UtcNow) // Avoid duplicates
|
||||||
|
.ToList();
|
||||||
|
|
||||||
|
foreach (var candle in processedCandles)
|
||||||
|
{
|
||||||
|
// Ensure candle has correct metadata
|
||||||
|
candle.Exchange = exchange;
|
||||||
|
candle.Ticker = ticker;
|
||||||
|
candle.Timeframe = timeframe;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save all candles to database in a single batch
|
||||||
|
if (processedCandles.Any())
|
||||||
|
{
|
||||||
|
await _candleRepository.InsertCandles(processedCandles).ConfigureAwait(false);
|
||||||
|
|
||||||
|
_logger.LogDebug(
|
||||||
|
"[{Ticker}][{Exchange}][{Timeframe}] Inserted {CandleCount} candles for {StreamKey}",
|
||||||
|
ticker, exchange, timeframe,
|
||||||
|
processedCandles.Count, streamKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
var streamProvider = this.GetStreamProvider("DefaultStreamProvider");
|
||||||
|
var stream = streamProvider.GetStream<Candle>(streamKey);
|
||||||
|
|
||||||
|
// Publish to stream (if needed)
|
||||||
|
foreach (var candle in processedCandles)
|
||||||
|
{
|
||||||
|
await stream.OnNextAsync(candle);
|
||||||
|
_logger.LogTrace("Published candle for {StreamKey} at {Date}",
|
||||||
|
streamKey, candle.Date);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, "Error fetching prices for {Exchange}-{Ticker}-{Timeframe}",
|
||||||
|
exchange, ticker, timeframe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Starts the Orleans timer for periodic price fetching
|
||||||
|
/// </summary>
|
||||||
|
private void RegisterAndStartTimer()
|
||||||
|
{
|
||||||
|
if (_timer != null) return;
|
||||||
|
|
||||||
|
// Calculate the next execution time aligned to X-minute boundaries
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
var dueTime = CandleHelpers.GetDueTimeForTimeframe(TargetTimeframe, now);
|
||||||
|
var period = TimeSpan.FromSeconds(CandleHelpers.GetBaseIntervalInSeconds(TargetTimeframe));
|
||||||
|
_logger.LogInformation("{0} next execution scheduled in {1} seconds and at {2:} UTC every {3} seconds",
|
||||||
|
nameof(PriceFetcherGrain), dueTime, now.Add(dueTime), period);
|
||||||
|
|
||||||
|
_timer = this.RegisterGrainTimer(
|
||||||
|
async _ => await FetchAndPublishPricesAsync(),
|
||||||
|
new GrainTimerCreationOptions
|
||||||
|
{
|
||||||
|
Period = period,
|
||||||
|
DueTime = dueTime,
|
||||||
|
KeepAlive = true
|
||||||
|
});
|
||||||
|
|
||||||
|
_logger.LogInformation("{0} timer registered and started for timeframe {1}", nameof(PriceFetcherGrain),
|
||||||
|
TargetTimeframe);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void StopAndDisposeTimer()
|
||||||
|
{
|
||||||
|
if (_timer != null)
|
||||||
|
{
|
||||||
|
_timer?.Dispose();
|
||||||
|
_timer = null;
|
||||||
|
_logger.LogInformation("{0} timer stopped and disposed for timeframe {1}", nameof(PriceFetcherGrain),
|
||||||
|
TargetTimeframe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Task ReceiveReminder(string reminderName, TickStatus status)
|
||||||
|
{
|
||||||
|
if (reminderName == FetchPricesReminderName)
|
||||||
|
{
|
||||||
|
// Only enable timer if not existing anymore
|
||||||
|
if (_timer == null)
|
||||||
|
{
|
||||||
|
RegisterAndStartTimer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Task.CompletedTask;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user